Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const {
const {
isNodeStream,
isWebStream,
kControllerErrorFunction,
kControllerAbortFunction,
} = require('internal/streams/utils');

const { eos } = require('internal/streams/end-of-stream');
Expand Down Expand Up @@ -47,7 +47,7 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) {
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
} :
() => {
stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason }));
stream[kControllerAbortFunction](new AbortError(undefined, { cause: signal.reason }));
};
if (signal.aborted) {
onAbort();
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const kIsDisturbed = SymbolFor('nodejs.stream.disturbed');
const kOnConstructed = Symbol('kOnConstructed');

const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');
const kControllerAbortFunction = SymbolFor('nodejs.webstream.controllerAbortFunction');

const kState = Symbol('kState');
const kObjectMode = 1 << 0;
Expand Down Expand Up @@ -326,7 +326,7 @@ module.exports = {
isReadable,
kIsReadable,
kIsClosedPromise,
kControllerErrorFunction,
kControllerAbortFunction,
kIsWritable,
isClosed,
isDuplexNodeStream,
Expand Down
9 changes: 6 additions & 3 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ const {
kIsErrored,
kIsReadable,
kIsClosedPromise,
kControllerErrorFunction,
kControllerAbortFunction,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -254,7 +254,7 @@ class ReadableStream {
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = PromiseWithResolvers();
this[kControllerErrorFunction] = () => {};
this[kControllerAbortFunction] = () => {};

// The spec requires handling of the strategy first
// here. Specifically, if getting the size and
Expand Down Expand Up @@ -2540,7 +2540,8 @@ function setupReadableStreamDefaultController(
stream,
};
stream[kState].controller = controller;
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
stream[kControllerAbortFunction] =
(error) => readableStreamDefaultControllerError(controller, error);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity: why are we erroring the stream instead of canceling it?

  • A consumer can cancel the stream with readableStreamCancel, and the stream will call the source.cancel callback so it can clean up any resources.
  • With controller.error, there is no so such callback, since the Streams standard assumes that the error was signaled by the source. If we allow a consumer to inject an error into the stream, this may leak resources in the source.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MattiasBuelens
Should I include that fix here as well?

Copy link
Copy Markdown
Contributor

@MattiasBuelens MattiasBuelens Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's probably not a good idea. The addAbortSignal documentation currently states that it calls controller.error, so changing that will be a breaking change... 🤔

Let's stick with the current behavior for now. I'll put up a separate draft PR to discuss if we want to change it or not.

Actually, this PR is already introducing a breaking change for WritableStream... so maybe we should just fix both. 😅

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to preserve the current addAbortSignal behavior of leaving the stream in an errored state, but with the proper cleanup through source.cancel(), we could modify readableStreamCancel slightly:

- function readableStreamCancel(stream, reason) {
+ function readableStreamCancel(stream, reason, finalState = 'closed') {
    stream[kState].disturbed = true;
    switch (stream[kState].state) {
      case 'closed':
        return PromiseResolve();
      case 'errored':
        return PromiseReject(stream[kState].storedError);
    }
-   readableStreamClose(stream);
+   if (finalState === 'errored') {
+     readableStreamError(stream, reason);
+   } else {
+     readableStreamClose(stream);
+   }
    const {
      reader,
    } = stream[kState];
    if (reader !== undefined && readableStreamHasBYOBReader(stream)) {
      for (let n = 0; n < reader[kState].readIntoRequests.length; n++)
        reader[kState].readIntoRequests[n][kClose]();
      reader[kState].readIntoRequests = [];
    }
  
    return PromisePrototypeThen(
      stream[kState].controller[kCancel](reason),
      () => {});
  }

We could then define ReadableStream[kControllerAbortFunction] as such:

  [kControllerAbortFunction](reason) {
    if (!isReadableStream(this))
      throw new ERR_INVALID_THIS('ReadableStream');
    setPromiseHandled(readableStreamCancel(this, reason, 'errored'));
  }

That will call source.cancel while still passing all your tests. Best of both worlds? 🙂


const startResult = startAlgorithm();

Expand Down Expand Up @@ -3364,6 +3365,8 @@ function setupReadableByteStreamController(
pendingPullIntos: [],
};
stream[kState].controller = controller;
stream[kControllerAbortFunction] =
(error) => readableByteStreamControllerError(controller, error);

const startResult = startAlgorithm();

Expand Down
8 changes: 5 additions & 3 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const {
kIsClosedPromise,
kIsErrored,
kIsWritable,
kControllerErrorFunction,
kControllerAbortFunction,
} = require('internal/streams/utils');

const {
Expand Down Expand Up @@ -173,7 +173,7 @@ class WritableStream {
this[kState] = createWritableStreamState();

this[kIsClosedPromise] = PromiseWithResolvers();
this[kControllerErrorFunction] = () => {};
this[kControllerAbortFunction] = () => {};

const size = extractSizeAlgorithm(strategy?.size);
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
Expand Down Expand Up @@ -1322,7 +1322,9 @@ function setupWritableStreamDefaultController(
writeAlgorithm,
};
stream[kState].controller = controller;
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
stream[kControllerAbortFunction] = (reason) => {
setPromiseHandled(writableStreamAbort(stream, reason));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change. From doc/api/stream.md:

Calling abort on the AbortController corresponding to the passed
AbortSignal will behave the same way as calling .destroy(new AbortError())
on the stream, and controller.error(new AbortError()) for webstreams.

We'll need to at least update the docs to document the new behavior.

};
Comment on lines +1325 to +1327
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly off-topic: should we make this a class method instead? For WritableStream, we could do:

class WritableStream {
  [kControllerAbortFunction](reason) {
    setPromiseHandled(writableStreamAbort(this, reason));
  }
}

For ReadableStream, the method could check the type of this[kState].controller and then call either readableStreamDefaultControllerError or readableByteStreamControllerError.


writableStreamUpdateBackpressure(
stream,
Expand Down
Loading
Loading