fix: guard processSendQueue against non-writable stream status #3416

Merged
tabcat merged 10 commits into
libp2p:main from
lodekeeper:fix/stream-drain-race-guard
May 14, 2026

@lodekeeper
Contributor

What

Add a writeStatus guard at the top of processSendQueue() in abstract-message-stream.ts to bail out when the stream is no longer writable.

Why

A race condition causes an uncaught StreamStateError when a drain event fires on a stream whose underlying transport is already closing:

  1. TCP connection starts closing → writeStatus transitions to 'closing'
  2. A drain event fires on the underlying TCP stream
  3. The noise layer relays this as safeDispatchEvent('drain') on the encrypted stream
  4. continueSendingOnDrain → processSendQueue() → sendData() → send() on the now-closing stream
  5. send() throws StreamStateError — inside an event listener with no try/catch → uncaught exception
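
The call chain in steps 1 to 5 can be reproduced with a minimal model. This is a sketch, not the libp2p implementation: `MiniStream`, its fields, and the local `StreamStateError` class are hypothetical stand-ins that only borrow the names used above.

```typescript
// Hypothetical stand-in for the error named in this PR; not imported from libp2p.
class StreamStateError extends Error {
  constructor (message: string) {
    super(message)
    this.name = 'StreamStateError'
  }
}

type WriteStatus = 'writable' | 'closing' | 'closed'

// Minimal model of the unguarded path:
// drain -> continueSendingOnDrain -> processSendQueue -> sendData -> send
class MiniStream {
  writeStatus: WriteStatus = 'writable'
  readonly queue: Uint8Array[] = []

  // models send() on the underlying stream, which rejects writes once closing
  private send (data: Uint8Array): boolean {
    if (this.writeStatus !== 'writable') {
      throw new StreamStateError(`cannot send ${data.byteLength} bytes, stream is ${this.writeStatus}`)
    }
    return true
  }

  private sendData (data: Uint8Array): boolean {
    return this.send(data)
  }

  // no writeStatus check here, which is the gap described above
  processSendQueue (): boolean {
    const next = this.queue.shift()
    return next == null ? true : this.sendData(next)
  }

  // what the drain listener ends up calling
  continueSendingOnDrain (): void {
    this.processSendQueue()
  }
}

const stream = new MiniStream()
stream.queue.push(new Uint8Array([1, 2, 3]))
stream.writeStatus = 'closing' // step 1: the transport starts closing

let thrown: unknown
try {
  stream.continueSendingOnDrain() // steps 2 to 4: the drain reaches send()
} catch (err) {
  thrown = err // step 5: StreamStateError escapes the handler
}
```

The `try/catch` here exists only to capture the error for inspection; per step 5, the real drain listener has no such wrapper, so the error surfaces as an uncaught exception.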

Observed on Lodestar mainnet (v1.41.0, @libp2p/utils@7.0.13): 6 uncaught exceptions in 7 days.

Fix

Single guard at the top of processSendQueue(), consistent with the existing bail-out pattern:

if (this.writeStatus !== 'writable') {
  this.log.trace('not processing send queue as stream write status is %s', this.writeStatus)
  return false
}

Test

Added a regression test that:

  1. Creates a stream pair with capacity: 1 to trigger backpressure
  2. Fills the send buffer until send() returns false
  3. Sets writeStatus to 'closing'
  4. Dispatches a drain event
  5. Asserts sendData is NOT called (the guard catches it) and no error is thrown
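
The five test steps can be sketched against a small stand-in for the stream. Everything here is illustrative: `BackpressureStream`, its `capacity` handling, and the `sendDataCalls` counter are hypothetical and much simpler than the real stream pair; only the guard condition is taken from the snippet in the Fix section.

```typescript
type WriteStatus = 'writable' | 'closing' | 'closed'

// Hypothetical model: the transport accepts bytes until `capacity` is reached,
// then reports backpressure until a drain resets it.
class BackpressureStream {
  writeStatus: WriteStatus = 'writable'
  sendDataCalls = 0
  private readonly capacity: number
  private readonly queue: Uint8Array[] = []
  private accepted = 0
  private blocked = false

  constructor (capacity: number) {
    this.capacity = capacity
  }

  get queued (): number {
    return this.queue.length
  }

  // returns false once the transport is full
  private sendData (data: Uint8Array): boolean {
    this.sendDataCalls++
    this.accepted += data.byteLength
    return this.accepted < this.capacity
  }

  processSendQueue (): boolean {
    // the guard from this PR: bail out before touching the transport
    if (this.writeStatus !== 'writable') {
      return false
    }
    this.blocked = false
    while (!this.blocked && this.queue.length > 0) {
      const chunk = this.queue.shift()
      if (chunk != null) {
        this.blocked = !this.sendData(chunk)
      }
    }
    return !this.blocked
  }

  send (data: Uint8Array): boolean {
    this.queue.push(data)
    return this.blocked ? false : this.processSendQueue()
  }

  onDrain (): void {
    this.accepted = 0
    this.processSendQueue()
  }
}

const stream = new BackpressureStream(1) // step 1: capacity of 1
while (stream.send(new Uint8Array([0]))) { /* step 2: fill until send() returns false */ }
stream.send(new Uint8Array([0])) // leave one chunk waiting in the queue
const callsBeforeDrain = stream.sendDataCalls

stream.writeStatus = 'closing' // step 3

let drainError: unknown
try {
  stream.onDrain() // step 4
} catch (err) {
  drainError = err
}
// step 5: sendDataCalls is unchanged and drainError is still undefined
```

Without the guard, `onDrain()` in this model would call `sendData` for the queued chunk, which is the call the regression test is meant to rule out.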

Closes #3415

@lodekeeper lodekeeper requested a review from a team as a code owner March 21, 2026 16:24
@lodekeeper lodekeeper changed the title fix(@libp2p/utils): guard processSendQueue against non-writable stream status fix(@libp2p/utils): guard processSendQueue on non-writable stream Mar 21, 2026
@lodekeeper lodekeeper changed the title fix(@libp2p/utils): guard processSendQueue on non-writable stream fix: guard processSendQueue against non-writable stream status Mar 21, 2026
@lodekeeper lodekeeper force-pushed the fix/stream-drain-race-guard branch 3 times, most recently from 9538027 to c1c6584 Compare March 21, 2026 17:06
Add a try-catch around sendData() in processSendQueue() to catch
StreamStateError when the underlying transport closes between a drain
event and the send attempt. This prevents the error from propagating
as an uncaught exception.

Also add a writeStatus guard in continueSendingOnDrain to skip
processing when the stream has fully closed.

Fixes libp2p#3415
@lodekeeper lodekeeper force-pushed the fix/stream-drain-race-guard branch from c1c6584 to a8c2cf2 Compare March 21, 2026 17:06
@dozyio
Collaborator

dozyio commented Apr 6, 2026

@lodekeeper - can you review this commit 17d9da8 please

@dozyio dozyio requested a review from tabcat April 6, 2026 12:14
@lodekeeper
Contributor Author

@dozyio Reviewed commit 17d9da8 — looks good overall.

abstract-message-stream.ts: The willSend defensive copy + comment makes the intent clear — guard against sendData mutating the slice before throwing. The rename is cleaner than the original toSend requeue. No issues here.

Tests: Both cases are solid:

  • preserve on error: verifies buffer is intact after a StreamStateError on drain ✓
  • no duplication on transient error: the throwOnce pattern correctly proves data isn't counted twice across two drain events ✓

One minor note: the second test sets writeStatus = 'closing' before the first drain but the continueSendingOnDrain guard only bails on statuses that are neither 'writable' nor 'closing'. So 'closing' still reaches the try/catch — which is the right flow for the test to exercise. Just worth a comment in the test body if you want it to be self-documenting, but it's not required.

Happy to address any remaining feedback before merge.

@tabcat tabcat self-assigned this Apr 10, 2026
@dozyio
Collaborator

dozyio commented Apr 22, 2026

@lodekeeper it seems tests are failing after merging the latest main branch - could you investigate?

The drain handler was changed in libp2p#3435 to wrap processSendQueue() in
queueMicrotask() to avoid a Maximum call stack size exceeded error.
That merge of main into this branch causes the StreamStateError drain
tests added in 17d9da8 to fail: dispatchEvent(new Event('drain'))
now returns before sendData is invoked, so synchronous assertions on
stub call counts and writeBufferLength see stale state.

Await the microtask queue between dispatching drain and asserting so
the tests observe the post-drain state the scenarios were designed to
verify.

🤖 Generated with AI assistance
@lodekeeper
Contributor Author

@dozyio Investigated — pushed cbc1f2e2d.

Root cause: the merge of main brought in #3435 (commit 5917715d0c), which wraps processSendQueue() in queueMicrotask(...) inside the drain handler to prevent a Maximum call stack size exceeded error. After that change, the two drain-race tests added in 17d9da8ab (should preserve queued bytes when sendData throws StreamStateError during drain and should not duplicate queued bytes after transient StreamStateError during drain) dispatch a synchronous drain event and then immediately assert on stub.calledOnce / writeBufferLength, but sendData is now deferred to a microtask — so the stub has been called 0 times when the assertion fires (expected false to be true at line 495).

Fix: await a microtask flush (await new Promise<void>((resolve) => { queueMicrotask(resolve) })) between dispatchEvent('drain') and the post-drain assertions. Preserves test intent (sendData is still invoked, bytes are still preserved / requeued); just adapts to the deferred scheduling. No source change needed.
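
The timing issue and the flush can be shown in isolation. This is a minimal sketch, not the actual test code: the `stream` EventTarget, the `sendDataCalls` counter, and the `flushMicrotasks` helper are hypothetical, and the listener only imitates a drain handler that defers its work with `queueMicrotask`.

```typescript
let sendDataCalls = 0

// Stand-in for the stream: the drain listener defers its work to a microtask,
// imitating the handler shape described for #3435.
const stream = new EventTarget()
stream.addEventListener('drain', () => {
  queueMicrotask(() => {
    sendDataCalls++ // stands in for processSendQueue() -> sendData()
  })
})

// Resolves after every microtask queued before this call has run.
async function flushMicrotasks (): Promise<void> {
  await new Promise<void>((resolve) => {
    queueMicrotask(() => { resolve() })
  })
}

stream.dispatchEvent(new Event('drain'))

// dispatchEvent has returned, but the deferred work has not run yet,
// so a synchronous assertion here sees stale state
const callsRightAfterDispatch = sendDataCalls

// after the flush the deferred work has run and can be asserted on
const callsAfterFlush = flushMicrotasks().then(() => sendDataCalls)
```

Microtasks run in the order they were queued, so the flush queued after `dispatchEvent` resolves only once the handler's deferred callback has finished.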

Member

@tabcat tabcat left a comment

Thanks for the work @lodekeeper
I'm going to get this over the finish line for you. Feel free to leave comments but edits are not requested from you at this time.

this.log('send failed during queue processing, stream is %s - %e', this.writeStatus, err)
// Requeue the defensive copy in case sendData mutated `toSend`
// before throwing.
this.writeBuffer.prepend(willSend)
Member

i dont like this. imo the data should be dropped. a lot could happen inside of this.sendData before it throws

// Requeue the defensive copy in case sendData mutated `toSend`
// before throwing.
this.writeBuffer.prepend(willSend)
return false
Member

send should error if we were not able to send the data

Member

wrapping this with try/catch and logging the error will fix the original uncaught runtime error symptom on its own.

this.log.trace('not processing send queue as already sending data')
return true
}

Member

@tabcat tabcat May 14, 2026

the write guard should go here (line 460), log, return false.

Comment on lines +99 to +107
// a drain event can arrive after writeStatus transitions to
// 'closing'/'closed' during connection teardown - bail out to
// avoid calling send() on a non-writable stream which would
// throw an uncaught StreamStateError
if (this.writeStatus !== 'writable' && this.writeStatus !== 'closing') {
  this.log.trace('not processing send queue on drain as stream write status is %s', this.writeStatus)
  return
}

Member

should live in processSendQueue

Comment on lines -482 to +508
- const sendResult = this.sendData(toSend)
+ let sendResult: SendResult
+ try {
+   sendResult = this.sendData(toSend)
+ } catch (err: any) {
+   // the underlying transport may have closed between the drain event
+   // and this send attempt - treat as a failed send rather than letting
+   // the error propagate as an uncaught exception
+   if (err.name === 'StreamStateError') {
+     this.log('send failed during queue processing, stream is %s - %e', this.writeStatus, err)
+     // Requeue the defensive copy in case sendData mutated `toSend`
+     // before throwing.
+     this.writeBuffer.prepend(willSend)
+     return false
+   }
+
+   throw err
+ }

Member

should not be wrapped and return false on failure from sendData. can catch the uncaught runtime error by wrapping the call in continueSendingOnDrain

Contributor Author

@lodekeeper lodekeeper left a comment

Understood — thanks @tabcat, appreciate you taking it over. Happy to answer any questions about the original implementation if context would help. Will keep an eye on the thread.

tabcat added 3 commits May 14, 2026 17:47
Consolidate the guard alongside the existing bail-outs. Drop the
inner sendData try/catch so send() throws consistently with the
existing closed/closing check; the drain handler's microtask
wrapper catches and logs.
@tabcat
Member

tabcat commented May 14, 2026

@lodekeeper care to review the current diff with main?

@lodekeeper
Contributor Author

Reviewed the current diff against main. The implementation matches the design notes from your inline comments:

  • L453–457: writeStatus guard moved into processSendQueue (returns false and logs); addresses the "should live in processSendQueue" and "the write guard should go here (line 460)" notes.
  • L100–110: drain-handler microtask wrapped in try/catch with log.error; addresses "wrapping this with try/catch and logging the error will fix the original uncaught runtime error symptom".
  • Inner sendData try/catch removed: send() now propagates StreamStateError consistently with the existing closed/closing check (matches "send should error if we were not able to send the data" and "should not be wrapped").

Tests cover the three resulting paths:

  1. drain on closed stream → no uncaught error
  2. drain mid-closing while sendData throws StreamStateError → caught by microtask wrapper
  3. direct send() when sendData throws → error propagates to caller
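
The three paths can be sketched with a small model. This is a hedged illustration, not the merged diff: `SketchStream`, `failSendData`, the `errors` array (standing in for `log.error`), and the local `StreamStateError` class are all hypothetical. Path 2 is modeled by making `sendData` throw while `writeStatus` still reads 'writable', which is an assumption, since the exact guard condition in the merged code is not shown in this thread.

```typescript
// Hypothetical stand-in for the error named in this PR; not imported from libp2p.
class StreamStateError extends Error {
  constructor (message: string) {
    super(message)
    this.name = 'StreamStateError'
  }
}

type WriteStatus = 'writable' | 'closing' | 'closed'

class SketchStream {
  writeStatus: WriteStatus = 'writable'
  failSendData = false // simulates a transport that closed underneath us
  sendDataCalls = 0
  readonly errors: unknown[] = [] // stands in for log.error
  readonly queue: Uint8Array[] = []

  private sendData (data: Uint8Array): boolean {
    this.sendDataCalls++
    if (this.failSendData) {
      throw new StreamStateError(`transport rejected ${data.byteLength} bytes`)
    }
    return true
  }

  processSendQueue (): boolean {
    // guard lives alongside the other bail-outs and returns false
    if (this.writeStatus !== 'writable') {
      return false
    }
    let canSendMore = true
    while (canSendMore && this.queue.length > 0) {
      const chunk = this.queue.shift()
      if (chunk != null) {
        canSendMore = this.sendData(chunk) // not wrapped: errors propagate
      }
    }
    return canSendMore
  }

  send (data: Uint8Array): boolean {
    if (this.writeStatus !== 'writable') {
      throw new StreamStateError(`cannot send, stream is ${this.writeStatus}`)
    }
    this.queue.push(data)
    return this.processSendQueue()
  }

  // the drain handler defers to a microtask and catches anything thrown there
  onDrain (): void {
    queueMicrotask(() => {
      try {
        this.processSendQueue()
      } catch (err) {
        this.errors.push(err)
      }
    })
  }
}

// path 1: drain on a closed stream, the guard bails out
const closed = new SketchStream()
closed.queue.push(new Uint8Array([1]))
closed.writeStatus = 'closed'
closed.onDrain()

// path 2: sendData throws during a drain, the microtask wrapper catches it
const failing = new SketchStream()
failing.queue.push(new Uint8Array([2]))
failing.failSendData = true
failing.onDrain()

// path 3: direct send() while sendData throws, the caller sees the error
const direct = new SketchStream()
direct.failSendData = true
let sendError: unknown
try {
  direct.send(new Uint8Array([3]))
} catch (err) {
  sendError = err
}

// inspect the drain paths once the queued microtasks have run
const outcome = new Promise<void>((resolve) => {
  queueMicrotask(() => { resolve() })
}).then(() => ({
  closedSendDataCalls: closed.sendDataCalls,
  closedErrors: closed.errors.length,
  drainErrors: failing.errors.length
}))
```

In this model the chunk that was being sent when `sendData` threw is not requeued, in line with the review note that the data should be dropped rather than prepended back onto the buffer.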

build and most Examples checks are green; test-node / browser / interop matrix still running, no failures so far. mergeable: MERGEABLE, status UNSTABLE only because CI is in progress. LGTM from my side — happy to address anything else if reviewers flag something.

@tabcat tabcat merged commit 8230eb5 into libp2p:main May 14, 2026
34 checks passed

Development

Successfully merging this pull request may close these issues.

bug: uncaught StreamStateError when drain event fires on closing stream

3 participants