Skip to content

Commit e16169a

Browse files
committed
feat(fluentbit): add tag prefix support to HTTPFluentBitWriter
1 parent da8ad27 commit e16169a

2 files changed

Lines changed: 41 additions & 12 deletions

File tree

src/dstack/_internal/server/services/logs/fluentbit.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,17 @@ def close(self) -> None: ...
164164
class HTTPFluentBitWriter:
165165
"""Writes logs to Fluent-bit via HTTP POST."""
166166

167-
def __init__(self, host: str, port: int) -> None:
167+
def __init__(self, host: str, port: int, tag_prefix: str) -> None:
168168
self._endpoint = f"http://{host}:{port}"
169169
self._client = httpx.Client(timeout=30.0)
170+
self._tag_prefix = tag_prefix
170171

171172
def write(self, tag: str, records: List[dict]) -> None:
173+
prefixed_tag = f"{self._tag_prefix}.{tag}" if self._tag_prefix else tag
172174
for record in records:
173175
try:
174176
response = self._client.post(
175-
f"{self._endpoint}/{tag}",
177+
f"{self._endpoint}/{prefixed_tag}",
176178
json=record,
177179
headers={"Content-Type": "application/json"},
178180
)
@@ -249,7 +251,9 @@ def __init__(
249251
self._tag_prefix = tag_prefix
250252

251253
if protocol == "http":
252-
self._writer: FluentBitWriter = HTTPFluentBitWriter(host=host, port=port)
254+
self._writer: FluentBitWriter = HTTPFluentBitWriter(
255+
host=host, port=port, tag_prefix=tag_prefix
256+
)
253257
elif protocol == "forward":
254258
self._writer = ForwardFluentBitWriter(host=host, port=port, tag_prefix=tag_prefix)
255259
else:

src/tests/_internal/server/services/test_fluentbit_logs.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,12 @@ def mock_httpx_client(self):
6161
yield mock.return_value
6262

6363
def test_init_creates_client(self, mock_httpx_client):
64-
writer = HTTPFluentBitWriter(host="localhost", port=8080)
64+
writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
6565
assert writer._endpoint == "http://localhost:8080"
66+
assert writer._tag_prefix == "dstack"
6667

6768
def test_write_posts_records(self, mock_httpx_client):
68-
writer = HTTPFluentBitWriter(host="localhost", port=8080)
69+
writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
6970
records = [
7071
{"message": "Hello", "@timestamp": "2023-10-06T10:00:00+00:00"},
7172
{"message": "World", "@timestamp": "2023-10-06T10:00:01+00:00"},
@@ -74,12 +75,12 @@ def test_write_posts_records(self, mock_httpx_client):
7475

7576
assert mock_httpx_client.post.call_count == 2
7677
mock_httpx_client.post.assert_any_call(
77-
"http://localhost:8080/test-tag",
78+
"http://localhost:8080/dstack.test-tag",
7879
json=records[0],
7980
headers={"Content-Type": "application/json"},
8081
)
8182
mock_httpx_client.post.assert_any_call(
82-
"http://localhost:8080/test-tag",
83+
"http://localhost:8080/dstack.test-tag",
8384
json=records[1],
8485
headers={"Content-Type": "application/json"},
8586
)
@@ -88,7 +89,7 @@ def test_write_calls_raise_for_status(self, mock_httpx_client):
8889
"""Test that response.raise_for_status() is called to detect non-2xx responses."""
8990
mock_response = Mock()
9091
mock_httpx_client.post.return_value = mock_response
91-
writer = HTTPFluentBitWriter(host="localhost", port=8080)
92+
writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
9293

9394
writer.write(tag="test-tag", records=[{"message": "test"}])
9495

@@ -105,7 +106,7 @@ def test_write_raises_on_http_status_error(self, mock_httpx_client):
105106
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
106107
"Server Error", request=Mock(), response=mock_response
107108
)
108-
writer = HTTPFluentBitWriter(host="localhost", port=8080)
109+
writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
109110

110111
with pytest.raises(LogStorageError, match="Fluent-bit HTTP error: status 500"):
111112
writer.write(tag="test-tag", records=[{"message": "test"}])
@@ -114,16 +115,40 @@ def test_write_raises_on_transport_error(self, mock_httpx_client):
114115
import httpx
115116

116117
mock_httpx_client.post.side_effect = httpx.HTTPError("Connection failed")
117-
writer = HTTPFluentBitWriter(host="localhost", port=8080)
118+
writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
118119

119120
with pytest.raises(LogStorageError, match="Fluent-bit HTTP error"):
120121
writer.write(tag="test-tag", records=[{"message": "test"}])
121122

122123
def test_close_closes_client(self, mock_httpx_client):
123-
writer = HTTPFluentBitWriter(host="localhost", port=8080)
124+
writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
124125
writer.close()
125126
mock_httpx_client.close.assert_called_once()
126127

128+
def test_write_applies_tag_prefix(self, mock_httpx_client):
129+
"""Test that tag prefix is applied to tags in HTTP requests."""
130+
writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="dstack")
131+
records = [{"message": "test"}]
132+
writer.write(tag="project/run/job", records=records)
133+
134+
mock_httpx_client.post.assert_called_once_with(
135+
"http://localhost:8080/dstack.project/run/job",
136+
json=records[0],
137+
headers={"Content-Type": "application/json"},
138+
)
139+
140+
def test_write_with_empty_tag_prefix(self, mock_httpx_client):
141+
"""Test that empty tag prefix doesn't break the tag."""
142+
writer = HTTPFluentBitWriter(host="localhost", port=8080, tag_prefix="")
143+
records = [{"message": "test"}]
144+
writer.write(tag="test-tag", records=records)
145+
146+
mock_httpx_client.post.assert_called_once_with(
147+
"http://localhost:8080/test-tag",
148+
json=records[0],
149+
headers={"Content-Type": "application/json"},
150+
)
151+
127152

128153
class TestForwardFluentBitWriter:
129154
"""Tests for the ForwardFluentBitWriter."""
@@ -240,7 +265,7 @@ def test_init_with_http_protocol(self, mock_http_writer):
240265
protocol="http",
241266
tag_prefix="dstack",
242267
)
243-
mock.assert_called_once_with(host="localhost", port=8080)
268+
mock.assert_called_once_with(host="localhost", port=8080, tag_prefix="dstack")
244269

245270
def test_init_with_unsupported_protocol_raises(self):
246271
with pytest.raises(LogStorageError, match="Unsupported Fluent-bit protocol"):

0 commit comments

Comments
 (0)