Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/in_exec/in_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ static struct flb_config_map config_map[] = {
struct flb_input_plugin in_exec_plugin = {
.name = "exec",
.description = "Exec Input",
.flags = FLB_INPUT_THREADED,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve conditional routing for exec inputs

Making every exec instance threaded disables an existing routing feature for this plugin: flb_input_log_append() explicitly skips conditional/per-record routes when flb_input_is_threaded(ins) is true (src/flb_input_log.c:1021-1025) and falls back to normal routing. Any exec input configured with routes.logs conditions or per-record routing will now send records to the default matched outputs instead of the conditional destinations, whereas it was non-threaded before this line. Please either keep exec non-threaded for configurations that use conditional routes or add threaded-input support for that routing path.

Useful? React with 👍 / 👎.

.cb_init = in_exec_init,
.cb_pre_run = in_exec_prerun,
.cb_collect = in_exec_collect,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
service:
flush: 1
grace: 1
log_level: info
http_server: on
http_listen: 127.0.0.1
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}

pipeline:
inputs:
- name: exec
tag: test
command: curl -s http://127.0.0.1:${FLUENT_BIT_HTTP_MONITORING_PORT}/api/v1/metrics/prometheus
interval_sec: 1
buf_size: 128k

outputs:
- name: "null"
match: "*"
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
import time

from utils.http_matrix import run_curl_request
from utils.test_service import FluentBitTestService


class Service:
def __init__(self):
self.config_file = os.path.abspath(
os.path.join(
os.path.dirname(__file__),
"../config/internal_http_server_exec_deadlock.yaml",
)
)
self.service = FluentBitTestService(self.config_file)

def start(self):
self.service.start()
self.flb = self.service.flb
self.base_url = f"http://127.0.0.1:{self.flb.http_monitoring_port}"

def stop(self):
self.service.stop()

def request(self, path, *, method="GET", http_mode="http1.1"):
return run_curl_request(
f"{self.base_url}{path}",
method=method,
http_mode=http_mode,
)


def test_http_server_responsive_after_exec_self_request():
"""The built-in HTTP server must remain responsive after the exec input
plugin makes an HTTP request to it. Before the fix, the exec child
process (curl) and the HTTP server shared the same event loop, causing
a deadlock that made the server permanently unresponsive."""

service = Service()

try:
service.start()

result = service.request("/api/v1/uptime")
assert result["status_code"] == 200
assert "uptime_sec" in result["body"]

time.sleep(2)

result = service.service.wait_for_condition(
lambda: (
response
if response["status_code"] == 200 and "uptime_sec" in response["body"]
else None
) if (response := service.request("/api/v1/uptime")) else None,
timeout=10,
interval=1,
description="HTTP server responsive after exec self-request",
)
assert result["status_code"] == 200
assert "uptime_sec" in result["body"]
Comment thread
cosmo0920 marked this conversation as resolved.
finally:
service.stop()
Loading