Skip to content
63 changes: 51 additions & 12 deletions at/mod.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { materialize } from "@observable/materialize";
import type { ObserverNotification } from "@observable/materialize";
import { at } from "./mod.ts";
import { throwError } from "@observable/throw-error";
import { tap } from "@observable/tap";

Deno.test("at should throw if no arguments are provided", () => {
assertThrows(
Expand Down Expand Up @@ -170,14 +171,23 @@ Deno.test("at(negative fractional) should truncate index -2.8 to -2", () => {
});

Deno.test("at(positive) when source has fewer items should emit nothing then return", () => {
const notifications: Array<ObserverNotification<number>> = [];
const observable = pipe(forOf([1, 2]), at(5), materialize());
const notifications: Array<["tap", value: number] | ObserverNotification<number>> = [];
const observable = pipe(
forOf([1, 2]),
tap((value) => notifications.push(["tap", value])),
at(5),
materialize(),
);

observable.subscribe(
new Observer((notification) => notifications.push(notification)),
);

assertEquals(notifications, [["return"]]);
assertEquals(notifications, [
["tap", 1],
["tap", 2],
["return"],
]);
});

Deno.test("at(-1) should emit only the last value", () => {
Expand All @@ -203,36 +213,65 @@ Deno.test("at(-2) should emit only the second-to-last value", () => {
});

Deno.test("at(negative) when source has fewer than |index| items should emit nothing then return", () => {
const notifications: Array<ObserverNotification<number>> = [];
const observable = pipe(forOf([1, 2]), at(-5), materialize());
const notifications: Array<["tap", value: number] | ObserverNotification<number>> = [];
const observable = pipe(
forOf([1, 2]),
tap((value) => notifications.push(["tap", value])),
at(-5),
materialize(),
);

observable.subscribe(
new Observer((notification) => notifications.push(notification)),
);

assertEquals(notifications, [["return"]]);
assertEquals(notifications, [
["tap", 1],
["tap", 2],
["return"],
]);
});

Deno.test("at(Infinity) should never emit", () => {
const notifications: Array<ObserverNotification<number>> = [];
const observable = pipe(forOf([1, 2, 3]), at(Infinity), materialize());
const notifications: Array<["tap", value: number] | ObserverNotification<number>> = [];
const observable = pipe(
forOf([1, 2, 3]),
tap((value) => notifications.push(["tap", value])),
at(Infinity),
materialize(),
);

observable.subscribe(
new Observer((notification) => notifications.push(notification)),
);

assertEquals(notifications, [["return"]]);
assertEquals(notifications, [
["tap", 1],
["tap", 2],
["tap", 3],
["return"],
]);
});

Deno.test("at(-Infinity) should never emit", () => {
const notifications: Array<ObserverNotification<number>> = [];
const observable = pipe(forOf([1, 2, 3]), at(-Infinity), materialize());
const notifications: Array<["tap", value: number] | ObserverNotification<number>> = [];
const observable = pipe(
forOf([1, 2, 3]),
tap((value) => notifications.push(["tap", value])),
at(-Infinity),
materialize(),
);

observable.subscribe(
new Observer((notification) => notifications.push(notification)),
);

assertEquals(notifications, [["return"]]);
assertEquals(notifications, [
["tap", 1],
["tap", 2],
["tap", 3],
["return"],
]);
});

Deno.test("at should pump throws through itself", () => {
Expand Down
36 changes: 26 additions & 10 deletions debounce/mod.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { debounce } from "./mod.ts";
import { flat } from "@observable/flat";
import { throwError } from "@observable/throw-error";
import { of } from "@observable/of";
import { tap } from "@observable/tap";

Deno.test("debounce should return empty if milliseconds is negative", () => {
// Arrange
Expand All @@ -33,33 +34,48 @@ Deno.test("debounce should return empty if milliseconds is NaN", () => {

Deno.test("debounce should ignore values but propagate return when milliseconds is Infinity", () => {
// Arrange
const notifications: Array<ObserverNotification<number>> = [];
const source = forOf([1, 2, 3]);
const materialized = pipe(source, debounce(Infinity), materialize());
const notifications: Array<["tap", value: number] | ObserverNotification<number>> = [];

// Act
materialized.subscribe(
pipe(
forOf([1, 2, 3]),
tap((value) => notifications.push(["tap", value])),
debounce(Infinity),
materialize(),
).subscribe(
new Observer((notification) => notifications.push(notification)),
);

// Assert
assertEquals(notifications, [["return"]]);
assertEquals(notifications, [
["tap", 1],
["tap", 2],
["tap", 3],
["return"],
]);
});

Deno.test("debounce should ignore values but propagate throw when milliseconds is Infinity", () => {
// Arrange
const error = new Error("test error");
const notifications: Array<ObserverNotification<number>> = [];
const source = flat([forOf([1, 2]), throwError(error)]);
const materialized = pipe(source, debounce(Infinity), materialize());
const notifications: Array<["tap", value: number] | ObserverNotification<number>> = [];

// Act
materialized.subscribe(
pipe(
flat([forOf([1, 2]), throwError(error)]),
tap((value) => notifications.push(["tap", value])),
debounce(Infinity),
materialize(),
).subscribe(
new Observer((notification) => notifications.push(notification)),
);

// Assert
assertEquals(notifications, [["throw", error]]);
assertEquals(notifications, [
["tap", 1],
["tap", 2],
["throw", error],
]);
});

Deno.test("debounce should emit value after timeout expires", () => {
Expand Down
177 changes: 177 additions & 0 deletions delay/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# [@observable/delay](https://jsr.io/@observable/delay)

Delays values by the given number of `milliseconds`.

## Build

Automated by [JSR](https://jsr.io/)

## Publishing

Automated by `.github\workflows\publish.yml`.

## Running unit tests

Run `deno task test` or `deno task test:ci` to execute the unit tests via
[Deno](https://deno.land/).

## Examples

1000 milliseconds

```ts
import { delay } from "@observable/delay";
import { of } from "@observable/of";
import { pipe } from "@observable/pipe";

const controller = new AbortController();
pipe(of([1, 2, 3, 4, 5]), delay(1_000)).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
throw: (value) => console.log("throw", value),
});

// Console output (after 1 second):
// "next" 1
// "next" 2
// "next" 3
// "next" 4
// "next" 5
// "return"
```

0 milliseconds

```ts
import { delay } from "@observable/delay";
import { of } from "@observable/of";
import { pipe } from "@observable/pipe";

const controller = new AbortController();
pipe(of([1, 2, 3, 4, 5]), delay(0)).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
throw: (value) => console.log("throw", value),
});

// Console output (synchronously):
// "next" 1
// "next" 2
// "next" 3
// "next" 4
// "next" 5
// "return"
```

Infinite milliseconds

```ts
import { delay } from "@observable/delay";
import { of } from "@observable/of";
import { pipe } from "@observable/pipe";

const controller = new AbortController();
pipe(of([1, 2, 3, 4, 5]), delay(Infinity)).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
throw: (value) => console.log("throw", value),
});

// Console output (synchronously):
// "return"
```

Negative milliseconds

```ts
import { delay } from "@observable/delay";
import { of } from "@observable/of";
import { pipe } from "@observable/pipe";

const controller = new AbortController();
pipe(of([1, 2, 3, 4, 5]), delay(-1)).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
throw: (value) => console.log("throw", value),
});

// Console output (synchronously):
// "return"
```

NaN milliseconds

```ts
import { delay } from "@observable/delay";
import { of } from "@observable/of";
import { pipe } from "@observable/pipe";

const controller = new AbortController();
pipe(of([1, 2, 3, 4, 5]), delay(NaN))).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
throw: (value) => console.log("throw", value),
});

// Console output (synchronously):
// "return"
```
Comment thread
alexanderharding marked this conversation as resolved.

## AI Prompt

Use the following prompt with AI assistants to help them understand this library:

````
You are helping me with code that uses @observable/delay from the @observable library ecosystem.

WHAT IT DOES:
`delay(milliseconds)` shifts each emitted value forward in time by scheduling it after `milliseconds` have elapsed. Values stay in the same order as the source.

CRITICAL: This library is NOT RxJS. Key differences:
- `delay` is a standalone function used with `pipe()` — NOT a method on Observable
- Observer uses `return`/`throw` — NOT `complete`/`error`
- Unsubscription via `AbortController.abort()` — NOT `subscription.unsubscribe()`

USAGE PATTERN:
```ts
import { delay } from "@observable/delay";
import { of } from "@observable/of";
import { pipe } from "@observable/pipe";

const controller = new AbortController();

pipe(of([1, 2, 3]), delay(1_000)).subscribe({
signal: controller.signal,
next: (value) => console.log(value),
return: () => console.log("done"),
throw: (error) => console.error(error),
});
```

WRONG USAGE:
```ts
// ✗ WRONG: delay is NOT a method on Observable
of([1, 2, 3]).delay(1000) // This does NOT work!
```

MILLISECONDS SEMANTICS:
- **Positive number:** each `next` is delayed by that many milliseconds; source order is preserved.
- **0:** values pass through without using timers (effectively immediate).
- **Negative or `NaN`:** the result is an empty Observable (no `next`; behaves like `@observable/empty`).
- **`Infinity`:** all values are dropped; when the source returns, the output returns (no delayed values).

ABORT / UNSUBSCRIBE:
Aborting the observer clears pending timers. After abort, fired timer callbacks must not deliver values to the consumer.

ARGUMENT VALIDATION:
`delay` requires a `number` for milliseconds. The returned operator function requires an Observable source; both throw `TypeError` if missing or wrong type.
````

## Glossary And Semantics

[@observable/core](https://jsr.io/@observable/core#glossary-and-semantics)
7 changes: 7 additions & 0 deletions delay/deno.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "@observable/delay",
"version": "0.1.0",
"license": "MIT",
"exports": "./mod.ts",
"publish": { "exclude": ["*.test.ts"] }
}
Loading
Loading