diff --git a/async/unstable_pool_settled.ts b/async/unstable_pool_settled.ts index 26657ab919d5..2188876cebf5 100644 --- a/async/unstable_pool_settled.ts +++ b/async/unstable_pool_settled.ts @@ -29,7 +29,7 @@ export interface PooledMapSettledOptions { * * If the input iterable itself throws, all currently executing items are * allowed to finish and their settled results are yielded, then the iterator - * closes. The error from the input iterable is not propagated to the consumer. + * rejects with the original error from the input iterable. * * @experimental **UNSTABLE**: New API, yet to be vetted. * @@ -76,6 +76,13 @@ export interface PooledMapSettledOptions { * @returns An async iterator yielding `PromiseSettledResult` for each item, * in the order items were yielded from the input. * @throws {RangeError} If `poolLimit` is not a positive integer. + * @throws The signal's `reason` if the signal is aborted. Already-started + * items are allowed to settle before the rejection is surfaced. If the + * input iterable also throws while the signal is already aborted, the + * abort reason is used rather than the iterable's error. + * @throws The original error if the input iterable throws while the signal is + * not aborted. Already-started items are allowed to settle before the + * rejection is surfaced. */ export function pooledMapSettled( array: Iterable | AsyncIterable, @@ -92,14 +99,15 @@ export function pooledMapSettled( type Settled = PromiseSettledResult; - const ABORT_SENTINEL = Symbol(); + const ABORT_SENTINEL = Symbol("abort"); + const SOURCE_ERROR_SENTINEL = Symbol("sourceError"); + let sourceError: unknown; - const res = new TransformStream< - Promise, - Settled - >({ + type Sentinel = typeof ABORT_SENTINEL | typeof SOURCE_ERROR_SENTINEL; + + const res = new TransformStream, Settled>({ async transform( - p: Promise, + p: Promise, controller: TransformStreamDefaultController, ) { const result = await p; @@ -107,24 +115,33 @@ export function pooledMapSettled( controller.error(signal?.reason); return; } + if (result === SOURCE_ERROR_SENTINEL) { + controller.error(sourceError); + return; + } controller.enqueue(result); }, }); (async () => { const writer = res.writable.getWriter(); - const executing: Array> = []; - - function raceWithSignal( - promises: Array>, - ): Promise { - if (!signal) return Promise.race(promises); - const { promise, resolve, reject } = Promise.withResolvers(); - const onAbort = () => reject(signal.reason); + const executing = new Set>(); + + let abortDeferred: PromiseWithResolvers | undefined; + let removeAbortListener: (() => void) | undefined; + if (signal) { + abortDeferred = Promise.withResolvers(); + const onAbort = () => abortDeferred!.reject(signal.reason); signal.addEventListener("abort", onAbort, { once: true }); - return Promise.race([...promises, promise]).finally(() => { - signal.removeEventListener("abort", onAbort); - resolve(undefined as never); + removeAbortListener = () => signal.removeEventListener("abort", onAbort); + abortDeferred.promise.catch(() => {}); + } + + function raceWithSignal(): Promise { + if (!abortDeferred) return Promise.race(executing); + executing.add(abortDeferred.promise); + return Promise.race(executing).finally(() => { + executing.delete(abortDeferred!.promise); }); } @@ -143,31 +160,52 @@ export function pooledMapSettled( try { signal?.throwIfAborted(); - for await (const item of array) { - signal?.throwIfAborted(); + const it = (Symbol.asyncIterator in Object(array)) + ? (array as AsyncIterable)[Symbol.asyncIterator]() + : (array as Iterable)[Symbol.iterator](); - const p = settle(() => iteratorFn(item)); - writer.write(p); - const e: Promise = p.then(() => - executing.splice(executing.indexOf(e), 1) - ); - executing.push(e); - if (executing.length >= poolLimit) { - await raceWithSignal(executing); + try { + while (true) { + const nextPromise = Promise.resolve(it.next()); + if (abortDeferred) nextPromise.catch(() => {}); + const next = abortDeferred + ? await Promise.race([nextPromise, abortDeferred.promise]) + : await nextPromise; + + if (next.done) break; + + const item = next.value; + const p = settle(() => iteratorFn(item)); + writer.write(p).catch(() => {}); + const e: Promise = p.then(() => executing.delete(e)); + executing.add(e); + if (executing.size >= poolLimit) { + await raceWithSignal(); + } + } + } finally { + if (signal?.aborted) { + Promise.resolve(it.return?.()).catch(() => {}); + } else { + await it.return?.(); } } + await Promise.all(executing); - writer.close(); - } catch { - // Wait for in-flight work so their settled results are still yielded in - // order, then write a sentinel that causes the stream to error with the - // abort reason. + writer.close().catch(() => {}); + } catch (caughtError) { + const wasAborted = signal?.aborted ?? false; await Promise.all(executing).catch(() => {}); - if (signal?.aborted) { + if (wasAborted) { writer.write(Promise.resolve(ABORT_SENTINEL)).catch(() => {}); } else { - writer.close(); + sourceError = caughtError; + writer + .write(Promise.resolve(SOURCE_ERROR_SENTINEL)) + .catch(() => {}); } + } finally { + removeAbortListener?.(); } })(); @@ -179,11 +217,15 @@ export function pooledMapSettled( >)() : (async function* () { const reader = res.readable.getReader(); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - yield value; + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + yield value; + } + } finally { + reader.cancel().catch(() => {}); + reader.releaseLock(); } - reader.releaseLock(); })(); } diff --git a/async/unstable_pool_settled_test.ts b/async/unstable_pool_settled_test.ts index 335aa74b709c..39485132ed27 100644 --- a/async/unstable_pool_settled_test.ts +++ b/async/unstable_pool_settled_test.ts @@ -146,27 +146,194 @@ Deno.test("pooledMapSettled() yields in-flight results then rejects on abort", a } }); -Deno.test("pooledMapSettled() closes cleanly when input iterable throws", async () => { +Deno.test("pooledMapSettled() propagates source iterable error after in-flight results", async () => { async function* failing() { yield 1; yield 2; throw new Error("source failed"); } - const results = await Array.fromAsync( - pooledMapSettled( - failing(), - (i) => Promise.resolve(i * 10), - { poolLimit: 2 }, - ), + const collected: PromiseSettledResult[] = []; + + await assertRejects( + async () => { + for await ( + const result of pooledMapSettled( + failing(), + (i) => Promise.resolve(i * 10), + { poolLimit: 2 }, + ) + ) { + collected.push(result); + } + }, + Error, + "source failed", ); - assertEquals(results, [ + assertEquals(collected, [ { status: "fulfilled", value: 10 }, { status: "fulfilled", value: 20 }, ]); }); +Deno.test("pooledMapSettled() source error is not wrapped as PromiseSettledResult", async () => { + async function* failing() { + yield 1; + throw new Error("source failed"); + } + + const collected: PromiseSettledResult[] = []; + + await assertRejects( + async () => { + for await ( + const result of pooledMapSettled( + failing(), + async (i) => { + await delay(10); + return i; + }, + { poolLimit: 2 }, + ) + ) { + collected.push(result); + } + }, + Error, + "source failed", + ); + + assertEquals(collected, [{ status: "fulfilled", value: 1 }]); +}); + +Deno.test("pooledMapSettled() rejects with source error when signal is not yet aborted at source failure", async () => { + const controller = new AbortController(); + + async function* failingSource() { + yield 1; + yield 2; + throw new Error("source failed"); + } + + const collected: PromiseSettledResult[] = []; + + await assertRejects( + async () => { + for await ( + const result of pooledMapSettled( + failingSource(), + async (i) => { + await delay(20); + if (i === 2) controller.abort(new Error("abort later")); + return i; + }, + { poolLimit: 3, signal: controller.signal }, + ) + ) { + collected.push(result); + } + }, + Error, + "source failed", + ); + + assertGreaterOrEqual(collected.length, 1); +}); + +Deno.test("pooledMapSettled() rejects with abort reason when signal is already aborted at source failure", async () => { + const controller = new AbortController(); + + function* failingSource() { + yield 1; + controller.abort(new Error("aborted")); + throw new Error("source failed"); + } + + const collected: PromiseSettledResult[] = []; + + await assertRejects( + async () => { + for await ( + const result of pooledMapSettled( + failingSource(), + (i) => Promise.resolve(i), + { poolLimit: 2, signal: controller.signal }, + ) + ) { + collected.push(result); + } + }, + Error, + "aborted", + ); + + assertEquals(collected, [{ status: "fulfilled", value: 1 }]); +}); + +Deno.test({ + name: + "pooledMapSettled() reacts to abort while waiting for slow async source", + async fn() { + const controller = new AbortController(); + + async function* slowSource() { + yield 1; + await new Promise((r) => setTimeout(r, 1000)); + yield 2; + } + + setTimeout(() => controller.abort(new Error("aborted")), 25); + + const start = performance.now(); + await assertRejects( + () => + Array.fromAsync( + pooledMapSettled(slowSource(), (i) => i, { + poolLimit: 1, + signal: controller.signal, + }), + ), + Error, + "aborted", + ); + assertLess(performance.now() - start, 200); + }, + sanitizeOps: false, + sanitizeResources: false, +}); + +Deno.test({ + name: "pooledMapSettled() reacts to abort with stalled async source", + async fn() { + const controller = new AbortController(); + + async function* stalledSource() { + yield 1; + await new Promise(() => {}); + yield 2; + } + + setTimeout(() => controller.abort(new Error("aborted")), 25); + + const start = performance.now(); + await assertRejects( + () => + Array.fromAsync( + pooledMapSettled(stalledSource(), (i) => i, { + poolLimit: 1, + signal: controller.signal, + }), + ), + Error, + "aborted", + ); + assertLess(performance.now() - start, 200); + }, + sanitizeOps: false, + sanitizeResources: false, +}); + Deno.test("pooledMapSettled() checks browser compat", async () => { const asyncIterFunc = ReadableStream.prototype[Symbol.asyncIterator]; // deno-lint-ignore no-explicit-any @@ -184,3 +351,105 @@ Deno.test("pooledMapSettled() checks browser compat", async () => { ReadableStream.prototype[Symbol.asyncIterator] = asyncIterFunc; } }); + +// Early consumer break tests are grouped at the end of this file because +// breaking out of `for await` leaves the producer IIFE running with in-flight +// timers that cannot be deterministically drained. Sanitizers are disabled +// following the same pattern as delay_test.ts. + +Deno.test({ + name: + "pooledMapSettled() handles early consumer break without unhandled rejections", + async fn() { + const collected: PromiseSettledResult[] = []; + + for await ( + const result of pooledMapSettled( + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + async (i) => { + await delay(10); + return i; + }, + { poolLimit: 2 }, + ) + ) { + collected.push(result); + if (collected.length === 2) break; + } + + assertEquals(collected.length, 2); + for (const r of collected) { + assertEquals(r.status, "fulfilled"); + } + }, + sanitizeOps: false, + sanitizeResources: false, +}); + +Deno.test({ + name: "pooledMapSettled() handles early consumer break with source error", + async fn() { + async function* failingSource() { + for (let i = 1; i <= 10; i++) { + yield i; + if (i === 5) throw new Error("source failed"); + } + } + + const collected: PromiseSettledResult[] = []; + + for await ( + const result of pooledMapSettled( + failingSource(), + async (i) => { + await delay(10); + return i; + }, + { poolLimit: 2 }, + ) + ) { + collected.push(result); + if (collected.length === 2) break; + } + + assertGreaterOrEqual(collected.length, 1); + assertLess(collected.length, 10); + }, + sanitizeOps: false, + sanitizeResources: false, +}); + +Deno.test({ + name: "pooledMapSettled() fallback path handles early consumer break", + async fn() { + const asyncIterFunc = ReadableStream.prototype[Symbol.asyncIterator]; + // deno-lint-ignore no-explicit-any + delete (ReadableStream.prototype as any)[Symbol.asyncIterator]; + try { + const collected: PromiseSettledResult[] = []; + + for await ( + const result of pooledMapSettled( + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + async (i) => { + await delay(10); + return i; + }, + { poolLimit: 2 }, + ) + ) { + collected.push(result); + if (collected.length === 2) break; + } + + assertEquals(collected.length, 2); + for (const r of collected) { + assertEquals(r.status, "fulfilled"); + } + } finally { + ReadableStream.prototype[Symbol.asyncIterator] = asyncIterFunc; + } + }, + sanitizeOps: false, + sanitizeResources: false, +});