fix: guard processSendQueue against non-writable stream status#3416
Conversation
9538027 to
c1c6584
Compare
Add a try-catch around sendData() in processSendQueue() to catch StreamStateError when the underlying transport closes between a drain event and the send attempt. This prevents the error from propagating as an uncaught exception. Also add a writeStatus guard in continueSendingOnDrain to skip processing when the stream has fully closed. Fixes libp2p#3415
c1c6584 to
a8c2cf2
Compare
|
@lodekeeper - can you review this commit 17d9da8 please |
|
@dozyio Reviewed commit 17d9da8 — looks good overall.
Tests: Both cases are solid:
One minor note: the second test sets Happy to address any remaining feedback before merge. |
|
@lodekeeper it seems test are failing after merging latest main branch - could you investigate? |
The drain handler was changed in libp2p#3435 to wrap processSendQueue() in queueMicrotask() to avoid a Maximum call stack size exceeded error. That merge of main into this branch causes the StreamStateError drain tests added in 17d9da8 to fail: dispatchEvent(new Event('drain')) now returns before sendData is invoked, so synchronous assertions on stub call counts and writeBufferLength see stale state. Await the microtask queue between dispatching drain and asserting so the tests observe the post-drain state the scenarios were designed to verify. 🤖 Generated with AI assistance
|
@dozyio Investigated — pushed Root cause: the merge of Fix: await a microtask flush ( |
tabcat
left a comment
There was a problem hiding this comment.
Thanks for the work @lodekeeper
I'm going to get this over the finish line for you. Feel free to leave comments but edits are not requested from you at this time.
| this.log('send failed during queue processing, stream is %s - %e', this.writeStatus, err) | ||
| // Requeue the defensive copy in case sendData mutated `toSend` | ||
| // before throwing. | ||
| this.writeBuffer.prepend(willSend) |
There was a problem hiding this comment.
i dont like this. imo the data should be dropped. a lot could happen inside of this.sendData before it throws
| // Requeue the defensive copy in case sendData mutated `toSend` | ||
| // before throwing. | ||
| this.writeBuffer.prepend(willSend) | ||
| return false |
There was a problem hiding this comment.
send should error if we were not able to send the data
There was a problem hiding this comment.
wrapping this with try/catch and logging the error will fix the original uncaught runtime error symptom on its own.
| this.log.trace('not processing send queue as already sending data') | ||
| return true | ||
| } | ||
|
|
There was a problem hiding this comment.
the write guard should go here (line 460), log, return false.
| // a drain event can arrive after writeStatus transitions to | ||
| // 'closing'/'closed' during connection teardown - bail out to | ||
| // avoid calling send() on a non-writable stream which would | ||
| // throw an uncaught StreamStateError | ||
| if (this.writeStatus !== 'writable' && this.writeStatus !== 'closing') { | ||
| this.log.trace('not processing send queue on drain as stream write status is %s', this.writeStatus) | ||
| return | ||
| } | ||
|
|
| const sendResult = this.sendData(toSend) | ||
| let sendResult: SendResult | ||
| try { | ||
| sendResult = this.sendData(toSend) | ||
| } catch (err: any) { | ||
| // the underlying transport may have closed between the drain event | ||
| // and this send attempt - treat as a failed send rather than letting | ||
| // the error propagate as an uncaught exception | ||
| if (err.name === 'StreamStateError') { | ||
| this.log('send failed during queue processing, stream is %s - %e', this.writeStatus, err) | ||
| // Requeue the defensive copy in case sendData mutated `toSend` | ||
| // before throwing. | ||
| this.writeBuffer.prepend(willSend) | ||
| return false | ||
| } | ||
|
|
||
| throw err | ||
| } | ||
|
|
There was a problem hiding this comment.
should not be wrapped and return false on failure from sendData. can catch the uncaught runtime error by wrapping the call in continueSendingOnDrain
lodekeeper
left a comment
There was a problem hiding this comment.
Understood — thanks @tabcat, appreciate you taking it over. Happy to answer any questions about the original implementation if context would help. Will keep an eye on the thread.
Consolidate the guard alongside the existing bail-outs. Drop the inner sendData try/catch so send() throws consistently with the existing closed/closing check; the drain handler's microtask wrapper catches and logs.
|
@lodekeeper care to review the current diff with main? |
|
Reviewed the current diff against main. The implementation matches the design notes from your inline comments:
Tests cover the three resulting paths:
|
What
Add a
writeStatusguard at the top ofprocessSendQueue()inabstract-message-stream.tsto bail out when the stream is no longer writable.Why
A race condition causes an uncaught
StreamStateErrorwhen adrainevent fires on a stream whose underlying transport is already closing:writeStatustransitions to'closing'drainevent fires on the underlying TCP streamsafeDispatchEvent('drain')on the encrypted streamcontinueSendingOnDrain→processSendQueue()→sendData()→send()on the now-closing streamsend()throwsStreamStateError— inside an event listener with no try/catch → uncaught exceptionObserved on Lodestar mainnet (v1.41.0,
@libp2p/utils@7.0.13): 6 uncaught exceptions in 7 days.Fix
Single guard at the top of
processSendQueue(), consistent with the existing bail-out pattern:Test
Added a regression test that:
capacity: 1to trigger backpressuresend()returnsfalsewriteStatusto'closing'draineventsendDatais NOT called (the guard catches it) and no error is thrownCloses #3415