Skip to content

Commit 4ade24f

Browse files
committed
quic: fixup quic stream variable chunk len
Signed-off-by: James M Snell <jasnell@gmail.com>
1 parent 49aef0d commit 4ade24f

3 files changed

Lines changed: 110 additions & 4 deletions

File tree

lib/internal/quic/quic.js

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2031,7 +2031,15 @@ class QuicStream {
20312031
if (len === 0) return true;
20322032
// Refuse the write if the chunk doesn't fit in the available
20332033
// buffer capacity. The caller should wait for drain and retry.
2034-
if (len > stream.#state.writeDesiredSize) return false;
2034+
// Set up drainWakeup so that drainableProtocol() returns a
2035+
// promise the caller can await, even when writeDesiredSize > 0
2036+
// but is smaller than the chunk. Without this, the standard
2037+
// while(!writeSync) { dp(); await } loop would spin
2038+
// synchronously and starve the event loop.
2039+
if (len > stream.#state.writeDesiredSize) {
2040+
drainWakeup ??= PromiseWithResolvers();
2041+
return false;
2042+
}
20352043
const result = handle.write([chunk]);
20362044
if (result === undefined) return false;
20372045
totalBytesWritten += len;
@@ -2070,7 +2078,10 @@ class QuicStream {
20702078
let len = 0;
20712079
for (const c of chunks) len += TypedArrayPrototypeGetByteLength(c);
20722080
if (len === 0) return true;
2073-
if (len > stream.#state.writeDesiredSize) return false;
2081+
if (len > stream.#state.writeDesiredSize) {
2082+
drainWakeup ??= PromiseWithResolvers();
2083+
return false;
2084+
}
20742085
const result = handle.write(chunks);
20752086
if (result === undefined) return false;
20762087
totalBytesWritten += len;

src/quic/streams.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1592,8 +1592,12 @@ void Stream::UpdateWriteDesiredSize() {
15921592
uint32_t old_size = state_->write_desired_size;
15931593
state_->write_desired_size = clamped;
15941594

1595-
// Fire drain when transitioning from 0 to non-zero
1596-
if (old_size == 0 && desired > 0) {
1595+
// Fire drain when available capacity increases. This covers both the
1596+
// classic 0-to-positive transition and the case where writeDesiredSize
1597+
// was already positive but too small for the next chunk. The JS drain
1598+
// handler is a no-op when no drainWakeup is pending, so the extra
1599+
// callbacks when nobody is waiting are harmless.
1600+
if (clamped > old_size) {
15971601
EmitDrain();
15981602
}
15991603
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Flags: --experimental-quic --experimental-stream-iter --no-warnings
2+
3+
// Test: bidirectional data transfer with varying chunk sizes.
4+
// This is a regression test for a stall caused by a mismatch between
5+
// writeSync (which rejects when chunk > writeDesiredSize) and
6+
// drainableProtocol (which returned null when writeDesiredSize > 0).
7+
// When chunks don't evenly fill the high water mark, writeDesiredSize
8+
// can be positive but smaller than the next chunk, causing the
9+
// while(!writeSync) { dp(); await } loop to spin without yielding.
10+
// See: https://github.com/nodejs/node/issues/63216
11+
12+
import { hasQuic, skip, mustCall } from '../common/index.mjs';
13+
import assert from 'node:assert';
14+
15+
const { strictEqual } = assert;
16+
17+
if (!hasQuic) {
18+
skip('QUIC is not enabled');
19+
}
20+
21+
const { listen, connect } = await import('../common/quic.mjs');
22+
const { bytes, drainableProtocol: dp } = await import('stream/iter');
23+
24+
// Varying chunk sizes — the pattern of alternating large and small
25+
// chunks is effective at triggering the writeDesiredSize gap.
26+
const chunkSizes = [60000, 12, 50000, 1600, 20000, 30000, 0, 100];
27+
const numChunks = chunkSizes.length;
28+
const byteLength = chunkSizes.reduce((a, b) => a + b, 0);
29+
30+
// Build a deterministic payload so we can verify integrity.
31+
function buildChunk(index) {
32+
const chunk = new Uint8Array(chunkSizes[index]);
33+
const val = index & 0xff;
34+
for (let i = 0; i < chunkSizes[index]; i++) {
35+
chunk[i] = (val + i) & 0xff;
36+
}
37+
return chunk;
38+
}
39+
40+
function checksum(data) {
41+
let sum = 0;
42+
for (let i = 0; i < data.byteLength; i++) {
43+
sum = (sum + data[i]) | 0;
44+
}
45+
return sum;
46+
}
47+
48+
// Compute expected checksum.
49+
let expectedChecksum = 0;
50+
for (let i = 0; i < numChunks; i++) {
51+
const chunk = buildChunk(i);
52+
expectedChecksum = (expectedChecksum + checksum(chunk)) | 0;
53+
}
54+
55+
const done = Promise.withResolvers();
56+
57+
const serverEndpoint = await listen(mustCall((serverSession) => {
58+
serverSession.onstream = mustCall(async (stream) => {
59+
const received = await bytes(stream);
60+
strictEqual(received.byteLength, byteLength);
61+
strictEqual(checksum(received), expectedChecksum);
62+
63+
stream.writer.endSync();
64+
await stream.closed;
65+
serverSession.close();
66+
done.resolve();
67+
});
68+
}));
69+
70+
const clientSession = await connect(serverEndpoint.address);
71+
await clientSession.opened;
72+
73+
const stream = await clientSession.createBidirectionalStream();
74+
const w = stream.writer;
75+
76+
// Write chunks, respecting backpressure via drainableProtocol.
77+
for (let i = 0; i < numChunks; i++) {
78+
const chunk = buildChunk(i);
79+
while (!w.writeSync(chunk)) {
80+
// Flow controlled — wait for drain before retrying.
81+
const drainable = w[dp]();
82+
if (drainable) await drainable;
83+
}
84+
}
85+
86+
const totalWritten = w.endSync();
87+
strictEqual(totalWritten, byteLength);
88+
89+
await Promise.all([stream.closed, done.promise]);
90+
await clientSession.close();
91+
await serverEndpoint.close();

0 commit comments

Comments
 (0)