Skip to content

Commit 71d5cc0

Browse files
committed
fixup! quic: add support for HTTP/3 datagrams
Queue datagrams for pending streams
1 parent f135d3b commit 71d5cc0

8 files changed

Lines changed: 197 additions & 26 deletions

File tree

lib/internal/quic/quic.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2474,13 +2474,17 @@ class QuicStream {
24742474
* large or if datagrams were not negotiated. In any such case `0n` is
24752475
* returned and the payload is silently discarded. Otherwise the underlying
24762476
* QUIC datagram id is returned.
2477+
*
2478+
* If the stream is still pending (no id yet) the datagram is buffered and
2479+
* sending attempted once the stream opens. `0n` is still returned if it
2480+
* cannot even be buffered.
24772481
* @param {ArrayBufferView|string} datagram The datagram payload.
24782482
* @param {string} [encoding] The encoding to use if datagram is a string.
2479-
* @returns {bigint} The datagram id, or `0n` if it was not sent.
2483+
* @returns {bigint} The datagram id, or `0n` if it was not queued.
24802484
*/
24812485
sendDatagram(datagram, encoding = 'utf8') {
24822486
assertIsQuicStream(this);
2483-
if (this.destroyed || this.pending) return kNilDatagramId;
2487+
if (this.destroyed) return kNilDatagramId;
24842488

24852489
// Datagrams are gated by the session's negotiated max datagram size.
24862490
// A value of 0 means datagrams are disabled (transport or, for HTTP/3,

src/quic/application.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,10 +224,12 @@ class Session::Application : public MemoryRetainer {
224224
size_t datalen,
225225
const Session::DatagramReceivedFlags& flags);
226226

227-
// Sends a datagram associated with the given stream. Applications that do
228-
// not support stream-bound datagrams return 0. Returns the underlying QUIC
229-
// datagram id, or 0 if the datagram was not queued.
230-
virtual datagram_id SendDatagram(Stream* stream, Store&& payload) {
227+
// Sends a datagram associated with the given stream, using the caller's
228+
// reserved id. Applications that do not support stream-bound datagrams
229+
// return 0. Returns the id on success, or 0 if the datagram was not queued.
230+
virtual datagram_id SendDatagram(Stream* stream,
231+
Store&& payload,
232+
datagram_id id) {
231233
return 0;
232234
}
233235

src/quic/http3.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -405,15 +405,17 @@ class Http3ApplicationImpl final : public Session::Application {
405405
nghttp3_conn_unblock_stream(*this, stream->id());
406406
}
407407

408-
datagram_id SendDatagram(Stream* stream, Store&& payload) override {
408+
datagram_id SendDatagram(Stream* stream,
409+
Store&& payload,
410+
datagram_id id) override {
409411
if (!SupportsDatagrams()) return 0;
410412

411413
// HTTP/3 datagrams can only ever be associated with client-initiated
412414
// bidi streams (requests). Those ids are always divisible by 4, and
413415
// sent /4 in the framing itself.
414-
stream_id id = stream->id();
415-
if (id < 0 || id % 4 != 0) return 0;
416-
uint64_t qsid = static_cast<uint64_t>(id) / 4;
416+
stream_id sid = stream->id();
417+
if (sid < 0 || sid % 4 != 0) return 0;
418+
uint64_t qsid = static_cast<uint64_t>(sid) / 4;
417419

418420
size_t prefix_len = nghttp3_put_uvarintlen(qsid);
419421
uv_buf_t buf = payload;
@@ -433,7 +435,7 @@ class Http3ApplicationImpl final : public Session::Application {
433435
nghttp3_put_uvarint(dest, qsid);
434436
// N.b. HTTP/3 Datagram payloads can be empty (RFC 9297).
435437
if (buf.len > 0) memcpy(dest + prefix_len, buf.base, buf.len);
436-
return session().SendDatagram(Store(std::move(backing), total));
438+
return session().SendDatagram(Store(std::move(backing), total), id);
437439
}
438440

439441
void ReceiveDatagram(const uint8_t* data,

src/quic/session.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,8 +1170,9 @@ struct Session::Impl final : public MemoryRetainer {
11701170
return;
11711171
}
11721172

1173-
args.GetReturnValue().Set(
1174-
BigInt::New(env->isolate(), session->SendDatagram(std::move(store))));
1173+
datagram_id id = session->ReserveDatagramId();
1174+
args.GetReturnValue().Set(BigInt::New(
1175+
env->isolate(), session->SendDatagram(std::move(store), id)));
11751176
}
11761177

11771178
JS_METHOD(LocalTransportParams) {
@@ -2542,7 +2543,12 @@ void Session::Send(Packet::Ptr packet, const PathStorage& path) {
25422543
Send(std::move(packet));
25432544
}
25442545

2545-
datagram_id Session::SendDatagram(Store&& data) {
2546+
datagram_id Session::ReserveDatagramId() {
2547+
DCHECK(!is_destroyed());
2548+
return ++impl_->state()->last_datagram_id;
2549+
}
2550+
2551+
datagram_id Session::SendDatagram(Store&& data, datagram_id id) {
25462552
DCHECK(!is_destroyed());
25472553

25482554
// Sending a datagram is best effort. If we cannot send it for any reason,
@@ -2577,8 +2583,8 @@ datagram_id Session::SendDatagram(Store&& data) {
25772583
return 0;
25782584
}
25792585

2580-
// Assign the datagram ID.
2581-
datagram_id did = ++impl_->state()->last_datagram_id;
2586+
// Use the id reserved by the caller.
2587+
datagram_id did = id;
25822588

25832589
// Check queue capacity. Apply the drop policy when full.
25842590
auto max_pending = impl_->state()->max_pending_datagrams;

src/quic/session.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,16 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
426426

427427
void Send(Packet::Ptr packet);
428428
void Send(Packet::Ptr packet, const PathStorage& path);
429-
datagram_id SendDatagram(Store&& data);
429+
// Reserves the next datagram id from the session-wide counter without
430+
// queueing anything. Every send path mints an id here up front and passes
431+
// it to SendDatagram. The id is only "exposed" (returned to JS) once the
432+
// datagram is committed (queued now, or buffered for a pending stream); a
433+
// reserved id that is never committed is simply discarded.
434+
datagram_id ReserveDatagramId();
435+
// Queues a datagram for sending using a previously reserved id. Returns the
436+
// id on success, or 0 if it could not be queued, in which case the caller
437+
// discards the id and emits no status.
438+
datagram_id SendDatagram(Store&& data, datagram_id id);
430439

431440
// Pending datagram accessors for use by SendPendingData.
432441
struct PendingDatagram {

src/quic/streams.cc

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -617,21 +617,29 @@ struct Stream::Impl {
617617
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
618618
DCHECK(args[0]->IsArrayBufferView());
619619

620-
// A datagram can only be associated with a stream that has a real id.
621-
// A pending stream has no id yet, so there is nothing to bind to.
622-
if (stream->is_pending()) {
623-
return args.GetReturnValue().Set(BigInt::New(env->isolate(), 0));
624-
}
625-
626620
Store store;
627621
if (!Store::From(args[0].As<ArrayBufferView>()).To(&store)) {
628622
return;
629623
}
630624

625+
// Mint the id up front. It is exposed (returned non-zero to JS) only
626+
// if the datagram is committed - queued now, or buffered for a pending
627+
// stream. Sync rejection returns 0 and discards (no subsequent status).
628+
datagram_id id = stream->session().ReserveDatagramId();
629+
630+
// A pending stream has no id yet, so the datagram cannot be framed and
631+
// bound to a Quarter Stream ID. Buffer it and flush when the stream opens.
632+
// If it cannot be buffered (queue full) the id is discarded.
633+
if (stream->is_pending()) {
634+
bool buffered = stream->EnqueuePendingDatagram(id, std::move(store));
635+
return args.GetReturnValue().Set(
636+
BigInt::New(env->isolate(), buffered ? id : 0));
637+
}
638+
631639
Session::SendPendingDataScope send_scope(&stream->session());
632-
datagram_id id =
633-
stream->session().application().SendDatagram(stream, std::move(store));
634-
args.GetReturnValue().Set(BigInt::New(env->isolate(), id));
640+
datagram_id result = stream->session().application().SendDatagram(
641+
stream, std::move(store), id);
642+
args.GetReturnValue().Set(BigInt::New(env->isolate(), result));
635643
}
636644
};
637645

@@ -1313,6 +1321,21 @@ void Stream::NotifyStreamOpened(stream_id id) {
13131321
headers->flags);
13141322
}
13151323
}
1324+
if (!pending_datagram_queue_.empty()) {
1325+
// Like the headers flush above, this runs inside an ngtcp2 callback, so
1326+
// the queued datagrams ride the session's deferred flush; no send scope.
1327+
std::deque<PendingDatagram> queue;
1328+
pending_datagram_queue_.swap(queue);
1329+
pending_datagram_bytes_ = 0;
1330+
for (auto& dgram : queue) {
1331+
// The id was already returned to JS. If the send is rejected now (e.g.
1332+
// the framed size exceeds the peer's max) we report ABANDONED.
1333+
if (session().application().SendDatagram(
1334+
this, std::move(dgram.data), dgram.id) == 0) {
1335+
session().DatagramStatus(dgram.id, DatagramStatus::ABANDONED);
1336+
}
1337+
}
1338+
}
13161339
// If the stream is not a local undirectional stream and is_readable is
13171340
// false, then we should shutdown the streams readable side now.
13181341
if (!is_local_unidirectional() && !is_readable()) {
@@ -1350,6 +1373,23 @@ void Stream::EnqueuePendingHeaders(HeadersKind kind,
13501373
kind, Global<Array>(env()->isolate(), headers), flags));
13511374
}
13521375

1376+
bool Stream::EnqueuePendingDatagram(datagram_id id, Store&& store) {
1377+
// Bound the pending queue by the stream's outbound high water mark. When
1378+
// full, we refuse to send the datagram (its id is discarded by the caller).
1379+
// A high water mark of 0 means unbounded, matching the stream-write path.
1380+
uint64_t hwm = state()->high_water_mark;
1381+
size_t incoming = store.length();
1382+
if (hwm > 0 && !pending_datagram_queue_.empty() &&
1383+
pending_datagram_bytes_ + incoming > hwm) {
1384+
Debug(this, "Pending datagram buffer full, refusing datagram %" PRIu64, id);
1385+
return false;
1386+
}
1387+
Debug(this, "Buffering datagram %" PRIu64 " for pending stream", id);
1388+
pending_datagram_bytes_ += incoming;
1389+
pending_datagram_queue_.push_back({id, std::move(store)});
1390+
return true;
1391+
}
1392+
13531393
bool Stream::is_pending() const {
13541394
return state()->pending;
13551395
}
@@ -1683,6 +1723,19 @@ void Stream::Destroy(QuicError error) {
16831723
}
16841724
state()->pending = 0;
16851725

1726+
// Datagrams still buffered for a pending stream had their ids returned to
1727+
// JS but will never reach the wire, so abandon them here to close each
1728+
// handle. If the session is already gone there is nobody to notify.
1729+
if (!pending_datagram_queue_.empty()) {
1730+
if (!session_->is_destroyed()) {
1731+
for (auto& dgram : pending_datagram_queue_) {
1732+
session_->DatagramStatus(dgram.id, DatagramStatus::ABANDONED);
1733+
}
1734+
}
1735+
pending_datagram_queue_.clear();
1736+
pending_datagram_bytes_ = 0;
1737+
}
1738+
16861739
maybe_pending_stream_.reset();
16871740

16881741
// End the writable before marking as destroyed.

src/quic/streams.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "bindingdata.h"
1616
#include "data.h"
1717

18+
#include <deque>
1819
#include <vector>
1920

2021
namespace node::quic {
@@ -435,6 +436,11 @@ class Stream final : public AsyncWrap,
435436
v8::Local<v8::Array> headers,
436437
HeadersFlags flags);
437438

439+
// Buffers a datagram (with its already-reserved id) sent on a still-pending
440+
// stream, bounded by the stream's high water mark. Returns false without
441+
// queueing when the buffer is full, in which case the id is discarded.
442+
bool EnqueuePendingDatagram(datagram_id id, Store&& store);
443+
438444
ArenaSlotBase stats_slot_;
439445
ArenaSlotBase state_slot_;
440446
BaseObjectWeakPtr<Session> session_;
@@ -452,6 +458,13 @@ class Stream final : public AsyncWrap,
452458
error_code pending_close_read_code_ = 0;
453459
error_code pending_close_write_code_ = 0;
454460

461+
struct PendingDatagram {
462+
datagram_id id;
463+
Store data;
464+
};
465+
std::deque<PendingDatagram> pending_datagram_queue_;
466+
size_t pending_datagram_bytes_ = 0;
467+
455468
struct StoredPriority {
456469
StreamPriority priority = StreamPriority::DEFAULT;
457470
StreamPriorityFlags flags = StreamPriorityFlags::NON_INCREMENTAL;

test/parallel/test-quic-h3-datagram.mjs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,3 +330,85 @@ const cert = readKey('agent1-cert.pem');
330330
await clientSession.close();
331331
await serverEndpoint.close();
332332
}
333+
334+
// Test 6: a datagram sent on a still-pending stream gets a real, trackable id,
335+
// is delivered once the stream opens, and reports a terminal status via
336+
// ondatagramstatus like any other datagram.
337+
{
338+
const serverGotDatagram = Promise.withResolvers();
339+
const serverDone = Promise.withResolvers();
340+
341+
const serverEndpoint = await listen(mustCall(async (ss) => {
342+
ss.onstream = mustCall((stream) => {
343+
stream.ondatagram = (data) => {
344+
deepStrictEqual([...data], [7, 8, 9]);
345+
serverGotDatagram.resolve();
346+
};
347+
}, 2);
348+
await serverDone.promise;
349+
ss.close();
350+
}), {
351+
sni: { '*': { keys: [key], certs: [cert] } },
352+
application: { enableDatagrams: true },
353+
// Only one concurrent client bidi stream, so the second request stays
354+
// pending until the first closes.
355+
transportParams: { maxDatagramFrameSize: 100, initialMaxStreamsBidi: 1 },
356+
onheaders: mustCall(function() {
357+
this.sendHeaders({ ':status': '200' });
358+
this.writer.endSync();
359+
}, 2),
360+
});
361+
362+
const datagramStatus = Promise.withResolvers();
363+
const clientSession = await connect(serverEndpoint.address, {
364+
servername: 'localhost',
365+
verifyPeer: 'manual',
366+
application: { enableDatagrams: true },
367+
transportParams: { maxDatagramFrameSize: 100 },
368+
ondatagramstatus: (id, status) => datagramStatus.resolve({ id, status }),
369+
});
370+
await clientSession.opened;
371+
372+
// First request holds the single available bidi slot.
373+
const stream1 = await clientSession.createBidirectionalStream({
374+
headers: {
375+
':method': 'GET', ':path': '/first',
376+
':scheme': 'https', ':authority': 'localhost',
377+
},
378+
onheaders: mustCall(function(headers) {
379+
strictEqual(headers[':status'], '200');
380+
}),
381+
});
382+
383+
// Second stream is pending; the datagram is buffered with a real id.
384+
const secondResponse = Promise.withResolvers();
385+
const stream2 = await clientSession.createBidirectionalStream({
386+
headers: {
387+
':method': 'GET', ':path': '/second',
388+
':scheme': 'https', ':authority': 'localhost',
389+
},
390+
onheaders: mustCall(() => secondResponse.resolve()),
391+
});
392+
ok(stream2.pending);
393+
const pendingId = stream2.sendDatagram(new Uint8Array([7, 8, 9]));
394+
ok(pendingId > 0n);
395+
396+
// Free the slot; stream2 opens and the buffered datagram flushes.
397+
stream1.writer.endSync();
398+
for await (const _ of stream1) { /* drain */ } // eslint-disable-line no-unused-vars
399+
await stream1.closed;
400+
401+
await Promise.all([secondResponse.promise, serverGotDatagram.promise]);
402+
403+
// The buffered datagram is tracked: its id reports exactly one terminal
404+
// status (acknowledged on loopback, but lost is also valid).
405+
const { id, status } = await datagramStatus.promise;
406+
strictEqual(id, pendingId);
407+
ok(status === 'acknowledged' || status === 'lost');
408+
409+
for await (const _ of stream2) { /* drain */ } // eslint-disable-line no-unused-vars
410+
await stream2.closed;
411+
serverDone.resolve();
412+
await clientSession.close();
413+
await serverEndpoint.close();
414+
}

0 commit comments

Comments
 (0)