Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/fluent-bit/flb_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,26 @@ struct flb_net_setup;
struct flb_upstream;
struct flb_downstream;
struct flb_tls_session;
struct flb_connection;

typedef void (*flb_connection_drop_notification_callback)(
struct flb_connection *connection);

/* Base network connection */
struct flb_connection {
struct mk_event event;

void *user_data;
/*
* Optional notification invoked from prepare_destroy_conn() while the
* connection is still linked on busy_queue and before the event is
* deregistered and the file descriptor is closed.
*
* Callers may detach external state here, but must not free, destroy or
* unlink the connection because prepare_destroy_conn() performs the final
* teardown immediately after the callback returns.
*/
flb_connection_drop_notification_callback drop_notification_callback;

/* Socket */
flb_sockfd_t fd;
Expand Down
9 changes: 9 additions & 0 deletions include/fluent-bit/http_server/flb_http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

#define HTTP_SERVER_INITIAL_BUFFER_SIZE (10 * 1024)
#define HTTP_SERVER_MAXIMUM_BUFFER_SIZE (10 * (1000 * 1024))
#define HTTP_SERVER_DEFAULT_IDLE_TIMEOUT 10 /* seconds */

#define FLB_HTTP_SERVER_FLAG_KEEPALIVE (((uint64_t) 1) << 0)
#define FLB_HTTP_SERVER_FLAG_AUTO_DEFLATE (((uint64_t) 1) << 1)
Expand Down Expand Up @@ -70,6 +71,7 @@ struct flb_input_instance;

struct flb_http_server_config {
int http2;
int idle_timeout; /* seconds */
size_t buffer_max_size;
size_t buffer_chunk_size;
size_t max_connections;
Expand All @@ -92,6 +94,7 @@ struct flb_http_server_options {
struct mk_event_loop *event_loop;
struct flb_config *system_context;

int idle_timeout; /* seconds */
size_t buffer_max_size;
size_t buffer_chunk_size;
size_t max_connections;
Expand Down Expand Up @@ -131,6 +134,7 @@ struct flb_http_server {
uint64_t flags;
int status;
int protocol_version;
int idle_timeout; /* seconds */
struct flb_downstream *downstream;
struct cfl_list clients;
flb_http_server_request_processor_callback request_callback;
Expand Down Expand Up @@ -161,6 +165,7 @@ struct flb_http_server_session {
size_t read_buffer_size;

int releasable;
int drop_pending;

struct flb_connection *connection;
struct flb_http_server *parent;
Expand Down Expand Up @@ -226,6 +231,10 @@ void flb_http_server_set_buffer_max_size(struct flb_http_server *server, size_t

size_t flb_http_server_get_buffer_max_size(struct flb_http_server *server);

const struct flb_net_setup *
flb_http_server_runtime_worker_net_setup_get(struct flb_http_server *server,
int worker_id);

/* HTTP SESSION */

int flb_http_server_session_init(struct flb_http_server_session *session, int version);
Expand Down
1 change: 1 addition & 0 deletions plugins/out_prometheus_exporter/prom_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ struct prom_http *prom_http_server_create(struct prom_exporter *ctx,
options.use_caller_event_loop = FLB_TRUE;

if (ins->http_server_config != NULL) {
options.idle_timeout = ins->http_server_config->idle_timeout;
options.buffer_max_size = ins->http_server_config->buffer_max_size;
options.max_connections = ins->http_server_config->max_connections;
}
Expand Down
1 change: 1 addition & 0 deletions plugins/out_vivo_exporter/vivo_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ struct vivo_http *vivo_http_server_create(struct vivo_exporter *ctx,
options.use_caller_event_loop = FLB_TRUE;

if (ins->http_server_config != NULL) {
options.idle_timeout = ins->http_server_config->idle_timeout;
options.buffer_max_size = ins->http_server_config->buffer_max_size;
options.max_connections = ins->http_server_config->max_connections;
}
Expand Down
2 changes: 1 addition & 1 deletion src/flb_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,4 @@ void flb_connection_unset_io_timeout(struct flb_connection *connection)
assert(connection != NULL);

connection->ts_io_timeout = -1;
}
}
4 changes: 4 additions & 0 deletions src/flb_downstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ static int prepare_destroy_conn(struct flb_connection *connection)
flb_trace("[downstream] destroy connection #%i to %s",
connection->fd, flb_connection_get_remote_address(connection));

if (connection->drop_notification_callback != NULL) {
connection->drop_notification_callback(connection);
}

if (MK_EVENT_IS_REGISTERED((&connection->event))) {
mk_event_del(connection->evl, &connection->event);
}
Expand Down
120 changes: 112 additions & 8 deletions src/http_server/flb_http_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ struct flb_http_server_runtime {
int worker_count;
};

const struct flb_net_setup *
flb_http_server_runtime_worker_net_setup_get(struct flb_http_server *server,
int worker_id)
{
if (server == NULL ||
server->runtime == NULL ||
worker_id < 0 ||
worker_id >= server->runtime->worker_count) {
return NULL;
}

return &server->runtime->workers[worker_id].net_setup;
}

static void flb_http_server_runtime_stop(struct flb_http_server *session);

static void flb_http_server_worker_context_reset(
Expand Down Expand Up @@ -98,6 +112,51 @@ static const char *flb_http_server_get_alpn_string(struct flb_http_server *sessi
return "http/1.0";
}

static void flb_http_server_connection_drop(struct flb_connection *connection)
{
struct flb_http_server_session *session;

if (connection == NULL) {
return;
}

session = connection->user_data;

if (session != NULL &&
session->connection == connection) {
session->connection = NULL;
session->drop_pending = FLB_TRUE;
connection->event.data = session;
}
else {
connection->event.data = NULL;
}

connection->user_data = NULL;
connection->drop_notification_callback = NULL;
}

static void flb_http_server_reap_stale_sessions(struct flb_http_server *server)
{
struct cfl_list *iterator_backup;
struct cfl_list *iterator;
struct flb_http_server_session *session;

cfl_list_foreach_safe(iterator,
iterator_backup,
&server->clients) {
session = cfl_list_entry(iterator,
struct flb_http_server_session,
_head);

if (session->drop_pending == FLB_FALSE &&
(session->connection == NULL ||
session->connection->fd == FLB_INVALID_SOCKET)) {
flb_http_server_session_destroy(session);
}
}
}

static size_t flb_http_server_client_count(struct flb_http_server *server)
{
return cfl_list_size(&server->clients);
Expand All @@ -112,6 +171,7 @@ static int flb_http_server_apply_options(struct flb_http_server *session,

session->status = HTTP_SERVER_UNINITIALIZED;
session->protocol_version = options->protocol_version;
session->idle_timeout = options->idle_timeout;
session->flags = options->flags;
session->request_callback = options->request_callback;
session->user_data = options->user_data;
Expand Down Expand Up @@ -319,12 +379,27 @@ static int flb_http_server_client_activity_event_handler(void *data)
struct mk_event *event;

connection = (struct flb_connection *) data;
if (connection == NULL) {
return -1;
}

event = &connection->event;

session = (struct flb_http_server_session *) connection->user_data;
if (session == NULL) {
session = (struct flb_http_server_session *) event->data;
if (session != NULL &&
(session->connection == NULL ||
session->connection->fd == FLB_INVALID_SOCKET)) {
event->data = NULL;
session->drop_pending = FLB_FALSE;
flb_http_server_session_destroy(session);
}

server = session->parent;
return -1;
Comment on lines 388 to +399
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Destroy timed-out sessions after user_data is cleared

When a timeout drops a connection, prepare_destroy_conn() now invokes the drop callback, which clears connection->user_data before the injected cleanup event is processed. This early return path then skips flb_http_server_session_destroy(), so the session can remain linked in server->clients (and keep its buffers) until another accept path happens to sweep it. In single-worker/caller-event-loop mode, flb_http_server_worker_maintenance() is never run, so these timed-out sessions are retained unnecessarily.

Useful? React with 👍 / 👎.

}

event = &connection->event;
server = session->parent;

if (event->mask & MK_EVENT_READ) {
result = flb_http_server_session_read(session);
Expand Down Expand Up @@ -404,11 +479,14 @@ static int flb_http_server_client_connection_event_handler(void *data)
return -1;
}

if (server->max_connections > 0 &&
flb_http_server_client_count(server) >= server->max_connections) {
flb_downstream_conn_release(connection);
if (server->max_connections > 0) {
flb_http_server_reap_stale_sessions(server);

return -5;
if (flb_http_server_client_count(server) >= server->max_connections) {
flb_downstream_conn_release(connection);

return -5;
}
}

session = flb_http_server_session_create(server->protocol_version);
Expand All @@ -429,6 +507,8 @@ static int flb_http_server_client_connection_event_handler(void *data)
MK_EVENT_NEW(&connection->event);

connection->user_data = (void *) session;
connection->event.data = (void *) session;
connection->drop_notification_callback = flb_http_server_connection_drop;
connection->event.type = FLB_ENGINE_EV_CUSTOM;
connection->event.handler = flb_http_server_client_activity_event_handler;

Expand Down Expand Up @@ -469,6 +549,8 @@ static void flb_http_server_worker_maintenance(struct flb_config *config,
if (worker->server.downstream != NULL) {
flb_downstream_conn_timeouts_stream(worker->server.downstream);
}

flb_http_server_reap_stale_sessions(&worker->server);
}

static int flb_http_server_worker_initialize(
Expand All @@ -490,6 +572,7 @@ static int flb_http_server_worker_initialize(
options.networking_setup = &worker->net_setup;
options.event_loop = worker->event_loop;
options.system_context = worker->parent.system_context;
options.idle_timeout = worker->parent.idle_timeout;
options.buffer_max_size = worker->parent.buffer_max_size;
options.workers = 1;
options.use_caller_event_loop = FLB_TRUE;
Expand Down Expand Up @@ -730,6 +813,7 @@ void flb_http_server_options_init(struct flb_http_server_options *options)

options->buffer_max_size = HTTP_SERVER_MAXIMUM_BUFFER_SIZE;
options->buffer_chunk_size = HTTP_SERVER_INITIAL_BUFFER_SIZE;
options->idle_timeout = HTTP_SERVER_DEFAULT_IDLE_TIMEOUT;
options->max_connections = 0;
options->workers = 1;
options->use_caller_event_loop = FLB_TRUE;
Expand All @@ -745,6 +829,7 @@ void flb_http_server_config_init(struct flb_http_server_config *config)
memset(config, 0, sizeof(struct flb_http_server_config));

config->http2 = FLB_TRUE;
config->idle_timeout = HTTP_SERVER_DEFAULT_IDLE_TIMEOUT;
config->buffer_max_size = HTTP_SERVER_MAXIMUM_BUFFER_SIZE;
config->buffer_chunk_size = HTTP_SERVER_INITIAL_BUFFER_SIZE;
config->max_connections = 0;
Expand Down Expand Up @@ -831,6 +916,7 @@ int flb_input_http_server_options_init(struct flb_http_server_options *options,
}

if (server_config != NULL) {
options->idle_timeout = server_config->idle_timeout;
if (server_config->buffer_chunk_size > 0) {
options->buffer_chunk_size = server_config->buffer_chunk_size;
}
Expand Down Expand Up @@ -867,6 +953,12 @@ int flb_http_server_init_with_options(
options->networking_setup->share_port = FLB_TRUE;
}

if (options->networking_setup != NULL &&
options->networking_setup->io_timeout <= 0 &&
options->idle_timeout > 0) {
options->networking_setup->io_timeout = options->idle_timeout;
Comment on lines +956 to +959
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve disabled idle timeout in multi-worker servers

This assignment is correct for normal initialization, but worker servers are initialized with fresh default options (idle_timeout=10) and a copied net_setup. If users configure http_server.idle_timeout to 0 (documented to disable), net_setup.io_timeout stays 0 in the parent and this block re-applies 10 during worker initialization, so disabling idle timeout does not work when http_server.workers > 1.

Useful? React with 👍 / 👎.

}

return flb_http_server_apply_options(session, options);
}

Expand Down Expand Up @@ -1114,9 +1206,21 @@ struct flb_http_server_session *flb_http_server_session_create(int version)

void flb_http_server_session_destroy(struct flb_http_server_session *session)
{
struct flb_connection *connection;

if (session != NULL) {
if (session->connection != NULL) {
flb_downstream_conn_release(session->connection);
connection = session->connection;
session->connection = NULL;

if (connection != NULL) {
connection->user_data = NULL;
connection->event.data = NULL;
connection->drop_notification_callback = NULL;
session->drop_pending = FLB_FALSE;

if (connection->fd != FLB_INVALID_SOCKET) {
flb_downstream_conn_release(connection);
}
}

if (!cfl_list_entry_is_orphan(&session->_head)) {
Expand Down
5 changes: 5 additions & 0 deletions src/http_server/flb_http_server_config_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ struct flb_config_map flb_http_server_config_map[] = {
0, FLB_TRUE, offsetof(struct flb_http_server_config, http2),
"Enable HTTP/2 support for the HTTP server"
},
{
FLB_CONFIG_MAP_TIME, "http_server.idle_timeout", "10s",
0, FLB_TRUE, offsetof(struct flb_http_server_config, idle_timeout),
"Set the maximum idle time for accepted HTTP connections. 0 disables it."
},
{
FLB_CONFIG_MAP_SIZE, "http_server.buffer_max_size", "4M",
0, FLB_TRUE, offsetof(struct flb_http_server_config, buffer_max_size),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pipeline:
- name: http
listen: 127.0.0.1
port: ${FLUENT_BIT_TEST_LISTENER_PORT}
http_server.idle_timeout: 2s
http_server.max_connections: 1

outputs:
Expand Down
Loading
Loading