From 31f3cd1f7ee3b9980205a5b03db1fb2bb3d0453d Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 23 Mar 2026 11:33:54 -0700 Subject: [PATCH 1/3] stream: make stream.Readable implement the toAsyncStreamable protocol Signed-off-by: James M Snell Assisted-by: Opencode/Opus 4.6 --- doc/api/errors.md | 7 + lib/internal/errors.js | 2 + lib/internal/streams/iter/from.js | 21 +- lib/internal/streams/iter/types.js | 10 + lib/internal/streams/readable.js | 145 ++++ ...t-stream-iter-readable-interop-disabled.js | 44 ++ .../test-stream-iter-readable-interop.js | 640 ++++++++++++++++++ 7 files changed, 867 insertions(+), 2 deletions(-) create mode 100644 test/parallel/test-stream-iter-readable-interop-disabled.js create mode 100644 test/parallel/test-stream-iter-readable-interop.js diff --git a/doc/api/errors.md b/doc/api/errors.md index 65ef2ce7bf5d01..98073d49d62098 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream. A stream method was called that cannot complete because the stream was destroyed using `stream.destroy()`. + + +### `ERR_STREAM_ITER_MISSING_FLAG` + +A stream/iter API was used without the `--experimental-stream-iter` CLI flag +enabled. 
+ ### `ERR_STREAM_NULL_VALUES` diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 7c4728627731fe..206e2a24716022 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED', Error); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error); +E('ERR_STREAM_ITER_MISSING_FLAG', + 'The stream/iter API requires the --experimental-stream-iter flag', TypeError); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); diff --git a/lib/internal/streams/iter/from.js b/lib/internal/streams/iter/from.js index d76f430ab0d51e..f117808fa7d3b7 100644 --- a/lib/internal/streams/iter/from.js +++ b/lib/internal/streams/iter/from.js @@ -37,6 +37,7 @@ const { } = require('internal/util/types'); const { + kTrustedSource, toStreamable, toAsyncStreamable, } = require('internal/streams/iter/types'); @@ -483,6 +484,11 @@ function from(input) { throw new ERR_INVALID_ARG_TYPE('input', 'a non-null value', input); } + // Fast path: trusted source already yields valid Uint8Array[] batches + if (input[kTrustedSource]) { + return input; + } + // Check for primitives first (ByteInput) if (isPrimitiveChunk(input)) { const chunk = primitiveToUint8Array(input); @@ -531,11 +537,22 @@ function from(input) { // Check toAsyncStreamable protocol (takes precedence over toStreamable and // iteration protocols) if (typeof input[toAsyncStreamable] === 'function') { + const result = input[toAsyncStreamable](); + // Synchronous trusted source (e.g. 
Readable batched iterator) + if (result?.[kTrustedSource]) { + return result; + } return { __proto__: null, async *[SymbolAsyncIterator]() { - const result = await input[toAsyncStreamable](); - yield* from(result)[SymbolAsyncIterator](); + // The result may be a Promise. Check trusted on both the Promise + // itself (if tagged) and the resolved value. + const resolved = await result; + if (resolved?.[kTrustedSource]) { + yield* resolved[SymbolAsyncIterator](); + return; + } + yield* from(resolved)[SymbolAsyncIterator](); }, }; } diff --git a/lib/internal/streams/iter/types.js b/lib/internal/streams/iter/types.js index c205db00e3782a..9e528647ca9110 100644 --- a/lib/internal/streams/iter/types.js +++ b/lib/internal/streams/iter/types.js @@ -55,9 +55,19 @@ const drainableProtocol = SymbolFor('Stream.drainableProtocol'); */ const kTrustedTransform = Symbol('kTrustedTransform'); +/** + * Internal sentinel for trusted sources. An async iterable with + * [kTrustedSource] = true signals that it already yields valid + * Uint8Array[] batches - no normalizeAsyncSource wrapper needed. + * from() will return such sources directly, skipping all normalization. + * This is NOT a public protocol symbol - it uses Symbol() not Symbol.for(). 
+ */ +const kTrustedSource = Symbol('kTrustedSource'); + module.exports = { broadcastProtocol, drainableProtocol, + kTrustedSource, kTrustedTransform, shareProtocol, shareSyncProtocol, diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 919c527a2be6f8..d3680743328419 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -35,6 +35,7 @@ const { Symbol, SymbolAsyncDispose, SymbolAsyncIterator, + SymbolFor, SymbolSpecies, TypedArrayPrototypeSet, } = primordials; @@ -52,6 +53,8 @@ const { } = require('internal/streams/add-abort-signal'); const { eos } = require('internal/streams/end-of-stream'); +const { getOptionValue } = require('internal/options'); + let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { debug = fn; }); @@ -82,6 +85,7 @@ const { ERR_INVALID_ARG_TYPE, ERR_METHOD_NOT_IMPLEMENTED, ERR_OUT_OF_RANGE, + ERR_STREAM_ITER_MISSING_FLAG, ERR_STREAM_PUSH_AFTER_EOF, ERR_STREAM_UNSHIFT_AFTER_END_EVENT, ERR_UNKNOWN_ENCODING, @@ -1796,3 +1800,144 @@ Readable.wrap = function(src, options) { }, }).wrap(src); }; + +// Efficient interop with the stream/iter API via toAsyncStreamable protocol. +// Provides a batched async iterator that drains the internal buffer into +// Uint8Array[] batches, avoiding the per-chunk Promise overhead of the +// standard Symbol.asyncIterator path. +// +// The flag cannot be checked at module load time (readable.js loads during +// bootstrap before options are available). Instead, toAsyncStreamable is +// always defined but lazily initializes on first call - throwing if the +// flag is not set, or installing the real implementation if it is. +{ + const toAsyncStreamable = SymbolFor('Stream.toAsyncStreamable'); + let kTrustedSource; + let normalizeAsyncValue; + let isU8; + + // Maximum chunks to drain into a single batch. Bounds peak memory when + // _read() synchronously pushes many chunks into the buffer. 
+ const MAX_DRAIN_BATCH = 128; + + function lazyInit() { + if (kTrustedSource !== undefined) return; + if (!getOptionValue('--experimental-stream-iter')) { + throw new ERR_STREAM_ITER_MISSING_FLAG(); + } + ({ kTrustedSource } = require('internal/streams/iter/types')); + ({ normalizeAsyncValue } = require('internal/streams/iter/from')); + ({ isUint8Array: isU8 } = require('internal/util/types')); + } + + // Normalize a batch of raw chunks from an object-mode or encoded + // Readable into Uint8Array values. Returns the normalized batch, + // or null if normalization produced no output. + async function normalizeBatch(raw) { + const batch = []; + for (let i = 0; i < raw.length; i++) { + const value = raw[i]; + if (isU8(value)) { + batch.push(value); + } else { + // normalizeAsyncValue may await for async protocols (e.g. + // toAsyncStreamable on yielded objects). Stream events during + // the suspension are queued, not lost - errors will surface + // on the next loop iteration after this yield completes. + for await (const normalized of normalizeAsyncValue(value)) { + batch.push(normalized); + } + } + } + return batch.length > 0 ? batch : null; + } + + // Batched async iterator for Readable streams. Same mechanism as + // createAsyncIterator (same event setup, same stream.read() to + // trigger _read(), same teardown) but drains all currently buffered + // chunks into a single Uint8Array[] batch per yield, amortizing the + // Promise/microtask cost across multiple chunks. + // + // When normalize is provided (object-mode / encoded streams), each + // drained batch is passed through it to convert chunks to Uint8Array. + // When normalize is null (byte-mode), chunks are already Buffers + // (Uint8Array subclass) and are yielded directly. 
+ async function* createBatchedAsyncIterator(stream, normalize) { + let callback = nop; + + function next(resolve) { + if (this === stream) { + callback(); + callback = nop; + } else { + callback = resolve; + } + } + + stream.on('readable', next); + + let error; + const cleanup = eos(stream, { writable: false }, (err) => { + error = err ? aggregateTwoErrors(error, err) : null; + callback(); + callback = nop; + }); + + try { + while (true) { + const chunk = stream.destroyed ? null : stream.read(); + if (chunk !== null) { + // Drain any additional already-buffered chunks into the same + // batch. The first read() may trigger _read() which + // synchronously pushes more data into the buffer. We drain + // that buffered data without issuing unbounded _read() calls - + // once state.length hits 0 or MAX_DRAIN_BATCH is reached, we + // stop and yield what we have. + const batch = [chunk]; + while (batch.length < MAX_DRAIN_BATCH && + stream._readableState.length > 0) { + const c = stream.read(); + if (c === null) break; + batch.push(c); + } + if (normalize !== null) { + const result = await normalize(batch); + if (result !== null) { + yield result; + } + } else { + yield batch; + } + } else if (error) { + throw error; + } else if (error === null) { + return; + } else { + await new Promise(next); + } + } + } catch (err) { + error = aggregateTwoErrors(error, err); + throw error; + } finally { + if (error === undefined || stream._readableState.autoDestroy) { + destroyImpl.destroyer(stream, null); + } else { + stream.off('readable', next); + cleanup(); + } + } + } + + Readable.prototype[toAsyncStreamable] = function() { + lazyInit(); + const state = this._readableState; + const normalize = (state.objectMode || state.encoding) ? 
+ normalizeBatch : + null; + const iter = createBatchedAsyncIterator(this, normalize); + iter[kTrustedSource] = true; + iter.stream = this; + return iter; + }; +} diff --git a/test/parallel/test-stream-iter-readable-interop-disabled.js b/test/parallel/test-stream-iter-readable-interop-disabled.js new file mode 100644 index 00000000000000..794bf2842cfbfa --- /dev/null +++ b/test/parallel/test-stream-iter-readable-interop-disabled.js @@ -0,0 +1,44 @@ +'use strict'; + +// Tests that toAsyncStreamable throws ERR_STREAM_ITER_MISSING_FLAG +// when --experimental-stream-iter is not enabled. + +const common = require('../common'); +const assert = require('assert'); +const { spawnPromisified } = common; + +async function testToAsyncStreamableWithoutFlag() { + const { stderr, code } = await spawnPromisified(process.execPath, [ + '-e', + ` + const { Readable } = require('stream'); + const r = new Readable({ read() {} }); + r[Symbol.for('Stream.toAsyncStreamable')](); + `, + ]); + assert.notStrictEqual(code, 0); + assert.match(stderr, /ERR_STREAM_ITER_MISSING_FLAG/); +} + +async function testToAsyncStreamableWithFlag() { + const { code } = await spawnPromisified(process.execPath, [ + '--experimental-stream-iter', + '-e', + ` + const { Readable } = require('stream'); + const r = new Readable({ + read() { this.push(Buffer.from('ok')); this.push(null); } + }); + const sym = Symbol.for('Stream.toAsyncStreamable'); + const iter = r[sym](); + // Should not throw, and should have stream property + if (!iter.stream) process.exit(1); + `, + ]); + assert.strictEqual(code, 0); +} + +Promise.all([ + testToAsyncStreamableWithoutFlag(), + testToAsyncStreamableWithFlag(), +]).then(common.mustCall()); diff --git a/test/parallel/test-stream-iter-readable-interop.js b/test/parallel/test-stream-iter-readable-interop.js new file mode 100644 index 00000000000000..f201dab4f5642a --- /dev/null +++ b/test/parallel/test-stream-iter-readable-interop.js @@ -0,0 +1,640 @@ +// Flags: 
--experimental-stream-iter +'use strict'; + +// Tests for classic Readable stream interop with the stream/iter API +// via the toAsyncStreamable protocol and kTrustedSource optimization. + +const common = require('../common'); +const assert = require('assert'); +const { Readable } = require('stream'); +const { + from, + pull, + bytes, + text, +} = require('stream/iter'); + +const toAsyncStreamable = Symbol.for('Stream.toAsyncStreamable'); + +// ============================================================================= +// toAsyncStreamable protocol is present on Readable.prototype +// ============================================================================= + +function testProtocolExists() { + assert.strictEqual(typeof Readable.prototype[toAsyncStreamable], 'function'); + + const readable = new Readable({ read() {} }); + assert.strictEqual(typeof readable[toAsyncStreamable], 'function'); +} + +// ============================================================================= +// Byte-mode Readable: basic round-trip through from() +// ============================================================================= + +async function testByteModeThroughFrom() { + const readable = new Readable({ + read() { + this.push(Buffer.from('hello')); + this.push(Buffer.from(' world')); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello world'); +} + +// ============================================================================= +// Byte-mode Readable: basic round-trip through pull() +// ============================================================================= + +async function testByteModeThroughPull() { + const readable = new Readable({ + read() { + this.push(Buffer.from('pull ')); + this.push(Buffer.from('test')); + this.push(null); + }, + }); + + const result = await text(pull(readable)); + assert.strictEqual(result, 'pull test'); +} + +// 
============================================================================= +// Byte-mode Readable: bytes consumer +// ============================================================================= + +async function testByteModeBytes() { + const data = Buffer.from('binary data here'); + const readable = new Readable({ + read() { + this.push(data); + this.push(null); + }, + }); + + const result = await bytes(from(readable)); + assert.deepStrictEqual(result, new Uint8Array(data)); +} + +// ============================================================================= +// Byte-mode Readable: batching - multiple buffered chunks yield as one batch +// ============================================================================= + +async function testBatchingBehavior() { + const readable = new Readable({ + read() { + // Push multiple chunks synchronously so they all buffer + for (let i = 0; i < 10; i++) { + this.push(Buffer.from(`chunk${i}`)); + } + this.push(null); + }, + }); + + const source = from(readable); + const batches = []; + for await (const batch of source) { + batches.push(batch); + } + + // All chunks were buffered synchronously, so they should come out + // as fewer batches than individual chunks (ideally one batch). 
+ assert.ok(batches.length < 10, + `Expected fewer batches than chunks, got ${batches.length}`); + + // Total data should be correct + const allChunks = batches.flat(); + const combined = Buffer.concat(allChunks); + let expected = ''; + for (let i = 0; i < 10; i++) { + expected += `chunk${i}`; + } + assert.strictEqual(combined.toString(), expected); +} + +// ============================================================================= +// Byte-mode Readable: kTrustedSource is set +// ============================================================================= + +function testTrustedSourceByteMode() { + const readable = new Readable({ read() {} }); + const result = readable[toAsyncStreamable](); + // kTrustedSource is a private symbol, but we can verify the result + // is used directly by from() without wrapping by checking it has + // Symbol.asyncIterator + assert.strictEqual(typeof result[Symbol.asyncIterator], 'function'); + assert.strictEqual(result.stream, readable); +} + +// ============================================================================= +// Byte-mode Readable: multi-read with delayed pushes +// ============================================================================= + +async function testDelayedPushes() { + let pushCount = 0; + const readable = new Readable({ + read() { + if (pushCount < 3) { + setTimeout(() => { + this.push(Buffer.from(`delayed${pushCount}`)); + pushCount++; + if (pushCount === 3) { + this.push(null); + } + }, 10); + } + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'delayed0delayed1delayed2'); +} + +// ============================================================================= +// Byte-mode Readable: empty stream +// ============================================================================= + +async function testEmptyStream() { + const readable = new Readable({ + read() { + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 
''); +} + +// ============================================================================= +// Byte-mode Readable: error propagation +// ============================================================================= + +async function testErrorPropagation() { + const readable = new Readable({ + read() { + process.nextTick(() => this.destroy(new Error('test error'))); + }, + }); + + await assert.rejects( + text(from(readable)), + (err) => err.message === 'test error', + ); +} + +// ============================================================================= +// Byte-mode Readable: with transforms +// ============================================================================= + +async function testWithTransform() { + const readable = new Readable({ + read() { + this.push(Buffer.from('hello')); + this.push(null); + }, + }); + + // Uppercase transform + function uppercase(chunks) { + if (chunks === null) return null; + return chunks.map((c) => { + const buf = Buffer.from(c); + for (let i = 0; i < buf.length; i++) { + if (buf[i] >= 97 && buf[i] <= 122) buf[i] -= 32; + } + return buf; + }); + } + + const result = await text(pull(readable, uppercase)); + assert.strictEqual(result, 'HELLO'); +} + +// ============================================================================= +// Object-mode Readable: strings are normalized to Uint8Array +// ============================================================================= + +async function testObjectModeStrings() { + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + this.push(' object'); + this.push(' mode'); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello object mode'); +} + +// ============================================================================= +// Object-mode Readable: Uint8Array chunks pass through +// ============================================================================= + +async function 
testObjectModeUint8Array() { + const readable = new Readable({ + objectMode: true, + read() { + this.push(new Uint8Array([72, 73])); // "HI" + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'HI'); +} + +// ============================================================================= +// Object-mode Readable: mixed types (strings + Uint8Array) +// ============================================================================= + +async function testObjectModeMixed() { + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + this.push(Buffer.from(' ')); + this.push('world'); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'hello world'); +} + +// ============================================================================= +// Object-mode Readable: toStreamable protocol objects +// ============================================================================= + +async function testObjectModeToStreamable() { + const toStreamableSym = Symbol.for('Stream.toStreamable'); + const readable = new Readable({ + objectMode: true, + read() { + this.push({ + [toStreamableSym]() { + return 'from-protocol'; + }, + }); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'from-protocol'); +} + +// ============================================================================= +// Object-mode Readable: kTrustedSource is set +// ============================================================================= + +function testTrustedSourceObjectMode() { + const readable = new Readable({ objectMode: true, read() {} }); + const result = readable[toAsyncStreamable](); + assert.strictEqual(typeof result[Symbol.asyncIterator], 'function'); + assert.strictEqual(result.stream, readable); +} + +// ============================================================================= +// Encoded Readable: strings 
are re-encoded to Uint8Array +// ============================================================================= + +async function testEncodedReadable() { + const readable = new Readable({ + encoding: 'utf8', + read() { + this.push(Buffer.from('encoded')); + this.push(null); + }, + }); + + const result = await text(from(readable)); + assert.strictEqual(result, 'encoded'); +} + +// ============================================================================= +// Readable.from() source: verify interop with Readable.from() +// ============================================================================= + +async function testReadableFrom() { + const readable = Readable.from(['chunk1', 'chunk2', 'chunk3']); + + const result = await text(from(readable)); + assert.strictEqual(result, 'chunk1chunk2chunk3'); +} + +// ============================================================================= +// Byte-mode Readable: large data +// ============================================================================= + +async function testLargeData() { + const totalSize = 1024 * 1024; // 1 MB + const chunkSize = 16384; + let pushed = 0; + + const readable = new Readable({ + read() { + if (pushed < totalSize) { + const size = Math.min(chunkSize, totalSize - pushed); + const buf = Buffer.alloc(size, 0x41); // Fill with 'A' + this.push(buf); + pushed += size; + } else { + this.push(null); + } + }, + }); + + const result = await bytes(from(readable)); + assert.strictEqual(result.length, totalSize); + assert.strictEqual(result[0], 0x41); + assert.strictEqual(result[totalSize - 1], 0x41); +} + +// ============================================================================= +// Byte-mode Readable: consumer return (early termination) +// ============================================================================= + +async function testEarlyTermination() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`chunk${pushCount++}`)); + // Never pushes 
null - infinite stream + }, + }); + + // Take only the first batch + const source = from(readable); + const batches = []; + for await (const batch of source) { + batches.push(batch); + break; // Stop after first batch + } + + assert.ok(batches.length >= 1); + // Stream should be destroyed after consumer return + // Give it a tick to clean up + await new Promise((resolve) => setTimeout(resolve, 50)); + assert.ok(readable.destroyed); +} + +// ============================================================================= +// Byte-mode Readable: pull() with compression transform +// ============================================================================= + +async function testWithCompression() { + const { + compressGzip, + decompressGzip, + } = require('zlib/iter'); + + const readable = new Readable({ + read() { + this.push(Buffer.from('compress me via classic Readable')); + this.push(null); + }, + }); + + const compressed = pull(readable, compressGzip()); + const result = await text(pull(compressed, decompressGzip())); + assert.strictEqual(result, 'compress me via classic Readable'); +} + +// ============================================================================= +// Object-mode Readable: error propagation +// ============================================================================= + +async function testObjectModeError() { + const readable = new Readable({ + objectMode: true, + read() { + process.nextTick(() => this.destroy(new Error('object error'))); + }, + }); + + await assert.rejects( + text(from(readable)), + (err) => err.message === 'object error', + ); +} + +// ============================================================================= +// Stream destroyed mid-iteration +// ============================================================================= + +async function testDestroyMidIteration() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`chunk${pushCount++}`)); + // Never pushes null - infinite 
stream + }, + }); + + const chunks = []; + await assert.rejects(async () => { + for await (const batch of from(readable)) { + chunks.push(...batch); + if (chunks.length >= 3) { + readable.destroy(); + } + } + }, { code: 'ERR_STREAM_PREMATURE_CLOSE' }); + assert.ok(readable.destroyed); + assert.ok(chunks.length >= 3); +} + +// ============================================================================= +// Error after partial data +// ============================================================================= + +async function testErrorAfterPartialData() { + let count = 0; + const readable = new Readable({ + read() { + if (count < 3) { + this.push(Buffer.from(`ok${count++}`)); + } else { + this.destroy(new Error('late error')); + } + }, + }); + + const chunks = []; + await assert.rejects(async () => { + for await (const batch of from(readable)) { + chunks.push(...batch); + } + }, { message: 'late error' }); + assert.ok(chunks.length > 0, 'Should have received partial data'); +} + +// ============================================================================= +// Multiple consumers (second iteration yields empty) +// ============================================================================= + +async function testMultipleConsumers() { + const readable = new Readable({ + read() { + this.push(Buffer.from('data')); + this.push(null); + }, + }); + + const first = await text(from(readable)); + assert.strictEqual(first, 'data'); + + // Second consumption - stream is already consumed/destroyed + const second = await text(from(readable)); + assert.strictEqual(second, ''); +} + +// ============================================================================= +// highWaterMark: 0 - each chunk becomes its own batch +// ============================================================================= + +async function testHighWaterMarkZero() { + let pushCount = 0; + const readable = new Readable({ + highWaterMark: 0, + read() { + if (pushCount < 3) { + 
this.push(Buffer.from(`hwm0-${pushCount++}`)); + } else { + this.push(null); + } + }, + }); + + const batches = []; + for await (const batch of from(readable)) { + batches.push(batch); + } + + // With HWM=0, buffer is always empty so drain loop never fires. + // Each chunk should be its own batch. + assert.strictEqual(batches.length, 3); + const combined = Buffer.concat(batches.flat()); + assert.strictEqual(combined.toString(), 'hwm0-0hwm0-1hwm0-2'); +} + +// ============================================================================= +// Duplex stream (Duplex extends Readable, toAsyncStreamable should work) +// ============================================================================= + +async function testDuplexStream() { + const { Duplex } = require('stream'); + + const duplex = new Duplex({ + read() { + this.push(Buffer.from('duplex-data')); + this.push(null); + }, + write(chunk, enc, cb) { cb(); }, + }); + + const result = await text(from(duplex)); + assert.strictEqual(result, 'duplex-data'); +} + +// ============================================================================= +// setEncoding called dynamically after construction +// ============================================================================= + +async function testSetEncodingDynamic() { + const readable = new Readable({ + read() { + this.push(Buffer.from('dynamic-enc')); + this.push(null); + }, + }); + + readable.setEncoding('utf8'); + + const result = await text(from(readable)); + assert.strictEqual(result, 'dynamic-enc'); +} + +// ============================================================================= +// AbortSignal cancellation +// ============================================================================= + +async function testAbortSignal() { + let pushCount = 0; + const readable = new Readable({ + read() { + this.push(Buffer.from(`sig${pushCount++}`)); + // Never pushes null - infinite stream + }, + }); + + const ac = new AbortController(); + const chunks = []; + + await 
assert.rejects(async () => { + for await (const batch of pull(readable, { signal: ac.signal })) { + chunks.push(...batch); + if (chunks.length >= 2) { + ac.abort(); + } + } + }, { name: 'AbortError' }); + assert.ok(chunks.length >= 2); +} + +// ============================================================================= +// kTrustedSource identity - from() returns same object for trusted sources +// ============================================================================= + +function testTrustedSourceIdentity() { + const readable = new Readable({ read() {} }); + const iter = readable[toAsyncStreamable](); + + // from() should return the trusted iterator directly (same reference), + // not wrap it in another generator + const result = from(iter); + assert.strictEqual(result, iter); +} + +// ============================================================================= +// Run all tests +// ============================================================================= + +testProtocolExists(); +testTrustedSourceByteMode(); +testTrustedSourceObjectMode(); +testTrustedSourceIdentity(); + +Promise.all([ + testByteModeThroughFrom(), + testByteModeThroughPull(), + testByteModeBytes(), + testBatchingBehavior(), + testDelayedPushes(), + testEmptyStream(), + testErrorPropagation(), + testWithTransform(), + testObjectModeStrings(), + testObjectModeUint8Array(), + testObjectModeMixed(), + testObjectModeToStreamable(), + testEncodedReadable(), + testReadableFrom(), + testLargeData(), + testEarlyTermination(), + testWithCompression(), + testObjectModeError(), + testDestroyMidIteration(), + testErrorAfterPartialData(), + testMultipleConsumers(), + testHighWaterMarkZero(), + testDuplexStream(), + testSetEncodingDynamic(), + testAbortSignal(), +]).then(common.mustCall()); From 702109b1a5b13fd240ed9a9e55b296b6589f9215 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 23 Mar 2026 13:51:35 -0700 Subject: [PATCH 2/3] stream: add fromStreamIter/fromStreamIterSync to 
stream.Readable Signed-off-by: James M Snell Assisted-by: Opencode/Opus 4.6 --- doc/api/stream.md | 157 +++++ lib/internal/streams/readable.js | 137 ++++ ...t-stream-iter-readable-interop-disabled.js | 28 + test/parallel/test-stream-iter-to-readable.js | 618 ++++++++++++++++++ 4 files changed, 940 insertions(+) create mode 100644 test/parallel/test-stream-iter-to-readable.js diff --git a/doc/api/stream.md b/doc/api/stream.md index 65afeaad6306e0..6f046137f4ab75 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1998,6 +1998,61 @@ option. In the code example above, data will be in a single chunk if the file has less then 64 KiB of data because no `highWaterMark` option is provided to [`fs.createReadStream()`][]. +##### `readable[Symbol.for('Stream.toAsyncStreamable')]()` + + + +> Stability: 1 - Experimental + +* Returns: {AsyncIterable} An `AsyncIterable` that yields + batched chunks from the stream. + +When the `--experimental-stream-iter` flag is enabled, `Readable` streams +implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient +consumption by the [`stream/iter`][] API. + +This provides a batched async iterator that drains the stream's internal +buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead +of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks +are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses). +For object-mode or encoded streams, each chunk is normalized to `Uint8Array` +before batching. + +The returned iterator is tagged as a trusted source, so [`from()`][stream-iter-from] +passes it through without additional normalization. 
+ +```mjs +import { Readable } from 'node:stream'; +import { text, from } from 'node:stream/iter'; + +const readable = new Readable({ + read() { this.push('hello'); this.push(null); }, +}); + +// Readable is automatically consumed via toAsyncStreamable +console.log(await text(from(readable))); // 'hello' +``` + +```cjs +const { Readable } = require('node:stream'); +const { text, from } = require('node:stream/iter'); + +async function run() { + const readable = new Readable({ + read() { this.push('hello'); this.push(null); }, + }); + + console.log(await text(from(readable))); // 'hello' +} + +run().catch(console.error); +``` + +Without the `--experimental-stream-iter` flag, calling this method throws +[`ERR_STREAM_ITER_MISSING_FLAG`][]. + ##### `readable[Symbol.asyncDispose]()` + +> Stability: 1 - Experimental + +* `source` {AsyncIterable} An `AsyncIterable<Uint8Array[]>` source, such as + the return value of [`pull()`][] or [`from()`][stream-iter-from]. +* `options` {Object} + * `highWaterMark` {number} The internal buffer size in bytes before + backpressure is applied. **Default:** `65536` (64 KB). + * `signal` {AbortSignal} An optional signal that can be used to abort + the readable, destroying the stream and cleaning up the source iterator. +* Returns: {stream.Readable} + +Creates a byte-mode {stream.Readable} from an `AsyncIterable<Uint8Array[]>` +(the native batch format used by the [`stream/iter`][] API). Each +`Uint8Array` in a yielded batch is pushed as a separate chunk into the +Readable. + +This method requires the `--experimental-stream-iter` CLI flag.
+ +```mjs +import { Readable } from 'node:stream'; +import { createWriteStream } from 'node:fs'; +import { from, pull } from 'node:stream/iter'; +import { compressGzip } from 'node:zlib/iter'; + +// Bridge a stream/iter pipeline to a classic Readable +const source = pull(from('hello world'), compressGzip()); +const readable = Readable.fromStreamIter(source); + +readable.pipe(createWriteStream('output.gz')); +``` + +```cjs +const { Readable } = require('node:stream'); +const { createWriteStream } = require('node:fs'); +const { from, pull } = require('node:stream/iter'); +const { compressGzip } = require('node:zlib/iter'); + +const source = pull(from('hello world'), compressGzip()); +const readable = Readable.fromStreamIter(source); + +readable.pipe(createWriteStream('output.gz')); +``` + +### `stream.Readable.fromStreamIterSync(source[, options])` + + + +> Stability: 1 - Experimental + +* `source` {Iterable} An `Iterable<Uint8Array[]>` source, such as the + return value of [`pullSync()`][] or [`fromSync()`][]. +* `options` {Object} + * `highWaterMark` {number} The internal buffer size in bytes before + backpressure is applied. **Default:** `65536` (64 KB). +* Returns: {stream.Readable} + +Creates a byte-mode {stream.Readable} from a synchronous +`Iterable<Uint8Array[]>` (the native batch format used by the +[`stream/iter`][] sync API). Each `Uint8Array` in a yielded batch is +pushed as a separate chunk into the Readable. + +The `_read()` method pulls from the iterator synchronously, so data is +available immediately via `readable.read()` without waiting for async +callbacks. + +This method requires the `--experimental-stream-iter` CLI flag.
+ +```mjs +import { Readable } from 'node:stream'; +import { fromSync } from 'node:stream/iter'; + +const source = fromSync('hello world'); +const readable = Readable.fromStreamIterSync(source); + +console.log(readable.read().toString()); // 'hello world' +``` + +```cjs +const { Readable } = require('node:stream'); +const { fromSync } = require('node:stream/iter'); + +const source = fromSync('hello world'); +const readable = Readable.fromStreamIterSync(source); + +console.log(readable.read().toString()); // 'hello world' +``` + ### `stream.Readable.fromWeb(readableStream[, options])` + +> Stability: 1 - Experimental + +* `options` {Object} + * `backpressure` {string} Backpressure policy. One of `'strict'` (default), + `'block'`, or `'drop-newest'`. See below for details. +* Returns: {Object} A [`stream/iter` Writer][stream-iter-writer] adapter. + +When the `--experimental-stream-iter` flag is enabled, returns an adapter +object that conforms to the [`stream/iter`][] Writer interface, allowing the +`Writable` to be used as a destination in the iterable streams API. + +Since all writes on a classic `stream.Writable` are fundamentally +asynchronous, the synchronous methods (`writeSync`, `writevSync`, `endSync`) +always return `false` or `-1`, deferring to the async path. The per-write +`options.signal` parameter from the Writer interface is also ignored; classic +`stream.Writable` has no per-write abort signal support, so cancellation +should be handled at the pipeline level. + +**Backpressure policies:** + +* `'strict'` (default) — writes are rejected with `ERR_INVALID_STATE` when + the buffer is full (`writableLength >= writableHighWaterMark`). This catches + callers that ignore backpressure. +* `'block'` — writes wait for the `'drain'` event when the buffer is full. + This matches classic `stream.Writable` behavior and is the recommended + policy when using [`pipeTo()`][stream-iter-pipeto]. +* `'drop-newest'` — writes are silently discarded when the buffer is full. 
+ The data is not written to the underlying resource, but `writer.end()` + still reports the total bytes (including dropped bytes) for consistency + with the Writer spec. +* `'drop-oldest'` — **not supported**. Classic `stream.Writable` does not + provide an API to evict already-buffered data without risking partial + eviction of atomic `writev()` batches. Passing this value throws + `ERR_INVALID_ARG_VALUE`. + +The adapter maps: + +* `writer.write(chunk)` — calls `writable.write(chunk)`, subject to the + backpressure policy. +* `writer.writev(chunks)` — corks the writable, writes all chunks, then + uncorks. Subject to the backpressure policy. +* `writer.end()` — calls `writable.end()` and resolves with total bytes + written when the `'finish'` event fires. +* `writer.fail(reason)` — calls `writable.destroy(reason)`. +* `writer.desiredSize` — returns the available buffer space + (`writableHighWaterMark - writableLength`), or `null` if the stream + is destroyed or finished. + +```mjs +import { Writable } from 'node:stream'; +import { from, pipeTo } from 'node:stream/iter'; + +const chunks = []; +const writable = new Writable({ + write(chunk, encoding, cb) { chunks.push(chunk); cb(); }, +}); + +// Use 'block' policy with pipeTo for classic backpressure behavior +await pipeTo(from('hello world'), + writable.toStreamIterWriter({ backpressure: 'block' })); +``` + +```cjs +const { Writable } = require('node:stream'); +const { from, pipeTo } = require('node:stream/iter'); + +async function run() { + const chunks = []; + const writable = new Writable({ + write(chunk, encoding, cb) { chunks.push(chunk); cb(); }, + }); + + await pipeTo(from('hello world'), + writable.toStreamIterWriter({ backpressure: 'block' })); +} + +run().catch(console.error); +``` + +Without the `--experimental-stream-iter` flag, calling this method throws +[`ERR_STREAM_ITER_MISSING_FLAG`][]. 
+ ##### `writable[Symbol.asyncDispose]()` + +> Stability: 1 - Experimental + +* `writer` {Object} A [`stream/iter`][] Writer. Only the `write()` method is + required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`, + and `writev()` are optional. +* Returns: {stream.Writable} + +When the `--experimental-stream-iter` flag is enabled, creates a classic +`stream.Writable` backed by a [`stream/iter` Writer][stream-iter-writer]. + +Each `_write()` / `_writev()` call attempts the Writer's synchronous method +first (`writeSync` / `writevSync`), falling back to the async method if the +sync path returns `false`. Similarly, `_final()` tries `endSync()` before +`end()`. When the sync path succeeds, the callback is deferred via +`queueMicrotask` to preserve the async resolution contract that Writable +internals expect. + +* `_write(chunk, encoding, cb)` — tries `writer.writeSync(bytes)`, falls + back to `await writer.write(bytes)`. +* `_writev(entries, cb)` — tries `writer.writevSync(chunks)`, falls + back to `await writer.writev(chunks)`. Only defined if `writer.writev` + exists. +* `_final(cb)` — tries `writer.endSync()`, falls back to + `await writer.end()`. +* `_destroy(err, cb)` — calls `writer.fail(err)`. + +```mjs +import { Writable } from 'node:stream'; +import { push, from, pipeTo } from 'node:stream/iter'; + +const { writer, readable } = push(); +const writable = Writable.fromStreamIter(writer); + +writable.write('hello'); +writable.end(); +``` + +```cjs +const { Writable } = require('node:stream'); +const { push, from, pipeTo } = require('node:stream/iter'); + +const { writer, readable } = push(); +const writable = Writable.fromStreamIter(writer); + +writable.write('hello'); +writable.end(); +``` + +This method requires the `--experimental-stream-iter` CLI flag. + ### `stream.Writable.fromWeb(writableStream[, options])`