From fc3ad92c82324ec4a57029048db199eda2232154 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 11 May 2026 22:31:56 -0700 Subject: [PATCH 1/4] stream: cache minimum cursor count in share Track the number of consumers at the cached minimum cursor in share() so the minimum is only recomputed when the last consumer at that cursor advances or detaches. This avoids scanning every consumer on each trim attempt when multiple consumers advance through a shared buffer. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- benchmark/streams/iter-throughput-share.js | 32 +++++ lib/internal/streams/iter/broadcast.js | 3 +- lib/internal/streams/iter/share.js | 142 +++++++++++++++++---- lib/internal/streams/iter/utils.js | 20 +-- 4 files changed, 160 insertions(+), 37 deletions(-) create mode 100644 benchmark/streams/iter-throughput-share.js diff --git a/benchmark/streams/iter-throughput-share.js b/benchmark/streams/iter-throughput-share.js new file mode 100644 index 00000000000000..98ac439bf4098f --- /dev/null +++ b/benchmark/streams/iter-throughput-share.js @@ -0,0 +1,32 @@ +'use strict'; + +const common = require('../common.js'); + +const bench = common.createBenchmark(main, { + consumers: [8, 32, 128], + batches: [1e4], + backpressure: ['block'], + n: [5], +}, { + flags: ['--experimental-stream-iter'], +}); + +async function main({ consumers, batches, backpressure, n }) { + const { share, array } = require('stream/iter'); + const chunk = Buffer.alloc(1024); + const totalOps = batches * consumers * n; + + async function* source() { + for (let i = 0; i < batches; i++) { + yield [chunk]; + } + } + + bench.start(); + for (let i = 0; i < n; i++) { + const shared = share(source(), { highWaterMark: 64, backpressure }); + const readers = Array.from({ length: consumers }, () => array(shared.pull())); + await Promise.all(readers); + } + bench.end(totalOps); +} diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index 769f93b5404c30..b3926413f31a9e 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -343,8 +343,9 @@ class BroadcastImpl { // Private methods #recomputeMinCursor() { - this.#cachedMinCursor = getMinCursor( + const [minCursor] = getMinCursor( this.#consumers, this.#bufferStart + this.#buffer.length); + this.#cachedMinCursor = minCursor; this.#minCursorDirty = false; } diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index 752c0bfcbcab8f..af2cc44fd16f39 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -77,6 +77,8 @@ class ShareImpl { #cancelled = false; #pulling = false; #pullWaiters = []; + #cachedMinCursor = 0; + #cachedMinCursorConsumers = 0; constructor(source, options) { this.#source = source; @@ -114,6 +116,14 @@ class ShareImpl { }; this.#consumers.add(state); + if (this.#consumers.size === 1) { + this.#cachedMinCursor = state.cursor; + this.#cachedMinCursorConsumers = 1; + } else if (state.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers++; + } else { + this.#recomputeMinCursor(); + } const self = this; return { @@ -139,7 +149,7 @@ class ShareImpl { if (self.#cancelled) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } @@ -147,14 +157,18 @@ class ShareImpl { const bufferIndex = state.cursor - self.#bufferStart; if (bufferIndex < self.#buffer.length) { const chunk = self.#buffer.get(bufferIndex); + const cursor = state.cursor; state.cursor++; - self.#tryTrimBuffer(); + if (cursor === self.#cachedMinCursor && + --self.#cachedMinCursorConsumers === 0) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: false, value: chunk }; } if (self.#sourceExhausted) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); if (self.#sourceError) throw self.#sourceError; return { __proto__: null, done: true, value: undefined }; } @@ -163,7 +177,7 @@ class ShareImpl { const canPull = await self.#waitForBufferSpace(); if (!canPull) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); if (self.#sourceError) throw self.#sourceError; return { __proto__: null, done: true, value: undefined }; } @@ -176,8 +190,9 @@ class ShareImpl { state.detached = true; state.resolve = null; state.reject = null; - self.#consumers.delete(state); - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: true, value: undefined }; }, @@ -185,8 +200,9 @@ class ShareImpl { state.detached = true; state.resolve = null; state.reject = null; - self.#consumers.delete(state); - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: true, value: undefined }; }, }; @@ -254,9 +270,11 @@ class ShareImpl { this.#bufferStart++; for (const consumer of this.#consumers) { if (consumer.cursor < this.#bufferStart) { + this.#deleteConsumerFromMin(consumer); consumer.cursor = this.#bufferStart; } } + this.#recomputeMinCursor(); return true; case 'drop-newest': return true; @@ -324,18 +342,41 @@ class ShareImpl { } #tryTrimBuffer() { - const minCursor = getMinCursor( - this.#consumers, this.#bufferStart + this.#buffer.length); - const trimCount = minCursor - this.#bufferStart; + if (this.#cachedMinCursorConsumers === 0) { + this.#recomputeMinCursor(); + } + const trimCount = this.#cachedMinCursor - this.#bufferStart; if (trimCount > 0) { this.#buffer.trimFront(trimCount); - this.#bufferStart = minCursor; + this.#bufferStart = this.#cachedMinCursor; for (let i = 0; i < this.#pullWaiters.length; i++) { this.#pullWaiters[i](); } this.#pullWaiters = []; } } + + #recomputeMinCursor() { + const [minCursor, minCursorConsumers] = getMinCursor( + this.#consumers, this.#bufferStart + this.#buffer.length); + this.#cachedMinCursor = minCursor; + this.#cachedMinCursorConsumers = minCursorConsumers; + } + + #deleteConsumerFromMin(consumer) { + if (consumer.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers--; + } + } + + #deleteConsumer(consumer) { + if (this.#consumers.delete(consumer)) { + const wasAtMin = consumer.cursor === this.#cachedMinCursor; + this.#deleteConsumerFromMin(consumer); + return wasAtMin && this.#cachedMinCursorConsumers === 0; + } + return false; + } } // ============================================================================= @@ -352,6 +393,8 @@ class SyncShareImpl { #sourceExhausted = false; #sourceError = null; #cancelled = false; + #cachedMinCursor = 0; + #cachedMinCursorConsumers = 0; constructor(source, options) { this.#source = source; @@ -383,6 +426,14 @@ class SyncShareImpl { }; this.#consumers.add(state); + if (this.#consumers.size === 1) { + this.#cachedMinCursor = state.cursor; + this.#cachedMinCursorConsumers = 1; + } else if (state.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers++; + } else { + this.#recomputeMinCursor(); + } const self = this; return { @@ -396,26 +447,30 @@ class SyncShareImpl { } if (self.#sourceError) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); throw self.#sourceError; } if (self.#cancelled) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } const bufferIndex = state.cursor - self.#bufferStart; if (bufferIndex < self.#buffer.length) { const chunk = self.#buffer.get(bufferIndex); + const cursor = state.cursor; state.cursor++; - self.#tryTrimBuffer(); + if (cursor === self.#cachedMinCursor && + --self.#cachedMinCursorConsumers === 0) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: false, value: chunk }; } if (self.#sourceExhausted) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } @@ -436,13 +491,15 @@ class SyncShareImpl { self.#bufferStart++; for (const consumer of self.#consumers) { if (consumer.cursor < self.#bufferStart) { + self.#deleteConsumerFromMin(consumer); consumer.cursor = self.#bufferStart; } } + self.#recomputeMinCursor(); break; case 'drop-newest': state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } } @@ -451,21 +508,25 @@ class SyncShareImpl { if (self.#sourceError) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); throw self.#sourceError; } const newBufferIndex = state.cursor - self.#bufferStart; if (newBufferIndex < self.#buffer.length) { const chunk = self.#buffer.get(newBufferIndex); + const cursor = state.cursor; state.cursor++; - self.#tryTrimBuffer(); + if (cursor === self.#cachedMinCursor && + --self.#cachedMinCursorConsumers === 0) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: false, value: chunk }; } if (self.#sourceExhausted) { state.detached = true; - self.#consumers.delete(state); + self.#deleteConsumer(state); return { __proto__: null, done: true, value: undefined }; } @@ -474,15 +535,17 @@ class SyncShareImpl { return() { state.detached = true; - self.#consumers.delete(state); - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: true, value: undefined }; }, throw() { state.detached = true; - self.#consumers.delete(state); - self.#tryTrimBuffer(); + if (self.#deleteConsumer(state)) { + self.#tryTrimBuffer(); + } return { __proto__: null, done: true, value: undefined }; }, }; @@ -532,13 +595,36 @@ class SyncShareImpl { } #tryTrimBuffer() { - const minCursor = getMinCursor( - this.#consumers, this.#bufferStart + this.#buffer.length); - const trimCount = minCursor - this.#bufferStart; + if (this.#cachedMinCursorConsumers === 0) { + this.#recomputeMinCursor(); + } + const trimCount = this.#cachedMinCursor - this.#bufferStart; if (trimCount > 0) { this.#buffer.trimFront(trimCount); - this.#bufferStart = minCursor; + this.#bufferStart = this.#cachedMinCursor; + } + } + + #recomputeMinCursor() { + const [minCursor, minCursorConsumers] = getMinCursor( + this.#consumers, this.#bufferStart + this.#buffer.length); + this.#cachedMinCursor = minCursor; + this.#cachedMinCursorConsumers = minCursorConsumers; + } + + #deleteConsumerFromMin(consumer) { + if (consumer.cursor === this.#cachedMinCursor) { + this.#cachedMinCursorConsumers--; + } + } + + #deleteConsumer(consumer) { + if (this.#consumers.delete(consumer)) { + const wasAtMin = consumer.cursor === this.#cachedMinCursor; + this.#deleteConsumerFromMin(consumer); + return wasAtMin && this.#cachedMinCursorConsumers === 0; } + return false; } } diff --git a/lib/internal/streams/iter/utils.js b/lib/internal/streams/iter/utils.js index 0520630b09c4b8..c49e56af8251d2 100644 --- a/lib/internal/streams/iter/utils.js +++ b/lib/internal/streams/iter/utils.js @@ -70,20 +70,24 @@ function onSignalAbort(signal, handler) { } /** - * Compute the minimum cursor across a set of consumers. - * Returns fallback if the set is empty. + * Compute the minimum cursor across a set of consumers and count how many + * consumers are at that cursor. * @param {Set} consumers - Set of objects with a `cursor` property - * @param {number} fallback - Value to return when set is empty - * @returns {number} + * @param {number} fallback - Cursor to return when set is empty + * @returns {[number, number]} */ function getMinCursor(consumers, fallback) { - let min = Infinity; + let minCursor = fallback; + let minCursorConsumers = 0; for (const consumer of consumers) { - if (consumer.cursor < min) { - min = consumer.cursor; + if (consumer.cursor < minCursor) { + minCursor = consumer.cursor; + minCursorConsumers = 1; + } else if (consumer.cursor === minCursor) { + minCursorConsumers++; } } - return min === Infinity ? fallback : min; + return [minCursor, minCursorConsumers]; } /** From e21db2f863f2147d26d5260fdd7fbda33b47e35c Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Mon, 11 May 2026 23:54:00 -0700 Subject: [PATCH 2/4] chore: remove 128 consumers from share benchmark --- benchmark/streams/iter-throughput-share.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark/streams/iter-throughput-share.js b/benchmark/streams/iter-throughput-share.js index 98ac439bf4098f..a0383172c04140 100644 --- a/benchmark/streams/iter-throughput-share.js +++ b/benchmark/streams/iter-throughput-share.js @@ -3,7 +3,7 @@ const common = require('../common.js'); const bench = common.createBenchmark(main, { - consumers: [8, 32, 128], + consumers: [2, 8, 32], batches: [1e4], backpressure: ['block'], n: [5], From e87334e019941e00b66689a896e679714f66fea0 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Tue, 12 May 2026 08:37:51 -0700 Subject: [PATCH 3/4] chore: return object instead of tuple from getMinCursor --- lib/internal/streams/iter/broadcast.js | 2 +- lib/internal/streams/iter/share.js | 4 ++-- lib/internal/streams/iter/utils.js | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index b3926413f31a9e..9b3ccebff9ac89 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -343,7 +343,7 @@ class BroadcastImpl { // Private methods #recomputeMinCursor() { - const [minCursor] = getMinCursor( + const { minCursor } = getMinCursor( this.#consumers, this.#bufferStart + this.#buffer.length); this.#cachedMinCursor = minCursor; this.#minCursorDirty = false; diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index af2cc44fd16f39..d4adf37fd6401c 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -357,7 +357,7 @@ class ShareImpl { } #recomputeMinCursor() { - const [minCursor, minCursorConsumers] = getMinCursor( + const { minCursor, minCursorConsumers } = getMinCursor( this.#consumers, this.#bufferStart + this.#buffer.length); this.#cachedMinCursor = minCursor; this.#cachedMinCursorConsumers = minCursorConsumers; @@ -606,7 +606,7 @@ class SyncShareImpl { } #recomputeMinCursor() { - const [minCursor, minCursorConsumers] = getMinCursor( + const { minCursor, minCursorConsumers } = getMinCursor( this.#consumers, this.#bufferStart + this.#buffer.length); this.#cachedMinCursor = minCursor; this.#cachedMinCursorConsumers = minCursorConsumers; diff --git a/lib/internal/streams/iter/utils.js b/lib/internal/streams/iter/utils.js index c49e56af8251d2..7829afaade832f 100644 --- a/lib/internal/streams/iter/utils.js +++ b/lib/internal/streams/iter/utils.js @@ -74,7 +74,7 @@ function onSignalAbort(signal, handler) { * consumers are at that cursor. * @param {Set} consumers - Set of objects with a `cursor` property * @param {number} fallback - Cursor to return when set is empty - * @returns {[number, number]} + * @returns {{ minCursor: number, minCursorConsumers: number }} */ function getMinCursor(consumers, fallback) { let minCursor = fallback; @@ -87,7 +87,7 @@ function getMinCursor(consumers, fallback) { minCursorConsumers++; } } - return [minCursor, minCursorConsumers]; + return { __proto__: null, minCursor, minCursorConsumers }; } /** From f843e462ccf0729879f7e5cfcc55502becf92659 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Tue, 12 May 2026 08:43:01 -0700 Subject: [PATCH 4/4] stream: deduplicate share min cursor updates --- lib/internal/streams/iter/share.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/iter/share.js b/lib/internal/streams/iter/share.js index d4adf37fd6401c..0160bc7eace009 100644 --- a/lib/internal/streams/iter/share.js +++ b/lib/internal/streams/iter/share.js @@ -366,14 +366,14 @@ class ShareImpl { #deleteConsumerFromMin(consumer) { if (consumer.cursor === this.#cachedMinCursor) { this.#cachedMinCursorConsumers--; + return this.#cachedMinCursorConsumers === 0; } + return false; } #deleteConsumer(consumer) { if (this.#consumers.delete(consumer)) { - const wasAtMin = consumer.cursor === this.#cachedMinCursor; - this.#deleteConsumerFromMin(consumer); - return wasAtMin && this.#cachedMinCursorConsumers === 0; + return this.#deleteConsumerFromMin(consumer); } return false; } @@ -615,14 +615,14 @@ class SyncShareImpl { #deleteConsumerFromMin(consumer) { if (consumer.cursor === this.#cachedMinCursor) { this.#cachedMinCursorConsumers--; + return this.#cachedMinCursorConsumers === 0; } + return false; } #deleteConsumer(consumer) { if (this.#consumers.delete(consumer)) { - const wasAtMin = consumer.cursor === this.#cachedMinCursor; - this.#deleteConsumerFromMin(consumer); - return wasAtMin && this.#cachedMinCursorConsumers === 0; + return this.#deleteConsumerFromMin(consumer); } return false; }