diff --git a/at/mod.test.ts b/at/mod.test.ts index 5e6e669..2f94bd4 100644 --- a/at/mod.test.ts +++ b/at/mod.test.ts @@ -7,6 +7,7 @@ import { materialize } from "@observable/materialize"; import type { ObserverNotification } from "@observable/materialize"; import { at } from "./mod.ts"; import { throwError } from "@observable/throw-error"; +import { tap } from "@observable/tap"; Deno.test("at should throw if no arguments are provided", () => { assertThrows( @@ -170,14 +171,23 @@ Deno.test("at(negative fractional) should truncate index -2.8 to -2", () => { }); Deno.test("at(positive) when source has fewer items should emit nothing then return", () => { - const notifications: Array> = []; - const observable = pipe(forOf([1, 2]), at(5), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + const observable = pipe( + forOf([1, 2]), + tap((value) => notifications.push(["tap", value])), + at(5), + materialize(), + ); observable.subscribe( new Observer((notification) => notifications.push(notification)), ); - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["return"], + ]); }); Deno.test("at(-1) should emit only the last value", () => { @@ -203,36 +213,65 @@ Deno.test("at(-2) should emit only the second-to-last value", () => { }); Deno.test("at(negative) when source has fewer than |index| items should emit nothing then return", () => { - const notifications: Array> = []; - const observable = pipe(forOf([1, 2]), at(-5), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + const observable = pipe( + forOf([1, 2]), + tap((value) => notifications.push(["tap", value])), + at(-5), + materialize(), + ); observable.subscribe( new Observer((notification) => notifications.push(notification)), ); - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["return"], + ]); }); Deno.test("at(Infinity) should never emit", () => { - const notifications: Array> = []; - const observable = pipe(forOf([1, 2, 3]), at(Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + const observable = pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + at(Infinity), + materialize(), + ); observable.subscribe( new Observer((notification) => notifications.push(notification)), ); - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test("at(-Infinity) should never emit", () => { - const notifications: Array> = []; - const observable = pipe(forOf([1, 2, 3]), at(-Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + const observable = pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + at(-Infinity), + materialize(), + ); observable.subscribe( new Observer((notification) => notifications.push(notification)), ); - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test("at should pump throws through itself", () => { diff --git a/debounce/mod.test.ts b/debounce/mod.test.ts index cb7b708..f575302 100644 --- a/debounce/mod.test.ts +++ b/debounce/mod.test.ts @@ -8,6 +8,7 @@ import { debounce } from "./mod.ts"; import { flat } from "@observable/flat"; import { throwError } from "@observable/throw-error"; import { of } from "@observable/of"; +import { tap } from "@observable/tap"; Deno.test("debounce should return empty if milliseconds is negative", () => { // Arrange @@ -33,33 +34,48 @@ Deno.test("debounce should return empty if milliseconds is NaN", () => { Deno.test("debounce should ignore values but propagate return when milliseconds is Infinity", () => { // Arrange - const notifications: Array> = []; - const source = forOf([1, 2, 3]); - const materialized = pipe(source, debounce(Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; // Act - materialized.subscribe( + pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + debounce(Infinity), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test("debounce should ignore values but propagate throw when milliseconds is Infinity", () => { // Arrange const error = new Error("test error"); - const notifications: Array> = []; - const source = flat([forOf([1, 2]), throwError(error)]); - const materialized = pipe(source, debounce(Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; // Act - materialized.subscribe( + pipe( + flat([forOf([1, 2]), throwError(error)]), + tap((value) => notifications.push(["tap", value])), + debounce(Infinity), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["throw", error]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["throw", error], + ]); }); Deno.test("debounce should emit value after timeout expires", () => { diff --git a/delay/README.md b/delay/README.md new file mode 100644 index 0000000..6594cbb --- /dev/null +++ b/delay/README.md @@ -0,0 +1,177 @@ +# [@observable/delay](https://jsr.io/@observable/delay) + +Delays values by the given number of `milliseconds`. + +## Build + +Automated by [JSR](https://jsr.io/) + +## Publishing + +Automated by `.github\workflows\publish.yml`. + +## Running unit tests + +Run `deno task test` or `deno task test:ci` to execute the unit tests via +[Deno](https://deno.land/). + +## Examples + +1000 milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (after 1 second): +// "next" 1 +// "next" 2 +// "next" 3 +// "next" 4 +// "next" 5 +// "return" +``` + +0 milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(0)).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (synchronously): +// "next" 1 +// "next" 2 +// "next" 3 +// "next" 4 +// "next" 5 +// "return" +``` + +Infinite milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(Infinity)).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (synchronously): +// "return" +``` + +Negative milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(-1)).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (synchronously): +// "return" +``` + +NaN milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(NaN))).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (synchronously): +// "return" +``` + +## AI Prompt + +Use the following prompt with AI assistants to help them understand this library: + +```` +You are helping me with code that uses @observable/delay from the @observable library ecosystem. + +WHAT IT DOES: +`delay(milliseconds)` shifts each emitted value forward in time by scheduling it after `milliseconds` have elapsed. Values stay in the same order as the source. + +CRITICAL: This library is NOT RxJS. Key differences: +- `delay` is a standalone function used with `pipe()` — NOT a method on Observable +- Observer uses `return`/`throw` — NOT `complete`/`error` +- Unsubscription via `AbortController.abort()` — NOT `subscription.unsubscribe()` + +USAGE PATTERN: +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); + +pipe(of([1, 2, 3]), delay(1_000)).subscribe({ + signal: controller.signal, + next: (value) => console.log(value), + return: () => console.log("done"), + throw: (error) => console.error(error), +}); +``` + +WRONG USAGE: +```ts +// ✗ WRONG: delay is NOT a method on Observable +of([1, 2, 3]).delay(1000) // This does NOT work! +``` + +MILLISECONDS SEMANTICS: +- **Positive number:** each `next` is delayed by that many milliseconds; source order is preserved. +- **0:** values pass through without using timers (effectively immediate). +- **Negative or `NaN`:** the result is an empty Observable (no `next`; behaves like `@observable/empty`). +- **`Infinity`:** all values are dropped; when the source returns, the output returns (no delayed values). + +ABORT / UNSUBSCRIBE: +Aborting the observer clears pending timers. After abort, fired timer callbacks must not deliver values to the consumer. + +ARGUMENT VALIDATION: +`delay` requires a `number` for milliseconds. The returned operator function requires an Observable source; both throw `TypeError` if missing or wrong type. +```` + +## Glossary And Semantics + +[@observable/core](https://jsr.io/@observable/core#glossary-and-semantics) diff --git a/delay/deno.json b/delay/deno.json new file mode 100644 index 0000000..28e1f01 --- /dev/null +++ b/delay/deno.json @@ -0,0 +1,7 @@ +{ + "name": "@observable/delay", + "version": "0.1.0", + "license": "MIT", + "exports": "./mod.ts", + "publish": { "exclude": ["*.test.ts"] } +} diff --git a/delay/mod.test.ts b/delay/mod.test.ts new file mode 100644 index 0000000..fb6ceb3 --- /dev/null +++ b/delay/mod.test.ts @@ -0,0 +1,357 @@ +import { assertEquals, assertInstanceOf, assertStrictEquals, assertThrows } from "@std/assert"; +import { Observer } from "@observable/core"; +import { empty } from "@observable/empty"; +import { pipe } from "@observable/pipe"; +import { materialize, type ObserverNotification } from "@observable/materialize"; +import { delay } from "./mod.ts"; +import { forOf } from "@observable/for-of"; +import { tap } from "@observable/tap"; + +Deno.test( + "delay should return an empty observable if the milliseconds is less than 0", + () => { + // Arrange + const source = forOf([1, 2, 3]); + + // Act + const result = pipe(source, delay(-1)); + + // Assert + assertStrictEquals(result, empty); + }, +); + +Deno.test("delay should return the source observable if the milliseconds is 0 and source is an Observable instance", () => { + // Arrange + const source = forOf([1, 2, 3]); + + // Act + const result = pipe(source, delay(0)); + + // Assert + assertStrictEquals(result, source); +}); + +Deno.test("delay should return empty if the milliseconds is NaN", () => { + // Arrange + const source = forOf([1, 2, 3]); + + // Act + const result = pipe(source, delay(NaN)); + + // Assert + assertStrictEquals(result, empty); +}); + +Deno.test("delay should drop all values and return when source returns if the milliseconds is Infinity", () => { + // Arrange + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + + // Act + pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + delay(Infinity), + materialize(), + ).subscribe( + new Observer((notification) => notifications.push(notification)), + ); + + // Assert + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); +}); + +Deno.test( + "delay should delay the values if the milliseconds is a positive number", + () => { + // Arrange + let overrode = true; + const milliseconds = 1_000; + const setTimeoutId = Math.random(); + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + return overrode ? setTimeoutId : originalSetTimeout(...args); + }, + }); + + // Act + pipe(forOf([1, 2, 3]), delay(milliseconds), materialize()).subscribe( + new Observer((notification) => notifications.push(notification)), + ); + + // Assert + assertStrictEquals(setTimeoutCalls.length, 3); + assertEquals(notifications, []); + + // Trigger all timeouts + for (const [callback, delayMs] of setTimeoutCalls) { + assertStrictEquals(delayMs, milliseconds); + assertInstanceOf(callback, Function); + callback(); + } + + assertEquals(notifications, [ + ["next", 1], + ["next", 2], + ["next", 3], + ["return"], + ]); + overrode = false; + }, +); + +Deno.test( + "delay should clear timeouts on unsubscription after the subscription is created", + () => { + // Arrange + let overrideGlobals = true; + const milliseconds = 1_000; + const setTimeoutIds: number[] = []; + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const clearTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + const originalClearTimeout = globalThis.clearTimeout; + const controller = new AbortController(); + const materialized = pipe(forOf([1, 2, 3]), delay(milliseconds), materialize()); + let idCounter = 0; + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + if (overrideGlobals) { + const id = ++idCounter; + setTimeoutIds.push(id); + return id; + } + return originalSetTimeout(...args); + }, + }); + Object.defineProperty(globalThis, "clearTimeout", { + value: (...args: Parameters) => { + clearTimeoutCalls.push(args); + return overrideGlobals ? undefined : originalClearTimeout(...args); + }, + }); + + // Act + materialized.subscribe( + new Observer({ + next: (notification) => notifications.push(notification), + signal: controller.signal, + }), + ); + controller.abort(); + + // Assert + assertStrictEquals(setTimeoutCalls.length, 3); + assertEquals(notifications, []); + // Each timeout should be cleared + assertEquals(clearTimeoutCalls.length, 3); + for (const id of setTimeoutIds) { + assertEquals( + clearTimeoutCalls.some(([clearedId]) => clearedId === id), + true, + ); + } + + // Triggering callbacks after abort should not emit notifications + for (const [callback] of setTimeoutCalls) { + assertInstanceOf(callback, Function); + callback(); + } + assertEquals(notifications, []); + overrideGlobals = false; + }, +); + +Deno.test( + "delay should not setup timeouts on unsubscription before the subscription is created", + () => { + // Arrange + let overrideGlobals = true; + const milliseconds = 1_000; + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const clearTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + const originalClearTimeout = globalThis.clearTimeout; + const controller = new AbortController(); + const materialized = pipe(forOf([1, 2, 3]), delay(milliseconds), materialize()); + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + return overrideGlobals ? Math.random() : originalSetTimeout(...args); + }, + }); + Object.defineProperty(globalThis, "clearTimeout", { + value: (...args: Parameters) => { + clearTimeoutCalls.push(args); + return overrideGlobals ? undefined : originalClearTimeout(...args); + }, + }); + controller.abort(); + + // Act + materialized.subscribe( + new Observer({ + next: (notification) => notifications.push(notification), + signal: controller.signal, + }), + ); + + // Assert + assertEquals(notifications, []); + assertEquals(setTimeoutCalls, []); + assertEquals(clearTimeoutCalls, []); + overrideGlobals = false; + }, +); + +Deno.test( + "delay should throw an error if the milliseconds is not provided", + () => { + // Arrange / Act / Assert + assertThrows( + () => delay(...([] as unknown as Parameters)), + TypeError, + "1 argument required but 0 present", + ); + }, +); + +Deno.test( + "delay should throw an error if the milliseconds is not of type 'Number'", + () => { + // Arrange / Act / Assert + assertThrows( + () => delay("s" as unknown as number), + TypeError, + "Parameter 1 is not of type 'Number'", + ); + }, +); + +Deno.test( + "delay should throw an error if the source is not provided to the inner function", + () => { + // Arrange + const delayFn = delay(1_000); + + // Act / Assert + assertThrows( + () => delayFn(...([] as unknown as Parameters)), + TypeError, + "1 argument required but 0 present", + ); + }, +); + +Deno.test( + "delay should throw an error if the source is not of type 'Observable'", + () => { + // Arrange + const delayFn = delay(1_000); + + // Act / Assert + assertThrows( + () => delayFn("not-observable" as unknown as Parameters[0]), + TypeError, + "Parameter 1 is not of type 'Observable'", + ); + }, +); + +Deno.test( + "delay should emit values immediately when milliseconds is 0", + () => { + // Arrange + let overrideGlobals = true; + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const clearTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + const originalClearTimeout = globalThis.clearTimeout; + const materialized = pipe(forOf([1, 2, 3]), delay(0), materialize()); + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + return overrideGlobals ? Math.random() : originalSetTimeout(...args); + }, + }); + Object.defineProperty(globalThis, "clearTimeout", { + value: (...args: Parameters) => { + clearTimeoutCalls.push(args); + return overrideGlobals ? undefined : originalClearTimeout(...args); + }, + }); + + // Act + materialized.subscribe( + new Observer((notification) => notifications.push(notification)), + ); + + // Assert + assertEquals(notifications, [ + ["next", 1], + ["next", 2], + ["next", 3], + ["return"], + ]); + assertEquals(setTimeoutCalls, []); + assertEquals(clearTimeoutCalls, []); + overrideGlobals = false; + }, +); + +Deno.test( + "delay should preserve the order of values after delay", + () => { + // Arrange + let overrode = true; + const milliseconds = 500; + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + return overrode ? Math.random() : originalSetTimeout(...args); + }, + }); + + // Act + pipe(forOf([5, 4, 3, 2, 1]), delay(milliseconds), materialize()).subscribe( + new Observer((notification) => notifications.push(notification)), + ); + + // Assert + assertStrictEquals(setTimeoutCalls.length, 5); + assertEquals(notifications, []); + + // Trigger all timeouts in order + for (const [callback, delayMs] of setTimeoutCalls) { + assertStrictEquals(delayMs, milliseconds); + assertInstanceOf(callback, Function); + callback(); + } + + assertEquals(notifications, [ + ["next", 5], + ["next", 4], + ["next", 3], + ["next", 2], + ["next", 1], + ["return"], + ]); + overrode = false; + }, +); diff --git a/delay/mod.ts b/delay/mod.ts new file mode 100644 index 0000000..798b43a --- /dev/null +++ b/delay/mod.ts @@ -0,0 +1,126 @@ +import { isObservable, type Observable } from "@observable/core"; +import { empty } from "@observable/empty"; +import { pipe } from "@observable/pipe"; +import { mergeMap } from "@observable/merge-map"; +import { timeout } from "@observable/timeout"; +import { map } from "@observable/map"; +import { drop } from "@observable/drop"; +import { from } from "@observable/from"; + +/** + * Delays {@linkcode Value|values} by the given number of {@linkcode milliseconds}. + * @example + * 1000 milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (after 1 second): + * // "next" 1 + * // "next" 2 + * // "next" 3 + * // "next" 4 + * // "next" 5 + * // "return" + * ``` + * @example + * 0 milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(0)).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (synchronously): + * // "next" 1 + * // "next" 2 + * // "next" 3 + * // "next" 4 + * // "next" 5 + * // "return" + * ``` + * @example + * Infinite milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(Infinity)).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (synchronously): + * // "return" + * ``` + * @example + * Negative milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(-1)).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (synchronously): + * // "return" + * ``` + * @example + * NaN milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(NaN))).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (synchronously): + * // "return" + * ``` + */ +export function delay( + milliseconds: number, +): (source: Observable) => Observable { + if (!arguments.length) throw new TypeError("1 argument required but 0 present"); + if (typeof milliseconds !== "number") throw new TypeError("Parameter 1 is not of type 'Number'"); + return function delayFn(source) { + if (!arguments.length) throw new TypeError("1 argument required but 0 present"); + if (!isObservable(source)) throw new TypeError("Parameter 1 is not of type 'Observable'"); + if (milliseconds < 0 || Number.isNaN(milliseconds)) return empty; + if (milliseconds === Infinity) return pipe(source, drop(Infinity)); + if (milliseconds === 0) return from(source); + return pipe(source, mergeMap((value) => pipe(timeout(milliseconds), map(() => value)))); + }; +} diff --git a/deno.json b/deno.json index 43d2e23..a507991 100644 --- a/deno.json +++ b/deno.json @@ -9,6 +9,7 @@ "core", "debounce", "defer", + "delay", "distinct", "distinct-until-changed", "drop", diff --git a/drop/mod.test.ts b/drop/mod.test.ts index c74313c..d45e386 100644 --- a/drop/mod.test.ts +++ b/drop/mod.test.ts @@ -4,6 +4,7 @@ import { empty } from "@observable/empty"; import { forOf } from "@observable/for-of"; import { pipe } from "@observable/pipe"; import { materialize, type ObserverNotification } from "@observable/materialize"; +import { tap } from "@observable/tap"; import { drop } from "./mod.ts"; Deno.test( @@ -72,17 +73,25 @@ Deno.test("drop should return empty if the count is NaN", () => { Deno.test("drop should ignore all elements if the count is Infinity", () => { // Arrange - const source = forOf([1, 2, 3]); - const notifications: Array> = []; - const materialized = pipe(source, drop(Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; // Act - materialized.subscribe( + pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + drop(Infinity), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test( diff --git a/filter/mod.test.ts b/filter/mod.test.ts index 13f909f..df4e454 100644 --- a/filter/mod.test.ts +++ b/filter/mod.test.ts @@ -6,6 +6,7 @@ import { filter } from "./mod.ts"; import { materialize, type ObserverNotification } from "@observable/materialize"; import { flat } from "@observable/flat"; import { throwError } from "@observable/throw-error"; +import { tap } from "@observable/tap"; Deno.test( "filter should filter the items emitted by the source observable", @@ -27,6 +28,30 @@ Deno.test( }, ); +Deno.test( + "filter should drop all values and return when the predicate never matches", + () => { + // Arrange + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + + // Act + pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + filter(() => false), + materialize(), + ).subscribe(new Observer((notification) => notifications.push(notification))); + + // Assert + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); + }, +); + Deno.test("filter should pump throws right through itself", () => { // Arrange const notifications: Array> = []; diff --git a/pairwise/mod.test.ts b/pairwise/mod.test.ts index 50b9352..7c32e95 100644 --- a/pairwise/mod.test.ts +++ b/pairwise/mod.test.ts @@ -6,8 +6,9 @@ import { pipe } from "@observable/pipe"; import { forOf } from "@observable/for-of"; import { of } from "@observable/of"; import { materialize, type ObserverNotification } from "@observable/materialize"; -import { pairwise } from "./mod.ts"; +import { type Pair, pairwise } from "./mod.ts"; import { empty } from "@observable/empty"; +import { tap } from "@observable/tap"; Deno.test("pairwise should emit pairs of consecutive values", () => { // Arrange @@ -32,22 +33,28 @@ Deno.test("pairwise should emit pairs of consecutive values", () => { Deno.test("pairwise should not emit if source emits only one value", () => { // Arrange - const notifications: Array> = []; - const source = of(1); - const materialized = pipe(source, pairwise(), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification>> = []; // Act - materialized.subscribe( + pipe( + of(1), + tap((value) => notifications.push(["tap", value])), + pairwise(), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["return"], + ]); }); Deno.test("pairwise should not emit if source is empty", () => { // Arrange - const notifications: Array> = []; + const notifications: Array>> = []; const materialized = pipe(empty, pairwise(), materialize()); // Act @@ -77,7 +84,7 @@ Deno.test("pairwise should emit exactly one pair when source emits two values", Deno.test("pairwise should pump throws right through itself", () => { // Arrange const error = new Error("test error"); - const notifications: Array> = []; + const notifications: Array>> = []; const source = flat([forOf([1, 2, 3]), throwError(error)]); const materialized = pipe(source, pairwise(), materialize()); @@ -97,7 +104,7 @@ Deno.test("pairwise should pump throws right through itself", () => { Deno.test("pairwise should honor unsubscribe", () => { // Arrange const controller = new AbortController(); - const notifications: Array> = []; + const notifications: Array>> = []; const source = flat([ forOf([1, 2, 3, 4, 5]), throwError(new Error("Should not make it here")), @@ -149,7 +156,7 @@ Deno.test("pairwise should throw when source is not an Observable", () => { Deno.test("pairwise should work with Subject", () => { // Arrange - const notifications: Array> = []; + const notifications: Array>> = []; const source = new Subject(); const materialized = pipe(source, pairwise(), materialize()); @@ -172,8 +179,8 @@ Deno.test("pairwise should work with Subject", () => { Deno.test("pairwise should reset state for each subscription", () => { // Arrange - const notifications1: Array> = []; - const notifications2: Array> = []; + const notifications1: Array>> = []; + const notifications2: Array>> = []; const source = forOf([1, 2, 3]); const pairwiseSource = pipe(source, pairwise());