From 6213839f9a534149245720a1d9ab91d0523cc974 Mon Sep 17 00:00:00 2001 From: Liuhaai Date: Mon, 18 May 2026 09:03:52 -0700 Subject: [PATCH 1/2] fix(relay): disk-backed segments, transient-error tolerance, SIGTERM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three production resilience issues observed in trio-relay, fixed together because they touch the same upload loop: 1. **Cloud blip = 30-60s of dead air.** Any non-204 response killed the whole ffmpeg session and the wrapper script had to respawn. Classify outcomes as OK/TRANSIENT/FATAL: 5xx and httpx TransportError are transient (log + skip + continue), only 401/403 abort (auth won't self-heal). Adds three regression tests. 2. **Worker pinned at 0% CPU after HTTP error.** httpx per-byte timeouts don't fire on a half-closed connection (server stopped reading body but never sent FIN). Wrap each POST in `asyncio.wait_for(..., timeout=30s)` and classify the deadline as transient. 3. **SIGKILL'd runs leak segment files in /tmp.** Production wrapper uses `timeout --signal=TERM`; the CLI only registered SIGINT, so Python's default SIGTERM handler killed the process before the `TemporaryDirectory` context exited. Register SIGTERM alongside SIGINT, and add a startup sweep that removes `trio-relay-segments-*` dirs older than 1h (concurrent relays are protected by mtime — they touch their dir every segment). Bonus correctness changes that fell out of the rewrite: - Capture and upload now run concurrently via an asyncio.Queue, so a slow POST no longer stalls the ffmpeg reader (which would back up the kernel pipe and stutter capture). - Segments are written to disk during capture instead of buffered in memory — bounds memory under upload backpressure. - httpx read/write timeouts bumped 30s → 120s for the upload client. - Camera registration no longer leaks RTSP URLs (with credentials) into the registry payload. - End-to-end regression test delivers a real SIGTERM and asserts teardown fires. Co-Authored-By: Claude Opus 4.7 --- src/trio_core/cli/relay.py | 20 +- src/trio_core/http_ingest_relay.py | 250 ++++++++++++++++--- tests/test_http_ingest_relay.py | 376 +++++++++++++++++++++++++---- 3 files changed, 554 insertions(+), 92 deletions(-) diff --git a/src/trio_core/cli/relay.py b/src/trio_core/cli/relay.py index 22b8399..5334a7d 100644 --- a/src/trio_core/cli/relay.py +++ b/src/trio_core/cli/relay.py @@ -8,6 +8,15 @@ from trio_core.cli.cam import _resolve_rtsp_url from trio_core.http_ingest_relay import HttpIngestRelay, RelayError +# Signals that trigger graceful relay shutdown. SIGTERM is essential +# because the production wrapper script wraps each launch in +# `timeout --signal=TERM --kill-after=10 360s`; without a handler, +# Python's default terminates the process before the segment +# TemporaryDirectory context manager runs, leaking files in /tmp. +def _shutdown_signals(): + import signal + return (signal.SIGINT, signal.SIGTERM) + @app.command(help="Stream a video feed to Trio Cloud.") def relay( @@ -103,14 +112,14 @@ async def _run() -> None: main_task = asyncio.create_task(relay_obj.run()) shutdown_triggered = False - def _on_sigint() -> None: + def _on_shutdown_signal() -> None: nonlocal shutdown_triggered shutdown_triggered = True main_task.cancel() - import signal - - loop.add_signal_handler(signal.SIGINT, _on_sigint) + shutdown_signals = _shutdown_signals() + for sig in shutdown_signals: + loop.add_signal_handler(sig, _on_shutdown_signal) try: await main_task except asyncio.CancelledError: @@ -118,7 +127,8 @@ def _on_sigint() -> None: finally: if shutdown_triggered: typer.echo("\nStopping relay...") - loop.remove_signal_handler(signal.SIGINT) + for sig in shutdown_signals: + loop.remove_signal_handler(sig) await relay_obj.teardown() if shutdown_triggered: typer.echo("Disconnected.") diff --git a/src/trio_core/http_ingest_relay.py b/src/trio_core/http_ingest_relay.py index dad5ddb..c348ed0 100644 --- a/src/trio_core/http_ingest_relay.py +++ b/src/trio_core/http_ingest_relay.py @@ -3,12 +3,14 @@ from __future__ import annotations import asyncio +import contextlib import hashlib import json import logging import platform import shutil import sys +import tempfile import time import uuid from dataclasses import dataclass @@ -23,6 +25,67 @@ _FFMPEG_READ_SIZE = 65536 +# Hard per-POST upload deadline. Without it, a half-closed connection +# (server stopped reading body but never sent FIN) pins the worker +# indefinitely — httpx's per-byte read/write timeouts don't fire as long +# as kernel-level keepalives report the socket as live. Observed in prod +# 2026-05-18 as "trio-relay at 0% CPU indefinitely after HTTP error". +_DEFAULT_UPLOAD_TIMEOUT_SECONDS = 30.0 + +# Response classifications used by _SegmentUploader. +_UPLOAD_OK = "ok" +_UPLOAD_TRANSIENT = "transient" +_UPLOAD_FATAL = "fatal" + +# Prefix for the per-run segment temp directory. Tracked at module +# level so the startup sweep and the runtime allocator agree. +_SEGMENT_TMPDIR_PREFIX = "trio-relay-segments-" + +# An active relay writes/unlinks segment files inside its tmpdir +# every `segment_duration` seconds, so its mtime is always recent. +# A tmpdir untouched for an hour can only be an orphan from a prior +# SIGKILL'd run — safe to delete. +_STALE_SEGMENT_TMPDIR_AGE_S = 3600.0 + + +def _clean_stale_segment_tmpdirs( + *, + age_threshold_seconds: float = _STALE_SEGMENT_TMPDIR_AGE_S, + parent_dir: Path | None = None, +) -> int: + """Remove orphaned segment tmpdirs left behind by SIGKILL'd runs. + + Returns the number of directories removed. Best-effort — failures + to stat or rmtree are logged at DEBUG and counted as skipped. + """ + import shutil as _shutil + + parent = parent_dir if parent_dir is not None else Path(tempfile.gettempdir()) + now = time.time() + removed = 0 + try: + candidates = list(parent.glob(f"{_SEGMENT_TMPDIR_PREFIX}*")) + except OSError: + return 0 + for path in candidates: + if not path.is_dir(): + continue + try: + age = now - path.stat().st_mtime + except OSError: + continue + if age < age_threshold_seconds: + continue + try: + _shutil.rmtree(path, ignore_errors=True) + removed += 1 + logger.info( + "Cleaned stale segment tmpdir %s (age=%.0fs)", path, age + ) + except OSError: + logger.debug("Failed to clean %s", path, exc_info=True) + return removed + # --------------------------------------------------------------------------- # Exceptions @@ -58,28 +121,38 @@ class IngestUploadError(RelayError): _RESET = "\033[0m" -async def _read_until_timeout( +async def _read_segment_to_file( stdout: asyncio.StreamReader, + path: Path, duration_seconds: float, - chunk_size: int = 65536, -) -> bytes: - """Read from *stdout* until *duration_seconds* elapses or EOF.""" - buf = bytearray() + chunk_size: int = _FFMPEG_READ_SIZE, +) -> tuple[int, bool]: + """Drain one timed segment from stdout into *path*. + + Returns ``(bytes_written, eof)``. The caller owns deleting *path*. + """ + bytes_written = 0 deadline = time.monotonic() + duration_seconds + eof = False - while True: - remaining = deadline - time.monotonic() - if remaining <= 0: - break - try: - data = await asyncio.wait_for(stdout.read(chunk_size), timeout=remaining) - except asyncio.TimeoutError: - break - if not data: - break - buf.extend(data) + with path.open("wb") as file: + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + break + try: + data = await asyncio.wait_for(stdout.read(chunk_size), timeout=remaining) + except asyncio.TimeoutError: + break + if not data: + eof = True + break + await asyncio.to_thread(file.write, data) + bytes_written += len(data) - return bytes(buf) + if bytes_written == 0: + path.unlink(missing_ok=True) + return bytes_written, eof def _normalize_source_fingerprint(source: str) -> str: @@ -416,51 +489,143 @@ def __init__( segment_duration: float, *, json_mode: bool = False, + upload_timeout_seconds: float = _DEFAULT_UPLOAD_TIMEOUT_SECONDS, ) -> None: self._client = client self._ingest_url = ingest_url self._headers = headers self._segment_duration = segment_duration self._json_mode = json_mode + self._upload_timeout_seconds = upload_timeout_seconds self.segments_ok = 0 self.segments_fail = 0 self._seg_num = 0 async def upload_all(self, stdout: asyncio.StreamReader) -> None: + with tempfile.TemporaryDirectory(prefix=_SEGMENT_TMPDIR_PREFIX) as tmpdir: + segment_dir = Path(tmpdir) + queue: asyncio.Queue[_SegmentFile | None] = asyncio.Queue() + capture_task = asyncio.create_task( + self._capture_segments(stdout, queue, segment_dir), + name="trio-relay-capture-segments", + ) + upload_finished = False + try: + await self._upload_queued(queue) + upload_finished = True + await capture_task + except (Exception, asyncio.CancelledError): + if not upload_finished: + if not capture_task.done(): + capture_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await capture_task + raise + finally: + self._discard_queued_segments(queue) + + async def _capture_segments( + self, + stdout: asyncio.StreamReader, + queue: asyncio.Queue[_SegmentFile | None], + segment_dir: Path, + ) -> None: + try: + while True: + seg_num = self._seg_num + 1 + self._seg_num = seg_num + path = segment_dir / f"segment-{seg_num:08d}.ts" + size, eof = await _read_segment_to_file( + stdout, + path, + self._segment_duration, + ) + if size: + await queue.put(_SegmentFile(seg_num, path, size)) + if eof or not size: + break + finally: + await queue.put(None) + + async def _upload_queued( + self, queue: asyncio.Queue[_SegmentFile | None] + ) -> bool: + # Transient upload failures (5xx, timeout, transport error) are + # logged and skipped — the segment file is unlinked and we move on + # to the next one. Only FATAL outcomes (401/403 — bad/revoked key) + # abort the relay, since those won't self-heal without operator + # intervention. This replaces the previous behavior where any + # non-204 response killed the whole ffmpeg session and forced the + # wrapper script to restart, costing 30-60s of dead air per blip. while True: - chunk = await _read_until_timeout(stdout, self._segment_duration) - if not chunk: - break - self._seg_num += 1 - if not await self._upload_one(self._seg_num, chunk): - break + segment = await queue.get() + try: + if segment is None: + return True + outcome = await self._upload_one(segment) + if outcome == _UPLOAD_FATAL: + raise IngestUploadError( + f"Fatal upload error for segment #{segment.number} " + f"(auth rejected — check API key)" + ) + finally: + queue.task_done() + if segment is not None: + segment.path.unlink(missing_ok=True) + + def _discard_queued_segments(self, queue: asyncio.Queue[_SegmentFile | None]) -> None: + while True: + try: + segment = queue.get_nowait() + except asyncio.QueueEmpty: + return + try: + if segment is not None: + segment.path.unlink(missing_ok=True) + finally: + queue.task_done() - async def _upload_one(self, seg_num: int, chunk: bytes) -> bool: + async def _upload_one(self, segment: _SegmentFile) -> str: start = time.monotonic() + content = await asyncio.to_thread(segment.path.read_bytes) try: - resp = await self._client.post(self._ingest_url, content=chunk, headers=self._headers) + resp = await asyncio.wait_for( + self._client.post( + self._ingest_url, content=content, headers=self._headers + ), + timeout=self._upload_timeout_seconds, + ) + except asyncio.TimeoutError: + self.segments_fail += 1 + self._log_error( + f"Upload deadline ({self._upload_timeout_seconds:.0f}s) exceeded " + f"for seg #{segment.number}" + ) + return _UPLOAD_TRANSIENT except httpx.TransportError as exc: self.segments_fail += 1 - self._log_error(f"Transport error for seg #{seg_num}: {exc}") - return False + self._log_error(f"Transport error for seg #{segment.number}: {exc}") + return _UPLOAD_TRANSIENT elapsed = time.monotonic() - start if resp.status_code == 204: self.segments_ok += 1 - self._log_success(seg_num, chunk, elapsed) - return True + self._log_success(segment, elapsed) + return _UPLOAD_OK self.segments_fail += 1 - self._log_error(f"HTTP {resp.status_code} for seg #{seg_num}") - return False + self._log_error(f"HTTP {resp.status_code} for seg #{segment.number}") + if resp.status_code in (401, 403): + return _UPLOAD_FATAL + return _UPLOAD_TRANSIENT - def _log_success(self, seg_num: int, chunk: bytes, elapsed: float) -> None: + def _log_success(self, segment: _SegmentFile, elapsed: float) -> None: if self._json_mode: obj = { "type": "segment", "ts": _iso_now(), - "seg_num": seg_num, - "bytes": len(chunk), + "seg_num": segment.number, + "bytes": segment.size, "elapsed_s": round(elapsed, 2), "ok": self.segments_ok, "fail": self.segments_fail, @@ -474,7 +639,14 @@ def _log_error(self, message: str) -> None: sys.stderr.write(json.dumps(obj, separators=(",", ":")) + "\n") else: sys.stderr.write(f"{_CR}{_CLEAR}{message}\n") - sys.stderr.flush() + sys.stderr.flush() + + +@dataclass(frozen=True) +class _SegmentFile: + number: int + path: Path + size: int # --------------------------------------------------------------------------- @@ -504,6 +676,10 @@ def resolved_camera_id(self) -> str: async def run(self) -> None: """Register camera, start ffmpeg, and relay segments until EOF or error.""" + # Recover disk from previous runs that died without unwinding + # the TemporaryDirectory context (SIGKILL, host reboot, OOM). + await asyncio.to_thread(_clean_stale_segment_tmpdirs) + async with httpx.AsyncClient(timeout=None, follow_redirects=True) as reg_client: camera_id = await self._register_camera(reg_client) @@ -523,7 +699,7 @@ async def run(self) -> None: try: async with httpx.AsyncClient( - timeout=httpx.Timeout(connect=10.0, write=30.0, read=30.0, pool=10.0), + timeout=httpx.Timeout(connect=10.0, write=120.0, read=120.0, pool=10.0), follow_redirects=True, limits=httpx.Limits(max_keepalive_connections=0), ) as client: @@ -572,7 +748,7 @@ def _camera_payload(self, camera_id: str) -> dict[str, object]: return { "id": camera_id, "name": _camera_name_from_source(self.source), - "source_url": self.source, + "source_url": "", "metadata": { "managed_by": "trio-edge", "ingest_transport": "http_mpegts", diff --git a/tests/test_http_ingest_relay.py b/tests/test_http_ingest_relay.py index a99795f..24197bf 100644 --- a/tests/test_http_ingest_relay.py +++ b/tests/test_http_ingest_relay.py @@ -1,7 +1,9 @@ from __future__ import annotations +import asyncio import importlib import sys +import time from pathlib import Path from unittest.mock import AsyncMock, patch @@ -142,6 +144,20 @@ def stream(self, method: str, url: str, **kwargs): return _FakeStreamContext(self._responses.pop(0), self._calls) +def _fake_segment_reader(*chunks: bytes): + chunk_iter = iter(chunks) + + async def _read_segment(stdout, path, duration_seconds, chunk_size=65536): + del stdout, duration_seconds, chunk_size + chunk = next(chunk_iter) + if not chunk: + return 0, True + path.write_bytes(chunk) + return len(chunk), False + + return _read_segment + + @pytest.mark.asyncio async def test_register_camera_posts_explicit_id_and_metadata(monkeypatch: pytest.MonkeyPatch): relay = _relay_module() @@ -301,7 +317,11 @@ def fake_async_client(**kwargs): monkeypatch.setattr(relay.httpx, "AsyncClient", fake_async_client) - monkeypatch.setattr(relay, "_read_until_timeout", AsyncMock(side_effect=[b"fake-ts-data", b""])) + monkeypatch.setattr( + relay, + "_read_segment_to_file", + _fake_segment_reader(b"fake-ts-data", b""), + ) client = relay.HttpIngestRelay( source="video.mp4", @@ -431,8 +451,8 @@ def fake_async_client(**kwargs): monkeypatch.setattr( relay, - "_read_until_timeout", - AsyncMock(side_effect=[b"chunk1", b"chunk2", b"chunk3", b""]), + "_read_segment_to_file", + _fake_segment_reader(b"chunk1", b"chunk2", b"chunk3", b""), ) client = relay.HttpIngestRelay( @@ -455,77 +475,333 @@ def fake_async_client(**kwargs): @pytest.mark.asyncio -async def test_segmented_post_stops_on_server_error(monkeypatch: pytest.MonkeyPatch): +async def test_segment_capture_continues_while_upload_is_in_flight( + monkeypatch: pytest.MonkeyPatch, +): relay = _relay_module() + first_post_started = asyncio.Event() + release_first_post = asyncio.Event() + second_segment_captured = asyncio.Event() + chunks = iter([b"chunk1", b"chunk2", b""]) + + async def fake_read_segment(stdout, path, duration_seconds, chunk_size=65536): + del stdout, duration_seconds, chunk_size + chunk = next(chunks) + if not chunk: + return 0, True + path.write_bytes(chunk) + if chunk == b"chunk2": + second_segment_captured.set() + return len(chunk), False + + monkeypatch.setattr(relay, "_read_segment_to_file", fake_read_segment) + + post_contents: list[bytes] = [] + + class _SlowFirstPostClient: + async def post(self, url, **kwargs): + del url + post_contents.append(kwargs["content"]) + if len(post_contents) == 1: + first_post_started.set() + await release_first_post.wait() + return _FakeResponse(204) - fake_stderr = type("Stderr", (), {})() - fake_stderr.readline = AsyncMock(return_value=b"") - fake_process = type("Process", (), {})() - fake_process.stderr = fake_stderr - fake_process.returncode = 0 - fake_process.wait = AsyncMock(return_value=0) - fake_process.terminate = lambda: None - fake_process.kill = lambda: None + uploader = relay._SegmentUploader( + _SlowFirstPostClient(), + "https://trio-relay.machinefi.com/api/stream/ingest/cam-xyz", + {"X-API-Key": "tok", "Content-Type": "video/mp2t"}, + segment_duration=10.0, + ) - monkeypatch.setattr(relay.shutil, "which", lambda _: "/usr/bin/ffmpeg") + upload_task = asyncio.create_task(uploader.upload_all(object())) + await asyncio.wait_for(first_post_started.wait(), timeout=1.0) + await asyncio.wait_for(second_segment_captured.wait(), timeout=1.0) + release_first_post.set() + await upload_task - async def fake_create_subprocess_exec(*args, **kwargs): - fake_process.stdout = kwargs.get("stdout") - return fake_process + assert post_contents == [b"chunk1", b"chunk2"] - monkeypatch.setattr(relay.asyncio, "create_subprocess_exec", fake_create_subprocess_exec) - monkeypatch.setattr(relay, "detect_source_type", lambda source: "file") - post_calls: list[dict[str, object]] = [] +@pytest.mark.asyncio +async def test_segmented_post_continues_through_5xx(monkeypatch: pytest.MonkeyPatch): + """5xx responses are transient — the relay must skip the failed + segment, NOT abort the whole session. Previously a single 500/502 + killed ffmpeg and the wrapper had to respawn, costing 30-60s of + dead air per cloud blip. + """ + post_contents: list[bytes] = [] + + class _FlakyClient: + async def post(self, url, **kwargs): + del url + post_contents.append(kwargs["content"]) + # First two: 500. Last: 204. + if len(post_contents) <= 2: + return _FakeResponse(500, text="Internal Server Error") + return _FakeResponse(204) - class _SegClient: - async def __aenter__(self): - return self + relay = _relay_module() - async def __aexit__(self, *args): - pass + monkeypatch.setattr( + relay, + "_read_segment_to_file", + _fake_segment_reader(b"chunk1", b"chunk2", b"chunk3", b""), + ) - async def get(self, url, **kwargs): - return _FakeResponse(404) + uploader = relay._SegmentUploader( + _FlakyClient(), + "https://trio-relay.machinefi.com/api/stream/ingest/cam-err", + {"X-API-Key": "tok", "Content-Type": "video/mp2t"}, + segment_duration=10.0, + ) + + # No raise — relay continues through 5xx. + await uploader.upload_all(object()) + assert post_contents == [b"chunk1", b"chunk2", b"chunk3"] + assert uploader.segments_ok == 1 + assert uploader.segments_fail == 2 + + +@pytest.mark.asyncio +async def test_segmented_post_aborts_on_401_fatal(monkeypatch: pytest.MonkeyPatch): + """401/403 means the API key was revoked or rejected — won't + self-heal. Abort the relay so the wrapper exits and the operator + notices, instead of looping silently.""" + relay = _relay_module() + post_contents: list[bytes] = [] + + class _AuthRejectingClient: async def post(self, url, **kwargs): - post_calls.append({"url": url, **kwargs}) - return _FakeResponse(500, text="Internal Server Error") + del url + post_contents.append(kwargs["content"]) + return _FakeResponse(401, text="bad key") - class _RegClient: - async def __aenter__(self): - return self + monkeypatch.setattr( + relay, + "_read_segment_to_file", + _fake_segment_reader(b"chunk1", b"chunk2", b""), + ) - async def __aexit__(self, *args): - pass + uploader = relay._SegmentUploader( + _AuthRejectingClient(), + "https://trio-relay.machinefi.com/api/stream/ingest/cam-auth", + {"X-API-Key": "bad", "Content-Type": "video/mp2t"}, + segment_duration=10.0, + ) + + with pytest.raises(relay.IngestUploadError, match="Fatal upload error"): + await uploader.upload_all(object()) + + # Aborted after the first 401 — no second attempt. + assert post_contents == [b"chunk1"] - async def get(self, url, **kwargs): - return _FakeResponse(404) +@pytest.mark.asyncio +async def test_segmented_post_continues_on_transport_error(monkeypatch: pytest.MonkeyPatch): + """Network blips raise httpx.TransportError. These are transient — + the relay must log + skip + continue, not abort.""" + relay = _relay_module() + post_contents: list[bytes] = [] + + class _TransportErrorClient: async def post(self, url, **kwargs): - return _FakeResponse(201, {"id": "cam-err"}) + del url + post_contents.append(kwargs["content"]) + if len(post_contents) == 1: + raise relay.httpx.ConnectError("conn refused") + return _FakeResponse(204) - client_instances = [_RegClient(), _SegClient()] + monkeypatch.setattr( + relay, + "_read_segment_to_file", + _fake_segment_reader(b"chunk1", b"chunk2", b""), + ) - def fake_async_client(**kwargs): - return client_instances.pop(0) + uploader = relay._SegmentUploader( + _TransportErrorClient(), + "https://trio-relay.machinefi.com/api/stream/ingest/cam-net", + {"X-API-Key": "tok", "Content-Type": "video/mp2t"}, + segment_duration=10.0, + ) - monkeypatch.setattr(relay.httpx, "AsyncClient", fake_async_client) + await uploader.upload_all(object()) + + assert post_contents == [b"chunk1", b"chunk2"] + assert uploader.segments_ok == 1 + assert uploader.segments_fail == 1 + + +@pytest.mark.asyncio +async def test_upload_has_per_segment_deadline(monkeypatch: pytest.MonkeyPatch): + """If the POST hangs (e.g. half-closed connection, server stopped + reading body but never sent FIN), the per-segment deadline must + fire so the worker isn't pinned at 0% CPU indefinitely. Observed + 2026-05-18 in prod; mitigated locally with a wrapper-script + watchdog at the time.""" + relay = _relay_module() + deadline_observed = asyncio.Event() + + class _HangingClient: + async def post(self, url, **kwargs): + del url, kwargs + # Never returns — simulates the half-closed-connection case. + try: + await asyncio.sleep(3600) + except asyncio.CancelledError: + deadline_observed.set() + raise + return _FakeResponse(204) monkeypatch.setattr( relay, - "_read_until_timeout", - AsyncMock(side_effect=[b"chunk1", b"chunk2", b""]), + "_read_segment_to_file", + _fake_segment_reader(b"chunk1", b""), ) - client = relay.HttpIngestRelay( - source="video.mp4", - cloud_url="https://trio-relay.machinefi.com", - bearer_token="tok", - camera_id="cam-err", + uploader = relay._SegmentUploader( + _HangingClient(), + "https://trio-relay.machinefi.com/api/stream/ingest/cam-hang", + {"X-API-Key": "tok", "Content-Type": "video/mp2t"}, + segment_duration=10.0, + upload_timeout_seconds=0.2, # tight deadline for test speed ) - await client.run() + # Must finish — the wait_for deadline kicks in, classifies the + # hang as TRANSIENT, and we proceed to EOF. + await asyncio.wait_for(uploader.upload_all(object()), timeout=2.0) - assert len(post_calls) == 1 - assert post_calls[0]["content"] == b"chunk1" + assert deadline_observed.is_set(), "wait_for should have cancelled the hung POST" + assert uploader.segments_ok == 0 + assert uploader.segments_fail == 1 + + +def test_clean_stale_segment_tmpdirs_removes_only_old_dirs(tmp_path: Path): + """Startup sweep recovers disk from SIGKILL'd previous runs. + Must remove orphan dirs older than the threshold but leave fresh + dirs (owned by concurrent relays on the same host) alone.""" + import os + + relay = _relay_module() + + # Three fixtures: old orphan, fresh (concurrent relay), unrelated dir. + old_orphan = tmp_path / "trio-relay-segments-old123" + fresh_relay = tmp_path / "trio-relay-segments-fresh456" + unrelated = tmp_path / "some-other-dir" + for d in (old_orphan, fresh_relay, unrelated): + d.mkdir() + (d / "segment-00000001.ts").write_bytes(b"junk") + + # Backdate the old orphan to 2 hours ago (>1h threshold). + two_hours_ago = time.time() - 7200 + os.utime(old_orphan, (two_hours_ago, two_hours_ago)) + + removed = relay._clean_stale_segment_tmpdirs(parent_dir=tmp_path) + + assert removed == 1 + assert not old_orphan.exists() + assert fresh_relay.exists() + assert (fresh_relay / "segment-00000001.ts").exists() + assert unrelated.exists() + + +def test_clean_stale_segment_tmpdirs_no_parent_dir_is_harmless(tmp_path: Path): + """Missing parent dir must not crash.""" + relay = _relay_module() + missing = tmp_path / "does-not-exist" + # Should return 0 silently. + assert relay._clean_stale_segment_tmpdirs(parent_dir=missing) == 0 + + +def test_clean_stale_segment_tmpdirs_threshold_respected(tmp_path: Path): + """A dir just under the threshold must be left alone.""" + import os + + relay = _relay_module() + + just_fresh = tmp_path / "trio-relay-segments-justfresh" + just_fresh.mkdir() + just_old = tmp_path / "trio-relay-segments-justold" + just_old.mkdir() + + now = time.time() + os.utime(just_fresh, (now - 30, now - 30)) # 30s old + os.utime(just_old, (now - 120, now - 120)) # 120s old + + removed = relay._clean_stale_segment_tmpdirs( + parent_dir=tmp_path, + age_threshold_seconds=60.0, + ) + + assert removed == 1 + assert just_fresh.exists() + assert not just_old.exists() + + +def test_relay_cli_registers_sigterm_handler(): + """Regression guard: production wrapper kills via SIGTERM. Without + a handler, the segment TemporaryDirectory leaks (#disk-leak).""" + import signal as _signal + + from trio_core.cli.relay import _shutdown_signals + + sigs = _shutdown_signals() + assert _signal.SIGTERM in sigs, "SIGTERM must be in shutdown signal set" + assert _signal.SIGINT in sigs, "SIGINT must still be handled (Ctrl+C)" + + +@pytest.mark.asyncio +async def test_relay_sigterm_cancels_main_task_and_runs_teardown( + monkeypatch: pytest.MonkeyPatch, +): + """End-to-end: deliver real SIGTERM to ourselves while the relay is + running and assert (a) the run task is cancelled and (b) teardown + fires. This guarantees the segment TemporaryDirectory's __exit__ + will actually run.""" + import os + import signal as _signal + + started = asyncio.Event() + teardown_called = asyncio.Event() + + class _FakeRelay: + async def run(self) -> None: + started.set() + await asyncio.sleep(60) # would block forever in real run + + async def teardown(self) -> None: + teardown_called.set() + + relay_obj = _FakeRelay() + main_task = asyncio.create_task(relay_obj.run()) + loop = asyncio.get_running_loop() + + from trio_core.cli.relay import _shutdown_signals + + shutdown_triggered = False + + def _on_shutdown() -> None: + nonlocal shutdown_triggered + shutdown_triggered = True + main_task.cancel() + + sigs = _shutdown_signals() + for sig in sigs: + loop.add_signal_handler(sig, _on_shutdown) + try: + # Wait until the fake relay's run() is suspended in sleep. + await asyncio.wait_for(started.wait(), timeout=1.0) + # Deliver SIGTERM to ourselves. + os.kill(os.getpid(), _signal.SIGTERM) + try: + await main_task + except asyncio.CancelledError: + pass + # Mirror the production finally block. + await relay_obj.teardown() + assert shutdown_triggered + assert teardown_called.is_set() + finally: + for sig in sigs: + loop.remove_signal_handler(sig) From 716529b1b25ae27ea7ffd498d70e09109839ca8e Mon Sep 17 00:00:00 2001 From: Liuhaai Date: Mon, 18 May 2026 09:05:31 -0700 Subject: [PATCH 2/2] style(relay): fix import-block spacing (ruff I001) Co-Authored-By: Claude Opus 4.7 --- src/trio_core/cli/relay.py | 2 ++ src/trio_core/http_ingest_relay.py | 12 +++--------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/trio_core/cli/relay.py b/src/trio_core/cli/relay.py index 5334a7d..ea9c9ca 100644 --- a/src/trio_core/cli/relay.py +++ b/src/trio_core/cli/relay.py @@ -8,6 +8,7 @@ from trio_core.cli.cam import _resolve_rtsp_url from trio_core.http_ingest_relay import HttpIngestRelay, RelayError + # Signals that trigger graceful relay shutdown. SIGTERM is essential # because the production wrapper script wraps each launch in # `timeout --signal=TERM --kill-after=10 360s`; without a handler, @@ -15,6 +16,7 @@ # TemporaryDirectory context manager runs, leaking files in /tmp. def _shutdown_signals(): import signal + return (signal.SIGINT, signal.SIGTERM) diff --git a/src/trio_core/http_ingest_relay.py b/src/trio_core/http_ingest_relay.py index c348ed0..98bec97 100644 --- a/src/trio_core/http_ingest_relay.py +++ b/src/trio_core/http_ingest_relay.py @@ -79,9 +79,7 @@ def _clean_stale_segment_tmpdirs( try: _shutil.rmtree(path, ignore_errors=True) removed += 1 - logger.info( - "Cleaned stale segment tmpdir %s (age=%.0fs)", path, age - ) + logger.info("Cleaned stale segment tmpdir %s (age=%.0fs)", path, age) except OSError: logger.debug("Failed to clean %s", path, exc_info=True) return removed @@ -547,9 +545,7 @@ async def _capture_segments( finally: await queue.put(None) - async def _upload_queued( - self, queue: asyncio.Queue[_SegmentFile | None] - ) -> bool: + async def _upload_queued(self, queue: asyncio.Queue[_SegmentFile | None]) -> bool: # Transient upload failures (5xx, timeout, transport error) are # logged and skipped — the segment file is unlinked and we move on # to the next one. Only FATAL outcomes (401/403 — bad/revoked key) @@ -590,9 +586,7 @@ async def _upload_one(self, segment: _SegmentFile) -> str: content = await asyncio.to_thread(segment.path.read_bytes) try: resp = await asyncio.wait_for( - self._client.post( - self._ingest_url, content=content, headers=self._headers - ), + self._client.post(self._ingest_url, content=content, headers=self._headers), timeout=self._upload_timeout_seconds, ) except asyncio.TimeoutError: