Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/workerd/api/streams-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,96 @@ KJ_TEST("ReadableStream pumpTo pending write cancellation regression") {
KJ_ASSERT(events[2] == "sink was destroyed");
}

KJ_TEST("ReadableStream pumpTo cancels the JS source when dropped mid-stream") {
// Regression test for https://github.com/cloudflare/workerd/issues/6832.
//
// When the pump is dropped while the JS ReadableStream source is suspended awaiting more
// data (e.g. the client disconnected and the HTTP layer dropped the response-body pump),
// the underlying source's cancel() algorithm must still run. Before the fix, dropping the
// pump coroutine only released the reader lock and the JS source ran to natural completion.

struct TestSink final: public WritableStreamSink {
kj::Own<kj::PromiseFulfiller<void>> gotFirstWrite;
bool fired = false;
TestSink(kj::Own<kj::PromiseFulfiller<void>> gotFirstWrite)
: gotFirstWrite(kj::mv(gotFirstWrite)) {}

void signal() {
if (!fired) {
fired = true;
gotFirstWrite->fulfill();
}
}
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
signal();
return kj::READY_NOW;
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
signal();
return kj::READY_NOW;
}
kj::Promise<void> end() override {
return kj::READY_NOW;
}
void abort(kj::Exception reason) override {}
};

capnp::MallocMessageBuilder flagsBuilder;
auto featureFlags = flagsBuilder.initRoot<CompatibilityFlags>();
featureFlags.setStreamsJavaScriptControllers(true);
TestFixture testFixture({.featureFlags = featureFlags.asReader()});

// Declared in the test scope (outliving the runInIoContext lambda frame) because the JS
// cancel() callback captures them by reference and runs asynchronously, after the pump is
// dropped.
bool cancelCalled = false;
auto cancelObserved = kj::newPromiseAndFulfiller<void>();

testFixture.runInIoContext([&](const TestFixture::Environment& env) -> kj::Promise<void> {
auto& js = jsg::Lock::from(env.isolate);

auto firstWrite = kj::newPromiseAndFulfiller<void>();

auto stream = ReadableStream::constructor(js,
UnderlyingSource{
.start =
[](jsg::Lock& js, auto controller) {
auto& c = KJ_REQUIRE_NONNULL(
controller.template tryGet<jsg::Ref<ReadableStreamDefaultController>>());
// Enqueue one chunk so the pump makes progress, but don't close the stream.
c->enqueue(js, jsg::JsValue(v8::ArrayBuffer::New(js.v8Isolate, 10)));
return js.resolvedPromise();
},
.pull =
[](jsg::Lock& js, auto controller) {
// Never enqueue more, so once the first chunk is drained the pump's next read suspends.
return js.resolvedPromise();
},
.cancel =
[&cancelCalled, &cancelObserved](jsg::Lock& js, jsg::JsValue) -> jsg::Promise<void> {
cancelCalled = true;
if (cancelObserved.fulfiller->isWaiting()) {
cancelObserved.fulfiller->fulfill();
}
return js.resolvedPromise();
},
},
kj::none);

auto sink = kj::heap<TestSink>(kj::mv(firstWrite.fulfiller));
auto pump = stream->pumpTo(js, kj::mv(sink), true);

// Once the first chunk has been written the source is suspended on its next read. Drop the
// pump (simulating the disconnect), then wait for the source's cancel() to run.
return firstWrite.promise.then(
[pump = kj::mv(pump), cancelPromise = kj::mv(cancelObserved.promise)]() mutable {
{ auto dropped = kj::mv(pump); }
return kj::mv(cancelPromise);
});
});

KJ_ASSERT(cancelCalled);
}

} // namespace
} // namespace workerd::api
24 changes: 22 additions & 2 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3356,8 +3356,9 @@ class AllReader {
// pumped synchronously as many times as possible.
//
// The pump loop is a kj coroutine. Dropping the returned kj::Promise drops the
// coroutine frame, which destroys the DrainingReader (releasing the stream lock)
// and the sink. No WeakRef/IoOwn dance is needed because ownership is clear.
// coroutine frame, which destroys the DrainingReader (releasing the stream lock) and the
// sink -- except on a mid-stream drop, where the KJ_DEFER below keeps the reader alive to
// cancel the source first. No WeakRef/IoOwn dance is needed because ownership is clear.
// The coroutine that implements the pump loop takes ownership of the DrainingReader
// and sink. The jsg::Ref<ReadableStream> is not passed into the coroutine because
// jsg::Ref is disallowed in coroutine parameters; instead, the DrainingReader holds
Expand All @@ -3369,6 +3370,22 @@ kj::Promise<void> pumpToImpl(IoContext& ioContext,

bool writeFailed = false;

// If the coroutine is dropped mid-pump (e.g. the client disconnected and the HTTP layer
// dropped the response-body pump), cancel the source so its JS cancel() runs. We can't take
// the isolate lock during teardown, so schedule it as a task, as ~WritableStreamJsRpcAdapter
// does. The clean exits below set pumpSettled so we don't cancel an already-settled stream.
bool pumpSettled = false;
KJ_DEFER({
if (!pumpSettled && reader->isAttached()) {
ioContext.addTask(ioContext.run(
[reader = kj::mv(reader)](jsg::Lock& js) mutable -> kj::Promise<void> {
auto& ioContext = IoContext::current();
auto promise = ioContext.awaitJs(js, reader->cancel(js, kj::none));
return promise.attach(kj::mv(reader));
}));
}
});

KJ_TRY {
while (true) {
// Perform a draining read to get all synchronously available data if possible
Expand All @@ -3395,6 +3412,7 @@ kj::Promise<void> pumpToImpl(IoContext& ioContext,
if (end) {
co_await sink->end();
}
pumpSettled = true;
co_return;
}
}
Expand All @@ -3409,6 +3427,8 @@ kj::Promise<void> pumpToImpl(IoContext& ioContext,
auto error = js.exceptionToJsValue(kj::mv(ex));
return ioContext.awaitJs(js, reader->cancel(js, error.getHandle(js)));
});
// Set only after the cancel above runs; if dropped mid-co_await, KJ_DEFER cancels instead.
pumpSettled = true;
kj::throwFatalException(kj::mv(exception));
}
}
Expand Down
Loading