Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 82 additions & 40 deletions async/unstable_pool_settled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export interface PooledMapSettledOptions {
*
* If the input iterable itself throws, all currently executing items are
* allowed to finish and their settled results are yielded, then the iterator
* closes. The error from the input iterable is not propagated to the consumer.
* rejects with the original error from the input iterable.
*
* @experimental **UNSTABLE**: New API, yet to be vetted.
*
Expand Down Expand Up @@ -76,6 +76,13 @@ export interface PooledMapSettledOptions {
* @returns An async iterator yielding `PromiseSettledResult<R>` for each item,
* in the order items were yielded from the input.
* @throws {RangeError} If `poolLimit` is not a positive integer.
* @throws The signal's `reason` if the signal is aborted. Already-started
* items are allowed to settle before the rejection is surfaced. If the
* input iterable also throws while the signal is already aborted, the
* abort reason is used rather than the iterable's error.
* @throws The original error if the input iterable throws while the signal is
* not aborted. Already-started items are allowed to settle before the
* rejection is surfaced.
*/
export function pooledMapSettled<T, R>(
array: Iterable<T> | AsyncIterable<T>,
Expand All @@ -92,39 +99,49 @@ export function pooledMapSettled<T, R>(

type Settled = PromiseSettledResult<R>;

const ABORT_SENTINEL = Symbol();
const ABORT_SENTINEL = Symbol("abort");
const SOURCE_ERROR_SENTINEL = Symbol("sourceError");
let sourceError: unknown;

const res = new TransformStream<
Promise<Settled | typeof ABORT_SENTINEL>,
Settled
>({
type Sentinel = typeof ABORT_SENTINEL | typeof SOURCE_ERROR_SENTINEL;

const res = new TransformStream<Promise<Settled | Sentinel>, Settled>({
async transform(
p: Promise<Settled | typeof ABORT_SENTINEL>,
p: Promise<Settled | Sentinel>,
controller: TransformStreamDefaultController<Settled>,
) {
const result = await p;
if (result === ABORT_SENTINEL) {
controller.error(signal?.reason);
return;
}
if (result === SOURCE_ERROR_SENTINEL) {
controller.error(sourceError);
return;
}
controller.enqueue(result);
},
});

(async () => {
const writer = res.writable.getWriter();
const executing: Array<Promise<unknown>> = [];

function raceWithSignal(
promises: Array<Promise<unknown>>,
): Promise<unknown> {
if (!signal) return Promise.race(promises);
const { promise, resolve, reject } = Promise.withResolvers<never>();
const onAbort = () => reject(signal.reason);
const executing = new Set<Promise<unknown>>();

let abortDeferred: PromiseWithResolvers<never> | undefined;
let removeAbortListener: (() => void) | undefined;
if (signal) {
abortDeferred = Promise.withResolvers<never>();
const onAbort = () => abortDeferred!.reject(signal.reason);
signal.addEventListener("abort", onAbort, { once: true });
return Promise.race([...promises, promise]).finally(() => {
signal.removeEventListener("abort", onAbort);
resolve(undefined as never);
removeAbortListener = () => signal.removeEventListener("abort", onAbort);
abortDeferred.promise.catch(() => {});
}

function raceWithSignal(): Promise<unknown> {
if (!abortDeferred) return Promise.race(executing);
executing.add(abortDeferred.promise);
return Promise.race(executing).finally(() => {
executing.delete(abortDeferred!.promise);
});
}

Expand All @@ -143,31 +160,52 @@ export function pooledMapSettled<T, R>(
try {
signal?.throwIfAborted();

for await (const item of array) {
signal?.throwIfAborted();
const it = (Symbol.asyncIterator in Object(array))
? (array as AsyncIterable<T>)[Symbol.asyncIterator]()
: (array as Iterable<T>)[Symbol.iterator]();

const p = settle(() => iteratorFn(item));
writer.write(p);
const e: Promise<unknown> = p.then(() =>
executing.splice(executing.indexOf(e), 1)
);
executing.push(e);
if (executing.length >= poolLimit) {
await raceWithSignal(executing);
try {
while (true) {
const nextPromise = Promise.resolve(it.next());
if (abortDeferred) nextPromise.catch(() => {});
const next = abortDeferred
? await Promise.race([nextPromise, abortDeferred.promise])
: await nextPromise;

if (next.done) break;

const item = next.value;
const p = settle(() => iteratorFn(item));
writer.write(p).catch(() => {});
const e: Promise<unknown> = p.then(() => executing.delete(e));
executing.add(e);
if (executing.size >= poolLimit) {
await raceWithSignal();
}
}
} finally {
if (signal?.aborted) {
Promise.resolve(it.return?.()).catch(() => {});
} else {
await it.return?.();
}
}

await Promise.all(executing);
writer.close();
} catch {
// Wait for in-flight work so their settled results are still yielded in
// order, then write a sentinel that causes the stream to error with the
// abort reason.
writer.close().catch(() => {});
} catch (caughtError) {
const wasAborted = signal?.aborted ?? false;
await Promise.all(executing).catch(() => {});
if (signal?.aborted) {
if (wasAborted) {
writer.write(Promise.resolve(ABORT_SENTINEL)).catch(() => {});
} else {
writer.close();
sourceError = caughtError;
writer
.write(Promise.resolve(SOURCE_ERROR_SENTINEL))
.catch(() => {});
}
} finally {
removeAbortListener?.();
}
})();

Expand All @@ -179,11 +217,15 @@ export function pooledMapSettled<T, R>(
>)()
: (async function* () {
const reader = res.readable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield value;
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield value;
}
} finally {
reader.cancel().catch(() => {});
reader.releaseLock();
}
reader.releaseLock();
})();
}
Loading
Loading