Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions plugins/in_forward/fw_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -1093,21 +1093,22 @@ static size_t receiver_recv(struct fw_conn *conn, char *buf, size_t try_size) {
return actual_size;
}

/*
 * Feed up to request_size bytes from the connection into the msgpack
 * unpacker's buffer.
 *
 * On success returns 0 and stores the number of bytes actually received
 * (possibly 0, meaning "no more data") in *recv_len.  Returns -1 when the
 * unpacker buffer could not be grown to request_size, so the caller can
 * abort the connection instead of dereferencing an undersized buffer.
 */
static int receiver_to_unpacker(struct fw_conn *conn, size_t request_size,
                                msgpack_unpacker *unpacker, size_t *recv_len)
{
    *recv_len = 0;

    /* make sure there's enough room, or expand the unpacker accordingly */
    if (msgpack_unpacker_buffer_capacity(unpacker) < request_size) {
        /* reserve_buffer allocates; it returns false on out-of-memory */
        if (!msgpack_unpacker_reserve_buffer(unpacker, request_size)) {
            return -1;
        }
    }

    *recv_len = receiver_recv(conn, msgpack_unpacker_buffer(unpacker),
                              request_size);
    msgpack_unpacker_buffer_consumed(unpacker, *recv_len);

    return 0;
}

static int append_log(struct flb_input_instance *ins, struct fw_conn *conn,
Expand Down Expand Up @@ -1288,11 +1289,26 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn)
}

unp = msgpack_unpacker_new(1024);
if (!unp) {
flb_plg_error(ctx->ins, "could not allocate msgpack unpacker");
flb_sds_destroy(out_tag);
return -1;
}

msgpack_unpacked_init(&result);
conn->rest = conn->buf_len;

while (1) {
recv_len = receiver_to_unpacker(conn, EACH_RECV_SIZE, unp);
ret = receiver_to_unpacker(conn, EACH_RECV_SIZE, unp, &recv_len);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not allocate msgpack unpacker buffer");
msgpack_unpacked_destroy(&result);
msgpack_unpacker_free(unp);
flb_sds_destroy(out_tag);

return -1;
}

if (recv_len == 0) {
/* No more data */
msgpack_unpacker_free(unp);
Expand Down
228 changes: 219 additions & 9 deletions tests/integration/scenarios/in_forward/tests/test_in_forward_001.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import socket
import ssl
import subprocess
import sys
import tempfile
import time
import uuid
from pathlib import Path

Expand Down Expand Up @@ -42,7 +44,7 @@
forward_server_stop,
)
from server.http_server import configure_http_response, data_storage, http_server_run
from utils.data_utils import read_json_file
from utils.data_utils import read_file, read_json_file
from utils.test_service import FluentBitTestService


Expand Down Expand Up @@ -124,6 +126,16 @@ def wait_for_record_count(self, minimum_count, timeout=10):
description=f"{minimum_count} forwarded forward records",
)

def wait_for_log_contains(self, text, timeout=10):
    """Block until the service log file contains *text*.

    Returns the log contents that satisfied the condition (the
    wait_for_condition helper treats ``None`` as "not yet").
    """
    def log_if_present():
        # Read the file once so the contents we return are exactly the
        # contents we checked; reading twice (check, then re-read for the
        # return value) could hand back a snapshot without the text.
        contents = read_file(self.service.flb.log_file)
        return contents if text in contents else None

    return self.service.wait_for_condition(
        log_if_present,
        timeout=timeout,
        interval=0.5,
        description=f"log text {text!r}",
    )


class StorageLimitService(Service):
def __init__(self, config_file):
Expand Down Expand Up @@ -317,15 +329,16 @@ def _pack_ext(type_code, payload):
raise ValueError(f"Unsupported ext payload size {length}")


