Skip to content

Commit 2785bd3

Browse files
committed
stream: validate fromWritable() options before cache
Validate options before returning a cached fromWritable() adapter so invalid later options still throw. Cache adapters by backpressure policy as well as Writable instance, since the policy changes write behavior. Fixes: #63277 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent a37c61d commit 2785bd3

3 files changed

Lines changed: 61 additions & 7 deletions

File tree

doc/api/stream_iter.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1543,8 +1543,9 @@ the synchronous Writer methods (`writeSync`, `writevSync`, `endSync`) always
15431543
return `false` or `-1`, deferring to the async path. The per-write
15441544
`options.signal` parameter from the Writer interface is also ignored.
15451545

1546-
The result is cached per instance -- calling `fromWritable()` twice with the
1547-
same stream returns the same Writer.
1546+
The result is cached per instance and backpressure policy -- calling
1547+
`fromWritable()` twice with the same stream and `backpressure` option returns
1548+
the same Writer.
15481549

15491550
For duck-typed streams that do not expose `writableHighWaterMark`,
15501551
`writableLength`, or similar properties, sensible defaults are used.

lib/internal/streams/iter/classic.js

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const {
2121
PromiseReject,
2222
PromiseResolve,
2323
PromiseWithResolvers,
24+
SafeMap,
2425
SafeWeakMap,
2526
SymbolAsyncDispose,
2627
SymbolAsyncIterator,
@@ -413,10 +414,6 @@ function fromWritable(writable, options = kNullPrototype) {
413414
throw new ERR_INVALID_ARG_TYPE('writable', 'Writable', writable);
414415
}
415416

416-
// Return cached adapter if available.
417-
const cached = fromWritableCache.get(writable);
418-
if (cached !== undefined) return cached;
419-
420417
validateObject(options, 'options');
421418
const {
422419
backpressure = 'strict',
@@ -445,6 +442,17 @@ function fromWritable(writable, options = kNullPrototype) {
445442
'drop-oldest is not supported for classic stream.Writable');
446443
}
447444

445+
// Return cached adapter if available. Backpressure policy changes writer
446+
// behavior, so cache one adapter per policy.
447+
let cachedByBackpressure = fromWritableCache.get(writable);
448+
if (cachedByBackpressure !== undefined) {
449+
const cached = cachedByBackpressure.get(backpressure);
450+
if (cached !== undefined) return cached;
451+
} else {
452+
cachedByBackpressure = new SafeMap();
453+
fromWritableCache.set(writable, cachedByBackpressure);
454+
}
455+
448456
// Fall back to sensible defaults for duck-typed streams that may not
449457
// expose the full stream.Writable property set.
450458
const hwm = writable.writableHighWaterMark ?? 16384;
@@ -696,7 +704,7 @@ function fromWritable(writable, options = kNullPrototype) {
696704
return promise;
697705
};
698706

699-
fromWritableCache.set(writable, writer);
707+
cachedByBackpressure.set(backpressure, writer);
700708
return writer;
701709
}
702710

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Flags: --experimental-stream-iter
2+
'use strict';
3+
4+
const common = require('../common');
5+
const assert = require('assert');
6+
const { Writable } = require('stream');
7+
const { fromWritable } = require('stream/iter');
8+
9+
{
10+
const writable = new Writable({ write() {} });
11+
12+
fromWritable(writable);
13+
14+
assert.throws(
15+
() => fromWritable(writable, { backpressure: 'invalid' }),
16+
{ code: 'ERR_INVALID_ARG_VALUE' },
17+
);
18+
19+
writable.destroy();
20+
}
21+
22+
async function testCachedWritableUsesLaterBackpressureOptions() {
23+
const chunks = [];
24+
const writable = new Writable({
25+
highWaterMark: 1,
26+
write(chunk, encoding, callback) {
27+
chunks.push(Buffer.from(chunk));
28+
},
29+
});
30+
31+
fromWritable(writable);
32+
const writer = fromWritable(writable, { backpressure: 'drop-newest' });
33+
34+
await writer.write('a');
35+
await writer.write('b');
36+
37+
assert.deepStrictEqual(
38+
chunks.map((chunk) => chunk.toString()),
39+
['a'],
40+
);
41+
42+
writable.destroy();
43+
}
44+
45+
testCachedWritableUsesLaterBackpressureOptions().then(common.mustCall());

0 commit comments

Comments
 (0)