in_forward: handle unpacker allocation failure #11809
Conversation
📝 WalkthroughWalkthroughRefactored the forward protocol parser to handle msgpack unpacker allocation failures explicitly and added a Linux-only integration regression test with helpers for payload crafting, subprocess orchestration, memory measurement, and log polling to reproduce and validate the failure and recovery. ChangesMsgpack Unpacker Allocation Failure Handling
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/integration/scenarios/in_forward/tests/fail_alloc_preload.c`:
- Around line 164-177: should_fail() is too broad and trips on any 2048‑byte
allocation; restrict it to only fail when the allocation originates from the
msgpack unpacker call-site used in fw_prot.c. Update should_fail() to check the
caller symbol (use dladdr on __builtin_return_address(0)) and only set
failed/record_failure if the caller name equals
"msgpack_unpacker_reserve_buffer" (or when the existing
FLB_TEST_FAIL_UNPACKER_RESERVE_FILE hook is the intended trigger), leaving the
existing guards failed, running_in_fluent_bit(), armed(), and target_size() in
place; reference the should_fail, msgpack_unpacker_reserve_buffer,
FLB_TEST_FAIL_UNPACKER_RESERVE_FILE, failed, running_in_fluent_bit, armed,
target_size, and record_failure identifiers when making the change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a172b744-d1ff-4626-95b1-0753494e6438
📒 Files selected for processing (4)
plugins/in_forward/CMakeLists.txtplugins/in_forward/fw_prot.ctests/integration/scenarios/in_forward/tests/fail_alloc_preload.ctests/integration/scenarios/in_forward/tests/test_in_forward_001.py
| static int should_fail(size_t size) | ||
| { | ||
| if (failed || !running_in_fluent_bit()) { | ||
| return 0; | ||
| } | ||
|
|
||
| if (size != target_size() || !armed()) { | ||
| return 0; | ||
| } | ||
|
|
||
| failed = 1; | ||
| record_failure(); | ||
|
|
||
| return 1; |
There was a problem hiding this comment.
Scope the injected failure more narrowly.
should_fail() currently trips on any 2048-byte malloc/realloc in the fluent-bit process once the arm file exists. An unrelated allocation can consume the one-shot failure before msgpack_unpacker_reserve_buffer() runs, so this regression can go flaky even though fw_prot.c already has a call-site-specific FLB_TEST_FAIL_UNPACKER_RESERVE_FILE hook.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/integration/scenarios/in_forward/tests/fail_alloc_preload.c` around
lines 164 - 177, should_fail() is too broad and trips on any 2048‑byte
allocation; restrict it to only fail when the allocation originates from the
msgpack unpacker call-site used in fw_prot.c. Update should_fail() to check the
caller symbol (use dladdr on __builtin_return_address(0)) and only set
failed/record_failure if the caller name equals
"msgpack_unpacker_reserve_buffer" (or when the existing
FLB_TEST_FAIL_UNPACKER_RESERVE_FILE hook is the intended trigger), leaving the
existing guards failed, running_in_fluent_bit(), armed(), and target_size() in
place; reference the should_fail, msgpack_unpacker_reserve_buffer,
FLB_TEST_FAIL_UNPACKER_RESERVE_FILE, failed, running_in_fluent_bit, armed,
target_size, and record_failure identifiers when making the change.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 571-583: The test currently uses _send_tcp_payload(...) which
closes the client socket right after sendall(), so it never verifies that the
server closed the connection that hit the unpacker OOM path; replace that
initial _send_tcp_payload(_forward_allocation_repro_payload()) with code that
opens a TCP socket, connects to port, calls sendall(...) with
_forward_allocation_repro_payload(), and keeps that socket open; then wait for
EOF/reset on that same socket (e.g. recv() returning b'' or a connection-reset
exception) before calling _restore_process_data_limit(process.pid) and sending
the recovery payload (you can keep using _send_tcp_payload(...) for the
recovery); ensure the test asserts the socket closure on the same socket and
then uses _wait_for_log_text(log_file, "after-unpacker-nomem", process,
timeout=5) to confirm recovery. Also apply the same change to the other
occurrence around lines 757-776.
- Around line 451-454: _find_free_port currently binds and closes the ephemeral
socket before launching Fluent Bit, allowing a race where another process can
steal the port; change it to reserve the port by binding and returning the bound
socket (or a (sock, port) tuple) without closing it, update all callers
(including the startup code around the tests at lines ~558-569) to hold that
socket open across process startup and only close/release it after Fluent Bit
reports its listener has started; ensure callers extract the port number for
configuration but keep the socket alive until the process is confirmed
listening.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3e237c05-41f3-4c6c-b5a3-561724519d55
📒 Files selected for processing (2)
plugins/in_forward/fw_prot.ctests/integration/scenarios/in_forward/tests/test_in_forward_001.py
| def _find_free_port(): | ||
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | ||
| sock.bind(("127.0.0.1", 0)) | ||
| return sock.getsockname()[1] |
There was a problem hiding this comment.
Reserve the port through process startup.
_find_free_port() releases the ephemeral socket before Fluent Bit binds it, so another process can steal that port in the gap. In this flow that turns into a sporadic listener did not start result that has nothing to do with the unpacker-allocation path you're trying to exercise.
Also applies to: 558-569
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 451 - 454, _find_free_port currently binds and closes the ephemeral socket
before launching Fluent Bit, allowing a race where another process can steal the
port; change it to reserve the port by binding and returning the bound socket
(or a (sock, port) tuple) without closing it, update all callers (including the
startup code around the tests at lines ~558-569) to hold that socket open across
process startup and only close/release it after Fluent Bit reports its listener
has started; ensure callers extract the port number for configuration but keep
the socket alive until the process is confirmed listening.
| _send_tcp_payload(port, _forward_allocation_repro_payload()) | ||
|
|
||
| deadline = time.time() + 3 | ||
| while time.time() < deadline: | ||
| log_text = read_file(log_file) | ||
| if "could not allocate msgpack unpacker buffer" in log_text: | ||
| _restore_process_data_limit(process.pid) | ||
| _send_tcp_payload( | ||
| port, | ||
| _message_mode_payload(TEST_TAG, {"message": "after-unpacker-nomem"}), | ||
| ) | ||
| if _wait_for_log_text(log_file, "after-unpacker-nomem", process, timeout=5): | ||
| return True, "triggered and recovered" |
There was a problem hiding this comment.
The regression never proves the failing connection was closed.
The repro payload is sent through _send_tcp_payload(), which closes the client socket immediately after sendall(). That means this test only proves Fluent Bit logs the allocation failure and later accepts a fresh connection; it never observes server-side closure of the connection that hit the unpacker OOM path, despite the test name claiming that coverage. Keep the repro socket open and assert EOF/reset on that same socket before doing the recovery send.
Also applies to: 757-776
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 571 - 583, The test currently uses _send_tcp_payload(...) which closes the
client socket right after sendall(), so it never verifies that the server closed
the connection that hit the unpacker OOM path; replace that initial
_send_tcp_payload(_forward_allocation_repro_payload()) with code that opens a
TCP socket, connects to port, calls sendall(...) with
_forward_allocation_repro_payload(), and keeps that socket open; then wait for
EOF/reset on that same socket (e.g. recv() returning b'' or a connection-reset
exception) before calling _restore_process_data_limit(process.pid) and sending
the recovery payload (you can keep using _send_tcp_payload(...) for the
recovery); ensure the test asserts the socket closure on the same socket and
then uses _wait_for_log_text(log_file, "after-unpacker-nomem", process,
timeout=5) to confirm recovery. Also apply the same change to the other
occurrence around lines 757-776.
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py`:
- Around line 496-505: The code opens a file handle (output = open(...)) and
then calls subprocess.Popen but never closes output if Popen raises; wrap the
Popen call in a try/except (or use contextlib.ExitStack) so that if
subprocess.Popen(...) in the block that creates process (calling
_fluent_bit_binary_path()) raises, you explicitly call output.close() before
re-raising the exception; reference the output variable and the subprocess.Popen
call (and the returned process variable) when making the change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d42cad69-37be-49df-a089-689295cfe8ee
📒 Files selected for processing (2)
plugins/in_forward/fw_prot.ctests/integration/scenarios/in_forward/tests/test_in_forward_001.py
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/in_forward/fw_prot.c
| output = open(log_file, "a", encoding="utf-8") | ||
| process = subprocess.Popen( | ||
| [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)], | ||
| stdout=output, | ||
| stderr=subprocess.STDOUT, | ||
| text=True, | ||
| preexec_fn=preexec_fn, | ||
| ) | ||
|
|
||
| return process, output |
There was a problem hiding this comment.
File handle leaked if Popen raises.
If subprocess.Popen() fails, the file handle opened on line 496 is never closed.
🛠️ Suggested fix
- output = open(log_file, "a", encoding="utf-8")
- process = subprocess.Popen(
- [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
- stdout=output,
- stderr=subprocess.STDOUT,
- text=True,
- preexec_fn=preexec_fn,
- )
-
- return process, output
+ output = open(log_file, "a", encoding="utf-8")
+ try:
+ process = subprocess.Popen(
+ [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
+ stdout=output,
+ stderr=subprocess.STDOUT,
+ text=True,
+ preexec_fn=preexec_fn,
+ )
+ except:
+ output.close()
+ raise
+
+ return process, output🧰 Tools
🪛 Ruff (0.15.12)
[error] 497-497: subprocess call: check for execution of untrusted input
(S603)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 496 - 505, The code opens a file handle (output = open(...)) and then
calls subprocess.Popen but never closes output if Popen raises; wrap the Popen
call in a try/except (or use contextlib.ExitStack) so that if
subprocess.Popen(...) in the block that creates process (calling
_fluent_bit_binary_path()) raises, you explicitly call output.close() before
re-raising the exception; reference the output variable and the subprocess.Popen
call (and the returned process variable) when making the change.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
Bug Fixes
Tests