Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 14 additions & 64 deletions all/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

[`Next`](https://jsr.io/@observable/core/doc/~/Observer.next)s an
[`Array`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array) of
the latest values from _all_ of input's
[`Observable`](https://jsr.io/@observable/core/doc/~/Observable)s, in
values from _all_ of the given `observables` in
[iteration](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol)
order. Each emitted array is a frozen snapshot.
order.

## Build

Expand All @@ -22,19 +21,19 @@ Run `deno task test` or `deno task test:ci` to execute the unit tests via

## Examples

Array of sources
Array of observables

```ts
import { all } from "@observable/all";
import { forOf } from "@observable/for-of";
import { pipe } from "@observable/pipe";

const source1 = forOf([1, 2, 3]);
const source2 = forOf([4, 5, 6]);
const source3 = forOf([7, 8, 9]);

const observable1 = forOf([1, 2, 3]);
const observable2 = forOf([4, 5, 6]);
const observable3 = forOf([7, 8, 9]);
const controller = new AbortController();
all([source1, source2, source3]).subscribe({

all([observable1, observable2, observable3]).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
Expand All @@ -48,36 +47,19 @@ all([source1, source2, source3]).subscribe({
// "return"
```

Empty source ends immediately
Array with an empty observable

```ts
import { all } from "@observable/all";
import { forOf } from "@observable/for-of";
import { pipe } from "@observable/pipe";
import { empty } from "@observable/empty";

const source1 = forOf([1, 2, 3]);
const source2 = forOf([7, 8, 9]);

const observable1 = forOf([1, 2, 3]);
const observable2 = forOf([7, 8, 9]);
const controller = new AbortController();
all([source1, empty, source2]).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
throw: (value) => console.log("throw", value),
});

// Console output:
// "return"
```

Empty array

```ts
import { all } from "@observable/all";

const controller = new AbortController();
all([]).subscribe({
all([observable1, empty, observable2]).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
Expand All @@ -88,46 +70,14 @@ all([]).subscribe({
// "return"
```

Iterable of sources
Empty observable array

```ts
import { all } from "@observable/all";
import { Subject } from "@observable/core";

const source1 = new Subject<number>();
const source2 = source1;
const source3 = new Subject<number>();

const controller = new AbortController();
all(new Set([source1, source2, source3])).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
throw: (value) => console.log("throw", value),
});
source2.next(1);
source1.next(2);
source3.next(3); // "next" [2, 3]
source1.next(4); // "next" [4, 3]
source2.next(5); // "next" [4, 5]
source1.return();
source3.return(); // "return"
source2.return();
```

Iterable with empty member

```ts
import { all } from "@observable/all";
import { forOf } from "@observable/for-of";
import { pipe } from "@observable/pipe";
import { empty } from "@observable/empty";

const source1 = forOf([1, 2, 3]);
const source2 = forOf([7, 8, 9]);

const controller = new AbortController();
all(new Set([source1, empty, source2])).subscribe({
all([]).subscribe({
signal: controller.signal,
next: (value) => console.log("next", value),
return: () => console.log("return"),
Expand Down
2 changes: 1 addition & 1 deletion all/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@observable/all",
"version": "0.14.0",
"version": "0.15.0",
"license": "MIT",
"exports": "./mod.ts",
"publish": { "exclude": ["*.test.ts"] }
Expand Down
99 changes: 52 additions & 47 deletions all/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ import { takeUntil } from "@observable/take-until";
import { finalize } from "@observable/finalize";

/**
* [`Next`](https://jsr.io/@observable/core/doc/~/Observer.next)s an [`Array`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array)
* of the latest {@linkcode Values|values} from _all_ of {@linkcode input}'s [`Observable`](https://jsr.io/@observable/core/doc/~/Observable)s, in
* [index](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array#array_indices) order.
* [`Next`](https://jsr.io/@observable/core/doc/~/Observer.next)s {@linkcode Values|values} from _all_ of the given
* {@linkcode observables} in [index](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array#array_indices)
* order.
* @example
* Array of sources
* Array of observables
* ```ts
* import { all } from "@observable/all";
* import { forOf } from "@observable/for-of";
* import { pipe } from "@observable/pipe";
*
* const source1 = forOf([1, 2, 3]);
* const source2 = forOf([4, 5, 6]);
* const source3 = forOf([7, 8, 9]);
*
* const observable1 = forOf([1, 2, 3]);
* const observable2 = forOf([4, 5, 6]);
* const observable3 = forOf([7, 8, 9]);
* const controller = new AbortController();
* all([source1, source2, source3]).subscribe({
*
* all([observable1, observable2, observable3]).subscribe({
* signal: controller.signal,
* next: (value) => console.log("next", value),
* return: () => console.log("return"),
Expand All @@ -40,18 +40,18 @@ import { finalize } from "@observable/finalize";
* // "return"
* ```
* @example
* Empty source ends immediately
* Array with an empty observable
* ```ts
* import { all } from "@observable/all";
* import { forOf } from "@observable/for-of";
* import { pipe } from "@observable/pipe";
* import { empty } from "@observable/empty";
*
* const source1 = forOf([1, 2, 3]);
* const source2 = forOf([7, 8, 9]);
*
* const observable1 = forOf([1, 2, 3]);
* const observable2 = forOf([7, 8, 9]);
* const controller = new AbortController();
* all([source1, empty, source2]).subscribe({
*
* all([observable1, empty, observable2]).subscribe({
* signal: controller.signal,
* next: (value) => console.log("next", value),
* return: () => console.log("return"),
Expand All @@ -62,11 +62,12 @@ import { finalize } from "@observable/finalize";
* // "return"
* ```
* @example
* Empty array
* Empty observable array
* ```ts
* import { all } from "@observable/all";
*
* const controller = new AbortController();
*
* all([]).subscribe({
* signal: controller.signal,
* next: (value) => console.log("next", value),
Expand All @@ -79,51 +80,51 @@ import { finalize } from "@observable/finalize";
* ```
*/
export function all<const Values extends ReadonlyArray<unknown>>(
input: Readonly<{ [Key in keyof Values]: Observable<Values[Key]> }>,
observables: Readonly<{ [Key in keyof Values]: Observable<Values[Key]> }>,
): Observable<Values>;
/**
* [`Next`](https://jsr.io/@observable/core/doc/~/Observer.next)s an [`Array`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array)
* of the latest {@linkcode Value|values} from _all_ of {@linkcode input}'s [`Observable`](https://jsr.io/@observable/core/doc/~/Observable)s, in
* [iteration](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol) order.
* of {@linkcode Value|values} from _all_ of the given {@linkcode observables} in [iteration](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol)
* order.
* @example
* Iterable of sources
* Iterable of observables
* ```ts
* import { all } from "@observable/all";
* import { Subject } from "@observable/core";
*
* const source1 = new Subject<number>();
* const source2 = source1;
* const source3 = new Subject<number>();
*
* const subject1 = new Subject<number>();
* const subject2 = subject1;
* const subject3 = new Subject<number>();
* const controller = new AbortController();
* all(new Set([source1, source2, source3])).subscribe({
*
* all(new Set([subject1, subject2, subject3])).subscribe({
* signal: controller.signal,
* next: (value) => console.log("next", value),
* return: () => console.log("return"),
* throw: (value) => console.log("throw", value),
* });
* source2.next(1);
* source1.next(2);
* source3.next(3); // "next" [2, 3]
* source1.next(4); // "next" [4, 3]
* source2.next(5); // "next" [4, 5]
* source1.return();
* source3.return(); // "return"
* source2.return();
* subject2.next(1);
* subject1.next(2);
* subject3.next(3); // "next" [2, 3]
* subject1.next(4); // "next" [4, 3]
* subject2.next(5); // "next" [4, 5]
* subject1.return();
* subject3.return(); // "return"
* subject2.return();
* ```
* @example
* Iterable with empty member
* Iterable with an empty observable
* ```ts
* import { all } from "@observable/all";
* import { forOf } from "@observable/for-of";
* import { pipe } from "@observable/pipe";
* import { empty } from "@observable/empty";
*
* const source1 = forOf([1, 2, 3]);
* const source2 = forOf([7, 8, 9]);
*
* const observable1 = forOf([1, 2, 3]);
* const observable2 = forOf([7, 8, 9]);
* const controller = new AbortController();
* all(new Set([source1, empty, source2])).subscribe({
*
* all(new Set([observable1, empty, observable2])).subscribe({
* signal: controller.signal,
* next: (value) => console.log("next", value),
* return: () => console.log("return"),
Expand All @@ -134,12 +135,16 @@ export function all<const Values extends ReadonlyArray<unknown>>(
* // "return"
* ```
*/
export function all<Value>(input: Iterable<Observable<Value>>): Observable<ReadonlyArray<Value>>;
export function all<Value>(input: Iterable<Observable<Value>>): Observable<ReadonlyArray<Value>> {
export function all<Value>(
observables: Iterable<Observable<Value>>,
): Observable<ReadonlyArray<Value>>;
export function all<Value>(
observables: Iterable<Observable<Value>>,
): Observable<ReadonlyArray<Value>> {
if (!arguments.length) throw new TypeError("1 argument required but 0 present");
if (!isIterable(input)) throw new TypeError("Parameter 1 is not of type 'Iterable'");
if (!isIterable(observables)) throw new TypeError("Parameter 1 is not of type 'Iterable'");

if (Array.isArray(input) && !input.length) return empty;
if (Array.isArray(observables) && !observables.length) return empty;

// Use defer so we do not start iterating until subscription, we get a fresh iteration for each subscription,
// and we get a fresh variable scope for each subscription.
Expand All @@ -149,15 +154,15 @@ export function all<Value>(input: Iterable<Observable<Value>>): Observable<Reado
*/
let receivedFirstValueCount = 0;
/**
* The normalized {@linkcode input} which has a known length for subsequent logic.
* The normalized {@linkcode observables} which has a known length for subsequent logic.
*/
const inputArray: ReadonlyArray<Observable<Value>> = Array.isArray(input)
? input
: Array.from(input);
const observableArray: ReadonlyArray<Observable<Value>> = Array.isArray(observables)
? observables
: Array.from(observables);
/**
* Tracking the expected number of first values that need to be received before the first snapshot is emitted.
*/
const expectedFirstValueCount = inputArray.length;
const expectedFirstValueCount = observableArray.length;

/**
* Tracking a known list of buffered values, so we don't have to clone them while nexting to prevent reentrant behaviors.
Expand All @@ -175,7 +180,7 @@ export function all<Value>(input: Iterable<Observable<Value>>): Observable<Reado
const stop = new Subject<void>();

return pipe(
forOf(inputArray),
forOf(observableArray),
mergeMap((observable, index) => {
/**
* Tracking if the observable is empty to be evaluated by subsequent logic.
Expand Down
14 changes: 6 additions & 8 deletions async-await/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
# [@observable/async-await](https://jsr.io/@observable/async-await)

Uses the
[async function](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function)
syntax to
[`await`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/await) the
provided `expression` and [`next`](https://jsr.io/@observable/core/doc/~/Observer.next) it's
resolved value through the returned
[`Observable`](https://jsr.io/@observable/core/doc/~/Observable).
[`Await`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/await)s the
given `expression`, [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)s its resolved
value, and then [`return`](https://jsr.io/@observable/core/doc/~/Observer.return)s.

## Build

Expand Down Expand Up @@ -106,7 +102,9 @@ Use the following prompt with AI assistants to help them understand this library
You are helping me with code that uses @observable/async-await from the @observable library ecosystem.

WHAT IT DOES:
`asyncAwait(promise)` converts a Promise/PromiseLike into an Observable that emits the resolved value, then calls `return()`. If the promise rejects, it calls `throw()` with the error.
`asyncAwait(expression)` applies `await` to the given expression, then `next`s the resolved value and
`return`s. The argument may be a `Promise`, a thenable, or any other value (same rules as `await` in
an async function — non-thenables are `next`ed as-is). Rejection surfaces as `throw()`.

CRITICAL DIFFERENCES FROM RxJS:
- Observer uses `return`/`throw` — NOT `complete`/`error`
Expand Down
2 changes: 1 addition & 1 deletion async-await/deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@observable/async-await",
"version": "0.3.0",
"version": "0.4.0",
"license": "MIT",
"exports": "./mod.ts",
"publish": { "exclude": ["*.test.ts"] }
Expand Down
8 changes: 3 additions & 5 deletions async-await/mod.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { Observable } from "@observable/core";

/**
* Uses the [async function](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function)
* syntax to [`await`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/await) the provided
* {@linkcode expression} and [`next`](https://jsr.io/@observable/core/doc/~/Observer.next) it's resolved value through
* the returned [`Observable`](https://jsr.io/@observable/core/doc/~/Observable).
* @param expression - A [`Promise`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise), a [thenable object](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise#thenables), or any value to wait for.
* [`Await`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/await)s the given
* {@linkcode expression}, [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)s its resolved value, and then
* [`return`](https://jsr.io/@observable/core/doc/~/Observer.return)s.
* @example
* Resolved promise
* ```ts
Expand Down
9 changes: 3 additions & 6 deletions at/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
# [@observable/at](https://jsr.io/@observable/at)

Filters [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ed values from the
[source](https://jsr.io/@observable/core#source)
[`Observable`](https://jsr.io/@observable/core/doc/~/Observable) to only the
[`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ed value at the given `index`. Negative
`indices` count back from the last [`next`](https://jsr.io/@observable/core/doc/~/Observer.next)ed
value in the sequence.
[`Next`](https://jsr.io/@observable/core/doc/~/Observer.next)s the first value at the given `index`
and then [`return`](https://jsr.io/@observable/core/doc/~/Observer.return)s. Negative indices count
back from the last value in the sequence.

## Build

Expand Down
Loading
Loading