Skip to content

in_forward: handle unpacker allocation failure #11809

Merged
edsiper merged 2 commits into
masterfrom
security-forward
May 16, 2026
Merged

in_forward: handle unpacker allocation failure #11809
edsiper merged 2 commits into
masterfrom
security-forward

Conversation

@edsiper
Copy link
Copy Markdown
Member

@edsiper edsiper commented May 15, 2026


Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling when unpacker buffer allocation fails: errors are logged, resources are cleaned up, the connection is closed, and the process can recover without crashing.
  • Tests

    • Added Linux-only regression test that reproduces forward input allocation failures, verifies log messages and recovery, and includes helpers to poll logs and craft/send repro payloads.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 15, 2026

📝 Walkthrough

Walkthrough

Refactored the forward protocol parser to handle msgpack unpacker allocation failures explicitly and added a Linux-only integration regression test with helpers for payload crafting, subprocess orchestration, memory measurement, and log polling to reproduce and validate the failure and recovery.

Changes

Msgpack Unpacker Allocation Failure Handling

Layer / File(s) Summary
Forward protocol error handling for allocation failures
plugins/in_forward/fw_prot.c
receiver_to_unpacker() refactored to return int with an added recv_len out-parameter, checks unpacker buffer reservation and returns -1 on reserve failure. fw_prot_process() now checks msgpack_unpacker_new() for NULL, handles receiver_to_unpacker() returning -1 with logging and proper cleanup.
Test imports and log polling
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
Added imports (sys, tempfile, time) and read_file import; introduced Service.wait_for_log_contains() to poll Fluent Bit logs for expected substrings.
MessagePack helpers and subprocess orchestration
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
Extracted _pack_array_header(length) and updated _pack_array(items); added helpers to build a crafted forward payload, find a free port, locate the Fluent Bit binary, write a minimal forward→stdout config, start/stop Fluent Bit with optional RLIMIT_DATA, measure /proc VmData, restore data limits, and wait for log markers.
Integration regression test for allocation failures
tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
Added Linux-only test test_in_forward_repro_payload_allocation_failure_closes_connection() (skipped under Valgrind) that iterates data-limit deltas to trigger unpacker allocation failures, asserts the failure log, restores limits, sends a valid follow-up message, and confirms recovery.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • fluent/fluent-bit#11442: Overlapping changes to fw_prot_process() error-handling flow and unpacker state management.

Suggested reviewers

  • cosmo0920
  • koleini
  • fujimotos

Poem

🐰 I nudged a tiny buffer tight,

