From 51ed6e93089b622b6274c838602f0662bd67499f Mon Sep 17 00:00:00 2001 From: Alexander Harding Date: Sun, 25 Jan 2026 13:28:37 -0700 Subject: [PATCH 1/8] Added delay operator --- delay/README.md | 47 +++++++ delay/deno.json | 7 + delay/mod.test.ts | 345 ++++++++++++++++++++++++++++++++++++++++++++++ delay/mod.ts | 54 ++++++++ deno.json | 1 + 5 files changed, 454 insertions(+) create mode 100644 delay/README.md create mode 100644 delay/deno.json create mode 100644 delay/mod.test.ts create mode 100644 delay/mod.ts diff --git a/delay/README.md b/delay/README.md new file mode 100644 index 0000000..dde75f5 --- /dev/null +++ b/delay/README.md @@ -0,0 +1,47 @@ +# [@observable/delay](https://jsr.io/@observable/delay) + +Drops the first `count` values [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ed by +the [source](https://jsr.io/@observable/core#source). + +## Build + +Automated by [JSR](https://jsr.io/) + +## Publishing + +Automated by `.github\workflows\publish.yml`. + +## Running unit tests + +Run `deno task test` or `deno task test:ci` to execute the unit tests via +[Deno](https://deno.land/). + +## Example + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (after 1 second): +// "next" 1 +// "next" 2 +// "next" 3 +// "next" 4 +// "next" 5 +// "return" +``` + +``` +# Glossary And Semantics + +[@observable/core](https://jsr.io/@observable/core#glossary-and-semantics) +``` diff --git a/delay/deno.json b/delay/deno.json new file mode 100644 index 0000000..28e1f01 --- /dev/null +++ b/delay/deno.json @@ -0,0 +1,7 @@ +{ + "name": "@observable/delay", + "version": "0.1.0", + "license": "MIT", + "exports": "./mod.ts", + "publish": { "exclude": ["*.test.ts"] } +} diff --git a/delay/mod.test.ts b/delay/mod.test.ts new file mode 100644 index 0000000..96f4858 --- /dev/null +++ b/delay/mod.test.ts @@ -0,0 +1,345 @@ +import { assertEquals, assertInstanceOf, assertStrictEquals, assertThrows } from "@std/assert"; +import { Observer } from "@observable/core"; +import { empty } from "@observable/empty"; +import { never } from "@observable/never"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; +import { materialize, type ObserverNotification } from "@observable/materialize"; +import { delay } from "./mod.ts"; + +Deno.test( + "delay should return an empty observable if the milliseconds is less than 0", + () => { + // Arrange + const source = of([1, 2, 3]); + + // Act + const result = pipe(source, delay(-1)); + + // Assert + assertStrictEquals(result, empty); + }, +); + +Deno.test("delay should return the source observable if the milliseconds is 0", () => { + // Arrange + const source = of([1, 2, 3]); + + // Act + const result = pipe(source, delay(0)); + + // Assert + assertStrictEquals(result, source); +}); + +Deno.test("delay should return empty if the milliseconds is NaN", () => { + // Arrange + const source = of([1, 2, 3]); + + // Act + const result = pipe(source, delay(NaN)); + + // Assert + assertStrictEquals(result, empty); +}); + +Deno.test("delay should return never if the milliseconds is Infinity", () => { + // Arrange + const source = of([1, 2, 3]); + + // Act + const result = pipe(source, delay(Infinity)); + + // Assert + assertStrictEquals(result, never); +}); + +Deno.test( + "delay should delay the values if the milliseconds is a positive number", + () => { + // Arrange + let overrode = true; + const milliseconds = 1_000; + const setTimeoutId = Math.random(); + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + return overrode ? setTimeoutId : originalSetTimeout(...args); + }, + }); + + // Act + pipe(of([1, 2, 3]), delay(milliseconds), materialize()).subscribe( + new Observer((notification) => notifications.push(notification)), + ); + + // Assert + assertStrictEquals(setTimeoutCalls.length, 3); + assertEquals(notifications, []); + + // Trigger all timeouts + for (const [callback, delayMs] of setTimeoutCalls) { + assertStrictEquals(delayMs, milliseconds); + assertInstanceOf(callback, Function); + callback(); + } + + assertEquals(notifications, [ + ["next", 1], + ["next", 2], + ["next", 3], + ["return"], + ]); + overrode = false; + }, +); + +Deno.test( + "delay should clear timeouts on unsubscription after the subscription is created", + () => { + // Arrange + let overrideGlobals = true; + const milliseconds = 1_000; + const setTimeoutIds: number[] = []; + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const clearTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + const originalClearTimeout = globalThis.clearTimeout; + const controller = new AbortController(); + const materialized = pipe(of([1, 2, 3]), delay(milliseconds), materialize()); + let idCounter = 0; + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + if (overrideGlobals) { + const id = ++idCounter; + setTimeoutIds.push(id); + return id; + } + return originalSetTimeout(...args); + }, + }); + Object.defineProperty(globalThis, "clearTimeout", { + value: (...args: Parameters) => { + clearTimeoutCalls.push(args); + return overrideGlobals ? undefined : originalClearTimeout(...args); + }, + }); + + // Act + materialized.subscribe( + new Observer({ + next: (notification) => notifications.push(notification), + signal: controller.signal, + }), + ); + controller.abort(); + + // Assert + assertStrictEquals(setTimeoutCalls.length, 3); + assertEquals(notifications, []); + // Each timeout should be cleared + assertEquals(clearTimeoutCalls.length, 3); + for (const id of setTimeoutIds) { + assertEquals( + clearTimeoutCalls.some(([clearedId]) => clearedId === id), + true, + ); + } + + // Triggering callbacks after abort should not emit notifications + for (const [callback] of setTimeoutCalls) { + assertInstanceOf(callback, Function); + callback(); + } + assertEquals(notifications, []); + overrideGlobals = false; + }, +); + +Deno.test( + "delay should not setup timeouts on unsubscription before the subscription is created", + () => { + // Arrange + let overrideGlobals = true; + const milliseconds = 1_000; + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const clearTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + const originalClearTimeout = globalThis.clearTimeout; + const controller = new AbortController(); + const materialized = pipe(of([1, 2, 3]), delay(milliseconds), materialize()); + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + return overrideGlobals ? Math.random() : originalSetTimeout(...args); + }, + }); + Object.defineProperty(globalThis, "clearTimeout", { + value: (...args: Parameters) => { + clearTimeoutCalls.push(args); + return overrideGlobals ? undefined : originalClearTimeout(...args); + }, + }); + controller.abort(); + + // Act + materialized.subscribe( + new Observer({ + next: (notification) => notifications.push(notification), + signal: controller.signal, + }), + ); + + // Assert + assertEquals(notifications, []); + assertEquals(setTimeoutCalls, []); + assertEquals(clearTimeoutCalls, []); + overrideGlobals = false; + }, +); + +Deno.test( + "delay should throw an error if the milliseconds is not provided", + () => { + // Arrange / Act / Assert + assertThrows( + () => delay(...([] as unknown as Parameters)), + TypeError, + "1 argument required but 0 present", + ); + }, +); + +Deno.test( + "delay should throw an error if the milliseconds is not of type 'Number'", + () => { + // Arrange / Act / Assert + assertThrows( + () => delay("s" as unknown as number), + TypeError, + "Parameter 1 is not of type 'Number'", + ); + }, +); + +Deno.test( + "delay should throw an error if the source is not provided to the inner function", + () => { + // Arrange + const delayFn = delay(1_000); + + // Act / Assert + assertThrows( + () => delayFn(...([] as unknown as Parameters)), + TypeError, + "1 argument required but 0 present", + ); + }, +); + +Deno.test( + "delay should throw an error if the source is not of type 'Observable'", + () => { + // Arrange + const delayFn = delay(1_000); + + // Act / Assert + assertThrows( + () => delayFn("not-observable" as unknown as Parameters[0]), + TypeError, + "Parameter 1 is not of type 'Observable'", + ); + }, +); + +Deno.test( + "delay should emit values immediately when milliseconds is 0", + () => { + // Arrange + let overrideGlobals = true; + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const clearTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + const originalClearTimeout = globalThis.clearTimeout; + const materialized = pipe(of([1, 2, 3]), delay(0), materialize()); + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + return overrideGlobals ? Math.random() : originalSetTimeout(...args); + }, + }); + Object.defineProperty(globalThis, "clearTimeout", { + value: (...args: Parameters) => { + clearTimeoutCalls.push(args); + return overrideGlobals ? undefined : originalClearTimeout(...args); + }, + }); + + // Act + materialized.subscribe( + new Observer((notification) => notifications.push(notification)), + ); + + // Assert + assertEquals(notifications, [ + ["next", 1], + ["next", 2], + ["next", 3], + ["return"], + ]); + assertEquals(setTimeoutCalls, []); + assertEquals(clearTimeoutCalls, []); + overrideGlobals = false; + }, +); + +Deno.test( + "delay should preserve the order of values after delay", + () => { + // Arrange + let overrode = true; + const milliseconds = 500; + const notifications: Array> = []; + const setTimeoutCalls: Array> = []; + const originalSetTimeout = globalThis.setTimeout; + Object.defineProperty(globalThis, "setTimeout", { + value: (...args: Parameters) => { + setTimeoutCalls.push(args); + return overrode ? Math.random() : originalSetTimeout(...args); + }, + }); + + // Act + pipe(of([5, 4, 3, 2, 1]), delay(milliseconds), materialize()).subscribe( + new Observer((notification) => notifications.push(notification)), + ); + + // Assert + assertStrictEquals(setTimeoutCalls.length, 5); + assertEquals(notifications, []); + + // Trigger all timeouts in order + for (const [callback, delayMs] of setTimeoutCalls) { + assertStrictEquals(delayMs, milliseconds); + assertInstanceOf(callback, Function); + callback(); + } + + assertEquals(notifications, [ + ["next", 5], + ["next", 4], + ["next", 3], + ["next", 2], + ["next", 1], + ["return"], + ]); + overrode = false; + }, +); diff --git a/delay/mod.ts b/delay/mod.ts new file mode 100644 index 0000000..87ef8ee --- /dev/null +++ b/delay/mod.ts @@ -0,0 +1,54 @@ +import { isObservable, type Observable, toObservable } from "@observable/core"; +import { MinimumArgumentsRequiredError, ParameterTypeError } from "@observable/internal"; +import { empty } from "@observable/empty"; +import { pipe } from "@observable/pipe"; +import { mergeMap } from "@observable/merge-map"; +import { timeout } from "@observable/timeout"; +import { map } from "@observable/map"; +import { never } from "@observable/never"; + +/** + * Delays the [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ing of values from the + * [source](https://jsr.io/@observable/core#source) [`Observable`](https://jsr.io/@observable/core/doc/~/Observable) + * by a given number of {@linkcode milliseconds}. + * @example + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (after 1 second): + * // "next" 1 + * // "next" 2 + * // "next" 3 + * // "next" 4 + * // "next" 5 + * // "return" + * ``` + */ +export function delay( + milliseconds: number, +): (source: Observable) => Observable { + if (arguments.length === 0) throw new MinimumArgumentsRequiredError(); + if (typeof milliseconds !== "number") throw new ParameterTypeError(0, "Number"); + return function delayFn(source) { + if (arguments.length === 0) throw new MinimumArgumentsRequiredError(); + if (!isObservable(source)) throw new ParameterTypeError(0, "Observable"); + if (milliseconds < 0 || Number.isNaN(milliseconds)) return empty; + if (milliseconds === Infinity) return never; + return pipe( + source, + milliseconds === 0 + ? toObservable + : mergeMap((value) => pipe(timeout(milliseconds), map(() => value))), + ); + }; +} diff --git a/deno.json b/deno.json index 3f6e1d1..34a692e 100644 --- a/deno.json +++ b/deno.json @@ -10,6 +10,7 @@ "core", "debounce", "defer", + "delay", "distinct", "distinct-until-changed", "drop", From 95da2cd328768ea121a649e80a58a76e1305a3df Mon Sep 17 00:00:00 2001 From: Alexander Harding Date: Sun, 25 Jan 2026 13:29:20 -0700 Subject: [PATCH 2/8] Applied formatting --- delay/mod.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delay/mod.ts b/delay/mod.ts index 87ef8ee..08e9705 100644 --- a/delay/mod.ts +++ b/delay/mod.ts @@ -19,7 +19,7 @@ import { never } from "@observable/never"; * * const controller = new AbortController(); * pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({ - * signal: controller.signal, + * signal: controller.signal, * next: (value) => console.log("next", value), * return: () => console.log("return"), * throw: (value) => console.log("throw", value), From 2491c52b864665d28f0d9fa8026ebccaa8dfcc2f Mon Sep 17 00:00:00 2001 From: Alexander Harding Date: Sun, 8 Feb 2026 10:54:10 -0700 Subject: [PATCH 3/8] Refactor delay operator so it's early return logic is in alignment with other operators in the repo --- delay/mod.test.ts | 32 +++++++++++++++++--------------- delay/mod.ts | 15 ++++++--------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/delay/mod.test.ts b/delay/mod.test.ts index 96f4858..071fce4 100644 --- a/delay/mod.test.ts +++ b/delay/mod.test.ts @@ -1,8 +1,7 @@ import { assertEquals, assertInstanceOf, assertStrictEquals, assertThrows } from "@std/assert"; import { Observer } from "@observable/core"; import { empty } from "@observable/empty"; -import { never } from "@observable/never"; -import { of } from "@observable/of"; +import { ofIterable } from "@observable/of-iterable"; import { pipe } from "@observable/pipe"; import { materialize, type ObserverNotification } from "@observable/materialize"; import { delay } from "./mod.ts"; @@ -11,7 +10,7 @@ Deno.test( "delay should return an empty observable if the milliseconds is less than 0", () => { // Arrange - const source = of([1, 2, 3]); + const source = pipe([1, 2, 3], ofIterable()); // Act const result = pipe(source, delay(-1)); @@ -21,9 +20,9 @@ Deno.test( }, ); -Deno.test("delay should return the source observable if the milliseconds is 0", () => { +Deno.test("delay should return the source observable if the milliseconds is 0 and source is an Observable instance", () => { // Arrange - const source = of([1, 2, 3]); + const source = pipe([1, 2, 3], ofIterable()); // Act const result = pipe(source, delay(0)); @@ -34,7 +33,7 @@ Deno.test("delay should return the source observable if the milliseconds is 0", Deno.test("delay should return empty if the milliseconds is NaN", () => { // Arrange - const source = of([1, 2, 3]); + const source = pipe([1, 2, 3], ofIterable()); // Act const result = pipe(source, delay(NaN)); @@ -43,15 +42,18 @@ Deno.test("delay should return empty if the milliseconds is NaN", () => { assertStrictEquals(result, empty); }); -Deno.test("delay should return never if the milliseconds is Infinity", () => { +Deno.test("delay should drop all values and return when source returns if the milliseconds is Infinity", () => { // Arrange - const source = of([1, 2, 3]); + const source = pipe([1, 2, 3], ofIterable()); + const notifications: Array> = []; // Act - const result = pipe(source, delay(Infinity)); + pipe(source, delay(Infinity), materialize()).subscribe( + new Observer((notification) => notifications.push(notification)), + ); // Assert - assertStrictEquals(result, never); + assertEquals(notifications, [["return"]]); }); Deno.test( @@ -72,7 +74,7 @@ Deno.test( }); // Act - pipe(of([1, 2, 3]), delay(milliseconds), materialize()).subscribe( + pipe(pipe([1, 2, 3], ofIterable()), delay(milliseconds), materialize()).subscribe( new Observer((notification) => notifications.push(notification)), ); @@ -110,7 +112,7 @@ Deno.test( const originalSetTimeout = globalThis.setTimeout; const originalClearTimeout = globalThis.clearTimeout; const controller = new AbortController(); - const materialized = pipe(of([1, 2, 3]), delay(milliseconds), materialize()); + const materialized = pipe(pipe([1, 2, 3], ofIterable()), delay(milliseconds), materialize()); let idCounter = 0; Object.defineProperty(globalThis, "setTimeout", { value: (...args: Parameters) => { @@ -173,7 +175,7 @@ Deno.test( const originalSetTimeout = globalThis.setTimeout; const originalClearTimeout = globalThis.clearTimeout; const controller = new AbortController(); - const materialized = pipe(of([1, 2, 3]), delay(milliseconds), materialize()); + const materialized = pipe(pipe([1, 2, 3], ofIterable()), delay(milliseconds), materialize()); Object.defineProperty(globalThis, "setTimeout", { value: (...args: Parameters) => { setTimeoutCalls.push(args); @@ -268,7 +270,7 @@ Deno.test( const clearTimeoutCalls: Array> = []; const originalSetTimeout = globalThis.setTimeout; const originalClearTimeout = globalThis.clearTimeout; - const materialized = pipe(of([1, 2, 3]), delay(0), materialize()); + const materialized = pipe(pipe([1, 2, 3], ofIterable()), delay(0), materialize()); Object.defineProperty(globalThis, "setTimeout", { value: (...args: Parameters) => { setTimeoutCalls.push(args); @@ -317,7 +319,7 @@ Deno.test( }); // Act - pipe(of([5, 4, 3, 2, 1]), delay(milliseconds), materialize()).subscribe( + pipe(pipe([5, 4, 3, 2, 1], ofIterable()), delay(milliseconds), materialize()).subscribe( new Observer((notification) => notifications.push(notification)), ); diff --git a/delay/mod.ts b/delay/mod.ts index 08e9705..90ff451 100644 --- a/delay/mod.ts +++ b/delay/mod.ts @@ -1,11 +1,12 @@ -import { isObservable, type Observable, toObservable } from "@observable/core"; +import { isObservable, type Observable } from "@observable/core"; import { MinimumArgumentsRequiredError, ParameterTypeError } from "@observable/internal"; import { empty } from "@observable/empty"; import { pipe } from "@observable/pipe"; import { mergeMap } from "@observable/merge-map"; import { timeout } from "@observable/timeout"; import { map } from "@observable/map"; -import { never } from "@observable/never"; +import { asObservable } from "@observable/as-observable"; +import { drop } from "@observable/drop"; /** * Delays the [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ing of values from the @@ -43,12 +44,8 @@ export function delay( if (arguments.length === 0) throw new MinimumArgumentsRequiredError(); if (!isObservable(source)) throw new ParameterTypeError(0, "Observable"); if (milliseconds < 0 || Number.isNaN(milliseconds)) return empty; - if (milliseconds === Infinity) return never; - return pipe( - source, - milliseconds === 0 - ? toObservable - : mergeMap((value) => pipe(timeout(milliseconds), map(() => value))), - ); + if (milliseconds === Infinity) return pipe(source, drop(Infinity)); + if (milliseconds === 0) return pipe(source, asObservable()); + return pipe(source, mergeMap((value) => pipe(timeout(milliseconds), map(() => value)))); }; } From b677c6be556a4f5a8726b7cfd94699a13c14b9e0 Mon Sep 17 00:00:00 2001 From: Alexander Harding Date: Sun, 8 Feb 2026 10:56:06 -0700 Subject: [PATCH 4/8] Corrected delay readme --- delay/README.md | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/delay/README.md b/delay/README.md index dde75f5..8883a23 100644 --- a/delay/README.md +++ b/delay/README.md @@ -1,7 +1,8 @@ # [@observable/delay](https://jsr.io/@observable/delay) -Drops the first `count` values [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ed by -the [source](https://jsr.io/@observable/core#source). +Delays the [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ing of values from the +[source](https://jsr.io/@observable/core#source) +[`Observable`](https://jsr.io/@observable/core/doc/~/Observable) by a given number of milliseconds. ## Build @@ -20,11 +21,11 @@ Run `deno task test` or `deno task test:ci` to execute the unit tests via ```ts import { delay } from "@observable/delay"; -import { of } from "@observable/of"; +import { ofIterable } from "@observable/of-iterable"; import { pipe } from "@observable/pipe"; const controller = new AbortController(); -pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({ +pipe([1, 2, 3, 4, 5], ofIterable(), delay(1_000)).subscribe({ signal: controller.signal, next: (value) => console.log("next", value), return: () => console.log("return"), @@ -40,8 +41,6 @@ pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({ // "return" ``` -``` -# Glossary And Semantics +## Glossary And Semantics [@observable/core](https://jsr.io/@observable/core#glossary-and-semantics) -``` From 7cc08d00802a9325ea263410ba860f4f1438623a Mon Sep 17 00:00:00 2001 From: Alexander Harding Date: Sun, 12 Apr 2026 11:57:40 -0600 Subject: [PATCH 5/8] Added modern logic to the delay operator --- delay/README.md | 93 +++++++++++++++++++++++++++++++++++++++++++--- delay/mod.test.ts | 20 +++++----- delay/mod.ts | 95 ++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 182 insertions(+), 26 deletions(-) diff --git a/delay/README.md b/delay/README.md index 8883a23..01ae47d 100644 --- a/delay/README.md +++ b/delay/README.md @@ -1,8 +1,6 @@ # [@observable/delay](https://jsr.io/@observable/delay) -Delays the [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ing of values from the -[source](https://jsr.io/@observable/core#source) -[`Observable`](https://jsr.io/@observable/core/doc/~/Observable) by a given number of milliseconds. +Delays values by the given number of `milliseconds`. ## Build @@ -17,15 +15,17 @@ Automated by `.github\workflows\publish.yml`. Run `deno task test` or `deno task test:ci` to execute the unit tests via [Deno](https://deno.land/). -## Example +## Examples + +1000 milliseconds ```ts import { delay } from "@observable/delay"; -import { ofIterable } from "@observable/of-iterable"; +import { of } from "@observable/of"; import { pipe } from "@observable/pipe"; const controller = new AbortController(); -pipe([1, 2, 3, 4, 5], ofIterable(), delay(1_000)).subscribe({ +pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({ signal: controller.signal, next: (value) => console.log("next", value), return: () => console.log("return"), @@ -41,6 +41,87 @@ pipe([1, 2, 3, 4, 5], ofIterable(), delay(1_000)).subscribe({ // "return" ``` +0 milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(0)).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (synchronously): +// "next" 1 +// "next" 2 +// "next" 3 +// "next" 4 +// "next" 5 +// "return" +``` + +Infinite milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(Infinity)).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (synchronously): +// "return" +``` + +Negative milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(-1)).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (synchronously): +// "return" +``` + +NaN milliseconds + +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); +pipe(of([1, 2, 3, 4, 5]), delay(NaN))).subscribe({ + signal: controller.signal, + next: (value) => console.log("next", value), + return: () => console.log("return"), + throw: (value) => console.log("throw", value), +}); + +// Console output (synchronously): +// "return" +``` + ## Glossary And Semantics [@observable/core](https://jsr.io/@observable/core#glossary-and-semantics) diff --git a/delay/mod.test.ts b/delay/mod.test.ts index 071fce4..36e40c1 100644 --- a/delay/mod.test.ts +++ b/delay/mod.test.ts @@ -1,16 +1,16 @@ import { assertEquals, assertInstanceOf, assertStrictEquals, assertThrows } from "@std/assert"; import { Observer } from "@observable/core"; import { empty } from "@observable/empty"; -import { ofIterable } from "@observable/of-iterable"; import { pipe } from "@observable/pipe"; import { materialize, type ObserverNotification } from "@observable/materialize"; import { delay } from "./mod.ts"; +import { forOf } from "@observable/for-of"; Deno.test( "delay should return an empty observable if the milliseconds is less than 0", () => { // Arrange - const source = pipe([1, 2, 3], ofIterable()); + const source = forOf([1, 2, 3]); // Act const result = pipe(source, delay(-1)); @@ -22,7 +22,7 @@ Deno.test( Deno.test("delay should return the source observable if the milliseconds is 0 and source is an Observable instance", () => { // Arrange - const source = pipe([1, 2, 3], ofIterable()); + const source = forOf([1, 2, 3]); // Act const result = pipe(source, delay(0)); @@ -33,7 +33,7 @@ Deno.test("delay should return the source observable if the milliseconds is 0 an Deno.test("delay should return empty if the milliseconds is NaN", () => { // Arrange - const source = pipe([1, 2, 3], ofIterable()); + const source = forOf([1, 2, 3]); // Act const result = pipe(source, delay(NaN)); @@ -44,7 +44,7 @@ Deno.test("delay should return empty if the milliseconds is NaN", () => { Deno.test("delay should drop all values and return when source returns if the milliseconds is Infinity", () => { // Arrange - const source = pipe([1, 2, 3], ofIterable()); + const source = forOf([1, 2, 3]); const notifications: Array> = []; // Act @@ -74,7 +74,7 @@ Deno.test( }); // Act - pipe(pipe([1, 2, 3], ofIterable()), delay(milliseconds), materialize()).subscribe( + pipe(forOf([1, 2, 3]), delay(milliseconds), materialize()).subscribe( new Observer((notification) => notifications.push(notification)), ); @@ -112,7 +112,7 @@ Deno.test( const originalSetTimeout = globalThis.setTimeout; const originalClearTimeout = globalThis.clearTimeout; const controller = new AbortController(); - const materialized = pipe(pipe([1, 2, 3], ofIterable()), delay(milliseconds), materialize()); + const materialized = pipe(forOf([1, 2, 3]), delay(milliseconds), materialize()); let idCounter = 0; Object.defineProperty(globalThis, "setTimeout", { value: (...args: Parameters) => { @@ -175,7 +175,7 @@ Deno.test( const originalSetTimeout = globalThis.setTimeout; const originalClearTimeout = globalThis.clearTimeout; const controller = new AbortController(); - const materialized = pipe(pipe([1, 2, 3], ofIterable()), delay(milliseconds), materialize()); + const materialized = pipe(forOf([1, 2, 3]), delay(milliseconds), materialize()); Object.defineProperty(globalThis, "setTimeout", { value: (...args: Parameters) => { setTimeoutCalls.push(args); @@ -270,7 +270,7 @@ Deno.test( const clearTimeoutCalls: Array> = []; const originalSetTimeout = globalThis.setTimeout; const originalClearTimeout = globalThis.clearTimeout; - const materialized = pipe(pipe([1, 2, 3], ofIterable()), delay(0), materialize()); + const materialized = pipe(forOf([1, 2, 3]), delay(0), materialize()); Object.defineProperty(globalThis, "setTimeout", { value: (...args: Parameters) => { setTimeoutCalls.push(args); @@ -319,7 +319,7 @@ Deno.test( }); // Act - pipe(pipe([5, 4, 3, 2, 1], ofIterable()), delay(milliseconds), materialize()).subscribe( + pipe(forOf([5, 4, 3, 2, 1]), delay(milliseconds), materialize()).subscribe( new Observer((notification) => notifications.push(notification)), ); diff --git a/delay/mod.ts b/delay/mod.ts index 90ff451..798b43a 100644 --- a/delay/mod.ts +++ b/delay/mod.ts @@ -1,18 +1,16 @@ import { isObservable, type Observable } from "@observable/core"; -import { MinimumArgumentsRequiredError, ParameterTypeError } from "@observable/internal"; import { empty } from "@observable/empty"; import { pipe } from "@observable/pipe"; import { mergeMap } from "@observable/merge-map"; import { timeout } from "@observable/timeout"; import { map } from "@observable/map"; -import { asObservable } from "@observable/as-observable"; import { drop } from "@observable/drop"; +import { from } from "@observable/from"; /** - * Delays the [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ing of values from the - * [source](https://jsr.io/@observable/core#source) [`Observable`](https://jsr.io/@observable/core/doc/~/Observable) - * by a given number of {@linkcode milliseconds}. + * Delays {@linkcode Value|values} by the given number of {@linkcode milliseconds}. * @example + * 1000 milliseconds * ```ts * import { delay } from "@observable/delay"; * import { of } from "@observable/of"; @@ -34,18 +32,95 @@ import { drop } from "@observable/drop"; * // "next" 5 * // "return" * ``` + * @example + * 0 milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(0)).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (synchronously): + * // "next" 1 + * // "next" 2 + * // "next" 3 + * // "next" 4 + * // "next" 5 + * // "return" + * ``` + * @example + * Infinite milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(Infinity)).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (synchronously): + * // "return" + * ``` + * @example + * Negative milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(-1)).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (synchronously): + * // "return" + * ``` + * @example + * NaN milliseconds + * ```ts + * import { delay } from "@observable/delay"; + * import { of } from "@observable/of"; + * import { pipe } from "@observable/pipe"; + * + * const controller = new AbortController(); + * pipe(of([1, 2, 3, 4, 5]), delay(NaN))).subscribe({ + * signal: controller.signal, + * next: (value) => console.log("next", value), + * return: () => console.log("return"), + * throw: (value) => console.log("throw", value), + * }); + * + * // Console output (synchronously): + * // "return" + * ``` */ export function delay( milliseconds: number, ): (source: Observable) => Observable { - if (arguments.length === 0) throw new MinimumArgumentsRequiredError(); - if (typeof milliseconds !== "number") throw new ParameterTypeError(0, "Number"); + if (!arguments.length) throw new TypeError("1 argument required but 0 present"); + if (typeof milliseconds !== "number") throw new TypeError("Parameter 1 is not of type 'Number'"); return function delayFn(source) { - if (arguments.length === 0) throw new MinimumArgumentsRequiredError(); - if (!isObservable(source)) throw new ParameterTypeError(0, "Observable"); + if (!arguments.length) throw new TypeError("1 argument required but 0 present"); + if (!isObservable(source)) throw new TypeError("Parameter 1 is not of type 'Observable'"); if (milliseconds < 0 || Number.isNaN(milliseconds)) return empty; if (milliseconds === Infinity) return pipe(source, drop(Infinity)); - if (milliseconds === 0) return pipe(source, asObservable()); + if (milliseconds === 0) return from(source); return pipe(source, mergeMap((value) => pipe(timeout(milliseconds), map(() => value)))); }; } From a0992690eb305ce834773e13e58db637eb07992b Mon Sep 17 00:00:00 2001 From: Alexander Harding Date: Mon, 13 Apr 2026 22:54:18 -0600 Subject: [PATCH 6/8] Add AI prompt and usage guidelines to delay README --- delay/README.md | 50 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/delay/README.md b/delay/README.md index 01ae47d..6594cbb 100644 --- a/delay/README.md +++ b/delay/README.md @@ -122,6 +122,56 @@ pipe(of([1, 2, 3, 4, 5]), delay(NaN))).subscribe({ // "return" ``` +## AI Prompt + +Use the following prompt with AI assistants to help them understand this library: + +```` +You are helping me with code that uses @observable/delay from the @observable library ecosystem. + +WHAT IT DOES: +`delay(milliseconds)` shifts each emitted value forward in time by scheduling it after `milliseconds` have elapsed. Values stay in the same order as the source. + +CRITICAL: This library is NOT RxJS. Key differences: +- `delay` is a standalone function used with `pipe()` — NOT a method on Observable +- Observer uses `return`/`throw` — NOT `complete`/`error` +- Unsubscription via `AbortController.abort()` — NOT `subscription.unsubscribe()` + +USAGE PATTERN: +```ts +import { delay } from "@observable/delay"; +import { of } from "@observable/of"; +import { pipe } from "@observable/pipe"; + +const controller = new AbortController(); + +pipe(of([1, 2, 3]), delay(1_000)).subscribe({ + signal: controller.signal, + next: (value) => console.log(value), + return: () => console.log("done"), + throw: (error) => console.error(error), +}); +``` + +WRONG USAGE: +```ts +// ✗ WRONG: delay is NOT a method on Observable +of([1, 2, 3]).delay(1000) // This does NOT work! +``` + +MILLISECONDS SEMANTICS: +- **Positive number:** each `next` is delayed by that many milliseconds; source order is preserved. +- **0:** values pass through without using timers (effectively immediate). +- **Negative or `NaN`:** the result is an empty Observable (no `next`; behaves like `@observable/empty`). +- **`Infinity`:** all values are dropped; when the source returns, the output returns (no delayed values). + +ABORT / UNSUBSCRIBE: +Aborting the observer clears pending timers. After abort, fired timer callbacks must not deliver values to the consumer. + +ARGUMENT VALIDATION: +`delay` requires a `number` for milliseconds. The returned operator function requires an Observable source; both throw `TypeError` if missing or wrong type. +```` + ## Glossary And Semantics [@observable/core](https://jsr.io/@observable/core#glossary-and-semantics) From 3e954a7af6823da95749bb73d3748312e2cf337b Mon Sep 17 00:00:00 2001 From: Alexander Harding Date: Mon, 13 Apr 2026 23:42:32 -0600 Subject: [PATCH 7/8] Added better test coverage for the Infinity argument --- delay/mod.test.ts | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/delay/mod.test.ts b/delay/mod.test.ts index 36e40c1..fb6ceb3 100644 --- a/delay/mod.test.ts +++ b/delay/mod.test.ts @@ -5,6 +5,7 @@ import { pipe } from "@observable/pipe"; import { materialize, type ObserverNotification } from "@observable/materialize"; import { delay } from "./mod.ts"; import { forOf } from "@observable/for-of"; +import { tap } from "@observable/tap"; Deno.test( "delay should return an empty observable if the milliseconds is less than 0", @@ -44,16 +45,25 @@ Deno.test("delay should return empty if the milliseconds is NaN", () => { Deno.test("delay should drop all values and return when source returns if the milliseconds is Infinity", () => { // Arrange - const source = forOf([1, 2, 3]); - const notifications: Array> = []; + const notifications: Array<["tap", value: number] | ObserverNotification> = []; // Act - pipe(source, delay(Infinity), materialize()).subscribe( + pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + delay(Infinity), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test( From 2d9bf052bfbc03d76bcf02fd38634b0e450223b8 Mon Sep 17 00:00:00 2001 From: Alexander Harding Date: Mon, 13 Apr 2026 23:43:36 -0600 Subject: [PATCH 8/8] Added better test coverage for various logic that has a condition where all values can be filtered --- at/mod.test.ts | 63 +++++++++++++++++++++++++++++++++++--------- debounce/mod.test.ts | 36 ++++++++++++++++++------- drop/mod.test.ts | 19 +++++++++---- filter/mod.test.ts | 25 ++++++++++++++++++ pairwise/mod.test.ts | 31 +++++++++++++--------- 5 files changed, 135 insertions(+), 39 deletions(-) diff --git a/at/mod.test.ts b/at/mod.test.ts index 5e6e669..2f94bd4 100644 --- a/at/mod.test.ts +++ b/at/mod.test.ts @@ -7,6 +7,7 @@ import { materialize } from "@observable/materialize"; import type { ObserverNotification } from "@observable/materialize"; import { at } from "./mod.ts"; import { throwError } from "@observable/throw-error"; +import { tap } from "@observable/tap"; Deno.test("at should throw if no arguments are provided", () => { assertThrows( @@ -170,14 +171,23 @@ Deno.test("at(negative fractional) should truncate index -2.8 to -2", () => { }); Deno.test("at(positive) when source has fewer items should emit nothing then return", () => { - const notifications: Array> = []; - const observable = pipe(forOf([1, 2]), at(5), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + const observable = pipe( + forOf([1, 2]), + tap((value) => notifications.push(["tap", value])), + at(5), + materialize(), + ); observable.subscribe( new Observer((notification) => notifications.push(notification)), ); - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["return"], + ]); }); Deno.test("at(-1) should emit only the last value", () => { @@ -203,36 +213,65 @@ Deno.test("at(-2) should emit only the second-to-last value", () => { }); Deno.test("at(negative) when source has fewer than |index| items should emit nothing then return", () => { - const notifications: Array> = []; - const observable = pipe(forOf([1, 2]), at(-5), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + const observable = pipe( + forOf([1, 2]), + tap((value) => notifications.push(["tap", value])), + at(-5), + materialize(), + ); observable.subscribe( new Observer((notification) => notifications.push(notification)), ); - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["return"], + ]); }); Deno.test("at(Infinity) should never emit", () => { - const notifications: Array> = []; - const observable = pipe(forOf([1, 2, 3]), at(Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + const observable = pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + at(Infinity), + materialize(), + ); observable.subscribe( new Observer((notification) => notifications.push(notification)), ); - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test("at(-Infinity) should never emit", () => { - const notifications: Array> = []; - const observable = pipe(forOf([1, 2, 3]), at(-Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + const observable = pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + at(-Infinity), + materialize(), + ); observable.subscribe( new Observer((notification) => notifications.push(notification)), ); - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test("at should pump throws through itself", () => { diff --git a/debounce/mod.test.ts b/debounce/mod.test.ts index cb7b708..f575302 100644 --- a/debounce/mod.test.ts +++ b/debounce/mod.test.ts @@ -8,6 +8,7 @@ import { debounce } from "./mod.ts"; import { flat } from "@observable/flat"; import { throwError } from "@observable/throw-error"; import { of } from "@observable/of"; +import { tap } from "@observable/tap"; Deno.test("debounce should return empty if milliseconds is negative", () => { // Arrange @@ -33,33 +34,48 @@ Deno.test("debounce should return empty if milliseconds is NaN", () => { Deno.test("debounce should ignore values but propagate return when milliseconds is Infinity", () => { // Arrange - const notifications: Array> = []; - const source = forOf([1, 2, 3]); - const materialized = pipe(source, debounce(Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; // Act - materialized.subscribe( + pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + debounce(Infinity), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test("debounce should ignore values but propagate throw when milliseconds is Infinity", () => { // Arrange const error = new Error("test error"); - const notifications: Array> = []; - const source = flat([forOf([1, 2]), throwError(error)]); - const materialized = pipe(source, debounce(Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; // Act - materialized.subscribe( + pipe( + flat([forOf([1, 2]), throwError(error)]), + tap((value) => notifications.push(["tap", value])), + debounce(Infinity), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["throw", error]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["throw", error], + ]); }); Deno.test("debounce should emit value after timeout expires", () => { diff --git a/drop/mod.test.ts b/drop/mod.test.ts index c74313c..d45e386 100644 --- a/drop/mod.test.ts +++ b/drop/mod.test.ts @@ -4,6 +4,7 @@ import { empty } from "@observable/empty"; import { forOf } from "@observable/for-of"; import { pipe } from "@observable/pipe"; import { materialize, type ObserverNotification } from "@observable/materialize"; +import { tap } from "@observable/tap"; import { drop } from "./mod.ts"; Deno.test( @@ -72,17 +73,25 @@ Deno.test("drop should return empty if the count is NaN", () => { Deno.test("drop should ignore all elements if the count is Infinity", () => { // Arrange - const source = forOf([1, 2, 3]); - const notifications: Array> = []; - const materialized = pipe(source, drop(Infinity), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification> = []; // Act - materialized.subscribe( + pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + drop(Infinity), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); }); Deno.test( diff --git a/filter/mod.test.ts b/filter/mod.test.ts index 13f909f..df4e454 100644 --- a/filter/mod.test.ts +++ b/filter/mod.test.ts @@ -6,6 +6,7 @@ import { filter } from "./mod.ts"; import { materialize, type ObserverNotification } from "@observable/materialize"; import { flat } from "@observable/flat"; import { throwError } from "@observable/throw-error"; +import { tap } from "@observable/tap"; Deno.test( "filter should filter the items emitted by the source observable", @@ -27,6 +28,30 @@ Deno.test( }, ); +Deno.test( + "filter should drop all values and return when the predicate never matches", + () => { + // Arrange + const notifications: Array<["tap", value: number] | ObserverNotification> = []; + + // Act + pipe( + forOf([1, 2, 3]), + tap((value) => notifications.push(["tap", value])), + filter(() => false), + materialize(), + ).subscribe(new Observer((notification) => notifications.push(notification))); + + // Assert + assertEquals(notifications, [ + ["tap", 1], + ["tap", 2], + ["tap", 3], + ["return"], + ]); + }, +); + Deno.test("filter should pump throws right through itself", () => { // Arrange const notifications: Array> = []; diff --git a/pairwise/mod.test.ts b/pairwise/mod.test.ts index 50b9352..7c32e95 100644 --- a/pairwise/mod.test.ts +++ b/pairwise/mod.test.ts @@ -6,8 +6,9 @@ import { pipe } from "@observable/pipe"; import { forOf } from "@observable/for-of"; import { of } from "@observable/of"; import { materialize, type ObserverNotification } from "@observable/materialize"; -import { pairwise } from "./mod.ts"; +import { type Pair, pairwise } from "./mod.ts"; import { empty } from "@observable/empty"; +import { tap } from "@observable/tap"; Deno.test("pairwise should emit pairs of consecutive values", () => { // Arrange @@ -32,22 +33,28 @@ Deno.test("pairwise should emit pairs of consecutive values", () => { Deno.test("pairwise should not emit if source emits only one value", () => { // Arrange - const notifications: Array> = []; - const source = of(1); - const materialized = pipe(source, pairwise(), materialize()); + const notifications: Array<["tap", value: number] | ObserverNotification>> = []; // Act - materialized.subscribe( + pipe( + of(1), + tap((value) => notifications.push(["tap", value])), + pairwise(), + materialize(), + ).subscribe( new Observer((notification) => notifications.push(notification)), ); // Assert - assertEquals(notifications, [["return"]]); + assertEquals(notifications, [ + ["tap", 1], + ["return"], + ]); }); Deno.test("pairwise should not emit if source is empty", () => { // Arrange - const notifications: Array> = []; + const notifications: Array>> = []; const materialized = pipe(empty, pairwise(), materialize()); // Act @@ -77,7 +84,7 @@ Deno.test("pairwise should emit exactly one pair when source emits two values", Deno.test("pairwise should pump throws right through itself", () => { // Arrange const error = new Error("test error"); - const notifications: Array> = []; + const notifications: Array>> = []; const source = flat([forOf([1, 2, 3]), throwError(error)]); const materialized = pipe(source, pairwise(), materialize()); @@ -97,7 +104,7 @@ Deno.test("pairwise should pump throws right through itself", () => { Deno.test("pairwise should honor unsubscribe", () => { // Arrange const controller = new AbortController(); - const notifications: Array> = []; + const notifications: Array>> = []; const source = flat([ forOf([1, 2, 3, 4, 5]), throwError(new Error("Should not make it here")), @@ -149,7 +156,7 @@ Deno.test("pairwise should throw when source is not an Observable", () => { Deno.test("pairwise should work with Subject", () => { // Arrange - const notifications: Array> = []; + const notifications: Array>> = []; const source = new Subject(); const materialized = pipe(source, pairwise(), materialize()); @@ -172,8 +179,8 @@ Deno.test("pairwise should work with Subject", () => { Deno.test("pairwise should reset state for each subscription", () => { // Arrange - const notifications1: Array> = []; - const notifications2: Array> = []; + const notifications1: Array>> = []; + const notifications2: Array>> = []; const source = forOf([1, 2, 3]); const pairwiseSource = pipe(source, pairwise());