@@ -158,6 +158,7 @@ const {
158158} = require ( 'internal/fs/promises' ) ;
159159
160160const {
161+ validateAbortSignal,
161162 validateBoolean,
162163 validateFunction,
163164 validateInteger,
@@ -859,7 +860,11 @@ function parseHeaderPairs(pairs) {
859860 const block = { __proto__ : null } ;
860861 for ( let n = 0 ; n + 1 < pairs . length ; n += 2 ) {
861862 if ( block [ pairs [ n ] ] !== undefined ) {
862- block [ pairs [ n ] ] = [ block [ pairs [ n ] ] , pairs [ n + 1 ] ] ;
863+ if ( ArrayIsArray ( block [ pairs [ n ] ] ) ) {
864+ ArrayPrototypePush ( block [ pairs [ n ] ] , pairs [ n + 1 ] ) ;
865+ } else {
866+ block [ pairs [ n ] ] = [ block [ pairs [ n ] ] , pairs [ n + 1 ] ] ;
867+ }
863868 } else {
864869 block [ pairs [ n ] ] = pairs [ n + 1 ] ;
865870 }
@@ -1045,9 +1050,13 @@ async function consumeSyncSource(handle, stream, source) {
10451050 if ( await writeBatchWithDrain ( handle , stream , batch ) ) return ;
10461051 }
10471052 handle . endWrite ( ) ;
1048- } catch {
1053+ } catch ( err ) {
10491054 if ( ! stream . destroyed ) {
1050- handle . resetStream ( 0n ) ;
1055+ stream . destroy ( err ) ;
1056+ } else {
1057+ // If the stream is already destroyed, rethrow the error to avoid
1058+ // silently swallowing it. Tho in practice this shouldn't happen.
1059+ throw err ;
10511060 }
10521061 }
10531062}
@@ -1666,29 +1675,69 @@ class QuicStream {
16661675 }
16671676
16681677 function endSync ( ) {
1678+ // Per the streams/iter spec, endSync and end follow a try-fallback
1679+ // pattern. That is, callers should try endSync first and if it returns
1680+ // -1, then they should call and await end(). This is a signal that sync
1681+ // end is not currently possible. However, we always support sync end
1682+ // here unless the stream is already errored.
16691683 if ( errored ) return - 1 ;
1684+
1685+ // If we're already closed, just return the total bytes written.
16701686 if ( closed ) return totalBytesWritten ;
1687+
1688+ // If we are waiting for drain to complete, we cannot end synchronously.
1689+ if ( drainWakeup != null ) return - 1 ;
1690+
1691+ // Fantastic, we can end synchronously!
16711692 handle . endWrite ( ) ;
16721693 closed = true ;
16731694 return totalBytesWritten ;
16741695 }
16751696
1676- async function end ( options ) {
1697+ async function end ( options = kEmptyObject ) {
1698+ validateObject ( options , 'options' ) ;
1699+ const { signal } = options ;
1700+ if ( signal !== undefined ) {
1701+ validateAbortSignal ( signal , 'options.signal' ) ;
1702+ signal . throwIfAborted ( ) ;
1703+ // TODO(@jasnell): The stream/iter spec allows individual sync end
1704+ // calls to be canceled via an AbortSignal. We currently do not support
1705+ // this, but we can add before the impl is graduated from experimental.
1706+ // At most we do here is check for signal abort at the start of the call.
1707+ }
1708+
1709+ // Per the streams/iter spec, endSync and end follow a try-fallback
1710+ // pattern. That is, callers should try endSync first and if it returns
1711+ // -1, then they should call and await end(). This is a signal that sync
1712+ // end is not currently possible. However, we always support sync end
1713+ // here unless the stream is already errored.
1714+ // While the user should have already called endSync, we call it again
1715+ // here to actually process the end request. At worst it's called twice.
16771716 const n = endSync ( ) ;
1717+
1718+ // A return value of -1 indicates that endSync was not yet able to
1719+ // process the end request, either because we are errored or because we
1720+ // are awaiting drain. If we're errored, throw the error. If we're waiting
1721+ // for drain, await it and then try ending again.
1722+
16781723 if ( n >= 0 ) return n ;
16791724 if ( errored ) throw error ;
1680- drainWakeup = PromiseWithResolvers ( ) ;
1681- await drainWakeup . promise ;
1682- drainWakeup = null ;
1725+
1726+ drainWakeup ??= PromiseWithResolvers ( ) ;
1727+ try {
1728+ await drainWakeup . promise ;
1729+ } finally {
1730+ drainWakeup = null ;
1731+ }
16831732 return endSync ( ) ;
16841733 }
16851734
16861735 function fail ( reason ) {
16871736 if ( closed || errored ) return ;
16881737 errored = true ;
1689- error = reason ;
1738+ error = reason ?? new ERR_INVALID_STATE ( 'Failed' ) ;
16901739 handle . resetStream ( 0n ) ;
1691- if ( drainWakeup ) {
1740+ if ( drainWakeup != null ) {
16921741 drainWakeup . reject ( reason ) ;
16931742 drainWakeup = null ;
16941743 }
@@ -1710,7 +1759,7 @@ class QuicStream {
17101759 [ drainableProtocol ] ( ) {
17111760 if ( closed || errored ) return null ;
17121761 if ( stream . #state. writeDesiredSize > 0 ) return null ;
1713- drainWakeup = PromiseWithResolvers ( ) ;
1762+ drainWakeup ?? = PromiseWithResolvers ( ) ;
17141763 return drainWakeup . promise ;
17151764 } ,
17161765 [ SymbolAsyncDispose ] ( ) {
@@ -1912,6 +1961,9 @@ class QuicStream {
19121961 this . #stats[ kFinishClose ] ( ) ;
19131962 this . #state[ kFinishClose ] ( ) ;
19141963 this . #session[ kRemoveStream ] ( this ) ;
1964+ if ( this . #writer !== undefined ) {
1965+ this . #writer. fail ( error ) ;
1966+ }
19151967 this . #session = undefined ;
19161968 this . #pendingClose. reject = undefined ;
19171969 this . #pendingClose. resolve = undefined ;
0 commit comments