diff --git a/src/workerd/api/streams-test.c++ b/src/workerd/api/streams-test.c++ index 257f7377613..4f3f611443b 100644 --- a/src/workerd/api/streams-test.c++ +++ b/src/workerd/api/streams-test.c++ @@ -198,5 +198,96 @@ KJ_TEST("ReadableStream pumpTo pending write cancellation regression") { KJ_ASSERT(events[2] == "sink was destroyed"); } +KJ_TEST("ReadableStream pumpTo cancels the JS source when dropped mid-stream") { + // Regression test for https://github.com/cloudflare/workerd/issues/6832. + // + // When the pump is dropped while the JS ReadableStream source is suspended awaiting more + // data (e.g. the client disconnected and the HTTP layer dropped the response-body pump), + // the underlying source's cancel() algorithm must still run. Before the fix, dropping the + // pump coroutine only released the reader lock and the JS source ran to natural completion. + + struct TestSink final: public WritableStreamSink { + kj::Own> gotFirstWrite; + bool fired = false; + TestSink(kj::Own> gotFirstWrite) + : gotFirstWrite(kj::mv(gotFirstWrite)) {} + + void signal() { + if (!fired) { + fired = true; + gotFirstWrite->fulfill(); + } + } + kj::Promise write(kj::ArrayPtr buffer) override { + signal(); + return kj::READY_NOW; + } + kj::Promise write(kj::ArrayPtr> pieces) override { + signal(); + return kj::READY_NOW; + } + kj::Promise end() override { + return kj::READY_NOW; + } + void abort(kj::Exception reason) override {} + }; + + capnp::MallocMessageBuilder flagsBuilder; + auto featureFlags = flagsBuilder.initRoot(); + featureFlags.setStreamsJavaScriptControllers(true); + TestFixture testFixture({.featureFlags = featureFlags.asReader()}); + + // Declared in the test scope (outliving the runInIoContext lambda frame) because the JS + // cancel() callback captures them by reference and runs asynchronously, after the pump is + // dropped. + bool cancelCalled = false; + auto cancelObserved = kj::newPromiseAndFulfiller(); + + testFixture.runInIoContext([&](const TestFixture::Environment& env) -> kj::Promise { + auto& js = jsg::Lock::from(env.isolate); + + auto firstWrite = kj::newPromiseAndFulfiller(); + + auto stream = ReadableStream::constructor(js, + UnderlyingSource{ + .start = + [](jsg::Lock& js, auto controller) { + auto& c = KJ_REQUIRE_NONNULL( + controller.template tryGet>()); + // Enqueue one chunk so the pump makes progress, but don't close the stream. + c->enqueue(js, jsg::JsValue(v8::ArrayBuffer::New(js.v8Isolate, 10))); + return js.resolvedPromise(); + }, + .pull = + [](jsg::Lock& js, auto controller) { + // Never enqueue more, so once the first chunk is drained the pump's next read suspends. + return js.resolvedPromise(); + }, + .cancel = + [&cancelCalled, &cancelObserved](jsg::Lock& js, jsg::JsValue) -> jsg::Promise { + cancelCalled = true; + if (cancelObserved.fulfiller->isWaiting()) { + cancelObserved.fulfiller->fulfill(); + } + return js.resolvedPromise(); + }, + }, + kj::none); + + auto sink = kj::heap(kj::mv(firstWrite.fulfiller)); + auto pump = stream->pumpTo(js, kj::mv(sink), true); + + // Once the first chunk has been written the source is suspended on its next read. Drop the + // pump (simulating the disconnect), then wait for the source's cancel() to run. + return firstWrite.promise.then( + [pump = kj::mv(pump), cancelPromise = kj::mv(cancelObserved.promise)]() mutable { + { auto dropped = kj::mv(pump); } + return kj::mv(cancelPromise); + }); + }); + + KJ_ASSERT(cancelCalled); +} + } // namespace } // namespace workerd::api diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index 4c7f4adb218..2454c12c82c 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -3356,8 +3356,9 @@ class AllReader { // pumped synchronously as many times as possible. // // The pump loop is a kj coroutine. Dropping the returned kj::Promise drops the -// coroutine frame, which destroys the DrainingReader (releasing the stream lock) -// and the sink. No WeakRef/IoOwn dance is needed because ownership is clear. +// coroutine frame, which destroys the DrainingReader (releasing the stream lock) and the +// sink -- except on a mid-stream drop, where the KJ_DEFER below keeps the reader alive to +// cancel the source first. No WeakRef/IoOwn dance is needed because ownership is clear. // The coroutine that implements the pump loop takes ownership of the DrainingReader // and sink. The jsg::Ref is not passed into the coroutine because // jsg::Ref is disallowed in coroutine parameters; instead, the DrainingReader holds @@ -3369,6 +3370,22 @@ kj::Promise pumpToImpl(IoContext& ioContext, bool writeFailed = false; + // If the coroutine is dropped mid-pump (e.g. the client disconnected and the HTTP layer + // dropped the response-body pump), cancel the source so its JS cancel() runs. We can't take + // the isolate lock during teardown, so schedule it as a task, as ~WritableStreamJsRpcAdapter + // does. The clean exits below set pumpSettled so we don't cancel an already-settled stream. + bool pumpSettled = false; + KJ_DEFER({ + if (!pumpSettled && reader->isAttached()) { + ioContext.addTask(ioContext.run( + [reader = kj::mv(reader)](jsg::Lock& js) mutable -> kj::Promise { + auto& ioContext = IoContext::current(); + auto promise = ioContext.awaitJs(js, reader->cancel(js, kj::none)); + return promise.attach(kj::mv(reader)); + })); + } + }); + KJ_TRY { while (true) { // Perform a draining read to get all synchronously available data if possible @@ -3395,6 +3412,7 @@ kj::Promise pumpToImpl(IoContext& ioContext, if (end) { co_await sink->end(); } + pumpSettled = true; co_return; } } @@ -3409,6 +3427,8 @@ kj::Promise pumpToImpl(IoContext& ioContext, auto error = js.exceptionToJsValue(kj::mv(ex)); return ioContext.awaitJs(js, reader->cancel(js, error.getHandle(js))); }); + // Set only after the cancel above runs; if dropped mid-co_await, KJ_DEFER cancels instead. + pumpSettled = true; kj::throwFatalException(kj::mv(exception)); } }