def _pack_array(items):
length = len(items)
def _pack_array_header(length):
if length <= 15:
prefix = bytes([0x90 | length])
elif length <= 0xFFFF:
prefix = b"\xDC" + length.to_bytes(2, "big")
else:
prefix = b"\xDD" + length.to_bytes(4, "big")
return prefix + b"".join(_pack_obj(item) for item in items)
return bytes([0x90 | length])
if length <= 0xFFFF:
return b"\xDC" + length.to_bytes(2, "big")
return b"\xDD" + length.to_bytes(4, "big")


def _pack_array(items):
return _pack_array_header(len(items)) + b"".join(_pack_obj(item) for item in items)


def _pack_map(mapping):
Expand Down Expand Up @@ -408,6 +421,181 @@ def _zstd_bytes(data):
return result.stdout


def _forward_allocation_repro_payload():
    """Build a fixed-size forward-mode payload that stresses unpacker growth.

    The array header deliberately declares twice as many entries as are
    actually packed, and the result is clamped/zero-padded to a fixed byte
    length, so the in_forward plugin keeps requesting more buffer space.
    """
    record_count = 170
    record_body = "R" * 200
    fixed_size = 29229

    buf = bytearray()
    buf += _pack_array_header(2)
    buf += _pack_str(TEST_TAG)
    # Header claims record_count * 2 entries; only record_count follow.
    buf += _pack_array_header(record_count * 2)

    for offset in range(record_count):
        buf += _pack_obj([TEST_TS + offset, {"log": record_body}])

    return bytes(buf[:fixed_size]).ljust(fixed_size, b"\x00")


def _process_status_kb(pid, key):
    """Return the integer KiB value for *key* (e.g. ``VmData``) from
    ``/proc/<pid>/status``.

    Raises RuntimeError when the key is absent from the status file.
    """
    status_path = Path("/proc") / str(pid) / "status"
    wanted = f"{key}:"

    matches = (
        entry
        for entry in status_path.read_text(encoding="utf-8").splitlines()
        if entry.startswith(wanted)
    )
    for entry in matches:
        # Lines look like "VmData:    1234 kB"; field 1 is the number.
        return int(entry.split()[1])

    raise RuntimeError(f"{key} not found in {status_path}")


def _find_free_port():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
return sock.getsockname()[1]
Comment on lines +451 to +454
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reserve the port through process startup.

_find_free_port() releases the ephemeral socket before Fluent Bit binds it, so another process can steal that port in the gap. In this flow that turns into a sporadic listener did not start result that has nothing to do with the unpacker-allocation path you're trying to exercise.

Also applies to: 558-569

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 451 - 454, _find_free_port currently binds and closes the ephemeral socket
before launching Fluent Bit, allowing a race where another process can steal the
port; change it to reserve the port by binding and returning the bound socket
(or a (sock, port) tuple) without closing it, update all callers (including the
startup code around the tests at lines ~558-569) to hold that socket open across
process startup and only close/release it after Fluent Bit reports its listener
has started; ensure callers extract the port number for configuration but keep
the socket alive until the process is confirmed listening.



def _fluent_bit_binary_path():
if os.environ.get("FLUENT_BIT_BINARY"):
return os.environ["FLUENT_BIT_BINARY"]

return str(Path(__file__).resolve().parents[5] / "build" / "bin" / "fluent-bit")


def _write_forward_stdout_config(path, port):
path.write_text(
f"""[SERVICE]
Flush 1
Daemon Off
Log_Level info

[INPUT]
Name forward
Listen 127.0.0.1
Port {port}

[OUTPUT]
Name stdout
Match *
""",
encoding="utf-8",
)


