7 changes: 7 additions & 0 deletions doc/api/errors.md
@@ -2882,6 +2882,13 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
A stream method was called that cannot complete because the stream was
destroyed using `stream.destroy()`.

<a id="ERR_STREAM_ITER_MISSING_FLAG"></a>

### `ERR_STREAM_ITER_MISSING_FLAG`

A `stream/iter` API was used without enabling the `--experimental-stream-iter`
CLI flag.

<a id="ERR_STREAM_NULL_VALUES"></a>

### `ERR_STREAM_NULL_VALUES`
303 changes: 303 additions & 0 deletions doc/api/stream.md
@@ -979,6 +979,94 @@ added: v12.3.0

Getter for the property `objectMode` of a given `Writable` stream.

##### `writable.toStreamIterWriter([options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `options` {Object}
* `backpressure` {string} Backpressure policy. One of `'strict'` (default),
`'block'`, or `'drop-newest'`. See below for details.
* Returns: {Object} A [`stream/iter` Writer][stream-iter-writer] adapter.

When the `--experimental-stream-iter` flag is enabled, returns an adapter
object that conforms to the [`stream/iter`][] Writer interface, allowing the
`Writable` to be used as a destination in the iterable streams API.

Since all writes on a classic `stream.Writable` are fundamentally
asynchronous, the synchronous methods (`writeSync`, `writevSync`, `endSync`)
always return `false` or `-1`, deferring to the async path. The per-write
`options.signal` parameter from the Writer interface is also ignored; classic
`stream.Writable` has no per-write abort signal support, so cancellation
should be handled at the pipeline level.

**Backpressure policies:**

* `'strict'` (default) — writes are rejected with `ERR_INVALID_STATE` when
the buffer is full (`writableLength >= writableHighWaterMark`). This catches
callers that ignore backpressure.
* `'block'` — writes wait for the `'drain'` event when the buffer is full.
This matches classic `stream.Writable` behavior and is the recommended
policy when using [`pipeTo()`][stream-iter-pipeto].
* `'drop-newest'` — writes are silently discarded when the buffer is full.
The data is not written to the underlying resource, but `writer.end()`
still reports the total bytes (including dropped bytes) for consistency
with the Writer spec.
* `'drop-oldest'` — **not supported**. Classic `stream.Writable` does not
provide an API to evict already-buffered data without risking partial
eviction of atomic `writev()` batches. Passing this value throws
`ERR_INVALID_ARG_VALUE`.

The adapter maps:

* `writer.write(chunk)` — calls `writable.write(chunk)`, subject to the
backpressure policy.
* `writer.writev(chunks)` — corks the writable, writes all chunks, then
uncorks. Subject to the backpressure policy.
* `writer.end()` — calls `writable.end()` and resolves with total bytes
written when the `'finish'` event fires.
* `writer.fail(reason)` — calls `writable.destroy(reason)`.
* `writer.desiredSize` — returns the available buffer space
(`writableHighWaterMark - writableLength`), or `null` if the stream
is destroyed or finished.

```mjs
import { Writable } from 'node:stream';
import { from, pipeTo } from 'node:stream/iter';

const chunks = [];
const writable = new Writable({
write(chunk, encoding, cb) { chunks.push(chunk); cb(); },
});

// Use 'block' policy with pipeTo for classic backpressure behavior
await pipeTo(from('hello world'),
writable.toStreamIterWriter({ backpressure: 'block' }));
```

```cjs
const { Writable } = require('node:stream');
const { from, pipeTo } = require('node:stream/iter');

async function run() {
const chunks = [];
const writable = new Writable({
write(chunk, encoding, cb) { chunks.push(chunk); cb(); },
});

await pipeTo(from('hello world'),
writable.toStreamIterWriter({ backpressure: 'block' }));
}

run().catch(console.error);
```

Without the `--experimental-stream-iter` flag, calling this method throws
[`ERR_STREAM_ITER_MISSING_FLAG`][].

##### `writable[Symbol.asyncDispose]()`

<!-- YAML
@@ -1998,6 +2086,61 @@ option. In the code example above, data will be in a single chunk if the file
has less than 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable[Symbol.for('Stream.toAsyncStreamable')]()`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns: {AsyncIterable} An `AsyncIterable<Uint8Array[]>` that yields
batched chunks from the stream.

When the `--experimental-stream-iter` flag is enabled, `Readable` streams
implement the [`Stream.toAsyncStreamable`][] protocol, enabling efficient
consumption by the [`stream/iter`][] API.

This provides a batched async iterator that drains the stream's internal
buffer into `Uint8Array[]` batches, amortizing the per-chunk Promise overhead
of the standard `Symbol.asyncIterator` path. For byte-mode streams, chunks
are yielded directly as `Buffer` instances (which are `Uint8Array` subclasses).
For object-mode or encoded streams, each chunk is normalized to `Uint8Array`
before batching.

The returned iterator is tagged as a trusted source, so [`from()`][stream-iter-from]
passes it through without additional normalization.

```mjs
import { Readable } from 'node:stream';
import { text, from } from 'node:stream/iter';

const readable = new Readable({
read() { this.push('hello'); this.push(null); },
});

// Readable is automatically consumed via toAsyncStreamable
console.log(await text(from(readable))); // 'hello'
```

```cjs
const { Readable } = require('node:stream');
const { text, from } = require('node:stream/iter');

async function run() {
const readable = new Readable({
read() { this.push('hello'); this.push(null); },
});

console.log(await text(from(readable))); // 'hello'
}

run().catch(console.error);
```

Without the `--experimental-stream-iter` flag, calling this method throws
[`ERR_STREAM_ITER_MISSING_FLAG`][].

##### `readable[Symbol.asyncDispose]()`

<!-- YAML
@@ -3152,6 +3295,101 @@ Readable.from([
]);
```

### `stream.Readable.fromStreamIter(source[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `source` {AsyncIterable} An `AsyncIterable<Uint8Array[]>` source, such as
the return value of [`pull()`][] or [`from()`][stream-iter-from].
* `options` {Object}
* `highWaterMark` {number} The internal buffer size in bytes before
    backpressure is applied. **Default:** `65536` (64 KiB).
* `signal` {AbortSignal} An optional signal that can be used to abort
the readable, destroying the stream and cleaning up the source iterator.
* Returns: {stream.Readable}

Creates a byte-mode {stream.Readable} from an `AsyncIterable<Uint8Array[]>`
(the native batch format used by the [`stream/iter`][] API). Each
`Uint8Array` in a yielded batch is pushed as a separate chunk into the
Readable.

This method requires the `--experimental-stream-iter` CLI flag.

```mjs
import { Readable } from 'node:stream';
import { createWriteStream } from 'node:fs';
import { from, pull } from 'node:stream/iter';
import { compressGzip } from 'node:zlib/iter';

// Bridge a stream/iter pipeline to a classic Readable
const source = pull(from('hello world'), compressGzip());
const readable = Readable.fromStreamIter(source);

readable.pipe(createWriteStream('output.gz'));
```

```cjs
const { Readable } = require('node:stream');
const { createWriteStream } = require('node:fs');
const { from, pull } = require('node:stream/iter');
const { compressGzip } = require('node:zlib/iter');

const source = pull(from('hello world'), compressGzip());
const readable = Readable.fromStreamIter(source);

readable.pipe(createWriteStream('output.gz'));
```

### `stream.Readable.fromStreamIterSync(source[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `source` {Iterable} An `Iterable<Uint8Array[]>` source, such as the
return value of [`pullSync()`][] or [`fromSync()`][].
* `options` {Object}
* `highWaterMark` {number} The internal buffer size in bytes before
    backpressure is applied. **Default:** `65536` (64 KiB).
* Returns: {stream.Readable}

Creates a byte-mode {stream.Readable} from a synchronous
`Iterable<Uint8Array[]>` (the native batch format used by the
[`stream/iter`][] sync API). Each `Uint8Array` in a yielded batch is
pushed as a separate chunk into the Readable.

The `_read()` method pulls from the iterator synchronously, so data is
available immediately via `readable.read()` without waiting for async
callbacks.

This method requires the `--experimental-stream-iter` CLI flag.

```mjs
import { Readable } from 'node:stream';
import { fromSync } from 'node:stream/iter';

const source = fromSync('hello world');
const readable = Readable.fromStreamIterSync(source);

console.log(readable.read().toString()); // 'hello world'
```

```cjs
const { Readable } = require('node:stream');
const { fromSync } = require('node:stream/iter');

const source = fromSync('hello world');
const readable = Readable.fromStreamIterSync(source);

console.log(readable.read().toString()); // 'hello world'
```

### `stream.Readable.fromWeb(readableStream[, options])`

<!-- YAML
@@ -3225,6 +3463,62 @@ changes:
`'bytes'` or undefined.
* Returns: {ReadableStream}

### `stream.Writable.fromStreamIter(writer)`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `writer` {Object} A [`stream/iter`][] Writer. Only the `write()` method is
required; `end()`, `fail()`, `writeSync()`, `writevSync()`, `endSync()`,
and `writev()` are optional.
* Returns: {stream.Writable}

When the `--experimental-stream-iter` flag is enabled, creates a classic
`stream.Writable` backed by a [`stream/iter` Writer][stream-iter-writer].

Each `_write()` / `_writev()` call attempts the Writer's synchronous method
first (`writeSync` / `writevSync`), falling back to the async method if the
sync path returns `false`. Similarly, `_final()` tries `endSync()` before
`end()`. When the sync path succeeds, the callback is deferred via
`queueMicrotask` to preserve the async resolution contract that Writable
internals expect.

* `_write(chunk, encoding, cb)` — tries `writer.writeSync(bytes)`, falls
back to `await writer.write(bytes)`.
* `_writev(entries, cb)` — tries `writer.writevSync(chunks)`, falls
back to `await writer.writev(chunks)`. Only defined if `writer.writev`
exists.
* `_final(cb)` — tries `writer.endSync()`, falls back to
`await writer.end()`.
* `_destroy(err, cb)` — calls `writer.fail(err)`.
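
The sync-first dispatch with a deferred callback can be sketched as follows
(assuming, per the Writer interface described above, that the sync method
returns `false` to defer to the async path; `dispatchWrite` is illustrative
only):

```cjs
// Try the writer's synchronous path first; fall back to the async
// method when writeSync() is absent or returns false. Even when the
// sync path succeeds, defer the callback to a microtask so completion
// is never delivered synchronously to Writable internals.
function dispatchWrite(writer, bytes, callback) {
  if (typeof writer.writeSync === 'function' &&
      writer.writeSync(bytes) !== false) {
    queueMicrotask(callback);
    return;
  }
  writer.write(bytes).then(() => callback(), callback);
}

// Usage: the first writer takes the sync path, the second falls back.
const calls = [];
dispatchWrite({ writeSync(b) { calls.push(`sync:${b}`); return true; } },
              'a', () => calls.push('cb:a'));
dispatchWrite({ writeSync: () => false,
                async write(b) { calls.push(`async:${b}`); } },
              'b', () => calls.push('cb:b'));
```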

```mjs
import { Writable } from 'node:stream';
import { push } from 'node:stream/iter';

const { writer, readable } = push();
const writable = Writable.fromStreamIter(writer);

writable.write('hello');
writable.end();
```

```cjs
const { Writable } = require('node:stream');
const { push } = require('node:stream/iter');

const { writer, readable } = push();
const writable = Writable.fromStreamIter(writer);

writable.write('hello');
writable.end();
```

This method requires the `--experimental-stream-iter` CLI flag.

### `stream.Writable.fromWeb(writableStream[, options])`

<!-- YAML
@@ -4997,17 +5291,22 @@ contain multi-byte characters.
[`'finish'`]: #event-finish
[`'readable'`]: #event-readable
[`Duplex`]: #class-streamduplex
[`ERR_STREAM_ITER_MISSING_FLAG`]: errors.md#err_stream_iter_missing_flag
[`EventEmitter`]: events.md#class-eventemitter
[`Readable`]: #class-streamreadable
[`Stream.toAsyncStreamable`]: stream_iter.md#streamtoasyncstreamable
[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
[`Transform`]: #class-streamtransform
[`Writable`]: #class-streamwritable
[`fromSync()`]: stream_iter.md#fromsyncinput
[`fs.createReadStream()`]: fs.md#fscreatereadstreampath-options
[`fs.createWriteStream()`]: fs.md#fscreatewritestreampath-options
[`net.Socket`]: net.md#class-netsocket
[`process.stderr`]: process.md#processstderr
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`pull()`]: stream_iter.md#pullsource-transforms-options
[`pullSync()`]: stream_iter.md#pullsyncsource-transforms-options
[`readable._read()`]: #readable_readsize
[`readable.compose(stream)`]: #readablecomposestream-options
[`readable.map`]: #readablemapfn-options
@@ -5024,6 +5323,7 @@ contain multi-byte characters.
[`stream.uncork()`]: #writableuncork
[`stream.unpipe()`]: #readableunpipedestination
[`stream.wrap()`]: #readablewrapstream
[`stream/iter`]: stream_iter.md
[`writable._final()`]: #writable_finalcallback
[`writable._write()`]: #writable_writechunk-encoding-callback
[`writable._writev()`]: #writable_writevchunks-callback
@@ -5052,6 +5352,9 @@ contain multi-byte characters.
[stream-end]: #writableendchunk-encoding-callback
[stream-finished]: #streamfinishedstream-options-callback
[stream-finished-promise]: #streamfinishedstream-options
[stream-iter-from]: stream_iter.md#frominput
[stream-iter-pipeto]: stream_iter.md#pipetosource-transforms-writer
[stream-iter-writer]: stream_iter.md#writer-interface
[stream-pause]: #readablepause
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options
2 changes: 2 additions & 0 deletions lib/internal/errors.js
@@ -1770,6 +1770,8 @@ E('ERR_STREAM_ALREADY_FINISHED',
Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_ITER_MISSING_FLAG',
'The stream/iter API requires the --experimental-stream-iter flag', TypeError);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);