From 5e0474e3570cc2f7859952ae4255e363c7221c3c Mon Sep 17 00:00:00 2001
From: Matteo Collina
Date: Fri, 13 Feb 2026 10:16:24 +0100
Subject: [PATCH] stream: add fast paths for webstreams read and pipeTo

Add internal fast paths to improve webstreams performance without
changing the public API or breaking spec compliance.

1. ReadableStreamDefaultReader.read() fast path:

   When data is already buffered in the controller's queue, return
   PromiseResolve() directly without creating a DefaultReadRequest
   object. This is spec-compliant because read() returns a Promise,
   and resolved promises still run callbacks in the microtask queue.

2. pipeTo() batch read fast path:

   When data is buffered, batch reads directly from the controller
   queue up to highWaterMark without creating
   PipeToReadableStreamReadRequest objects per chunk. Respects
   backpressure by checking desiredSize after each write.

Benchmark results:

- pipeTo: ~11% faster (***)
- buffered read(): ~17-20% faster (***)

Co-Authored-By: Malte Ubl
---
 .../webstreams/readable-read-buffered.js  | 54 ++++++++++++
 lib/internal/webstreams/readablestream.js | 83 +++++++++++++++++++
 2 files changed, 137 insertions(+)
 create mode 100644 benchmark/webstreams/readable-read-buffered.js

diff --git a/benchmark/webstreams/readable-read-buffered.js b/benchmark/webstreams/readable-read-buffered.js
new file mode 100644
index 00000000000000..653d5577ef1e95
--- /dev/null
+++ b/benchmark/webstreams/readable-read-buffered.js
@@ -0,0 +1,54 @@
+'use strict';
+const common = require('../common.js');
+const { ReadableStream } = require('node:stream/web');
+
+// Benchmark for reading from a pre-buffered ReadableStream.
+// This measures the fast path optimization where data is already
+// queued in the controller, avoiding DefaultReadRequest allocation.
+
+const bench = common.createBenchmark(main, {
+  n: [1e5],
+  bufferSize: [1, 10, 100, 1000],
+});
+
+async function main({ n, bufferSize }) {
+  let enqueued = 0;
+
+  const rs = new ReadableStream({
+    start(controller) {
+      // Pre-fill the buffer
+      for (let i = 0; i < bufferSize; i++) {
+        controller.enqueue('a');
+        enqueued++;
+      }
+    },
+    pull(controller) {
+      // Refill buffer when pulled
+      const toEnqueue = Math.min(bufferSize, n - enqueued);
+      for (let i = 0; i < toEnqueue; i++) {
+        controller.enqueue('a');
+        enqueued++;
+      }
+      if (enqueued >= n) {
+        controller.close();
+      }
+    },
+  }, {
+    // Use buffer size as high water mark to allow pre-buffering
+    highWaterMark: bufferSize,
+  });
+
+  const reader = rs.getReader();
+  let x = null;
+  let reads = 0;
+
+  bench.start();
+  while (reads < n) {
+    const { value, done } = await reader.read();
+    if (done) break;
+    x = value;
+    reads++;
+  }
+  bench.end(reads);
+  console.assert(x);
+}
diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js
index f9b9e6b4fb2c3e..fbafc9b2b945de 100644
--- a/lib/internal/webstreams/readablestream.js
+++ b/lib/internal/webstreams/readablestream.js
@@ -860,6 +860,31 @@ class ReadableStreamDefaultReader {
         new ERR_INVALID_STATE.TypeError(
           'The reader is not attached to a stream'));
     }
+
+    const stream = this[kState].stream;
+    const controller = stream[kState].controller;
+
+    // Fast path: if data is already buffered in a default controller,
+    // return a resolved promise immediately without creating a read request.
+    // This is spec-compliant because read() returns a Promise, and
+    // Promise.resolve() callbacks still run in the microtask queue.
+    if (stream[kState].state === 'readable' &&
+        isReadableStreamDefaultController(controller) &&
+        controller[kState].queue.length > 0) {
+      stream[kState].disturbed = true;
+      const chunk = dequeueValue(controller);
+
+      if (controller[kState].closeRequested && !controller[kState].queue.length) {
+        readableStreamDefaultControllerClearAlgorithms(controller);
+        readableStreamClose(stream);
+      } else {
+        readableStreamDefaultControllerCallPullIfNeeded(controller);
+      }
+
+      return PromiseResolve({ value: chunk, done: false });
+    }
+
+    // Slow path: create request and go through normal flow
     const readRequest = new DefaultReadRequest();
     readableStreamDefaultReaderRead(this, readRequest);
     return readRequest.promise;
@@ -1286,6 +1311,8 @@ const isReadableStream =
   isBrandCheck('ReadableStream');
 const isReadableByteStreamController =
   isBrandCheck('ReadableByteStreamController');
+const isReadableStreamDefaultController =
+  isBrandCheck('ReadableStreamDefaultController');
 const isReadableStreamBYOBRequest =
   isBrandCheck('ReadableStreamBYOBRequest');
 const isReadableStreamDefaultReader =
@@ -1510,6 +1537,62 @@ function readableStreamPipeTo(
 
     await writer[kState].ready.promise;
 
+    const controller = source[kState].controller;
+
+    // Fast path: batch reads when data is buffered in a default controller.
+    // This avoids creating PipeToReadableStreamReadRequest objects and
+    // reduces promise allocation overhead.
+    if (source[kState].state === 'readable' &&
+        isReadableStreamDefaultController(controller) &&
+        controller[kState].queue.length > 0) {
+      let batchCount = 0;
+      const hwm = controller[kState].highWaterMark || 1;
+
+      while (controller[kState].queue.length > 0 && batchCount < hwm) {
+        if (shuttingDown) return true;
+
+        source[kState].disturbed = true;
+        const chunk = dequeueValue(controller);
+
+        if (controller[kState].closeRequested && !controller[kState].queue.length) {
+          readableStreamDefaultControllerClearAlgorithms(controller);
+          readableStreamClose(source);
+        }
+
+        // Write the chunk - we're already in a separate microtask from enqueue
+        // because we awaited writer[kState].ready.promise above
+        state.currentWrite = writableStreamDefaultWriterWrite(writer, chunk);
+        setPromiseHandled(state.currentWrite);
+
+        batchCount++;
+
+        // Check backpressure after each write
+        if (writer[kState].ready.promise !== undefined &&
+            dest[kState].state === 'writable') {
+          const desiredSize = writer.desiredSize;
+          if (desiredSize !== null && desiredSize <= 0) {
+            // Backpressure - stop batch and wait for ready
+            break;
+          }
+        }
+      }
+
+      // Trigger pull if needed after batch
+      if (source[kState].state === 'readable' &&
+          !controller[kState].closeRequested) {
+        readableStreamDefaultControllerCallPullIfNeeded(controller);
+      }
+
+      // Check if stream closed during batch
+      if (source[kState].state === 'closed') {
+        return true;
+      }
+
+      // Yield to microtask queue between batches to allow events/signals to fire
+      return false;
+    }
+
+    // Slow path: use read request for async reads
     const promise = PromiseWithResolvers();
     // eslint-disable-next-line no-use-before-define
     readableStreamDefaultReaderRead(reader, new PipeToReadableStreamReadRequest(writer, state, promise));
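
Usage sketch (illustrative only, not part of the patch): the read() fast
path applies when chunks are already queued in the controller at the moment
read() is called, as in this pre-buffered stream. The chunk values and the
highWaterMark of 4 are made-up example values.

    'use strict';
    const { ReadableStream } = require('node:stream/web');

    // The queue is filled in start(), so every read() below finds a
    // buffered chunk; per the patch it can then resolve without an
    // internal read-request allocation, still via the microtask queue.
    const rs = new ReadableStream({
      start(controller) {
        for (let i = 0; i < 4; i++)
          controller.enqueue(`chunk-${i}`);
        controller.close();
      },
    }, { highWaterMark: 4 });

    (async () => {
      const reader = rs.getReader();
      let result = await reader.read();
      while (!result.done) {
        console.log(result.value);     // chunk-0 ... chunk-3
        result = await reader.read();  // still resolves asynchronously
      }
    })();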
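
A similar illustrative sketch (also not part of the patch) for the pipeTo()
batch path: the source has several chunks buffered, and the destination's
small highWaterMark makes desiredSize drop to zero so the batch loop has to
stop and wait for backpressure. The chunk count, delay, and highWaterMark
values here are hypothetical.

    'use strict';
    const { ReadableStream, WritableStream } = require('node:stream/web');
    const { setTimeout: sleep } = require('node:timers/promises');

    // Source with chunks pre-queued up to its highWaterMark.
    const source = new ReadableStream({
      start(controller) {
        for (let i = 0; i < 16; i++)
          controller.enqueue(`chunk-${i}`);
        controller.close();
      },
    }, { highWaterMark: 16 });

    // Slow destination with a small highWaterMark, so desiredSize hits 0
    // after a couple of writes and the batch must yield until ready.
    const dest = new WritableStream({
      async write(chunk) {
        await sleep(1);  // simulate a slow sink
      },
    }, { highWaterMark: 2 });

    source.pipeTo(dest).then(() => console.log('pipe finished'));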