From a8c2cf28dcb82c826553adb80eb38ed87b21d12c Mon Sep 17 00:00:00 2001 From: lodekeeper Date: Sat, 21 Mar 2026 17:05:02 +0000 Subject: [PATCH 1/7] fix: guard processSendQueue against non-writable stream status Add a try-catch around sendData() in processSendQueue() to catch StreamStateError when the underlying transport closes between a drain event and the send attempt. This prevents the error from propagating as an uncaught exception. Also add a writeStatus guard in continueSendingOnDrain to skip processing when the stream has fully closed. Fixes #3415 --- packages/utils/src/abstract-message-stream.ts | 26 ++++++++- packages/utils/test/stream-utils-test.spec.ts | 54 +++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/packages/utils/src/abstract-message-stream.ts b/packages/utils/src/abstract-message-stream.ts index 2b55c531de..f633efacd8 100644 --- a/packages/utils/src/abstract-message-stream.ts +++ b/packages/utils/src/abstract-message-stream.ts @@ -96,6 +96,15 @@ export abstract class AbstractMessageStream { if (this.writableNeedsDrain) { + // a drain event can arrive after writeStatus transitions to + // 'closing'/'closed' during connection teardown - bail out to + // avoid calling send() on a non-writable stream which would + // throw an uncaught StreamStateError + if (this.writeStatus !== 'writable' && this.writeStatus !== 'closing') { + this.log.trace('not processing send queue on drain as stream write status is %s', this.writeStatus) + return + } + this.log.trace('drain event received, continue sending data') this.writableNeedsDrain = false this.processSendQueue() @@ -476,7 +485,22 @@ export abstract class AbstractMessageStream { await expect(outgoing.onDrain()).to.eventually.be.rejected .with.property('name', 'StreamResetError') }) + + it('should not throw uncaught error on drain event when stream is closed', async () => { + const [outgoing] = await streamPair({ + capacity: 1, + delay: 100 + }) + + // fill the write buffer to trigger backpressure + while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) { + // drain the send buffer + } + expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false() + + // set the stream to closed - drain events after this should be ignored + outgoing.writeStatus = 'closed' + + // dispatching drain on a closed stream should not throw + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + outgoing.abort(new Error('cleanup')) + }) + + it('should catch StreamStateError from sendData during closing', async () => { + const [outgoing] = await streamPair({ + capacity: 1, + delay: 100 + }) + const outgoingStream = outgoing as typeof outgoing & { + sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } + } + + // fill the write buffer to trigger backpressure + while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) { + // drain the send buffer + } + expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false() + + // stub sendData to throw StreamStateError (simulates underlying + // transport closing while the outer stream is still flushing) + const err = new Error('Cannot write to a stream that is closing') + err.name = 'StreamStateError' + Sinon.stub(outgoingStream, 'sendData').throws(err) + + outgoing.writeStatus = 'closing' + + // the drain event should not throw - the error should be caught internally + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + outgoing.abort(new Error('cleanup')) + }) }) From ef4dc8b9b283c6f18e56e57149518d7f8882e342 Mon Sep 17 00:00:00 2001 From: lodekeeper Date: Sat, 21 Mar 2026 17:35:05 +0000 Subject: [PATCH 2/7] ci: retrigger CI for flaky WebRTC and integration test failures From 17d9da8abf14cd23941699e1c12fb51a8f589b05 Mon Sep 17 00:00:00 2001 From: dozyio Date: Mon, 6 Apr 2026 13:10:48 +0100 Subject: [PATCH 3/7] chore: use defensive copy instead of potentially modified data, few tests --- packages/utils/src/abstract-message-stream.ts | 4 +- packages/utils/test/stream-utils-test.spec.ts | 90 +++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/packages/utils/src/abstract-message-stream.ts b/packages/utils/src/abstract-message-stream.ts index f633efacd8..32702e4187 100644 --- a/packages/utils/src/abstract-message-stream.ts +++ b/packages/utils/src/abstract-message-stream.ts @@ -494,7 +494,9 @@ export abstract class AbstractMessageStream { outgoing.abort(new Error('cleanup')) }) + + it('should preserve queued bytes when sendData throws StreamStateError during drain', async () => { + const [outgoing] = await streamPair() + const outgoingStream = outgoing as typeof outgoing & { + sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } + } + + const payload = new Uint8ArrayList( + Uint8Array.from([0, 1, 2, 3]), + Uint8Array.from([4, 5, 6, 7]) + ) + + outgoing.writableNeedsDrain = true + expect(outgoing.send(payload)).to.be.false() + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + + const err = new Error('Cannot write to a stream that is closing') + err.name = 'StreamStateError' + const stub = Sinon.stub(outgoingStream, 'sendData').throws(err) + + outgoing.writeStatus = 'closing' + + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + expect(stub.calledOnce).to.be.true() + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + + stub.restore() + outgoing.abort(new Error('cleanup')) + }) + + it('should not duplicate queued bytes after transient StreamStateError during drain', async () => { + const [outgoing] = await streamPair() + const outgoingStream = outgoing as typeof outgoing & { + sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } + } + + const payload = new Uint8ArrayList( + Uint8Array.from([0, 1, 2, 3]), + Uint8Array.from([4, 5, 6, 7]), + Uint8Array.from([8, 9, 10, 11]) + ) + + outgoing.writableNeedsDrain = true + expect(outgoing.send(payload)).to.be.false() + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + + const streamStateError = new Error('Cannot write to a stream that is closing') + streamStateError.name = 'StreamStateError' + + const sentPayloads: Uint8Array[] = [] + let throwOnce = true + + const stub = Sinon.stub(outgoingStream, 'sendData').callsFake((data: Uint8ArrayList) => { + if (throwOnce) { + throwOnce = false + throw streamStateError + } + + sentPayloads.push(data.subarray()) + + return { + sentBytes: data.byteLength, + canSendMore: true + } + }) + + outgoing.writeStatus = 'closing' + + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + + // simulate a later drain event after writableNeedsDrain is set again + outgoing.writableNeedsDrain = true + expect(() => { + outgoing.dispatchEvent(new Event('drain')) + }).to.not.throw() + + expect(stub.callCount).to.equal(2) + expect(outgoing.writeBufferLength).to.equal(0) + expect(sentPayloads).to.have.lengthOf(1) + expect(sentPayloads[0]).to.equalBytes(payload.subarray()) + + stub.restore() + outgoing.abort(new Error('cleanup')) + }) }) From cbc1f2e2de08e999f6f94b9d1d984e523d71e149 Mon Sep 17 00:00:00 2001 From: lodekeeper Date: Wed, 22 Apr 2026 04:28:18 +0000 Subject: [PATCH 4/7] test: await microtask flush after drain dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The drain handler was changed in #3435 to wrap processSendQueue() in queueMicrotask() to avoid a Maximum call stack size exceeded error. That merge of main into this branch causes the StreamStateError drain tests added in 17d9da8ab to fail: dispatchEvent(new Event('drain')) now returns before sendData is invoked, so synchronous assertions on stub call counts and writeBufferLength see stale state. Await the microtask queue between dispatching drain and asserting so the tests observe the post-drain state the scenarios were designed to verify. 🤖 Generated with AI assistance --- packages/utils/test/stream-utils-test.spec.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/utils/test/stream-utils-test.spec.ts b/packages/utils/test/stream-utils-test.spec.ts index 7d18620e89..0a2b84d162 100644 --- a/packages/utils/test/stream-utils-test.spec.ts +++ b/packages/utils/test/stream-utils-test.spec.ts @@ -492,6 +492,9 @@ describe('stream-pair', () => { outgoing.dispatchEvent(new Event('drain')) }).to.not.throw() + // drain handler defers processSendQueue via queueMicrotask - wait for it + await new Promise((resolve) => { queueMicrotask(resolve) }) + expect(stub.calledOnce).to.be.true() expect(outgoing.writeBufferLength).to.equal(payload.byteLength) @@ -540,6 +543,10 @@ describe('stream-pair', () => { expect(() => { outgoing.dispatchEvent(new Event('drain')) }).to.not.throw() + + // drain handler defers processSendQueue via queueMicrotask - wait for it + await new Promise((resolve) => { queueMicrotask(resolve) }) + expect(outgoing.writeBufferLength).to.equal(payload.byteLength) // simulate a later drain event after writableNeedsDrain is set again @@ -548,6 +555,8 @@ describe('stream-pair', () => { outgoing.dispatchEvent(new Event('drain')) }).to.not.throw() + await new Promise((resolve) => { queueMicrotask(resolve) }) + expect(stub.callCount).to.equal(2) expect(outgoing.writeBufferLength).to.equal(0) expect(sentPayloads).to.have.lengthOf(1) From 1dfb825ba07c6c4519b3018ecc7b1d36ad0e5f6b Mon Sep 17 00:00:00 2001 From: tabcat Date: Thu, 14 May 2026 17:34:48 +0700 Subject: [PATCH 5/7] refactor: move writeStatus guard into processSendQueue Consolidate the guard alongside the existing bail-outs. Drop the inner sendData try/catch so send() throws consistently with the existing closed/closing check; the drain handler's microtask wrapper catches and logs. --- packages/utils/src/abstract-message-stream.ts | 39 +++----- packages/utils/test/stream-utils-test.spec.ts | 90 +------------------ 2 files changed, 16 insertions(+), 113 deletions(-) diff --git a/packages/utils/src/abstract-message-stream.ts b/packages/utils/src/abstract-message-stream.ts index 1f16f3b298..8a09072815 100644 --- a/packages/utils/src/abstract-message-stream.ts +++ b/packages/utils/src/abstract-message-stream.ts @@ -96,20 +96,15 @@ export abstract class AbstractMessageStream { if (this.writableNeedsDrain) { - // a drain event can arrive after writeStatus transitions to - // 'closing'/'closed' during connection teardown - bail out to - // avoid calling send() on a non-writable stream which would - // throw an uncaught StreamStateError - if (this.writeStatus !== 'writable' && this.writeStatus !== 'closing') { - this.log.trace('not processing send queue on drain as stream write status is %s', this.writeStatus) - return - } - this.log.trace('drain event received, continue sending data') this.writableNeedsDrain = false queueMicrotask(() => { - this.processSendQueue() + try { + this.processSendQueue() + } catch (err) { + this.log.error('processSendQueue threw - %e', err) + } }) } @@ -458,6 +453,12 @@ export abstract class AbstractMessageStream { outgoing.abort(new Error('cleanup')) }) - it('should preserve queued bytes when sendData throws StreamStateError during drain', async () => { + it('should propagate StreamStateError from send() when sendData throws', async () => { const [outgoing] = await streamPair() const outgoingStream = outgoing as typeof outgoing & { sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } } - const payload = new Uint8ArrayList( - Uint8Array.from([0, 1, 2, 3]), - Uint8Array.from([4, 5, 6, 7]) - ) - - outgoing.writableNeedsDrain = true - expect(outgoing.send(payload)).to.be.false() - expect(outgoing.writeBufferLength).to.equal(payload.byteLength) - const err = new Error('Cannot write to a stream that is closing') err.name = 'StreamStateError' - const stub = Sinon.stub(outgoingStream, 'sendData').throws(err) - - outgoing.writeStatus = 'closing' - - expect(() => { - outgoing.dispatchEvent(new Event('drain')) - }).to.not.throw() - - // drain handler defers processSendQueue via queueMicrotask - wait for it - await new Promise((resolve) => { queueMicrotask(resolve) }) - - expect(stub.calledOnce).to.be.true() - expect(outgoing.writeBufferLength).to.equal(payload.byteLength) - - stub.restore() - outgoing.abort(new Error('cleanup')) - }) - - it('should not duplicate queued bytes after transient StreamStateError during drain', async () => { - const [outgoing] = await streamPair() - const outgoingStream = outgoing as typeof outgoing & { - sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } - } - - const payload = new Uint8ArrayList( - Uint8Array.from([0, 1, 2, 3]), - Uint8Array.from([4, 5, 6, 7]), - Uint8Array.from([8, 9, 10, 11]) - ) - - outgoing.writableNeedsDrain = true - expect(outgoing.send(payload)).to.be.false() - expect(outgoing.writeBufferLength).to.equal(payload.byteLength) - - const streamStateError = new Error('Cannot write to a stream that is closing') - streamStateError.name = 'StreamStateError' - - const sentPayloads: Uint8Array[] = [] - let throwOnce = true - - const stub = Sinon.stub(outgoingStream, 'sendData').callsFake((data: Uint8ArrayList) => { - if (throwOnce) { - throwOnce = false - throw streamStateError - } - - sentPayloads.push(data.subarray()) - - return { - sentBytes: data.byteLength, - canSendMore: true - } - }) - - outgoing.writeStatus = 'closing' - - expect(() => { - outgoing.dispatchEvent(new Event('drain')) - }).to.not.throw() - - // drain handler defers processSendQueue via queueMicrotask - wait for it - await new Promise((resolve) => { queueMicrotask(resolve) }) - - expect(outgoing.writeBufferLength).to.equal(payload.byteLength) + Sinon.stub(outgoingStream, 'sendData').throws(err) - // simulate a later drain event after writableNeedsDrain is set again - outgoing.writableNeedsDrain = true expect(() => { - outgoing.dispatchEvent(new Event('drain')) - }).to.not.throw() - - await new Promise((resolve) => { queueMicrotask(resolve) }) - - expect(stub.callCount).to.equal(2) - expect(outgoing.writeBufferLength).to.equal(0) - expect(sentPayloads).to.have.lengthOf(1) - expect(sentPayloads[0]).to.equalBytes(payload.subarray()) + outgoing.send(Uint8Array.from([0, 1, 2, 3])) + }).to.throw().with.property('name', 'StreamStateError') - stub.restore() outgoing.abort(new Error('cleanup')) }) }) From 03bfe4f34f7acf31fee3624d2fa5310c160bce05 Mon Sep 17 00:00:00 2001 From: tabcat Date: Thu, 14 May 2026 17:54:32 +0700 Subject: [PATCH 6/7] chore: drop redundant guard comment --- packages/utils/src/abstract-message-stream.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/utils/src/abstract-message-stream.ts b/packages/utils/src/abstract-message-stream.ts index 8a09072815..b2ca400c3d 100644 --- a/packages/utils/src/abstract-message-stream.ts +++ b/packages/utils/src/abstract-message-stream.ts @@ -453,7 +453,6 @@ export abstract class AbstractMessageStream Date: Thu, 14 May 2026 17:57:13 +0700 Subject: [PATCH 7/7] chore: drop redundant test comments --- packages/utils/test/stream-utils-test.spec.ts | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/packages/utils/test/stream-utils-test.spec.ts b/packages/utils/test/stream-utils-test.spec.ts index 7ead70bcbb..9f892ddfb4 100644 --- a/packages/utils/test/stream-utils-test.spec.ts +++ b/packages/utils/test/stream-utils-test.spec.ts @@ -419,16 +419,11 @@ describe('stream-pair', () => { delay: 100 }) - // fill the write buffer to trigger backpressure - while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) { - // drain the send buffer - } + while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) {} expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false() - // set the stream to closed - drain events after this should be ignored outgoing.writeStatus = 'closed' - // dispatching drain on a closed stream should not throw expect(() => { outgoing.dispatchEvent(new Event('drain')) }).to.not.throw() @@ -445,21 +440,15 @@ describe('stream-pair', () => { sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean } } - // fill the write buffer to trigger backpressure - while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) { - // drain the send buffer - } + while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) {} expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false() - // stub sendData to throw StreamStateError (simulates underlying - // transport closing while the outer stream is still flushing) const err = new Error('Cannot write to a stream that is closing') err.name = 'StreamStateError' Sinon.stub(outgoingStream, 'sendData').throws(err) outgoing.writeStatus = 'closing' - // the drain event should not throw - the error should be caught internally expect(() => { outgoing.dispatchEvent(new Event('drain')) }).to.not.throw()