A hop, a log, a failing bite,
We catch the error, mend the seam,
Send one more message, hear the stream—
The forward dance resumes its light.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'in_forward: handle unpacker allocation failure' directly and clearly summarizes the main objective of the pull request—adding proper error handling for msgpack unpacker allocation failures in the forward plugin.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch security-forward

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/integration/scenarios/in_forward/tests/fail_alloc_preload.c`:
- Around line 164-177: should_fail() is too broad and trips on any 2048‑byte
allocation; restrict it to only fail when the allocation originates from the
msgpack unpacker call-site used in fw_prot.c. Update should_fail() to check the
caller symbol (use dladdr on __builtin_return_address(0)) and only set
failed/record_failure if the caller name equals
"msgpack_unpacker_reserve_buffer" (or when the existing
FLB_TEST_FAIL_UNPACKER_RESERVE_FILE hook is the intended trigger), leaving the
existing guards failed, running_in_fluent_bit(), armed(), and target_size() in
place; reference the should_fail, msgpack_unpacker_reserve_buffer,
FLB_TEST_FAIL_UNPACKER_RESERVE_FILE, failed, running_in_fluent_bit, armed,
target_size, and record_failure identifiers when making the change.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a172b744-d1ff-4626-95b1-0753494e6438

📥 Commits

Reviewing files that changed from the base of the PR and between 8627815 and f2d9c3f.

📒 Files selected for processing (4)
  • plugins/in_forward/CMakeLists.txt
  • plugins/in_forward/fw_prot.c
  • tests/integration/scenarios/in_forward/tests/fail_alloc_preload.c
  • tests/integration/scenarios/in_forward/tests/test_in_forward_001.py

Comment on lines +164 to +177
static int should_fail(size_t size)
{
if (failed || !running_in_fluent_bit()) {
return 0;
}

if (size != target_size() || !armed()) {
return 0;
}

failed = 1;
record_failure();

return 1;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Scope the injected failure more narrowly.

should_fail() currently trips on any 2048-byte malloc/realloc in the fluent-bit process once the arm file exists. An unrelated allocation can consume the one-shot failure before msgpack_unpacker_reserve_buffer() runs, so this regression can go flaky even though fw_prot.c already has a call-site-specific FLB_TEST_FAIL_UNPACKER_RESERVE_FILE hook.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/fail_alloc_preload.c` around
lines 164 - 177, should_fail() is too broad and trips on any 2048‑byte
allocation; restrict it to only fail when the allocation originates from the
msgpack unpacker call-site used in fw_prot.c. Update should_fail() to check the
caller symbol (use dladdr on __builtin_return_address(0)) and only set
failed/record_failure if the caller name equals
"msgpack_unpacker_reserve_buffer" (or when the existing
FLB_TEST_FAIL_UNPACKER_RESERVE_FILE hook is the intended trigger), leaving the
existing guards failed, running_in_fluent_bit(), armed(), and target_size() in
place; reference the should_fail, msgpack_unpacker_reserve_buffer,
FLB_TEST_FAIL_UNPACKER_RESERVE_FILE, failed, running_in_fluent_bit, armed,
target_size, and record_failure identifiers when making the change.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 571-583: The test currently uses _send_tcp_payload(...) which
closes the client socket right after sendall(), so it never verifies that the
server closed the connection that hit the unpacker OOM path; replace that
initial _send_tcp_payload(_forward_allocation_repro_payload()) with code that
opens a TCP socket, connects to port, calls sendall(...) with
_forward_allocation_repro_payload(), and keeps that socket open; then wait for
EOF/reset on that same socket (e.g. recv() returning b'' or a connection-reset
exception) before calling _restore_process_data_limit(process.pid) and sending
the recovery payload (you can keep using _send_tcp_payload(...) for the
recovery); ensure the test asserts the socket closure on the same socket and
then uses _wait_for_log_text(log_file, "after-unpacker-nomem", process,
timeout=5) to confirm recovery. Also apply the same change to the other
occurrence around lines 757-776.
- Around line 451-454: _find_free_port currently binds and closes the ephemeral
socket before launching Fluent Bit, allowing a race where another process can
steal the port; change it to reserve the port by binding and returning the bound
socket (or a (sock, port) tuple) without closing it, update all callers
(including the startup code around the tests at lines ~558-569) to hold that
socket open across process startup and only close/release it after Fluent Bit
reports its listener has started; ensure callers extract the port number for
configuration but keep the socket alive until the process is confirmed
listening.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3e237c05-41f3-4c6c-b5a3-561724519d55

📥 Commits

Reviewing files that changed from the base of the PR and between f2d9c3f and a8405af.

📒 Files selected for processing (2)
  • plugins/in_forward/fw_prot.c
  • tests/integration/scenarios/in_forward/tests/test_in_forward_001.py

Comment on lines +451 to +454
def _find_free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
return sock.getsockname()[1]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reserve the port through process startup.

_find_free_port() releases the ephemeral socket before Fluent Bit binds it, so another process can steal that port in the gap. In this flow that turns into a sporadic listener did not start result that has nothing to do with the unpacker-allocation path you're trying to exercise.

Also applies to: 558-569

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 451 - 454, _find_free_port currently binds and closes the ephemeral socket
before launching Fluent Bit, allowing a race where another process can steal the
port; change it to reserve the port by binding and returning the bound socket
(or a (sock, port) tuple) without closing it, update all callers (including the
startup code around the tests at lines ~558-569) to hold that socket open across
process startup and only close/release it after Fluent Bit reports its listener
has started; ensure callers extract the port number for configuration but keep
the socket alive until the process is confirmed listening.

Comment on lines +571 to +583
_send_tcp_payload(port, _forward_allocation_repro_payload())

