Skip to content
Merged
12 changes: 11 additions & 1 deletion packages/utils/src/abstract-message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
this.writableNeedsDrain = false

queueMicrotask(() => {
this.processSendQueue()
try {
this.processSendQueue()
} catch (err) {
this.log.error('processSendQueue threw - %e', err)
}
})
}

Expand Down Expand Up @@ -449,6 +453,11 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
return true
}

if (this.writeStatus !== 'writable' && this.writeStatus !== 'closing') {
this.log.trace('not processing send queue as stream is %s', this.writeStatus)
return false
}

this.sendingData = true

this.log.trace('processing send queue with %d queued bytes', this.writeBuffer.byteLength)
Expand Down Expand Up @@ -480,6 +489,7 @@ export abstract class AbstractMessageStream<Timeline extends MessageStreamTimeli
// sending data can cause buffers to fill up, events to be emitted and
// this method to be invoked again
const sendResult = this.sendData(toSend)

canSendMore = sendResult.canSendMore
sentBytes += sendResult.sentBytes

Expand Down
60 changes: 60 additions & 0 deletions packages/utils/test/stream-utils-test.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,4 +412,64 @@ describe('stream-pair', () => {
await expect(outgoing.onDrain()).to.eventually.be.rejected
.with.property('name', 'StreamResetError')
})

it('should not throw uncaught error on drain event when stream is closed', async () => {
const [outgoing] = await streamPair({
capacity: 1,
delay: 100
})

while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) {}
expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false()

outgoing.writeStatus = 'closed'

expect(() => {
outgoing.dispatchEvent(new Event('drain'))
}).to.not.throw()

outgoing.abort(new Error('cleanup'))
})

it('should catch StreamStateError from sendData during closing', async () => {
const [outgoing] = await streamPair({
capacity: 1,
delay: 100
})
const outgoingStream = outgoing as typeof outgoing & {
sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean }
}

while (outgoing.send(Uint8Array.from([0, 1, 2, 3]))) {}
expect(outgoing.send(Uint8Array.from([4, 5, 6, 7]))).to.be.false()

const err = new Error('Cannot write to a stream that is closing')
err.name = 'StreamStateError'
Sinon.stub(outgoingStream, 'sendData').throws(err)

outgoing.writeStatus = 'closing'

expect(() => {
outgoing.dispatchEvent(new Event('drain'))
}).to.not.throw()

outgoing.abort(new Error('cleanup'))
})

it('should propagate StreamStateError from send() when sendData throws', async () => {
const [outgoing] = await streamPair()
const outgoingStream = outgoing as typeof outgoing & {
sendData(data: Uint8ArrayList): { sentBytes: number, canSendMore: boolean }
}

const err = new Error('Cannot write to a stream that is closing')
err.name = 'StreamStateError'
Sinon.stub(outgoingStream, 'sendData').throws(err)

expect(() => {
outgoing.send(Uint8Array.from([0, 1, 2, 3]))
}).to.throw().with.property('name', 'StreamStateError')

outgoing.abort(new Error('cleanup'))
})
})
Loading