From e4c633ca5af792f8ebf0c79c4827ee858ff2722a Mon Sep 17 00:00:00 2001 From: John Stiles Date: Sun, 21 Jun 2026 14:20:25 -0400 Subject: [PATCH 1/2] fix: decompress zstd-encoded HTTP response bodies and strip Content-Encoding header When a fetch() subrequest returns Content-Encoding: zstd, workerd now: 1. Decompresses the body transparently via a new ZstdAsyncInputStream class 2. Strips the Content-Encoding header from the Response so downstream code (e.g. Cache API) doesn't re-interpret the already-decoded body as zstd This mirrors the existing gzip and brotli auto-decode behavior. Adds ZSTD to the StreamEncoding capnp enum, implements ZstdAsyncInputStream using the zstd streaming API in both system-streams.c++ and readable-source.c++, and adds KJ_FAIL_REQUIRE for the unsupported zstd output compression path. Fixes: https://github.com/cloudflare/workerd/issues/5112 Co-Authored-By: Claude Sonnet 4.6 --- src/workerd/api/BUILD.bazel | 1 + src/workerd/api/streams/readable-source.c++ | 64 +++++++++++++++ src/workerd/api/streams/writable-sink.c++ | 4 + src/workerd/api/system-streams-test.c++ | 41 ++++++++++ src/workerd/api/system-streams.c++ | 89 ++++++++++++++++++++- src/workerd/io/BUILD.bazel | 1 + src/workerd/io/worker-interface.capnp | 1 + src/workerd/server/server-test.c++ | 60 ++++++++++++++ 8 files changed, 259 insertions(+), 2 deletions(-) diff --git a/src/workerd/api/BUILD.bazel b/src/workerd/api/BUILD.bazel index f9a679d26e6..bf29ae78169 100644 --- a/src/workerd/api/BUILD.bazel +++ b/src/workerd/api/BUILD.bazel @@ -604,6 +604,7 @@ kj_test( deps = [ "//src/workerd/io", "//src/workerd/tests:test-fixture", + "@zstd", ], ) diff --git a/src/workerd/api/streams/readable-source.c++ b/src/workerd/api/streams/readable-source.c++ index 6c1b84fe04d..be65e324a62 100644 --- a/src/workerd/api/streams/readable-source.c++ +++ b/src/workerd/api/streams/readable-source.c++ @@ -14,6 +14,7 @@ #include #include #include +#include #include @@ -665,6 +666,63 @@ class NoDeferredProxySource final: public ReadableSourceWrapper { IoContext& ioctx; }; +class ZstdAsyncInputStream final: public kj::AsyncInputStream { + public: + explicit ZstdAsyncInputStream(kj::AsyncInputStream& inner) + : inner(inner), dctx(ZSTD_createDCtx()) { + KJ_ASSERT(dctx != nullptr, "failed to allocate ZSTD_DCtx"); + } + ~ZstdAsyncInputStream() noexcept(false) { + ZSTD_freeDCtx(dctx); + } + KJ_DISALLOW_COPY_AND_MOVE(ZstdAsyncInputStream); + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + return readImpl(reinterpret_cast(buffer), minBytes, maxBytes, 0); + } + + private: + kj::AsyncInputStream& inner; + ZSTD_DCtx* dctx; + bool atValidEndpoint = false; + static constexpr size_t IN_BUFFER_SIZE = 16384; + kj::byte inBuffer[IN_BUFFER_SIZE]; + size_t inAvail = 0; + size_t inPos = 0; + + kj::Promise readImpl( + kj::byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) { + while (inAvail > 0 && alreadyRead < maxBytes) { + ZSTD_inBuffer input = {inBuffer + inPos, inAvail, 0}; + ZSTD_outBuffer output = {out, maxBytes, alreadyRead}; + size_t result = ZSTD_decompressStream(dctx, &output, &input); + inPos += input.pos; + inAvail -= input.pos; + alreadyRead = output.pos; + KJ_REQUIRE(!ZSTD_isError(result), "zstd decompression error", ZSTD_getErrorName(result)); + if (result == 0) { + atValidEndpoint = true; + if (inAvail == 0) break; + atValidEndpoint = false; + } + if (alreadyRead >= minBytes) return alreadyRead; + } + if (alreadyRead >= minBytes) return alreadyRead; + inPos = 0; + inAvail = 0; + return inner.tryRead(inBuffer, 1, IN_BUFFER_SIZE) + .then([this, out, minBytes, maxBytes, alreadyRead](size_t n) -> kj::Promise { + if (n == 0) { + KJ_REQUIRE(atValidEndpoint, "zstd-compressed stream ended prematurely"); + return alreadyRead; + } + inAvail = n; + inPos = 0; + return readImpl(out, minBytes, maxBytes, alreadyRead); + }); + } +}; + // A ReadableSource implementation that lazily wraps an innner Gzip or Brotli // encoded AsyncInputStream when the first read() is called, or when pumpTo is called, // the encoding will be selectively and lazily applied to the inner stream. @@ -694,6 +752,9 @@ class EncodedAsyncInputStream final: public ReadableSourceImpl { {"brotli compression failed"_kj, "Brotli compression failed."}, {"brotli compressed stream ended prematurely"_kj, "Brotli compressed stream ended prematurely."}, + {"zstd decompression error"_kj, "Zstd decompression failed."}, + {"zstd-compressed stream ended prematurely"_kj, + "Zstd compressed stream ended prematurely."}, })) { kj::throwFatalException(kj::mv(translated)); } else { @@ -739,6 +800,9 @@ class EncodedAsyncInputStream final: public ReadableSourceImpl { case rpc::StreamEncoding::BROTLI: { return kj::heap(*inner).attach(kj::mv(inner)); } + case rpc::StreamEncoding::ZSTD: { + return kj::heap(*inner).attach(kj::mv(inner)); + } } KJ_UNREACHABLE; } diff --git a/src/workerd/api/streams/writable-sink.c++ b/src/workerd/api/streams/writable-sink.c++ index c6a1c7f8d8f..d3e16ca9f11 100644 --- a/src/workerd/api/streams/writable-sink.c++ +++ b/src/workerd/api/streams/writable-sink.c++ @@ -8,6 +8,7 @@ #include #include #include +#include namespace workerd::api::streams { @@ -226,6 +227,9 @@ class EncodedAsyncOutputStream final: public WritableSinkImpl { case rpc::StreamEncoding::IDENTITY: { return setStream(kj::mv(inner)); } + case rpc::StreamEncoding::ZSTD: { + KJ_FAIL_REQUIRE("zstd output compression is not supported; use encodeResponseBody: manual"); + } } KJ_UNREACHABLE; } diff --git a/src/workerd/api/system-streams-test.c++ b/src/workerd/api/system-streams-test.c++ index af3d3cbb2bc..fd2fd463753 100644 --- a/src/workerd/api/system-streams-test.c++ +++ b/src/workerd/api/system-streams-test.c++ @@ -8,6 +8,7 @@ #include #include +#include namespace workerd::api { namespace { @@ -51,5 +52,45 @@ KJ_TEST("EncodedAsyncInputStream cancel with pending read on AsyncPipe") { }); } +KJ_TEST("ZstdAsyncInputStream decompresses correctly") { + TestFixture fixture; + fixture.runInIoContext([](const TestFixture::Environment& env) -> kj::Promise { + constexpr kj::StringPtr plaintext = "hello zstd"_kj; + + // Compress synchronously using the zstd one-shot API. + size_t bound = ZSTD_compressBound(plaintext.size()); + auto compressed = kj::heapArray(bound); + size_t compressedSize = ZSTD_compress(compressed.begin(), compressed.size(), + plaintext.begin(), plaintext.size(), ZSTD_CLEVEL_DEFAULT); + KJ_REQUIRE(!ZSTD_isError(compressedSize), ZSTD_getErrorName(compressedSize)); + + // Wrap the compressed bytes in a synchronous AsyncInputStream. + struct ArrayStream: kj::AsyncInputStream { + kj::Array data; + size_t pos = 0; + ArrayStream(kj::Array d): data(kj::mv(d)) {} + virtual ~ArrayStream() = default; + kj::Promise tryRead(void* buf, size_t, size_t max) override { + size_t n = kj::min(max, data.size() - pos); + memcpy(buf, data.begin() + pos, n); + pos += n; + return n; + } + }; + auto inner = kj::heap(kj::heapArray(compressed.slice(0, compressedSize))); + + // Decode through a ZSTD-encoded system stream. + auto stream = newSystemStream(kj::mv(inner), StreamEncoding::ZSTD, env.context); + auto outBuf = kj::heapArray(64); + size_t expectedSize = plaintext.size(); + return stream->tryRead(outBuf.begin(), expectedSize, outBuf.size()) + .then([outBuf = kj::mv(outBuf), stream = kj::mv(stream), expectedSize](size_t n) { + KJ_EXPECT(n == expectedSize); + KJ_EXPECT( + kj::StringPtr(reinterpret_cast(outBuf.begin()), n) == "hello zstd"_kj); + }); + }); +} + } // namespace } // namespace workerd::api diff --git a/src/workerd/api/system-streams.c++ b/src/workerd/api/system-streams.c++ index 29803822c66..a7189ce934e 100644 --- a/src/workerd/api/system-streams.c++ +++ b/src/workerd/api/system-streams.c++ @@ -9,6 +9,7 @@ #include #include #include +#include namespace workerd::api { @@ -89,6 +90,9 @@ kj::Promise EncodedAsyncInputStream::tryRead( {"brotli compression failed"_kj, "Brotli compression failed."}, {"brotli compressed stream ended prematurely"_kj, "Brotli compressed stream ended prematurely."}, + {"zstd decompression error"_kj, "Zstd decompression failed."}, + {"zstd-compressed stream ended prematurely"_kj, + "Zstd compressed stream ended prematurely."}, })) { return kj::mv(e); } @@ -129,6 +133,79 @@ void EncodedAsyncInputStream::cancel(kj::Exception reason) { canceler.cancel(kj::mv(reason)); } +// ======================================================================================= +// ZstdAsyncInputStream + +// Streaming zstd decompressor wrapping an AsyncInputStream. Modeled after +// kj::GzipAsyncInputStream / kj::BrotliAsyncInputStream in capnp-cpp. +class ZstdAsyncInputStream final: public kj::AsyncInputStream { + public: + explicit ZstdAsyncInputStream(kj::AsyncInputStream& inner) + : inner(inner), + dctx(ZSTD_createDCtx()) { + KJ_ASSERT(dctx != nullptr, "failed to allocate ZSTD_DCtx"); + } + ~ZstdAsyncInputStream() noexcept(false) { + ZSTD_freeDCtx(dctx); + } + KJ_DISALLOW_COPY_AND_MOVE(ZstdAsyncInputStream); + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + return readImpl(reinterpret_cast(buffer), minBytes, maxBytes, 0); + } + + private: + kj::AsyncInputStream& inner; + ZSTD_DCtx* dctx; + bool atValidEndpoint = false; + + static constexpr size_t IN_BUFFER_SIZE = 16384; + kj::byte inBuffer[IN_BUFFER_SIZE]; + size_t inAvail = 0; + size_t inPos = 0; + + kj::Promise readImpl( + kj::byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) { + // Drain any buffered compressed input. + while (inAvail > 0 && alreadyRead < maxBytes) { + ZSTD_inBuffer input = {inBuffer + inPos, inAvail, 0}; + ZSTD_outBuffer output = {out, maxBytes, alreadyRead}; + + size_t result = ZSTD_decompressStream(dctx, &output, &input); + inPos += input.pos; + inAvail -= input.pos; + alreadyRead = output.pos; + + KJ_REQUIRE(!ZSTD_isError(result), "zstd decompression error", ZSTD_getErrorName(result)); + + if (result == 0) { + // Frame complete. ZSTD_DCtx auto-resets for the next frame. + atValidEndpoint = true; + if (inAvail == 0) break; + atValidEndpoint = false; // more compressed data follows; could be another frame + } + + if (alreadyRead >= minBytes) return alreadyRead; + } + + if (alreadyRead >= minBytes) return alreadyRead; + + // Need more compressed input from the underlying stream. + inPos = 0; + inAvail = 0; + return inner.tryRead(inBuffer, 1, IN_BUFFER_SIZE) + .then([this, out, minBytes, maxBytes, alreadyRead](size_t n) -> kj::Promise { + if (n == 0) { + KJ_REQUIRE(atValidEndpoint, "zstd-compressed stream ended prematurely"); + return alreadyRead; + } + inAvail = n; + inPos = 0; + return readImpl(out, minBytes, maxBytes, alreadyRead); + }); + } +}; + void EncodedAsyncInputStream::ensureIdentityEncoding() { // Decompression gets added to the stream here if needed based on the content encoding. if (encoding == StreamEncoding::GZIP) { @@ -137,8 +214,10 @@ void EncodedAsyncInputStream::ensureIdentityEncoding() { } else if (encoding == StreamEncoding::BROTLI) { inner = kj::heap(*inner).attach(kj::mv(inner)); encoding = StreamEncoding::IDENTITY; + } else if (encoding == StreamEncoding::ZSTD) { + inner = kj::heap(*inner).attach(kj::mv(inner)); + encoding = StreamEncoding::IDENTITY; } else { - // We currently support gzip and brotli as non-identity content encodings. KJ_ASSERT(encoding == StreamEncoding::IDENTITY); } } @@ -344,8 +423,12 @@ void EncodedAsyncOutputStream::ensureIdentityEncoding() { inner = kj::heap(*stream).attach(kj::mv(stream)); encoding = StreamEncoding::IDENTITY; + } else if (encoding == StreamEncoding::ZSTD) { + // Zstd output compression is not yet implemented. This path is only reachable if a worker + // sends a response with Content-Encoding: zstd while using encodeResponseBody: "auto". + // Workers that need to write zstd-compressed bodies should use encodeResponseBody: "manual". + KJ_FAIL_REQUIRE("zstd output compression is not supported; use encodeResponseBody: manual"); } else { - // We currently support gzip and brotli as non-identity content encodings. KJ_ASSERT(encoding == StreamEncoding::IDENTITY); } } @@ -404,6 +487,8 @@ StreamEncoding getContentEncoding(IoContext& context, return StreamEncoding::GZIP; } else if (options.brotliEnabled && encodingStr == "br") { return StreamEncoding::BROTLI; + } else if (encodingStr == "zstd") { + return StreamEncoding::ZSTD; } } return StreamEncoding::IDENTITY; diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index 44164336c4a..d865d4d4132 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -76,6 +76,7 @@ wd_cc_library( "@capnp-cpp//src/kj/compat:kj-brotli", "@capnp-cpp//src/kj/compat:kj-gzip", "@nbytes", + "@zstd", "@simdutf", ], visibility = ["//visibility:public"], diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index a593721c358..1e31f40334d 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -493,6 +493,7 @@ enum StreamEncoding { identity @0; gzip @1; brotli @2; + zstd @3; } interface Handle { diff --git a/src/workerd/server/server-test.c++ b/src/workerd/server/server-test.c++ index 248748b4826..2b606cdca54 100644 --- a/src/workerd/server/server-test.c++ +++ b/src/workerd/server/server-test.c++ @@ -4875,6 +4875,66 @@ KJ_TEST("Server: encodeResponseBody: manual pass-through") { fake-gzipped-content)"_blockquote); } +KJ_TEST("Server: encodeResponseBody: auto strips Content-Encoding header for zstd") { + // Regression test for https://github.com/cloudflare/workerd/issues/5112 + // zstd was not recognized by getContentEncoding() at all, so responses with + // Content-Encoding: zstd passed through without decompression and without header stripping. + // This caused the Cache API to see an undecompressed body labeled as zstd-compressed. + TestServer test(R"(( + services = [ + ( name = "hello", + worker = ( + compatibilityDate = "2022-08-17", + modules = [ + ( name = "main.js", + esModule = + `export default { + ` async fetch(request, env) { + ` let response = await fetch("http://subhost/foo"); + ` let ce = response.headers.get("Content-Encoding"); + ` return new Response("Content-Encoding: " + ce); + ` } + `} + ) + ] + ) + ) + ], + sockets = [ + ( name = "main", + address = "test-addr", + service = "hello" + ) + ] + ))"_kj); + + test.start(); + auto conn = test.connect("test-addr"); + conn.sendHttpGet("/"); + + auto subreq = test.receiveInternetSubrequest("subhost"); + subreq.recv(R"( + GET /foo HTTP/1.1 + Host: subhost + + )"_blockquote); + + // Send a response with Content-Encoding: zstd. Body bytes aren't real zstd data, but the + // decompressor is lazy and won't run until the body is consumed — we only check headers here. + subreq.send(R"( + HTTP/1.1 200 OK + Content-Length: 20 + Content-Encoding: zstd + + fake-zstd-content! + )"_blockquote); + + // Content-Encoding should be null after auto-decoding strips the header. + conn.recvHttp200(R"( + Content-Encoding: null)"_blockquote); +} + + KJ_TEST("Server: Catch websocket server errors") { TestServer test(R"(( services = [ From e91196066b6ca2a89b23aea46eec25d55e355141 Mon Sep 17 00:00:00 2001 From: John Stiles Date: Sun, 21 Jun 2026 14:27:46 -0400 Subject: [PATCH 2/2] test: add zstd stream tests to match gzip test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 4 readable-source tests and 2 writable-sink tests mirroring the existing gzip test suite: - Zstd encoded stream (readAllBytes) - Zstd encoded stream (pumpTo) - Zstd encoded stream (pumpTo same encoding passthrough) - Zstd encoded stream (pumpTo different encoding → gzip) - Zstd-encoding sink (throws for unsupported output compression) - Zstd-encoding sink (identity passthrough via disownEncodingResponsibility) Co-Authored-By: Claude Sonnet 4.6 --- src/workerd/api/BUILD.bazel | 1 + .../api/streams/readable-source-test.c++ | 76 +++++++++++++++++++ .../api/streams/writable-sink-test.c++ | 40 ++++++++++ src/workerd/server/server-test.c++ | 43 ++++++----- 4 files changed, 141 insertions(+), 19 deletions(-) diff --git a/src/workerd/api/BUILD.bazel b/src/workerd/api/BUILD.bazel index bf29ae78169..22dbe591fee 100644 --- a/src/workerd/api/BUILD.bazel +++ b/src/workerd/api/BUILD.bazel @@ -688,6 +688,7 @@ kj_test( "//src/workerd/io", "//src/workerd/jsg", "//src/workerd/tests:test-fixture", + "@capnp-cpp//src/kj/compat:kj-gzip", ], ) diff --git a/src/workerd/api/streams/readable-source-test.c++ b/src/workerd/api/streams/readable-source-test.c++ index 1b03ab04a98..a56e4cf680c 100644 --- a/src/workerd/api/streams/readable-source-test.c++ +++ b/src/workerd/api/streams/readable-source-test.c++ @@ -7,6 +7,7 @@ #include #include +#include #include // We Thank Claude for Tests. @@ -973,6 +974,81 @@ KJ_TEST("Gzip encoded stream (pumpTo different encoding)") { KJ_ASSERT(inner.data == expected); } +KJ_TEST("Zstd encoded stream") { + TestFixture fixture; + // zstd-compressed "some data to zstd" + static constexpr kj::byte data[] = { + 40, 181, 47, 253, 36, 17, 137, 0, 0, 115, 111, 109, 101, 32, 100, 97, + 116, 97, 32, 116, 111, 32, 122, 115, 116, 100, 89, 232, 89, 209}; + auto inner = newMemoryInputStream(data); + auto source = newEncodedReadableSource(rpc::StreamEncoding::ZSTD, kj::mv(inner)); + + fixture.runInIoContext([&](const auto& environment) -> kj::Promise { + auto allBytes = co_await source->readAllBytes(kj::maxValue); + KJ_ASSERT(allBytes == "some data to zstd"_kjb); + }); +} + +KJ_TEST("Zstd encoded stream (pumpTo)") { + TestFixture fixture; + static constexpr kj::byte data[] = { + 40, 181, 47, 253, 36, 17, 137, 0, 0, 115, 111, 109, 101, 32, 100, 97, + 116, 97, 32, 116, 111, 32, 122, 115, 116, 100, 89, 232, 89, 209}; + auto inner = newMemoryInputStream(data); + auto source = newEncodedReadableSource(rpc::StreamEncoding::ZSTD, kj::mv(inner)); + + MockWritableSink sink; + + fixture.runInIoContext([&](const auto& environment) -> kj::Promise { + co_await environment.context.waitForDeferredProxy(source->pumpTo(sink, EndAfterPump::YES)); + }); + + KJ_ASSERT(sink.writtenData == "some data to zstd"_kjb); +} + +KJ_TEST("Zstd encoded stream (pumpTo same encoding)") { + TestFixture fixture; + static const kj::byte data[] = { + 40, 181, 47, 253, 36, 17, 137, 0, 0, 115, 111, 109, 101, 32, 100, 97, + 116, 97, 32, 116, 111, 32, 122, 115, 116, 100, 89, 232, 89, 209}; + auto in = newMemoryInputStream(data); + auto source = newEncodedReadableSource(rpc::StreamEncoding::ZSTD, kj::mv(in)); + + MemoryAsyncOutputStream inner; + auto fakeOwn = kj::Own(&inner, kj::NullDisposer::instance); + auto sink = newEncodedWritableSink(rpc::StreamEncoding::ZSTD, kj::mv(fakeOwn)); + + fixture.runInIoContext([&](const auto& environment) -> kj::Promise { + co_await environment.context.waitForDeferredProxy(source->pumpTo(*sink, EndAfterPump::YES)); + }); + + // The data should pass through unchanged (no decompress/recompress). + KJ_ASSERT(inner.data == data); +} + +KJ_TEST("Zstd encoded stream (pumpTo different encoding)") { + TestFixture fixture; + static const kj::byte data[] = { + 40, 181, 47, 253, 36, 17, 137, 0, 0, 115, 111, 109, 101, 32, 100, 97, + 116, 97, 32, 116, 111, 32, 122, 115, 116, 100, 89, 232, 89, 209}; + auto in = newMemoryInputStream(data); + auto source = newEncodedReadableSource(rpc::StreamEncoding::ZSTD, kj::mv(in)); + + MemoryAsyncOutputStream inner; + auto fakeOwn = kj::Own(&inner, kj::NullDisposer::instance); + auto sink = newEncodedWritableSink(rpc::StreamEncoding::GZIP, kj::mv(fakeOwn)); + + fixture.runInIoContext([&](const auto& environment) -> kj::Promise { + co_await environment.context.waitForDeferredProxy(source->pumpTo(*sink, EndAfterPump::YES)); + + // Verify the output is valid gzip containing the original plaintext. + auto mem = newMemoryInputStream(inner.data); + kj::GzipAsyncInputStream gunzip(*mem); + auto text = co_await gunzip.readAllText(kj::maxValue); + KJ_ASSERT(text == "some data to zstd"_kj); + }); +} + // ====================================================================================== // Adaptive Pump Behavior Tests // These tests verify the adaptive pump heuristics without relying on timing. diff --git a/src/workerd/api/streams/writable-sink-test.c++ b/src/workerd/api/streams/writable-sink-test.c++ index ad2276be5b0..9e6d0b9aedb 100644 --- a/src/workerd/api/streams/writable-sink-test.c++ +++ b/src/workerd/api/streams/writable-sink-test.c++ @@ -539,6 +539,46 @@ KJ_TEST("Gzip-encoding sink (identity)") { KJ_ASSERT(inner.data == kj::arrayPtr(check, sizeof(check))); } +KJ_TEST("Zstd-encoding sink") { + // zstd output compression is not supported; verify it throws. + TestFixture fixture; + MemoryAsyncOutputStream inner; + auto fakeOwn = kj::Own(&inner, kj::NullDisposer::instance); + auto sink = newEncodedWritableSink(rpc::StreamEncoding::ZSTD, kj::mv(fakeOwn)); + + fixture.runInIoContext([&](const auto& environment) -> kj::Promise { + bool threw = false; + try { + co_await sink->write("some data to zstd"_kjb); + } catch (kj::Exception& e) { + KJ_ASSERT(kj::StringPtr(e.getDescription()).contains("zstd output compression is not supported")); + threw = true; + } + KJ_ASSERT(threw, "expected write() to throw for unsupported zstd compression"); + }); +} + +KJ_TEST("Zstd-encoding sink (identity)") { + TestFixture fixture; + MemoryAsyncOutputStream inner; + auto fakeOwn = kj::Own(&inner, kj::NullDisposer::instance); + auto sink = newEncodedWritableSink(rpc::StreamEncoding::ZSTD, kj::mv(fakeOwn)); + + static const kj::byte check[] = { + 40, 181, 47, 253, 36, 17, 137, 0, 0, 115, 111, 109, 101, 32, 100, 97, + 116, 97, 32, 116, 111, 32, 122, 115, 116, 100, 89, 232, 89, 209}; + + // When encoding is disowned, the data should be passed through unmodified. + sink->disownEncodingResponsibility(); + + fixture.runInIoContext([&](const auto& environment) -> kj::Promise { + co_await sink->write(check); + co_await sink->end(); + }); + + KJ_ASSERT(inner.data == kj::arrayPtr(check, sizeof(check))); +} + // ====================================================================================== // IoContext-aware WritableSinkWrapper Tests diff --git a/src/workerd/server/server-test.c++ b/src/workerd/server/server-test.c++ index 2b606cdca54..412b257038a 100644 --- a/src/workerd/server/server-test.c++ +++ b/src/workerd/server/server-test.c++ @@ -104,6 +104,9 @@ class TestStream { void send(kj::StringPtr data, kj::SourceLocation loc = {}) { stream->write(data.asBytes()).wait(ws); } + void sendBytes(kj::ArrayPtr data) { + stream->write(data).wait(ws); + } void recv(kj::StringPtr expected, kj::SourceLocation loc = {}) { auto actual = readAllAvailable(); if (actual == nullptr) { @@ -4875,11 +4878,10 @@ KJ_TEST("Server: encodeResponseBody: manual pass-through") { fake-gzipped-content)"_blockquote); } -KJ_TEST("Server: encodeResponseBody: auto strips Content-Encoding header for zstd") { +KJ_TEST("Server: encodeResponseBody: zstd response body is decompressed") { // Regression test for https://github.com/cloudflare/workerd/issues/5112 - // zstd was not recognized by getContentEncoding() at all, so responses with - // Content-Encoding: zstd passed through without decompression and without header stripping. - // This caused the Cache API to see an undecompressed body labeled as zstd-compressed. + // zstd was not recognized by getContentEncoding(), so responses with Content-Encoding: zstd + // passed through with raw compressed bytes instead of being decompressed. TestServer test(R"(( services = [ ( name = "hello", @@ -4891,8 +4893,8 @@ KJ_TEST("Server: encodeResponseBody: auto strips Content-Encoding header for zst `export default { ` async fetch(request, env) { ` let response = await fetch("http://subhost/foo"); - ` let ce = response.headers.get("Content-Encoding"); - ` return new Response("Content-Encoding: " + ce); + ` let text = await response.text(); + ` return new Response(text); ` } `} ) @@ -4919,19 +4921,22 @@ KJ_TEST("Server: encodeResponseBody: auto strips Content-Encoding header for zst )"_blockquote); - // Send a response with Content-Encoding: zstd. Body bytes aren't real zstd data, but the - // decompressor is lazy and won't run until the body is consumed — we only check headers here. - subreq.send(R"( - HTTP/1.1 200 OK - Content-Length: 20 - Content-Encoding: zstd - - fake-zstd-content! - )"_blockquote); - - // Content-Encoding should be null after auto-decoding strips the header. - conn.recvHttp200(R"( - Content-Encoding: null)"_blockquote); + // zstd-compressed "zstd body ok" + static constexpr kj::byte zstdFrame[] = { + 0x28, 0xb5, 0x2f, 0xfd, 0x24, 0x0c, 0x61, 0x00, 0x00, 0x7a, + 0x73, 0x74, 0x64, 0x20, 0x62, 0x6f, 0x64, 0x79, 0x20, 0x6f, + 0x6b, 0x88, 0x94, 0x46, 0x0e, + }; + kj::String httpHeader = kj::str( + "HTTP/1.1 200 OK\r\n" + "Content-Length: ", sizeof(zstdFrame), "\r\n" + "Content-Encoding: zstd\r\n" + "\r\n"); + subreq.send(httpHeader); + subreq.sendBytes(zstdFrame); + + // The worker should receive the decompressed plaintext, not the raw compressed bytes. + conn.recvHttp200("zstd body ok"_blockquote); }