-
Notifications
You must be signed in to change notification settings - Fork 1.9k
in_forward: handle unpacker allocation failure #11809
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,9 @@ | |
| import socket | ||
| import ssl | ||
| import subprocess | ||
| import sys | ||
| import tempfile | ||
| import time | ||
| import uuid | ||
| from pathlib import Path | ||
|
|
||
|
|
@@ -42,7 +44,7 @@ | |
| forward_server_stop, | ||
| ) | ||
| from server.http_server import configure_http_response, data_storage, http_server_run | ||
| from utils.data_utils import read_json_file | ||
| from utils.data_utils import read_file, read_json_file | ||
| from utils.test_service import FluentBitTestService | ||
|
|
||
|
|
||
|
|
@@ -124,6 +126,16 @@ def wait_for_record_count(self, minimum_count, timeout=10): | |
| description=f"{minimum_count} forwarded forward records", | ||
| ) | ||
|
|
||
| def wait_for_log_contains(self, text, timeout=10): | ||
| return self.service.wait_for_condition( | ||
| lambda: read_file(self.service.flb.log_file) | ||
| if text in read_file(self.service.flb.log_file) | ||
| else None, | ||
| timeout=timeout, | ||
| interval=0.5, | ||
| description=f"log text {text!r}", | ||
| ) | ||
|
|
||
|
|
||
| class StorageLimitService(Service): | ||
| def __init__(self, config_file): | ||
|
|
@@ -317,15 +329,16 @@ def _pack_ext(type_code, payload): | |
| raise ValueError(f"Unsupported ext payload size {length}") | ||
|
|
||
|
|
||
| def _pack_array(items): | ||
| length = len(items) | ||
| def _pack_array_header(length): | ||
| if length <= 15: | ||
| prefix = bytes([0x90 | length]) | ||
| elif length <= 0xFFFF: | ||
| prefix = b"\xDC" + length.to_bytes(2, "big") | ||
| else: | ||
| prefix = b"\xDD" + length.to_bytes(4, "big") | ||
| return prefix + b"".join(_pack_obj(item) for item in items) | ||
| return bytes([0x90 | length]) | ||
| if length <= 0xFFFF: | ||
| return b"\xDC" + length.to_bytes(2, "big") | ||
| return b"\xDD" + length.to_bytes(4, "big") | ||
|
|
||
|
|
||
def _pack_array(items):
    """Pack *items* as a msgpack array: header followed by each packed element."""
    packed_elements = [_pack_obj(element) for element in items]
    return _pack_array_header(len(items)) + b"".join(packed_elements)
|
|
||
|
|
||
| def _pack_map(mapping): | ||
|
|
@@ -408,6 +421,181 @@ def _zstd_bytes(data): | |
| return result.stdout | ||
|
|
||
|
|
||
def _forward_allocation_repro_payload():
    """Build a truncated forward-mode payload that over-declares its size.

    The array header claims twice as many entries as are actually packed,
    and the result is clamped/zero-padded to exactly 29229 bytes —
    presumably sized to drive the receiving unpacker into a large buffer
    allocation (TODO confirm against the fluent-bit fix under test).
    """
    real_entries = 170
    record_body = "R" * 200
    target_size = 29229

    parts = [
        _pack_array_header(2),
        _pack_str(TEST_TAG),
        # Declare double the real entry count in the entries array header.
        _pack_array_header(real_entries * 2),
    ]
    parts.extend(
        _pack_obj([TEST_TS + i, {"log": record_body}]) for i in range(real_entries)
    )

    raw = b"".join(parts)
    return raw[:target_size].ljust(target_size, b"\x00")
