From ed791ecd8d7f3fb78cf8dd5447264a43bdc9509d Mon Sep 17 00:00:00 2001
From: Nico Vibert
Date: Sun, 26 Apr 2026 16:47:24 +0100
Subject: [PATCH] cilium: defer response logging until stream completion

The Cilium HTTP access log entry for the response side is currently
emitted from encodeHeaders(), before later stages of the response stream
complete. This means response-side header mutations performed by
downstream filters such as ext_proc are not reflected in the access log
entry that the Cilium agent consumes, even though they reach the wire
correctly.

Defer the response log entry until the response stream has actually
completed:

- encodeHeaders() now only records a reference to the response header
  map. It does not emit the response log entry, even when end_stream is
  true on headers, because other encoder filters (for example ext_proc
  with BUFFERED body mode) may still mutate the headers after it returns.
- encodeData() and encodeTrailers() move from inline definitions in
  l7policy.h to l7policy.cc. They remain pass-through and do not log.
- onStreamComplete() calls the new logResponse() helper and is the only
  place from which the response log entry is emitted.

A response_logged_ guard in logResponse() ensures the response log entry
is emitted at most once per stream.

This is a prerequisite for surfacing ext_proc-injected response metadata
(for example, AI usage headers such as token counts added by an external
processor on the response path) through the Cilium access log bridge
into Hubble.

Tests:

- New unit coverage in tests/accesslog_test.cc checking that
  AccessLog::Entry::updateFromResponse() carries arbitrary response
  headers (for example x-ai-* token counts) into the log entry alongside
  the request's x-request-id.
- New integration coverage in tests/cilium_http_integration_test.cc
  validating that arbitrary response headers sent by the upstream are
  present in the response access log entry consumed by Cilium. Header
  mutation by a later filter after encodeHeaders() is not exercised by
  these tests.
