Skip to content

Commit ba65cd6

Browse files
committed
quic: Implement webtransport close session stream
1 parent 5e9751e commit ba65cd6

5 files changed

Lines changed: 116 additions & 1 deletion

File tree

lib/internal/quic/quic.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ const {
212212
kReset,
213213
kSendHeaders,
214214
kMakeWebtransportStream,
215+
kCloseWebtransportSessionStream,
215216
kSessionApplication,
216217
kSessionTicket,
217218
kTrailers,
@@ -2040,6 +2041,33 @@ class QuicStream {
20402041
return this.#inner.pendingClose.promise;
20412042
}
20422043

2044+
/**
2045+
* Only for webtransport session streams
2046+
* Closes the webtransport session stream, and also closes
2047+
* connected datastream internally.
2048+
* @param {number|undefined} code optional error code
2049+
* @param {string|undefined} msg optional error message truncated to 1024 bytes
2050+
*/
2051+
closeWebtransportSessionStream(code, msg) {
2052+
assertIsQuicStream(this);
2053+
const inner = this.#inner;
2054+
if (inner.destroying || this.destroyed) return;
2055+
2056+
if (msg !== undefined) {
2057+
validateString(msg, 'msg');
2058+
}
2059+
inner.destroying = true;
2060+
const handle = this.#handle;
2061+
const error = makeQuicError(
2062+
'ERR_QUIC_APPLICATION_ERROR',
2063+
'Webtransport error',
2064+
'application',
2065+
code ?? 0,
2066+
msg ?? '');
2067+
this[kFinishClose](error);
2068+
handle.destroy();
2069+
}
2070+
20432071
/**
20442072
* Immediately destroys the stream. Any queued data is discarded. If
20452073
* an error is given, the closed promise will be rejected with that
@@ -2603,6 +2631,7 @@ class QuicStream {
26032631
/**
26042632
* Attaches a webtransport session to the stream and sends initial bytes
26052633
* indicating webtransport stream and the session id
2634+
* @param {QuicStream} session Webtransport session stream
26062635
* @returns {boolean} true if it succeeded.
26072636
*/
26082637
[kMakeWebtransportStream](session) {
@@ -2614,6 +2643,23 @@ class QuicStream {
26142643
return this.#handle.makeWebtransportStream(session.#handle);
26152644
}
26162645

2646+
/**
2647+
* Closes a webtransport session stream and also closes associated data streams
2648+
* Passing optional error code and error message (limited to 1024 bytes).
2649+
* @param {number} code error code
2650+
* @param {string} msg error message limited to 1024 bytes
2651+
* @returns {boolean} true if it succeeded.
2652+
*/
2653+
[kCloseWebtransportSessionStream](code, msg) {
2654+
if (this.pending) {
2655+
debug('pending stream enqueuing closeWebtransportSessionStream with code', code, ' and msg:', msg);
2656+
} else {
2657+
debug(`stream ${this.id} closeWebtransportSessionStream with code`, code,
2658+
' and msg:', msg);
2659+
}
2660+
return this.#handle.closeWebtransportSessionStream(session.#handle);
2661+
}
2662+
26172663
[kFinishClose](error) {
26182664
const inner = this.#inner;
26192665
inner.pendingClose ??= PromiseWithResolvers();

lib/internal/quic/symbols.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ const kRemoveStream = Symbol('kRemoveStream');
5858
const kReset = Symbol('kReset');
5959
const kSendHeaders = Symbol('kSendHeaders');
6060
const kMakeWebtransportStream = Symbol('kMakeWebtransportStream');
61+
const kCloseWebtransportSessionStream = Symbol('kCloseWebtransportSessionStream');
6162
const kSessionApplication = Symbol('kSessionApplication');
6263
const kSessionTicket = Symbol('kSessionTicket');
6364
const kTrailers = Symbol('kTrailers');
@@ -96,6 +97,7 @@ module.exports = {
9697
kReset,
9798
kSendHeaders,
9899
kMakeWebtransportStream,
100+
kCloseWebtransportSessionStream,
99101
kSessionApplication,
100102
kSessionTicket,
101103
kTrailers,

src/quic/application.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,17 @@ class Session::Application : public MemoryRetainer {
216216
return false;
217217
}
218218

219+
// closes the webtransort session stream,
220+
// and also closes connect webtransport data streams
221+
virtual bool CloseWebtransportSessionStream(
222+
const Stream& stream,
223+
uint32_t wt_error_code,
224+
const uint8_t *msg,
225+
size_t msglen
226+
) {
227+
return false;
228+
}
229+
219230
// Signals to the Application that it should serialize and transmit any
220231
// pending session and stream packets it has accumulated.
221232
void SendPendingData();

src/quic/http3.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,27 @@ class Http3ApplicationImpl final : public Session::Application {
686686
== 0;
687687
}
688688

689+
// closes the webtransort session stream,
690+
// and also closes connect webtransport data streams
691+
// msg is optional
692+
// msg length is maximum 1024
693+
bool CloseWebtransportSessionStream(
694+
const Stream& stream,
695+
uint32_t wt_error_code,
696+
const uint8_t *msg,
697+
size_t msglen
698+
) override {
699+
Session::SendPendingDataScope send_scope(&session());
700+
Debug(&session(),
701+
"Close webtransport session stream %" PRIu64,
702+
stream.id());
703+
return nghttp3_conn_close_wt_session(*this,
704+
stream.id(),
705+
wt_error_code,
706+
msg,
707+
msglen) == 0;
708+
}
709+
689710
void SetStreamPriority(const Stream& stream,
690711
StreamPriority priority,
691712
StreamPriorityFlags flags) override {

src/quic/streams.cc

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ namespace quic {
103103
V(Destroy, destroy, false) \
104104
V(SendHeaders, sendHeaders, false) \
105105
V(MakeWebtransportStream, makeWebtransportStream, false) \
106+
V(CloseWebtransportSessionStream, closeWebtransportSessionStream, false) \
106107
V(StopSending, stopSending, false) \
107108
V(ResetStream, resetStream, false) \
108109
V(SetPriority, setPriority, false) \
@@ -481,7 +482,7 @@ struct Stream::Impl {
481482
}
482483

483484
// Connects a stream to a webtransport session stream,
484-
// also sends the initial bytes of a stream to signel the wt stream
485+
// also sends the initial bytes of a stream to signal the wt stream
485486
// also connects the readers
486487
JS_METHOD(MakeWebtransportStream) {
487488
Stream* stream;
@@ -500,6 +501,40 @@ struct Stream::Impl {
500501
));
501502
}
502503

504+
// Closes a webtransport session stream,
505+
// also closes connected data streams
506+
JS_METHOD(CloseWebtransportSessionStream) {
507+
Stream* stream;
508+
ASSIGN_OR_RETURN_UNWRAP(&stream, args.This());
509+
CHECK(args.Length() > 0);
510+
CHECK(args[0]->IsObject());
511+
uint32_t wt_error_code = 0;
512+
if (args.Length() > 1) {
513+
CHECK(args[1]->IsUint32());
514+
wt_error_code = FromV8Value<uint32_t>(args[0]);
515+
}
516+
uint8_t * msg = nullptr;
517+
size_t msglen = 0;
518+
if (args.Length() > 2) {
519+
CHECK(args[2]->IsString());
520+
Local<String> msgstr = args[2].As<String>();
521+
const size_t length = msgstr->Utf8LengthV2(args.GetIsolate());
522+
msg = new uint8_t[length];
523+
msgstr->WriteUtf8V2(
524+
args.GetIsolate(), reinterpret_cast<char*>(msg), length, String::WriteFlags::kNone);
525+
msglen = std::min<size_t>(length, 1024);
526+
}
527+
args.GetReturnValue().Set(stream->session().application().CloseWebtransportSessionStream(
528+
*stream,
529+
wt_error_code,
530+
msg,
531+
msglen
532+
));
533+
if (msg) {
534+
delete[] msg;
535+
}
536+
}
537+
503538
// Tells the peer to stop sending data for this stream. This has the effect
504539
// of shutting down the readable side of the stream for this peer. Any data
505540
// that has already been received is still readable.

0 commit comments

Comments
 (0)