diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index 6ca59469faf2de..a5ce3d8c14fb23 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -1279,19 +1279,14 @@ function waitForDrain(stream) { // Writes a batch to the handle, awaiting drain if backpressured. // Returns true if the stream was destroyed during the wait. -// Checks writeDesiredSize before writing to enforce backpressure -// against the outbound DataQueue's uncommitted bytes. +// Only waits when writeDesiredSize is 0 (no capacity at all). +// When there is any capacity, the write proceeds even if the batch +// is larger -- the C++ side buffers the data and writeDesiredSize +// drops toward 0, letting the normal drain mechanism take over. async function writeBatchWithDrain(handle, stream, batch) { const state = getQuicStreamState(stream); - // Calculate total batch size for the capacity check. - let len = 0; - for (const chunk of batch) len += TypedArrayPrototypeGetByteLength(chunk); - - // If insufficient capacity, wait for the C++ drain signal which - // fires when writeDesiredSize transitions from 0 to > 0 (i.e., - // ngtcp2 has consumed data from the outbound DataQueue). - if (len > state.writeDesiredSize) { + if (state.writeDesiredSize === 0) { await waitForDrain(stream); if (stream.destroyed) return true; } @@ -2029,9 +2024,15 @@ class QuicStream { chunk = toUint8Array(chunk); const len = TypedArrayPrototypeGetByteLength(chunk); if (len === 0) return true; - // Refuse the write if the chunk doesn't fit in the available - // buffer capacity. The caller should wait for drain and retry. - if (len > stream.#state.writeDesiredSize) return false; + // Refuse the write only when there is no available capacity at + // all. When writeDesiredSize > 0 we allow the write even if the + // chunk is larger than the remaining capacity -- the C++ side + // will accept the data into the DataQueue and + // UpdateWriteDesiredSize() will drop writeDesiredSize toward 0, + // at which point the standard drain mechanism takes over. + // This follows the Web Streams model where writes beyond the HWM + // succeed and backpressure applies to *subsequent* writes. + if (stream.#state.writeDesiredSize === 0) return false; const result = handle.write([chunk]); if (result === undefined) return false; totalBytesWritten += len; @@ -2070,7 +2071,7 @@ class QuicStream { let len = 0; for (const c of chunks) len += TypedArrayPrototypeGetByteLength(c); if (len === 0) return true; - if (len > stream.#state.writeDesiredSize) return false; + if (stream.#state.writeDesiredSize === 0) return false; const result = handle.write(chunks); if (result === undefined) return false; totalBytesWritten += len; diff --git a/src/quic/streams.cc b/src/quic/streams.cc index dd7f7ecbb3880e..81e619e28d3720 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1592,8 +1592,10 @@ void Stream::UpdateWriteDesiredSize() { uint32_t old_size = state_->write_desired_size; state_->write_desired_size = clamped; - // Fire drain when transitioning from 0 to non-zero - if (old_size == 0 && desired > 0) { + // Fire drain when transitioning from 0 to non-zero. + // writeDesiredSize == 0 means the buffer is full or flow control is + // exhausted, so the JS side may be waiting for capacity. + if (old_size == 0 && clamped > 0) { EmitDrain(); } } diff --git a/test/parallel/test-quic-stream-bidi-varchunklen.mjs b/test/parallel/test-quic-stream-bidi-varchunklen.mjs new file mode 100644 index 00000000000000..a8928c7ced34dc --- /dev/null +++ b/test/parallel/test-quic-stream-bidi-varchunklen.mjs @@ -0,0 +1,91 @@ +// Flags: --experimental-quic --experimental-stream-iter --no-warnings + +// Test: bidirectional data transfer with varying chunk sizes. +// This is a regression test for a stall caused by a mismatch between +// writeSync (which rejects when chunk > writeDesiredSize) and +// drainableProtocol (which returned null when writeDesiredSize > 0). +// When chunks don't evenly fill the high water mark, writeDesiredSize +// can be positive but smaller than the next chunk, causing the +// while(!writeSync) { dp(); await } loop to spin without yielding. +// See: https://github.com/nodejs/node/issues/63216 + +import { hasQuic, skip, mustCall } from '../common/index.mjs'; +import assert from 'node:assert'; + +const { strictEqual } = assert; + +if (!hasQuic) { + skip('QUIC is not enabled'); +} + +const { listen, connect } = await import('../common/quic.mjs'); +const { bytes, drainableProtocol: dp } = await import('stream/iter'); + +// Varying chunk sizes — the pattern of alternating large and small +// chunks is effective at triggering the writeDesiredSize gap. +const chunkSizes = [60000, 12, 50000, 1600, 20000, 30000, 0, 100]; +const numChunks = chunkSizes.length; +const byteLength = chunkSizes.reduce((a, b) => a + b, 0); + +// Build a deterministic payload so we can verify integrity. +function buildChunk(index) { + const chunk = new Uint8Array(chunkSizes[index]); + const val = index & 0xff; + for (let i = 0; i < chunkSizes[index]; i++) { + chunk[i] = (val + i) & 0xff; + } + return chunk; +} + +function checksum(data) { + let sum = 0; + for (let i = 0; i < data.byteLength; i++) { + sum = (sum + data[i]) | 0; + } + return sum; +} + +// Compute expected checksum. +let expectedChecksum = 0; +for (let i = 0; i < numChunks; i++) { + const chunk = buildChunk(i); + expectedChecksum = (expectedChecksum + checksum(chunk)) | 0; +} + +const done = Promise.withResolvers(); + +const serverEndpoint = await listen(mustCall((serverSession) => { + serverSession.onstream = mustCall(async (stream) => { + const received = await bytes(stream); + strictEqual(received.byteLength, byteLength); + strictEqual(checksum(received), expectedChecksum); + + stream.writer.endSync(); + await stream.closed; + serverSession.close(); + done.resolve(); + }); +})); + +const clientSession = await connect(serverEndpoint.address); +await clientSession.opened; + +const stream = await clientSession.createBidirectionalStream(); +const w = stream.writer; + +// Write chunks, respecting backpressure via drainableProtocol. +for (let i = 0; i < numChunks; i++) { + const chunk = buildChunk(i); + while (!w.writeSync(chunk)) { + // Flow controlled — wait for drain before retrying. + const drainable = w[dp](); + if (drainable) await drainable; + } +} + +const totalWritten = w.endSync(); +strictEqual(totalWritten, byteLength); + +await Promise.all([stream.closed, done.promise]); +await clientSession.close(); +await serverEndpoint.close();