|
|
||
|
|
||
| def _process_status_kb(pid, key): | ||
| status_path = Path("/proc") / str(pid) / "status" | ||
| prefix = f"{key}:" | ||
|
|
||
| for line in status_path.read_text(encoding="utf-8").splitlines(): | ||
| if line.startswith(prefix): | ||
| return int(line.split()[1]) | ||
|
|
||
| raise RuntimeError(f"{key} not found in {status_path}") | ||
|
|
||
|
|
||
| def _find_free_port(): | ||
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: | ||
| sock.bind(("127.0.0.1", 0)) | ||
| return sock.getsockname()[1] | ||
|
|
||
|
|
||
| def _fluent_bit_binary_path(): | ||
| if os.environ.get("FLUENT_BIT_BINARY"): | ||
| return os.environ["FLUENT_BIT_BINARY"] | ||
|
|
||
| return str(Path(__file__).resolve().parents[5] / "build" / "bin" / "fluent-bit") | ||
|
|
||
|
|
||
| def _write_forward_stdout_config(path, port): | ||
| path.write_text( | ||
| f"""[SERVICE] | ||
| Flush 1 | ||
| Daemon Off | ||
| Log_Level info | ||
|
|
||
| [INPUT] | ||
| Name forward | ||
| Listen 127.0.0.1 | ||
| Port {port} | ||
|
|
||
| [OUTPUT] | ||
| Name stdout | ||
| Match * | ||
| """, | ||
| encoding="utf-8", | ||
| ) | ||
|
|
||
|
|
||
def _start_fluent_bit_process(config_file, log_file, data_limit_kb=None):
    """Start fluent-bit on *config_file*, appending its output to *log_file*.

    When *data_limit_kb* is given, the child process lowers its RLIMIT_DATA
    soft limit before exec so oversized allocations fail with ENOMEM.

    Returns ``(process, output)`` where *output* is the open log-file handle
    the caller must close (see ``_stop_fluent_bit_process``).
    """
    preexec_fn = None

    if data_limit_kb is not None:
        def set_data_limit():
            # Imported inside the child-side hook: resource is POSIX-only
            # and only needed on this path.
            import resource

            _, hard_limit = resource.getrlimit(resource.RLIMIT_DATA)
            resource.setrlimit(resource.RLIMIT_DATA, (data_limit_kb * 1024, hard_limit))

        preexec_fn = set_data_limit

    output = open(log_file, "a", encoding="utf-8")
    try:
        process = subprocess.Popen(
            [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
            stdout=output,
            stderr=subprocess.STDOUT,
            text=True,
            preexec_fn=preexec_fn,
        )
    except BaseException:
        # Fix (flagged in review): don't leak the log-file handle when
        # Popen itself fails (missing binary, fork/exec error, ...).
        output.close()
        raise

    return process, output
|
Comment on lines
+496
to
+505
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. File handle leaked if `subprocess.Popen` raises: when `Popen` fails (e.g. missing binary or fork error), the freshly opened log-file handle is never closed. 🛠️ Suggested fix: - output = open(log_file, "a", encoding="utf-8")
- process = subprocess.Popen(
- [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
- stdout=output,
- stderr=subprocess.STDOUT,
- text=True,
- preexec_fn=preexec_fn,
- )
-
- return process, output
+ output = open(log_file, "a", encoding="utf-8")
+ try:
+ process = subprocess.Popen(
+ [_fluent_bit_binary_path(), "-c", str(config_file), "-l", str(log_file)],
+ stdout=output,
+ stderr=subprocess.STDOUT,
+ text=True,
+ preexec_fn=preexec_fn,
+ )
+ except:
+ output.close()
+ raise
+
+ return process, output🧰 Tools🪛 Ruff (0.15.12)[error] 497-497: (S603) 🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| def _stop_fluent_bit_process(process, output): | ||
| try: | ||
| if process.poll() is None: | ||
| process.terminate() | ||
| try: | ||
| process.wait(timeout=5) | ||
| except subprocess.TimeoutExpired: | ||
| process.kill() | ||
| process.wait(timeout=5) | ||
| finally: | ||
| output.close() | ||
|
|
||
|
|
||
def _wait_for_log_text(log_file, text, process, timeout=10):
    """Poll *log_file* until *text* shows up or *process* exits.

    Returns True when the text was found within *timeout* seconds, False
    when the process died first or the deadline passed.
    """
    deadline = time.time() + timeout

    while time.time() < deadline:
        present = log_file.exists() and text in read_file(log_file)
        if present:
            return True
        if process.poll() is not None:
            # Process is gone; the text can no longer appear.
            return False
        time.sleep(0.1)

    return False
|
|
||
|
|
||
def _measure_forward_vmdata_kb(tmp_path):
    """Start an unconstrained fluent-bit instance and return its VmData in KiB.

    Used as the baseline for calibrating RLIMIT_DATA in the repro attempts.
    """
    listen_port = _find_free_port()
    config_path = tmp_path / "measure-forward.conf"
    log_path = tmp_path / "measure-forward.log"
    _write_forward_stdout_config(config_path, listen_port)

    process, log_handle = _start_fluent_bit_process(config_path, log_path)
    try:
        started = _wait_for_log_text(
            log_path, f"listening on 127.0.0.1:{listen_port}", process
        )
        assert started
        return _process_status_kb(process.pid, "VmData")
    finally:
        _stop_fluent_bit_process(process, log_handle)
|
|
||
|
|
||
| def _restore_process_data_limit(pid): | ||
| try: | ||
| import resource | ||
|
|
||
| _, hard_limit = resource.prlimit(pid, resource.RLIMIT_DATA) | ||
| resource.prlimit(pid, resource.RLIMIT_DATA, (hard_limit, hard_limit)) | ||
| except ProcessLookupError: | ||
| pass | ||
|
|
||
|
|
||
def _run_forward_repro_attempt(tmp_path, data_limit_kb):
    """Run one repro attempt with the child's RLIMIT_DATA set to *data_limit_kb* KiB.

    Returns a ``(matched, status)`` pair: *matched* is True only when the
    unpacker allocation failure was observed in the log AND the server then
    accepted a follow-up record after the limit was lifted; *status* is a
    human-readable summary of what happened.
    """
    listen_port = _find_free_port()
    attempt_dir = tmp_path / f"data-limit-{data_limit_kb}"
    attempt_dir.mkdir()
    config_path = attempt_dir / "fluent-bit.conf"
    log_path = attempt_dir / "fluent-bit.log"
    _write_forward_stdout_config(config_path, listen_port)

    process, log_handle = _start_fluent_bit_process(config_path, log_path, data_limit_kb)
    try:
        started = _wait_for_log_text(
            log_path, f"listening on 127.0.0.1:{listen_port}", process
        )
        if not started:
            return False, "listener did not start"

        _send_tcp_payload(listen_port, _forward_allocation_repro_payload())

        deadline = time.time() + 3
        while time.time() < deadline:
            log_text = read_file(log_path)

            if "could not allocate msgpack unpacker buffer" in log_text:
                # The targeted failure fired; lift the limit and prove the
                # server still accepts new traffic afterwards.
                _restore_process_data_limit(process.pid)
                _send_tcp_payload(
                    listen_port,
                    _message_mode_payload(TEST_TAG, {"message": "after-unpacker-nomem"}),
                )
                recovered = _wait_for_log_text(
                    log_path, "after-unpacker-nomem", process, timeout=5
                )
                if recovered:
                    return True, "triggered and recovered"
                return False, "triggered but did not recover"

            if "fw_conn.c" in log_text and "Cannot allocate memory" in log_text:
                # The limit bit during connection setup instead of in the
                # unpacker path this repro targets.
                return False, "connection allocation failed"

            if process.poll() is not None:
                return False, f"process exited with {process.returncode}"

            time.sleep(0.1)

        return False, "allocation failure was not triggered"
    finally:
        _stop_fluent_bit_process(process, log_handle)
|
|
||
|
|
||
def _send_tcp_payload(port, payload):
    """Connect to 127.0.0.1:*port*, send all of *payload*, and close the socket."""
    sock = socket.create_connection(("127.0.0.1", port), timeout=5)
    try:
        sock.sendall(payload)
    finally:
        sock.close()
|
|
@@ -566,6 +754,28 @@ def test_in_forward_message_mode_partial_tcp_writes(): | |
| assert records[0]["message"] == "partial" | ||
|
|
||
|
|
||
@pytest.mark.skipif(sys.platform != "linux", reason="process resource limits are Linux-only")
def test_in_forward_repro_payload_allocation_failure_closes_connection(tmp_path):
    """Sweep RLIMIT_DATA values until the forward input's unpacker allocation
    fails, then verify fluent-bit keeps serving new records afterwards."""
    if os.environ.get("VALGRIND"):
        pytest.skip("process resource limits do not give a deterministic failure under Valgrind")

    baseline_kb = _measure_forward_vmdata_kb(tmp_path)

    # Deltas bracket the allocation size from several directions so at least
    # one attempt lands the limit exactly on the unpacker buffer allocation.
    deltas_kb = (
        512, 640, 768, 896, 960, 992, 1008, 1016, 1024, 1040, 1088, 1152, 1280,
        1536, 2048,
    )

    attempts = []
    for delta_kb in deltas_kb:
        data_limit_kb = baseline_kb + delta_kb
        matched, status = _run_forward_repro_attempt(tmp_path, data_limit_kb)
        attempts.append(f"{data_limit_kb} KiB: {status}")
        if matched:
            return

    pytest.fail("could not trigger allocation failure; attempts: " + "; ".join(attempts))
|
|
||
|
|
||
| def test_in_forward_message_mode_eventtime_ext(): | ||
| service = Service("in_forward.yaml") | ||
| service.start() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reserve the port through process startup.
_find_free_port()releases the ephemeral socket before Fluent Bit binds it, so another process can steal that port in the gap. In this flow that turns into a sporadiclistener did not startresult that has nothing to do with the unpacker-allocation path you're trying to exercise.Also applies to: 558-569
🤖 Prompt for AI Agents