From d7fa2b4d84ba12a6b8fcf0fc4530e6ef06bdbe09 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 15 May 2026 15:14:12 -0600 Subject: [PATCH 1/2] in_forward: handle unpacker allocation failure Signed-off-by: Eduardo Silva --- plugins/in_forward/fw_prot.c | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/plugins/in_forward/fw_prot.c b/plugins/in_forward/fw_prot.c index 844c8494e66..c7f6f8ed125 100644 --- a/plugins/in_forward/fw_prot.c +++ b/plugins/in_forward/fw_prot.c @@ -1093,21 +1093,22 @@ static size_t receiver_recv(struct fw_conn *conn, char *buf, size_t try_size) { return actual_size; } -static size_t receiver_to_unpacker(struct fw_conn *conn, size_t request_size, - msgpack_unpacker *unpacker) +static int receiver_to_unpacker(struct fw_conn *conn, size_t request_size, + msgpack_unpacker *unpacker, size_t *recv_len) { - size_t recv_len; + *recv_len = 0; /* make sure there's enough room, or expand the unpacker accordingly */ if (msgpack_unpacker_buffer_capacity(unpacker) < request_size) { - msgpack_unpacker_reserve_buffer(unpacker, request_size); - assert(msgpack_unpacker_buffer_capacity(unpacker) >= request_size); + if (!msgpack_unpacker_reserve_buffer(unpacker, request_size)) { + return -1; + } } - recv_len = receiver_recv(conn, msgpack_unpacker_buffer(unpacker), - request_size); - msgpack_unpacker_buffer_consumed(unpacker, recv_len); + *recv_len = receiver_recv(conn, msgpack_unpacker_buffer(unpacker), + request_size); + msgpack_unpacker_buffer_consumed(unpacker, *recv_len); - return recv_len; + return 0; } static int append_log(struct flb_input_instance *ins, struct fw_conn *conn, @@ -1288,11 +1289,26 @@ int fw_prot_process(struct flb_input_instance *ins, struct fw_conn *conn) } unp = msgpack_unpacker_new(1024); + if (!unp) { + flb_plg_error(ctx->ins, "could not allocate msgpack unpacker"); + flb_sds_destroy(out_tag); + return -1; + } + msgpack_unpacked_init(&result); conn->rest = conn->buf_len; while (1) { - recv_len = receiver_to_unpacker(conn, EACH_RECV_SIZE, unp); + ret = receiver_to_unpacker(conn, EACH_RECV_SIZE, unp, &recv_len); + if (ret == -1) { + flb_plg_error(ctx->ins, "could not allocate msgpack unpacker buffer"); + msgpack_unpacked_destroy(&result); + msgpack_unpacker_free(unp); + flb_sds_destroy(out_tag); + + return -1; + } + if (recv_len == 0) { /* No more data */ msgpack_unpacker_free(unp); From 99d2aa946becbd8a079b208b7eb56c0ad068996b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Fri, 15 May 2026 15:14:16 -0600 Subject: [PATCH 2/2] tests: integration: cover forward allocation failure handling Signed-off-by: Eduardo Silva --- .../in_forward/tests/test_in_forward_001.py | 228 +++++++++++++++++- 1 file changed, 219 insertions(+), 9 deletions(-) diff --git a/tests/integration/scenarios/in_forward/tests/test_in_forward_001.py b/tests/integration/scenarios/in_forward/tests/test_in_forward_001.py index 167f86eb5a5..92ce2940b77 100644 --- a/tests/integration/scenarios/in_forward/tests/test_in_forward_001.py +++ b/tests/integration/scenarios/in_forward/tests/test_in_forward_001.py @@ -6,7 +6,9 @@ import socket import ssl import subprocess +import sys import tempfile +import time import uuid from pathlib import Path @@ -42,7 +44,7 @@ forward_server_stop, ) from server.http_server import configure_http_response, data_storage, http_server_run -from utils.data_utils import read_json_file +from utils.data_utils import read_file, read_json_file from utils.test_service import FluentBitTestService @@ -124,6 +126,16 @@ def wait_for_record_count(self, minimum_count, timeout=10): description=f"{minimum_count} forwarded forward records", ) + def wait_for_log_contains(self, text, timeout=10): + return self.service.wait_for_condition( + lambda: read_file(self.service.flb.log_file) + if text in read_file(self.service.flb.log_file) + else None, + timeout=timeout, + interval=0.5, + description=f"log text {text!r}", + ) + class StorageLimitService(Service): def __init__(self, config_file): @@ -317,15 +329,16 @@ def _pack_ext(type_code, payload): raise ValueError(f"Unsupported ext payload size {length}") -def _pack_array(items): - length = len(items) +def _pack_array_header(length): if length <= 15: - prefix = bytes([0x90 | length]) - elif length <= 0xFFFF: - prefix = b"\xDC" + length.to_bytes(2, "big") - else: - prefix = b"\xDD" + length.to_bytes(4, "big") - return prefix + b"".join(_pack_obj(item) for item in items) + return bytes([0x90 | length]) + if length <= 0xFFFF: + return b"\xDC" + length.to_bytes(2, "big") + return b"\xDD" + length.to_bytes(4, "big") + + +def _pack_array(items): + return _pack_array_header(len(items)) + b"".join(_pack_obj(item) for item in items) def _pack_map(mapping): @@ -408,6 +421,181 @@ def _zstd_bytes(data): return result.stdout +def _forward_allocation_repro_payload(): + entry_count = 170 + declared_entries = entry_count * 2 + body = "R" * 200 + payload = bytearray() + + payload += _pack_array_header(2) + payload += _pack_str(TEST_TAG) + payload += _pack_array_header(declared_entries) + + for i in range(entry_count): + payload += _pack_obj([TEST_TS + i, {"log": body}]) + + return bytes(payload[:29229]).ljust(29229, b"\x00") + + +def _process_status_kb(pid, key): + status_path = Path("/proc") / str(pid) / "status" + prefix = f"{key}:" + + for line in status_path.read_text(encoding="utf-8").splitlines(): + if line.startswith(prefix): + return int(line.split()[1]) + + raise RuntimeError(f"{key} not found in {status_path}") + + +def _find_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return sock.getsockname()[1] + + +def _fluent_bit_binary_path(): + if os.environ.get("FLUENT_BIT_BINARY"): + return os.environ["FLUENT_BIT_BINARY"] + + return str(Path(__file__).resolve().parents[5] / "build" / "bin" / "fluent-bit") + + +def _write_forward_stdout_config(path, port): + path.write_text( + f"""[SERVICE] + Flush 1 + Daemon Off + Log_Level info + +[INPUT] + Name forward + Listen 127.0.0.1 + Port {port} + +[OUTPUT] + Name stdout + Match * +""", + encoding="utf-8", + ) + + +def _start_fluent_bit_process(config_file, log_file, data_limit_kb=None): + preexec_fn = None + + if data_limit_kb is not None: + def set_data_limit(): + import resource + + _, hard_limit = resource.getrlimit(resource.RLIMIT_DATA) + resource.setrlimit(resource.RLIMIT_DATA, (data_limit_kb * 1024, hard_limit)) + + preexec_fn = set_data_limit + + output = open(log_file, "a", encoding="utf-8") + process = subprocess.Popen( + [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)], + stdout=output, + stderr=subprocess.STDOUT, + text=True, + preexec_fn=preexec_fn, + ) + + return process, output + + +def _stop_fluent_bit_process(process, output): + try: + if process.poll() is None: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + process.wait(timeout=5) + finally: + output.close() + + +def _wait_for_log_text(log_file, text, process, timeout=10): + deadline = time.time() + timeout + + while time.time() < deadline: + if log_file.exists() and text in read_file(log_file): + return True + if process.poll() is not None: + return False + time.sleep(0.1) + + return False + + +def _measure_forward_vmdata_kb(tmp_path): + port = _find_free_port() + config_file = tmp_path / "measure-forward.conf" + log_file = tmp_path / "measure-forward.log" + _write_forward_stdout_config(config_file, port) + + process, output = _start_fluent_bit_process(config_file, log_file) + try: + assert _wait_for_log_text(log_file, f"listening on 127.0.0.1:{port}", process) + return _process_status_kb(process.pid, "VmData") + finally: + _stop_fluent_bit_process(process, output) + + +def _restore_process_data_limit(pid): + try: + import resource + + _, hard_limit = resource.prlimit(pid, resource.RLIMIT_DATA) + resource.prlimit(pid, resource.RLIMIT_DATA, (hard_limit, hard_limit)) + except ProcessLookupError: + pass + + +def _run_forward_repro_attempt(tmp_path, data_limit_kb): + port = _find_free_port() + attempt_dir = tmp_path / f"data-limit-{data_limit_kb}" + attempt_dir.mkdir() + config_file = attempt_dir / "fluent-bit.conf" + log_file = attempt_dir / "fluent-bit.log" + _write_forward_stdout_config(config_file, port) + + process, output = _start_fluent_bit_process(config_file, log_file, data_limit_kb) + try: + if not _wait_for_log_text(log_file, f"listening on 127.0.0.1:{port}", process): + return False, "listener did not start" + + _send_tcp_payload(port, _forward_allocation_repro_payload()) + + deadline = time.time() + 3 + while time.time() < deadline: + log_text = read_file(log_file) + if "could not allocate msgpack unpacker buffer" in log_text: + _restore_process_data_limit(process.pid) + _send_tcp_payload( + port, + _message_mode_payload(TEST_TAG, {"message": "after-unpacker-nomem"}), + ) + if _wait_for_log_text(log_file, "after-unpacker-nomem", process, timeout=5): + return True, "triggered and recovered" + return False, "triggered but did not recover" + + if "fw_conn.c" in log_text and "Cannot allocate memory" in log_text: + return False, "connection allocation failed" + + if process.poll() is not None: + return False, f"process exited with {process.returncode}" + + time.sleep(0.1) + + return False, "allocation failure was not triggered" + finally: + _stop_fluent_bit_process(process, output) + + def _send_tcp_payload(port, payload): with socket.create_connection(("127.0.0.1", port), timeout=5) as sock: sock.sendall(payload) @@ -566,6 +754,28 @@ def test_in_forward_message_mode_partial_tcp_writes(): assert records[0]["message"] == "partial" +@pytest.mark.skipif(sys.platform != "linux", reason="process resource limits are Linux-only") +def test_in_forward_repro_payload_allocation_failure_closes_connection(tmp_path): + if os.environ.get("VALGRIND"): + pytest.skip("process resource limits do not give a deterministic failure under Valgrind") + + baseline_kb = _measure_forward_vmdata_kb(tmp_path) + attempts = [] + + for delta_kb in [ + 512, 640, 768, 896, 960, 992, 1008, 1016, 1024, 1040, 1088, 1152, 1280, + 1536, 2048, + ]: + data_limit_kb = baseline_kb + delta_kb + matched, status = _run_forward_repro_attempt(tmp_path, data_limit_kb) + attempts.append(f"{data_limit_kb} KiB: {status}") + + if matched: + return + + pytest.fail("could not trigger allocation failure; attempts: " + "; ".join(attempts)) + + def test_in_forward_message_mode_eventtime_ext(): service = Service("in_forward.yaml") service.start()