diff --git a/WORKSPACE b/WORKSPACE index 42f46cefc..51d08f140 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -42,6 +42,7 @@ git_repository( "@//patches:0003-original_dst_cluster-Avoid-multiple-hosts-for-the-sa.patch", "@//patches:0004-thread_local-reset-slot-in-worker-threads-first.patch", "@//patches:0005-http-header-expose-attribute.patch", + "@//patches:0006-test-integration-Defer-fake-upstream-read-enable-un.patch", ], # // clang-format off: Envoy's format check: Only repository_locations.bzl may contains URL references remote = "https://github.com/envoyproxy/envoy.git", diff --git a/patches/0006-test-integration-Defer-fake-upstream-read-enable-un.patch b/patches/0006-test-integration-Defer-fake-upstream-read-enable-un.patch new file mode 100644 index 000000000..32c28dea1 --- /dev/null +++ b/patches/0006-test-integration-Defer-fake-upstream-read-enable-un.patch @@ -0,0 +1,95 @@ +diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc +--- a/test/integration/fake_upstream.cc ++++ b/test/integration/fake_upstream.cc +@@ -436,6 +436,16 @@ + Network::ReadFilterSharedPtr{new ReadFilter(*this)}); + } + ++void FakeHttpConnection::initialize() { ++ FakeConnectionBase::initialize(); ++ if (shared_connection_.connected() && !shared_connection_.connection().readEnabled()) { ++ // FakeUpstream::consumeConnection() may hand HTTP connections off before the codec/filter ++ // stack is initialized. Re-enable reads only after initialize() has attached the HTTP read ++ // filter, or early request bytes can be consumed without ever creating a FakeStream. ++ shared_connection_.connection().readDisable(false); ++ } ++} ++ + AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { + ENVOY_LOG(trace, "FakeConnectionBase close"); + if (!shared_connection_.connected()) { +@@ -749,7 +756,8 @@ + // not lazily create for HTTP/3 + if (http_type_ == Http::CodecType::HTTP3) { + quic_connections_.push_back(std::make_unique( +- *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_, ++ *this, consumeConnection(/*defer_read_enable=*/true), http_type_, time_system_, ++ config_.max_request_headers_kb_, + config_.max_request_headers_count_, config_.headers_with_underscores_action_)); + quic_connections_.back()->initialize(); + } +@@ -820,7 +828,8 @@ + return runOnDispatcherThreadAndWait([&]() { + absl::MutexLock lock(lock_); + connection = std::make_unique( +- *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_, ++ *this, consumeConnection(/*defer_read_enable=*/true), http_type_, time_system_, ++ config_.max_request_headers_kb_, + config_.max_request_headers_count_, config_.headers_with_underscores_action_); + connection->initialize(); + return AssertionSuccess(); +@@ -857,7 +866,8 @@ + EXPECT_TRUE(upstream.runOnDispatcherThreadAndWait([&]() { + absl::MutexLock lock(&upstream.lock_); + connection = std::make_unique( +- upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(), ++ upstream, upstream.consumeConnection(/*defer_read_enable=*/true), ++ upstream.http_type_, upstream.timeSystem(), + Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, + envoy::config::core::v3::HttpProtocolOptions::ALLOW); + connection->initialize(); +@@ -920,7 +930,7 @@ + raw_connection.release(); + } + +-SharedConnectionWrapper& FakeUpstream::consumeConnection() { ++SharedConnectionWrapper& FakeUpstream::consumeConnection(bool defer_read_enable) { + ASSERT(!new_connections_.empty()); + auto* const connection_wrapper = new_connections_.front().get(); + // Skip the thread safety check if the network connection has already been freed since there's no +@@ -930,10 +940,11 @@ + connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_); + if (read_disable_on_new_connection_ && connection_wrapper->connected() && + http_type_ != Http::CodecType::HTTP3 && !disable_and_do_not_enable_) { +- // Re-enable read and early close detection. + auto& connection = connection_wrapper->connection(); + connection.detectEarlyCloseWhenReadDisabled(true); +- connection.readDisable(false); ++ if (!defer_read_enable) { ++ connection.readDisable(false); ++ } + } + return *connection_wrapper; + } +diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h +--- a/test/integration/fake_upstream.h ++++ b/test/integration/fake_upstream.h +@@ -547,6 +547,8 @@ + envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction + headers_with_underscores_action); + ++ void initialize() override; ++ + ABSL_MUST_USE_RESULT + testing::AssertionResult + waitForNewStream(Event::Dispatcher& client_dispatcher, FakeStreamPtr& stream, +@@ -998,7 +1000,8 @@ + }; + + void threadRoutine(); +- SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_); ++ SharedConnectionWrapper& consumeConnection(bool defer_read_enable = false) ++ ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_); + Network::FilterStatus onRecvDatagram(Network::UdpRecvData& data); + AssertionResult + runOnDispatcherThreadAndWait(std::function cb, diff --git a/tests/accesslog_server.cc b/tests/accesslog_server.cc index d82db94f1..9cffe6165 100644 --- a/tests/accesslog_server.cc +++ b/tests/accesslog_server.cc @@ -16,9 +16,11 @@ namespace Envoy { AccessLogServer::AccessLogServer(const std::string path) - : UDSServer(path, std::bind(&AccessLogServer::msgCallback, this, std::placeholders::_1)) {} + : UDSServer(path, std::bind(&AccessLogServer::msgCallback, this, std::placeholders::_1)) { + startServerThread(); +} -AccessLogServer::~AccessLogServer() = default; +AccessLogServer::~AccessLogServer() { shutdownServerThread(); } void AccessLogServer::clear() { absl::MutexLock lock(&mutex_); diff --git a/tests/accesslog_server.h b/tests/accesslog_server.h index 67a51ab8c..60f06c6fe 100644 --- a/tests/accesslog_server.h +++ b/tests/accesslog_server.h @@ -17,7 +17,7 @@ namespace Envoy { class AccessLogServer : public UDSServer { public: AccessLogServer(const std::string path); - ~AccessLogServer(); + ~AccessLogServer() override; void clear(); absl::optional<::cilium::LogEntry> diff --git a/tests/cilium_http_integration.cc b/tests/cilium_http_integration.cc index 44438e45f..a34d42203 100644 --- a/tests/cilium_http_integration.cc +++ b/tests/cilium_http_integration.cc @@ -33,7 +33,15 @@ CiliumHttpIntegrationTest::CiliumHttpIntegrationTest(const std::string& config) #endif } -CiliumHttpIntegrationTest::~CiliumHttpIntegrationTest() = default; +CiliumHttpIntegrationTest::~CiliumHttpIntegrationTest() { + // Shut down live downstream/upstream traffic before the access log UDS server member is + // destroyed. Otherwise the UDS sink can disappear while HttpIntegrationTest base teardown is + // still closing HTTP connections and the Envoy test server. + cleanupUpstreamAndDownstream(); + codec_client_.reset(); + test_server_.reset(); + fake_upstreams_.clear(); +} void CiliumHttpIntegrationTest::createEnvoy() { // fake upstreams have been created by now, use the port from the 1st upstream diff --git a/tests/cilium_tcp_integration.cc b/tests/cilium_tcp_integration.cc index de660e8a7..40ea6f1a1 100644 --- a/tests/cilium_tcp_integration.cc +++ b/tests/cilium_tcp_integration.cc @@ -40,8 +40,7 @@ const std::string TCP_POLICY_fmt = R"EOF(version_info: "0" )EOF"; CiliumTcpIntegrationTest::CiliumTcpIntegrationTest(const std::string& config) - : BaseIntegrationTest(GetParam(), config), - accessLogServer_(TestEnvironment::unixDomainSocketPath("access_log.sock")) { + : BaseIntegrationTest(GetParam(), config) { enableHalfClose(true); #if 1 for (Logger::Logger& logger : Logger::Registry::loggers()) { diff --git a/tests/cilium_tcp_integration.h b/tests/cilium_tcp_integration.h index bf1efdabb..1f53fd6be 100644 --- a/tests/cilium_tcp_integration.h +++ b/tests/cilium_tcp_integration.h @@ -8,8 +8,6 @@ #include "test/integration/base_integration_test.h" -#include "tests/accesslog_server.h" - namespace Envoy { class CiliumTcpIntegrationTest : public BaseIntegrationTest, @@ -22,8 +20,6 @@ class CiliumTcpIntegrationTest : public BaseIntegrationTest, virtual std::string testPolicyFmt(); void initialize() override; - - AccessLogServer accessLogServer_; }; } // namespace Envoy diff --git a/tests/cilium_tls_http_integration_test.cc b/tests/cilium_tls_http_integration_test.cc index a244cefe4..6037576b2 100644 --- a/tests/cilium_tls_http_integration_test.cc +++ b/tests/cilium_tls_http_integration_test.cc @@ -234,7 +234,6 @@ class CiliumHttpTLSIntegrationTest : public CiliumHttpIntegrationTest { void initialize() override { CiliumHttpIntegrationTest::initialize(); - fake_upstreams_[0]->setReadDisableOnNewConnection(false); // Set up the SSL client. Network::Address::InstanceConstSharedPtr address = diff --git a/tests/cilium_tls_tcp_integration_test.cc b/tests/cilium_tls_tcp_integration_test.cc index 97f68cac0..05999a877 100644 --- a/tests/cilium_tls_tcp_integration_test.cc +++ b/tests/cilium_tls_tcp_integration_test.cc @@ -4,30 +4,25 @@ #include #include #include -#include #include #include #include #include #include -#include #include #include "envoy/buffer/buffer.h" -#include "envoy/common/exception.h" #include "envoy/event/dispatcher.h" -#include "envoy/extensions/transport_sockets/tls/v3/tls.pb.h" #include "envoy/http/codec.h" // IWYU pragma: keep #include "envoy/network/address.h" #include "envoy/network/connection.h" #include "envoy/network/transport_socket.h" +#include "envoy/ssl/connection.h" #include "source/common/buffer/buffer_impl.h" +#include "source/common/buffer/watermark_buffer.h" #include "source/common/common/assert.h" -#include "source/common/stats/isolated_store_impl.h" -#include "source/common/tls/server_context_config_impl.h" -#include "source/common/tls/server_ssl_socket.h" #include "test/integration/fake_upstream.h" #include "test/integration/integration_tcp_client.h" @@ -36,6 +31,7 @@ #include "test/mocks/buffer/mocks.h" #include "test/mocks/server/admin.h" #include "test/test_common/environment.h" +#include "test/test_common/test_time_system.h" #include "test/test_common/utility.h" #include "tests/cilium_tcp_integration.h" @@ -122,42 +118,43 @@ class CiliumTLSIntegrationTest : public CiliumTcpIntegrationTest { payload_reader_ = std::make_shared(*dispatcher_); } - void createUpstreams() override { - if (upstream_tls_) { - auto config = upstreamConfig(); - config.upstream_protocol_ = FakeHttpConnection::Type::HTTP1; - config.enable_half_close_ = true; - fake_upstreams_.emplace_back( - new FakeUpstream(createUpstreamSslContext(), 0, version_, config)); - } else { - CiliumTcpIntegrationTest::createUpstreams(); // maybe BaseIntegrationTest::createUpstreams() + AssertionResult + waitForTlsHandshake(const FakeRawConnection& connection, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { + Event::TestTimeSystem::RealTimeBound bound(timeout); + while (true) { + const auto downstream_timing = connection.connection().streamInfo().downstreamTiming(); + if (downstream_timing.downstreamHandshakeComplete().has_value()) { + return AssertionSuccess(); + } + + // client-side TLS I/O will not progress unless the test-thread dispatcher runs + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + + timeSystem().advanceTimeWait(std::chrono::milliseconds(1)); + + if (timeout != std::chrono::milliseconds::zero() && !bound.withinBound()) { + const Ssl::ConnectionInfoConstSharedPtr ssl = connection.connection().ssl(); + return AssertionFailure() << "Timed out waiting for TLS handshake. ssl=" << (ssl != nullptr) + << " handshake_complete=" + << downstream_timing.downstreamHandshakeComplete().has_value() + << " tls_version=" << (ssl != nullptr ? ssl->tlsVersion() : "") + << " ciphersuite=" + << (ssl != nullptr ? ssl->ciphersuiteString() : ""); + } } } - // TODO(mattklein123): This logic is duplicated in various places. Cleanup in - // a follow up. + void createUpstreams() override { + auto config = upstreamConfig(); + config.upstream_protocol_ = FakeHttpConnection::Type::HTTP1; + config.enable_half_close_ = true; + fake_upstreams_.emplace_back(new FakeUpstream(createUpstreamSslContext(), 0, version_, config)); + } + Network::DownstreamTransportSocketFactoryPtr createUpstreamSslContext() { - envoy::extensions::transport_sockets::tls::v3::DownstreamTlsContext tls_context; - auto* common_tls_context = tls_context.mutable_common_tls_context(); - auto* tls_cert = common_tls_context->add_tls_certificates(); - tls_cert->mutable_certificate_chain()->set_filename(TestEnvironment::runfilesPath( - fmt::format("test/config/integration/certs/{}cert.pem", upstream_cert_name_))); - tls_cert->mutable_private_key()->set_filename(TestEnvironment::runfilesPath( - fmt::format("test/config/integration/certs/{}key.pem", upstream_cert_name_))); - - auto cfg_or_error = Extensions::TransportSockets::Tls::ServerContextConfigImpl::create( - tls_context, factory_context_, false); - // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) - THROW_IF_NOT_OK(cfg_or_error.status()); - auto cfg = std::move(cfg_or_error.value()); - - static auto* upstream_stats_store = new Stats::IsolatedStoreImpl(); - auto server_or_error = Extensions::TransportSockets::Tls::ServerSslSocketFactory::create( - std::move(cfg), context_manager_, *upstream_stats_store->rootScope(), - std::vector{}); - // NOLINTNEXTLINE(performance-unnecessary-copy-initialization) - THROW_IF_NOT_OK(server_or_error.status()); - return std::move(server_or_error.value()); + return Ssl::createFakeUpstreamSslContext(upstream_cert_name_, context_manager_, + factory_context_); } void setupConnections() { @@ -179,6 +176,10 @@ class CiliumTLSIntegrationTest : public CiliumTcpIntegrationTest { ON_CALL(*client_write_buffer_, drain(_)) .WillByDefault(Invoke(client_write_buffer_, &MockWatermarkBuffer::trackDrains)); return client_write_buffer_; + })) + .WillRepeatedly(Invoke([](std::function below_low, std::function above_high, + std::function above_overflow) -> Buffer::Instance* { + return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow); })); // Set up the SSL client. Network::Address::InstanceConstSharedPtr address = @@ -207,6 +208,9 @@ class CiliumTLSIntegrationTest : public CiliumTcpIntegrationTest { AssertionResult result = fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection); RELEASE_ASSERT(result, result.message()); + // Wait for TLS handshake first to get a clear error signal if it never completes. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); + // Ship some data upstream. Buffer::OwnedImpl buffer(data_to_send_upstream); ssl_client_->write(buffer, false); @@ -232,18 +236,33 @@ class CiliumTLSIntegrationTest : public CiliumTcpIntegrationTest { ssl_client_->dispatcher().run(Event::Dispatcher::RunType::Block); EXPECT_TRUE(payload_reader_->readLastByte()); EXPECT_TRUE(connect_callbacks_.closed()); + + // FakeRawConnection removes its read filter on the fake-upstream dispatcher in its + // destructor, so drop it before we start tearing down the client side and the fixture. + fake_upstream_connection.reset(); + teardownConnections(); + } + + void teardownConnections() { + ssl_client_.reset(); + // Client connection teardown uses deferred delete on the test dispatcher. Flush one + // non-blocking pass before releasing the helper objects that were attached to the connection. + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + context_.reset(); + client_write_buffer_ = nullptr; + payload_reader_.reset(); + connect_callbacks_.reset(); } // Upstream - bool upstream_tls_{true}; std::string upstream_cert_name_{"upstreamlocalhost"}; // Downstream std::shared_ptr payload_reader_; MockWatermarkBuffer* client_write_buffer_; Network::UpstreamTransportSocketFactoryPtr context_; - Network::ClientConnectionPtr ssl_client_; ConnectionStatusCallbacks connect_callbacks_; + Network::ClientConnectionPtr ssl_client_; }; // upstream_tls_context tructed_ca from test/config/integration/certs/upstreamcacert.pem @@ -293,6 +312,8 @@ TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyUpstreamWritesFirst) { FakeRawConnectionPtr fake_upstream_connection; ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + // Wait for TLS handshake first to get a clear error signal if it never completes. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); ASSERT_TRUE(fake_upstream_connection->write("hello")); tcp_client->waitForData("hello"); @@ -316,6 +337,8 @@ TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyUpstreamDisconnect) { FakeRawConnectionPtr fake_upstream_connection; ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + // Wait for TLS handshake first to get a clear error signal if it never completes. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); ASSERT_TRUE(fake_upstream_connection->waitForData(5)); ASSERT_TRUE(fake_upstream_connection->write("world")); @@ -336,6 +359,8 @@ TEST_P(CiliumTLSProxyIntegrationTest, CiliumTcpProxyDownstreamDisconnect) { FakeRawConnectionPtr fake_upstream_connection; ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + // Wait for TLS handshake first to get a clear error signal if it never completes. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); ASSERT_TRUE(fake_upstream_connection->waitForData(5)); ASSERT_TRUE(fake_upstream_connection->write("world")); @@ -358,6 +383,8 @@ TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyLargeWrite) { FakeRawConnectionPtr fake_upstream_connection; ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + // Wait for TLS handshake first to get a clear error signal if it never completes. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); ASSERT_TRUE(fake_upstream_connection->waitForData(data.size())); ASSERT_TRUE(fake_upstream_connection->write(data)); @@ -396,6 +423,9 @@ TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyDownstreamFlush) { FakeRawConnectionPtr fake_upstream_connection; ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + // Disabling read before the TLS handshake completes can stall the connection. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); + tcp_client->readDisable(true); ASSERT_TRUE(tcp_client->write("", true)); @@ -439,10 +469,8 @@ TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyUpstreamFlush) { FakeRawConnectionPtr fake_upstream_connection; ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); - // Disabling read does not let the TLS handshake to finish. We should be able - // to wait for ConnectionEvent::Connected, which is raised after the TLS - // handshake has completed, but just wait for a while instead for now. - usleep(100000); // NO_CHECK_FORMAT(real_time) + // Disabling read before the TLS handshake completes can stall the connection. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); ASSERT_TRUE(fake_upstream_connection->readDisable(true)); ASSERT_TRUE(fake_upstream_connection->write("", true)); @@ -481,10 +509,8 @@ TEST_P(CiliumTLSProxyIntegrationTest, CiliumTLSProxyUpstreamFlushEnvoyExit) { FakeRawConnectionPtr fake_upstream_connection; ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); - // Disabling read does not let the TLS handshake to finish. We should be able - // to wait for ConnectionEvent::Connected, which is raised after the TLS - // handshake has completed, but just wait for a while instead for now. - usleep(100000); // NO_CHECK_FORMAT(real_time) + // Disabling read before the TLS handshake completes can stall the connection. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); ASSERT_TRUE(fake_upstream_connection->readDisable(true)); ASSERT_TRUE(fake_upstream_connection->write("", true)); @@ -617,6 +643,8 @@ TEST_P(CiliumDownstreamTLSIntegrationTest, DownstreamHalfClose) { FakeRawConnectionPtr fake_upstream_connection; AssertionResult result = fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection); RELEASE_ASSERT(result, result.message()); + // Wait for TLS handshake first to get a clear error signal if it never completes. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); Buffer::OwnedImpl empty_buffer; ssl_client_->write(empty_buffer, true); @@ -634,6 +662,10 @@ TEST_P(CiliumDownstreamTLSIntegrationTest, DownstreamHalfClose) { ssl_client_->dispatcher().run(Event::Dispatcher::RunType::Block); EXPECT_TRUE(payload_reader_->readLastByte()); EXPECT_TRUE(connect_callbacks_.closed()); + // Drop the fake-upstream wrapper before client-side teardown so its read-filter removal runs + // while the fake-upstream dispatcher is still unquestionably alive. + fake_upstream_connection.reset(); + teardownConnections(); } // Test that a half-close on the upstream side is proxied correctly. @@ -643,6 +675,8 @@ TEST_P(CiliumDownstreamTLSIntegrationTest, UpstreamHalfClose) { FakeRawConnectionPtr fake_upstream_connection; AssertionResult result = fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection); RELEASE_ASSERT(result, result.message()); + // Wait for TLS handshake first to get a clear error signal if it never completes. + ASSERT_TRUE(waitForTlsHandshake(*fake_upstream_connection)); ASSERT_TRUE(fake_upstream_connection->write("", true)); ssl_client_->dispatcher().run(Event::Dispatcher::RunType::Block); @@ -664,6 +698,10 @@ TEST_P(CiliumDownstreamTLSIntegrationTest, UpstreamHalfClose) { } ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); + // Drop the fake-upstream wrapper before client-side teardown so its read-filter removal runs + // while the fake-upstream dispatcher is still unquestionably alive. + fake_upstream_connection.reset(); + teardownConnections(); } } // namespace Cilium diff --git a/tests/cilium_websocket_decap_integration_test.cc b/tests/cilium_websocket_decap_integration_test.cc index 191b20a84..d7a7055f4 100644 --- a/tests/cilium_websocket_decap_integration_test.cc +++ b/tests/cilium_websocket_decap_integration_test.cc @@ -67,7 +67,6 @@ const std::string cilium_tcp_proxy_config_fmt = R"EOF( - name: cilium.network.websocket.server typed_config: "@type": type.googleapis.com/cilium.WebSocketServer - access_log_path: "{{ test_udsdir }}/access_log.sock" origin: "jarno.cilium.rocks" - name: cilium.network typed_config: @@ -363,12 +362,11 @@ TEST_P(CiliumWebSocketIntegrationTest, AcceptedWebSocket) { ASSERT_TRUE(fake_upstream_connection->close()); ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); - // Wait for websocket close frame + // Wait for websocket close frame, has to be exactly 2 bytes response->waitForBodyData(2); absl::string_view close_frame{"\x88\0", 2}; ASSERT_EQ(response->body(), close_frame); - - cleanupUpstreamAndDownstream(); + codec_client_->close(); } } // namespace Envoy diff --git a/tests/health_check_sink_server.cc b/tests/health_check_sink_server.cc index a5ed3ac94..7a2915b05 100644 --- a/tests/health_check_sink_server.cc +++ b/tests/health_check_sink_server.cc @@ -18,9 +18,10 @@ namespace Envoy { HealthCheckSinkServer::HealthCheckSinkServer(const std::string path) : UDSServer(path, std::bind(&HealthCheckSinkServer::msgCallback, this, std::placeholders::_1)) { + startServerThread(); } -HealthCheckSinkServer::~HealthCheckSinkServer() = default; +HealthCheckSinkServer::~HealthCheckSinkServer() { shutdownServerThread(); } void HealthCheckSinkServer::clear() { absl::MutexLock lock(&mutex_); diff --git a/tests/health_check_sink_server.h b/tests/health_check_sink_server.h index 72851ea18..4230e0153 100644 --- a/tests/health_check_sink_server.h +++ b/tests/health_check_sink_server.h @@ -19,7 +19,7 @@ namespace Envoy { class HealthCheckSinkServer : public UDSServer { public: HealthCheckSinkServer(const std::string path); - ~HealthCheckSinkServer(); + ~HealthCheckSinkServer() override; void clear(); absl::optional diff --git a/tests/uds_server.cc b/tests/uds_server.cc index d7599eba4..581100644 100644 --- a/tests/uds_server.cc +++ b/tests/uds_server.cc @@ -1,7 +1,6 @@ #include "tests/uds_server.h" #include -#include #include #include @@ -23,7 +22,7 @@ namespace Envoy { UDSServer::UDSServer(const std::string& path, std::function cb) : msg_cb_(cb), addr_(THROW_OR_RETURN_VALUE(Network::Address::PipeInstance::create(path), std::unique_ptr)), - fd2_(-1) { + fd_(-1), fd2_(-1) { ENVOY_LOG(trace, "Creating unix domain socket server: {}", addr_->asStringView()); if (!addr_->pipe()->abstractNamespace()) { ::unlink(addr_->asString().c_str()); @@ -37,38 +36,52 @@ UDSServer::UDSServer(const std::string& path, std::functionasStringView()); if (::bind(fd_, addr_->sockAddr(), addr_->sockAddrLen()) == -1) { ENVOY_LOG(warn, "Bind to {} failed: {}", addr_->asStringView(), Envoy::errorDetails(errno)); - close(); + ::close(fd_); + fd_ = -1; return; } ENVOY_LOG(trace, "Listening on {}", addr_->asStringView()); if (::listen(fd_, 5) == -1) { ENVOY_LOG(warn, "Listen on {} failed: {}", addr_->asStringView(), Envoy::errorDetails(errno)); - close(); + ::close(fd_); + fd_ = -1; + if (!addr_->pipe()->abstractNamespace()) { + ::unlink(addr_->asString().c_str()); + } return; } +} - ENVOY_LOG(trace, "Starting unix domain socket server thread fd: {}", fd_.load()); +UDSServer::~UDSServer() { shutdownServerThread(); } +void UDSServer::startServerThread() { + if (fd_ < 0 || thread_ != nullptr) { + return; + } + ENVOY_LOG(trace, "Starting unix domain socket server thread fd: {}", fd_.load()); thread_ = Thread::threadFactoryForTest().createThread([this]() { threadRoutine(); }); } -UDSServer::~UDSServer() { - if (fd_ >= 0) { - close(); +void UDSServer::shutdownServerThread() { + const int fd = fd_.exchange(-1); + const int fd2 = fd2_.exchange(-1); + + if (fd2 >= 0) { + ::shutdown(fd2, SHUT_RD); + ::close(fd2); + } + if (fd >= 0) { + ::shutdown(fd, SHUT_RD); + errno = 0; + ::close(fd); + } + if (thread_ != nullptr) { ENVOY_LOG(trace, "Waiting on unix domain socket server to close: {}", Envoy::errorDetails(errno)); thread_->join(); thread_.reset(); } -} - -void UDSServer::close() { - ::shutdown(fd_, SHUT_RD); - ::shutdown(fd2_, SHUT_RD); - errno = 0; - ::close(fd_); - fd_ = -1; if (!addr_->pipe()->abstractNamespace()) { ::unlink(addr_->asString().c_str()); } @@ -112,8 +125,10 @@ void UDSServer::threadRoutine() { } } } - ::close(fd2_); - fd2_ = -1; + const int fd2 = fd2_.exchange(-1); + if (fd2 >= 0) { + ::close(fd2); + } } } diff --git a/tests/uds_server.h b/tests/uds_server.h index 68028d97b..20767f1dc 100644 --- a/tests/uds_server.h +++ b/tests/uds_server.h @@ -15,10 +15,15 @@ namespace Envoy { class UDSServer : public Logger::Loggable { public: UDSServer(const std::string& path, std::function cb); - ~UDSServer(); + virtual ~UDSServer(); + +protected: + // Derived classes bind callbacks into their own state, so start the server thread only after + // the derived object has finished constructing. + void startServerThread(); + void shutdownServerThread(); private: - void close(); void threadRoutine(); std::function msg_cb_;