From 80c0b9767cb28cbbfeb0286dc75aad5410cd884a Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 23 Apr 2026 12:35:05 -0600 Subject: [PATCH 1/7] connection: add drop notification callback Signed-off-by: Eduardo Silva --- include/fluent-bit/flb_connection.h | 14 ++++++++++++++ src/flb_connection.c | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/include/fluent-bit/flb_connection.h b/include/fluent-bit/flb_connection.h index 53f28b974e4..31801e71e09 100644 --- a/include/fluent-bit/flb_connection.h +++ b/include/fluent-bit/flb_connection.h @@ -58,12 +58,26 @@ struct flb_net_setup; struct flb_upstream; struct flb_downstream; struct flb_tls_session; +struct flb_connection; + +typedef void (*flb_connection_drop_notification_callback)( + struct flb_connection *connection); /* Base network connection */ struct flb_connection { struct mk_event event; void *user_data; + /* + * Optional notification invoked from prepare_destroy_conn() while the + * connection is still linked on busy_queue and before the event is + * deregistered and the file descriptor is closed. + * + * Callers may detach external state here, but must not free, destroy or + * unlink the connection because prepare_destroy_conn() performs the final + * teardown immediately after the callback returns. + */ + flb_connection_drop_notification_callback drop_notification_callback; /* Socket */ flb_sockfd_t fd; diff --git a/src/flb_connection.c b/src/flb_connection.c index a3ed402651b..34b1cc229c7 100644 --- a/src/flb_connection.c +++ b/src/flb_connection.c @@ -254,4 +254,4 @@ void flb_connection_unset_io_timeout(struct flb_connection *connection) assert(connection != NULL); connection->ts_io_timeout = -1; -} \ No newline at end of file +} From 276ed0630d985a3efabfd8e38992cb3a04e5656b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 23 Apr 2026 12:35:08 -0600 Subject: [PATCH 2/7] downstream: notify before destroying connections Signed-off-by: Eduardo Silva --- src/flb_downstream.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/flb_downstream.c b/src/flb_downstream.c index acef2f2cd4c..f53805e1931 100644 --- a/src/flb_downstream.c +++ b/src/flb_downstream.c @@ -209,6 +209,10 @@ static int prepare_destroy_conn(struct flb_connection *connection) flb_trace("[downstream] destroy connection #%i to %s", connection->fd, flb_connection_get_remote_address(connection)); + if (connection->drop_notification_callback != NULL) { + connection->drop_notification_callback(connection); + } + if (MK_EVENT_IS_REGISTERED((&connection->event))) { mk_event_del(connection->evl, &connection->event); } From 594e37cc8cdaabc79b8ade07a0fef97159d9f820 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 23 Apr 2026 12:35:11 -0600 Subject: [PATCH 3/7] http_server: harden stalled client cleanup Signed-off-by: Eduardo Silva --- .../fluent-bit/http_server/flb_http_server.h | 9 ++ src/http_server/flb_http_server.c | 120 ++++++++++++++++-- src/http_server/flb_http_server_config_map.c | 5 + 3 files changed, 126 insertions(+), 8 deletions(-) diff --git a/include/fluent-bit/http_server/flb_http_server.h b/include/fluent-bit/http_server/flb_http_server.h index 83bb10fb90b..d6db0e0f1b2 100755 --- a/include/fluent-bit/http_server/flb_http_server.h +++ b/include/fluent-bit/http_server/flb_http_server.h @@ -38,6 +38,7 @@ #define HTTP_SERVER_INITIAL_BUFFER_SIZE (10 * 1024) #define HTTP_SERVER_MAXIMUM_BUFFER_SIZE (10 * (1000 * 1024)) +#define HTTP_SERVER_DEFAULT_IDLE_TIMEOUT 10 /* seconds */ #define FLB_HTTP_SERVER_FLAG_KEEPALIVE (((uint64_t) 1) << 0) #define FLB_HTTP_SERVER_FLAG_AUTO_DEFLATE (((uint64_t) 1) << 1) @@ -70,6 +71,7 @@ struct flb_input_instance; struct flb_http_server_config { int http2; + int idle_timeout; /* seconds */ size_t buffer_max_size; size_t buffer_chunk_size; size_t max_connections; @@ -92,6 +94,7 @@ struct flb_http_server_options { struct mk_event_loop *event_loop; struct flb_config *system_context; + int idle_timeout; /* seconds */ size_t buffer_max_size; size_t buffer_chunk_size; size_t max_connections; @@ -131,6 +134,7 @@ struct flb_http_server { uint64_t flags; int status; int protocol_version; + int idle_timeout; /* seconds */ struct flb_downstream *downstream; struct cfl_list clients; flb_http_server_request_processor_callback request_callback; @@ -161,6 +165,7 @@ struct flb_http_server_session { size_t read_buffer_size; int releasable; + int drop_pending; struct flb_connection *connection; struct flb_http_server *parent; @@ -226,6 +231,10 @@ void flb_http_server_set_buffer_max_size(struct flb_http_server *server, size_t size_t flb_http_server_get_buffer_max_size(struct flb_http_server *server); +const struct flb_net_setup * +flb_http_server_runtime_worker_net_setup_get(struct flb_http_server *server, + int worker_id); + /* HTTP SESSION */ int flb_http_server_session_init(struct flb_http_server_session *session, int version); diff --git a/src/http_server/flb_http_server.c b/src/http_server/flb_http_server.c index 96bf24d149f..2b20635fc7b 100644 --- a/src/http_server/flb_http_server.c +++ b/src/http_server/flb_http_server.c @@ -52,6 +52,20 @@ struct flb_http_server_runtime { int worker_count; }; +const struct flb_net_setup * +flb_http_server_runtime_worker_net_setup_get(struct flb_http_server *server, + int worker_id) +{ + if (server == NULL || + server->runtime == NULL || + worker_id < 0 || + worker_id >= server->runtime->worker_count) { + return NULL; + } + + return &server->runtime->workers[worker_id].net_setup; +} + static void flb_http_server_runtime_stop(struct flb_http_server *session); static void flb_http_server_worker_context_reset( @@ -98,6 +112,51 @@ static const char *flb_http_server_get_alpn_string(struct flb_http_server *sessi return "http/1.0"; } +static void flb_http_server_connection_drop(struct flb_connection *connection) +{ + struct flb_http_server_session *session; + + if (connection == NULL) { + return; + } + + session = connection->user_data; + + if (session != NULL && + session->connection == connection) { + session->connection = NULL; + session->drop_pending = FLB_TRUE; + connection->event.data = session; + } + else { + connection->event.data = NULL; + } + + connection->user_data = NULL; + connection->drop_notification_callback = NULL; +} + +static void flb_http_server_reap_stale_sessions(struct flb_http_server *server) +{ + struct cfl_list *iterator_backup; + struct cfl_list *iterator; + struct flb_http_server_session *session; + + cfl_list_foreach_safe(iterator, + iterator_backup, + &server->clients) { + session = cfl_list_entry(iterator, + struct flb_http_server_session, + _head); + + if (session->drop_pending == FLB_FALSE && + (session->connection == NULL || + session->connection->fd == FLB_INVALID_SOCKET)) { + flb_http_server_session_destroy(session); + } + } +} + static size_t flb_http_server_client_count(struct flb_http_server *server) { return cfl_list_size(&server->clients); @@ -112,6 +171,7 @@ static int flb_http_server_apply_options(struct flb_http_server *session, session->status = HTTP_SERVER_UNINITIALIZED; session->protocol_version = options->protocol_version; + session->idle_timeout = options->idle_timeout; session->flags = options->flags; session->request_callback = options->request_callback; session->user_data = options->user_data; @@ -319,12 +379,27 @@ static int flb_http_server_client_activity_event_handler(void *data) struct mk_event *event; connection = (struct flb_connection *) data; + if (connection == NULL) { + return -1; + } + + event = &connection->event; session = (struct flb_http_server_session *) connection->user_data; + if (session == NULL) { + session = (struct flb_http_server_session *) event->data; + if (session != NULL && + (session->connection == NULL || + session->connection->fd == FLB_INVALID_SOCKET)) { + event->data = NULL; + session->drop_pending = FLB_FALSE; + flb_http_server_session_destroy(session); + } - server = session->parent; + return -1; + } - event = &connection->event; + server = session->parent; if (event->mask & MK_EVENT_READ) { result = flb_http_server_session_read(session); @@ -404,11 +479,14 @@ static int flb_http_server_client_connection_event_handler(void *data) return -1; } - if (server->max_connections > 0 && - flb_http_server_client_count(server) >= server->max_connections) { - flb_downstream_conn_release(connection); + if (server->max_connections > 0) { + flb_http_server_reap_stale_sessions(server); - return -5; + if (flb_http_server_client_count(server) >= server->max_connections) { + flb_downstream_conn_release(connection); + + return -5; + } } session = flb_http_server_session_create(server->protocol_version); @@ -429,6 +507,8 @@ static int flb_http_server_client_connection_event_handler(void *data) MK_EVENT_NEW(&connection->event); connection->user_data = (void *) session; + connection->event.data = (void *) session; + connection->drop_notification_callback = flb_http_server_connection_drop; connection->event.type = FLB_ENGINE_EV_CUSTOM; connection->event.handler = flb_http_server_client_activity_event_handler; @@ -469,6 +549,8 @@ static void flb_http_server_worker_maintenance(struct flb_config *config, if (worker->server.downstream != NULL) { flb_downstream_conn_timeouts_stream(worker->server.downstream); } + + flb_http_server_reap_stale_sessions(&worker->server); } static int flb_http_server_worker_initialize( @@ -490,6 +572,7 @@ static int flb_http_server_worker_initialize( options.networking_setup = &worker->net_setup; options.event_loop = worker->event_loop; options.system_context = worker->parent.system_context; + options.idle_timeout = worker->parent.idle_timeout; options.buffer_max_size = worker->parent.buffer_max_size; options.workers = 1; options.use_caller_event_loop = FLB_TRUE; @@ -730,6 +813,7 @@ void flb_http_server_options_init(struct flb_http_server_options *options) options->buffer_max_size = HTTP_SERVER_MAXIMUM_BUFFER_SIZE; options->buffer_chunk_size = HTTP_SERVER_INITIAL_BUFFER_SIZE; + options->idle_timeout = HTTP_SERVER_DEFAULT_IDLE_TIMEOUT; options->max_connections = 0; options->workers = 1; options->use_caller_event_loop = FLB_TRUE; @@ -745,6 +829,7 @@ void flb_http_server_config_init(struct flb_http_server_config *config) memset(config, 0, sizeof(struct flb_http_server_config)); config->http2 = FLB_TRUE; + config->idle_timeout = HTTP_SERVER_DEFAULT_IDLE_TIMEOUT; config->buffer_max_size = HTTP_SERVER_MAXIMUM_BUFFER_SIZE; config->buffer_chunk_size = HTTP_SERVER_INITIAL_BUFFER_SIZE; config->max_connections = 0; @@ -831,6 +916,7 @@ int flb_input_http_server_options_init(struct flb_http_server_options *options, } if (server_config != NULL) { + options->idle_timeout = server_config->idle_timeout; if (server_config->buffer_chunk_size > 0) { options->buffer_chunk_size = server_config->buffer_chunk_size; } @@ -867,6 +953,12 @@ int flb_http_server_init_with_options( options->networking_setup->share_port = FLB_TRUE; } + if (options->networking_setup != NULL && + options->networking_setup->io_timeout <= 0 && + options->idle_timeout > 0) { + options->networking_setup->io_timeout = options->idle_timeout; + } + return flb_http_server_apply_options(session, options); } @@ -1114,9 +1206,21 @@ struct flb_http_server_session *flb_http_server_session_create(int version) void flb_http_server_session_destroy(struct flb_http_server_session *session) { + struct flb_connection *connection; + if (session != NULL) { - if (session->connection != NULL) { - flb_downstream_conn_release(session->connection); + connection = session->connection; + session->connection = NULL; + + if (connection != NULL) { + connection->user_data = NULL; + connection->event.data = NULL; + connection->drop_notification_callback = NULL; + session->drop_pending = FLB_FALSE; + + if (connection->fd != FLB_INVALID_SOCKET) { + flb_downstream_conn_release(connection); + } } if (!cfl_list_entry_is_orphan(&session->_head)) { diff --git a/src/http_server/flb_http_server_config_map.c b/src/http_server/flb_http_server_config_map.c index 8f1ed6b2908..020b770c5a8 100644 --- a/src/http_server/flb_http_server_config_map.c +++ b/src/http_server/flb_http_server_config_map.c @@ -27,6 +27,11 @@ struct flb_config_map flb_http_server_config_map[] = { 0, FLB_TRUE, offsetof(struct flb_http_server_config, http2), "Enable HTTP/2 support for the HTTP server" }, + { + FLB_CONFIG_MAP_TIME, "http_server.idle_timeout", "10s", + 0, FLB_TRUE, offsetof(struct flb_http_server_config, idle_timeout), + "Set the maximum idle time for accepted HTTP connections. 0 disables it." + }, { FLB_CONFIG_MAP_SIZE, "http_server.buffer_max_size", "4M", 0, FLB_TRUE, offsetof(struct flb_http_server_config, buffer_max_size), From d28146d11d758a9d9e8a831c89536c7694157a25 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 23 Apr 2026 12:35:20 -0600 Subject: [PATCH 4/7] out_prometheus_exporter: propagate http server idle timeout Signed-off-by: Eduardo Silva --- plugins/out_prometheus_exporter/prom_http.c | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/out_prometheus_exporter/prom_http.c b/plugins/out_prometheus_exporter/prom_http.c index 64196d66533..842e9ad351f 100644 --- a/plugins/out_prometheus_exporter/prom_http.c +++ b/plugins/out_prometheus_exporter/prom_http.c @@ -126,6 +126,7 @@ struct prom_http *prom_http_server_create(struct prom_exporter *ctx, options.use_caller_event_loop = FLB_TRUE; if (ins->http_server_config != NULL) { + options.idle_timeout = ins->http_server_config->idle_timeout; options.buffer_max_size = ins->http_server_config->buffer_max_size; options.max_connections = ins->http_server_config->max_connections; } From 029a8a509390f4ed4934711d67ac040b2cedb693 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 23 Apr 2026 12:35:24 -0600 Subject: [PATCH 5/7] out_vivo_exporter: propagate http server idle timeout Signed-off-by: Eduardo Silva --- plugins/out_vivo_exporter/vivo_http.c | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/out_vivo_exporter/vivo_http.c b/plugins/out_vivo_exporter/vivo_http.c index de523e04307..3d34c3e4eff 100644 --- a/plugins/out_vivo_exporter/vivo_http.c +++ b/plugins/out_vivo_exporter/vivo_http.c @@ -342,6 +342,7 @@ struct vivo_http *vivo_http_server_create(struct vivo_exporter *ctx, options.use_caller_event_loop = FLB_TRUE; if (ins->http_server_config != NULL) { + options.idle_timeout = ins->http_server_config->idle_timeout; options.buffer_max_size = ins->http_server_config->buffer_max_size; options.max_connections = ins->http_server_config->max_connections; } From 4ae6af3f40f381bd282a03aab706aa41017f2dfd Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 23 Apr 2026 12:35:29 -0600 Subject: [PATCH 6/7] tests: internal: cover http server idle timeout Signed-off-by: Eduardo Silva --- tests/internal/http_server.c | 211 +++++++++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) diff --git a/tests/internal/http_server.c b/tests/internal/http_server.c index 133d45a8e12..464a0e7b158 100644 --- a/tests/internal/http_server.c +++ b/tests/internal/http_server.c @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -21,6 +22,7 @@ struct test_http_server_context { int request_calls; }; + static void test_http_server_context_init(struct test_http_server_context *context) { memset(context, 0, sizeof(struct test_http_server_context)); @@ -94,11 +96,15 @@ void test_http_server_options_defaults() TEST_CHECK(options.workers == 1); TEST_CHECK(options.use_caller_event_loop == FLB_TRUE); TEST_CHECK(options.reuse_port == FLB_FALSE); + TEST_CHECK(options.idle_timeout == HTTP_SERVER_DEFAULT_IDLE_TIMEOUT); TEST_CHECK(options.buffer_max_size == HTTP_SERVER_MAXIMUM_BUFFER_SIZE); TEST_CHECK(options.max_connections == 0); TEST_CHECK(config.http2 == FLB_TRUE); + TEST_CHECK(config.idle_timeout == HTTP_SERVER_DEFAULT_IDLE_TIMEOUT); TEST_CHECK(config.buffer_max_size == HTTP_SERVER_MAXIMUM_BUFFER_SIZE); TEST_CHECK(config.max_connections == 0); + TEST_CHECK(flb_http_server_property_is_allowed("http_server.idle_timeout") == FLB_TRUE); + TEST_CHECK(flb_http_server_property_is_allowed("idle_timeout") == FLB_FALSE); } void test_http_server_options_multi_worker_magic() @@ -136,6 +142,7 @@ void test_http_server_options_multi_worker_magic() TEST_CHECK(server.workers == 2); TEST_CHECK(server.reuse_port == FLB_TRUE); TEST_CHECK(server.use_caller_event_loop == FLB_FALSE); + TEST_CHECK(server.idle_timeout == HTTP_SERVER_DEFAULT_IDLE_TIMEOUT); TEST_CHECK(net_setup.share_port == FLB_TRUE); TEST_CHECK(server.max_connections == 7); @@ -184,6 +191,7 @@ void test_http_server_managed_worker_contract() TEST_CHECK(server.workers == 2); TEST_CHECK(server.use_caller_event_loop == FLB_FALSE); TEST_CHECK(server.reuse_port == FLB_TRUE); + TEST_CHECK(server.idle_timeout == HTTP_SERVER_DEFAULT_IDLE_TIMEOUT); TEST_CHECK(server.max_connections == 3); TEST_CHECK(server.cb_worker_init == test_http_server_worker_init); TEST_CHECK(server.cb_worker_exit == test_http_server_worker_exit); @@ -196,9 +204,212 @@ void test_http_server_managed_worker_contract() flb_config_exit(config); } +void test_http_server_idle_timeout_applies_to_networking_setup() +{ + struct flb_config *config; + struct flb_net_setup net_setup; + struct flb_http_server server; + struct flb_http_server_options options; + int ret; + + config = flb_config_init(); + if (!TEST_CHECK(config != NULL)) { + return; + } + + flb_net_setup_init(&net_setup); + flb_http_server_options_init(&options); + + options.protocol_version = HTTP_PROTOCOL_VERSION_AUTODETECT; + options.request_callback = test_http_server_request_handler; + options.address = (char *) TEST_HTTP_SERVER_HOST; + options.port = 10003; + options.networking_flags = FLB_IO_TCP; + options.networking_setup = &net_setup; + options.system_context = config; + options.idle_timeout = 17; + + ret = flb_http_server_init_with_options(&server, &options); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_config_exit(config); + return; + } + + TEST_CHECK(net_setup.io_timeout == 17); + TEST_CHECK(server.idle_timeout == 17); + + flb_http_server_destroy(&server); + flb_config_exit(config); +} + +void test_http_server_explicit_network_timeout_is_preserved() +{ + struct flb_config *config; + struct flb_net_setup net_setup; + struct flb_http_server server; + struct flb_http_server_options options; + int ret; + + config = flb_config_init(); + if (!TEST_CHECK(config != NULL)) { + return; + } + + flb_net_setup_init(&net_setup); + flb_http_server_options_init(&options); + + net_setup.io_timeout = 23; + + options.protocol_version = HTTP_PROTOCOL_VERSION_AUTODETECT; + options.request_callback = test_http_server_request_handler; + options.address = (char *) TEST_HTTP_SERVER_HOST; + options.port = 10004; + options.networking_flags = FLB_IO_TCP; + options.networking_setup = &net_setup; + options.system_context = config; + options.idle_timeout = 17; + + ret = flb_http_server_init_with_options(&server, &options); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_config_exit(config); + return; + } + + TEST_CHECK(net_setup.io_timeout == 23); + TEST_CHECK(server.idle_timeout == 17); + + flb_http_server_destroy(&server); + flb_config_exit(config); +} + +void test_http_server_multi_worker_disabled_idle_timeout_is_preserved() +{ + struct flb_config *config; + struct flb_net_setup net_setup; + struct flb_http_server server; + struct flb_http_server_options options; + const struct flb_net_setup *worker0_net_setup; + const struct flb_net_setup *worker1_net_setup; + const struct flb_net_setup *worker2_net_setup; + int ret; + + config = flb_config_init(); + if (!TEST_CHECK(config != NULL)) { + return; + } + + flb_net_setup_init(&net_setup); + flb_http_server_options_init(&options); + + options.protocol_version = HTTP_PROTOCOL_VERSION_AUTODETECT; + options.request_callback = test_http_server_request_handler; + options.address = (char *) TEST_HTTP_SERVER_HOST; + options.port = 0; + options.networking_flags = FLB_IO_TCP; + options.networking_setup = &net_setup; + options.system_context = config; + options.workers = 2; + options.idle_timeout = 0; + + ret = flb_http_server_init_with_options(&server, &options); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_config_exit(config); + return; + } + + TEST_CHECK(server.idle_timeout == 0); + TEST_CHECK(net_setup.io_timeout == 0); + + ret = flb_http_server_start(&server); + TEST_CHECK(ret == 0); + if (ret != 0) { + flb_http_server_destroy(&server); + flb_config_exit(config); + return; + } + + worker0_net_setup = flb_http_server_runtime_worker_net_setup_get(&server, 0); + worker1_net_setup = flb_http_server_runtime_worker_net_setup_get(&server, 1); + worker2_net_setup = flb_http_server_runtime_worker_net_setup_get(&server, 2); + + if (TEST_CHECK(worker0_net_setup != NULL) && + TEST_CHECK(worker1_net_setup != NULL) && + TEST_CHECK(worker2_net_setup == NULL)) { + TEST_CHECK(worker0_net_setup->io_timeout == 0); + TEST_CHECK(worker1_net_setup->io_timeout == 0); + } + + flb_http_server_destroy(&server); + flb_config_exit(config); +} + +void test_http_server_session_destroy_with_closed_connection() +{ + struct flb_connection connection; + struct flb_http_server_session *session; + + memset(&connection, 0, sizeof(struct flb_connection)); + connection.fd = FLB_INVALID_SOCKET; + + session = flb_http_server_session_create(HTTP_PROTOCOL_VERSION_11); + if (!TEST_CHECK(session != NULL)) { + return; + } + + session->connection = &connection; + connection.user_data = session; + connection.event.data = session; + + flb_http_server_session_destroy(session); + + TEST_CHECK(connection.user_data == NULL); + TEST_CHECK(connection.event.data == NULL); +} + +void test_http_server_session_destroy_clears_drop_pending() +{ + struct flb_connection connection; + struct flb_http_server_session *session; + + memset(&connection, 0, sizeof(struct flb_connection)); + connection.fd = FLB_INVALID_SOCKET; + + session = flb_http_server_session_create(HTTP_PROTOCOL_VERSION_11); + if (!TEST_CHECK(session != NULL)) { + return; + } + + session->connection = &connection; + session->drop_pending = FLB_TRUE; + session->releasable = FLB_FALSE; + connection.user_data = session; + connection.event.data = session; + + flb_http_server_session_destroy(session); + + TEST_CHECK(connection.user_data == NULL); + TEST_CHECK(connection.event.data == NULL); + TEST_CHECK(session->drop_pending == FLB_FALSE); + + flb_free(session); +} + TEST_LIST = { { "http_server_options_defaults", test_http_server_options_defaults }, { "http_server_options_multi_worker_magic", test_http_server_options_multi_worker_magic }, { "http_server_managed_worker_contract", test_http_server_managed_worker_contract }, + { "http_server_idle_timeout_applies_to_networking_setup", + test_http_server_idle_timeout_applies_to_networking_setup }, + { "http_server_explicit_network_timeout_is_preserved", + test_http_server_explicit_network_timeout_is_preserved }, + { "http_server_multi_worker_disabled_idle_timeout_is_preserved", + test_http_server_multi_worker_disabled_idle_timeout_is_preserved }, + { "http_server_session_destroy_with_closed_connection", + test_http_server_session_destroy_with_closed_connection }, + { "http_server_session_destroy_clears_drop_pending", + test_http_server_session_destroy_clears_drop_pending }, { 0 } }; From acb63fb51229cf9c925fc1f60804ecabd2eadbe7 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 23 Apr 2026 12:35:35 -0600 Subject: [PATCH 7/7] tests: integration: cover idle timeout recovery Signed-off-by: Eduardo Silva --- .../config/in_http_max_connections.yaml | 1 + .../tests/test_in_http_max_connections_001.py | 239 +++++++++++++++--- 2 files changed, 202 insertions(+), 38 deletions(-) diff --git a/tests/integration/scenarios/in_http_max_connections/config/in_http_max_connections.yaml b/tests/integration/scenarios/in_http_max_connections/config/in_http_max_connections.yaml index cccb8bf133e..07459ac71e1 100644 --- a/tests/integration/scenarios/in_http_max_connections/config/in_http_max_connections.yaml +++ b/tests/integration/scenarios/in_http_max_connections/config/in_http_max_connections.yaml @@ -10,6 +10,7 @@ pipeline: - name: http listen: 127.0.0.1 port: ${FLUENT_BIT_TEST_LISTENER_PORT} + http_server.idle_timeout: 2s http_server.max_connections: 1 outputs: diff --git a/tests/integration/scenarios/in_http_max_connections/tests/test_in_http_max_connections_001.py b/tests/integration/scenarios/in_http_max_connections/tests/test_in_http_max_connections_001.py index c9e3766aa4b..c88572dcc9b 100644 --- a/tests/integration/scenarios/in_http_max_connections/tests/test_in_http_max_connections_001.py +++ b/tests/integration/scenarios/in_http_max_connections/tests/test_in_http_max_connections_001.py @@ -1,15 +1,20 @@ +import logging import os import socket +import time import pytest import requests from server.http_server import data_storage, http_server_run from utils.fluent_bit_manager import FluentBitStartupError -from utils.http_matrix import run_curl_request +from utils.http_matrix import curl_supports_http2, run_curl_request from utils.test_service import FluentBitTestService +LOGGER = logging.getLogger(__name__) + + class Service: def __init__(self): self.config_file = os.path.abspath( @@ -41,55 +46,213 @@ def stop(self): self.service.stop() -def test_in_http_max_connections_blocks_and_recovers(): - service = Service() +def _read_fluent_bit_log(service): + log_contents = "" + + if service.service.flb and service.service.flb.log_file: + with open(service.service.flb.log_file, "r", encoding="utf-8", errors="replace") as file: + log_contents = file.read() + + return log_contents + + +def _start_service_or_skip(service, required_properties): try: service.start() except FluentBitStartupError as error: - log_contents = "" - if service.service.flb and service.service.flb.log_file: - with open(service.service.flb.log_file, "r", encoding="utf-8", errors="replace") as file: - log_contents = file.read() - if "http_server.max_connections" in str(error) or "unknown configuration property 'http_server.max_connections'" in log_contents: - pytest.skip("http_server.max_connections is not supported by this Fluent Bit binary") + log_contents = _read_fluent_bit_log(service) + error_message = str(error) + + for property_name in required_properties: + unknown_property_error = f"unknown configuration property '{property_name}'" + if (unknown_property_error in error_message or + unknown_property_error in log_contents): + try: + service.stop() + except Exception: + LOGGER.debug("stop after skip failed", exc_info=True) + pytest.skip(f"{property_name} is not supported by this Fluent Bit binary") raise - held_connection = None - try: - held_connection = socket.create_connection(("127.0.0.1", service.flb_listener_port), timeout=2) - held_connection.settimeout(2) - overflow_rejected = False +def _wait_for_accepted_request(service, payload, http_mode, timeout=10, interval=0.5): + """Poll until 201, return the last response on timeout, and raise only if none arrived.""" + deadline = time.monotonic() + timeout + last_error = None + last_response = None + + while time.monotonic() < deadline: try: - response = run_curl_request( + last_response = run_curl_request( f"http://127.0.0.1:{service.flb_listener_port}/", - payload='{"message":"max-connections"}', + payload=payload, headers=["Content-Type: application/json"], - http_mode="http1.1", + http_mode=http_mode, ) - overflow_rejected = response["status_code"] != 201 - except Exception: - overflow_rejected = True + if last_response["status_code"] == 201: + return last_response + except Exception as error: + last_error = error + + time.sleep(interval) + + if last_response is not None: + return last_response + + if last_error is not None: + raise last_error - assert overflow_rejected + return {"status_code": 0} + + +def test_in_http_max_connections_blocks_and_recovers(): + service = Service() + accepted = {"status_code": 0} + forwarded_payloads = [] + + _start_service_or_skip(service, ["http_server.max_connections"]) + + held_connection = None + try: + try: + held_connection = socket.create_connection(("127.0.0.1", service.flb_listener_port), timeout=2) + held_connection.settimeout(2) + + overflow_rejected = False + try: + response = run_curl_request( + f"http://127.0.0.1:{service.flb_listener_port}/", + payload='{"message":"max-connections"}', + headers=["Content-Type: application/json"], + http_mode="http1.1", + ) + overflow_rejected = response["status_code"] != 201 + except Exception: + overflow_rejected = True + + assert overflow_rejected + finally: + if held_connection: + held_connection.close() + + accepted = run_curl_request( + f"http://127.0.0.1:{service.flb_listener_port}/", + payload='{"message":"max-connections"}', + headers=["Content-Type: application/json"], + http_mode="http1.1", + ) + forwarded_payloads = service.service.wait_for_condition( + lambda: data_storage["payloads"] if data_storage["payloads"] else None, + timeout=10, + interval=0.5, + description="forwarded max-connections payload", + ) finally: - if held_connection: - held_connection.close() - - accepted = run_curl_request( - f"http://127.0.0.1:{service.flb_listener_port}/", - payload='{"message":"max-connections"}', - headers=["Content-Type: application/json"], - http_mode="http1.1", - ) - forwarded_payloads = service.service.wait_for_condition( - lambda: data_storage["payloads"] if data_storage["payloads"] else None, - timeout=10, - interval=0.5, - description="forwarded max-connections payload", - ) - - service.stop() + service.stop() assert accepted["status_code"] == 201 assert forwarded_payloads[0][0]["message"] == "max-connections" + + +def test_in_http_idle_timeout_evicts_partial_request_connection(): + service = Service() + response = {"status_code": 0} + forwarded_payloads = [] + + _start_service_or_skip(service, ["http_server.max_connections", "http_server.idle_timeout"]) + + held_connection = None + try: + try: + held_connection = socket.create_connection(("127.0.0.1", service.flb_listener_port), timeout=2) + held_connection.settimeout(5) + held_connection.sendall(b"POST / HTTP/1.1\r\n") + + overflow_rejected = False + try: + response = run_curl_request( + f"http://127.0.0.1:{service.flb_listener_port}/", + payload='{"message":"idle-timeout-blocked"}', + headers=["Content-Type: application/json"], + http_mode="http1.1", + ) + overflow_rejected = response["status_code"] != 201 + except Exception: + overflow_rejected = True + + assert overflow_rejected + + response = _wait_for_accepted_request( + service, + payload='{"message":"idle-timeout-recovered"}', + http_mode="http1.1", + ) + finally: + if held_connection: + held_connection.close() + held_connection = None + forwarded_payloads = service.service.wait_for_condition( + lambda: data_storage["payloads"] if data_storage["payloads"] else None, + timeout=10, + interval=0.5, + description="forwarded idle-timeout payload", + ) + finally: + service.stop() + + assert response["status_code"] == 201 + assert forwarded_payloads[0][0]["message"] == "idle-timeout-recovered" + + +def test_in_http_idle_timeout_evicts_partial_http2_preface_connection(): + if not curl_supports_http2(): + pytest.skip("curl was built without HTTP/2 support") + + service = Service() + response = {"status_code": 0, "http_version": ""} + forwarded_payloads = [] + + _start_service_or_skip(service, ["http_server.max_connections", "http_server.idle_timeout"]) + + held_connection = None + try: + try: + held_connection = socket.create_connection(("127.0.0.1", service.flb_listener_port), timeout=2) + held_connection.settimeout(5) + held_connection.sendall(b"PRI * HTTP/2.0\r\n\r\nSM\r\n") + + overflow_rejected = False + try: + response = run_curl_request( + f"http://127.0.0.1:{service.flb_listener_port}/", + payload='{"message":"idle-timeout-http2-blocked"}', + headers=["Content-Type: application/json"], + http_mode="http2-prior-knowledge", + ) + overflow_rejected = response["status_code"] != 201 + except Exception: + overflow_rejected = True + + assert overflow_rejected + + response = _wait_for_accepted_request( + service, + payload='{"message":"idle-timeout-http2-recovered"}', + http_mode="http2-prior-knowledge", + ) + finally: + if held_connection: + held_connection.close() + held_connection = None + forwarded_payloads = service.service.wait_for_condition( + lambda: data_storage["payloads"] if data_storage["payloads"] else None, + timeout=10, + interval=0.5, + description="forwarded idle-timeout http2 payload", + ) + finally: + service.stop() + + assert response["status_code"] == 201 + assert response["http_version"] == "2" + assert forwarded_payloads[0][0]["message"] == "idle-timeout-http2-recovered"