From ac82063dc400a407e69a8d7ccfd8c2c8acc17477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=A0=E4=B8=8B=20=E5=85=8B=E5=BD=A6?= Date: Thu, 26 Mar 2026 15:30:37 +0900 Subject: [PATCH 1/2] stream: use internal abort hooks for web streams --- lib/internal/streams/add-abort-signal.js | 4 ++-- lib/internal/streams/utils.js | 4 ++-- lib/internal/webstreams/readablestream.js | 9 ++++++--- lib/internal/webstreams/writablestream.js | 8 +++++--- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index ccd50dd141e6a0..e0c6c60a98581f 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -14,7 +14,7 @@ const { const { isNodeStream, isWebStream, - kControllerErrorFunction, + kControllerAbortFunction, } = require('internal/streams/utils'); const { eos } = require('internal/streams/end-of-stream'); @@ -47,7 +47,7 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) { stream.destroy(new AbortError(undefined, { cause: signal.reason })); } : () => { - stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason })); + stream[kControllerAbortFunction](new AbortError(undefined, { cause: signal.reason })); }; if (signal.aborted) { onAbort(); diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 45f55316104fe8..1afee846d4adc7 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -20,7 +20,7 @@ const kIsDisturbed = SymbolFor('nodejs.stream.disturbed'); const kOnConstructed = Symbol('kOnConstructed'); const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise'); -const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction'); +const kControllerAbortFunction = SymbolFor('nodejs.webstream.controllerAbortFunction'); const kState = Symbol('kState'); const kObjectMode = 1 << 0; @@ -326,7 +326,7 @@ module.exports = { isReadable, kIsReadable, kIsClosedPromise, - kControllerErrorFunction, + kControllerAbortFunction, kIsWritable, isClosed, isDuplexNodeStream, diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 94592cdf01258c..90152621449258 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -86,7 +86,7 @@ const { kIsErrored, kIsReadable, kIsClosedPromise, - kControllerErrorFunction, + kControllerAbortFunction, } = require('internal/streams/utils'); const { @@ -254,7 +254,7 @@ class ReadableStream { this[kState] = createReadableStreamState(); this[kIsClosedPromise] = PromiseWithResolvers(); - this[kControllerErrorFunction] = () => {}; + this[kControllerAbortFunction] = () => {}; // The spec requires handling of the strategy first // here. Specifically, if getting the size and @@ -2540,7 +2540,8 @@ function setupReadableStreamDefaultController( stream, }; stream[kState].controller = controller; - stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller); + stream[kControllerAbortFunction] = + (error) => readableStreamDefaultControllerError(controller, error); const startResult = startAlgorithm(); @@ -3364,6 +3365,8 @@ function setupReadableByteStreamController( pendingPullIntos: [], }; stream[kState].controller = controller; + stream[kControllerAbortFunction] = + (error) => readableByteStreamControllerError(controller, error); const startResult = startAlgorithm(); diff --git a/lib/internal/webstreams/writablestream.js b/lib/internal/webstreams/writablestream.js index ca20c08b258c3c..aca901f14bb22d 100644 --- a/lib/internal/webstreams/writablestream.js +++ b/lib/internal/webstreams/writablestream.js @@ -78,7 +78,7 @@ const { kIsClosedPromise, kIsErrored, kIsWritable, - kControllerErrorFunction, + kControllerAbortFunction, } = require('internal/streams/utils'); const { @@ -173,7 +173,7 @@ class WritableStream { this[kState] = createWritableStreamState(); this[kIsClosedPromise] = PromiseWithResolvers(); - this[kControllerErrorFunction] = () => {}; + this[kControllerAbortFunction] = () => {}; const size = extractSizeAlgorithm(strategy?.size); const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1); @@ -1322,7 +1322,9 @@ function setupWritableStreamDefaultController( writeAlgorithm, }; stream[kState].controller = controller; - stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller); + stream[kControllerAbortFunction] = (reason) => { + setPromiseHandled(writableStreamAbort(stream, reason)); + }; writableStreamUpdateBackpressure( stream, From bb0c045ee285d929dddb8058ba2fe342df77f5c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B1=A0=E4=B8=8B=20=E5=85=8B=E5=BD=A6?= Date: Thu, 26 Mar 2026 15:31:06 +0900 Subject: [PATCH 2/2] test: cover webstreams addAbortSignal regressions --- .../test-webstreams-abort-controller.js | 238 +++++++++++++----- 1 file changed, 180 insertions(+), 58 deletions(-) diff --git a/test/parallel/test-webstreams-abort-controller.js b/test/parallel/test-webstreams-abort-controller.js index 64d38af1683e40..ad6635d5b04fc1 100644 --- a/test/parallel/test-webstreams-abort-controller.js +++ b/test/parallel/test-webstreams-abort-controller.js @@ -4,28 +4,90 @@ const common = require('../common'); const { finished, addAbortSignal } = require('stream'); const { ReadableStream, WritableStream } = require('stream/web'); const assert = require('assert'); +const { setImmediate } = require('timers/promises'); + +const typeErrorPredicate = { name: 'TypeError', code: 'ERR_INVALID_STATE' }; function createTestReadableStream() { - return new ReadableStream({ - start(controller) { + let controller; + const rs = new ReadableStream({ + start(c) { + controller = c; controller.enqueue('a'); controller.enqueue('b'); controller.enqueue('c'); controller.close(); } }); + return [rs, controller]; } function createTestWritableStream(values) { - return new WritableStream({ - write(chunk) { + let controller; + const ws = new WritableStream({ + start(c) { controller = c; }, + write(chunk, c) { values.push(chunk); } }); + return [ ws, controller ]; +} + +/** + * + * @param {ReadableStream} rs + * @param {import('internal/webstreams/readablestream').ReadableStreamReader} reader + * @param {{ + * isByob?: boolean, + * controller?: import('internal/webstreams/readablestream').ReadableStreamController, + * additionalAssertions?: () => void, + * }} options + */ +function assertReadableStreamEventuallyAborted(rs, reader, { + isByob, + controller, + additionalAssertions = common.mustCall() +} = {}) { + finished(rs, { writable: false }, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.rejects(reader.read(...(isByob ? [new Uint8Array(1)] : [])), /AbortError/).then(common.mustCall()); + assert.rejects(reader.closed, /AbortError/).then(common.mustCall()); + if (controller) { + assert.throws(() => controller.close(), typeErrorPredicate); + assert.throws(() => controller.enqueue(isByob ? new Uint8Array(1) : 'a'), typeErrorPredicate); + controller.error(new Error()); // Never throws + } + additionalAssertions(); + })); +} + +/** + * + * @param {WritableStream} ws + * @param {import('internal/webstreams/writablestream').WritableStreamDefaultWriter} writer + * @param {{ + * controller?: import('internal/webstreams/writablestream').WritableStreamDefaultController, + * additionalAssertions?: () => void, + * }} options + */ +function assertWritableStreamEventuallyAborted(ws, writer, { + controller, + additionalAssertions = common.mustCall(), +} = {}) { + finished(ws, { readable: false }, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.rejects(writer.write('a'), /AbortError/).then(common.mustCall()); + assert.rejects(writer.closed, /AbortError/).then(common.mustCall()); + if (controller) { + controller.error(new Error()); // Never throws + assert.strictEqual(controller.signal.aborted, true); + } + additionalAssertions(); + })); } { - const rs = createTestReadableStream(); + const [rs, controller] = createTestReadableStream(); const reader = rs.getReader(); @@ -33,11 +95,7 @@ function createTestWritableStream(values) { addAbortSignal(ac.signal, rs); - finished(rs, common.mustCall((err) => { - assert.strictEqual(err.name, 'AbortError'); - assert.rejects(reader.read(), /AbortError/).then(common.mustCall()); - assert.rejects(reader.closed, /AbortError/).then(common.mustCall()); - })); + assertReadableStreamEventuallyAborted(rs, reader, { controller }); reader.read().then(common.mustCall((result) => { assert.strictEqual(result.value, 'a'); @@ -46,7 +104,7 @@ function createTestWritableStream(values) { } { - const rs = createTestReadableStream(); + const [rs] = createTestReadableStream(); const ac = new AbortController(); @@ -62,9 +120,80 @@ function createTestWritableStream(values) { } { - const rs1 = createTestReadableStream(); + const [rs, controller] = createTestReadableStream(); + const reader = rs.getReader(); + const ac = new AbortController(); + + addAbortSignal(ac.signal, rs); + controller.error = common.mustNotCall( + 'addAbortSignal() must not call an overridden controller.error()'); - const rs2 = createTestReadableStream(); + assertReadableStreamEventuallyAborted(rs, reader); + + reader.read().then(common.mustCall(() => { + ac.abort(); + })); +} + +{ + let controller; + const rs = new ReadableStream({ + type: 'bytes', + start(c) { controller = c; }, + }); + const ac = new AbortController(); + addAbortSignal(ac.signal, rs); + + const reader = rs.getReader({ mode: 'byob' }); + assertReadableStreamEventuallyAborted(rs, reader, { controller, isByob: true }); + + ac.abort(); +} + +{ + /** @member {import('internal/webstreams/readablestream').ReadableByteStreamController} */ + let controller; + const rs = new ReadableStream({ + type: 'bytes', + start(c) { controller = c; }, + }); + const ac = new AbortController(); + + addAbortSignal(ac.signal, rs); + controller.error = common.mustNotCall('addAbortSignal() must not call an overridden controller.error()'); + + const reader = rs.getReader({ mode: 'byob' }); + assertReadableStreamEventuallyAborted(rs, reader, { isByob: true }); + + ac.abort(); +} + +{ + /** @member {import('internal/webstreams/readablestream').ReadableStreamDefaultController} */ + let controller; + let pullPromiseWithResolvers = Promise.withResolvers(); + const rs = new ReadableStream({ + start(c) { controller = c; }, + pull() { return pullPromiseWithResolvers.promise; }, + }); + const ac = new AbortController(); + addAbortSignal(ac.signal, rs); + + const reader = rs.getReader(); + assertReadableStreamEventuallyAborted(rs, reader); + + const readPromise = reader.read(); + pullPromiseWithResolvers.resolve(setImmediate().then(() => { + ac.abort(); + controller.enqueue('a'); + })); + assert.rejects(readPromise, /AbortError/).then(common.mustCall()); + assert.rejects(pullPromiseWithResolvers.promise, typeErrorPredicate).then(common.mustCall()); +} + +{ + const [rs1, controller1] = createTestReadableStream(); + const [rs2, controller2] = createTestReadableStream(); const ac = new AbortController(); @@ -74,23 +203,14 @@ function createTestWritableStream(values) { const reader1 = rs1.getReader(); const reader2 = rs2.getReader(); - finished(rs1, common.mustCall((err) => { - assert.strictEqual(err.name, 'AbortError'); - assert.rejects(reader1.read(), /AbortError/).then(common.mustCall()); - assert.rejects(reader1.closed, /AbortError/).then(common.mustCall()); - })); - - finished(rs2, common.mustCall((err) => { - assert.strictEqual(err.name, 'AbortError'); - assert.rejects(reader2.read(), /AbortError/).then(common.mustCall()); - assert.rejects(reader2.closed, /AbortError/).then(common.mustCall()); - })); + assertReadableStreamEventuallyAborted(rs1, reader1, { controller: controller1 }); + assertReadableStreamEventuallyAborted(rs2, reader2, { controller: controller2 }); ac.abort(); } { - const rs = createTestReadableStream(); + const [rs, controller] = createTestReadableStream(); const { 0: rs1, 1: rs2 } = rs.tee(); @@ -101,24 +221,15 @@ function createTestWritableStream(values) { const reader1 = rs1.getReader(); const reader2 = rs2.getReader(); - finished(rs1, common.mustCall((err) => { - assert.strictEqual(err.name, 'AbortError'); - assert.rejects(reader1.read(), /AbortError/).then(common.mustCall()); - assert.rejects(reader1.closed, /AbortError/).then(common.mustCall()); - })); - - finished(rs2, common.mustCall((err) => { - assert.strictEqual(err.name, 'AbortError'); - assert.rejects(reader2.read(), /AbortError/).then(common.mustCall()); - assert.rejects(reader2.closed, /AbortError/).then(common.mustCall()); - })); + assertReadableStreamEventuallyAborted(rs1, reader1, { controller }); + assertReadableStreamEventuallyAborted(rs2, reader2, { controller }); ac.abort(); } { const values = []; - const ws = createTestWritableStream(values); + const [ws, controller] = createTestWritableStream(values); const ac = new AbortController(); @@ -126,23 +237,22 @@ function createTestWritableStream(values) { const writer = ws.getWriter(); - finished(ws, common.mustCall((err) => { - assert.strictEqual(err.name, 'AbortError'); - assert.deepStrictEqual(values, ['a']); - assert.rejects(writer.write('b'), /AbortError/).then(common.mustCall()); - assert.rejects(writer.closed, /AbortError/).then(common.mustCall()); - })); + assertWritableStreamEventuallyAborted(ws, writer, { + controller, + additionalAssertions: common.mustCall(() => { + assert.deepStrictEqual(values, ['a']); + }), + }); - writer.write('a').then(() => { - ac.abort(); - }).then(common.mustCall()); + writer.write('a') + .then(common.mustCall(() => { ac.abort(); })); } { const values = []; - const ws1 = createTestWritableStream(values); - const ws2 = createTestWritableStream(values); + const [ws1, controller1] = createTestWritableStream(values); + const [ws2, controller2] = createTestWritableStream(values); const ac = new AbortController(); @@ -152,17 +262,29 @@ function createTestWritableStream(values) { const writer1 = ws1.getWriter(); const writer2 = ws2.getWriter(); - finished(ws1, common.mustCall((err) => { - assert.strictEqual(err.name, 'AbortError'); - assert.rejects(writer1.write('a'), /AbortError/).then(common.mustCall()); - assert.rejects(writer1.closed, /AbortError/).then(common.mustCall()); - })); + const additionalAssertions = common.mustCall(() => { + assert.deepStrictEqual(values, []); + }, 2); + assertWritableStreamEventuallyAborted(ws1, writer1, { controller: controller1, additionalAssertions }); + assertWritableStreamEventuallyAborted(ws2, writer2, { controller: controller2, additionalAssertions }); - finished(ws2, common.mustCall((err) => { - assert.strictEqual(err.name, 'AbortError'); - assert.rejects(writer2.write('a'), /AbortError/).then(common.mustCall()); - assert.rejects(writer2.closed, /AbortError/).then(common.mustCall()); - })); + ac.abort(); +} + +{ + /** @member {import('internal/webstreams/writablestream').WritableStreamDefaultController} */ + let controller; + const ws = new WritableStream({ + start(c) { controller = c; }, + }); + const ac = new AbortController(); + addAbortSignal(ac.signal, ws); + + controller.abort = common.mustNotCall('addAbortSignal() must not call an overridden controller.abort()'); + controller.error = common.mustNotCall('addAbortSignal() must not call an overridden controller.error()'); + + const writer = ws.getWriter(); + assertWritableStreamEventuallyAborted(ws, writer); ac.abort(); }