downstream: re-architect workers support#11775
Conversation
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
Adds a downstream worker runtime API and implementation, then integrates it into the HTTP server and the TCP/UDP/Forward inputs.
Changes: Multi-Worker Input Framework & Plugin Integration
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant Runtime as Downstream_Runtime
    participant Worker as Worker (Thread/EventLoop)
    participant Input as Plugin_Input
    participant Engine as Engine
    Client->>Runtime: connect / send packet
    Runtime->>Worker: notify worker event loop (socket readable)
    Worker->>Input: invoke collect handler (in_*_collect_ctx)
    Input->>Input: decode/encode record
    Input->>Input: call ingest helper (fw/tcp/udp_ingest_*)
    alt use_ingress_queue
        Input->>Engine: flb_input_ingress_queue_* (enqueue)
    else direct_append
        Input->>Engine: flb_input_*_append (direct append)
    end
    Worker->>Engine: FLB_ENGINE_EV_CUSTOM handlers dispatched
    Runtime->>Worker: periodic cb_maintenance callbacks
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9e576e909b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```c
if (ctx->runtime != NULL) {
    flb_downstream_worker_runtime_foreach(ctx->runtime,
                                          in_tcp_worker_pause,
                                          NULL);
    return;
```
Close active TCP worker connections during pause
With workers > 1, in_tcp_pause exits after dispatching in_tcp_worker_pause, and that callback only sets flb_downstream_pause on each listener. Unlike the single-worker path, no worker connection list is drained (tcp_conn_del / pending_close), so already-accepted sockets keep their event handlers and can continue ingesting records during backpressure pauses.
```c
if (ctx->downstream != NULL) {
    flb_downstream_pause(ctx->downstream);
    ctx->is_paused = FLB_TRUE;
    ctx->state = FW_INSTANCE_STATE_PAUSED;
}
```
Drop worker Forward connections when pausing ingestion
In worker mode, the pause callback only marks each worker context as paused and pauses the downstream listener, but it never closes existing worker connections. Because those connections remain registered, established clients can continue sending traffic while paused, which breaks the plugin’s own pause contract to close active connections under backpressure.
```c
runtime->workers[i].should_exit = FLB_TRUE;
if (runtime->workers[i].thread_created == FLB_TRUE) {
    pthread_join(runtime->workers[i].thread, NULL);
}
downstream_worker_context_cleanup(&runtime->workers[i]);
```
Skip pthread cleanup for workers never initialized
Startup failure paths call flb_downstream_worker_runtime_stop, which currently destroys mutex/cond objects for every slot in worker_count. However, only workers that reached downstream_worker_context_reset were pthread-initialized; later slots were merely zeroed by calloc, so destroying them is undefined behavior and can crash when worker creation/init fails partway through startup.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/in_forward/fw.c (1)
159-172: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Move the trace log before `flb_downstream_conn_release` to avoid logging a closed file descriptor.
At line 168, `connection->fd` is dereferenced after `flb_downstream_conn_release(connection)` is called on line 167. While the connection struct remains in memory, `prepare_destroy_conn` closes the underlying socket via `flb_socket_close(connection->fd)`, making the fd a closed resource. Logging the fd after closure is semantically incorrect and should be moved before the release.

Suggested fix:
```diff
 if (ctx->is_paused) {
+    flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd);
     flb_downstream_conn_release(connection);
-    flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd);
     ctx->state = state_backup;
     return -1;
 }
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/in_forward/fw.c` around lines 159 - 172, The trace log is emitted after flb_downstream_conn_release(connection) which can close connection->fd; move the flb_plg_trace call so it runs before flb_downstream_conn_release to avoid logging a closed FD. Specifically, in the branch that checks ctx->is_paused, call flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd) before flb_downstream_conn_release(connection), then set ctx->state = state_backup and return -1, preserving the same behavior but ensuring the FD is logged while still valid.
🧹 Nitpick comments (3)
tests/integration/scenarios/in_udp/tests/test_in_udp_001.py (2)
75-81: 💤 Low value
Minor: `flattened_records()` is evaluated twice per poll.
The lambda in `wait_for_record_count` calls `self.flattened_records()` twice on every poll cycle (once for the length check, once to return the list). For larger payload counts this doubles the iteration work. Caching the call result avoids it.

♻️ Optional refactor:
```diff
 def wait_for_record_count(self, minimum_count, timeout=10):
+    def _check():
+        records = self.flattened_records()
+        return records if len(records) >= minimum_count else None
     return self.service.wait_for_condition(
-        lambda: self.flattened_records() if len(self.flattened_records()) >= minimum_count else None,
+        _check,
         timeout=timeout,
         interval=0.2,
         description=f"{minimum_count} forwarded UDP payloads",
     )
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py` around lines 75 - 81, The lambda passed to wait_for_condition in wait_for_record_count calls self.flattened_records() twice per poll, wasting work; modify the lambda so it calls self.flattened_records() once (e.g., assign to a local variable like records inside the lambda), then check len(records) against minimum_count and return records when ready. Update the lambda used in wait_for_record_count to reuse that single records value and keep the same timeout/interval/description parameters for wait_for_condition.
148-174: ⚡ Quick win
The config correctly uses `format: json` to drop malformed datagrams.
The `in_udp_json_workers.yaml` file uses `format: json` (confirmed), which drops invalid records rather than forwarding them as fallback log lines. The test will work as intended: malformed datagrams are discarded and only valid records reach the assertion, so the `KeyError` risk is not present here.
That said, adding an explicit `assert len(records) == valid_records` check before the values assertion would provide a clearer failure message if the count unexpectedly diverges, improving test robustness.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py` around lines 148 - 174, The test test_in_udp_workers_drop_malformed_datagrams_and_continue relies on wait_for_record_count but lacks an explicit check that the number of records equals valid_records; add an assertion like assert len(records) == valid_records immediately after records = service.wait_for_record_count(...) to produce a clearer failure message if counts differ before extracting values from records.
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py (1)
424-432: ⚡ Quick win
Consolidate duplicate TCP-send helpers.
`_drop_partial_tcp_payload` and `_drop_raw_tls_connection` are byte-for-byte identical to the existing `_send_tcp_payload`: three copies of the same two-line helper. The differentiation only lives in the names (and `_drop_raw_tls_connection`'s name is also misleading: it doesn't speak TLS, it just dumps raw bytes against a TLS port). Either drop the new helpers and call `_send_tcp_payload` at the test sites with a brief inline comment, or keep one helper whose name explains the negative-path intent.

♻️ Proposed consolidation:
```diff
-def _drop_partial_tcp_payload(port, payload):
-    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
-        sock.sendall(payload)
-
 def _send_unix_payload(path, payload):
@@
-def _drop_raw_tls_connection(port, payload):
-    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
-        sock.sendall(payload)
-
 def _recv_msgpack_value(sock):
```

…and update the two call sites:
```diff
-    lambda _: _drop_partial_tcp_payload(service.flb_listener_port, b"\x93\xa4test"),
+    # Send a truncated msgpack array and close to exercise partial-payload handling.
+    lambda _: _send_tcp_payload(service.flb_listener_port, b"\x93\xa4test"),
@@
-    lambda i: _drop_raw_tls_connection(service.flb_listener_port,
-                                       f"not-tls-{i}".encode("utf-8")),
+    # Send raw (non-TLS) bytes to a TLS port to exercise bad-handshake recovery.
+    lambda i: _send_tcp_payload(service.flb_listener_port,
+                                f"not-tls-{i}".encode("utf-8")),
```
Also applies to: 448-451
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around lines 424 - 432, Duplicate two-line TCP helpers _drop_partial_tcp_payload and _drop_raw_tls_connection should be consolidated with the existing _send_tcp_payload (or replace all three with a single clearer-named helper like _send_raw_bytes_or_abort) — remove the duplicate functions, update all call sites that currently call _drop_partial_tcp_payload or _drop_raw_tls_connection to call the chosen single helper (_send_tcp_payload or the new name), and add a brief inline comment at those call sites indicating the negative-path intent (e.g., "send raw bytes and close to simulate partial/drop"). Ensure references to _send_tcp_payload, _drop_partial_tcp_payload and _drop_raw_tls_connection in tests are updated accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/in_forward/fw_prot.c`:
- Around line 1140-1151: The error paths for metrics and traces call
cmt_decode_msgpack_destroy(cmt) unconditionally and can double-free when
ownership was transferred to the ingress queue; update both error branches (the
metrics path around the fw_ingest_metrics call and the traces path around the
corresponding fw_ingest_traces call) to only call
cmt_decode_msgpack_destroy(cmt) when conn->ctx->use_ingress_queue == FLB_FALSE
(matching the success path), ensuring that when ingress queue ownership is used
the ingress event/queue cleanup (e.g., flb_input_ingress_event_destroy) remains
responsible for freeing cmt.
In `@plugins/in_tcp/tcp_conn.c`:
- Around line 143-145: The tcp_ingest_logs() calls currently ignore its return
value and can drop records; update both call sites (the two tcp_ingest_logs(ctx,
ctx->log_encoder->output_buffer, ctx->log_encoder->output_length) invocations)
to check the returned status, log a failure with context (include the returned
error code and any relevant ctx identifiers), and propagate the error out of the
enclosing function instead of silently continuing—i.e., if tcp_ingest_logs()
returns non-zero, call the logger on ctx (or appropriate logger) with a
descriptive message and return/forward a non-success code (or set the enclosing
function’s error return) so upstream code can handle the failure.
In `@plugins/in_tcp/tcp.c`:
- Around line 206-220: The worker pause path (in_tcp_worker_pause) currently
only calls flb_downstream_pause(ctx->downstream) and fails to walk
ctx->connections to drain/close worker-owned client sessions; update
in_tcp_worker_pause (and the analogous block around lines 361-366) to iterate
ctx->connections and for each connection perform the same shutdown/close steps
the single-listener path uses (e.g., stop reads, flush/drain pending data and
then close/destroy the connection using the existing connection shutdown/close
helpers), ensuring connections cannot continue reading after pause.
In `@plugins/in_udp/udp.c`:
- Around line 88-110: The startup path creates ctx->dummy_conn via udp_conn_add
but if collector registration fails (flb_input_set_collector_socket or
flb_input_collector_get_event returns NULL) the current error path calls
udp_config_destroy(ctx) without first destroying the dummy connection; modify
the failure branches in the block that sets the collector (the code around
flb_input_set_collector_socket/in_udp_collect and the subsequent
ctx->collector_event check) to call in_udp_dummy_conn_destroy(ctx) (or otherwise
free ctx->dummy_conn) before returning/calling udp_config_destroy so the dummy
connection is cleaned up; apply the same fix to the similar failure path around
lines 288-291.
In `@src/flb_downstream_worker.c`:
- Around line 84-96: The loop is reading worker->should_exit while
flb_downstream_worker_runtime_stop() writes it from another thread, causing a
data race; change the shutdown flag to be synchronized (either replace
worker->should_exit with an atomic type and use atomic load/store, or guard
accesses with the worker runtime mutex), update the writer in
flb_downstream_worker_runtime_stop() to use the same atomic/store or mutex, and
also fix the other occurrence noted (the second read/write pair around the later
shutdown code) so all reads/writes to the shutdown flag use the same
synchronization primitive.
- Around line 147-180: The shutdown path calls
flb_downstream_worker_runtime_stop() which unconditionally runs
downstream_worker_context_cleanup() over all runtime->workers, potentially
destroying mutex/cond for slots never initialized; fix by tracking how many
slots were actually set up and only cleaning those: either set
runtime->worker_count (or a new field like runtime->active_workers) to i before
calling flb_downstream_worker_runtime_stop(), or change
flb_downstream_worker_runtime_stop() to inspect each
runtime->workers[k].thread_created or runtime->workers[k].initialized and skip
cleanup for entries that are false so downstream_worker_context_cleanup() is
only called for properly reset/initialized workers (references:
downstream_worker_context_reset, downstream_worker_context_cleanup,
flb_downstream_worker_runtime_stop, runtime->workers, thread_created,
initialized).
In `@src/http_server/flb_http_server.c`:
- Around line 483-486: flb_http_server_start() failure path leaks the
partially-initialized per-worker server because flb_http_server_worker_init()
only assigns *worker_context on success and the normal
flb_http_server_worker_exit() won't be invoked; before freeing context on start
failure, explicitly call the worker cleanup routine (flb_http_server_worker_exit
or the server-destroy helper that releases context->server) to tear down
context->server, then free context, and make the identical change for the other
symmetric failure block that also frees context after a failed
flb_http_server_start().
---
Outside diff comments:
In `@plugins/in_forward/fw.c`:
- Around line 159-172: The trace log is emitted after
flb_downstream_conn_release(connection) which can close connection->fd; move the
flb_plg_trace call so it runs before flb_downstream_conn_release to avoid
logging a closed FD. Specifically, in the branch that checks ctx->is_paused,
call flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i",
connection->fd) before flb_downstream_conn_release(connection), then set
ctx->state = state_backup and return -1, preserving the same behavior but
ensuring the FD is logged while still valid.
---
Nitpick comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 424-432: Duplicate two-line TCP helpers _drop_partial_tcp_payload
and _drop_raw_tls_connection should be consolidated with the existing
_send_tcp_payload (or replace all three with a single clearer-named helper like
_send_raw_bytes_or_abort) — remove the duplicate functions, update all call
sites that currently call _drop_partial_tcp_payload or _drop_raw_tls_connection
to call the chosen single helper (_send_tcp_payload or the new name), and add a
brief inline comment at those call sites indicating the negative-path intent
(e.g., "send raw bytes and close to simulate partial/drop"). Ensure references
to _send_tcp_payload, _drop_partial_tcp_payload and _drop_raw_tls_connection in
tests are updated accordingly.
In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py`:
- Around line 75-81: The lambda passed to wait_for_condition in
wait_for_record_count calls self.flattened_records() twice per poll, wasting
work; modify the lambda so it calls self.flattened_records() once (e.g., assign
to a local variable like records inside the lambda), then check len(records)
against minimum_count and return records when ready. Update the lambda used in
wait_for_record_count to reuse that single records value and keep the same
timeout/interval/description parameters for wait_for_condition.
- Around line 148-174: The test
test_in_udp_workers_drop_malformed_datagrams_and_continue relies on
wait_for_record_count but lacks an explicit check that the number of records
equals valid_records; add an assertion like assert len(records) == valid_records
immediately after records = service.wait_for_record_count(...) to produce a
clearer failure message if counts differ before extracting values from records.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 71fad1c8-6e23-49bd-8a3f-8580866af9a0
📒 Files selected for processing (26)
- include/fluent-bit/flb_downstream_worker.h
- include/fluent-bit/http_server/flb_http_server.h
- plugins/in_forward/fw.c
- plugins/in_forward/fw.h
- plugins/in_forward/fw_config.c
- plugins/in_forward/fw_prot.c
- plugins/in_tcp/tcp.c
- plugins/in_tcp/tcp.h
- plugins/in_tcp/tcp_config.c
- plugins/in_tcp/tcp_conn.c
- plugins/in_udp/udp.c
- plugins/in_udp/udp.h
- plugins/in_udp/udp_config.c
- plugins/in_udp/udp_conn.c
- src/CMakeLists.txt
- src/flb_downstream_worker.c
- src/http_server/flb_http_server.c
- tests/integration/scenarios/in_forward/config/in_forward_tls_workers.yaml
- tests/integration/scenarios/in_forward/config/in_forward_workers.yaml
- tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
- tests/integration/scenarios/in_tcp/config/in_tcp_parser_json_tls_workers.yaml
- tests/integration/scenarios/in_tcp/config/in_tcp_parser_json_workers.yaml
- tests/integration/scenarios/in_tcp/tests/test_in_tcp_001.py
- tests/integration/scenarios/in_udp/config/in_udp_json_workers.yaml
- tests/integration/scenarios/in_udp/config/in_udp_parser_json_workers.yaml
- tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 619-620: The startup log wait call
service.wait_for_log_message("with 4 workers") (and the other similar
wait_for_log_message calls) must be moved inside the try/finally that surrounds
the test runtime so service.stop() always runs; modify each affected test to
enter the try block first, call wait_for_log_message(...) inside that try, then
keep service.stop() in the finally block (ensuring the same unique symbols
service.wait_for_log_message and service.stop are used and no early returns
occur before the finally).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: de10bd2e-7153-43d6-96b3-d5d3bec44ca9
📒 Files selected for processing (10)
- include/fluent-bit/flb_downstream_worker.h
- plugins/in_forward/fw.c
- plugins/in_forward/fw_prot.c
- plugins/in_tcp/tcp.c
- plugins/in_tcp/tcp_conn.c
- plugins/in_udp/udp.c
- src/flb_downstream_worker.c
- src/http_server/flb_http_server.c
- tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
- tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
✅ Files skipped from review due to trivial changes (2)
- include/fluent-bit/flb_downstream_worker.h
- plugins/in_forward/fw.c
🚧 Files skipped from review as they are similar to previous changes (5)
- plugins/in_forward/fw_prot.c
- plugins/in_udp/udp.c
- tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
- src/http_server/flb_http_server.c
- plugins/in_tcp/tcp.c
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
🧹 Nitpick comments (1)
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py (1)
130-133: ⚡ Quick win
Use monotonic time for timeout loops to avoid clock-shift flakiness.
`wait_for_log_message()` uses wall-clock time (`time.time()`), which can make timeout behavior unstable if system time changes during test execution. Prefer `time.monotonic()` for deadline checks.

Suggested patch:
```diff
 def wait_for_log_message(self, pattern, timeout=10, interval=0.25):
-    deadline = time.time() + timeout
-    while time.time() < deadline:
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
         if self.flb and self.flb.log_file and os.path.exists(self.flb.log_file):
             with open(self.flb.log_file, "r", encoding="utf-8", errors="replace") as log_file:
                 if pattern in log_file.read():
                     return True
         time.sleep(interval)
```
Also applies to: line 137
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around lines 130 - 133, The timeout loop in wait_for_log_message() uses time.time(), which can be shifted by system clock changes; change both occurrences (the initial deadline calculation and the while condition) to use time.monotonic() instead (e.g., set deadline = time.monotonic() + timeout and compare time.monotonic() < deadline) so the deadline check is stable; update any other timeout loops in the same test file that use time.time() in the same manner.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 130-133: The timeout loop in wait_for_log_message() uses
time.time(), which can be shifted by system clock changes; change both
occurrences (the initial deadline calculation and the while condition) to use
time.monotonic() instead (e.g., set deadline = time.monotonic() + timeout and
compare time.monotonic() < deadline) so the deadline check is stable; update any
other timeout loops in the same test file that use time.time() in the same
manner.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5da3b860-4f7a-47e4-9009-3e87a525c860
📒 Files selected for processing (1)
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
Thanks for the suggestion. Looking at the issue, Fluent Bit lacks a generic atomic API; we already built one in CMetrics, so rather than tweaking compiler options for Windows, a more general solution is to move that existing atomic API into CFL. Working on that in fluent/cfl#76; once that is merged, I will upgrade the CFL version in this branch.
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Actionable comments posted: 3
♻️ Duplicate comments (1)
lib/cfl/.github/workflows/build.yaml (1)
20-20: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win
Verify that `actions/checkout@v6` exists.
Similar to the packages workflow, this workflow bumps `actions/checkout` from v4 to v6. Please verify this version exists using the web search in the previous comment.
Also applies to: lines 36, 129, 145, 174, 200
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/cfl/.github/workflows/build.yaml` at line 20, Confirm that the GitHub Action reference "uses: actions/checkout@v6" is a valid published version in the Actions marketplace; search for actions/checkout tags/releases and if v6 does not exist replace the occurrences with a valid tag (e.g., actions/checkout@v4 or the latest confirmed tag) or use the stable "actions/checkout@v3" pattern, then update all instances of the string ("uses: actions/checkout@v6") found in this workflow and the other occurrences noted (lines referenced as 36, 129, 145, 174, 200) so the workflow points to an existing, supported checkout action.
🧹 Nitpick comments (2)
lib/cfl/src/cfl_kvlist.c (1)
405-453: 💤 Low value
Cycle/aliasing prevention in `cfl_kvlist_insert_s` looks correct, but worth a brief comment block.
The four-step check (variant identity reachable from `list`, variant contains `list`, underlying array contained, underlying kvlist contained) is comprehensive and prevents both double-ownership of the same `cfl_variant *` and structural cycles. The first check (line 415) is doing alias prevention rather than cycle prevention; i.e., it rejects reinserting an already-owned variant, which previously could lead to a double-free in `cfl_kvlist_destroy`. The container-specific checks (lines 425/430) are needed because a fresh variant may wrap an already-nested array/kvlist.
Consider expanding the comment at line 419 to also mention the alias-prevention intent of the first check, since the rationale isn't obvious.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/cfl/src/cfl_kvlist.c` around lines 405 - 453, In cfl_kvlist_insert_s, expand the existing comment before the container checks to explicitly state that the first check (cfl_container_kvlist_contains_variant(list, value)) is for alias-prevention (rejecting reinsertion of an already-owned cfl_variant to avoid double-free), while the subsequent checks (cfl_container_variant_contains_kvlist(value, list), cfl_container_kvlist_contains_array(list, value->data.as_array), and cfl_container_kvlist_contains_kvlist(list, value->data.as_kvlist)) are for true cycle detection of container-valued variants; update the comment block to mention both purposes and reference these function names so readers understand the distinction and rationale.
lib/cfl/src/cfl_sds.c (1)
186-258: 💤 Low value
Self-append detection via cross-object pointer arithmetic is implementation-defined.
The `(uintptr_t)str` vs `(uintptr_t)s` comparison at lines 221-224 only has well-defined semantics when both pointers refer to the same object. For unrelated allocations the result is implementation-defined, though it works reliably on flat-address platforms (all currently supported targets). Functionally safe because:
- A false positive still resolves `s + source_offset` to the same address as `str` on flat-address systems, and the `append_len - 1 > head->alloc - source_offset` bound is enforced.
- After `cfl_sds_increase()` potentially relocates the buffer, the offset is reapplied (line 244), preventing use-after-free on the stale `str`.
- `memmove` correctly handles the overlap case.
Worth a short comment noting the flat-address assumption for future maintainers.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/cfl/src/cfl_sds.c` around lines 186 - 258, The self-append detection in cfl_sds_cat uses cross-object pointer arithmetic (comparing buffer_addr and source_addr) which is implementation-defined; update cfl_sds_cat to document this by adding a short comment above the buffer_addr/source_addr check (referencing buffer_addr, source_addr, source_in_buffer and the subsequent use of cfl_sds_increase and s + source_offset) stating the code relies on flat-address platforms and explaining why the current bounds check and re-application of source_offset after cfl_sds_increase make this safe; no logic changes required—just add the explanatory comment so future maintainers understand the assumption and rationale.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/cfl/tests/atomic_operations.c`:
- Around line 129-139: The thread creation loop does not handle CreateThread
failures and later blindly calls WaitForSingleObject/CloseHandle on possibly
NULL handles; modify the CreateThread loop that spawns THREAD_COUNT threads
(calling CreateThread with worker_thread_add_through_compare_exchange,
threads[], and thread_ids[]) to check the return value for NULL immediately and
record the failure (e.g., set a boolean or increment a failure counter and call
TEST_CHECK(FALSE) or similar). In the subsequent join loop that iterates
THREAD_COUNT and calls WaitForSingleObject/CloseHandle, only call those
functions for entries where threads[i] is non-NULL (valid handle); for any
failed slot ensure you surface a clear test failure via TEST_CHECK/result
handling instead of using the invalid handle. Ensure thread_ids handling is
consistent for both success and failure cases.
In `@lib/cfl/tests/kvlist.c`:
- Around line 1256-1263: The test currently calls tmpfile() and only uses
TEST_CHECK(fp != NULL) which doesn’t stop execution on failure, leading to a
possible NULL dereference in cfl_kvlist_print; update the test around tmpfile()
to fail fast if fp is NULL (e.g., after TEST_CHECK(fp != NULL) add an immediate
exit/return or TEST_FAIL/TEST_FAIL_MESSAGE) so that cfl_kvlist_print and
subsequent calls (cfl_kvlist_print, compare) are not invoked with a NULL FILE*.
In `@lib/cfl/tests/object.c`:
- Around line 104-111: The test currently calls tmpfile() and uses TEST_CHECK(fp
!= NULL) but continues execution, risking a NULL dereference when later calling
compare(fp, ...) or cfl_object_print; update each test block (around tmpfile()
usage and calls to cfl_object_print/compare) to check fp immediately after
tmpfile() and return or jump to the test cleanup when fp == NULL (e.g., early
return or goto cleanup) so no further operations use a NULL FILE*; ensure the
change is applied to all instances referenced (lines near the fp assignments at
the shown blocks).
---
Duplicate comments:
In `@lib/cfl/.github/workflows/build.yaml`:
- Line 20: Confirm that the GitHub Action reference "uses: actions/checkout@v6"
is a valid published version in the Actions marketplace; search for
actions/checkout tags/releases and if v6 does not exist replace the occurrences
with a valid tag (e.g., actions/checkout@v4 or the latest confirmed tag) or use
the stable "actions/checkout@v3" pattern, then update all instances of the
string ("uses: actions/checkout@v6") found in this workflow and the other
occurrences noted (lines referenced as 36, 129, 145, 174, 200) so the workflow
points to an existing, supported checkout action.
---
Nitpick comments:
In `@lib/cfl/src/cfl_kvlist.c`:
- Around line 405-453: In cfl_kvlist_insert_s, expand the existing comment
before the container checks to explicitly state that the first check
(cfl_container_kvlist_contains_variant(list, value)) is for alias-prevention
(rejecting reinsertion of an already-owned cfl_variant to avoid double-free),
while the subsequent checks (cfl_container_variant_contains_kvlist(value, list),
cfl_container_kvlist_contains_array(list, value->data.as_array), and
cfl_container_kvlist_contains_kvlist(list, value->data.as_kvlist)) are for true
cycle detection of container-valued variants; update the comment block to
mention both purposes and reference these function names so readers understand
the distinction and rationale.
In `@lib/cfl/src/cfl_sds.c`:
- Around line 186-258: The self-append detection in cfl_sds_cat uses
cross-object pointer arithmetic (comparing buffer_addr and source_addr) which is
implementation-defined; update cfl_sds_cat to document this by adding a short
comment above the buffer_addr/source_addr check (referencing buffer_addr,
source_addr, source_in_buffer and the subsequent use of cfl_sds_increase and s +
source_offset) stating the code relies on flat-address platforms and explaining
why the current bounds check and re-application of source_offset after
cfl_sds_increase make this safe; no logic changes required—just add the
explanatory comment so future maintainers understand the assumption and
rationale.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ee9426b1-38d2-419b-9fdd-f55046c69ab6
📒 Files selected for processing (48)
- include/fluent-bit/flb_downstream_worker.h
- lib/cfl/.github/workflows/build.yaml
- lib/cfl/.github/workflows/lint.yaml
- lib/cfl/.github/workflows/packages.yaml
- lib/cfl/AGENTS.md
- lib/cfl/CMakeLists.txt
- lib/cfl/README.md
- lib/cfl/include/cfl/cfl.h
- lib/cfl/include/cfl/cfl_array.h
- lib/cfl/include/cfl/cfl_atomic.h
- lib/cfl/include/cfl/cfl_checksum.h
- lib/cfl/include/cfl/cfl_container.h
- lib/cfl/include/cfl/cfl_kv.h
- lib/cfl/include/cfl/cfl_kvlist.h
- lib/cfl/include/cfl/cfl_list.h
- lib/cfl/include/cfl/cfl_object.h
- lib/cfl/include/cfl/cfl_sds.h
- lib/cfl/include/cfl/cfl_time.h
- lib/cfl/include/cfl/cfl_utils.h
- lib/cfl/include/cfl/cfl_variant.h
- lib/cfl/src/CMakeLists.txt
- lib/cfl/src/cfl.c
- lib/cfl/src/cfl_array.c
- lib/cfl/src/cfl_atomic_clang.c
- lib/cfl/src/cfl_atomic_gcc.c
- lib/cfl/src/cfl_atomic_generic.c
- lib/cfl/src/cfl_atomic_msvc.c
- lib/cfl/src/cfl_checksum.c
- lib/cfl/src/cfl_container.c
- lib/cfl/src/cfl_kv.c
- lib/cfl/src/cfl_kvlist.c
- lib/cfl/src/cfl_object.c
- lib/cfl/src/cfl_sds.c
- lib/cfl/src/cfl_utils.c
- lib/cfl/src/cfl_variant.c
- lib/cfl/tests/CMakeLists.txt
- lib/cfl/tests/array.c
- lib/cfl/tests/atomic_operations.c
- lib/cfl/tests/checksum.c
- lib/cfl/tests/headers.c
- lib/cfl/tests/kv.c
- lib/cfl/tests/kvlist.c
- lib/cfl/tests/object.c
- lib/cfl/tests/sds.c
- lib/cfl/tests/utils.c
- lib/cfl/tests/variant.c
- src/flb_downstream_worker.c
- src/http_server/flb_http_server_http1.c
✅ Files skipped from review due to trivial changes (9)
- lib/cfl/include/cfl/cfl_utils.h
- lib/cfl/.github/workflows/lint.yaml
- lib/cfl/include/cfl/cfl_kvlist.h
- lib/cfl/CMakeLists.txt
- lib/cfl/tests/headers.c
- lib/cfl/AGENTS.md
- lib/cfl/tests/checksum.c
- lib/cfl/include/cfl/cfl_kv.h
- lib/cfl/include/cfl/cfl_atomic.h
🚧 Files skipped from review as they are similar to previous changes (2)
- include/fluent-bit/flb_downstream_worker.h
- src/flb_downstream_worker.c
    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
        threads[thread_index] = CreateThread(NULL, 0,
                                             worker_thread_add_through_compare_exchange,
                                             NULL, 0, &thread_ids[thread_index]);
    }

    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
        result = WaitForSingleObject(threads[thread_index], INFINITE);
        TEST_CHECK(result == WAIT_OBJECT_0);
        CloseHandle(threads[thread_index]);
    }
Handle thread creation failures before join/wait.
Line 130 and Line 157 can fail, but the code still waits/joins every slot. That can hit invalid handles/IDs and crash the test path instead of reporting a clean assertion failure.
Suggested fix
@@
-    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
+    int created = 0;
+    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
         threads[thread_index] = CreateThread(NULL, 0,
                                              worker_thread_add_through_compare_exchange,
                                              NULL, 0, &thread_ids[thread_index]);
+        if (!TEST_CHECK(threads[thread_index] != NULL)) {
+            break;
+        }
+        created++;
     }
-    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
+    for (thread_index = 0; thread_index < created; thread_index++) {
         result = WaitForSingleObject(threads[thread_index], INFINITE);
         TEST_CHECK(result == WAIT_OBJECT_0);
         CloseHandle(threads[thread_index]);
@@
-    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
+    int created = 0;
+    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
         result = pthread_create(&threads[thread_index], NULL,
                                 worker_thread_add_through_compare_exchange, NULL);
-        TEST_CHECK(result == 0);
+        if (!TEST_CHECK(result == 0)) {
+            break;
+        }
+        created++;
     }
-    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
+    for (thread_index = 0; thread_index < created; thread_index++) {
         result = pthread_join(threads[thread_index], NULL);
         TEST_CHECK(result == 0);
     }

Also applies to: 156-165
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/cfl/tests/atomic_operations.c` around lines 129 - 139, The thread
creation loop does not handle CreateThread failures and later blindly calls
WaitForSingleObject/CloseHandle on possibly NULL handles; modify the
CreateThread loop that spawns THREAD_COUNT threads (calling CreateThread with
worker_thread_add_through_compare_exchange, threads[], and thread_ids[]) to
check the return value for NULL immediately and record the failure (e.g., set a
boolean or increment a failure counter and call TEST_CHECK(FALSE) or similar).
In the subsequent join loop that iterates THREAD_COUNT and calls
WaitForSingleObject/CloseHandle, only call those functions for entries where
threads[i] is non-NULL (valid handle); for any failed slot ensure you surface a
clear test failure via TEST_CHECK/result handling instead of using the invalid
handle. Ensure thread_ids handling is consistent for both success and failure
cases.
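For the POSIX branch, the requested guard reduces to one pattern: count the threads that actually started and bound the join loop by that count. A standalone sketch under that assumption (the worker body and the THREAD_COUNT value here are illustrative, not taken from the cfl test file):

```c
#include <pthread.h>
#include <stddef.h>

#define THREAD_COUNT 8

static void *worker(void *arg)
{
    (void) arg;
    return NULL;
}

/* Spawn up to THREAD_COUNT threads, remember how many actually started,
 * and join only those slots; an uninitialized pthread_t is never passed
 * to pthread_join(). Returns the number of threads created. */
static int spawn_and_join(void)
{
    pthread_t threads[THREAD_COUNT];
    int created = 0;
    int i;

    for (i = 0; i < THREAD_COUNT; i++) {
        if (pthread_create(&threads[i], NULL, worker, NULL) != 0) {
            break; /* slots past this point were never initialized */
        }
        created++;
    }

    for (i = 0; i < created; i++) {
        pthread_join(threads[i], NULL);
    }

    return created;
}
```

Because the creation loop stops at the first failure, the join loop can never touch an invalid handle; the Windows variant follows the same shape with CreateThread/WaitForSingleObject.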
    fp = tmpfile();
    TEST_CHECK(fp != NULL);

    ret = cfl_kvlist_print(fp, list);
    TEST_CHECK(ret > 0);

    ret = compare(fp, "{\"a\\\"b\\n\":\"v\\n\"}");
    TEST_CHECK(ret == 0);
Fail fast after tmpfile() check to avoid possible NULL dereference.
TEST_CHECK(fp != NULL) alone does not stop execution. If tmpfile() fails, the subsequent print call can pass NULL fp.
Suggested fix
 static void print_escaped_keys()
 {
     int ret;
     FILE *fp;
     struct cfl_kvlist *list;

     list = cfl_kvlist_create();
-    TEST_CHECK(list != NULL);
+    if (!TEST_CHECK(list != NULL)) {
+        return;
+    }

     ret = cfl_kvlist_insert_string(list, "a\"b\n", "v\n");
     TEST_CHECK(ret == 0);

     fp = tmpfile();
-    TEST_CHECK(fp != NULL);
+    if (!TEST_CHECK(fp != NULL)) {
+        cfl_kvlist_destroy(list);
+        return;
+    }

     ret = cfl_kvlist_print(fp, list);
     TEST_CHECK(ret > 0);

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/cfl/tests/kvlist.c` around lines 1256 - 1263, The test currently calls
tmpfile() and only uses TEST_CHECK(fp != NULL) which doesn’t stop execution on
failure, leading to a possible NULL dereference in cfl_kvlist_print; update the
test around tmpfile() to fail fast if fp is NULL (e.g., after TEST_CHECK(fp !=
NULL) add an immediate exit/return or TEST_FAIL/TEST_FAIL_MESSAGE) so that
cfl_kvlist_print and subsequent calls (cfl_kvlist_print, compare) are not
invoked with a NULL FILE*.
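The fail-fast pattern the prompt asks for can be shown in isolation: check the tmpfile() result immediately, release anything already owned, and return before any later call can receive a NULL FILE *. A minimal sketch (write_probe is a hypothetical helper, not part of the cfl test suite):

```c
#include <stdio.h>

/* Returns 0 when the payload was written to a temporary file,
 * -1 when tmpfile() failed; no later call ever sees a NULL FILE *. */
static int write_probe(const char *payload)
{
    FILE *fp;
    int ret = -1;

    fp = tmpfile();
    if (fp == NULL) {
        return -1; /* fail fast: nothing else was allocated yet */
    }

    if (fputs(payload, fp) >= 0) {
        ret = 0;
    }

    fclose(fp);
    return ret;
}
```

When the function also owns other resources (as the test owns `list`), the early-exit path must free them first, which is exactly what the suggested `cfl_kvlist_destroy(list)` call does.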
    fp = tmpfile();
    TEST_CHECK(fp != NULL);

    ret = cfl_object_print(fp, object);
    TEST_CHECK(ret == 0);

    ret = compare(fp, "\"second\"\n");
    TEST_CHECK(ret == 0);
Return early when tmpfile() fails before calling compare().
TEST_CHECK(fp != NULL) logs failure but execution continues. If tmpfile() returns NULL, later compare(fp, ...) dereferences a null FILE *.
Suggested fix pattern
     fp = tmpfile();
-    TEST_CHECK(fp != NULL);
+    if (!TEST_CHECK(fp != NULL)) {
+        cfl_object_destroy(object);
+        return;
+    }

     ret = cfl_object_print(fp, object);
     TEST_CHECK(ret == 0);

Also applies to: 141-148, 178-185, 217-224, 256-263, 295-302
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/cfl/tests/object.c` around lines 104 - 111, The test currently calls
tmpfile() and uses TEST_CHECK(fp != NULL) but continues execution, risking a
NULL dereference when later calling compare(fp, ...) or cfl_object_print; update
each test block (around tmpfile() usage and calls to cfl_object_print/compare)
to check fp immediately after tmpfile() and return or jump to the test cleanup
when fp == NULL (e.g., early return or goto cleanup) so no further operations
use a NULL FILE*; ensure the change is applied to all instances referenced
(lines near the fp assignments at the shown blocks).
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
🧹 Nitpick comments (2)
src/flb_downstream_worker.c (1)
249-267: ⚡ Quick win — Shutdown can stall up to ~250 ms per worker; wake the event loop via the control pipe.

`flb_downstream_worker_runtime_stop()` sets `should_exit` and immediately `pthread_join()`s, but the worker thread only re-evaluates the flag after `mk_event_wait_2(..., 250)` returns. With many workers (the runtime is now used by the HTTP server, in_tcp, in_udp, and in_forward), shutdown latency adds up linearly. Writing a byte to each worker's control channel before joining wakes the loop immediately, and the existing control-event handler already tolerates a no-op signal (callback is `NULL`).

♻️ Suggested change

 void flb_downstream_worker_runtime_stop(struct flb_downstream_worker_runtime *runtime)
 {
     int i;
+    uint64_t signal = 1;

     if (runtime == NULL) {
         return;
     }

     for (i = 0; i < runtime->active_workers; i++) {
         cfl_atomic_store(&runtime->workers[i].should_exit, FLB_TRUE);

+        if (runtime->workers[i].control_channel_created == FLB_TRUE) {
+            flb_pipe_w(runtime->workers[i].control_channel[1],
+                       &signal, sizeof(signal));
+        }

         if (runtime->workers[i].thread_created == FLB_TRUE) {
             pthread_join(runtime->workers[i].thread, NULL);
         }

         downstream_worker_context_cleanup(&runtime->workers[i]);
     }

     flb_free(runtime->workers);
     flb_free(runtime);
 }

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/flb_downstream_worker.c` around lines 249 - 267, flb_downstream_worker_runtime_stop currently sets workers[i].should_exit and immediately pthread_join() which can block until the worker's mk_event_wait_2 timeout; to fix this, after setting should_exit for each runtime->workers[i] and before pthread_join, write a single byte to that worker's control pipe/file-descriptor (the control FD the event loop watches) to wake the event loop immediately (the control-event handler tolerates no-op signals), then proceed with pthread_join and downstream_worker_context_cleanup(&runtime->workers[i]).

include/fluent-bit/flb_downstream_worker.h (1)
49-70: 💤 Low value — Consider documenting the ownership/threading contract for `struct flb_downstream_worker`'s public fields.

This is a public header used by in_tcp, in_udp, in_forward, and the HTTP server. The struct mixes fields the runtime owns and writes (e.g. `event_loop`, `control_event`, `mutex`, `condition`, `should_exit`, `control_done`, `startup_result`, `initialized`, `thread_created`, `control_channel_created`) with fields callbacks may legitimately read or write (`context`, `parent`, `worker_id`, `worker_count`). Without a short comment block, downstream plugins are likely to touch the wrong fields (e.g. mutate `should_exit` directly, read `control_done` without holding `mutex`, or write `context` from a foreach callback). A few `/* runtime-owned, do not modify */` or `/* set by cb_init only */` comments would prevent that without changing the API.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/fluent-bit/flb_downstream_worker.h` around lines 49 - 70, Add concise ownership and threading comments to the public struct flb_downstream_worker to clarify which fields are runtime-owned and which may be touched by plugins/callbacks: annotate fields like event_loop, control_event, mutex, condition, should_exit, control_done, startup_result, initialized, thread_created, control_channel_created as "runtime-owned / do not modify" and indicate that reads must be synchronized (e.g., hold mutex). Mark context, parent, worker_id, worker_count and control_callback/control_data as "set by cb_init / safe for callbacks" or "readable/writable by plugins" and document any required synchronization (e.g., update only from the foreach callback or while holding mutex). Ensure the comments sit right above or beside each group of fields in struct flb_downstream_worker so downstream users can easily follow the ownership/threading contract.
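The requested annotation style can be sketched with a toy struct. The field names echo the review comment, but the grouping comments are the suggestion being made, not the current contents of the header:

```c
#include <stddef.h>

/* Toy struct illustrating the suggested ownership annotations; the real
 * flb_downstream_worker has more fields and different types. */
struct worker_sketch {
    /* runtime-owned: written by the runtime thread, do not modify */
    int should_exit;   /* polled by the worker loop */
    int control_done;  /* read only while holding the worker mutex */

    /* set by cb_init, safe for plugin callbacks to use */
    void *context;     /* plugin-private state */
    int worker_id;     /* stable for the worker's lifetime */
    int worker_count;  /* total workers in the runtime */
};
```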
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@include/fluent-bit/flb_downstream_worker.h`:
- Around line 49-70: Add concise ownership and threading comments to the public
struct flb_downstream_worker to clarify which fields are runtime-owned and which
may be touched by plugins/callbacks: annotate fields like event_loop,
control_event, mutex, condition, should_exit, control_done, startup_result,
initialized, thread_created, control_channel_created as "runtime-owned / do not
modify" and indicate that reads must be synchronized (e.g., hold mutex). Mark
context, parent, worker_id, worker_count and control_callback/control_data as
"set by cb_init / safe for callbacks" or "readable/writable by plugins" and
document any required synchronization (e.g., update only from the foreach
callback or while holding mutex). Ensure the comments sit right above or beside
each group of fields in struct flb_downstream_worker so downstream users can
easily follow the ownership/threading contract.
In `@src/flb_downstream_worker.c`:
- Around line 249-267: flb_downstream_worker_runtime_stop currently sets
workers[i].should_exit and immediately pthread_join() which can block until the
worker's mk_event_wait_2 timeout; to fix this, after setting should_exit for
each runtime->workers[i] and before pthread_join, write a single byte to that
worker's control pipe/file-descriptor (the control FD the event loop watches) to
wake the event loop immediately (the control-event handler tolerates no-op
signals), then proceed with pthread_join and
downstream_worker_context_cleanup(&runtime->workers[i]).
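The wakeup mechanism this prompt describes is the classic self-pipe trick: a waiter blocked on the pipe's read end returns as soon as another thread writes a token, instead of sleeping out its timeout. A POSIX sketch under that assumption (names here are illustrative; the real code path goes through flb_pipe_w and mk_event_wait_2):

```c
#include <poll.h>
#include <stdint.h>
#include <unistd.h>

/* Signal the write end before waiting on the read end; poll() then
 * reports readability immediately rather than running out the timeout.
 * Returns poll()'s result: 1 = woken, 0 = timed out, -1 = error. */
static int wake_and_wait(int timeout_ms)
{
    int fds[2];
    uint64_t token = 1;
    struct pollfd pfd;
    ssize_t n;
    int ret;

    if (pipe(fds) != 0) {
        return -1;
    }

    /* mimics runtime_stop() writing to the worker's control channel */
    n = write(fds[1], &token, sizeof(token));
    (void) n;

    pfd.fd = fds[0];
    pfd.events = POLLIN;
    ret = poll(&pfd, 1, timeout_ms);

    close(fds[0]);
    close(fds[1]);
    return ret;
}
```

Without the write, the same call would block for the full timeout_ms, which is the per-worker stall the review is pointing at.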
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c9470cca-e45f-4c78-a0d0-dc224c980c94
📒 Files selected for processing (3)
- include/fluent-bit/flb_downstream_worker.h
- plugins/in_forward/fw.c
- src/flb_downstream_worker.c
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/in_forward/fw.c
fixes #11757
Adds shared downstream worker support and enables workers for downstream-based inputs:
Also refactors flb_http_server to use the same shared downstream worker runtime, avoiding
duplicated worker lifecycle code across HTTP and non-HTTP downstream listeners.
Scope
Compatibility Notes
fan-out.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit