From fc0a147a3d1df7acc3b4b085b9377c2b5b4d781f Mon Sep 17 00:00:00 2001 From: Sean Chang Date: Sat, 20 Jun 2026 18:45:52 +0800 Subject: [PATCH] Cancel the JS source when a ReadableStream pump is dropped mid-stream After the draining-read pump path replaced PumpToReader (commit c32933263), dropping the pump coroutine while the JS ReadableStream source was suspended awaiting more data no longer cancelled the source. This regressed client-disconnect handling: when the HTTP layer drops the response-body pump, the underlying source's cancel() algorithm never ran and the stream kept producing data until natural completion. Restore the behavior by cancelling the source from pumpToImpl's teardown when the coroutine is dropped before settling. Because the isolate lock is unavailable during coroutine teardown, the cancellation is scheduled as a task, mirroring ~WritableStreamJsRpcAdapter. A pumpSettled flag suppresses the cancel on the normal-completion and error paths, which already finalize the stream themselves. Fixes #6832 --- src/workerd/api/streams-test.c++ | 91 ++++++++++++++++++++++++++++ src/workerd/api/streams/standard.c++ | 24 +++++++- 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/src/workerd/api/streams-test.c++ b/src/workerd/api/streams-test.c++ index 257f7377613..4f3f611443b 100644 --- a/src/workerd/api/streams-test.c++ +++ b/src/workerd/api/streams-test.c++ @@ -198,5 +198,96 @@ KJ_TEST("ReadableStream pumpTo pending write cancellation regression") { KJ_ASSERT(events[2] == "sink was destroyed"); } +KJ_TEST("ReadableStream pumpTo cancels the JS source when dropped mid-stream") { + // Regression test for https://github.com/cloudflare/workerd/issues/6832. + // + // When the pump is dropped while the JS ReadableStream source is suspended awaiting more + // data (e.g. the client disconnected and the HTTP layer dropped the response-body pump), + // the underlying source's cancel() algorithm must still run. Before the fix, dropping the + // pump coroutine only released the reader lock and the JS source ran to natural completion. + + struct TestSink final: public WritableStreamSink { + kj::Own> gotFirstWrite; + bool fired = false; + TestSink(kj::Own> gotFirstWrite) + : gotFirstWrite(kj::mv(gotFirstWrite)) {} + + void signal() { + if (!fired) { + fired = true; + gotFirstWrite->fulfill(); + } + } + kj::Promise write(kj::ArrayPtr buffer) override { + signal(); + return kj::READY_NOW; + } + kj::Promise write(kj::ArrayPtr> pieces) override { + signal(); + return kj::READY_NOW; + } + kj::Promise end() override { + return kj::READY_NOW; + } + void abort(kj::Exception reason) override {} + }; + + capnp::MallocMessageBuilder flagsBuilder; + auto featureFlags = flagsBuilder.initRoot(); + featureFlags.setStreamsJavaScriptControllers(true); + TestFixture testFixture({.featureFlags = featureFlags.asReader()}); + + // Declared in the test scope (outliving the runInIoContext lambda frame) because the JS + // cancel() callback captures them by reference and runs asynchronously, after the pump is + // dropped. + bool cancelCalled = false; + auto cancelObserved = kj::newPromiseAndFulfiller(); + + testFixture.runInIoContext([&](const TestFixture::Environment& env) -> kj::Promise { + auto& js = jsg::Lock::from(env.isolate); + + auto firstWrite = kj::newPromiseAndFulfiller(); + + auto stream = ReadableStream::constructor(js, + UnderlyingSource{ + .start = + [](jsg::Lock& js, auto controller) { + auto& c = KJ_REQUIRE_NONNULL( + controller.template tryGet>()); + // Enqueue one chunk so the pump makes progress, but don't close the stream. + c->enqueue(js, jsg::JsValue(v8::ArrayBuffer::New(js.v8Isolate, 10))); + return js.resolvedPromise(); + }, + .pull = + [](jsg::Lock& js, auto controller) { + // Never enqueue more, so once the first chunk is drained the pump's next read suspends. + return js.resolvedPromise(); + }, + .cancel = + [&cancelCalled, &cancelObserved](jsg::Lock& js, jsg::JsValue) -> jsg::Promise { + cancelCalled = true; + if (cancelObserved.fulfiller->isWaiting()) { + cancelObserved.fulfiller->fulfill(); + } + return js.resolvedPromise(); + }, + }, + kj::none); + + auto sink = kj::heap(kj::mv(firstWrite.fulfiller)); + auto pump = stream->pumpTo(js, kj::mv(sink), true); + + // Once the first chunk has been written the source is suspended on its next read. Drop the + // pump (simulating the disconnect), then wait for the source's cancel() to run. + return firstWrite.promise.then( + [pump = kj::mv(pump), cancelPromise = kj::mv(cancelObserved.promise)]() mutable { + { auto dropped = kj::mv(pump); } + return kj::mv(cancelPromise); + }); + }); + + KJ_ASSERT(cancelCalled); +} + } // namespace } // namespace workerd::api diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index 4c7f4adb218..2454c12c82c 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -3356,8 +3356,9 @@ class AllReader { // pumped synchronously as many times as possible. // // The pump loop is a kj coroutine. Dropping the returned kj::Promise drops the -// coroutine frame, which destroys the DrainingReader (releasing the stream lock) -// and the sink. No WeakRef/IoOwn dance is needed because ownership is clear. +// coroutine frame, which destroys the DrainingReader (releasing the stream lock) and the +// sink -- except on a mid-stream drop, where the KJ_DEFER below keeps the reader alive to +// cancel the source first. No WeakRef/IoOwn dance is needed because ownership is clear. // The coroutine that implements the pump loop takes ownership of the DrainingReader // and sink. The jsg::Ref is not passed into the coroutine because // jsg::Ref is disallowed in coroutine parameters; instead, the DrainingReader holds @@ -3369,6 +3370,22 @@ kj::Promise pumpToImpl(IoContext& ioContext, bool writeFailed = false; + // If the coroutine is dropped mid-pump (e.g. the client disconnected and the HTTP layer + // dropped the response-body pump), cancel the source so its JS cancel() runs. We can't take + // the isolate lock during teardown, so schedule it as a task, as ~WritableStreamJsRpcAdapter + // does. The clean exits below set pumpSettled so we don't cancel an already-settled stream. + bool pumpSettled = false; + KJ_DEFER({ + if (!pumpSettled && reader->isAttached()) { + ioContext.addTask(ioContext.run( + [reader = kj::mv(reader)](jsg::Lock& js) mutable -> kj::Promise { + auto& ioContext = IoContext::current(); + auto promise = ioContext.awaitJs(js, reader->cancel(js, kj::none)); + return promise.attach(kj::mv(reader)); + })); + } + }); + KJ_TRY { while (true) { // Perform a draining read to get all synchronously available data if possible @@ -3395,6 +3412,7 @@ kj::Promise pumpToImpl(IoContext& ioContext, if (end) { co_await sink->end(); } + pumpSettled = true; co_return; } } @@ -3409,6 +3427,8 @@ kj::Promise pumpToImpl(IoContext& ioContext, auto error = js.exceptionToJsValue(kj::mv(ex)); return ioContext.awaitJs(js, reader->cancel(js, error.getHandle(js))); }); + // Set only after the cancel above runs; if dropped mid-co_await, KJ_DEFER cancels instead. + pumpSettled = true; kj::throwFatalException(kj::mv(exception)); } }