Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/http_server/flb_hs.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,9 @@ struct flb_hs *flb_hs_create(const char *listen, const char *tcp_port,
options.networking_flags = 0;
flb_net_setup_init(&hs->net_setup);
options.networking_setup = &hs->net_setup;
options.event_loop = config->evl;
options.event_loop = NULL;
options.system_context = config;
options.use_caller_event_loop = FLB_TRUE;
options.use_caller_event_loop = FLB_FALSE;
Comment on lines +373 to +375
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for any synchronisation primitives protecting hs buffer fields
# in the http_server module (headers, source, callers)

echo "=== mutex / rwlock / atomic references in http_server module ==="
rg -rn --type c \
   "pthread_mutex|flb_mutex|flb_lock|pthread_rwlock|rwlock|atomic|_Atomic" \
   --include="*.c" --include="*.h" \
   -g "*/http_server/*"

echo ""
echo "=== flb_hs struct definition (looking for lock fields) ==="
rg -rn --type c -A 40 "struct flb_hs\b" \
   --include="*.h"

echo ""
echo "=== callers of flb_hs_push_pipeline_metrics, flb_hs_push_metrics, etc. ==="
rg -rn --type c \
   "flb_hs_push_(pipeline_metrics|metrics|storage_metrics|health_metrics)\b"

Repository: fluent/fluent-bit

Length of output: 2147


🏁 Script executed:

#!/bin/bash

echo "=== flb_hs struct definition from header ==="
fd -t f "flb_hs.h" --exec cat -n {} \;

echo ""
echo "=== Content around lines 373-375 in flb_hs.c ==="
sed -n '360,390p' src/http_server/flb_hs.c | cat -n

echo ""
echo "=== Search for any locks/mutexes in flb_hs.c ==="
grep -n "pthread_mutex\|flb_mutex\|flb_lock\|rwlock\|atomic" src/http_server/flb_hs.c

echo ""
echo "=== Search for pthread in entire http_server dir ==="
find src/http_server -name "*.c" -o -name "*.h" | xargs grep -l "pthread" 2>/dev/null

echo ""
echo "=== Search for flb_hs_push functions to see their real names ==="
grep -n "flb_hs_push" include/fluent-bit/http_server/flb_hs.h

echo ""
echo "=== Check if flb_http_server_running_on_caller_context is defined ==="
grep -rn "flb_http_server_running_on_caller_context" src/ include/

Repository: fluent/fluent-bit

Length of output: 6066


🏁 Script executed:

#!/bin/bash

echo "=== flb_http_server.c for threading and synchronization ==="
grep -n "pthread\|mutex\|lock" src/http_server/flb_http_server.c | head -20

echo ""
echo "=== flb_hs_push_metrics and related implementations ==="
grep -n "flb_hs_push" src/http_server/flb_hs.c | head -20

echo ""
echo "=== Implementation of flb_hs_push_metrics function ==="
sed -n '1,50p' src/http_server/flb_hs.c | grep -A 30 "flb_hs_push_metrics"

echo ""
echo "=== Search for flb_hs_buf_cleanup ==="
grep -rn "flb_hs_buf_cleanup" src/http_server/

echo ""
echo "=== Search for where flb_hs_buf data/raw_data are accessed in handlers ==="
rg -n "hs->metrics|hs->storage_metrics|hs->health_metrics" src/http_server/flb_hs.c | head -30

echo ""
echo "=== Check flb_http_server_init_with_options for worker thread setup ==="
grep -n "flb_http_server_init_with_options" src/http_server/flb_http_server.c

Repository: fluent/fluent-bit

Length of output: 3721


🏁 Script executed:

#!/bin/bash

echo "=== api_v1_registration and endpoint callbacks ==="
grep -n "api_v1_registration\|api_v1_" src/http_server/flb_hs.c | head -20

echo ""
echo "=== Look for /api/v1/metrics or similar handlers ==="
grep -rn "metrics.*callback\|flb_hs_register_endpoint" src/http_server/flb_hs.c | head -20

echo ""
echo "=== Find handler implementations for metrics endpoint ==="
sed -n '1,500p' src/http_server/flb_hs.c | grep -B 5 -A 15 "\.data\|\.raw_data" | head -60

echo ""
echo "=== Check api_v1.c for endpoint implementations ==="
find src/http_server -name "*api_v1*" -o -name "*endpoint*" | head -10

echo ""
echo "=== Check where push_* functions are called from ==="
rg -rn "flb_hs_push_metrics|flb_hs_push_health|flb_hs_push_pipeline|flb_hs_push_storage" src/ | grep -v "\.h:" | grep -v "flb_hs\.c:"

Repository: fluent/fluent-bit

Length of output: 2208


🏁 Script executed:

#!/bin/bash

echo "=== flb_hs_endpoints.c - find endpoint handlers ==="
wc -l src/http_server/flb_hs_endpoints.c

echo ""
echo "=== Look for handlers that read metrics buffers ==="
grep -n "hs->metrics\|hs->storage_metrics\|hs->health_metrics" src/http_server/flb_hs_endpoints.c | head -30

echo ""
echo "=== Search for callback functions that handle requests ==="
grep -n "static.*callback\|int.*callback" src/http_server/flb_hs_endpoints.c | head -20

echo ""
echo "=== Get the metrics endpoint handler ==="
grep -B 5 -A 20 "hs->metrics\.data\|hs->metrics\.raw_data" src/http_server/flb_hs_endpoints.c | head -60

echo ""
echo "=== Confirm use_caller_event_loop behavior ==="
grep -B 5 -A 10 "use_caller_event_loop.*FLB_FALSE\|use_caller_event_loop.*FLB_TRUE" src/http_server/flb_http_server.c | head -40

Repository: fluent/fluent-bit

Length of output: 1686


🏁 Script executed:

#!/bin/bash

echo "=== Full flb_hs_endpoints.c file ==="
cat -n src/http_server/flb_hs_endpoints.c

echo ""
echo "=== Look for all occurrences of 'hs' parameter usage in callbacks ==="
grep -n "hs->" src/http_server/flb_hs_endpoints.c

Repository: fluent/fluent-bit

Length of output: 4701


🏁 Script executed:

#!/bin/bash

echo "=== Find api_v1_registration function ==="
rg -n "api_v1_registration" src/ --type c -A 30 | head -80

echo ""
echo "=== Search for actual HTTP request callbacks in http_server ==="
rg -n "flb_http_request\|flb_http_response" src/http_server/ --type c | grep -v "\.h:" | head -40

echo ""
echo "=== Look for where endpoint callbacks are called from ==="
rg -n "callback\(" src/http_server/flb_http_server.c | head -20

echo ""
echo "=== Check how worker threads invoke callbacks ==="
grep -B 5 -A 15 "request.*response\|handler.*callback" src/http_server/flb_http_server.c | head -60

Repository: fluent/fluent-bit

Length of output: 4286


🏁 Script executed:

#!/bin/bash

echo "=== Find api_v1_metrics implementation ==="
find src/http_server/api -name "*.c" | head -20

echo ""
echo "=== List all api endpoint files ==="
find src/http_server/api -type f -name "*.c"

echo ""
echo "=== Check api_v1_metrics.c ==="
wc -l src/http_server/api/v1/metrics.c

echo ""
echo "=== Search for callback functions in v1 api ==="
grep -rn "static int.*callback\|endpoint_metrics\|cb_" src/http_server/api/v1/ --include="*.c" | head -20

echo ""
echo "=== Check metrics.c for buffer access ==="
grep -n "hs->metrics\|hs->storage_metrics" src/http_server/api/v1/metrics.c

Repository: fluent/fluent-bit

Length of output: 2593


🏁 Script executed:

#!/bin/bash

echo "=== View cb_metrics function in metrics.c ==="
sed -n '405,430p' src/http_server/api/v1/metrics.c

echo ""
echo "=== View cb_metrics_prometheus and how it accesses buffers ==="
sed -n '124,160p' src/http_server/api/v1/metrics.c

echo ""
echo "=== Check cb_storage function in storage.c ==="
sed -n '40,70p' src/http_server/api/v1/storage.c

echo ""
echo "=== Check cb_health function ==="
sed -n '152,180p' src/http_server/api/v1/health.c

echo ""
echo "=== Find request_callback assignment in flb_http_server.c ==="
grep -n "request_callback.*=" src/http_server/flb_http_server.c | head -10

Repository: fluent/fluent-bit

Length of output: 3860


🏁 Script executed:

#!/bin/bash

echo "=== View metrics_get_latest function ==="
grep -B 5 -A 10 "metrics_get_latest" src/http_server/api/v1/metrics.c | head -30

echo ""
echo "=== View flb_hs_buf_cleanup implementation ==="
sed -n '113,150p' src/http_server/flb_hs.c

echo ""
echo "=== View flb_hs_buf_release implementation ==="
sed -n '135,160p' src/http_server/flb_hs.c

echo ""
echo "=== Check if users field is accessed anywhere else ==="
grep -rn "\.users\+\+\|\.users--\|\.users =" src/http_server/ --include="*.c"

echo ""
echo "=== Verify that use_caller_event_loop = FLB_FALSE means worker threads ==="
grep -B 10 -A 10 "use_caller_event_loop == FLB_FALSE" src/http_server/flb_http_server.c | head -40

Repository: fluent/fluent-bit

Length of output: 2507


Data race on shared hs buffers after moving HTTP server to worker thread

After this change the HTTP server runs on worker threads instead of integrating into the main event loop. The endpoint request handlers (e.g., cb_metrics, cb_storage in the API endpoints) now run concurrently with the engine's metric-push functions (flb_hs_push_metrics, flb_hs_push_pipeline_metrics, flb_hs_push_storage_metrics, flb_hs_push_health_metrics), with no synchronization:

  • Request handlers increment buf->users++ and read buf->data/buf->raw_data pointers (worker thread)
  • Push functions call flb_hs_buf_cleanup() which checks/writes users, pending_free, and nulls data/raw_data (main engine thread)
  • The users field in struct flb_hs_buf is plain int — not atomic — so concurrent increment/decrement can silently miscount, leading to use-after-free or leaks
  • No mutex or rwlock protects these fields

Add synchronization (e.g., pthread_mutex_t) guarding access to flb_hs_buf structures and hs->health_metrics list mutations before merge.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/http_server/flb_hs.c` around lines 373 - 375, The shared buffer fields in
struct flb_hs_buf (users, data, raw_data, pending_free) and the
hs->health_metrics list need mutual exclusion to prevent data races between
worker-thread request handlers (e.g., cb_metrics, cb_storage) and engine-thread
push functions (flb_hs_push_metrics, flb_hs_push_pipeline_metrics,
flb_hs_push_storage_metrics, flb_hs_push_health_metrics) and flb_hs_buf_cleanup;
add a pthread_mutex_t (or similar) to struct flb_hs_buf and a mutex protecting
hs->health_metrics, initialize/destroy them alongside the existing struct
lifecycle, and surround all accesses/updates to users/data/raw_data/pending_free
and any insert/remove operations on hs->health_metrics with the mutex (lock
before reading/updating, unlock after) so increment/decrement of users and
nulling/freeing of data are atomic and race-free.


ret = flb_http_server_init_with_options(&hs->server, &options);
if (ret != 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
service:
flush: 1
grace: 1
log_level: info
http_server: on
http_listen: 127.0.0.1
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
inputs:
- name: exec
tag: test
command: curl -s http://127.0.0.1:${FLUENT_BIT_HTTP_MONITORING_PORT}/api/v1/metrics/prometheus
interval_sec: 1
buf_size: 128k

outputs:
- name: "null"
match: "*"
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import os
import time

from utils.http_matrix import run_curl_request
from utils.test_service import FluentBitTestService


class Service:
def __init__(self):
self.config_file = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../config/internal_http_server_exec_deadlock.yaml")
)
self.service = FluentBitTestService(self.config_file)

def start(self):
self.service.start()
self.flb = self.service.flb
self.base_url = f"http://127.0.0.1:{self.flb.http_monitoring_port}"

def stop(self):
self.service.stop()

def request(self, path, *, method="GET", http_mode="http1.1"):
return run_curl_request(
f"{self.base_url}{path}",
method=method,
http_mode=http_mode,
)


def test_http_server_responsive_after_exec_self_request():
"""The built-in HTTP server must remain responsive after the exec input
plugin makes an HTTP request to it. Before the fix, the exec child
process (curl) and the HTTP server shared the same event loop, causing
a deadlock that made the server permanently unresponsive."""

service = Service()
service.start()

try:
# Verify the server works before exec fires
result = service.request("/api/v1/uptime")
assert result["status_code"] == 200
assert "uptime_sec" in result["body"]

# Wait for exec to fire (interval_sec=1) plus a small buffer
time.sleep(2)

# Verify the server is still responsive after exec has fired
result = service.service.wait_for_condition(
lambda: (
response
if response["status_code"] == 200 and "uptime_sec" in response["body"]
else None
) if (response := service.request("/api/v1/uptime")) else None,
timeout=10,
interval=1,
description="HTTP server responsive after exec self-request",
)
assert result["status_code"] == 200
assert "uptime_sec" in result["body"]
finally:
service.stop()
Loading