Skip to content

Commit ac82063

Browse files
committed
stream: use internal abort hooks for web streams
1 parent 53bcd11 commit ac82063

File tree

4 files changed

+15
-10
lines changed

4 files changed

+15
-10
lines changed

lib/internal/streams/add-abort-signal.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ const {
1414
const {
1515
isNodeStream,
1616
isWebStream,
17-
kControllerErrorFunction,
17+
kControllerAbortFunction,
1818
} = require('internal/streams/utils');
1919

2020
const { eos } = require('internal/streams/end-of-stream');
@@ -47,7 +47,7 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) {
4747
stream.destroy(new AbortError(undefined, { cause: signal.reason }));
4848
} :
4949
() => {
50-
stream[kControllerErrorFunction](new AbortError(undefined, { cause: signal.reason }));
50+
stream[kControllerAbortFunction](new AbortError(undefined, { cause: signal.reason }));
5151
};
5252
if (signal.aborted) {
5353
onAbort();

lib/internal/streams/utils.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ const kIsDisturbed = SymbolFor('nodejs.stream.disturbed');
2020
const kOnConstructed = Symbol('kOnConstructed');
2121

2222
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
23-
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');
23+
const kControllerAbortFunction = SymbolFor('nodejs.webstream.controllerAbortFunction');
2424

2525
const kState = Symbol('kState');
2626
const kObjectMode = 1 << 0;
@@ -326,7 +326,7 @@ module.exports = {
326326
isReadable,
327327
kIsReadable,
328328
kIsClosedPromise,
329-
kControllerErrorFunction,
329+
kControllerAbortFunction,
330330
kIsWritable,
331331
isClosed,
332332
isDuplexNodeStream,

lib/internal/webstreams/readablestream.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ const {
8686
kIsErrored,
8787
kIsReadable,
8888
kIsClosedPromise,
89-
kControllerErrorFunction,
89+
kControllerAbortFunction,
9090
} = require('internal/streams/utils');
9191

9292
const {
@@ -254,7 +254,7 @@ class ReadableStream {
254254
this[kState] = createReadableStreamState();
255255

256256
this[kIsClosedPromise] = PromiseWithResolvers();
257-
this[kControllerErrorFunction] = () => {};
257+
this[kControllerAbortFunction] = () => {};
258258

259259
// The spec requires handling of the strategy first
260260
// here. Specifically, if getting the size and
@@ -2540,7 +2540,8 @@ function setupReadableStreamDefaultController(
25402540
stream,
25412541
};
25422542
stream[kState].controller = controller;
2543-
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
2543+
stream[kControllerAbortFunction] =
2544+
(error) => readableStreamDefaultControllerError(controller, error);
25442545

25452546
const startResult = startAlgorithm();
25462547

@@ -3364,6 +3365,8 @@ function setupReadableByteStreamController(
33643365
pendingPullIntos: [],
33653366
};
33663367
stream[kState].controller = controller;
3368+
stream[kControllerAbortFunction] =
3369+
(error) => readableByteStreamControllerError(controller, error);
33673370

33683371
const startResult = startAlgorithm();
33693372

lib/internal/webstreams/writablestream.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ const {
7878
kIsClosedPromise,
7979
kIsErrored,
8080
kIsWritable,
81-
kControllerErrorFunction,
81+
kControllerAbortFunction,
8282
} = require('internal/streams/utils');
8383

8484
const {
@@ -173,7 +173,7 @@ class WritableStream {
173173
this[kState] = createWritableStreamState();
174174

175175
this[kIsClosedPromise] = PromiseWithResolvers();
176-
this[kControllerErrorFunction] = () => {};
176+
this[kControllerAbortFunction] = () => {};
177177

178178
const size = extractSizeAlgorithm(strategy?.size);
179179
const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
@@ -1322,7 +1322,9 @@ function setupWritableStreamDefaultController(
13221322
writeAlgorithm,
13231323
};
13241324
stream[kState].controller = controller;
1325-
stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
1325+
stream[kControllerAbortFunction] = (reason) => {
1326+
setPromiseHandled(writableStreamAbort(stream, reason));
1327+
};
13261328

13271329
writableStreamUpdateBackpressure(
13281330
stream,

0 commit comments

Comments
 (0)