diff --git a/doc/api/errors.md b/doc/api/errors.md
index 65ef2ce7bf5d01..98073d49d62098 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -2882,6 +2882,13 @@
 An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
 
 A stream method was called that cannot complete because the stream was
 destroyed using `stream.destroy()`.
+
+
+### `ERR_STREAM_ITER_MISSING_FLAG`
+
+A stream/iter API was used without the `--experimental-stream-iter` CLI flag
+enabled.
+
 ### `ERR_STREAM_NULL_VALUES`

diff --git a/doc/api/stream.md b/doc/api/stream.md
index 65afeaad6306e0..7e9d31e82675d8 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -979,6 +979,94 @@
 added: v12.3.0
 
 Getter for the property `objectMode` of a given `Writable` stream.
 
+##### `writable.toStreamIterWriter([options])`
+
+
+
+> Stability: 1 - Experimental
+
+* `options` {Object}
+  * `backpressure` {string} Backpressure policy. One of `'strict'` (default),
+    `'block'`, or `'drop-newest'`. See below for details.
+* Returns: {Object} A [`stream/iter` Writer][stream-iter-writer] adapter.
+
+When the `--experimental-stream-iter` flag is enabled, returns an adapter
+object that conforms to the [`stream/iter`][] Writer interface, allowing the
+`Writable` to be used as a destination in the iterable streams API.
+
+Since all writes on a classic `stream.Writable` are fundamentally
+asynchronous, the synchronous methods (`writeSync`, `writevSync`, `endSync`)
+always return `false` or `-1`, deferring to the async path. The per-write
+`options.signal` parameter from the Writer interface is also ignored; classic
+`stream.Writable` has no per-write abort signal support, so cancellation
+should be handled at the pipeline level.
+
+**Backpressure policies:**
+
+* `'strict'` (default) — writes are rejected with `ERR_INVALID_STATE` when
+  the buffer is full (`writableLength >= writableHighWaterMark`). This catches
+  callers that ignore backpressure.
+* `'block'` — writes wait for the `'drain'` event when the buffer is full.
+  This matches classic `stream.Writable` behavior and is the recommended
+  policy when using [`pipeTo()`][stream-iter-pipeto].
+* `'drop-newest'` — writes are silently discarded when the buffer is full.
+  The data is not written to the underlying resource, but `writer.end()`
+  still reports the total bytes (including dropped bytes) for consistency
+  with the Writer spec.
+* `'drop-oldest'` — **not supported**. Classic `stream.Writable` does not
+  provide an API to evict already-buffered data without risking partial
+  eviction of atomic `writev()` batches. Passing this value throws
+  `ERR_INVALID_ARG_VALUE`.
+
+The adapter maps:
+
+* `writer.write(chunk)` — calls `writable.write(chunk)`, subject to the
+  backpressure policy.
+* `writer.writev(chunks)` — corks the writable, writes all chunks, then
+  uncorks. Subject to the backpressure policy.
+* `writer.end()` — calls `writable.end()` and resolves with total bytes
+  written when the `'finish'` event fires.
+* `writer.fail(reason)` — calls `writable.destroy(reason)`.
+* `writer.desiredSize` — returns the available buffer space
+  (`writableHighWaterMark - writableLength`), or `null` if the stream
+  is destroyed or finished.
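The `'block'` policy corresponds to the classic drain-based wait that `stream.Writable` consumers already use. A minimal sketch of that behavior using only stable `stream` APIs (the `blockingWrite()` helper below is illustrative, not part of the proposed adapter):

```mjs
import { Writable } from 'node:stream';
import { once } from 'node:events';

// Sketch of the 'block' policy: wait for 'drain' whenever write()
// reports a full buffer. blockingWrite() is an illustrative helper.
async function blockingWrite(writable, chunk) {
  if (!writable.write(chunk)) {
    await once(writable, 'drain'); // classic backpressure wait
  }
}

const received = [];
const writable = new Writable({
  highWaterMark: 4, // tiny buffer so writes trigger backpressure
  write(chunk, encoding, cb) {
    received.push(chunk.toString());
    cb();
  },
});

for (const part of ['hello', ' ', 'world']) {
  await blockingWrite(writable, part);
}
writable.end();
await once(writable, 'finish');
console.log(received.join('')); // 'hello world'
```

With `'strict'`, the same full-buffer condition (`writableLength >= writableHighWaterMark`) would instead cause the write to be rejected rather than awaited.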
+
+```mjs
+import { Writable } from 'node:stream';
+import { from, pipeTo } from 'node:stream/iter';
+
+const chunks = [];
+const writable = new Writable({
+  write(chunk, encoding, cb) { chunks.push(chunk); cb(); },
+});
+
+// Use 'block' policy with pipeTo for classic backpressure behavior
+await pipeTo(from('hello world'),
+  writable.toStreamIterWriter({ backpressure: 'block' }));
+```
+
+```cjs
+const { Writable } = require('node:stream');
+const { from, pipeTo } = require('node:stream/iter');
+
+async function run() {
+  const chunks = [];
+  const writable = new Writable({
+    write(chunk, encoding, cb) { chunks.push(chunk); cb(); },
+  });
+
+  await pipeTo(from('hello world'),
+    writable.toStreamIterWriter({ backpressure: 'block' }));
+}
+
+run().catch(console.error);
+```
+
+Without the `--experimental-stream-iter` flag, calling this method throws
+[`ERR_STREAM_ITER_MISSING_FLAG`][].
+
 ##### `writable[Symbol.asyncDispose]()`
+
+> Stability: 1 - Experimental
+
+* Returns: {AsyncIterable} An `AsyncIterable` that yields
+  batched chunks from the stream.
+
+When the `--experimental-stream-iter` flag is enabled, `Readable` streams
+implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient
+consumption by the [`stream/iter`][] API.
+
+This provides a batched async iterator that drains the stream's internal
+buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead
+of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks
+are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses).
+For object-mode or encoded streams, each chunk is normalized to `Uint8Array`
+before batching.
+
+The returned iterator is tagged as a trusted source, so [`from()`][stream-iter-from]
+passes it through without additional normalization.
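The batch-draining idea can be sketched with stable APIs: pay one `Promise` per batch rather than one per chunk, by emptying everything the internal buffer already holds on each async step. The `batches()` generator below is an illustrative stand-in for the protocol, not its actual implementation:

```mjs
import { Readable } from 'node:stream';
import { once } from 'node:events';

// Illustrative batching: drain every synchronously available chunk
// into one array per async step instead of awaiting per chunk.
async function* batches(readable) {
  readable.pause();
  while (!readable.readableEnded) {
    // Wake on buffered data or on end-of-stream.
    await Promise.race([once(readable, 'readable'), once(readable, 'end')]);
    const batch = [];
    let chunk;
    while ((chunk = readable.read()) !== null) batch.push(chunk);
    if (batch.length > 0) yield batch; // Buffer[] (Uint8Array subclasses)
  }
}

const readable = new Readable({
  read() { this.push('hello'); this.push(null); },
});

const collected = [];
for await (const batch of batches(readable)) collected.push(...batch);
console.log(Buffer.concat(collected).toString()); // 'hello'
```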
+
+```mjs
+import { Readable } from 'node:stream';
+import { text, from } from 'node:stream/iter';
+
+const readable = new Readable({
+  read() { this.push('hello'); this.push(null); },
+});
+
+// Readable is automatically consumed via toAsyncStreamable
+console.log(await text(from(readable))); // 'hello'
+```
+
+```cjs
+const { Readable } = require('node:stream');
+const { text, from } = require('node:stream/iter');
+
+async function run() {
+  const readable = new Readable({
+    read() { this.push('hello'); this.push(null); },
+  });
+
+  console.log(await text(from(readable))); // 'hello'
+}
+
+run().catch(console.error);
+```
+
+Without the `--experimental-stream-iter` flag, calling this method throws
+[`ERR_STREAM_ITER_MISSING_FLAG`][].
+
 ##### `readable[Symbol.asyncDispose]()`
+
+### `stream.Readable.fromStreamIter(source[, options])`
+
+> Stability: 1 - Experimental
+
+* `source` {AsyncIterable} An `AsyncIterable` source, such as
+  the return value of [`pull()`][] or [`from()`][stream-iter-from].
+* `options` {Object}
+  * `highWaterMark` {number} The internal buffer size in bytes before
+    backpressure is applied. **Default:** `65536` (64 KiB).
+  * `signal` {AbortSignal} An optional signal that can be used to abort
+    the readable, destroying the stream and cleaning up the source iterator.
+* Returns: {stream.Readable}
+
+Creates a byte-mode {stream.Readable} from an `AsyncIterable`
+(the native batch format used by the [`stream/iter`][] API). Each
+`Uint8Array` in a yielded batch is pushed as a separate chunk into the
+Readable.
+
+This method requires the `--experimental-stream-iter` CLI flag.
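The bridge described above amounts to pulling one batch per `_read()` call and pushing each `Uint8Array` separately. A rough sketch using only stable APIs (the `fromBatches()` helper name is hypothetical, not a Node.js API):

```mjs
import { Readable } from 'node:stream';

// Hypothetical sketch: each Uint8Array in a yielded batch becomes a
// separate chunk pushed into a byte-mode Readable.
function fromBatches(source, { highWaterMark = 65536 } = {}) {
  const iterator = source[Symbol.asyncIterator]();
  return new Readable({
    highWaterMark,
    async read() {
      try {
        const { value: batch, done } = await iterator.next();
        if (done) {
          this.push(null);
          return;
        }
        for (const bytes of batch) this.push(bytes); // one chunk each
      } catch (err) {
        this.destroy(err);
      }
    },
  });
}

// A source yielding one batch of two Uint8Array chunks.
async function* source() {
  yield [new TextEncoder().encode('hello '), new TextEncoder().encode('world')];
}

const readable = fromBatches(source());
const chunks = [];
for await (const chunk of readable) chunks.push(chunk);
console.log(Buffer.concat(chunks).toString()); // 'hello world'
```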
+
+```mjs
+import { Readable } from 'node:stream';
+import { createWriteStream } from 'node:fs';
+import { from, pull } from 'node:stream/iter';
+import { compressGzip } from 'node:zlib/iter';
+
+// Bridge a stream/iter pipeline to a classic Readable
+const source = pull(from('hello world'), compressGzip());
+const readable = Readable.fromStreamIter(source);
+
+readable.pipe(createWriteStream('output.gz'));
+```
+
+```cjs
+const { Readable } = require('node:stream');
+const { createWriteStream } = require('node:fs');
+const { from, pull } = require('node:stream/iter');
+const { compressGzip } = require('node:zlib/iter');
+
+const source = pull(from('hello world'), compressGzip());
+const readable = Readable.fromStreamIter(source);
+
+readable.pipe(createWriteStream('output.gz'));
+```
+
+### `stream.Readable.fromStreamIterSync(source[, options])`
+
+
+
+> Stability: 1 - Experimental
+
+* `source` {Iterable} An `Iterable` source, such as the
+  return value of [`pullSync()`][] or [`fromSync()`][].
+* `options` {Object}
+  * `highWaterMark` {number} The internal buffer size in bytes before
+    backpressure is applied. **Default:** `65536` (64 KiB).
+* Returns: {stream.Readable}
+
+Creates a byte-mode {stream.Readable} from a synchronous
+`Iterable` (the native batch format used by the
+[`stream/iter`][] sync API). Each `Uint8Array` in a yielded batch is
+pushed as a separate chunk into the Readable.
+
+The `_read()` method pulls from the iterator synchronously, so data is
+available immediately via `readable.read()` without waiting for async
+callbacks.
+
+This method requires the `--experimental-stream-iter` CLI flag.
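The synchronous variant can be sketched the same way: `_read()` advances the iterator and pushes in the same tick, which is why `readable.read()` returns data immediately. The `fromSyncBatches()` helper below is illustrative, not a Node.js API:

```mjs
import { Readable } from 'node:stream';

// Hypothetical sketch: _read() advances a sync iterator and pushes each
// Uint8Array in the batch immediately, so no async hop is needed.
function fromSyncBatches(source, { highWaterMark = 65536 } = {}) {
  const iterator = source[Symbol.iterator]();
  return new Readable({
    highWaterMark,
    read() {
      const { value: batch, done } = iterator.next();
      if (done) {
        this.push(null);
        return;
      }
      for (const bytes of batch) this.push(bytes); // one chunk each
    },
  });
}

// A source yielding a single one-chunk batch.
function* source() {
  yield [new TextEncoder().encode('hello world')];
}

const readable = fromSyncBatches(source());
// _read() runs synchronously inside read(), so data is available now.
const data = readable.read();
console.log(data.toString()); // 'hello world'
```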
+
+```mjs
+import { Readable } from 'node:stream';
+import { fromSync } from 'node:stream/iter';
+
+const source = fromSync('hello world');
+const readable = Readable.fromStreamIterSync(source);
+
+console.log(readable.read().toString()); // 'hello world'
+```
+
+```cjs
+const { Readable } = require('node:stream');
+const { fromSync } = require('node:stream/iter');
+
+const source = fromSync('hello world');
+const readable = Readable.fromStreamIterSync(source);
+
+console.log(readable.read().toString()); // 'hello world'
+```
+
 ### `stream.Readable.fromWeb(readableStream[, options])`
+
+### `stream.Writable.fromStreamIter(writer)`
+
+> Stability: 1 - Experimental
+
+* `writer` {Object} A [`stream/iter`][] Writer. Only the `write()` method is
+  required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`,
+  and `writev()` are optional.
+* Returns: {stream.Writable}
+
+When the `--experimental-stream-iter` flag is enabled, creates a classic
+`stream.Writable` backed by a [`stream/iter` Writer][stream-iter-writer].
+
+Each `_write()` / `_writev()` call attempts the Writer's synchronous method
+first (`writeSync` / `writevSync`), falling back to the async method if the
+sync path returns `false`. Similarly, `_final()` tries `endSync()` before
+`end()`. When the sync path succeeds, the callback is deferred via
+`queueMicrotask` to preserve the async resolution contract that Writable
+internals expect.
+
+* `_write(chunk, encoding, cb)` — tries `writer.writeSync(bytes)`, falls
+  back to `await writer.write(bytes)`.
+* `_writev(entries, cb)` — tries `writer.writevSync(chunks)`, falls
+  back to `await writer.writev(chunks)`. Only defined if `writer.writev`
+  exists.
+* `_final(cb)` — tries `writer.endSync()`, falls back to
+  `await writer.end()`.
+* `_destroy(err, cb)` — calls `writer.fail(err)`.
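The sync-first dispatch with a deferred callback can be sketched as follows; `writableFromWriter()` and the mock writer are illustrative stand-ins under assumed Writer semantics, not the actual implementation:

```mjs
import { Writable } from 'node:stream';
import { finished } from 'node:stream/promises';

// Illustrative sketch of sync-first dispatch: try the Writer's sync
// method, fall back to the async one, and defer the callback via
// queueMicrotask when the sync path succeeds.
function writableFromWriter(writer) {
  return new Writable({
    write(chunk, encoding, cb) {
      if (writer.writeSync?.(chunk)) {
        // Sync path succeeded: defer so completion is still observed
        // asynchronously, as Writable internals expect.
        queueMicrotask(cb);
      } else {
        writer.write(chunk).then(() => cb(), cb);
      }
    },
    final(cb) {
      // Assumed convention: endSync() returns -1 when it cannot
      // complete synchronously.
      const total = writer.endSync?.() ?? -1;
      if (total !== -1) queueMicrotask(cb);
      else Promise.resolve(writer.end?.()).then(() => cb(), cb);
    },
    destroy(err, cb) {
      if (err) writer.fail?.(err);
      cb(err);
    },
  });
}

// A mock writer whose sync path always succeeds.
const received = [];
const writer = {
  bytes: 0,
  writeSync(chunk) {
    received.push(chunk.toString());
    this.bytes += chunk.length;
    return true;
  },
  endSync() { return this.bytes; },
  fail() {},
};

const writable = writableFromWriter(writer);
writable.write('hello');
writable.end();
await finished(writable);
console.log(received); // [ 'hello' ]
```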
+
+```mjs
+import { Writable } from 'node:stream';
+import { push } from 'node:stream/iter';
+
+const { writer, readable } = push();
+const writable = Writable.fromStreamIter(writer);
+
+// Chunks written to `writable` are forwarded to `writer` and become
+// available for consumption on `readable`.
+writable.write('hello');
+writable.end();
+```
+
+```cjs
+const { Writable } = require('node:stream');
+const { push } = require('node:stream/iter');
+
+const { writer, readable } = push();
+const writable = Writable.fromStreamIter(writer);
+
+writable.write('hello');
+writable.end();
+```
+
+This method requires the `--experimental-stream-iter` CLI flag.
+
 ### `stream.Writable.fromWeb(writableStream[, options])`