Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions src/trio_core/cli/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@
from trio_core.http_ingest_relay import HttpIngestRelay, RelayError


# Signals that trigger graceful relay shutdown. SIGTERM is essential
# because the production wrapper script wraps each launch in
# `timeout --signal=TERM --kill-after=10 360s`; without a handler,
# Python's default terminates the process before the segment
# TemporaryDirectory context manager runs, leaking files in /tmp.
def _shutdown_signals():
import signal

return (signal.SIGINT, signal.SIGTERM)


@app.command(help="Stream a video feed to Trio Cloud.")
def relay(
cloud: str = typer.Option(
Expand Down Expand Up @@ -103,22 +114,23 @@ async def _run() -> None:
main_task = asyncio.create_task(relay_obj.run())
shutdown_triggered = False

def _on_sigint() -> None:
def _on_shutdown_signal() -> None:
nonlocal shutdown_triggered
shutdown_triggered = True
main_task.cancel()

import signal

loop.add_signal_handler(signal.SIGINT, _on_sigint)
shutdown_signals = _shutdown_signals()
for sig in shutdown_signals:
loop.add_signal_handler(sig, _on_shutdown_signal)
try:
await main_task
except asyncio.CancelledError:
pass
finally:
if shutdown_triggered:
typer.echo("\nStopping relay...")
loop.remove_signal_handler(signal.SIGINT)
for sig in shutdown_signals:
loop.remove_signal_handler(sig)
await relay_obj.teardown()
if shutdown_triggered:
typer.echo("Disconnected.")
Expand Down
244 changes: 207 additions & 37 deletions src/trio_core/http_ingest_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
from __future__ import annotations

import asyncio
import contextlib
import hashlib
import json
import logging
import platform
import shutil
import sys
import tempfile
import time
import uuid
from dataclasses import dataclass
Expand All @@ -23,6 +25,65 @@

_FFMPEG_READ_SIZE = 65536

# Hard per-POST upload deadline. Without it, a half-closed connection
# (server stopped reading body but never sent FIN) pins the worker
# indefinitely — httpx's per-byte read/write timeouts don't fire as long
# as kernel-level keepalives report the socket as live. Observed in prod
# 2026-05-18 as "trio-relay at 0% CPU indefinitely after HTTP error".
_DEFAULT_UPLOAD_TIMEOUT_SECONDS = 30.0

# Response classifications used by _SegmentUploader.
_UPLOAD_OK = "ok"
_UPLOAD_TRANSIENT = "transient"
_UPLOAD_FATAL = "fatal"

# Prefix for the per-run segment temp directory. Tracked at module
# level so the startup sweep and the runtime allocator agree.
_SEGMENT_TMPDIR_PREFIX = "trio-relay-segments-"

# An active relay writes/unlinks segment files inside its tmpdir
# every `segment_duration` seconds, so its mtime is always recent.
# A tmpdir untouched for an hour can only be an orphan from a prior
# SIGKILL'd run — safe to delete.
_STALE_SEGMENT_TMPDIR_AGE_S = 3600.0


def _clean_stale_segment_tmpdirs(
    *,
    age_threshold_seconds: float = _STALE_SEGMENT_TMPDIR_AGE_S,
    parent_dir: Path | None = None,
) -> int:
    """Remove orphaned segment tmpdirs left behind by SIGKILL'd runs.

    Args:
        age_threshold_seconds: Minimum time since a directory's last
            modification before it is treated as an orphan.
        parent_dir: Directory to scan. Defaults to ``tempfile.gettempdir()``.

    Returns:
        The number of directories actually removed. Best-effort: entries
        that cannot be inspected or deleted are logged at DEBUG and
        skipped, never counted.
    """
    parent = parent_dir if parent_dir is not None else Path(tempfile.gettempdir())
    now = time.time()
    removed = 0
    try:
        # Materialise the glob inside the try so a listing failure is caught.
        candidates = list(parent.glob(f"{_SEGMENT_TMPDIR_PREFIX}*"))
    except OSError:
        return 0
    for path in candidates:
        try:
            # Skip symlinks: shutil.rmtree refuses them, and following one
            # would age-check (and report) a directory that is not ours.
            if path.is_symlink() or not path.is_dir():
                continue
            age = now - path.stat().st_mtime
        except OSError:
            logger.debug("Failed to stat %s", path, exc_info=True)
            continue
        if age < age_threshold_seconds:
            continue
        try:
            # Deliberately no ignore_errors=True: a failed removal must
            # raise so it is logged below and kept out of the count.
            shutil.rmtree(path)
        except OSError:
            logger.debug("Failed to clean %s", path, exc_info=True)
            continue
        removed += 1
        logger.info("Cleaned stale segment tmpdir %s (age=%.0fs)", path, age)
    return removed


# ---------------------------------------------------------------------------
# Exceptions
Expand Down Expand Up @@ -58,28 +119,38 @@ class IngestUploadError(RelayError):
_RESET = "\033[0m"


async def _read_until_timeout(
async def _read_segment_to_file(
stdout: asyncio.StreamReader,
path: Path,
duration_seconds: float,
chunk_size: int = 65536,
) -> bytes:
"""Read from *stdout* until *duration_seconds* elapses or EOF."""
buf = bytearray()
chunk_size: int = _FFMPEG_READ_SIZE,
) -> tuple[int, bool]:
"""Drain one timed segment from stdout into *path*.

Returns ``(bytes_written, eof)``. The caller owns deleting *path*.
"""
bytes_written = 0
deadline = time.monotonic() + duration_seconds
eof = False

while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
try:
data = await asyncio.wait_for(stdout.read(chunk_size), timeout=remaining)
except asyncio.TimeoutError:
break
if not data:
break
buf.extend(data)
with path.open("wb") as file:
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
try:
data = await asyncio.wait_for(stdout.read(chunk_size), timeout=remaining)
except asyncio.TimeoutError:
break
if not data:
eof = True
break
await asyncio.to_thread(file.write, data)
bytes_written += len(data)

return bytes(buf)
if bytes_written == 0:
path.unlink(missing_ok=True)
return bytes_written, eof


def _normalize_source_fingerprint(source: str) -> str:
Expand Down Expand Up @@ -416,51 +487,139 @@ def __init__(
segment_duration: float,
*,
json_mode: bool = False,
upload_timeout_seconds: float = _DEFAULT_UPLOAD_TIMEOUT_SECONDS,
) -> None:
self._client = client
self._ingest_url = ingest_url
self._headers = headers
self._segment_duration = segment_duration
self._json_mode = json_mode
self._upload_timeout_seconds = upload_timeout_seconds
self.segments_ok = 0
self.segments_fail = 0
self._seg_num = 0

async def upload_all(self, stdout: asyncio.StreamReader) -> None:
    """Capture segments from *stdout* and upload them until both sides finish.

    A background task writes segment files into a per-run temporary
    directory and queues them; this coroutine drains that queue through
    ``_upload_queued``. If uploading fails or is cancelled before it
    finishes, the capture task is cancelled and awaited before the error
    is re-raised. Segments still queued on exit are always discarded.
    """
    with tempfile.TemporaryDirectory(prefix=_SEGMENT_TMPDIR_PREFIX) as tmpdir:
        queue: asyncio.Queue[_SegmentFile | None] = asyncio.Queue()
        producer = asyncio.create_task(
            self._capture_segments(stdout, queue, Path(tmpdir)),
            name="trio-relay-capture-segments",
        )
        try:
            try:
                await self._upload_queued(queue)
            except (Exception, asyncio.CancelledError):
                # The uploader stopped first: stop the producer and wait
                # for it to unwind, ignoring whatever it raises, then
                # surface the uploader's own error.
                if not producer.done():
                    producer.cancel()
                with contextlib.suppress(asyncio.CancelledError, Exception):
                    await producer
                raise
            # Uploads are done; a capture failure propagates from here.
            await producer
        finally:
            self._discard_queued_segments(queue)

async def _capture_segments(
    self,
    stdout: asyncio.StreamReader,
    queue: asyncio.Queue[_SegmentFile | None],
    segment_dir: Path,
) -> None:
    """Write timed segments from *stdout* into *segment_dir* and queue them.

    Each non-empty segment is queued as a ``_SegmentFile``. Capture stops
    at EOF or after a segment that produced no bytes. A ``None`` sentinel
    is always queued last, even if capture fails or is cancelled.
    """
    try:
        keep_going = True
        while keep_going:
            # The counter advances before the read, so an empty final
            # segment still consumes a number.
            self._seg_num += 1
            number = self._seg_num
            target = segment_dir / f"segment-{number:08d}.ts"
            written, eof = await _read_segment_to_file(
                stdout,
                target,
                self._segment_duration,
            )
            if written:
                await queue.put(_SegmentFile(number, target, written))
            keep_going = bool(written) and not eof
    finally:
        await queue.put(None)

async def _upload_queued(self, queue: asyncio.Queue[_SegmentFile | None]) -> bool:
# Transient upload failures (5xx, timeout, transport error) are
# logged and skipped — the segment file is unlinked and we move on
# to the next one. Only FATAL outcomes (401/403 — bad/revoked key)
# abort the relay, since those won't self-heal without operator
# intervention. This replaces the previous behavior where any
# non-204 response killed the whole ffmpeg session and forced the
# wrapper script to restart, costing 30-60s of dead air per blip.
while True:
chunk = await _read_until_timeout(stdout, self._segment_duration)
if not chunk:
break
self._seg_num += 1
if not await self._upload_one(self._seg_num, chunk):
break
segment = await queue.get()
try:
if segment is None:
return True
outcome = await self._upload_one(segment)
if outcome == _UPLOAD_FATAL:
raise IngestUploadError(
f"Fatal upload error for segment #{segment.number} "
f"(auth rejected — check API key)"
)
finally:
queue.task_done()
if segment is not None:
segment.path.unlink(missing_ok=True)

def _discard_queued_segments(self, queue: asyncio.Queue[_SegmentFile | None]) -> None:
while True:
try:
segment = queue.get_nowait()
except asyncio.QueueEmpty:
return
try:
if segment is not None:
segment.path.unlink(missing_ok=True)
finally:
queue.task_done()

async def _upload_one(self, seg_num: int, chunk: bytes) -> bool:
async def _upload_one(self, segment: _SegmentFile) -> str:
start = time.monotonic()
content = await asyncio.to_thread(segment.path.read_bytes)
try:
resp = await self._client.post(self._ingest_url, content=chunk, headers=self._headers)
resp = await asyncio.wait_for(
self._client.post(self._ingest_url, content=content, headers=self._headers),
timeout=self._upload_timeout_seconds,
)
except asyncio.TimeoutError:
self.segments_fail += 1
self._log_error(
f"Upload deadline ({self._upload_timeout_seconds:.0f}s) exceeded "
f"for seg #{segment.number}"
)
return _UPLOAD_TRANSIENT
except httpx.TransportError as exc:
self.segments_fail += 1
self._log_error(f"Transport error for seg #{seg_num}: {exc}")
return False
self._log_error(f"Transport error for seg #{segment.number}: {exc}")
return _UPLOAD_TRANSIENT

elapsed = time.monotonic() - start
if resp.status_code == 204:
self.segments_ok += 1
self._log_success(seg_num, chunk, elapsed)
return True
self._log_success(segment, elapsed)
return _UPLOAD_OK

self.segments_fail += 1
self._log_error(f"HTTP {resp.status_code} for seg #{seg_num}")
return False
self._log_error(f"HTTP {resp.status_code} for seg #{segment.number}")
if resp.status_code in (401, 403):
return _UPLOAD_FATAL
return _UPLOAD_TRANSIENT

def _log_success(self, seg_num: int, chunk: bytes, elapsed: float) -> None:
def _log_success(self, segment: _SegmentFile, elapsed: float) -> None:
if self._json_mode:
obj = {
"type": "segment",
"ts": _iso_now(),
"seg_num": seg_num,
"bytes": len(chunk),
"seg_num": segment.number,
"bytes": segment.size,
"elapsed_s": round(elapsed, 2),
"ok": self.segments_ok,
"fail": self.segments_fail,
Expand All @@ -474,7 +633,14 @@ def _log_error(self, message: str) -> None:
sys.stderr.write(json.dumps(obj, separators=(",", ":")) + "\n")
else:
sys.stderr.write(f"{_CR}{_CLEAR}{message}\n")
sys.stderr.flush()
sys.stderr.flush()


@dataclass(frozen=True)
class _SegmentFile:
    """One captured segment on disk, queued for upload."""

    # Sequence number; also used in the file name and in log messages.
    number: int
    # Location of the segment file; unlinked after upload or on discard.
    path: Path
    # Number of bytes written to the file.
    size: int


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -504,6 +670,10 @@ def resolved_camera_id(self) -> str:

async def run(self) -> None:
"""Register camera, start ffmpeg, and relay segments until EOF or error."""
# Recover disk from previous runs that died without unwinding
# the TemporaryDirectory context (SIGKILL, host reboot, OOM).
await asyncio.to_thread(_clean_stale_segment_tmpdirs)

async with httpx.AsyncClient(timeout=None, follow_redirects=True) as reg_client:
camera_id = await self._register_camera(reg_client)

Expand All @@ -523,7 +693,7 @@ async def run(self) -> None:

try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, write=30.0, read=30.0, pool=10.0),
timeout=httpx.Timeout(connect=10.0, write=120.0, read=120.0, pool=10.0),
follow_redirects=True,
limits=httpx.Limits(max_keepalive_connections=0),
) as client:
Expand Down Expand Up @@ -572,7 +742,7 @@ def _camera_payload(self, camera_id: str) -> dict[str, object]:
return {
"id": camera_id,
"name": _camera_name_from_source(self.source),
"source_url": self.source,
"source_url": "",
"metadata": {
"managed_by": "trio-edge",
"ingest_transport": "http_mpegts",
Expand Down
Loading
Loading