
downstream: re-architect workers support#11775

Open
edsiper wants to merge 18 commits into master from downstream-workers

Conversation


@edsiper edsiper commented May 4, 2026

fixes #11757

Adds shared downstream worker support and enables workers for downstream-based inputs:

  • in_tcp
  • in_udp
  • in_forward

Also refactors flb_http_server to use the same shared downstream worker runtime, avoiding
duplicated worker lifecycle code across HTTP and non-HTTP downstream listeners.

Scope

  • Adds flb_downstream_worker helper for worker lifecycle:
    • thread startup/shutdown
    • per-worker event loop
    • DNS context setup
    • custom event dispatch
    • startup synchronization
    • safe join/cleanup
    • maintenance callbacks
  • Adds workers config option to:
    • in_tcp
    • in_udp
    • in_forward
  • Keeps default behavior unchanged:
    • workers defaults to 1
  • Adds integration coverage for:
    • concurrent TCP workers
    • concurrent UDP workers
    • concurrent Forward workers
    • dropped/partial connections
    • malformed UDP datagrams
    • TCP TLS workers
    • Forward TLS workers
    • bad TLS handshakes followed by valid traffic

Compatibility Notes

  • Default behavior remains single listener worker.
  • in_forward workers are supported for TCP listeners; Unix socket mode still runs without worker
    fan-out.
  • in_udp has no TLS worker test because the plugin does not expose TLS support.
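
For illustration, a multi-worker in_tcp pipeline using the new option might look like the YAML below. This is a sketch assuming the standard Fluent Bit YAML schema; per this PR, `workers` defaults to 1 when omitted.

```yaml
pipeline:
  inputs:
    - name: tcp
      listen: 0.0.0.0
      port: 5170
      format: json
      workers: 4        # new option from this PR; default remains 1
  outputs:
    - name: stdout
      match: '*'
```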

Fluent Bit is licensed under Apache 2.0; by submitting this pull request, I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • TCP, UDP and Forward inputs now support configurable multi‑worker listeners with a shared downstream worker runtime and optional ingress routing for improved concurrency; HTTP server adopts the same worker model.
  • Tests
    • New integration scenarios and tests covering multi‑worker concurrency, TLS/non‑TLS, malformed/partial connections, and recovery.
  • Documentation
    • Added contributor guidelines and refreshed library README.
  • Chores
    • CI workflow updates and new public atomic/container APIs in the CFL library.

edsiper added 6 commits May 4, 2026 17:24
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

coderabbitai Bot commented May 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds a downstream worker runtime API and implementation, then integrates it into HTTP server, and TCP/UDP/Forward inputs. Plugins gain a workers configuration, per-worker event-loop listeners, ingress-queue routing helpers, and integration/unit tests; CFL utilities receive safety hardening and atomic/container APIs.

Changes

Multi-Worker Input Framework & Plugin Integration

  • Public API (include/fluent-bit/flb_downstream_worker.h, include/fluent-bit/http_server/flb_http_server.h):
    new public header with callback typedefs (init, exit, maintenance, foreach), the flb_downstream_worker
    and flb_downstream_worker_options structs, and runtime APIs start, stop, foreach; the HTTP server
    runtime member now points at the downstream runtime.
  • Core Runtime (src/flb_downstream_worker.c, src/CMakeLists.txt):
    implements the downstream worker runtime: per-worker event loop, init/maintenance/exit sequencing,
    startup synchronization and signaling, shutdown/join logic; added to the build list.
  • HTTP Server Migration (src/http_server/flb_http_server.c):
    replaces the pthread-per-worker code with downstream-worker callbacks; adds per-worker
    init/exit/maintenance hooks and switches runtime start/stop to the new API.
  • Plugin Headers & Helpers (plugins/in_forward/fw.h, plugins/in_tcp/tcp.h, plugins/in_udp/udp.h):
    adds a runtime pointer and worker/listener state fields (workers, worker_id, use_ingress_queue,
    listener_registered, listener_event, event_loop, net setup) plus inline ingestion helpers
    (fw_ingest_*, udp_ingest_logs).
  • Config Init / Destroy (plugins/in_forward/fw_config.c, plugins/in_tcp/tcp_config.c, plugins/in_udp/udp_config.c):
    initializes workers to 1 in init; on destroy, conditionally unregisters the listener event from the
    worker event loop (mk_event_del) when registered.
  • Plugin Startup / Wiring (plugins/in_forward/fw.c, plugins/in_tcp/tcp.c, plugins/in_udp/udp.c):
    adds the workers config option (INT, default 1); startup chooses the multi-worker runtime (enable
    ingress, start downstream runtime) when workers > 1, otherwise keeps the single collector-socket
    listener; adds per-plugin worker init/exit/maintenance and pause/resume dispatching.
  • Data Path Changes (plugins/in_forward/fw_prot.c, plugins/in_tcp/tcp_conn.c, plugins/in_udp/udp_conn.c):
    replaces direct flb_input_*_append() calls with plugin-specific ingestion helpers that route via the
    ingress queue when enabled; adjusts ownership/cleanup of decoded metric/trace objects when the
    ingress queue is used.
  • Integration Tests & Scenarios (tests/integration/scenarios/in_*/config/*workers*.yaml, tests/integration/scenarios/in_*/tests/test_in_*_001.py):
    worker-enabled YAML scenarios and tests for Forward/TCP/UDP: concurrent sends, partial/malformed
    connection resilience, TLS handshake-failure cases; test helpers for log polling and record-count
    waiting.
  • CFL Library Hardening & Tests (lib/cfl/include/*, lib/cfl/src/*, lib/cfl/tests/*, CMake changes, CI workflow updates):
    adds an atomic API with multiple backends, container containment APIs, defensive null/overflow checks
    across many modules (sds, array, kvlist, variant, utils), expanded unit tests, and updated CI
    workflows and packaging actions.
  • Configs & docs (lib/cfl/AGENTS.md, README updates):
    adds repository agent guidelines and README revisions for CFL module docs.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant Runtime as Downstream_Runtime
    participant Worker as Worker (Thread/EventLoop)
    participant Input as Plugin_Input
    participant Engine as Engine

    Client->>Runtime: connect / send packet
    Runtime->>Worker: notify worker event loop (socket readable)
    Worker->>Input: invoke collect handler (in_*_collect_ctx)
    Input->>Input: decode/encode record
    Input->>Input: call ingest helper (fw/tcp/udp_ingest_*)
    alt use_ingress_queue
        Input->>Engine: flb_input_ingress_queue_* (enqueue)
    else direct_append
        Input->>Engine: flb_input_*_append (direct append)
    end
    Worker->>Engine: FLB_ENGINE_EV_CUSTOM handlers dispatched
    Runtime->>Worker: periodic cb_maintenance callbacks

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested reviewers

  • cosmo0920
  • niedbalski
  • patrick-stephens

🐰 I stitched the loops and spun each thread,
Event loops hummed where packets sped,
Workers nibble, queues align,
Records hop in tidy line,
A rabbit cheers: the pipeline's fed.


@edsiper edsiper added this to the Fluent Bit v5.1 milestone May 4, 2026

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9e576e909b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread plugins/in_tcp/tcp.c
Comment on lines +361 to +365
    if (ctx->runtime != NULL) {
        flb_downstream_worker_runtime_foreach(ctx->runtime,
                                              in_tcp_worker_pause,
                                              NULL);
        return;

P1: Close active TCP worker connections during pause

With workers > 1, in_tcp_pause exits after dispatching in_tcp_worker_pause, and that callback only sets flb_downstream_pause on each listener. Unlike the single-worker path, no worker connection list is drained (tcp_conn_del / pending_close), so already-accepted sockets keep their event handlers and can continue ingesting records during backpressure pauses.

Useful? React with 👍 / 👎.

Comment thread plugins/in_forward/fw.c
Comment on lines +469 to +473
    if (ctx->downstream != NULL) {
        flb_downstream_pause(ctx->downstream);
        ctx->is_paused = FLB_TRUE;
        ctx->state = FW_INSTANCE_STATE_PAUSED;
    }

P1: Drop worker Forward connections when pausing ingestion

In worker mode, the pause callback only marks each worker context as paused and pauses the downstream listener, but it never closes existing worker connections. Because those connections remain registered, established clients can continue sending traffic while paused, which breaks the plugin’s own pause contract to close active connections under backpressure.

Useful? React with 👍 / 👎.

Comment thread src/flb_downstream_worker.c Outdated
Comment on lines +195 to +199
    runtime->workers[i].should_exit = FLB_TRUE;
    if (runtime->workers[i].thread_created == FLB_TRUE) {
        pthread_join(runtime->workers[i].thread, NULL);
    }
    downstream_worker_context_cleanup(&runtime->workers[i]);

P2: Skip pthread cleanup for workers never initialized

Startup failure paths call flb_downstream_worker_runtime_stop, which currently destroys mutex/cond objects for every slot in worker_count. However, only workers that reached downstream_worker_context_reset were pthread-initialized; later slots were merely zeroed by calloc, so destroying them is undefined behavior and can crash when worker creation/init fails partway through startup.

Useful? React with 👍 / 👎.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/in_forward/fw.c (1)

159-172: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Move the trace log before flb_downstream_conn_release to avoid logging a closed file descriptor.

At line 168, connection->fd is dereferenced after flb_downstream_conn_release(connection) is called on line 167. While the connection struct remains in memory, prepare_destroy_conn closes the underlying socket via flb_socket_close(connection->fd), making the fd a closed resource. Logging the fd after closure is semantically incorrect and should be moved before the release.

Suggested fix
    if(ctx->is_paused) {
+       flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd);
        flb_downstream_conn_release(connection);
-       flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", connection->fd);
        ctx->state = state_backup;

        return -1;
    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/in_forward/fw.c` around lines 159 - 172, The trace log is emitted
after flb_downstream_conn_release(connection) which can close connection->fd;
move the flb_plg_trace call so it runs before flb_downstream_conn_release to
avoid logging a closed FD. Specifically, in the branch that checks
ctx->is_paused, call flb_plg_trace(ctx->ins, "TCP connection will be closed
FD=%i", connection->fd) before flb_downstream_conn_release(connection), then set
ctx->state = state_backup and return -1, preserving the same behavior but
ensuring the FD is logged while still valid.
🧹 Nitpick comments (3)
tests/integration/scenarios/in_udp/tests/test_in_udp_001.py (2)

75-81: 💤 Low value

Minor: flattened_records() is evaluated twice per poll.

The lambda in wait_for_record_count calls self.flattened_records() twice on every poll cycle (once for the length check, once to return the list). For larger payload counts this doubles the iteration work. Caching the call result avoids it.

♻️ Optional refactor
     def wait_for_record_count(self, minimum_count, timeout=10):
+        def _check():
+            records = self.flattened_records()
+            return records if len(records) >= minimum_count else None
+
         return self.service.wait_for_condition(
-            lambda: self.flattened_records() if len(self.flattened_records()) >= minimum_count else None,
+            _check,
             timeout=timeout,
             interval=0.2,
             description=f"{minimum_count} forwarded UDP payloads",
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py` around lines 75
- 81, The lambda passed to wait_for_condition in wait_for_record_count calls
self.flattened_records() twice per poll, wasting work; modify the lambda so it
calls self.flattened_records() once (e.g., assign to a local variable like
records inside the lambda), then check len(records) against minimum_count and
return records when ready. Update the lambda used in wait_for_record_count to
reuse that single records value and keep the same timeout/interval/description
parameters for wait_for_condition.

148-174: ⚡ Quick win

The config correctly uses format: json to drop malformed datagrams.

The in_udp_json_workers.yaml file uses format: json (confirmed), which drops invalid records rather than forwarding them as fallback log lines. The test will work as intended—malformed datagrams are discarded and only valid records reach the assertion, so the KeyError risk is not present here.

That said, adding an explicit assert len(records) == valid_records check before the values assertion would provide a clearer failure message if the count unexpectedly diverges, improving test robustness.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py` around lines 148
- 174, The test test_in_udp_workers_drop_malformed_datagrams_and_continue relies
on wait_for_record_count but lacks an explicit check that the number of records
equals valid_records; add an assertion like assert len(records) == valid_records
immediately after records = service.wait_for_record_count(...) to produce a
clearer failure message if counts differ before extracting values from records.
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py (1)

424-432: ⚡ Quick win

Consolidate duplicate TCP-send helpers.

_drop_partial_tcp_payload and _drop_raw_tls_connection are byte-for-byte identical to the existing _send_tcp_payload — three copies of the same two-line helper. The differentiation only lives in the names (and _drop_raw_tls_connection's name is also misleading: it doesn't speak TLS, it just dumps raw bytes against a TLS port). Either drop the new helpers and call _send_tcp_payload at the test sites with a brief inline comment, or keep one helper whose name explains the negative-path intent.

♻️ Proposed consolidation
-def _drop_partial_tcp_payload(port, payload):
-    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
-        sock.sendall(payload)
-
-
 def _send_unix_payload(path, payload):
@@
-def _drop_raw_tls_connection(port, payload):
-    with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
-        sock.sendall(payload)
-
-
 def _recv_msgpack_value(sock):

…and update the two call sites:

-                lambda _: _drop_partial_tcp_payload(service.flb_listener_port, b"\x93\xa4test"),
+                # Send a truncated msgpack array and close to exercise partial-payload handling.
+                lambda _: _send_tcp_payload(service.flb_listener_port, b"\x93\xa4test"),
@@
-                lambda i: _drop_raw_tls_connection(service.flb_listener_port,
-                                                   f"not-tls-{i}".encode("utf-8")),
+                # Send raw (non-TLS) bytes to a TLS port to exercise bad-handshake recovery.
+                lambda i: _send_tcp_payload(service.flb_listener_port,
+                                            f"not-tls-{i}".encode("utf-8")),

Also applies to: 448-451

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 424 - 432, Duplicate two-line TCP helpers _drop_partial_tcp_payload and
_drop_raw_tls_connection should be consolidated with the existing
_send_tcp_payload (or replace all three with a single clearer-named helper like
_send_raw_bytes_or_abort) — remove the duplicate functions, update all call
sites that currently call _drop_partial_tcp_payload or _drop_raw_tls_connection
to call the chosen single helper (_send_tcp_payload or the new name), and add a
brief inline comment at those call sites indicating the negative-path intent
(e.g., "send raw bytes and close to simulate partial/drop"). Ensure references
to _send_tcp_payload, _drop_partial_tcp_payload and _drop_raw_tls_connection in
tests are updated accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@plugins/in_forward/fw_prot.c`:
- Around line 1140-1151: The error paths for metrics and traces call
cmt_decode_msgpack_destroy(cmt) unconditionally and can double-free when
ownership was transferred to the ingress queue; update both error branches (the
metrics path around the fw_ingest_metrics call and the traces path around the
corresponding fw_ingest_traces call) to only call
cmt_decode_msgpack_destroy(cmt) when conn->ctx->use_ingress_queue == FLB_FALSE
(matching the success path), ensuring that when ingress queue ownership is used
the ingress event/queue cleanup (e.g., flb_input_ingress_event_destroy) remains
responsible for freeing cmt.

In `@plugins/in_tcp/tcp_conn.c`:
- Around line 143-145: The tcp_ingest_logs() calls currently ignore its return
value and can drop records; update both call sites (the two tcp_ingest_logs(ctx,
ctx->log_encoder->output_buffer, ctx->log_encoder->output_length) invocations)
to check the returned status, log a failure with context (include the returned
error code and any relevant ctx identifiers), and propagate the error out of the
enclosing function instead of silently continuing—i.e., if tcp_ingest_logs()
returns non-zero, call the logger on ctx (or appropriate logger) with a
descriptive message and return/forward a non-success code (or set the enclosing
function’s error return) so upstream code can handle the failure.

In `@plugins/in_tcp/tcp.c`:
- Around line 206-220: The worker pause path (in_tcp_worker_pause) currently
only calls flb_downstream_pause(ctx->downstream) and fails to walk
ctx->connections to drain/close worker-owned client sessions; update
in_tcp_worker_pause (and the analogous block around lines 361-366) to iterate
ctx->connections and for each connection perform the same shutdown/close steps
the single-listener path uses (e.g., stop reads, flush/drain pending data and
then close/destroy the connection using the existing connection shutdown/close
helpers), ensuring connections cannot continue reading after pause.

In `@plugins/in_udp/udp.c`:
- Around line 88-110: The startup path creates ctx->dummy_conn via udp_conn_add
but if collector registration fails (flb_input_set_collector_socket or
flb_input_collector_get_event returns NULL) the current error path calls
udp_config_destroy(ctx) without first destroying the dummy connection; modify
the failure branches in the block that sets the collector (the code around
flb_input_set_collector_socket/in_udp_collect and the subsequent
ctx->collector_event check) to call in_udp_dummy_conn_destroy(ctx) (or otherwise
free ctx->dummy_conn) before returning/calling udp_config_destroy so the dummy
connection is cleaned up; apply the same fix to the similar failure path around
lines 288-291.

In `@src/flb_downstream_worker.c`:
- Around line 84-96: The loop is reading worker->should_exit while
flb_downstream_worker_runtime_stop() writes it from another thread, causing a
data race; change the shutdown flag to be synchronized (either replace
worker->should_exit with an atomic type and use atomic load/store, or guard
accesses with the worker runtime mutex), update the writer in
flb_downstream_worker_runtime_stop() to use the same atomic/store or mutex, and
also fix the other occurrence noted (the second read/write pair around the later
shutdown code) so all reads/writes to the shutdown flag use the same
synchronization primitive.
- Around line 147-180: The shutdown path calls
flb_downstream_worker_runtime_stop() which unconditionally runs
downstream_worker_context_cleanup() over all runtime->workers, potentially
destroying mutex/cond for slots never initialized; fix by tracking how many
slots were actually set up and only cleaning those: either set
runtime->worker_count (or a new field like runtime->active_workers) to i before
calling flb_downstream_worker_runtime_stop(), or change
flb_downstream_worker_runtime_stop() to inspect each
runtime->workers[k].thread_created or runtime->workers[k].initialized and skip
cleanup for entries that are false so downstream_worker_context_cleanup() is
only called for properly reset/initialized workers (references:
downstream_worker_context_reset, downstream_worker_context_cleanup,
flb_downstream_worker_runtime_stop, runtime->workers, thread_created,
initialized).

In `@src/http_server/flb_http_server.c`:
- Around line 483-486: flb_http_server_start() failure path leaks the
partially-initialized per-worker server because flb_http_server_worker_init()
only assigns *worker_context on success and the normal
flb_http_server_worker_exit() won't be invoked; before freeing context on start
failure, explicitly call the worker cleanup routine (flb_http_server_worker_exit
or the server-destroy helper that releases context->server) to tear down
context->server, then free context, and make the identical change for the other
symmetric failure block that also frees context after a failed
flb_http_server_start().

---

Outside diff comments:
In `@plugins/in_forward/fw.c`:
- Around line 159-172: The trace log is emitted after
flb_downstream_conn_release(connection) which can close connection->fd; move the
flb_plg_trace call so it runs before flb_downstream_conn_release to avoid
logging a closed FD. Specifically, in the branch that checks ctx->is_paused,
call flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i",
connection->fd) before flb_downstream_conn_release(connection), then set
ctx->state = state_backup and return -1, preserving the same behavior but
ensuring the FD is logged while still valid.

---

Nitpick comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 424-432: Duplicate two-line TCP helpers _drop_partial_tcp_payload
and _drop_raw_tls_connection should be consolidated with the existing
_send_tcp_payload (or replace all three with a single clearer-named helper like
_send_raw_bytes_or_abort) — remove the duplicate functions, update all call
sites that currently call _drop_partial_tcp_payload or _drop_raw_tls_connection
to call the chosen single helper (_send_tcp_payload or the new name), and add a
brief inline comment at those call sites indicating the negative-path intent
(e.g., "send raw bytes and close to simulate partial/drop"). Ensure references
to _send_tcp_payload, _drop_partial_tcp_payload and _drop_raw_tls_connection in
tests are updated accordingly.

In `@tests/integration/scenarios/in_udp/tests/test_in_udp_001.py`:
- Around line 75-81: The lambda passed to wait_for_condition in
wait_for_record_count calls self.flattened_records() twice per poll, wasting
work; modify the lambda so it calls self.flattened_records() once (e.g., assign
to a local variable like records inside the lambda), then check len(records)
against minimum_count and return records when ready. Update the lambda used in
wait_for_record_count to reuse that single records value and keep the same
timeout/interval/description parameters for wait_for_condition.
- Around line 148-174: The test
test_in_udp_workers_drop_malformed_datagrams_and_continue relies on
wait_for_record_count but lacks an explicit check that the number of records
equals valid_records; add an assertion like assert len(records) == valid_records
immediately after records = service.wait_for_record_count(...) to produce a
clearer failure message if counts differ before extracting values from records.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 71fad1c8-6e23-49bd-8a3f-8580866af9a0

📥 Commits

Reviewing files that changed from the base of the PR and between 86ec64a and 9e576e9.

📒 Files selected for processing (26)
  • include/fluent-bit/flb_downstream_worker.h
  • include/fluent-bit/http_server/flb_http_server.h
  • plugins/in_forward/fw.c
  • plugins/in_forward/fw.h
  • plugins/in_forward/fw_config.c
  • plugins/in_forward/fw_prot.c
  • plugins/in_tcp/tcp.c
  • plugins/in_tcp/tcp.h
  • plugins/in_tcp/tcp_config.c
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp.c
  • plugins/in_udp/udp.h
  • plugins/in_udp/udp_config.c
  • plugins/in_udp/udp_conn.c
  • src/CMakeLists.txt
  • src/flb_downstream_worker.c
  • src/http_server/flb_http_server.c
  • tests/integration/scenarios/in_forward/config/in_forward_tls_workers.yaml
  • tests/integration/scenarios/in_forward/config/in_forward_workers.yaml
  • tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
  • tests/integration/scenarios/in_tcp/config/in_tcp_parser_json_tls_workers.yaml
  • tests/integration/scenarios/in_tcp/config/in_tcp_parser_json_workers.yaml
  • tests/integration/scenarios/in_tcp/tests/test_in_tcp_001.py
  • tests/integration/scenarios/in_udp/config/in_udp_json_workers.yaml
  • tests/integration/scenarios/in_udp/config/in_udp_parser_json_workers.yaml
  • tests/integration/scenarios/in_udp/tests/test_in_udp_001.py

Comment thread plugins/in_forward/fw_prot.c
Comment thread plugins/in_tcp/tcp_conn.c Outdated
Comment thread plugins/in_tcp/tcp.c
Comment thread plugins/in_udp/udp.c
Comment thread src/flb_downstream_worker.c Outdated
Comment thread src/flb_downstream_worker.c
Comment thread src/http_server/flb_http_server.c
edsiper added 6 commits May 5, 2026 09:34
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 619-620: The startup log wait call
service.wait_for_log_message("with 4 workers") (and the other similar
wait_for_log_message calls) must be moved inside the try/finally that surrounds
the test runtime so service.stop() always runs; modify each affected test to
enter the try block first, call wait_for_log_message(...) inside that try, then
keep service.stop() in the finally block (ensuring the same unique symbols
service.wait_for_log_message and service.stop are used and no early returns
occur before the finally).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: de10bd2e-7153-43d6-96b3-d5d3bec44ca9

📥 Commits

Reviewing files that changed from the base of the PR and between 9e576e9 and 22a2ceb.

📒 Files selected for processing (10)
  • include/fluent-bit/flb_downstream_worker.h
  • plugins/in_forward/fw.c
  • plugins/in_forward/fw_prot.c
  • plugins/in_tcp/tcp.c
  • plugins/in_tcp/tcp_conn.c
  • plugins/in_udp/udp.c
  • src/flb_downstream_worker.c
  • src/http_server/flb_http_server.c
  • tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
  • tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
✅ Files skipped from review due to trivial changes (2)
  • include/fluent-bit/flb_downstream_worker.h
  • plugins/in_forward/fw.c
🚧 Files skipped from review as they are similar to previous changes (5)
  • plugins/in_forward/fw_prot.c
  • plugins/in_udp/udp.c
  • tests/integration/scenarios/in_udp/tests/test_in_udp_001.py
  • src/http_server/flb_http_server.c
  • plugins/in_tcp/tcp.c

Comment thread tests/integration/scenarios/in_forward/tests/test_in_forward_001.py Outdated
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py (1)

130-133: ⚡ Quick win

Use monotonic time for timeout loops to avoid clock-shift flakiness.

wait_for_log_message() uses wall-clock time (time.time()), which can make timeout behavior unstable if system time changes during test execution. Prefer time.monotonic() for deadline checks.

Suggested patch
 def wait_for_log_message(self, pattern, timeout=10, interval=0.25):
-    deadline = time.time() + timeout
-    while time.time() < deadline:
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
         if self.flb and self.flb.log_file and os.path.exists(self.flb.log_file):
             with open(self.flb.log_file, "r", encoding="utf-8", errors="replace") as log_file:
                 if pattern in log_file.read():
                     return True
         time.sleep(interval)

Also applies to: 137-137

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 130 - 133, The timeout loop in wait_for_log_message() uses time.time(),
which can be shifted by system clock changes; change both occurrences (the
initial deadline calculation and the while condition) to use time.monotonic()
instead (e.g., set deadline = time.monotonic() + timeout and compare
time.monotonic() < deadline) so the deadline check is stable; update any other
timeout loops in the same test file that use time.time() in the same manner.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5da3b860-4f7a-47e4-9009-3e87a525c860

📥 Commits

Reviewing files that changed from the base of the PR and between 22a2ceb and 4182c4b.

📒 Files selected for processing (1)
  • tests/integration/scenarios/in_forward/tests/test_in_forward_001.py

Contributor

@cosmo0920 cosmo0920 left a comment


How about adding the following MSVC specific option to get compiled with stdatomic.h on Windows?

if (MSVC)
  add_compile_options(/std:c11)
endif()

@edsiper
Member Author

edsiper commented May 11, 2026

How about adding the following MSVC specific option to get compiled with stdatomic.h on Windows?

if (MSVC)
  add_compile_options(/std:c11)
endif()

Thanks for the suggestion. Looking at the issue, the root cause is the lack of a generic atomic API in Fluent Bit. We already solved this in CMetrics, so instead of tweaking compiler options for Windows, a more general solution is to move that existing atomic API into CFL. I'm working on that in fluent/cfl#76; once it is merged, I will upgrade the CFL version in this branch.
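For context, the compare-exchange pattern that the CFL atomic tests exercise can be sketched with C11 `stdatomic.h` (which is what the suggested MSVC `/std:c11` flag would enable). This is a standalone illustration, not the actual `cfl_atomic` API; the function names and thread/iteration counts are made up for the example:

```c
#include <stdatomic.h>
#include <pthread.h>

#define THREAD_COUNT 8
#define ITERATIONS   10000

static _Atomic int shared_counter;

/* each worker increments the counter through a compare-exchange
 * retry loop instead of a plain atomic add */
static void *cas_worker(void *arg)
{
    int i;
    int expected;

    (void) arg;
    for (i = 0; i < ITERATIONS; i++) {
        expected = atomic_load(&shared_counter);
        /* on failure, 'expected' is refreshed with the current
         * value and the swap is retried */
        while (!atomic_compare_exchange_weak(&shared_counter, &expected,
                                             expected + 1)) {
        }
    }
    return NULL;
}

/* spawn the workers, join them and return the final counter
 * value; returns -1 if any thread fails to start */
static int cas_counter_run(void)
{
    pthread_t threads[THREAD_COUNT];
    int i;

    atomic_store(&shared_counter, 0);
    for (i = 0; i < THREAD_COUNT; i++) {
        if (pthread_create(&threads[i], NULL, cas_worker, NULL) != 0) {
            return -1;
        }
    }
    for (i = 0; i < THREAD_COUNT; i++) {
        pthread_join(threads[i], NULL);
    }
    return atomic_load(&shared_counter);
}
```

A portable wrapper in CFL would hide the compiler-specific implementations (GCC/Clang builtins, MSVC interlocked ops) behind one API so plugin code never touches `stdatomic.h` directly.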

edsiper added 3 commits May 11, 2026 19:16
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
lib/cfl/.github/workflows/build.yaml (1)

20-20: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Verify that actions/checkout@v6 exists.

Similar to the packages workflow, this workflow bumps actions/checkout from v4 to v6. Please verify this version exists using the web search in the previous comment.

Also applies to: 36-36, 129-129, 145-145, 174-174, 200-200

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/cfl/.github/workflows/build.yaml` at line 20, Confirm that the GitHub
Action reference "uses: actions/checkout@v6" is a valid published version in the
Actions marketplace; search for actions/checkout tags/releases and if v6 does
not exist replace the occurrences with a valid tag (e.g., actions/checkout@v4 or
the latest confirmed tag) or use the stable "actions/checkout@v3" pattern, then
update all instances of the string ("uses: actions/checkout@v6") found in this
workflow and the other occurrences noted (lines referenced as 36, 129, 145, 174,
200) so the workflow points to an existing, supported checkout action.
🧹 Nitpick comments (2)
lib/cfl/src/cfl_kvlist.c (1)

405-453: 💤 Low value

Cycle/aliasing prevention in cfl_kvlist_insert_s looks correct, but worth a brief comment block.

The four-step check (variant identity reachable from list, variant contains list, underlying array contained, underlying kvlist contained) is comprehensive and prevents both double-ownership of the same cfl_variant * and structural cycles. The first check (line 415) is doing alias prevention rather than cycle prevention — i.e., it rejects reinserting an already-owned variant, which previously could lead to double-free in cfl_kvlist_destroy. The container-specific checks (425/430) are needed because a fresh variant may wrap an already-nested array/kvlist.

Consider expanding the comment at line 419 to also mention the alias-prevention intent of the first check, since the rationale isn't obvious.
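As a generic illustration of the double-ownership hazard that the alias check guards against (plain C, not the cfl API; the container and names below are hypothetical):

```c
#include <stddef.h>

/* hypothetical single-slot owning container */
struct node  { int value; };
struct owner { struct node *item; };

/* reject inserting a node the container already owns; without this
 * check, two destroy paths would each free the same pointer */
static int owner_insert(struct owner *o, struct node *n)
{
    if (n == NULL || o->item == n) {
        return -1;
    }
    o->item = n;
    return 0;
}
```

`cfl_kvlist_insert_s` applies the same idea at the variant level, with the additional container-reachability checks covering true structural cycles.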

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/cfl/src/cfl_kvlist.c` around lines 405 - 453, In cfl_kvlist_insert_s,
expand the existing comment before the container checks to explicitly state that
the first check (cfl_container_kvlist_contains_variant(list, value)) is for
alias-prevention (rejecting reinsertion of an already-owned cfl_variant to avoid
double-free), while the subsequent checks
(cfl_container_variant_contains_kvlist(value, list),
cfl_container_kvlist_contains_array(list, value->data.as_array), and
cfl_container_kvlist_contains_kvlist(list, value->data.as_kvlist)) are for true
cycle detection of container-valued variants; update the comment block to
mention both purposes and reference these function names so readers understand
the distinction and rationale.
lib/cfl/src/cfl_sds.c (1)

186-258: 💤 Low value

Self-append detection via cross-object pointer arithmetic is implementation-defined.

The (uintptr_t)str vs (uintptr_t)s comparison at lines 221–224 only has well-defined semantics when both pointers refer to the same object. For unrelated allocations the result is implementation-defined, though it works reliably on flat-address platforms (all currently supported targets). Functionally safe because:

  • A false positive still resolves s + source_offset to the same address as str on flat-address systems, and the append_len - 1 > head->alloc - source_offset bound is enforced.
  • After cfl_sds_increase() potentially relocates the buffer, the offset is reapplied (line 244), preventing use-after-free on the stale str.

memmove correctly handles the overlap case. Worth a short comment noting the flat-address assumption for future maintainers.
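The overlap property being relied on is standard `memmove` semantics: unlike `memcpy`, overlapping source and destination regions are well-defined. A minimal illustration using an in-buffer shift (buffer contents are illustrative):

```c
#include <string.h>

/* shift a NUL-terminated string right by one byte within the same
 * buffer; source [buf, buf+len] and destination [buf+1, buf+len+1]
 * overlap, which memmove handles correctly (the caller must ensure
 * the buffer has one spare byte) */
static void shift_right_one(char *buf)
{
    memmove(buf + 1, buf, strlen(buf) + 1);
}
```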

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/cfl/src/cfl_sds.c` around lines 186 - 258, The self-append detection in
cfl_sds_cat uses cross-object pointer arithmetic (comparing buffer_addr and
source_addr) which is implementation-defined; update cfl_sds_cat to document
this by adding a short comment above the buffer_addr/source_addr check
(referencing buffer_addr, source_addr, source_in_buffer and the subsequent use
of cfl_sds_increase and s + source_offset) stating the code relies on
flat-address platforms and explaining why the current bounds check and
re-application of source_offset after cfl_sds_increase make this safe; no logic
changes required—just add the explanatory comment so future maintainers
understand the assumption and rationale.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ee9426b1-38d2-419b-9fdd-f55046c69ab6

📥 Commits

Reviewing files that changed from the base of the PR and between 4182c4b and bb5cba8.

📒 Files selected for processing (48)
  • include/fluent-bit/flb_downstream_worker.h
  • lib/cfl/.github/workflows/build.yaml
  • lib/cfl/.github/workflows/lint.yaml
  • lib/cfl/.github/workflows/packages.yaml
  • lib/cfl/AGENTS.md
  • lib/cfl/CMakeLists.txt
  • lib/cfl/README.md
  • lib/cfl/include/cfl/cfl.h
  • lib/cfl/include/cfl/cfl_array.h
  • lib/cfl/include/cfl/cfl_atomic.h
  • lib/cfl/include/cfl/cfl_checksum.h
  • lib/cfl/include/cfl/cfl_container.h
  • lib/cfl/include/cfl/cfl_kv.h
  • lib/cfl/include/cfl/cfl_kvlist.h
  • lib/cfl/include/cfl/cfl_list.h
  • lib/cfl/include/cfl/cfl_object.h
  • lib/cfl/include/cfl/cfl_sds.h
  • lib/cfl/include/cfl/cfl_time.h
  • lib/cfl/include/cfl/cfl_utils.h
  • lib/cfl/include/cfl/cfl_variant.h
  • lib/cfl/src/CMakeLists.txt
  • lib/cfl/src/cfl.c
  • lib/cfl/src/cfl_array.c
  • lib/cfl/src/cfl_atomic_clang.c
  • lib/cfl/src/cfl_atomic_gcc.c
  • lib/cfl/src/cfl_atomic_generic.c
  • lib/cfl/src/cfl_atomic_msvc.c
  • lib/cfl/src/cfl_checksum.c
  • lib/cfl/src/cfl_container.c
  • lib/cfl/src/cfl_kv.c
  • lib/cfl/src/cfl_kvlist.c
  • lib/cfl/src/cfl_object.c
  • lib/cfl/src/cfl_sds.c
  • lib/cfl/src/cfl_utils.c
  • lib/cfl/src/cfl_variant.c
  • lib/cfl/tests/CMakeLists.txt
  • lib/cfl/tests/array.c
  • lib/cfl/tests/atomic_operations.c
  • lib/cfl/tests/checksum.c
  • lib/cfl/tests/headers.c
  • lib/cfl/tests/kv.c
  • lib/cfl/tests/kvlist.c
  • lib/cfl/tests/object.c
  • lib/cfl/tests/sds.c
  • lib/cfl/tests/utils.c
  • lib/cfl/tests/variant.c
  • src/flb_downstream_worker.c
  • src/http_server/flb_http_server_http1.c
✅ Files skipped from review due to trivial changes (9)
  • lib/cfl/include/cfl/cfl_utils.h
  • lib/cfl/.github/workflows/lint.yaml
  • lib/cfl/include/cfl/cfl_kvlist.h
  • lib/cfl/CMakeLists.txt
  • lib/cfl/tests/headers.c
  • lib/cfl/AGENTS.md
  • lib/cfl/tests/checksum.c
  • lib/cfl/include/cfl/cfl_kv.h
  • lib/cfl/include/cfl/cfl_atomic.h
🚧 Files skipped from review as they are similar to previous changes (2)
  • include/fluent-bit/flb_downstream_worker.h
  • src/flb_downstream_worker.c

Comment on lines +129 to +139
for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
    threads[thread_index] = CreateThread(NULL, 0,
                                         worker_thread_add_through_compare_exchange,
                                         NULL, 0, &thread_ids[thread_index]);
}

for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
    result = WaitForSingleObject(threads[thread_index], INFINITE);
    TEST_CHECK(result == WAIT_OBJECT_0);
    CloseHandle(threads[thread_index]);
}


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle thread creation failures before join/wait.

Line 130 and Line 157 can fail, but the code still waits/joins every slot. That can hit invalid handles/IDs and crash the test path instead of reporting a clean assertion failure.

Suggested fix
@@
-    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
+    int created = 0;
+    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
         threads[thread_index] = CreateThread(NULL, 0,
                                              worker_thread_add_through_compare_exchange,
                                              NULL, 0, &thread_ids[thread_index]);
+        if (!TEST_CHECK(threads[thread_index] != NULL)) {
+            break;
+        }
+        created++;
     }
 
-    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
+    for (thread_index = 0; thread_index < created; thread_index++) {
         result = WaitForSingleObject(threads[thread_index], INFINITE);
         TEST_CHECK(result == WAIT_OBJECT_0);
         CloseHandle(threads[thread_index]);
@@
-    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
+    int created = 0;
+    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
         result = pthread_create(&threads[thread_index], NULL,
                                 worker_thread_add_through_compare_exchange, NULL);
-        TEST_CHECK(result == 0);
+        if (!TEST_CHECK(result == 0)) {
+            break;
+        }
+        created++;
     }
 
-    for (thread_index = 0; thread_index < THREAD_COUNT; thread_index++) {
+    for (thread_index = 0; thread_index < created; thread_index++) {
         result = pthread_join(threads[thread_index], NULL);
         TEST_CHECK(result == 0);
     }

Also applies to: 156-165

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/cfl/tests/atomic_operations.c` around lines 129 - 139, The thread
creation loop does not handle CreateThread failures and later blindly calls
WaitForSingleObject/CloseHandle on possibly NULL handles; modify the
CreateThread loop that spawns THREAD_COUNT threads (calling CreateThread with
worker_thread_add_through_compare_exchange, threads[], and thread_ids[]) to
check the return value for NULL immediately and record the failure (e.g., set a
boolean or increment a failure counter and call TEST_CHECK(FALSE) or similar).
In the subsequent join loop that iterates THREAD_COUNT and calls
WaitForSingleObject/CloseHandle, only call those functions for entries where
threads[i] is non-NULL (valid handle); for any failed slot ensure you surface a
clear test failure via TEST_CHECK/result handling instead of using the invalid
handle. Ensure thread_ids handling is consistent for both success and failure
cases.

Comment thread lib/cfl/tests/kvlist.c
Comment on lines +1256 to +1263
fp = tmpfile();
TEST_CHECK(fp != NULL);

ret = cfl_kvlist_print(fp, list);
TEST_CHECK(ret > 0);

ret = compare(fp, "{\"a\\\"b\\n\":\"v\\n\"}");
TEST_CHECK(ret == 0);


⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fail fast after tmpfile() check to avoid possible NULL dereference.

TEST_CHECK(fp != NULL) alone does not stop execution. If tmpfile() fails, the subsequent print call can pass NULL fp.

Suggested fix
 static void print_escaped_keys()
 {
     int ret;
     FILE *fp;
     struct cfl_kvlist *list;

     list = cfl_kvlist_create();
-    TEST_CHECK(list != NULL);
+    if (!TEST_CHECK(list != NULL)) {
+        return;
+    }

     ret = cfl_kvlist_insert_string(list, "a\"b\n", "v\n");
     TEST_CHECK(ret == 0);

     fp = tmpfile();
-    TEST_CHECK(fp != NULL);
+    if (!TEST_CHECK(fp != NULL)) {
+        cfl_kvlist_destroy(list);
+        return;
+    }

     ret = cfl_kvlist_print(fp, list);
     TEST_CHECK(ret > 0);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/cfl/tests/kvlist.c` around lines 1256 - 1263, The test currently calls
tmpfile() and only uses TEST_CHECK(fp != NULL) which doesn’t stop execution on
failure, leading to a possible NULL dereference in cfl_kvlist_print; update the
test around tmpfile() to fail fast if fp is NULL (e.g., after TEST_CHECK(fp !=
NULL) add an immediate exit/return or TEST_FAIL/TEST_FAIL_MESSAGE) so that
cfl_kvlist_print and subsequent calls (cfl_kvlist_print, compare) are not
invoked with a NULL FILE*.

Comment thread lib/cfl/tests/object.c
Comment on lines +104 to +111
fp = tmpfile();
TEST_CHECK(fp != NULL);

ret = cfl_object_print(fp, object);
TEST_CHECK(ret == 0);

ret = compare(fp, "\"second\"\n");
TEST_CHECK(ret == 0);


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return early when tmpfile() fails before calling compare().

TEST_CHECK(fp != NULL) logs failure but execution continues. If tmpfile() returns NULL, later compare(fp, ...) dereferences a null FILE *.

Suggested fix pattern
     fp = tmpfile();
-    TEST_CHECK(fp != NULL);
+    if (!TEST_CHECK(fp != NULL)) {
+        cfl_object_destroy(object);
+        return;
+    }

     ret = cfl_object_print(fp, object);
     TEST_CHECK(ret == 0);

Also applies to: 141-148, 178-185, 217-224, 256-263, 295-302

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/cfl/tests/object.c` around lines 104 - 111, The test currently calls
tmpfile() and uses TEST_CHECK(fp != NULL) but continues execution, risking a
NULL dereference when later calling compare(fp, ...) or cfl_object_print; update
each test block (around tmpfile() usage and calls to cfl_object_print/compare)
to check fp immediately after tmpfile() and return or jump to the test cleanup
when fp == NULL (e.g., early return or goto cleanup) so no further operations
use a NULL FILE*; ensure the change is applied to all instances referenced
(lines near the fp assignments at the shown blocks).

edsiper added 2 commits May 11, 2026 20:23
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (2)
src/flb_downstream_worker.c (1)

249-267: ⚡ Quick win

Shutdown can stall up to ~250 ms per worker; wake the event loop via the control pipe.

flb_downstream_worker_runtime_stop() sets should_exit and immediately pthread_join()s, but the worker thread only re-evaluates the flag after mk_event_wait_2(..., 250) returns. With many workers (and the runtime is now used by HTTP server, in_tcp, in_udp, in_forward) shutdown latency adds up linearly. Writing a byte to each worker's control channel before joining wakes the loop immediately, and the existing control-event handler already tolerates a no-op signal (callback is NULL).

♻️ Suggested change
 void flb_downstream_worker_runtime_stop(struct flb_downstream_worker_runtime *runtime)
 {
     int i;
+    uint64_t signal = 1;

     if (runtime == NULL) {
         return;
     }

     for (i = 0; i < runtime->active_workers; i++) {
         cfl_atomic_store(&runtime->workers[i].should_exit, FLB_TRUE);
+        if (runtime->workers[i].control_channel_created == FLB_TRUE) {
+            flb_pipe_w(runtime->workers[i].control_channel[1],
+                       &signal, sizeof(signal));
+        }
         if (runtime->workers[i].thread_created == FLB_TRUE) {
             pthread_join(runtime->workers[i].thread, NULL);
         }
         downstream_worker_context_cleanup(&runtime->workers[i]);
     }

     flb_free(runtime->workers);
     flb_free(runtime);
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/flb_downstream_worker.c` around lines 249 - 267,
flb_downstream_worker_runtime_stop currently sets workers[i].should_exit and
immediately pthread_join() which can block until the worker's mk_event_wait_2
timeout; to fix this, after setting should_exit for each runtime->workers[i] and
before pthread_join, write a single byte to that worker's control
pipe/file-descriptor (the control FD the event loop watches) to wake the event
loop immediately (the control-event handler tolerates no-op signals), then
proceed with pthread_join and
downstream_worker_context_cleanup(&runtime->workers[i]).
include/fluent-bit/flb_downstream_worker.h (1)

49-70: 💤 Low value

Consider documenting the ownership/threading contract for struct flb_downstream_worker's public fields.

This is a public header used by in_tcp, in_udp, in_forward, and the HTTP server. The struct mixes fields the runtime owns and writes (e.g. event_loop, control_event, mutex, condition, should_exit, control_done, startup_result, initialized, thread_created, control_channel_created) with fields callbacks may legitimately read or write (context, parent, worker_id, worker_count). Without a short comment block, downstream plugins are likely to touch the wrong fields (e.g. mutate should_exit directly, read control_done without holding mutex, or write context from a foreach callback). A few /* runtime-owned, do not modify */ / /* set by cb_init only */ comments would prevent that without changing the API.
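A hedged sketch of what such annotations could look like (field names follow the review text above; this is an illustrative struct, not the actual header):

```c
/* illustrative only -- mirrors the grouping suggested in the review */
struct example_downstream_worker {
    /* runtime-owned: written by the worker runtime, callbacks must
     * not modify; reads must be synchronized (mutex / atomics) */
    int should_exit;      /* set by runtime_stop, read atomically */
    int startup_result;   /* read under mutex once startup completes */

    /* set during cb_init, then stable: safe for callbacks to read */
    int worker_id;
    int worker_count;

    /* plugin-owned payload, written from the foreach callback */
    void *context;
};
```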

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@include/fluent-bit/flb_downstream_worker.h` around lines 49 - 70, Add concise
ownership and threading comments to the public struct flb_downstream_worker to
clarify which fields are runtime-owned and which may be touched by
plugins/callbacks: annotate fields like event_loop, control_event, mutex,
condition, should_exit, control_done, startup_result, initialized,
thread_created, control_channel_created as "runtime-owned / do not modify" and
indicate that reads must be synchronized (e.g., hold mutex). Mark context,
parent, worker_id, worker_count and control_callback/control_data as "set by
cb_init / safe for callbacks" or "readable/writable by plugins" and document any
required synchronization (e.g., update only from the foreach callback or while
holding mutex). Ensure the comments sit right above or beside each group of
fields in struct flb_downstream_worker so downstream users can easily follow the
ownership/threading contract.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c9470cca-e45f-4c78-a0d0-dc224c980c94

📥 Commits

Reviewing files that changed from the base of the PR and between bb5cba8 and 33d6543.

📒 Files selected for processing (3)
  • include/fluent-bit/flb_downstream_worker.h
  • plugins/in_forward/fw.c
  • src/flb_downstream_worker.c
🚧 Files skipped from review as they are similar to previous changes (1)
  • plugins/in_forward/fw.c



Successfully merging this pull request may close these issues.

Add workers to tcp, udp, and forward input plugins