Signed-off-by: Nico Vibert --- cilium/l7policy.cc | 35 ++++++++++++++++++++-- cilium/l7policy.h | 11 ++++--- tests/accesslog_test.cc | 42 +++++++++++++++++++++++++++ tests/cilium_http_integration_test.cc | 42 +++++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 9 deletions(-) diff --git a/cilium/l7policy.cc b/cilium/l7policy.cc index 6cb49750d..6f52173f9 100644 --- a/cilium/l7policy.cc +++ b/cilium/l7policy.cc @@ -8,6 +8,7 @@ #include #include +#include "envoy/buffer/buffer.h" #include "envoy/common/time.h" #include "envoy/http/codes.h" #include "envoy/http/filter.h" @@ -303,6 +304,20 @@ void AccessFilter::onStreamComplete() { if (log_entry_ && !log_entry_->request_logged_) { config_->log(*log_entry_, ::cilium::EntryType::Request); } + + if (!response_logged_ && response_headers_.has_value()) { + logResponse(); + } +} + +void AccessFilter::logResponse() { + if (response_logged_ || log_entry_ == nullptr || !response_headers_.has_value()) { + return; + } + + log_entry_->updateFromResponse(*response_headers_, config_->time_source_); + config_->log(*log_entry_, ::cilium::EntryType::Response); + response_logged_ = true; } Http::FilterHeadersStatus AccessFilter::encodeHeaders(Http::ResponseHeaderMap& headers, bool) { @@ -374,11 +389,25 @@ Http::FilterHeadersStatus AccessFilter::encodeHeaders(Http::ResponseHeaderMap& h config_->log(*log_entry_, log_type); } - // Log the response - log_entry_->updateFromResponse(headers, config_->time_source_); - config_->log(*log_entry_, ::cilium::EntryType::Response); + response_headers_ = headers; + + // Note: We intentionally do NOT call logResponse() here even when end_stream is true. + // Other encode filters (e.g. ext_proc with BUFFERED body mode) may still mutate + // response headers after we return. We defer logging to onStreamComplete() so that + // the access log captures the final, post-mutation header state regardless of where + // cilium.l7policy sits in the HCM filter chain. 
return Http::FilterHeadersStatus::Continue; } +Http::FilterDataStatus AccessFilter::encodeData(Buffer::Instance&, bool) { + // See encodeHeaders(): logging is deferred to onStreamComplete(). + return Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus AccessFilter::encodeTrailers(Http::ResponseTrailerMap&) { + // See encodeHeaders(): logging is deferred to onStreamComplete(). + return Http::FilterTrailersStatus::Continue; +} + } // namespace Cilium } // namespace Envoy diff --git a/cilium/l7policy.h b/cilium/l7policy.h index c3ff812d0..9d42d3cca 100644 --- a/cilium/l7policy.h +++ b/cilium/l7policy.h @@ -94,18 +94,15 @@ class AccessFilter : public Http::StreamFilter, } Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers, bool end_stream) override; - Http::FilterDataStatus encodeData(Buffer::Instance&, bool) override { - return Http::FilterDataStatus::Continue; - } - Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap&) override { - return Http::FilterTrailersStatus::Continue; - } + Http::FilterDataStatus encodeData(Buffer::Instance&, bool end_stream) override; + Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap&) override; void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks&) override {} Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap&) override { return Http::FilterMetadataStatus::Continue; } private: + void logResponse(); void sendLocalError(absl::string_view details); ConfigSharedPtr config_; @@ -114,7 +111,9 @@ class AccessFilter : public Http::StreamFilter, AccessLog::Entry* log_entry_ = nullptr; OptRef latched_headers_; + OptRef response_headers_; absl::optional latched_end_stream_; + bool response_logged_ = false; }; } // namespace Cilium diff --git a/tests/accesslog_test.cc b/tests/accesslog_test.cc index bc7c5ffa5..66657760e 100644 --- a/tests/accesslog_test.cc +++ b/tests/accesslog_test.cc @@ -86,5 +86,47 @@ TEST_F(CiliumTest, AccessLog) { 
EXPECT_STREQ(log.entry_.http().headers(1).value().c_str(), "response"); } +TEST_F(CiliumTest, AccessLogPreservesAIResponseHeaders) { + Http::TestRequestHeaderMapImpl headers{{":method", "POST"}, + {":path", "/v1/chat/completions"}, + {":authority", "model-server"}, + {"x-forwarded-proto", "http"}, + {"x-request-id", "req-123"}}; + Network::MockConnection connection; + auto source_address = std::make_shared("5.6.7.8", 45678); + auto destination_address = std::make_shared("1.2.3.4", 80); + connection.stream_info_.protocol_ = Http::Protocol::Http11; + connection.stream_info_.start_time_ = time_system_.systemTime(); + connection.stream_info_.downstream_connection_info_provider_->setRemoteAddress(source_address); + connection.stream_info_.downstream_connection_info_provider_->setLocalAddress( + destination_address); + + AccessLog::Entry log; + log.initFromRequest("1.2.3.4", 42, true, 1, source_address, 173, destination_address, + connection.stream_info_, headers); + + Http::TestResponseHeaderMapImpl response_headers{{":status", "200"}, + {"x-ai-model", "llama3.1-8b-instruct"}, + {"x-ai-input-tokens", "11"}, + {"x-ai-output-tokens", "6"}, + {"x-ai-total-tokens", "17"}}; + + NiceMock time_source; + log.updateFromResponse(response_headers, time_source); + + EXPECT_EQ(log.entry_.http().status(), 200); + EXPECT_EQ(log.entry_.http().headers_size(), 5); + EXPECT_STREQ(log.entry_.http().headers(0).key().c_str(), "x-request-id"); + EXPECT_STREQ(log.entry_.http().headers(0).value().c_str(), "req-123"); + EXPECT_STREQ(log.entry_.http().headers(1).key().c_str(), "x-ai-model"); + EXPECT_STREQ(log.entry_.http().headers(1).value().c_str(), "llama3.1-8b-instruct"); + EXPECT_STREQ(log.entry_.http().headers(2).key().c_str(), "x-ai-input-tokens"); + EXPECT_STREQ(log.entry_.http().headers(2).value().c_str(), "11"); + EXPECT_STREQ(log.entry_.http().headers(3).key().c_str(), "x-ai-output-tokens"); + EXPECT_STREQ(log.entry_.http().headers(3).value().c_str(), "6"); + 
EXPECT_STREQ(log.entry_.http().headers(4).key().c_str(), "x-ai-total-tokens"); + EXPECT_STREQ(log.entry_.http().headers(4).value().c_str(), "17"); +} + } // namespace Cilium } // namespace Envoy diff --git a/tests/cilium_http_integration_test.cc b/tests/cilium_http_integration_test.cc index 7dc31c734..064a483cc 100644 --- a/tests/cilium_http_integration_test.cc +++ b/tests/cilium_http_integration_test.cc @@ -903,6 +903,48 @@ TEST_P(CiliumIntegrationTest, AcceptedMethod) { })); } +TEST_P(CiliumIntegrationTest, AcceptedResponsePreservesArbitraryHeaders) { + initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + + Http::TestResponseHeaderMapImpl response_headers{ + {":status", "200"}, {"x-ai-model", "llama3.1-8b-instruct"}, {"x-ai-total-tokens", "17"}}; + auto response = sendRequestAndWaitForResponse( + {{":method", "PUT"}, {":path", "/public/opinions"}, {":authority", "host"}}, 0, + response_headers, 0); + + absl::optional maybe_x_request_id; + EXPECT_TRUE(expectAccessLogRequestTo([&maybe_x_request_id](const ::cilium::LogEntry& entry) { + maybe_x_request_id = getHeader(entry.http().headers(), "x-request-id"); + return entry.http().status() == 0; + })); + ASSERT_TRUE(maybe_x_request_id.has_value()); + + absl::optional maybe_x_request_id_resp; + absl::optional maybe_model; + absl::optional maybe_total_tokens; + EXPECT_TRUE(expectAccessLogResponseTo([&maybe_x_request_id_resp, &maybe_model, + &maybe_total_tokens](const ::cilium::LogEntry& entry) { + maybe_x_request_id_resp = getHeader(entry.http().headers(), "x-request-id"); + maybe_model = getHeader(entry.http().headers(), "x-ai-model"); + maybe_total_tokens = getHeader(entry.http().headers(), "x-ai-total-tokens"); + return entry.http().status() == 200; + })); + + ASSERT_TRUE(maybe_x_request_id_resp.has_value()); + EXPECT_EQ(maybe_x_request_id.value(), maybe_x_request_id_resp.value()); + ASSERT_TRUE(maybe_model.has_value()); + EXPECT_EQ("llama3.1-8b-instruct", maybe_model.value()); + 
ASSERT_TRUE(maybe_total_tokens.has_value()); + EXPECT_EQ("17", maybe_total_tokens.value()); + + EXPECT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + EXPECT_TRUE(upstream_request_->complete()); + EXPECT_EQ(0, upstream_request_->bodyLength()); + cleanupUpstreamAndDownstream(); +} + TEST_P(CiliumIntegrationTest, L3DeniedPath) { denied({{":method", "GET"}, {":path", "/only-2-allowed"}, {":authority", "host"}});