deadline = time.time() + 3
while time.time() < deadline:
log_text = read_file(log_file)
if "could not allocate msgpack unpacker buffer" in log_text:
_restore_process_data_limit(process.pid)
_send_tcp_payload(
port,
_message_mode_payload(TEST_TAG, {"message": "after-unpacker-nomem"}),
)
if _wait_for_log_text(log_file, "after-unpacker-nomem", process, timeout=5):
return True, "triggered and recovered"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

The regression never proves the failing connection was closed.

The repro payload is sent through _send_tcp_payload(), which closes the client socket immediately after sendall(). That means this test only proves Fluent Bit logs the allocation failure and later accepts a fresh connection; it never observes server-side closure of the connection that hit the unpacker OOM path, despite the test name claiming that coverage. Keep the repro socket open and assert EOF/reset on that same socket before doing the recovery send.

Also applies to: 757-776

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 571 - 583, The test currently uses _send_tcp_payload(...) which closes the
client socket right after sendall(), so it never verifies that the server closed
the connection that hit the unpacker OOM path; replace that initial
_send_tcp_payload(_forward_allocation_repro_payload()) with code that opens a
TCP socket, connects to port, calls sendall(...) with
_forward_allocation_repro_payload(), and keeps that socket open; then wait for
EOF/reset on that same socket (e.g. recv() returning b'' or a connection-reset
exception) before calling _restore_process_data_limit(process.pid) and sending
the recovery payload (you can keep using _send_tcp_payload(...) for the
recovery); ensure the test asserts the socket closure on the same socket and
then uses _wait_for_log_text(log_file, "after-unpacker-nomem", process,
timeout=5) to confirm recovery. Also apply the same change to the other
occurrence around lines 757-776.

edsiper added 2 commits May 15, 2026 15:14
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 496-505: The code opens a file handle (output = open(...)) and
then calls subprocess.Popen but never closes output if Popen raises; wrap the
Popen call in a try/except (or use contextlib.ExitStack) so that if
subprocess.Popen(...) in the block that creates process (calling
_fluent_bit_binary_path()) raises, you explicitly call output.close() before
re-raising the exception; reference the output variable and the subprocess.Popen
call (and the returned process variable) when making the change.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d42cad69-37be-49df-a089-689295cfe8ee

📥 Commits

Reviewing files that changed from the base of the PR and between a8405af and 99d2aa9.

📒 Files selected for processing (2)
  • plugins/in_forward/fw_prot.c
  • tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • plugins/in_forward/fw_prot.c

Comment on lines +496 to +505
output = open(log_file, "a", encoding="utf-8")
process = subprocess.Popen(
[_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
stdout=output,
stderr=subprocess.STDOUT,
text=True,
preexec_fn=preexec_fn,
)

return process, output
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

File handle leaked if Popen raises.

If subprocess.Popen() fails, the file handle opened on line 496 is never closed.

🛠️ Suggested fix
-    output = open(log_file, "a", encoding="utf-8")
-    process = subprocess.Popen(
-        [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
-        stdout=output,
-        stderr=subprocess.STDOUT,
-        text=True,
-        preexec_fn=preexec_fn,
-    )
-
-    return process, output
+    output = open(log_file, "a", encoding="utf-8")
+    try:
+        process = subprocess.Popen(
+            [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
+            stdout=output,
+            stderr=subprocess.STDOUT,
+            text=True,
+            preexec_fn=preexec_fn,
+        )
+    except:
+        output.close()
+        raise
+
+    return process, output
🧰 Tools
🪛 Ruff (0.15.12)

[error] 497-497: subprocess call: check for execution of untrusted input

(S603)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 496 - 505, The code opens a file handle (output = open(...)) and then
calls subprocess.Popen but never closes output if Popen raises; wrap the Popen
call in a try/except (or use contextlib.ExitStack) so that if
subprocess.Popen(...) in the block that creates process (calling
_fluent_bit_binary_path()) raises, you explicitly call output.close() before
re-raising the exception; reference the output variable and the subprocess.Popen
call (and the returned process variable) when making the change.

@edsiper edsiper merged commit 049a774 into master May 16, 2026
40 of 50 checks passed
@edsiper edsiper deleted the security-forward branch May 16, 2026 03:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant