From 9e57da0a2cd43be8ccd5b906716eb7b07179c065 Mon Sep 17 00:00:00 2001 From: SEPURI-SAI-KRISHNA Date: Sat, 4 Jul 2026 14:14:22 +0530 Subject: [PATCH 1/2] fix(telemetry): dispatch finding events off the event loop (#672) --- strix/telemetry/_common.py | 79 ++++++++++++++++++++++++++++- strix/telemetry/posthog.py | 42 ++++++++------- strix/telemetry/scarf.py | 39 +++++++------- tests/test_telemetry.py | 101 +++++++++++++++++++++++++++++++++++++ 4 files changed, 225 insertions(+), 36 deletions(-) create mode 100644 tests/test_telemetry.py diff --git a/strix/telemetry/_common.py b/strix/telemetry/_common.py index ff53ceefd..0054997e3 100644 --- a/strix/telemetry/_common.py +++ b/strix/telemetry/_common.py @@ -2,13 +2,20 @@ import logging import platform +import queue import sys +import threading +import time from importlib.metadata import PackageNotFoundError, version from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any from uuid import uuid4 +if TYPE_CHECKING: + from collections.abc import Callable + + logger = logging.getLogger(__name__) SESSION_ID: str = uuid4().hex[:16] @@ -48,3 +55,73 @@ def base_props() -> dict[str, Any]: "python": f"{sys.version_info.major}.{sys.version_info.minor}", "strix_version": get_version(), } + + +# --------------------------------------------------------------------------- +# Background dispatch +# --------------------------------------------------------------------------- +# +# Telemetry is best-effort and fire-and-forget. The actual delivery does a +# blocking ``urllib.request.urlopen(..., timeout=10)``; running that inline +# stalls whatever thread emits the event. ``finding()`` is emitted from an +# async tool, so an inline send blocks the asyncio event loop — and therefore +# every concurrent agent — for up to the full network timeout per finding. +# +# Instead, delivery runs on a single dedicated daemon worker fed by a bounded +# queue. Enqueue is non-blocking and drops the event when the queue is full, +# so a slow or hung telemetry endpoint can neither block nor back-pressure the +# caller. The worker is a daemon thread, so it never delays interpreter exit. + +_TELEMETRY_QUEUE_MAXSIZE = 256 + +_telemetry_queue: queue.Queue[Callable[[], None]] = queue.Queue(maxsize=_TELEMETRY_QUEUE_MAXSIZE) +_worker_thread: threading.Thread | None = None +_worker_lock = threading.Lock() + + +def _worker_loop() -> None: + while True: + task = _telemetry_queue.get() + try: + task() + except Exception: # noqa: BLE001 + logger.debug("telemetry task raised", exc_info=True) + finally: + _telemetry_queue.task_done() + + +def _ensure_worker() -> None: + global _worker_thread # noqa: PLW0603 + if _worker_thread is not None and _worker_thread.is_alive(): + return + with _worker_lock: + if _worker_thread is None or not _worker_thread.is_alive(): + _worker_thread = threading.Thread( + target=_worker_loop, name="strix-telemetry", daemon=True + ) + _worker_thread.start() + + +def dispatch(task: Callable[[], None]) -> None: + """Queue a best-effort telemetry task on the background worker. + + Non-blocking: returns immediately and never raises. If the queue is full + (endpoint slow/unreachable) the task is dropped rather than blocking the + caller — telemetry must never stall the event loop or delay a scan. + """ + _ensure_worker() + try: + _telemetry_queue.put_nowait(task) + except queue.Full: + logger.debug("telemetry queue full; dropping event") + + +def flush(timeout: float = 5.0) -> None: + """Best-effort wait for queued telemetry to drain (bounded by ``timeout``). + + Used by tests and by clean-shutdown paths that want emitted events to have + a chance to deliver. Never blocks longer than ``timeout`` seconds. + """ + deadline = time.monotonic() + timeout + while _telemetry_queue.unfinished_tasks and time.monotonic() < deadline: + time.sleep(0.01) diff --git a/strix/telemetry/posthog.py b/strix/telemetry/posthog.py index d1a9b4607..1a5ee2073 100644 --- a/strix/telemetry/posthog.py +++ b/strix/telemetry/posthog.py @@ -8,6 +8,7 @@ from strix.telemetry._common import ( SESSION_ID, base_props, + dispatch, is_first_run, ) @@ -30,24 +31,29 @@ def _send(event: str, properties: dict[str, Any]) -> None: if not _is_enabled(): logger.debug("posthog disabled; skipping event %s", event) return - try: - payload = { - "api_key": _POSTHOG_PUBLIC_API_KEY, - "event": event, - "distinct_id": SESSION_ID, - "properties": properties, - } - req = urllib.request.Request( # noqa: S310 - f"{_POSTHOG_HOST}/capture/", - data=json.dumps(payload).encode(), - headers={"Content-Type": "application/json"}, - ) - with urllib.request.urlopen(req, timeout=10): # noqa: S310 # nosec B310 - pass - except Exception: # noqa: BLE001 - logger.debug("posthog send failed for event %s", event, exc_info=True) - else: - logger.debug("posthog event sent: %s", event) + payload = { + "api_key": _POSTHOG_PUBLIC_API_KEY, + "event": event, + "distinct_id": SESSION_ID, + "properties": properties, + } + data = json.dumps(payload).encode() + + def _deliver() -> None: + try: + req = urllib.request.Request( # noqa: S310 + f"{_POSTHOG_HOST}/capture/", + data=data, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=10): # noqa: S310 # nosec B310 + pass + except Exception: # noqa: BLE001 + logger.debug("posthog send failed for event %s", event, exc_info=True) + else: + logger.debug("posthog event sent: %s", event) + + dispatch(_deliver) def start( diff --git a/strix/telemetry/scarf.py b/strix/telemetry/scarf.py index 037494bb3..b5b8ff31c 100644 --- a/strix/telemetry/scarf.py +++ b/strix/telemetry/scarf.py @@ -10,6 +10,7 @@ from strix.telemetry._common import ( SESSION_ID, base_props, + dispatch, get_version, is_first_run, ) @@ -32,23 +33,27 @@ def _send(event: str, properties: dict[str, Any]) -> None: if not _is_enabled(): logger.debug("scarf disabled; skipping event %s", event) return - try: - props = dict(properties) - version = str(props.pop("strix_version", get_version()) or "unknown") - path = f"/{urllib.parse.quote(event, safe='')}/{urllib.parse.quote(version, safe='')}" - query = urllib.parse.urlencode( - {k: ("" if v is None else str(v)) for k, v in props.items()}, - ) - url = f"{_SCARF_ENDPOINT}{path}" - if query: - url = f"{url}?{query}" - req = urllib.request.Request(url, method="POST") # noqa: S310 - with urllib.request.urlopen(req, timeout=10): # noqa: S310 # nosec B310 - pass - except Exception: # noqa: BLE001 - logger.debug("scarf send failed for event %s", event, exc_info=True) - else: - logger.debug("scarf event sent: %s", event) + props = dict(properties) + version = str(props.pop("strix_version", get_version()) or "unknown") + path = f"/{urllib.parse.quote(event, safe='')}/{urllib.parse.quote(version, safe='')}" + query = urllib.parse.urlencode( + {k: ("" if v is None else str(v)) for k, v in props.items()}, + ) + url = f"{_SCARF_ENDPOINT}{path}" + if query: + url = f"{url}?{query}" + + def _deliver() -> None: + try: + req = urllib.request.Request(url, method="POST") # noqa: S310 + with urllib.request.urlopen(req, timeout=10): # noqa: S310 # nosec B310 + pass + except Exception: # noqa: BLE001 + logger.debug("scarf send failed for event %s", event, exc_info=True) + else: + logger.debug("scarf event sent: %s", event) + + dispatch(_deliver) def start( diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 000000000..a9b40947d --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,101 @@ +"""Telemetry must never block the caller (regression test for #672). + +``ReportState.add_vulnerability_report`` runs synchronously inside an async +tool, so a blocking ``urlopen`` in ``scarf.finding`` / ``posthog.finding`` +stalled the asyncio event loop — and every concurrent agent — for up to the +full 10s network timeout per finding. Delivery now runs on a background +worker; these tests pin that the emit call returns immediately while the +send still happens off-thread. +""" + +from __future__ import annotations + +import queue +import time +from typing import Any, Self + +import pytest + +from strix.telemetry import _common, posthog, scarf + + +class _FakeResponse: + def __enter__(self) -> Self: + return self + + def __exit__(self, *_: object) -> bool: + return False + + +@pytest.fixture(autouse=True) +def _drain_queue() -> Any: + # Keep tests independent: let the background worker finish before the next. + yield + _common.flush(timeout=5.0) + + +def _install_slow_urlopen(monkeypatch: pytest.MonkeyPatch, module: Any, delay: float) -> list[str]: + """Patch ``module``'s urlopen to record calls after sleeping ``delay``.""" + calls: list[str] = [] + + def _slow_urlopen(req: Any, timeout: float | None = None) -> _FakeResponse: # noqa: ARG001 + time.sleep(delay) + calls.append(req.full_url) + return _FakeResponse() + + monkeypatch.setattr(module, "_is_enabled", lambda: True) + monkeypatch.setattr(module.urllib.request, "urlopen", _slow_urlopen) + return calls + + +@pytest.mark.parametrize("module", [scarf, posthog]) +def test_finding_returns_without_waiting_for_network( + monkeypatch: pytest.MonkeyPatch, module: Any +) -> None: + calls = _install_slow_urlopen(monkeypatch, module, delay=1.0) + + start = time.perf_counter() + module.finding("high") + elapsed = time.perf_counter() - start + + # The 1s "network" call must not be on the caller's critical path. + assert elapsed < 0.2, f"{module.__name__}.finding blocked the caller for {elapsed:.3f}s" + + _common.flush(timeout=5.0) + assert calls, f"{module.__name__}.finding never delivered the event off-thread" + + +@pytest.mark.parametrize("module", [scarf, posthog]) +def test_disabled_telemetry_sends_nothing( + monkeypatch: pytest.MonkeyPatch, module: Any +) -> None: + calls: list[str] = [] + + def _urlopen(req: Any, timeout: float | None = None) -> _FakeResponse: # noqa: ARG001 + calls.append(req.full_url) + return _FakeResponse() + + monkeypatch.setattr(module, "_is_enabled", lambda: False) + monkeypatch.setattr(module.urllib.request, "urlopen", _urlopen) + + module.finding("high") + _common.flush(timeout=1.0) + assert not calls + + +def test_dispatch_drops_when_queue_full(monkeypatch: pytest.MonkeyPatch) -> None: + # A hung endpoint must not let the queue grow without bound or block enqueue. + full_queue: queue.Queue[Any] = queue.Queue(maxsize=1) + full_queue.put(lambda: None) # occupy the only slot; nothing drains it here + monkeypatch.setattr(_common, "_telemetry_queue", full_queue) + monkeypatch.setattr(_common, "_ensure_worker", lambda: None) # don't spawn a real worker + + start = time.perf_counter() + _common.dispatch(lambda: None) # must drop, not raise or block + assert time.perf_counter() - start < 0.1 + + # Slot still holds only the original task; the overflow task was dropped. + assert full_queue.qsize() == 1 + full_queue.get_nowait() + with pytest.raises(queue.Empty): + full_queue.get_nowait() From f794b17789c76d5282250a67ffe9e3014e9bc6dc Mon Sep 17 00:00:00 2001 From: SEPURI-SAI-KRISHNA Date: Sat, 4 Jul 2026 15:06:12 +0530 Subject: [PATCH 2/2] fix(telemetry): flush terminal events at exit; keep serialization inside the delivery guard --- strix/telemetry/_common.py | 20 +++++++++++++++++++- strix/telemetry/posthog.py | 19 ++++++++++--------- strix/telemetry/scarf.py | 22 ++++++++++++---------- tests/test_telemetry.py | 35 +++++++++++++++++++++++++++++++++++ 4 files changed, 76 insertions(+), 20 deletions(-) diff --git a/strix/telemetry/_common.py b/strix/telemetry/_common.py index 0054997e3..708b60adb 100644 --- a/strix/telemetry/_common.py +++ b/strix/telemetry/_common.py @@ -1,5 +1,6 @@ from __future__ import annotations +import atexit import logging import platform import queue @@ -70,10 +71,18 @@ def base_props() -> dict[str, Any]: # Instead, delivery runs on a single dedicated daemon worker fed by a bounded # queue. Enqueue is non-blocking and drops the event when the queue is full, # so a slow or hung telemetry endpoint can neither block nor back-pressure the -# caller. The worker is a daemon thread, so it never delays interpreter exit. +# caller. The worker is a daemon thread, and an ``atexit`` hook flushes the +# queue with a bounded timeout at interpreter shutdown so terminal ``end`` / +# ``error`` events still get a chance to deliver — without letting a hung +# endpoint stall process exit. _TELEMETRY_QUEUE_MAXSIZE = 256 +# Bounded time we allow queued telemetry (typically the terminal end/error +# events) to drain at interpreter shutdown. Well under the old synchronous +# worst case, so shutdown can never hang on an unresponsive endpoint. +_SHUTDOWN_FLUSH_TIMEOUT = 3.0 + _telemetry_queue: queue.Queue[Callable[[], None]] = queue.Queue(maxsize=_TELEMETRY_QUEUE_MAXSIZE) _worker_thread: threading.Thread | None = None _worker_lock = threading.Lock() @@ -125,3 +134,12 @@ def flush(timeout: float = 5.0) -> None: deadline = time.monotonic() + timeout while _telemetry_queue.unfinished_tasks and time.monotonic() < deadline: time.sleep(0.01) + + +def _flush_on_exit() -> None: + # Give queued terminal events (end/error) a bounded chance to deliver + # before the daemon worker is torn down at interpreter shutdown. + flush(timeout=_SHUTDOWN_FLUSH_TIMEOUT) + + +atexit.register(_flush_on_exit) diff --git a/strix/telemetry/posthog.py b/strix/telemetry/posthog.py index 1a5ee2073..a321d247c 100644 --- a/strix/telemetry/posthog.py +++ b/strix/telemetry/posthog.py @@ -31,19 +31,20 @@ def _send(event: str, properties: dict[str, Any]) -> None: if not _is_enabled(): logger.debug("posthog disabled; skipping event %s", event) return - payload = { - "api_key": _POSTHOG_PUBLIC_API_KEY, - "event": event, - "distinct_id": SESSION_ID, - "properties": properties, - } - data = json.dumps(payload).encode() - def _deliver() -> None: + # Serialization stays inside the guard: a non-JSON-safe property must + # be swallowed here, never raised back through the public telemetry + # call on the caller thread. try: + payload = { + "api_key": _POSTHOG_PUBLIC_API_KEY, + "event": event, + "distinct_id": SESSION_ID, + "properties": properties, + } req = urllib.request.Request( # noqa: S310 f"{_POSTHOG_HOST}/capture/", - data=data, + data=json.dumps(payload).encode(), headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=10): # noqa: S310 # nosec B310 diff --git a/strix/telemetry/scarf.py b/strix/telemetry/scarf.py index b5b8ff31c..c7392adff 100644 --- a/strix/telemetry/scarf.py +++ b/strix/telemetry/scarf.py @@ -33,18 +33,20 @@ def _send(event: str, properties: dict[str, Any]) -> None: if not _is_enabled(): logger.debug("scarf disabled; skipping event %s", event) return - props = dict(properties) - version = str(props.pop("strix_version", get_version()) or "unknown") - path = f"/{urllib.parse.quote(event, safe='')}/{urllib.parse.quote(version, safe='')}" - query = urllib.parse.urlencode( - {k: ("" if v is None else str(v)) for k, v in props.items()}, - ) - url = f"{_SCARF_ENDPOINT}{path}" - if query: - url = f"{url}?{query}" - def _deliver() -> None: + # URL building stays inside the guard: a value whose str()/URL + # encoding raises must be swallowed here, never raised back through + # the public telemetry call on the caller thread. try: + props = dict(properties) + version = str(props.pop("strix_version", get_version()) or "unknown") + path = f"/{urllib.parse.quote(event, safe='')}/{urllib.parse.quote(version, safe='')}" + query = urllib.parse.urlencode( + {k: ("" if v is None else str(v)) for k, v in props.items()}, + ) + url = f"{_SCARF_ENDPOINT}{path}" + if query: + url = f"{url}?{query}" req = urllib.request.Request(url, method="POST") # noqa: S310 with urllib.request.urlopen(req, timeout=10): # noqa: S310 # nosec B310 pass diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index a9b40947d..256244e05 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -83,6 +83,41 @@ def _urlopen(req: Any, timeout: float | None = None) -> _FakeResponse: # noqa: assert not calls +@pytest.mark.parametrize("module", [scarf, posthog]) +def test_terminal_events_deliver_after_shutdown_flush( + monkeypatch: pytest.MonkeyPatch, module: Any +) -> None: + # end()/error() were synchronous before; they must still deliver once the + # queue is drained (the atexit hook uses this same flush at shutdown). + calls = _install_slow_urlopen(monkeypatch, module, delay=0.0) + + module.error("unhandled_exception") + _common.flush(timeout=5.0) + assert calls, f"{module.__name__}.error was lost instead of delivered on flush" + + +@pytest.mark.parametrize("module", [scarf, posthog]) +def test_unserializable_property_does_not_raise_to_caller( + monkeypatch: pytest.MonkeyPatch, module: Any +) -> None: + # A property whose str()/JSON encoding blows up must be swallowed by the + # guarded delivery closure, never propagate through the public call. + class _Explosive: + def __str__(self) -> str: + raise RuntimeError("boom") + + monkeypatch.setattr(module, "_is_enabled", lambda: True) + monkeypatch.setattr( + module.urllib.request, + "urlopen", + lambda *_a, **_k: (_ for _ in ()).throw(AssertionError("should not reach network")), + ) + + # Must not raise even though the property can't be serialized/encoded. + module._send("scan_ended", {**_common.base_props(), "bad": _Explosive()}) + _common.flush(timeout=2.0) + + def test_dispatch_drops_when_queue_full(monkeypatch: pytest.MonkeyPatch) -> None: # A hung endpoint must not let the queue grow without bound or block enqueue. full_queue: queue.Queue[Any] = queue.Queue(maxsize=1)