def _start_fluent_bit_process(config_file, log_file, data_limit_kb=None):
    """Launch fluent-bit with stdout/stderr appended to *log_file*.

    When *data_limit_kb* is given, RLIMIT_DATA is lowered in the child
    (via preexec_fn, Linux-only) so allocations fail deterministically.

    Returns (process, output) where *output* is the open log handle the
    caller must close (see _stop_fluent_bit_process).
    """
    preexec_fn = None

    if data_limit_kb is not None:
        def set_data_limit():
            import resource

            _, hard_limit = resource.getrlimit(resource.RLIMIT_DATA)
            resource.setrlimit(resource.RLIMIT_DATA, (data_limit_kb * 1024, hard_limit))

        preexec_fn = set_data_limit

    output = open(log_file, "a", encoding="utf-8")
    try:
        process = subprocess.Popen(
            [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
            stdout=output,
            stderr=subprocess.STDOUT,
            text=True,
            preexec_fn=preexec_fn,
        )
    except BaseException:
        # Popen can raise (e.g. missing binary); don't leak the log handle.
        output.close()
        raise

    return process, output
Comment on lines +496 to +505
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

File handle leaked if Popen raises.

If subprocess.Popen() fails, the file handle opened on line 496 is never closed.

🛠️ Suggested fix
-    output = open(log_file, "a", encoding="utf-8")
-    process = subprocess.Popen(
-        [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
-        stdout=output,
-        stderr=subprocess.STDOUT,
-        text=True,
-        preexec_fn=preexec_fn,
-    )
-
-    return process, output
+    output = open(log_file, "a", encoding="utf-8")
+    try:
+        process = subprocess.Popen(
+            [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
+            stdout=output,
+            stderr=subprocess.STDOUT,
+            text=True,
+            preexec_fn=preexec_fn,
+        )
+    except:
+        output.close()
+        raise
+
+    return process, output
🧰 Tools
🪛 Ruff (0.15.12)

[error] 497-497: subprocess call: check for execution of untrusted input

(S603)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 496 - 505, The code opens a file handle (output = open(...)) and then
calls subprocess.Popen but never closes output if Popen raises; wrap the Popen
call in a try/except (or use contextlib.ExitStack) so that if
subprocess.Popen(...) in the block that creates process (calling
_fluent_bit_binary_path()) raises, you explicitly call output.close() before
re-raising the exception; reference the output variable and the subprocess.Popen
call (and the returned process variable) when making the change.



def _stop_fluent_bit_process(process, output):
try:
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
process.wait(timeout=5)
finally:
output.close()


def _wait_for_log_text(log_file, text, process, timeout=10):
    """Poll *log_file* until *text* appears.

    Returns True when the text shows up, False when *process* exits first
    or *timeout* seconds elapse.
    """
    end_at = time.time() + timeout

    while time.time() < end_at:
        found = log_file.exists() and text in read_file(log_file)
        if found:
            return True
        if process.poll() is not None:
            # Process died; the text can no longer appear.
            return False
        time.sleep(0.1)

    return False


def _measure_forward_vmdata_kb(tmp_path):
    """Start a plain forward->stdout fluent-bit and return its VmData (KiB).

    Used as the memory baseline before computing RLIMIT_DATA values for
    the allocation-failure repro attempts.
    """
    port = _find_free_port()
    conf_path = tmp_path / "measure-forward.conf"
    log_path = tmp_path / "measure-forward.log"
    _write_forward_stdout_config(conf_path, port)

    process, log_handle = _start_fluent_bit_process(conf_path, log_path)
    try:
        started = _wait_for_log_text(log_path, f"listening on 127.0.0.1:{port}", process)
        assert started
        return _process_status_kb(process.pid, "VmData")
    finally:
        _stop_fluent_bit_process(process, log_handle)


def _restore_process_data_limit(pid):
    """Raise *pid*'s RLIMIT_DATA soft limit back to its hard limit.

    Silently ignores processes that have already exited; Linux-only
    (resource.prlimit).
    """
    try:
        import resource

        _, hard_limit = resource.prlimit(pid, resource.RLIMIT_DATA)
        resource.prlimit(pid, resource.RLIMIT_DATA, (hard_limit, hard_limit))
    except ProcessLookupError:
        # Target exited between attempts; nothing to restore.
        pass


def _run_forward_repro_attempt(tmp_path, data_limit_kb):
port = _find_free_port()
attempt_dir = tmp_path / f"data-limit-{data_limit_kb}"
attempt_dir.mkdir()
config_file = attempt_dir / "fluent-bit.conf"
log_file = attempt_dir / "fluent-bit.log"
_write_forward_stdout_config(config_file, port)

process, output = _start_fluent_bit_process(config_file, log_file, data_limit_kb)
try:
if not _wait_for_log_text(log_file, f"listening on 127.0.0.1:{port}", process):
return False, "listener did not start"

_send_tcp_payload(port, _forward_allocation_repro_payload())

deadline = time.time() + 3
while time.time() < deadline:
log_text = read_file(log_file)
if "could not allocate msgpack unpacker buffer" in log_text:
_restore_process_data_limit(process.pid)
_send_tcp_payload(
port,
_message_mode_payload(TEST_TAG, {"message": "after-unpacker-nomem"}),
)
if _wait_for_log_text(log_file, "after-unpacker-nomem", process, timeout=5):
return True, "triggered and recovered"
Comment on lines +571 to +583
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

The regression never proves the failing connection was closed.

The repro payload is sent through _send_tcp_payload(), which closes the client socket immediately after sendall(). That means this test only proves Fluent Bit logs the allocation failure and later accepts a fresh connection; it never observes server-side closure of the connection that hit the unpacker OOM path, despite the test name claiming that coverage. Keep the repro socket open and assert EOF/reset on that same socket before doing the recovery send.

Also applies to: 757-776

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/integration/scenarios/in_forward/tests/test_in_forward_001.py` around
lines 571 - 583, The test currently uses _send_tcp_payload(...) which closes the
client socket right after sendall(), so it never verifies that the server closed
the connection that hit the unpacker OOM path; replace that initial
_send_tcp_payload(_forward_allocation_repro_payload()) with code that opens a
TCP socket, connects to port, calls sendall(...) with
_forward_allocation_repro_payload(), and keeps that socket open; then wait for
EOF/reset on that same socket (e.g. recv() returning b'' or a connection-reset
exception) before calling _restore_process_data_limit(process.pid) and sending
the recovery payload (you can keep using _send_tcp_payload(...) for the
recovery); ensure the test asserts the socket closure on the same socket and
then uses _wait_for_log_text(log_file, "after-unpacker-nomem", process,
timeout=5) to confirm recovery. Also apply the same change to the other
occurrence around lines 757-776.

return False, "triggered but did not recover"

if "fw_conn.c" in log_text and "Cannot allocate memory" in log_text:
return False, "connection allocation failed"

if process.poll() is not None:
return False, f"process exited with {process.returncode}"

time.sleep(0.1)

return False, "allocation failure was not triggered"
finally:
_stop_fluent_bit_process(process, output)


def _send_tcp_payload(port, payload):
with socket.create_connection(("127.0.0.1", port), timeout=5) as sock:
sock.sendall(payload)
Expand Down Expand Up @@ -566,6 +754,28 @@ def test_in_forward_message_mode_partial_tcp_writes():
assert records[0]["message"] == "partial"


@pytest.mark.skipif(sys.platform != "linux", reason="process resource limits are Linux-only")
def test_in_forward_repro_payload_allocation_failure_closes_connection(tmp_path):
    """Sweep RLIMIT_DATA deltas until the unpacker allocation failure fires."""
    if os.environ.get("VALGRIND"):
        pytest.skip("process resource limits do not give a deterministic failure under Valgrind")

    baseline_kb = _measure_forward_vmdata_kb(tmp_path)
    outcomes = []
    # Deltas bracket the expected failure window with finer steps near 1 MiB.
    deltas_kb = [
        512, 640, 768, 896, 960, 992, 1008, 1016, 1024, 1040, 1088, 1152, 1280,
        1536, 2048,
    ]

    for delta_kb in deltas_kb:
        limit_kb = baseline_kb + delta_kb
        matched, status = _run_forward_repro_attempt(tmp_path, limit_kb)
        outcomes.append(f"{limit_kb} KiB: {status}")

        if matched:
            return

    pytest.fail("could not trigger allocation failure; attempts: " + "; ".join(outcomes))


def test_in_forward_message_mode_eventtime_ext():
service = Service("in_forward.yaml")
service.start()
Expand Down
Loading