From dbecbfc46fd7ac65d256e7ee14f51934efe23782 Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 19:07:07 +0300 Subject: [PATCH 01/10] Add privacy-safe PostHog telemetry --- .github/workflows/build-tui.yml | 17 + .github/workflows/publish-npm-on-release.yml | 17 + .gitignore | 3 +- docs/telemetry.md | 129 +++++ onboard.py | 16 + package.json | 4 + pyproject.toml | 13 +- requirements.txt | 1 + scripts/create_posthog_dashboard.py | 196 +++++++ scripts/smoke_telemetry.py | 42 ++ scripts/write_telemetry_config.py | 50 ++ swarm.py | 19 +- telemetry.py | 457 +++++++++++++++ telemetry_hooks.py | 557 +++++++++++++++++++ tests/conftest.py | 8 + tests/test_telemetry.py | 358 ++++++++++++ tests/test_telemetry_scripts.py | 50 ++ 17 files changed, 1934 insertions(+), 3 deletions(-) create mode 100644 docs/telemetry.md create mode 100644 scripts/create_posthog_dashboard.py create mode 100644 scripts/smoke_telemetry.py create mode 100644 scripts/write_telemetry_config.py create mode 100644 telemetry.py create mode 100644 telemetry_hooks.py create mode 100644 tests/conftest.py create mode 100644 tests/test_telemetry.py create mode 100644 tests/test_telemetry_scripts.py diff --git a/.github/workflows/build-tui.yml b/.github/workflows/build-tui.yml index 2d99b16b..7a612fb2 100644 --- a/.github/workflows/build-tui.yml +++ b/.github/workflows/build-tui.yml @@ -288,5 +288,22 @@ jobs: - name: Install dependencies run: npm ci + - name: Validate telemetry secrets + env: + POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} + shell: bash + run: | + set -euo pipefail + if [[ -z "${POSTHOG_API_KEY}" ]]; then + echo "POSTHOG_API_KEY GitHub secret is required before publishing the npm artifact." 
>&2 + exit 1 + fi + + - name: Generate npm telemetry config + env: + POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} + POSTHOG_HOST: ${{ secrets.POSTHOG_HOST }} + run: python3 scripts/write_telemetry_config.py + - name: Publish npm package run: npm publish --access public --tag "$NPM_TAG" diff --git a/.github/workflows/publish-npm-on-release.yml b/.github/workflows/publish-npm-on-release.yml index 4fd4914b..f04b792b 100644 --- a/.github/workflows/publish-npm-on-release.yml +++ b/.github/workflows/publish-npm-on-release.yml @@ -92,5 +92,22 @@ jobs: - name: Install dependencies run: npm ci + - name: Validate telemetry secrets + env: + POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} + shell: bash + run: | + set -euo pipefail + if [[ -z "${POSTHOG_API_KEY}" ]]; then + echo "POSTHOG_API_KEY GitHub secret is required before publishing the npm artifact." >&2 + exit 1 + fi + + - name: Generate npm telemetry config + env: + POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} + POSTHOG_HOST: ${{ secrets.POSTHOG_HOST }} + run: python3 scripts/write_telemetry_config.py + - name: Publish npm package run: npm publish --access public --tag "$NPM_TAG" diff --git a/.gitignore b/.gitignore index a01337dc..bd305997 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ node_modules/ mnt/ .bun-cache/ .playwright-browsers/ +openswarm_telemetry_config.py # TUI binaries — downloaded automatically on first run from GitHub Releases agency-windows-x64.exe @@ -183,4 +184,4 @@ cython_debug/ .agency_swarm/ third_party/ -.claude/ \ No newline at end of file +.claude/ diff --git a/docs/telemetry.md b/docs/telemetry.md new file mode 100644 index 00000000..3ba3bec7 --- /dev/null +++ b/docs/telemetry.md @@ -0,0 +1,129 @@ +# OpenSwarm Telemetry + +OpenSwarm has backend-only PostHog telemetry for product analytics. It is enabled only when a PostHog capture key is available. 
+ +## Distribution Behavior + +- npm releases can include a generated `openswarm_telemetry_config.py` file created by CI from `POSTHOG_API_KEY` and `POSTHOG_HOST` GitHub secrets. +- GitHub/source installs do not include a capture key and do not send telemetry unless the user explicitly sets `POSTHOG_API_KEY`. +- Environment variables always win over the generated npm config. +- The runtime PostHog capture key shipped in npm is treated as public. Never use a personal API key for runtime telemetry. + +Runtime config: + +- `POSTHOG_API_KEY`: PostHog project capture key. +- `POSTHOG_HOST`: PostHog ingestion host. Defaults to `https://us.i.posthog.com`. + +## Opt Out + +Telemetry sends nothing when any of these are set: + +- `OPENSWARM_TELEMETRY=false` +- `OPENSWARM_TELEMETRY=0` +- `OPENSWARM_TELEMETRY=no` +- `OPENSWARM_TELEMETRY=off` +- `DO_NOT_TRACK=1` + +Telemetry is also disabled automatically under CI and pytest unless `OPENSWARM_TELEMETRY_ALLOW_TESTS=1` is set. + +## Runtime Status + +To inspect local telemetry status without printing any key: + +```bash +python - <<'PY' +import telemetry + +print({ + "enabled": telemetry.is_enabled(), + "config_source": telemetry.config_source() or "missing", + "has_capture_key": telemetry.has_posthog_key(), +}) +PY +``` + +`config_source` is `env`, `npm_config`, or `missing`. This check does not print the capture key. + +## Identity + +OpenSwarm stores an anonymous local install UUID and install secret in `~/.openswarm/telemetry.json` by default. Set `OPENSWARM_CONFIG_DIR` to change that location. + +Only derived identifiers are sent: + +- `user_id`: SHA-256 of the anonymous install UUID, prefixed with `openswarm:user:` before hashing. +- `workspace_id`: HMAC-SHA256 of the resolved workspace path, keyed with the local install secret. +- `agent_id` and `parent_agent_id`: HMACs derived from agent names, keyed with the same install secret. + +Raw workspace paths, raw user identifiers, API keys, emails, and Composio IDs are never sent.
+ +## Events + +- `app_started` +- `install_created` +- `onboarding_completed` +- `provider_configured` +- `message_sent` +- `swarm_run_started` +- `swarm_run_completed` +- `agent_run_started` +- `agent_run_completed` +- `llm_generation_completed` +- `tool_invoked` +- `handoff` +- `error` +- `telemetry_smoke_test` (manual smoke script only) + +`agent_name` is sent as a raw, first-class property on agent-relevant events so product analytics can group usage by agent. `agent_id` remains HMAC-derived. + +## Allowed Properties + +Only allowlisted scalar properties are sent; string values are truncated to 256 characters, and every event also carries `event_version`: + +- IDs: `user_id`, `workspace_id`, `session_id`, `thread_id`, `run_trace_id`, `agent_run_id`, `parent_run_id` +- Agent/model: `agent_name`, `agent_id`, `parent_agent_id`, `caller_agent_name`, `model`, `provider`, `tool_name` +- Message metadata: `message_role`, `message_type` +- Usage/performance: `tokens_input`, `tokens_output`, `cost_usd`, `latency_ms`, `stop_reason`, `status`, `is_streaming` +- Setup/auth: `auth_method`, `provider`, `has_provider_key`, `install_source` +- Errors: `error_type`, `error_category`, `status`, `http_status` + +## Privacy Policy + +Telemetry must not include message contents, prompts, tool arguments, tool results, generated content, exception messages, stack traces, tracebacks, file paths, API keys, emails, Composio IDs, raw workspace paths, raw user IDs, or `telemetry_opted_out`. + +If a user opts out, OpenSwarm emits no telemetry event at all. + +## Dashboard + +Run this with a PostHog personal API key to create the default dashboard: + +```bash +POSTHOG_PERSONAL_API_KEY=phx_... \ +POSTHOG_ENVIRONMENT_ID=12345 \ +python scripts/create_posthog_dashboard.py +``` + +Optional: + +```bash +POSTHOG_APP_HOST=https://us.posthog.com +``` + +The dashboard is named `OpenSwarm Product Analytics` and includes DAU, messages/day, agent runs/day, tool usage, error rate, and agent usage grouped by raw `agent_name`.
+ +To inspect the exact API payloads without creating anything: + +```bash +python scripts/create_posthog_dashboard.py --dry-run +``` + +## Smoke Test + +To verify runtime telemetry can queue a safe manual event: + +```bash +POSTHOG_API_KEY=phc_... \ +POSTHOG_HOST=https://us.i.posthog.com \ +python scripts/smoke_telemetry.py +``` + +The smoke script sends `telemetry_smoke_test` with only allowlisted metadata. It does not send message contents, prompts, tool arguments/results, exception messages, file paths, or keys. diff --git a/onboard.py b/onboard.py index 3a838b88..2dc7848e 100644 --- a/onboard.py +++ b/onboard.py @@ -6,6 +6,7 @@ """ import getpass +import os import sys from pathlib import Path @@ -290,6 +291,21 @@ def run_onboarding() -> None: # ── write .env ──────────────────────────────────────────────────────────── _write_env(updates) + for key, value in updates.items(): + if value: + os.environ[key] = value + + try: + import telemetry + + if updates.get(provider["env_key"]): + telemetry.capture_provider_configured(provider=provider["name"]) + for addon in selected_addons: + if any(updates.get(key_spec["env"]) for key_spec in addon["keys"]): + telemetry.capture_provider_configured(provider=addon["id"]) + telemetry.capture_onboarding_completed(provider=provider["name"]) + except Exception: + pass # ── summary ─────────────────────────────────────────────────────────────── console.print() diff --git a/package.json b/package.json index 9e7abc8b..7b74be19 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,9 @@ "run_utils.py", "onboard.py", "swarm.py", + "telemetry.py", + "telemetry_hooks.py", + "openswarm_telemetry_config.py", "config.py", "helpers.py", "server.py", @@ -28,6 +31,7 @@ "video_generation_agent/", "virtual_assistant/", "patches/", + "docs/telemetry.md", "pyproject.toml", "package.json", "package-lock.json" diff --git a/pyproject.toml b/pyproject.toml index 26e81b95..17add6b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ 
dependencies = [ "agency-swarm[fastapi,jupyter,litellm]>=1.9.8", "questionary>=2.0.0", "python-dotenv", + "posthog>=7,<8", "rich", "fastapi", "uvicorn", @@ -60,7 +61,17 @@ dependencies = [ openswarm = "run_utils:main" [tool.setuptools] -py-modules = ["agency", "swarm", "helpers", "config", "onboard", "server"] +py-modules = [ + "agency", + "swarm", + "helpers", + "config", + "onboard", + "server", + "telemetry", + "telemetry_hooks", + "openswarm_telemetry_config", +] [tool.setuptools.packages.find] where = ["."] diff --git a/requirements.txt b/requirements.txt index 77db5c33..d3d574eb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ agency-swarm[fastapi,jupyter,litellm]>=1.9.7 questionary>=2.0.0 +posthog>=7,<8 fastapi uvicorn composio==0.8.0 diff --git a/scripts/create_posthog_dashboard.py b/scripts/create_posthog_dashboard.py new file mode 100644 index 00000000..6fbcd55f --- /dev/null +++ b/scripts/create_posthog_dashboard.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import os +import urllib.error +import urllib.request +from typing import Any + +DEFAULT_POSTHOG_APP_HOST = "https://us.posthog.com" +DASHBOARD_NAME = "OpenSwarm Product Analytics" + + +def build_dashboard_payload() -> dict[str, Any]: + return { + "name": DASHBOARD_NAME, + "description": "Privacy-safe OpenSwarm usage metrics.", + "pinned": True, + } + + +def _trends_query( + *, + event: str, + name: str, + math: str = "total", + breakdown: str | None = None, + display: str = "ActionsLineGraph", +) -> dict[str, Any]: + source: dict[str, Any] = { + "kind": "TrendsQuery", + "dateRange": {"date_from": "-30d"}, + "interval": "day", + "series": [{"kind": "EventsNode", "event": event, "name": name, "math": math}], + } + if breakdown: + source["breakdownFilter"] = {"breakdown_type": "event", "breakdown": breakdown} + return {"kind": "InsightVizNode", "source": source, "display": display} + + +def 
build_insight_payloads(dashboard_id: int | str) -> list[dict[str, Any]]: + dashboards = [dashboard_id] + return [ + { + "name": "DAU", + "description": "Daily active anonymous installs, based on app_started.", + "query": _trends_query(event="app_started", name="Active installs", math="dau"), + "dashboards": dashboards, + }, + { + "name": "Messages per day", + "description": "User and assistant messages persisted by OpenSwarm.", + "query": _trends_query(event="message_sent", name="Messages sent"), + "dashboards": dashboards, + }, + { + "name": "Agent runs per day", + "description": "Completed agent run count.", + "query": _trends_query(event="agent_run_completed", name="Agent runs"), + "dashboards": dashboards, + }, + { + "name": "Tool usage by tool", + "description": "Tool invocations grouped by safe tool name.", + "query": _trends_query( + event="tool_invoked", + name="Tool invocations", + breakdown="tool_name", + display="ActionsBarValue", + ), + "dashboards": dashboards, + }, + { + "name": "Agent usage by agent", + "description": "Agent runs grouped by raw agent_name.", + "query": _trends_query( + event="agent_run_completed", + name="Agent runs", + breakdown="agent_name", + display="ActionsBarValue", + ), + "dashboards": dashboards, + }, + { + "name": "Error rate", + "description": "Errors divided by completed swarm runs.", + "query": { + "kind": "InsightVizNode", + "display": "ActionsLineGraph", + "source": { + "kind": "TrendsQuery", + "dateRange": {"date_from": "-30d"}, + "interval": "day", + "series": [ + {"kind": "EventsNode", "event": "error", "name": "Errors", "math": "total"}, + { + "kind": "EventsNode", + "event": "swarm_run_completed", + "name": "Completed runs", + "math": "total", + }, + ], + "formulas": [{"formula": "A / B"}], + }, + }, + "dashboards": dashboards, + }, + ] + + +def build_dry_run_payload(environment_id: str = "POSTHOG_ENVIRONMENT_ID") -> dict[str, Any]: + dashboard_id = "DRY_RUN_DASHBOARD_ID" + return { + "dashboard": { + "method": 
"POST", + "path": f"/api/environments/{environment_id}/dashboards/", + "payload": build_dashboard_payload(), + }, + "insights": [ + { + "method": "POST", + "path": f"/api/environments/{environment_id}/insights/", + "payload": payload, + } + for payload in build_insight_payloads(dashboard_id) + ], + } + + +def _post_json(host: str, path: str, api_key: str, payload: dict[str, Any]) -> dict[str, Any]: + url = f"{host.rstrip('/')}{path}" + data = json.dumps(payload).encode("utf-8") + request = urllib.request.Request( + url, + data=data, + method="POST", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + ) + try: + with urllib.request.urlopen(request, timeout=30) as response: + return json.loads(response.read().decode("utf-8")) + except urllib.error.HTTPError as error: + body = error.read().decode("utf-8", errors="replace") + raise RuntimeError(f"PostHog API request failed with {error.code}: {body}") from error + + +def create_dashboard(host: str, environment_id: str, personal_api_key: str) -> dict[str, Any]: + dashboard = _post_json( + host, + f"/api/environments/{environment_id}/dashboards/", + personal_api_key, + build_dashboard_payload(), + ) + dashboard_id = dashboard["id"] + insights = [ + _post_json( + host, + f"/api/environments/{environment_id}/insights/", + personal_api_key, + insight_payload, + ) + for insight_payload in build_insight_payloads(dashboard_id) + ] + return {"dashboard": dashboard, "insights": insights} + + +def main() -> int: + parser = argparse.ArgumentParser(description="Create the OpenSwarm PostHog product analytics dashboard.") + parser.add_argument("--dry-run", action="store_true", help="Print dashboard and insight API payloads without creating anything.") + args = parser.parse_args() + + environment_id = os.getenv("POSTHOG_ENVIRONMENT_ID") + if args.dry_run: + print(json.dumps(build_dry_run_payload(environment_id or "POSTHOG_ENVIRONMENT_ID"), indent=2, sort_keys=True)) + return 0 + + 
personal_api_key = os.getenv("POSTHOG_PERSONAL_API_KEY") + host = os.getenv("POSTHOG_APP_HOST", DEFAULT_POSTHOG_APP_HOST) + if not personal_api_key: + raise SystemExit("POSTHOG_PERSONAL_API_KEY is required") + if not environment_id: + raise SystemExit("POSTHOG_ENVIRONMENT_ID is required") + + result = create_dashboard(host, environment_id, personal_api_key) + dashboard = result["dashboard"] + dashboard_url = f"{host.rstrip('/')}/project/{environment_id}/dashboard/{dashboard['id']}" + print(f"Created {DASHBOARD_NAME}: {dashboard_url}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/smoke_telemetry.py b/scripts/smoke_telemetry.py new file mode 100644 index 00000000..67cd1939 --- /dev/null +++ b/scripts/smoke_telemetry.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import sys + +import telemetry + +SMOKE_EVENT = "telemetry_smoke_test" + + +def build_smoke_properties() -> dict[str, str]: + return { + "install_source": telemetry.config_source() or "source", + "status": "manual", + } + + +def main() -> int: + if not telemetry.is_enabled(): + print( + "Telemetry smoke test not sent: telemetry is disabled, opted out, running in CI/test mode, " + "or no PostHog capture key is configured.", + file=sys.stderr, + ) + print( + "Set POSTHOG_API_KEY, optional POSTHOG_HOST, and for CI smoke checks OPENSWARM_TELEMETRY_ALLOW_TESTS=1.", + file=sys.stderr, + ) + return 1 + + sent = telemetry.capture(SMOKE_EVENT, build_smoke_properties()) + telemetry.shutdown() + if not sent: + print("Telemetry smoke test failed before the event could be queued.", file=sys.stderr) + return 1 + + print(f"Queued {SMOKE_EVENT}; check PostHog ingestion for this event.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/write_telemetry_config.py b/scripts/write_telemetry_config.py new file mode 100644 index 00000000..c879ea83 --- /dev/null +++ b/scripts/write_telemetry_config.py @@ -0,0 
+1,50 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import os +from pathlib import Path + +DEFAULT_POSTHOG_HOST = "https://us.i.posthog.com" +HEADER = """# Generated during npm release packaging. +# Do not commit this file. The PostHog capture key is treated as public. +""" + + +def build_config_source(api_key: str, host: str | None = None) -> str: + if not api_key: + raise ValueError("POSTHOG_API_KEY is required") + resolved_host = host or DEFAULT_POSTHOG_HOST + return ( + HEADER + + f"POSTHOG_API_KEY = {api_key!r}\n" + + f"POSTHOG_HOST = {resolved_host!r}\n" + ) + + +def write_config(path: Path, api_key: str, host: str | None = None) -> None: + path.write_text(build_config_source(api_key, host), encoding="utf-8") + + +def main() -> int: + parser = argparse.ArgumentParser(description="Write npm-release-only OpenSwarm telemetry config.") + parser.add_argument( + "--output", + default=str(Path(__file__).resolve().parents[1] / "openswarm_telemetry_config.py"), + help="Generated Python module path.", + ) + args = parser.parse_args() + + api_key = os.getenv("POSTHOG_API_KEY", "") + host = os.getenv("POSTHOG_HOST") or DEFAULT_POSTHOG_HOST + if not api_key: + raise SystemExit("POSTHOG_API_KEY is required to generate npm telemetry config") + + output = Path(args.output) + write_config(output, api_key, host) + print(f"Wrote telemetry config to {output}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/swarm.py b/swarm.py index 7ddfde43..af1658eb 100644 --- a/swarm.py +++ b/swarm.py @@ -13,6 +13,8 @@ apply_file_attachment_reference_patch() apply_ipython_composio_context_patch() +_telemetry_app_started = False + _tracing_key = os.getenv("OPENAI_API_KEY") if _tracing_key: set_tracing_export_api_key(_tracing_key) @@ -20,9 +22,23 @@ set_tracing_disabled(True) +def _capture_app_started_once(): + global _telemetry_app_started + if _telemetry_app_started: + return + _telemetry_app_started = True + try: + 
import telemetry + + telemetry.capture_app_started() + except Exception: + pass + + def create_agency(load_threads_callback=None): from agency_swarm import Agency from agency_swarm.tools import Handoff, SendMessage + from telemetry_hooks import instrument_agency from orchestrator import create_orchestrator from virtual_assistant import create_virtual_assistant @@ -74,7 +90,8 @@ def create_agency(load_threads_callback=None): load_threads_callback=load_threads_callback, ) - return agency + _capture_app_started_once() + return instrument_agency(agency) if __name__ == "__main__": agency = create_agency() diff --git a/telemetry.py b/telemetry.py new file mode 100644 index 00000000..b21951bd --- /dev/null +++ b/telemetry.py @@ -0,0 +1,457 @@ +from __future__ import annotations + +import atexit +import hashlib +import hmac +import importlib.util +import json +import logging +import os +import time +import uuid +from pathlib import Path +from typing import Any, Callable + +logger = logging.getLogger(__name__) + +DEFAULT_POSTHOG_HOST = "https://us.i.posthog.com" +FALSE_VALUES = {"0", "false", "no", "off"} +TRUE_VALUES = {"1", "true", "yes", "on"} +STATE_VERSION = 1 +_MODULE_DIR = Path(__file__).resolve().parent +_STATE_FILE_NAME = "telemetry.json" +_SESSION_ID = uuid.uuid4().hex + +ALLOWED_PROPERTIES = { + "agent_id", + "agent_name", + "agent_run_id", + "auth_method", + "caller_agent_name", + "cost_usd", + "error_category", + "error_type", + "event_version", + "has_provider_key", + "http_status", + "install_source", + "is_streaming", + "latency_ms", + "message_role", + "message_type", + "model", + "parent_agent_id", + "parent_run_id", + "provider", + "run_trace_id", + "session_id", + "status", + "stop_reason", + "thread_id", + "tokens_input", + "tokens_output", + "tool_name", + "user_id", + "workspace_id", +} + +CONTENT_LIKE_KEYS = { + "arguments", + "content", + "exception_message", + "file_path", + "input", + "input_text", + "message", + "output", + "output_text", + 
"path", + "prompt", + "stack", + "tool_args", + "tool_result", + "traceback", +} + +_client: Any | None = None +_client_config: dict[str, str | None] | None = None +_posthog_factory: Callable[[str, str], Any] | None = None +_state_cache: dict[str, Any] | None = None +_state_created = False + + +def _config_dir() -> Path: + custom = os.getenv("OPENSWARM_CONFIG_DIR") + if custom: + return Path(custom).expanduser() + return Path.home() / ".openswarm" + + +def _state_path() -> Path: + return _config_dir() / _STATE_FILE_NAME + + +def _load_state() -> dict[str, Any]: + global _state_cache, _state_created + if _state_cache is not None: + return _state_cache + + path = _state_path() + if path.exists(): + try: + data = json.loads(path.read_text(encoding="utf-8")) + if isinstance(data, dict) and data.get("install_id") and data.get("install_secret"): + _state_cache = data + return _state_cache + except Exception: + logger.debug("Could not read telemetry state", exc_info=True) + + _state_created = True + _state_cache = { + "version": STATE_VERSION, + "install_id": uuid.uuid4().hex, + "install_secret": uuid.uuid4().hex + uuid.uuid4().hex, + "install_created_captured": False, + } + _save_state() + return _state_cache + + +def _save_state() -> None: + if _state_cache is None: + return + try: + path = _state_path() + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(_state_cache, indent=2, sort_keys=True), encoding="utf-8") + except Exception: + logger.debug("Could not write telemetry state", exc_info=True) + + +def _sha256(value: str) -> str: + return hashlib.sha256(value.encode("utf-8")).hexdigest() + + +def _install_secret() -> bytes: + state = _load_state() + return str(state["install_secret"]).encode("utf-8") + + +def hashed_identifier(namespace: str, value: Any) -> str: + return hmac.new(_install_secret(), f"{namespace}:{value}".encode("utf-8"), hashlib.sha256).hexdigest() + + +def user_id() -> str: + state = _load_state() + return 
_sha256(f"openswarm:user:{state['install_id']}") + + +def workspace_id(path: str | Path | None = None) -> str: + candidate = Path(path or os.getcwd()).expanduser() + try: + normalized = str(candidate.resolve()) + except Exception: + normalized = str(candidate.absolute()) + return hashed_identifier("workspace", normalized) + + +def thread_id(value: Any) -> str: + return hashed_identifier("thread", value) + + +def agent_id(agent_name: str | None) -> str | None: + if not agent_name: + return None + return hashed_identifier("agent", agent_name) + + +def _load_generated_config() -> dict[str, str]: + config_path = _MODULE_DIR / "openswarm_telemetry_config.py" + if not config_path.exists(): + return {} + try: + spec = importlib.util.spec_from_file_location("_openswarm_telemetry_config", config_path) + if spec is None or spec.loader is None: + return {} + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return { + "POSTHOG_API_KEY": str(getattr(module, "POSTHOG_API_KEY", "") or ""), + "POSTHOG_HOST": str(getattr(module, "POSTHOG_HOST", "") or ""), + } + except Exception: + logger.debug("Could not load generated telemetry config", exc_info=True) + return {} + + +def _running_in_tests_or_ci() -> bool: + if os.getenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", "").strip().lower() in TRUE_VALUES: + return False + if os.getenv("CI", "").strip().lower() in TRUE_VALUES: + return True + if os.getenv("PYTEST_CURRENT_TEST") or os.getenv("PYTEST_VERSION"): + return True + return False + + +def _is_opted_out() -> bool: + telemetry_env = os.getenv("OPENSWARM_TELEMETRY") + if telemetry_env is not None and telemetry_env.strip().lower() in FALSE_VALUES: + return True + if os.getenv("DO_NOT_TRACK") == "1": + return True + return _running_in_tests_or_ci() + + +def _resolve_client_config() -> dict[str, str | None]: + global _client_config + if _client_config is not None: + return _client_config + + generated = _load_generated_config() + env_key = 
os.getenv("POSTHOG_API_KEY") + env_host = os.getenv("POSTHOG_HOST") + + api_key = env_key or generated.get("POSTHOG_API_KEY") or None + host = env_host or generated.get("POSTHOG_HOST") or DEFAULT_POSTHOG_HOST + source = "env" if env_key else ("npm_config" if generated.get("POSTHOG_API_KEY") else None) + + _client_config = { + "api_key": api_key, + "host": host, + "source": source, + } + return _client_config + + +def is_enabled() -> bool: + if _is_opted_out(): + return False + return bool(_resolve_client_config().get("api_key")) + + +def config_source() -> str | None: + return _resolve_client_config().get("source") + + +def has_posthog_key() -> bool: + return bool(_resolve_client_config().get("api_key")) + + +def _create_posthog_client(api_key: str, host: str) -> Any: + if _posthog_factory is not None: + return _posthog_factory(api_key, host) + + os.environ.setdefault("POSTHOG_ENABLE_EXCEPTION_AUTOCAPTURE", "false") + os.environ.setdefault("POSTHOG_CAPTURE_EXCEPTION_CODE_VARIABLES", "false") + + from posthog import Posthog # type: ignore + + try: + return Posthog( + api_key, + host=host, + disable_geoip=True, + enable_exception_autocapture=False, + capture_exception_code_variables=False, + ) + except TypeError: + pass + try: + return Posthog(api_key, host=host, disable_geoip=True) + except TypeError: + return Posthog(api_key, host=host) + + +def _get_client() -> Any | None: + global _client + if not is_enabled(): + return None + if _client is not None: + return _client + + config = _resolve_client_config() + api_key = config.get("api_key") + host = config.get("host") or DEFAULT_POSTHOG_HOST + if not api_key: + return None + + try: + _client = _create_posthog_client(str(api_key), str(host)) + atexit.register(shutdown) + except Exception: + logger.debug("Could not initialize PostHog telemetry client", exc_info=True) + _client = None + return _client + + +def provider_from_model(model: Any) -> str | None: + if model is None: + return None + name = str(model) + if "/" 
not in name: + if name.startswith(("gpt-", "o")): + return "openai" + return None + parts = name.split("/") + if parts[0] == "litellm" and len(parts) > 1: + return parts[1] + return parts[0] + + +def configured_provider() -> str | None: + model = os.getenv("DEFAULT_MODEL", "") + if model: + return provider_from_model(model) or ("openai" if "/" not in model else None) + if os.getenv("OPENAI_API_KEY"): + return "openai" + if os.getenv("ANTHROPIC_API_KEY"): + return "anthropic" + if os.getenv("GOOGLE_API_KEY"): + return "google" + return None + + +def has_provider_key() -> bool: + return bool(os.getenv("OPENAI_API_KEY") or os.getenv("ANTHROPIC_API_KEY") or os.getenv("GOOGLE_API_KEY")) + + +def _safe_scalar(value: Any) -> str | int | float | bool | None: + if value is None or isinstance(value, bool): + return value + if isinstance(value, int) and not isinstance(value, bool): + return value + if isinstance(value, float): + return value + if isinstance(value, str): + return value[:256] + return str(type(value).__name__)[:128] + + +def sanitize_properties(properties: dict[str, Any] | None) -> dict[str, Any]: + safe: dict[str, Any] = { + "event_version": 1, + "user_id": user_id(), + "workspace_id": workspace_id(), + "session_id": _SESSION_ID, + } + if not properties: + return safe + for key, value in properties.items(): + if key not in ALLOWED_PROPERTIES: + continue + if key in CONTENT_LIKE_KEYS: + continue + scalar = _safe_scalar(value) + if scalar is not None: + safe[key] = scalar + return safe + + +def capture(event: str, properties: dict[str, Any] | None = None) -> bool: + client = _get_client() + if client is None: + return False + props = sanitize_properties(properties) + try: + client.capture(event=event, distinct_id=props["user_id"], properties=props) + return True + except TypeError: + try: + client.capture(distinct_id=props["user_id"], event=event, properties=props) + return True + except Exception: + logger.debug("PostHog capture failed for event %s", event, 
exc_info=True) + return False + except Exception: + logger.debug("PostHog capture failed for event %s", event, exc_info=True) + return False + + +def capture_error(error: BaseException, *, category: str, properties: dict[str, Any] | None = None) -> bool: + props = dict(properties or {}) + props.update( + { + "error_type": error.__class__.__name__, + "error_category": category, + "status": props.get("status") or "error", + } + ) + return capture("error", props) + + +def capture_app_started(*, install_source: str | None = None) -> None: + if not is_enabled(): + return + state = _load_state() + if not state.get("install_created_captured"): + if capture( + "install_created", + { + "install_source": install_source or config_source() or "source", + "has_provider_key": has_provider_key(), + "provider": configured_provider(), + }, + ): + state["install_created_captured"] = True + _save_state() + capture( + "app_started", + { + "install_source": install_source or config_source() or "source", + "has_provider_key": has_provider_key(), + "provider": configured_provider(), + }, + ) + + +def capture_onboarding_completed(*, provider: str | None, auth_method: str = "api_key") -> None: + capture( + "onboarding_completed", + { + "auth_method": auth_method, + "provider": provider, + "has_provider_key": has_provider_key(), + "install_source": config_source() or "source", + }, + ) + + +def capture_provider_configured(*, provider: str | None, auth_method: str = "api_key") -> None: + capture( + "provider_configured", + { + "auth_method": auth_method, + "provider": provider, + "has_provider_key": has_provider_key(), + "install_source": config_source() or "source", + }, + ) + + +def shutdown() -> None: + if _client is None: + return + for method_name in ("shutdown", "flush"): + method = getattr(_client, method_name, None) + if callable(method): + try: + method() + except Exception: + logger.debug("PostHog telemetry shutdown failed", exc_info=True) + break + + +def 
set_posthog_factory_for_tests(factory: Callable[[str, str], Any] | None) -> None: + global _posthog_factory, _client + _posthog_factory = factory + _client = None + + +def _reset_for_tests() -> None: + global _client, _client_config, _posthog_factory, _state_cache, _state_created + _client = None + _client_config = None + _posthog_factory = None + _state_cache = None + _state_created = False diff --git a/telemetry_hooks.py b/telemetry_hooks.py new file mode 100644 index 00000000..c932f825 --- /dev/null +++ b/telemetry_hooks.py @@ -0,0 +1,557 @@ +from __future__ import annotations + +import inspect +import logging +import time +import types +import uuid +from collections.abc import Iterable +from typing import Any + +from agents.lifecycle import AgentHooksBase, RunHooksBase + +import telemetry + +logger = logging.getLogger(__name__) + + +class CompositeRunHooks(RunHooksBase[Any, Any]): + """Run hook fan-out that preserves existing user or persistence hooks.""" + + def __init__(self, *hooks: RunHooksBase[Any, Any] | None) -> None: + self.hooks = [hook for hook in hooks if hook is not None and not isinstance(hook, OpenSwarmTelemetryRunHooks)] + self.telemetry_hooks = OpenSwarmTelemetryRunHooks() + + async def _invoke(self, hook: Any, method_name: str, *args: Any) -> None: + method = getattr(hook, method_name, None) + if method is None: + return + result = method(*args) + if inspect.isawaitable(result): + await result + + async def _call(self, method_name: str, *args: Any) -> None: + for hook in self.hooks: + await self._invoke(hook, method_name, *args) + try: + await self._invoke(self.telemetry_hooks, method_name, *args) + except Exception: + logger.debug("Telemetry run hook %s failed", method_name, exc_info=True) + + async def on_llm_start(self, context: Any, agent: Any, system_prompt: str | None, input_items: list[Any]) -> None: + await self._call("on_llm_start", context, agent, system_prompt, input_items) + + async def on_llm_end(self, context: Any, agent: Any, 
response: Any) -> None: + await self._call("on_llm_end", context, agent, response) + + async def on_agent_start(self, context: Any, agent: Any) -> None: + await self._call("on_agent_start", context, agent) + + async def on_agent_end(self, context: Any, agent: Any, output: Any) -> None: + await self._call("on_agent_end", context, agent, output) + + async def on_handoff(self, context: Any, from_agent: Any, to_agent: Any) -> None: + await self._call("on_handoff", context, from_agent, to_agent) + + async def on_tool_start(self, context: Any, agent: Any, tool: Any) -> None: + await self._call("on_tool_start", context, agent, tool) + + async def on_tool_end(self, context: Any, agent: Any, tool: Any, result: str) -> None: + await self._call("on_tool_end", context, agent, tool, result) + + +class CompositeAgentHooks(AgentHooksBase[Any, Any]): + """Agent hook fan-out that keeps any pre-existing per-agent hooks active.""" + + def __init__(self, *hooks: AgentHooksBase[Any, Any] | None) -> None: + self.hooks = [hook for hook in hooks if hook is not None and not isinstance(hook, OpenSwarmTelemetryAgentHooks)] + self.telemetry_hooks = OpenSwarmTelemetryAgentHooks() + + async def _invoke(self, hook: Any, method_name: str, *args: Any) -> None: + method = getattr(hook, method_name, None) + if method is None: + return + result = method(*args) + if inspect.isawaitable(result): + await result + + async def _call(self, method_name: str, *args: Any) -> None: + for hook in self.hooks: + await self._invoke(hook, method_name, *args) + try: + await self._invoke(self.telemetry_hooks, method_name, *args) + except Exception: + logger.debug("Telemetry agent hook %s failed", method_name, exc_info=True) + + async def on_start(self, context: Any, agent: Any) -> None: + await self._call("on_start", context, agent) + + async def on_end(self, context: Any, agent: Any, output: Any) -> None: + await self._call("on_end", context, agent, output) + + async def on_handoff(self, context: Any, agent: Any, 
source: Any) -> None: + await self._call("on_handoff", context, agent, source) + + async def on_tool_start(self, context: Any, agent: Any, tool: Any) -> None: + await self._call("on_tool_start", context, agent, tool) + + async def on_tool_end(self, context: Any, agent: Any, tool: Any, result: str) -> None: + await self._call("on_tool_end", context, agent, tool, result) + + async def on_llm_start(self, context: Any, agent: Any, system_prompt: str | None, input_items: list[Any]) -> None: + await self._call("on_llm_start", context, agent, system_prompt, input_items) + + async def on_llm_end(self, context: Any, agent: Any, response: Any) -> None: + await self._call("on_llm_end", context, agent, response) + + +class OpenSwarmTelemetryRunHooks(RunHooksBase[Any, Any]): + """Run hook placeholder so composition remains explicit and future-safe.""" + + +class OpenSwarmTelemetryAgentHooks(AgentHooksBase[Any, Any]): + def __init__(self) -> None: + self._agent_start_times: dict[tuple[int, str], float] = {} + self._tool_start_times: dict[tuple[int, str, str], float] = {} + self._llm_start_times: dict[tuple[int, str], float] = {} + + async def on_start(self, context: Any, agent: Any) -> None: + key = _context_agent_key(context, agent) + self._agent_start_times[key] = time.monotonic() + telemetry.capture("agent_run_started", _agent_props(context, agent, status="started")) + + async def on_end(self, context: Any, agent: Any, output: Any) -> None: + key = _context_agent_key(context, agent) + started = self._agent_start_times.pop(key, None) + telemetry.capture( + "agent_run_completed", + _agent_props( + context, + agent, + status="completed", + latency_ms=_elapsed_ms(started), + ), + ) + + async def on_tool_start(self, context: Any, agent: Any, tool: Any) -> None: + tool_name = _tool_name(tool) + self._tool_start_times[_context_tool_key(context, agent, tool_name)] = time.monotonic() + + async def on_tool_end(self, context: Any, agent: Any, tool: Any, result: str) -> None: + tool_name 
= _tool_name(tool) + started = self._tool_start_times.pop(_context_tool_key(context, agent, tool_name), None) + props = _agent_props(context, agent, status="completed", latency_ms=_elapsed_ms(started)) + props["tool_name"] = tool_name + telemetry.capture("tool_invoked", props) + + async def on_handoff(self, context: Any, agent: Any, source: Any) -> None: + source_name = _agent_name(source) + props = _agent_props(context, agent, status="completed") + if source_name: + props["caller_agent_name"] = source_name + props["parent_agent_id"] = telemetry.agent_id(source_name) + telemetry.capture("handoff", props) + + async def on_llm_start(self, context: Any, agent: Any, system_prompt: str | None, input_items: list[Any]) -> None: + self._llm_start_times[_context_agent_key(context, agent)] = time.monotonic() + + async def on_llm_end(self, context: Any, agent: Any, response: Any) -> None: + key = _context_agent_key(context, agent) + started = self._llm_start_times.pop(key, None) + props = _agent_props(context, agent, status="completed", latency_ms=_elapsed_ms(started)) + props.update(_usage_props_from_response(response)) + stop_reason = _stop_reason(response) + if stop_reason: + props["stop_reason"] = stop_reason + telemetry.capture("llm_generation_completed", props) + + +class TelemetryStreamingRunResponse: + """Proxy that records completion/error only after a stream is consumed.""" + + def __init__(self, stream: Any, base_props: dict[str, Any], started_at: float) -> None: + self._stream = stream + self._base_props = base_props + self._started_at = started_at + self._finished = False + self._cancelled = False + + def __getattr__(self, name: str) -> Any: + return getattr(self._stream, name) + + def __aiter__(self) -> Any: + return self + + async def __anext__(self) -> Any: + try: + return await self._stream.__anext__() + except StopAsyncIteration: + self._complete(status="cancelled" if self._cancelled else "completed") + raise + except BaseException as exc: + self._error(exc) 
+ raise + + async def asend(self, value: Any) -> Any: + try: + return await self._stream.asend(value) + except StopAsyncIteration: + self._complete(status="cancelled" if self._cancelled else "completed") + raise + except BaseException as exc: + self._error(exc) + raise + + async def athrow(self, typ: Any, val: Any = None, tb: Any = None) -> Any: + try: + return await self._stream.athrow(typ, val, tb) + except BaseException as exc: + self._error(exc) + raise + + async def aclose(self) -> None: + try: + await self._stream.aclose() + except BaseException as exc: + self._error(exc) + raise + finally: + self._complete(status="cancelled" if self._cancelled else "closed") + + def cancel(self, mode: str = "immediate") -> None: + self._cancelled = True + cancel = getattr(self._stream, "cancel", None) + if callable(cancel): + cancel(mode=mode) + + async def wait_final_result(self) -> Any: + try: + result = await self._stream.wait_final_result() + except BaseException as exc: + self._error(exc) + raise + self._complete(status="cancelled" if self._cancelled else "completed", run_result=result) + return result + + @property + def final_result(self) -> Any: + return getattr(self._stream, "final_result", None) + + @property + def final_output(self) -> Any: + return getattr(self._stream, "final_output", None) + + def _adopt_stream(self, other: Any) -> None: + adopt = getattr(self._stream, "_adopt_stream", None) + if callable(adopt): + adopt(other) + + def _complete(self, *, status: str, run_result: Any | None = None) -> None: + if self._finished: + return + self._finished = True + props = dict(self._base_props) + props.update({"status": status, "latency_ms": _elapsed_ms(self._started_at)}) + props.update(_usage_props_from_run_result(run_result if run_result is not None else self.final_result)) + telemetry.capture("swarm_run_completed", props) + + def _error(self, exc: BaseException) -> None: + if self._finished: + return + self._finished = True + props = dict(self._base_props) + 
props.update({"status": "error", "latency_ms": _elapsed_ms(self._started_at)}) + telemetry.capture_error(exc, category="swarm_run", properties=props) + + +def instrument_agency(agency: Any) -> Any: + """Attach OpenSwarm telemetry to an Agency instance without replacing hooks.""" + + install_thread_manager_telemetry() + _attach_agent_hooks(_iter_agents(agency)) + _wrap_agency_methods(agency) + if getattr(agency, "persistence_hooks", None) is not None: + agency.persistence_hooks = compose_run_hooks(agency.persistence_hooks) + return agency + + +def compose_run_hooks(hooks: RunHooksBase[Any, Any] | None) -> RunHooksBase[Any, Any] | None: + if hooks is None: + return None + if isinstance(hooks, CompositeRunHooks): + return hooks + return CompositeRunHooks(hooks) + + +def compose_agent_hooks(hooks: AgentHooksBase[Any, Any] | None) -> AgentHooksBase[Any, Any]: + if isinstance(hooks, CompositeAgentHooks): + return hooks + return CompositeAgentHooks(hooks) + + +def install_thread_manager_telemetry() -> None: + from agency_swarm.utils.thread import ThreadManager + + if getattr(ThreadManager, "_openswarm_telemetry_wrapped", False): + return + + original_add_message = ThreadManager.add_message + original_add_messages = ThreadManager.add_messages + + def add_message(self: Any, message: Any) -> Any: + result = original_add_message(self, message) + _capture_message_sent(message, self) + return result + + def add_messages(self: Any, messages: list[Any]) -> Any: + result = original_add_messages(self, messages) + for message in messages or []: + _capture_message_sent(message, self) + return result + + ThreadManager.add_message = add_message + ThreadManager.add_messages = add_messages + ThreadManager._openswarm_telemetry_wrapped = True + + +def _wrap_agency_methods(agency: Any) -> None: + if getattr(agency, "_openswarm_telemetry_wrapped", False): + return + + original_get_response = agency.get_response + original_get_response_stream = agency.get_response_stream + + async def 
get_response_with_telemetry(self: Any, *args: Any, **kwargs: Any) -> Any: + args = _compose_hooks_override(args, kwargs) + props = _swarm_props(self, args, kwargs, is_streaming=False, status="started") + started_at = time.monotonic() + telemetry.capture("swarm_run_started", props) + try: + result = await original_get_response(*args, **kwargs) + except BaseException as exc: + error_props = dict(props) + error_props.update({"status": "error", "latency_ms": _elapsed_ms(started_at)}) + telemetry.capture_error(exc, category="swarm_run", properties=error_props) + raise + completed_props = dict(props) + completed_props.update({"status": "completed", "latency_ms": _elapsed_ms(started_at)}) + completed_props.update(_usage_props_from_run_result(result)) + telemetry.capture("swarm_run_completed", completed_props) + return result + + def get_response_stream_with_telemetry(self: Any, *args: Any, **kwargs: Any) -> Any: + args = _compose_hooks_override(args, kwargs) + props = _swarm_props(self, args, kwargs, is_streaming=True, status="started") + started_at = time.monotonic() + telemetry.capture("swarm_run_started", props) + try: + stream = original_get_response_stream(*args, **kwargs) + except BaseException as exc: + error_props = dict(props) + error_props.update({"status": "error", "latency_ms": _elapsed_ms(started_at)}) + telemetry.capture_error(exc, category="swarm_run", properties=error_props) + raise + return TelemetryStreamingRunResponse(stream, props, started_at) + + agency.get_response = types.MethodType(get_response_with_telemetry, agency) + agency.get_response_stream = types.MethodType(get_response_stream_with_telemetry, agency) + agency._openswarm_telemetry_wrapped = True + + +def _compose_hooks_override(args: tuple[Any, ...], kwargs: dict[str, Any]) -> tuple[Any, ...]: + if "hooks_override" in kwargs and kwargs["hooks_override"] is not None: + kwargs["hooks_override"] = compose_run_hooks(kwargs["hooks_override"]) + return args + if len(args) <= 3 or args[3] is None: 
+ return args + mutable_args = list(args) + mutable_args[3] = compose_run_hooks(mutable_args[3]) + return tuple(mutable_args) + + +def _attach_agent_hooks(agents: Iterable[Any]) -> None: + for agent in agents: + try: + agent.hooks = compose_agent_hooks(getattr(agent, "hooks", None)) + except Exception: + logger.debug("Could not attach telemetry hooks to agent", exc_info=True) + + +def _iter_agents(agency: Any) -> list[Any]: + raw_agents = getattr(agency, "agents", None) + if isinstance(raw_agents, dict): + return list(raw_agents.values()) + if raw_agents is not None: + return list(raw_agents) + entry_points = getattr(agency, "entry_points", None) + return list(entry_points or []) + + +def _capture_message_sent(message: Any, manager: Any) -> None: + if not isinstance(message, dict): + return + role = message.get("role") + if role not in {"user", "assistant"}: + return + agent_name = _safe_text(message.get("agent")) + caller_agent_name = _safe_text(message.get("callerAgent")) + props: dict[str, Any] = { + "message_role": role, + "message_type": _safe_text(message.get("type")) or "message", + "thread_id": telemetry.thread_id(id(manager)), + "session_id": telemetry._SESSION_ID, + "is_streaming": bool(message.get("streaming", False)), + } + if agent_name: + props["agent_name"] = agent_name + props["agent_id"] = telemetry.agent_id(agent_name) + if caller_agent_name: + props["caller_agent_name"] = caller_agent_name + for key in ("agent_run_id", "parent_run_id", "run_trace_id"): + value = _safe_text(message.get(key)) + if value: + props[key] = value + telemetry.capture("message_sent", props) + + +def _swarm_props(agency: Any, args: tuple[Any, ...], kwargs: dict[str, Any], *, is_streaming: bool, status: str) -> dict[str, Any]: + agent_name = _resolve_recipient_agent_name(agency, args, kwargs) + props: dict[str, Any] = { + "status": status, + "is_streaming": is_streaming, + "run_trace_id": uuid.uuid4().hex, + } + if agent_name: + props["agent_name"] = agent_name + 
props["agent_id"] = telemetry.agent_id(agent_name) + return props + + +def _resolve_recipient_agent_name(agency: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str | None: + recipient = kwargs.get("recipient_agent") + if recipient is None and len(args) >= 2: + recipient = args[1] + if recipient is None: + entry_points = getattr(agency, "entry_points", None) or [] + recipient = entry_points[0] if entry_points else None + return _agent_name(recipient) + + +def _agent_props(context: Any, agent: Any, **extra: Any) -> dict[str, Any]: + name = _agent_name(agent) + props: dict[str, Any] = { + "agent_name": name, + "agent_id": telemetry.agent_id(name), + } + model = _model_name(agent) + if model: + props["model"] = model + props["provider"] = telemetry.provider_from_model(model) + run_context = getattr(context, "context", None) + for source_key, target_key in ( + ("_current_agent_run_id", "agent_run_id"), + ("_parent_run_id", "parent_run_id"), + ("_run_trace_id", "run_trace_id"), + ): + value = _safe_text(getattr(run_context, source_key, None)) + if value: + props[target_key] = value + props.update({key: value for key, value in extra.items() if value is not None}) + return props + + +def _model_name(agent: Any) -> str | None: + try: + from agency_swarm.agent.execution import get_model_name + + return get_model_name(getattr(agent, "model", None)) + except Exception: + model = getattr(agent, "model", None) + return str(model) if model is not None else None + + +def _usage_props_from_run_result(run_result: Any) -> dict[str, Any]: + if run_result is None: + return {} + try: + from agency_swarm.utils.usage_tracking import calculate_usage_with_cost, extract_usage_from_run_result + + usage = extract_usage_from_run_result(run_result) + if usage is None: + return {} + usage = calculate_usage_with_cost(usage, run_result=run_result) + return _usage_props(usage.input_tokens, usage.output_tokens, usage.total_cost) + except Exception: + logger.debug("Could not extract run usage 
for telemetry", exc_info=True) + return {} + + +def _usage_props_from_response(response: Any) -> dict[str, Any]: + usage = getattr(response, "usage", None) + if usage is None: + return {} + return _usage_props( + getattr(usage, "input_tokens", None), + getattr(usage, "output_tokens", None), + None, + ) + + +def _usage_props(tokens_input: Any, tokens_output: Any, cost: Any) -> dict[str, Any]: + props: dict[str, Any] = {} + if isinstance(tokens_input, int): + props["tokens_input"] = tokens_input + if isinstance(tokens_output, int): + props["tokens_output"] = tokens_output + if isinstance(cost, int | float): + props["cost_usd"] = float(cost) + return props + + +def _stop_reason(response: Any) -> str | None: + raw_items = getattr(response, "output", None) or getattr(response, "items", None) or [] + if not isinstance(raw_items, list): + return None + for item in raw_items: + status = _safe_text(getattr(item, "status", None)) + if status: + return status + if isinstance(item, dict): + status = _safe_text(item.get("status")) + if status: + return status + return None + + +def _tool_name(tool: Any) -> str: + return _safe_text(getattr(tool, "name", None) or getattr(tool, "__name__", None) or tool.__class__.__name__) or "unknown" + + +def _agent_name(agent: Any) -> str | None: + if agent is None: + return None + if isinstance(agent, str): + return agent + return _safe_text(getattr(agent, "name", None)) + + +def _context_agent_key(context: Any, agent: Any) -> tuple[int, str]: + return (id(context), _agent_name(agent) or "unknown") + + +def _context_tool_key(context: Any, agent: Any, tool_name: str) -> tuple[int, str, str]: + return (id(context), _agent_name(agent) or "unknown", tool_name) + + +def _elapsed_ms(started_at: float | None) -> int | None: + if started_at is None: + return None + return max(0, int((time.monotonic() - started_at) * 1000)) + + +def _safe_text(value: Any) -> str | None: + if value is None: + return None + if isinstance(value, str): + return value[:256] 
+ return str(value)[:256] diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..f96b4d86 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,8 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 00000000..c0413cdf --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,358 @@ +from __future__ import annotations + +import hashlib +import asyncio +import os +from types import SimpleNamespace + +import pytest + +import telemetry +import telemetry_hooks +from agents.lifecycle import AgentHooksBase, RunHooksBase + + +class FakePostHog: + def __init__(self, api_key: str, host: str, events: list[dict]) -> None: + self.api_key = api_key + self.host = host + self.events = events + + def capture(self, *, event: str, distinct_id: str, properties: dict) -> None: + self.events.append( + { + "event": event, + "distinct_id": distinct_id, + "properties": properties, + "api_key": self.api_key, + "host": self.host, + } + ) + + +@pytest.fixture +def telemetry_events(monkeypatch: pytest.MonkeyPatch, tmp_path): + for key in ( + "POSTHOG_API_KEY", + "POSTHOG_HOST", + "OPENSWARM_TELEMETRY", + "DO_NOT_TRACK", + "CI", + "PYTEST_VERSION", + ): + monkeypatch.delenv(key, raising=False) + monkeypatch.setenv("OPENSWARM_CONFIG_DIR", str(tmp_path / "config")) + monkeypatch.setenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", "1") + monkeypatch.setattr(telemetry, "_MODULE_DIR", tmp_path) + telemetry._reset_for_tests() + events: list[dict] = [] + telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, events)) + yield events + telemetry._reset_for_tests() + + +def test_env_key_enables_telemetry(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + + assert 
telemetry.capture("app_started", {"install_source": "env"}) + + assert telemetry_events[0]["event"] == "app_started" + assert telemetry_events[0]["api_key"] == "ph_env" + assert telemetry_events[0]["host"] == telemetry.DEFAULT_POSTHOG_HOST + + +def test_generated_npm_config_enables_when_env_absent(tmp_path, telemetry_events: list[dict]) -> None: + (tmp_path / "openswarm_telemetry_config.py").write_text( + "POSTHOG_API_KEY = 'ph_generated'\nPOSTHOG_HOST = 'https://eu.i.posthog.com'\n", + encoding="utf-8", + ) + + assert telemetry.capture("app_started", {"install_source": "npm_config"}) + + assert telemetry_events[0]["api_key"] == "ph_generated" + assert telemetry_events[0]["host"] == "https://eu.i.posthog.com" + + +def test_env_key_overrides_generated_config(monkeypatch: pytest.MonkeyPatch, tmp_path, telemetry_events: list[dict]) -> None: + (tmp_path / "openswarm_telemetry_config.py").write_text( + "POSTHOG_API_KEY = 'ph_generated'\nPOSTHOG_HOST = 'https://generated.example'\n", + encoding="utf-8", + ) + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + monkeypatch.setenv("POSTHOG_HOST", "https://env.example") + + assert telemetry.capture("app_started") + + assert telemetry_events[0]["api_key"] == "ph_env" + assert telemetry_events[0]["host"] == "https://env.example" + + +def test_source_install_without_key_is_noop(telemetry_events: list[dict]) -> None: + assert not telemetry.capture("app_started") + assert telemetry_events == [] + + +@pytest.mark.parametrize( + ("env_key", "env_value"), + [ + ("OPENSWARM_TELEMETRY", "false"), + ("OPENSWARM_TELEMETRY", "0"), + ("OPENSWARM_TELEMETRY", "no"), + ("OPENSWARM_TELEMETRY", "off"), + ("DO_NOT_TRACK", "1"), + ("CI", "1"), + ], +) +def test_opt_out_and_ci_suppress_captures( + monkeypatch: pytest.MonkeyPatch, + telemetry_events: list[dict], + env_key: str, + env_value: str, +) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + if env_key == "CI": + monkeypatch.delenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", raising=False) 
+ monkeypatch.setenv(env_key, env_value) + telemetry._reset_for_tests() + telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, telemetry_events)) + + assert not telemetry.capture("app_started") + assert telemetry_events == [] + + +def test_pytest_mode_suppresses_without_explicit_allow(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + monkeypatch.delenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", raising=False) + telemetry._reset_for_tests() + telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, telemetry_events)) + + assert not telemetry.capture("app_started") + assert telemetry_events == [] + + +def test_agent_name_is_raw_on_relevant_events(telemetry_events: list[dict], monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + relevant_events = [ + "message_sent", + "agent_run_started", + "agent_run_completed", + "llm_generation_completed", + "tool_invoked", + "handoff", + "error", + ] + + for event in relevant_events: + telemetry.capture(event, {"agent_name": "Docs Agent", "agent_id": telemetry.agent_id("Docs Agent")}) + + assert [entry["event"] for entry in telemetry_events] == relevant_events + for entry in telemetry_events: + props = entry["properties"] + assert props["agent_name"] == "Docs Agent" + assert props["agent_id"] != "Docs Agent" + + +def test_workspace_id_is_hmac_not_plain_path_hash(telemetry_events: list[dict], tmp_path) -> None: + workspace = tmp_path / "secret-project" + workspace.mkdir() + derived = telemetry.workspace_id(workspace) + plain_hash = hashlib.sha256(str(workspace.resolve()).encode("utf-8")).hexdigest() + + assert derived != str(workspace.resolve()) + assert derived != plain_hash + assert derived == telemetry.workspace_id(workspace) + + +def test_error_event_excludes_messages_traces_and_paths(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: 
+ monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + error = ValueError("secret prompt in /Users/person/project") + + telemetry.capture_error( + error, + category="swarm_run", + properties={ + "agent_name": "Orchestrator", + "message": "secret", + "traceback": "stack", + "file_path": "/Users/person/project/file.py", + "http_status": 429, + }, + ) + + props = telemetry_events[0]["properties"] + assert props["error_type"] == "ValueError" + assert props["error_category"] == "swarm_run" + assert props["http_status"] == 429 + assert "secret prompt" not in str(props) + assert "traceback" not in props + assert "file_path" not in props + assert "message" not in props + + +def test_agent_hook_composition_preserves_existing_hook(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + calls: list[str] = [] + + class ExistingHooks(AgentHooksBase): + async def on_start(self, context, agent): + calls.append("existing") + + async def run_hook() -> None: + hooks = telemetry_hooks.compose_agent_hooks(ExistingHooks()) + await hooks.on_start(SimpleNamespace(context=SimpleNamespace()), SimpleNamespace(name="Research", model="gpt-5.2")) + + asyncio.run(run_hook()) + + assert calls == ["existing"] + assert telemetry_events[0]["event"] == "agent_run_started" + assert telemetry_events[0]["properties"]["agent_name"] == "Research" + + +def test_run_hook_composition_preserves_existing_hook() -> None: + calls: list[str] = [] + + class ExistingRunHooks(RunHooksBase): + async def on_agent_start(self, context, agent): + calls.append("existing") + + async def run_hook() -> None: + hooks = telemetry_hooks.compose_run_hooks(ExistingRunHooks()) + await hooks.on_agent_start(None, SimpleNamespace(name="Research")) + + asyncio.run(run_hook()) + + assert calls == ["existing"] + + +def test_positional_hooks_override_is_composed() -> None: + class ExistingRunHooks(RunHooksBase): + pass + + hook = ExistingRunHooks() + args = ("message", 
None, None, hook) + kwargs = {} + + composed_args = telemetry_hooks._compose_hooks_override(args, kwargs) + + assert composed_args[:3] == args[:3] + assert isinstance(composed_args[3], telemetry_hooks.CompositeRunHooks) + + +class TwoItemStream: + def __init__(self) -> None: + self.items = iter(["first", "second"]) + self.final_result = SimpleNamespace() + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.items) + except StopIteration: + raise StopAsyncIteration + + async def aclose(self): + return None + + async def wait_final_result(self): + return self.final_result + + +def test_streaming_completion_fires_on_consumption(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + stream = telemetry_hooks.TelemetryStreamingRunResponse( + TwoItemStream(), + {"agent_name": "Orchestrator", "is_streaming": True}, + telemetry_hooks.time.monotonic(), + ) + + async def consume_stream() -> list[str]: + consumed = [] + async for item in stream: + consumed.append(item) + assert telemetry_events == [] + return consumed + + consumed = asyncio.run(consume_stream()) + + assert consumed == ["first", "second"] + assert telemetry_events[0]["event"] == "swarm_run_completed" + assert telemetry_events[0]["properties"]["agent_name"] == "Orchestrator" + + +class ErrorStream: + final_result = None + + def __aiter__(self): + return self + + async def __anext__(self): + raise RuntimeError("private path /Users/person/project") + + async def aclose(self): + return None + + +def test_streaming_error_fires_on_consumption(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + stream = telemetry_hooks.TelemetryStreamingRunResponse( + ErrorStream(), + {"agent_name": "Orchestrator", "is_streaming": True}, + telemetry_hooks.time.monotonic(), + ) + + async def consume_stream() -> None: + await stream.__anext__() + + with 
pytest.raises(RuntimeError): + asyncio.run(consume_stream()) + + assert telemetry_events[0]["event"] == "error" + props = telemetry_events[0]["properties"] + assert props["error_type"] == "RuntimeError" + assert "private path" not in str(props) + + +def test_message_sent_extracts_only_safe_fields(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + message = { + "role": "assistant", + "type": "message", + "content": [{"type": "output_text", "text": "do not send"}], + "agent": "Docs Agent", + "callerAgent": "Orchestrator", + "agent_run_id": "run_1", + "run_trace_id": "trace_1", + } + + telemetry_hooks._capture_message_sent(message, manager=object()) + + props = telemetry_events[0]["properties"] + assert props["agent_name"] == "Docs Agent" + assert props["caller_agent_name"] == "Orchestrator" + assert props["message_role"] == "assistant" + assert "content" not in props + assert "do not send" not in str(props) + + +def test_tool_invoked_payload_drops_content_like_properties(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + + telemetry.capture( + "tool_invoked", + { + "agent_name": "Slides Agent", + "tool_name": "create_slide", + "tool_args": {"prompt": "secret"}, + "tool_result": "secret output", + "latency_ms": 5, + }, + ) + + props = telemetry_events[0]["properties"] + assert props["agent_name"] == "Slides Agent" + assert props["tool_name"] == "create_slide" + assert "tool_args" not in props + assert "tool_result" not in props + assert "secret" not in str(props) diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py new file mode 100644 index 00000000..8b053bd5 --- /dev/null +++ b/tests/test_telemetry_scripts.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import pytest + +from scripts.create_posthog_dashboard import build_dashboard_payload, build_dry_run_payload, build_insight_payloads 
+from scripts.smoke_telemetry import SMOKE_EVENT, build_smoke_properties +from scripts.write_telemetry_config import build_config_source + + +def test_config_writer_builds_generated_module() -> None: + source = build_config_source("ph_test", "https://eu.i.posthog.com") + + assert "POSTHOG_API_KEY = 'ph_test'" in source + assert "POSTHOG_HOST = 'https://eu.i.posthog.com'" in source + assert "Do not commit this file" in source + + +def test_config_writer_requires_key() -> None: + with pytest.raises(ValueError): + build_config_source("") + + +def test_dashboard_payload_name() -> None: + assert build_dashboard_payload()["name"] == "OpenSwarm Product Analytics" + + +def test_dashboard_agent_usage_groups_by_agent_name() -> None: + payloads = build_insight_payloads(123) + agent_usage = next(payload for payload in payloads if payload["name"] == "Agent usage by agent") + + source = agent_usage["query"]["source"] + assert source["series"][0]["event"] == "agent_run_completed" + assert source["breakdownFilter"]["breakdown"] == "agent_name" + assert agent_usage["dashboards"] == [123] + + +def test_dashboard_dry_run_payload_has_no_secret_fields() -> None: + payload = build_dry_run_payload("env_123") + + assert payload["dashboard"]["path"] == "/api/environments/env_123/dashboards/" + assert payload["insights"][0]["path"] == "/api/environments/env_123/insights/" + assert "POSTHOG_PERSONAL_API_KEY" not in str(payload) + + +def test_smoke_script_uses_manual_smoke_event() -> None: + props = build_smoke_properties() + + assert SMOKE_EVENT == "telemetry_smoke_test" + assert props["status"] == "manual" + assert "POSTHOG_API_KEY" not in str(props) From 25654b2bca9b461ba2d20e631f249a4d676b626f Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 19:11:15 +0300 Subject: [PATCH 02/10] Harden telemetry message capture --- telemetry_hooks.py | 147 ++++++++++++++++++++++++++++++++++------ tests/test_telemetry.py | 107 +++++++++++++++++++++++++++-- 2 files changed, 230 
insertions(+), 24 deletions(-) diff --git a/telemetry_hooks.py b/telemetry_hooks.py index c932f825..2d376350 100644 --- a/telemetry_hooks.py +++ b/telemetry_hooks.py @@ -1,5 +1,6 @@ from __future__ import annotations +import contextvars import inspect import logging import time @@ -13,6 +14,12 @@ import telemetry logger = logging.getLogger(__name__) +_trusted_message_context: contextvars.ContextVar[dict[str, Any] | None] = contextvars.ContextVar( + "openswarm_trusted_message_context", + default=None, +) +_trusted_thread_managers: dict[int, dict[str, Any]] = {} +_MESSAGE_TYPES = {"message"} class CompositeRunHooks(RunHooksBase[Any, Any]): @@ -118,12 +125,14 @@ def __init__(self) -> None: async def on_start(self, context: Any, agent: Any) -> None: key = _context_agent_key(context, agent) self._agent_start_times[key] = time.monotonic() - telemetry.capture("agent_run_started", _agent_props(context, agent, status="started")) + props = _agent_props(context, agent, status="started") + _set_trusted_message_context(props) + _safe_capture("agent_run_started", props) async def on_end(self, context: Any, agent: Any, output: Any) -> None: key = _context_agent_key(context, agent) started = self._agent_start_times.pop(key, None) - telemetry.capture( + _safe_capture( "agent_run_completed", _agent_props( context, @@ -142,15 +151,16 @@ async def on_tool_end(self, context: Any, agent: Any, tool: Any, result: str) -> started = self._tool_start_times.pop(_context_tool_key(context, agent, tool_name), None) props = _agent_props(context, agent, status="completed", latency_ms=_elapsed_ms(started)) props["tool_name"] = tool_name - telemetry.capture("tool_invoked", props) + _safe_capture("tool_invoked", props) async def on_handoff(self, context: Any, agent: Any, source: Any) -> None: source_name = _agent_name(source) props = _agent_props(context, agent, status="completed") + _set_trusted_message_context(props) if source_name: props["caller_agent_name"] = source_name props["parent_agent_id"] 
= telemetry.agent_id(source_name) - telemetry.capture("handoff", props) + _safe_capture("handoff", props) async def on_llm_start(self, context: Any, agent: Any, system_prompt: str | None, input_items: list[Any]) -> None: self._llm_start_times[_context_agent_key(context, agent)] = time.monotonic() @@ -163,7 +173,7 @@ async def on_llm_end(self, context: Any, agent: Any, response: Any) -> None: stop_reason = _stop_reason(response) if stop_reason: props["stop_reason"] = stop_reason - telemetry.capture("llm_generation_completed", props) + _safe_capture("llm_generation_completed", props) class TelemetryStreamingRunResponse: @@ -183,6 +193,7 @@ def __aiter__(self) -> Any: return self async def __anext__(self) -> Any: + token = _set_trusted_message_context(self._base_props) try: return await self._stream.__anext__() except StopAsyncIteration: @@ -191,8 +202,11 @@ async def __anext__(self) -> Any: except BaseException as exc: self._error(exc) raise + finally: + _reset_trusted_message_context(token) async def asend(self, value: Any) -> Any: + token = _set_trusted_message_context(self._base_props) try: return await self._stream.asend(value) except StopAsyncIteration: @@ -201,15 +215,21 @@ async def asend(self, value: Any) -> Any: except BaseException as exc: self._error(exc) raise + finally: + _reset_trusted_message_context(token) async def athrow(self, typ: Any, val: Any = None, tb: Any = None) -> Any: + token = _set_trusted_message_context(self._base_props) try: return await self._stream.athrow(typ, val, tb) except BaseException as exc: self._error(exc) raise + finally: + _reset_trusted_message_context(token) async def aclose(self) -> None: + token = _set_trusted_message_context(self._base_props) try: await self._stream.aclose() except BaseException as exc: @@ -217,6 +237,7 @@ async def aclose(self) -> None: raise finally: self._complete(status="cancelled" if self._cancelled else "closed") + _reset_trusted_message_context(token) def cancel(self, mode: str = "immediate") -> 
None: self._cancelled = True @@ -225,11 +246,14 @@ def cancel(self, mode: str = "immediate") -> None: cancel(mode=mode) async def wait_final_result(self) -> Any: + token = _set_trusted_message_context(self._base_props) try: result = await self._stream.wait_final_result() except BaseException as exc: self._error(exc) raise + finally: + _reset_trusted_message_context(token) self._complete(status="cancelled" if self._cancelled else "completed", run_result=result) return result @@ -253,7 +277,7 @@ def _complete(self, *, status: str, run_result: Any | None = None) -> None: props = dict(self._base_props) props.update({"status": status, "latency_ms": _elapsed_ms(self._started_at)}) props.update(_usage_props_from_run_result(run_result if run_result is not None else self.final_result)) - telemetry.capture("swarm_run_completed", props) + _safe_capture("swarm_run_completed", props) def _error(self, exc: BaseException) -> None: if self._finished: @@ -261,13 +285,14 @@ def _error(self, exc: BaseException) -> None: self._finished = True props = dict(self._base_props) props.update({"status": "error", "latency_ms": _elapsed_ms(self._started_at)}) - telemetry.capture_error(exc, category="swarm_run", properties=props) + _safe_capture_error(exc, category="swarm_run", properties=props) def instrument_agency(agency: Any) -> Any: """Attach OpenSwarm telemetry to an Agency instance without replacing hooks.""" install_thread_manager_telemetry() + _register_thread_manager_context(agency) _attach_agent_hooks(_iter_agents(agency)) _wrap_agency_methods(agency) if getattr(agency, "persistence_hooks", None) is not None: @@ -300,13 +325,19 @@ def install_thread_manager_telemetry() -> None: def add_message(self: Any, message: Any) -> Any: result = original_add_message(self, message) - _capture_message_sent(message, self) + try: + _capture_message_sent(message, self) + except Exception: + logger.debug("Telemetry message capture failed", exc_info=True) return result def add_messages(self: Any, 
messages: list[Any]) -> Any: result = original_add_messages(self, messages) for message in messages or []: - _capture_message_sent(message, self) + try: + _capture_message_sent(message, self) + except Exception: + logger.debug("Telemetry message capture failed", exc_info=True) return result ThreadManager.add_message = add_message @@ -325,31 +356,34 @@ async def get_response_with_telemetry(self: Any, *args: Any, **kwargs: Any) -> A args = _compose_hooks_override(args, kwargs) props = _swarm_props(self, args, kwargs, is_streaming=False, status="started") started_at = time.monotonic() - telemetry.capture("swarm_run_started", props) + _safe_capture("swarm_run_started", props) + token = _set_trusted_message_context(props) try: result = await original_get_response(*args, **kwargs) except BaseException as exc: error_props = dict(props) error_props.update({"status": "error", "latency_ms": _elapsed_ms(started_at)}) - telemetry.capture_error(exc, category="swarm_run", properties=error_props) + _safe_capture_error(exc, category="swarm_run", properties=error_props) raise + finally: + _reset_trusted_message_context(token) completed_props = dict(props) completed_props.update({"status": "completed", "latency_ms": _elapsed_ms(started_at)}) completed_props.update(_usage_props_from_run_result(result)) - telemetry.capture("swarm_run_completed", completed_props) + _safe_capture("swarm_run_completed", completed_props) return result def get_response_stream_with_telemetry(self: Any, *args: Any, **kwargs: Any) -> Any: args = _compose_hooks_override(args, kwargs) props = _swarm_props(self, args, kwargs, is_streaming=True, status="started") started_at = time.monotonic() - telemetry.capture("swarm_run_started", props) + _safe_capture("swarm_run_started", props) try: stream = original_get_response_stream(*args, **kwargs) except BaseException as exc: error_props = dict(props) error_props.update({"status": "error", "latency_ms": _elapsed_ms(started_at)}) - telemetry.capture_error(exc, 
category="swarm_run", properties=error_props) + _safe_capture_error(exc, category="swarm_run", properties=error_props) raise return TelemetryStreamingRunResponse(stream, props, started_at) @@ -387,20 +421,30 @@ def _iter_agents(agency: Any) -> list[Any]: return list(entry_points or []) +def _register_thread_manager_context(agency: Any) -> None: + manager = getattr(agency, "thread_manager", None) + if manager is None: + return + agent_names = {name for name in (_agent_name(agent) for agent in _iter_agents(agency)) if name} + _trusted_thread_managers[id(manager)] = {"agent_names": agent_names} + + def _capture_message_sent(message: Any, manager: Any) -> None: if not isinstance(message, dict): return role = message.get("role") if role not in {"user", "assistant"}: return - agent_name = _safe_text(message.get("agent")) - caller_agent_name = _safe_text(message.get("callerAgent")) + trusted_context = _message_context_for(manager) + known_agent_names = trusted_context.get("agent_names") or set() + agent_name = _trusted_agent_name(trusted_context, known_agent_names) + caller_agent_name = _trusted_message_agent_name(message.get("callerAgent"), known_agent_names) props: dict[str, Any] = { "message_role": role, - "message_type": _safe_text(message.get("type")) or "message", + "message_type": _safe_message_type(message.get("type")), "thread_id": telemetry.thread_id(id(manager)), "session_id": telemetry._SESSION_ID, - "is_streaming": bool(message.get("streaming", False)), + "is_streaming": bool(trusted_context.get("is_streaming", False)), } if agent_name: props["agent_name"] = agent_name @@ -408,10 +452,57 @@ def _capture_message_sent(message: Any, manager: Any) -> None: if caller_agent_name: props["caller_agent_name"] = caller_agent_name for key in ("agent_run_id", "parent_run_id", "run_trace_id"): - value = _safe_text(message.get(key)) + value = _safe_text(trusted_context.get(key)) if value: props[key] = value - telemetry.capture("message_sent", props) + 
_safe_capture("message_sent", props) + + +def _message_context_for(manager: Any) -> dict[str, Any]: + context = dict(_trusted_message_context.get() or {}) + manager_context = _trusted_thread_managers.get(id(manager), {}) + agent_names = set(manager_context.get("agent_names") or set()) + agent_names.update(context.get("agent_names") or set()) + context["agent_names"] = agent_names + return context + + +def _trusted_agent_name(context: dict[str, Any], known_agent_names: set[str]) -> str | None: + agent_name = _safe_text(context.get("agent_name")) + if not agent_name: + return None + if known_agent_names and agent_name not in known_agent_names: + return None + return agent_name + + +def _trusted_message_agent_name(value: Any, known_agent_names: set[str]) -> str | None: + candidate = _safe_text(value) + if not candidate or not known_agent_names: + return None + return candidate if candidate in known_agent_names else None + + +def _safe_message_type(value: Any) -> str: + candidate = _safe_text(value) + if candidate in _MESSAGE_TYPES: + return candidate + return "message" + + +def _set_trusted_message_context(props: dict[str, Any]) -> contextvars.Token[dict[str, Any] | None]: + current = dict(_trusted_message_context.get() or {}) + merged = {**current, **props} + if "agent_names" in current and "agent_names" not in props: + merged["agent_names"] = current["agent_names"] + return _trusted_message_context.set(merged) + + +def _reset_trusted_message_context(token: contextvars.Token[dict[str, Any] | None]) -> None: + try: + _trusted_message_context.reset(token) + except Exception: + logger.debug("Could not reset telemetry message context", exc_info=True) def _swarm_props(agency: Any, args: tuple[Any, ...], kwargs: dict[str, Any], *, is_streaming: bool, status: str) -> dict[str, Any]: @@ -460,6 +551,22 @@ def _agent_props(context: Any, agent: Any, **extra: Any) -> dict[str, Any]: return props +def _safe_capture(event: str, properties: dict[str, Any] | None = None) -> bool: + 
try: + return telemetry.capture(event, properties) + except Exception: + logger.debug("Telemetry capture failed for event %s", event, exc_info=True) + return False + + +def _safe_capture_error(error: BaseException, *, category: str, properties: dict[str, Any] | None = None) -> bool: + try: + return telemetry.capture_error(error, category=category, properties=properties) + except Exception: + logger.debug("Telemetry error capture failed for category %s", category, exc_info=True) + return False + + def _model_name(agent: Any) -> str | None: try: from agency_swarm.agent.execution import get_model_name diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index c0413cdf..8d643817 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -45,10 +45,14 @@ def telemetry_events(monkeypatch: pytest.MonkeyPatch, tmp_path): monkeypatch.setenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", "1") monkeypatch.setattr(telemetry, "_MODULE_DIR", tmp_path) telemetry._reset_for_tests() + telemetry_hooks._trusted_thread_managers.clear() + telemetry_hooks._trusted_message_context.set(None) events: list[dict] = [] telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, events)) yield events telemetry._reset_for_tests() + telemetry_hooks._trusted_thread_managers.clear() + telemetry_hooks._trusted_message_context.set(None) def test_env_key_enables_telemetry(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: @@ -316,24 +320,119 @@ async def consume_stream() -> None: def test_message_sent_extracts_only_safe_fields(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + token = telemetry_hooks._trusted_message_context.set( + { + "agent_name": "Docs Agent", + "agent_run_id": "trusted_run", + "run_trace_id": "trusted_trace", + "agent_names": {"Docs Agent", "Orchestrator"}, + } + ) message = { "role": "assistant", "type": "message", "content": [{"type": "output_text", 
"text": "do not send"}], - "agent": "Docs Agent", + "agent": "user controlled secret", "callerAgent": "Orchestrator", - "agent_run_id": "run_1", - "run_trace_id": "trace_1", + "agent_run_id": "/Users/private/run", + "run_trace_id": "secret_trace", } - telemetry_hooks._capture_message_sent(message, manager=object()) + try: + telemetry_hooks._capture_message_sent(message, manager=object()) + finally: + telemetry_hooks._trusted_message_context.reset(token) props = telemetry_events[0]["properties"] assert props["agent_name"] == "Docs Agent" assert props["caller_agent_name"] == "Orchestrator" + assert props["agent_run_id"] == "trusted_run" + assert props["run_trace_id"] == "trusted_trace" assert props["message_role"] == "assistant" assert "content" not in props assert "do not send" not in str(props) + assert "user controlled secret" not in str(props) + assert "/Users/private" not in str(props) + + +def test_message_sent_without_trusted_agent_context_drops_message_agent( + monkeypatch: pytest.MonkeyPatch, + telemetry_events: list[dict], +) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + + telemetry_hooks._capture_message_sent( + { + "role": "assistant", + "type": "arbitrary-private-type", + "agent": "private user text", + "callerAgent": "private caller text", + "agent_run_id": "private run", + }, + manager=object(), + ) + + props = telemetry_events[0]["properties"] + assert props["message_type"] == "message" + assert "agent_name" not in props + assert "caller_agent_name" not in props + assert "agent_run_id" not in props + assert "private" not in str(props) + + +def test_thread_manager_wrapper_ignores_message_telemetry_failures(monkeypatch: pytest.MonkeyPatch) -> None: + telemetry_hooks.install_thread_manager_telemetry() + from agency_swarm.utils.thread import ThreadManager + + def fail_capture(*args, **kwargs): + raise RuntimeError("telemetry failure") + + monkeypatch.setattr(telemetry_hooks, "_capture_message_sent", fail_capture) + manager = ThreadManager() 
+ + manager.add_message({"role": "user", "type": "message", "content": "product path still works"}) + + +def test_streaming_completion_ignores_telemetry_capture_failure(monkeypatch: pytest.MonkeyPatch) -> None: + def fail_capture(*args, **kwargs): + raise RuntimeError("telemetry failure") + + monkeypatch.setattr(telemetry_hooks.telemetry, "capture", fail_capture) + stream = telemetry_hooks.TelemetryStreamingRunResponse( + TwoItemStream(), + {"agent_name": "Orchestrator", "is_streaming": True}, + telemetry_hooks.time.monotonic(), + ) + + async def consume_stream() -> list[str]: + consumed = [] + async for item in stream: + consumed.append(item) + return consumed + + assert asyncio.run(consume_stream()) == ["first", "second"] + + +def test_agency_run_wrapper_ignores_telemetry_capture_failure(monkeypatch: pytest.MonkeyPatch) -> None: + class FakeAgency: + entry_points = [SimpleNamespace(name="Orchestrator")] + + async def get_response(self, *args, **kwargs): + return SimpleNamespace() + + def get_response_stream(self, *args, **kwargs): + return TwoItemStream() + + def fail_capture(*args, **kwargs): + raise RuntimeError("telemetry failure") + + agency = FakeAgency() + monkeypatch.setattr(telemetry_hooks.telemetry, "capture", fail_capture) + telemetry_hooks._wrap_agency_methods(agency) + + result = asyncio.run(agency.get_response("hello")) + + assert isinstance(result, SimpleNamespace) def test_tool_invoked_payload_drops_content_like_properties(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: From 80f8d6075e47b714cf7bcd6ccaeab300dc2daadc Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 19:26:47 +0300 Subject: [PATCH 03/10] Guard telemetry packaging artifacts --- .github/workflows/build-tui.yml | 14 +++++ .github/workflows/publish-npm-on-release.yml | 14 +++++ MANIFEST.in | 3 + docs/telemetry.md | 3 + package.json | 3 + pyproject.toml | 17 +++++- scripts/check_telemetry_artifact.py | 59 ++++++++++++++++++++ setup.py | 27 
+++++++++ telemetry.py | 20 +++++++ tests/test_telemetry.py | 16 ++++++ tests/test_telemetry_scripts.py | 53 ++++++++++++++++++ 11 files changed, 227 insertions(+), 2 deletions(-) create mode 100644 MANIFEST.in create mode 100644 scripts/check_telemetry_artifact.py create mode 100644 setup.py diff --git a/.github/workflows/build-tui.yml b/.github/workflows/build-tui.yml index 7a612fb2..0b635d6e 100644 --- a/.github/workflows/build-tui.yml +++ b/.github/workflows/build-tui.yml @@ -288,6 +288,20 @@ jobs: - name: Install dependencies run: npm ci + - name: Clean Python build artifacts + shell: bash + run: rm -rf build dist *.egg-info + + - name: Validate no stale telemetry config + shell: bash + run: | + set -euo pipefail + if [[ -f openswarm_telemetry_config.py ]]; then + echo "openswarm_telemetry_config.py exists before npm CI injection; remove it before publishing." >&2 + exit 1 + fi + python3 scripts/check_telemetry_artifact.py build dist + - name: Validate telemetry secrets env: POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} diff --git a/.github/workflows/publish-npm-on-release.yml b/.github/workflows/publish-npm-on-release.yml index f04b792b..724abf26 100644 --- a/.github/workflows/publish-npm-on-release.yml +++ b/.github/workflows/publish-npm-on-release.yml @@ -92,6 +92,20 @@ jobs: - name: Install dependencies run: npm ci + - name: Clean Python build artifacts + shell: bash + run: rm -rf build dist *.egg-info + + - name: Validate no stale telemetry config + shell: bash + run: | + set -euo pipefail + if [[ -f openswarm_telemetry_config.py ]]; then + echo "openswarm_telemetry_config.py exists before npm CI injection; remove it before publishing." 
>&2 + exit 1 + fi + python3 scripts/check_telemetry_artifact.py build dist + - name: Validate telemetry secrets env: POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 00000000..8ad67eef --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,3 @@ +exclude openswarm_telemetry_config.py +prune build +prune dist diff --git a/docs/telemetry.md b/docs/telemetry.md index 3ba3bec7..6d5caef3 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -6,6 +6,7 @@ OpenSwarm has backend-only PostHog telemetry for product analytics. It is enable - npm releases can include a generated `openswarm_telemetry_config.py` file created by CI from `POSTHOG_API_KEY` and `POSTHOG_HOST` GitHub secrets. - GitHub/source installs do not include a capture key and do not send telemetry unless the user explicitly sets `POSTHOG_API_KEY`. +- Python wheel/sdist artifacts intentionally exclude `openswarm_telemetry_config.py`; default telemetry injection is npm-release-only. - Environment variables always win over the generated npm config. - The runtime PostHog capture key shipped in npm is treated as public. Never use a personal API key for runtime telemetry. @@ -75,6 +76,8 @@ Raw workspace paths, raw user identifiers, API keys, emails, and Composio IDs ar `agent_name` is sent as a raw, first-class property on agent-relevant events so product analytics can group usage by agent. `agent_id` remains HMAC-derived. +Runtime telemetry rejects event names outside this allowlist before initializing PostHog or building event properties. 
+ ## Allowed Properties Only allowlisted scalar properties are sent: diff --git a/package.json b/package.json index 7b74be19..9f3a8650 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,9 @@ "virtual_assistant/", "patches/", "docs/telemetry.md", + "scripts/", + "setup.py", + "MANIFEST.in", "pyproject.toml", "package.json", "package-lock.json" diff --git a/pyproject.toml b/pyproject.toml index 17add6b1..29e6c53f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,12 +70,25 @@ py-modules = [ "server", "telemetry", "telemetry_hooks", - "openswarm_telemetry_config", ] [tool.setuptools.packages.find] where = ["."] -exclude = ["agentswarm-cli*", "venv*", ".venv*", ".agency_swarm*", "node_modules*", "*.node_modules*", "activity*", "mnt*", "pptx*", "slides"] +exclude = [ + "agentswarm-cli*", + "venv*", + ".venv*", + ".agency_swarm*", + "node_modules*", + "*.node_modules*", + "activity*", + "mnt*", + "pptx*", + "slides", + "build*", + "dist*", + "*.egg-info*", +] [tool.setuptools.package-data] "*" = ["*.md", "*.json", "agency-*"] diff --git a/scripts/check_telemetry_artifact.py b/scripts/check_telemetry_artifact.py new file mode 100644 index 00000000..c596f6e8 --- /dev/null +++ b/scripts/check_telemetry_artifact.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import sys +import tarfile +import zipfile +from pathlib import Path + +GENERATED_CONFIG = "openswarm_telemetry_config.py" + + +def contains_generated_config(path: Path) -> bool: + if not path.exists(): + return False + if path.is_dir(): + return any(child.name == GENERATED_CONFIG for child in path.rglob(GENERATED_CONFIG)) + if path.name == GENERATED_CONFIG: + return True + if zipfile.is_zipfile(path): + with zipfile.ZipFile(path) as archive: + return any(Path(name).name == GENERATED_CONFIG for name in archive.namelist()) + if tarfile.is_tarfile(path): + with tarfile.open(path) as archive: + return any(Path(member.name).name == GENERATED_CONFIG for member 
in archive.getmembers()) + return False + + +def main() -> int: + parser = argparse.ArgumentParser(description="Guard against accidentally shipping generated telemetry config.") + parser.add_argument("paths", nargs="+", help="Artifact files or directories to inspect.") + parser.add_argument( + "--allow-generated-config", + action="store_true", + help="Allow openswarm_telemetry_config.py. Use only for intentional npm CI injection checks.", + ) + parser.add_argument( + "--require-generated-config", + action="store_true", + help="Fail unless at least one inspected path contains openswarm_telemetry_config.py.", + ) + args = parser.parse_args() + + matches = [str(Path(path)) for path in args.paths if contains_generated_config(Path(path))] + if matches and not args.allow_generated_config: + print( + "Refusing to ship generated telemetry config outside the intentional npm CI injection path:\n" + + "\n".join(f" - {match}" for match in matches), + file=sys.stderr, + ) + return 1 + if args.require_generated_config and not matches: + print("Expected generated telemetry config, but none was found.", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..3ee7fc12 --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import shutil +from pathlib import Path + +from setuptools import setup +from setuptools.command.build_py import build_py as _build_py + + +class build_py(_build_py): + def run(self) -> None: + self._clean_build_lib() + super().run() + self._remove_stale_telemetry_config() + + def _clean_build_lib(self) -> None: + build_lib = Path(self.build_lib) + if build_lib.exists(): + shutil.rmtree(build_lib) + + def _remove_stale_telemetry_config(self) -> None: + generated_config = Path(self.build_lib) / "openswarm_telemetry_config.py" + if generated_config.exists(): + generated_config.unlink() + + +setup(cmdclass={"build_py": 
build_py}) diff --git a/telemetry.py b/telemetry.py index b21951bd..55049a04 100644 --- a/telemetry.py +++ b/telemetry.py @@ -55,6 +55,23 @@ "workspace_id", } +ALLOWED_EVENTS = { + "agent_run_completed", + "agent_run_started", + "app_started", + "error", + "handoff", + "install_created", + "llm_generation_completed", + "message_sent", + "onboarding_completed", + "provider_configured", + "swarm_run_completed", + "swarm_run_started", + "telemetry_smoke_test", + "tool_invoked", +} + CONTENT_LIKE_KEYS = { "arguments", "content", @@ -349,6 +366,9 @@ def sanitize_properties(properties: dict[str, Any] | None) -> dict[str, Any]: def capture(event: str, properties: dict[str, Any] | None = None) -> bool: + if event not in ALLOWED_EVENTS: + logger.debug("Telemetry event %s is not allowlisted", event) + return False client = _get_client() if client is None: return False diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index 8d643817..f3dfc95b 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -96,6 +96,22 @@ def test_source_install_without_key_is_noop(telemetry_events: list[dict]) -> Non assert telemetry_events == [] +def test_unknown_event_is_rejected_before_client_or_state_creation( + monkeypatch: pytest.MonkeyPatch, + tmp_path, + telemetry_events: list[dict], +) -> None: + config_dir = tmp_path / "unknown-event-config" + monkeypatch.setenv("OPENSWARM_CONFIG_DIR", str(config_dir)) + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + telemetry._reset_for_tests() + telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, telemetry_events)) + + assert not telemetry.capture("user_supplied_event_name", {"agent_name": "Docs Agent"}) + assert telemetry_events == [] + assert not (config_dir / "telemetry.json").exists() + + @pytest.mark.parametrize( ("env_key", "env_value"), [ diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py index 8b053bd5..8bd0dce1 100644 --- 
a/tests/test_telemetry_scripts.py +++ b/tests/test_telemetry_scripts.py @@ -1,7 +1,14 @@ from __future__ import annotations +import os +import subprocess +import sys +import zipfile +from pathlib import Path + import pytest +from scripts.check_telemetry_artifact import contains_generated_config from scripts.create_posthog_dashboard import build_dashboard_payload, build_dry_run_payload, build_insight_payloads from scripts.smoke_telemetry import SMOKE_EVENT, build_smoke_properties from scripts.write_telemetry_config import build_config_source @@ -48,3 +55,49 @@ def test_smoke_script_uses_manual_smoke_event() -> None: assert SMOKE_EVENT == "telemetry_smoke_test" assert props["status"] == "manual" assert "POSTHOG_API_KEY" not in str(props) + + +def test_artifact_checker_detects_stale_generated_config(tmp_path) -> None: + build_dir = tmp_path / "build" / "lib" + build_dir.mkdir(parents=True) + (build_dir / "openswarm_telemetry_config.py").write_text("POSTHOG_API_KEY = 'ph_stale'\n", encoding="utf-8") + + assert contains_generated_config(tmp_path / "build") + + +def test_python_wheel_excludes_stale_generated_config(tmp_path) -> None: + repo = Path(__file__).resolve().parents[1] + root_generated_config = repo / "openswarm_telemetry_config.py" + stale_build_config = repo / "build" / "lib" / "openswarm_telemetry_config.py" + root_generated_config.write_text("POSTHOG_API_KEY = 'ph_root'\n", encoding="utf-8") + stale_build_config.parent.mkdir(parents=True, exist_ok=True) + stale_build_config.write_text("POSTHOG_API_KEY = 'ph_stale'\n", encoding="utf-8") + + try: + result = subprocess.run( + [ + sys.executable, + "-m", + "pip", + "wheel", + "--no-build-isolation", + "--no-deps", + "-w", + str(tmp_path), + ".", + ], + cwd=repo, + env={**os.environ, "OPENSWARM_TELEMETRY_ALLOW_TESTS": "1"}, + capture_output=True, + text=True, + timeout=120, + ) + assert result.returncode == 0, result.stderr + wheel = next(tmp_path.glob("*.whl")) + with zipfile.ZipFile(wheel) as archive: + names = 
archive.namelist() + assert "openswarm_telemetry_config.py" not in names + assert not any(name.startswith("build/") for name in names) + finally: + root_generated_config.unlink(missing_ok=True) + stale_build_config.unlink(missing_ok=True) From 953ba794ecd20935892117a263667c1125f00453 Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 19:35:43 +0300 Subject: [PATCH 04/10] Sanitize telemetry model metadata --- docs/telemetry.md | 2 + telemetry.py | 85 +++++++++++++++++++---- tests/test_telemetry.py | 116 ++++++++++++++++++++++++++++++++ tests/test_telemetry_scripts.py | 40 +++++++++++ 4 files changed, 230 insertions(+), 13 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 6d5caef3..b74baecd 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -89,6 +89,8 @@ Only allowlisted scalar properties are sent: - Setup/auth: `auth_method`, `provider`, `has_provider_key`, `install_source` - Errors: `error_type`, `error_category`, `status`, `http_status` +`model` and `provider` are additionally validated as short identifier-like values. Values that look like file paths, URLs, email addresses, API keys, tokens, or secrets are dropped. Numeric fields such as token counts, latency, HTTP status, and cost must be real numbers, and boolean fields must be real booleans. + ## Privacy Policy Telemetry must not include message contents, prompts, tool arguments, tool results, generated content, exception messages, stack traces, tracebacks, file paths, API keys, emails, Composio IDs, raw workspace paths, raw user IDs, or `telemetry_opted_out`. 
diff --git a/telemetry.py b/telemetry.py index 55049a04..0b968136 100644 --- a/telemetry.py +++ b/telemetry.py @@ -7,6 +7,7 @@ import json import logging import os +import re import time import uuid from pathlib import Path @@ -72,6 +73,20 @@ "tool_invoked", } +BASE_PROPERTIES = {"event_version", "session_id", "user_id", "workspace_id"} +BOOLEAN_PROPERTIES = {"has_provider_key", "is_streaming"} +INTEGER_PROPERTIES = {"http_status", "latency_ms", "tokens_input", "tokens_output"} +FLOAT_PROPERTIES = {"cost_usd"} +MODEL_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._:/+-]{0,127}$") +SAFE_SLUG_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._+-]{0,63}$") +EMAIL_RE = re.compile(r"(?i)[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}") +SECRET_RE = re.compile( + r"(?i)(api[_-]?key|secret|token|password|bearer|sk-[A-Za-z0-9_-]{8,}|" + r"sk-proj-[A-Za-z0-9_-]{8,}|phc_[A-Za-z0-9_-]{8,}|phx_[A-Za-z0-9_-]{8,}|" + r"AIza[A-Za-z0-9_-]{8,}|xox[baprs]-[A-Za-z0-9_-]{8,}|gh[pousr]_[A-Za-z0-9_-]{8,})" +) +PATH_RE = re.compile(r"(?i)(^/|^\./|^\.\./|^~/|^[A-Za-z]:[\\/]|[\\]|/(Users|home|private|var|tmp)/|//)") + CONTENT_LIKE_KEYS = { "arguments", "content", @@ -307,19 +322,25 @@ def provider_from_model(model: Any) -> str | None: return None name = str(model) if "/" not in name: - if name.startswith(("gpt-", "o")): + safe_model = sanitize_model_id(name) + if safe_model and safe_model.startswith(("gpt-", "o")): return "openai" return None parts = name.split("/") + provider: str | None if parts[0] == "litellm" and len(parts) > 1: - return parts[1] - return parts[0] + provider = parts[1] + else: + provider = parts[0] + return sanitize_provider(provider) def configured_provider() -> str | None: model = os.getenv("DEFAULT_MODEL", "") if model: - return provider_from_model(model) or ("openai" if "/" not in model else None) + provider = provider_from_model(model) + if provider: + return provider if os.getenv("OPENAI_API_KEY"): return "openai" if os.getenv("ANTHROPIC_API_KEY"): @@ -333,16 +354,54 @@ def 
has_provider_key() -> bool: return bool(os.getenv("OPENAI_API_KEY") or os.getenv("ANTHROPIC_API_KEY") or os.getenv("GOOGLE_API_KEY")) -def _safe_scalar(value: Any) -> str | int | float | bool | None: - if value is None or isinstance(value, bool): - return value - if isinstance(value, int) and not isinstance(value, bool): - return value - if isinstance(value, float): - return value +def _looks_private(value: str) -> bool: + return bool(EMAIL_RE.search(value) or SECRET_RE.search(value) or PATH_RE.search(value)) + + +def sanitize_model_id(value: Any) -> str | None: + if not isinstance(value, str): + return None + model = value.strip() + if not model or len(model) > 128: + return None + if not MODEL_ID_RE.fullmatch(model): + return None + if _looks_private(model): + return None + return model + + +def sanitize_provider(value: Any) -> str | None: + if not isinstance(value, str): + return None + provider = value.strip() + if not provider or not SAFE_SLUG_RE.fullmatch(provider): + return None + if _looks_private(provider): + return None + return provider + + +def _safe_property(key: str, value: Any) -> str | int | float | bool | None: + if key in BASE_PROPERTIES: + return None + if key == "model": + return sanitize_model_id(value) + if key == "provider": + return sanitize_provider(value) + if key in BOOLEAN_PROPERTIES: + return value if isinstance(value, bool) else None + if key in INTEGER_PROPERTIES: + return value if isinstance(value, int) and not isinstance(value, bool) else None + if key in FLOAT_PROPERTIES: + return float(value) if isinstance(value, int | float) and not isinstance(value, bool) else None + if value is None: + return None if isinstance(value, str): return value[:256] - return str(type(value).__name__)[:128] + if isinstance(value, bool | int | float): + return value + return None def sanitize_properties(properties: dict[str, Any] | None) -> dict[str, Any]: @@ -359,7 +418,7 @@ def sanitize_properties(properties: dict[str, Any] | None) -> dict[str, Any]: 
continue if key in CONTENT_LIKE_KEYS: continue - scalar = _safe_scalar(value) + scalar = _safe_property(key, value) if scalar is not None: safe[key] = scalar return safe diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py index f3dfc95b..6277a205 100644 --- a/tests/test_telemetry.py +++ b/tests/test_telemetry.py @@ -172,6 +172,122 @@ def test_agent_name_is_raw_on_relevant_events(telemetry_events: list[dict], monk assert props["agent_id"] != "Docs Agent" +def test_safe_model_and_provider_are_sent(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + model = "litellm/anthropic/claude-sonnet-4-6" + + telemetry.capture( + "agent_run_started", + { + "agent_name": "Research", + "model": model, + "provider": telemetry.provider_from_model(model), + }, + ) + + props = telemetry_events[0]["properties"] + assert props["model"] == model + assert props["provider"] == "anthropic" + + +def test_unsafe_model_values_are_dropped(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + cases = [ + ("/Users/mike/private-model", None), + ("model user@example.com", None), + ("C:\\Users\\mike\\private-model", None), + ("https://example.com/private-model", None), + ("litellm/openai/sk-secret1234567890", "openai"), + ("gpt-5.2 api_key=secret", None), + ] + + for model, expected_provider in cases: + telemetry.capture( + "agent_run_started", + { + "agent_name": "Research", + "model": model, + "provider": telemetry.provider_from_model(model), + }, + ) + props = telemetry_events[-1]["properties"] + assert "model" not in props + if expected_provider: + assert props["provider"] == expected_provider + else: + assert "provider" not in props + assert "@" not in str(props) + assert "/Users" not in str(props) + assert "sk-secret" not in str(props) + assert "api_key" not in str(props) + + +def test_configured_provider_ignores_unsafe_default_model( + 
monkeypatch: pytest.MonkeyPatch, + telemetry_events: list[dict], +) -> None: + monkeypatch.setenv("DEFAULT_MODEL", "gpt-5.2 api_key=secret") + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + + assert telemetry.configured_provider() is None + + +def test_numeric_and_boolean_properties_are_type_gated( + monkeypatch: pytest.MonkeyPatch, + telemetry_events: list[dict], +) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + + telemetry.capture( + "error", + { + "error_type": "ValueError", + "error_category": "test", + "http_status": "429", + "latency_ms": "/Users/mike/private", + "tokens_input": "10", + "tokens_output": "20", + "cost_usd": "1.25", + "is_streaming": "true", + "has_provider_key": "true", + }, + ) + unsafe_props = telemetry_events[0]["properties"] + for key in ( + "http_status", + "latency_ms", + "tokens_input", + "tokens_output", + "cost_usd", + "is_streaming", + "has_provider_key", + ): + assert key not in unsafe_props + + telemetry.capture( + "error", + { + "error_type": "ValueError", + "error_category": "test", + "http_status": 429, + "latency_ms": 12, + "tokens_input": 10, + "tokens_output": 20, + "cost_usd": 1.25, + "is_streaming": False, + "has_provider_key": True, + }, + ) + safe_props = telemetry_events[1]["properties"] + assert safe_props["http_status"] == 429 + assert safe_props["latency_ms"] == 12 + assert safe_props["tokens_input"] == 10 + assert safe_props["tokens_output"] == 20 + assert safe_props["cost_usd"] == 1.25 + assert safe_props["is_streaming"] is False + assert safe_props["has_provider_key"] is True + + def test_workspace_id_is_hmac_not_plain_path_hash(telemetry_events: list[dict], tmp_path) -> None: workspace = tmp_path / "secret-project" workspace.mkdir() diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py index 8bd0dce1..d1cb4362 100644 --- a/tests/test_telemetry_scripts.py +++ b/tests/test_telemetry_scripts.py @@ -1,6 +1,8 @@ from __future__ import annotations +import json import os 
+import shutil import subprocess import sys import zipfile @@ -101,3 +103,41 @@ def test_python_wheel_excludes_stale_generated_config(tmp_path) -> None: finally: root_generated_config.unlink(missing_ok=True) stale_build_config.unlink(missing_ok=True) + + +def test_npm_pack_includes_generated_config_after_writer() -> None: + if not shutil.which("npm"): + pytest.skip("npm is not installed") + + repo = Path(__file__).resolve().parents[1] + root_generated_config = repo / "openswarm_telemetry_config.py" + + try: + writer = subprocess.run( + [sys.executable, "scripts/write_telemetry_config.py"], + cwd=repo, + env={ + **os.environ, + "POSTHOG_API_KEY": "ph_pack_test", + "POSTHOG_HOST": "https://example.i.posthog.com", + }, + capture_output=True, + text=True, + timeout=30, + ) + assert writer.returncode == 0, writer.stderr + + packed = subprocess.run( + ["npm", "pack", "--dry-run", "--json"], + cwd=repo, + capture_output=True, + text=True, + timeout=120, + ) + assert packed.returncode == 0, packed.stderr + package = json.loads(packed.stdout)[0] + paths = {entry["path"] for entry in package["files"]} + + assert "openswarm_telemetry_config.py" in paths + finally: + root_generated_config.unlink(missing_ok=True) From 8122ea682d9ee3395420388a3d760c5cff3ac341 Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 19:48:54 +0300 Subject: [PATCH 05/10] Fix PostHog dashboard query payloads --- scripts/create_posthog_dashboard.py | 30 ++++++++++++++--------------- tests/test_telemetry_scripts.py | 14 ++++++++++++++ 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/scripts/create_posthog_dashboard.py b/scripts/create_posthog_dashboard.py index 6fbcd55f..9c0c6422 100644 --- a/scripts/create_posthog_dashboard.py +++ b/scripts/create_posthog_dashboard.py @@ -33,10 +33,11 @@ def _trends_query( "dateRange": {"date_from": "-30d"}, "interval": "day", "series": [{"kind": "EventsNode", "event": event, "name": name, "math": math}], + "trendsFilter": {"display": 
display}, } if breakdown: source["breakdownFilter"] = {"breakdown_type": "event", "breakdown": breakdown} - return {"kind": "InsightVizNode", "source": source, "display": display} + return {"kind": "InsightVizNode", "source": source} def build_insight_payloads(dashboard_id: int | str) -> list[dict[str, Any]]: @@ -86,22 +87,19 @@ def build_insight_payloads(dashboard_id: int | str) -> list[dict[str, Any]]: "name": "Error rate", "description": "Errors divided by completed swarm runs.", "query": { - "kind": "InsightVizNode", - "display": "ActionsLineGraph", + "kind": "DataVisualizationNode", "source": { - "kind": "TrendsQuery", - "dateRange": {"date_from": "-30d"}, - "interval": "day", - "series": [ - {"kind": "EventsNode", "event": "error", "name": "Errors", "math": "total"}, - { - "kind": "EventsNode", - "event": "swarm_run_completed", - "name": "Completed runs", - "math": "total", - }, - ], - "formulas": [{"formula": "A / B"}], + "kind": "HogQLQuery", + "query": ( + "SELECT " + "toStartOfDay(timestamp) AS day, " + "countIf(event = 'error') / nullIf(countIf(event = 'swarm_run_completed'), 0) AS error_rate " + "FROM events " + "WHERE event IN ('error', 'swarm_run_completed') " + "AND timestamp >= now() - INTERVAL 30 DAY " + "GROUP BY day " + "ORDER BY day ASC" + ), }, }, "dashboards": dashboards, diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py index d1cb4362..051c688a 100644 --- a/tests/test_telemetry_scripts.py +++ b/tests/test_telemetry_scripts.py @@ -37,9 +37,11 @@ def test_dashboard_agent_usage_groups_by_agent_name() -> None: payloads = build_insight_payloads(123) agent_usage = next(payload for payload in payloads if payload["name"] == "Agent usage by agent") + assert "display" not in agent_usage["query"] source = agent_usage["query"]["source"] assert source["series"][0]["event"] == "agent_run_completed" assert source["breakdownFilter"]["breakdown"] == "agent_name" + assert source["trendsFilter"]["display"] == "ActionsBarValue" 
assert agent_usage["dashboards"] == [123] @@ -51,6 +53,18 @@ def test_dashboard_dry_run_payload_has_no_secret_fields() -> None: assert "POSTHOG_PERSONAL_API_KEY" not in str(payload) +def test_dashboard_error_rate_uses_hogql_rate_query() -> None: + payloads = build_insight_payloads(123) + error_rate = next(payload for payload in payloads if payload["name"] == "Error rate") + + assert error_rate["query"]["kind"] == "DataVisualizationNode" + source = error_rate["query"]["source"] + assert source["kind"] == "HogQLQuery" + assert "countIf(event = 'error')" in source["query"] + assert "countIf(event = 'swarm_run_completed')" in source["query"] + assert error_rate["dashboards"] == [123] + + def test_smoke_script_uses_manual_smoke_event() -> None: props = build_smoke_properties() From 6ca3bc2f474ecab57e6ca9db3618dea409ee37b7 Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 19:56:11 +0300 Subject: [PATCH 06/10] Fix telemetry smoke script import path --- scripts/smoke_telemetry.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/smoke_telemetry.py b/scripts/smoke_telemetry.py index 67cd1939..964bf83a 100644 --- a/scripts/smoke_telemetry.py +++ b/scripts/smoke_telemetry.py @@ -2,6 +2,9 @@ from __future__ import annotations import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) import telemetry From d47e1f9954b96398c848c410e89efb3720ea5313 Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 20:05:43 +0300 Subject: [PATCH 07/10] Improve PostHog dashboard content --- docs/telemetry.md | 2 +- scripts/create_posthog_dashboard.py | 196 ++++++++++++++++++---------- tests/test_telemetry_scripts.py | 20 ++- 3 files changed, 148 insertions(+), 70 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index b74baecd..661302e8 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -113,7 +113,7 @@ Optional: POSTHOG_APP_HOST=https://us.posthog.com ``` -The dashboard is named 
`OpenSwarm Product Analytics` and includes DAU, messages/day, agent runs/day, tool usage, error rate, and agent usage grouped by raw `agent_name`. +The dashboard is named `OpenSwarm Product Analytics`. It creates tagged `OpenSwarm / ...` insights for 24-hour KPI cards, product activity by day, messages by role, agent usage grouped by raw `agent_name`, tool usage, error category/type breakdowns, error rate, and a recent safe telemetry sample table. To inspect the exact API payloads without creating anything: diff --git a/scripts/create_posthog_dashboard.py b/scripts/create_posthog_dashboard.py index 9c0c6422..73fc34c9 100644 --- a/scripts/create_posthog_dashboard.py +++ b/scripts/create_posthog_dashboard.py @@ -10,29 +10,37 @@ DEFAULT_POSTHOG_APP_HOST = "https://us.posthog.com" DASHBOARD_NAME = "OpenSwarm Product Analytics" +DASHBOARD_TAGS = ["OpenSwarm", "Telemetry"] def build_dashboard_payload() -> dict[str, Any]: return { "name": DASHBOARD_NAME, - "description": "Privacy-safe OpenSwarm usage metrics.", + "description": "Privacy-safe OpenSwarm telemetry: installs, messages, agent usage, tools, and errors.", "pinned": True, + "tags": DASHBOARD_TAGS, } def _trends_query( *, - event: str, - name: str, + event: str | None = None, + name: str | None = None, + series: list[dict[str, Any]] | None = None, math: str = "total", breakdown: str | None = None, display: str = "ActionsLineGraph", + date_from: str = "-30d", ) -> dict[str, Any]: + if series is None: + if event is None or name is None: + raise ValueError("event and name are required when series is not provided") + series = [{"kind": "EventsNode", "event": event, "name": name, "math": math}] source: dict[str, Any] = { "kind": "TrendsQuery", - "dateRange": {"date_from": "-30d"}, + "dateRange": {"date_from": date_from}, "interval": "day", - "series": [{"kind": "EventsNode", "event": event, "name": name, "math": math}], + "series": series, "trendsFilter": {"display": display}, } if breakdown: @@ -40,70 +48,124 @@ def 
_trends_query( return {"kind": "InsightVizNode", "source": source} +def _hogql_query(query: str) -> dict[str, Any]: + return {"kind": "DataVisualizationNode", "source": {"kind": "HogQLQuery", "query": query}} + + +def _insight(name: str, description: str, query: dict[str, Any], dashboard_id: int | str) -> dict[str, Any]: + return { + "name": f"OpenSwarm / {name}", + "description": description, + "query": query, + "dashboards": [dashboard_id], + "tags": DASHBOARD_TAGS, + } + + def build_insight_payloads(dashboard_id: int | str) -> list[dict[str, Any]]: - dashboards = [dashboard_id] return [ - { - "name": "DAU", - "description": "Daily active anonymous installs, based on app_started.", - "query": _trends_query(event="app_started", name="Active installs", math="dau"), - "dashboards": dashboards, - }, - { - "name": "Messages per day", - "description": "User and assistant messages persisted by OpenSwarm.", - "query": _trends_query(event="message_sent", name="Messages sent"), - "dashboards": dashboards, - }, - { - "name": "Agent runs per day", - "description": "Completed agent run count.", - "query": _trends_query(event="agent_run_completed", name="Agent runs"), - "dashboards": dashboards, - }, - { - "name": "Tool usage by tool", - "description": "Tool invocations grouped by safe tool name.", - "query": _trends_query( - event="tool_invoked", - name="Tool invocations", - breakdown="tool_name", - display="ActionsBarValue", + _insight( + "Active installs today", + "Unique anonymous installs that started OpenSwarm in the last 24 hours.", + _trends_query(event="app_started", name="Active installs", math="dau", display="BoldNumber", date_from="-24h"), + dashboard_id, + ), + _insight( + "Messages today", + "User and assistant messages captured in the last 24 hours.", + _trends_query(event="message_sent", name="Messages", display="BoldNumber", date_from="-24h"), + dashboard_id, + ), + _insight( + "Agent runs today", + "Completed agent runs in the last 24 hours.", + 
_trends_query(event="agent_run_completed", name="Completed agent runs", display="BoldNumber", date_from="-24h"), + dashboard_id, + ), + _insight( + "Errors today", + "Safe error events in the last 24 hours.", + _trends_query(event="error", name="Errors", display="BoldNumber", date_from="-24h"), + dashboard_id, + ), + _insight( + "Product activity by day", + "Core OpenSwarm events over time.", + _trends_query( + series=[ + {"kind": "EventsNode", "event": "app_started", "name": "App starts", "math": "total"}, + {"kind": "EventsNode", "event": "message_sent", "name": "Messages", "math": "total"}, + {"kind": "EventsNode", "event": "swarm_run_started", "name": "Swarm runs started", "math": "total"}, + {"kind": "EventsNode", "event": "agent_run_completed", "name": "Agent runs completed", "math": "total"}, + {"kind": "EventsNode", "event": "tool_invoked", "name": "Tools invoked", "math": "total"}, + {"kind": "EventsNode", "event": "error", "name": "Errors", "math": "total"}, + ], ), - "dashboards": dashboards, - }, - { - "name": "Agent usage by agent", - "description": "Agent runs grouped by raw agent_name.", - "query": _trends_query( - event="agent_run_completed", - name="Agent runs", - breakdown="agent_name", - display="ActionsBarValue", + dashboard_id, + ), + _insight( + "Messages by role", + "Message volume split by user vs assistant.", + _trends_query(event="message_sent", name="Messages", breakdown="message_role", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Agent usage by agent", + "Completed agent runs grouped by raw agent_name.", + _trends_query(event="agent_run_completed", name="Agent runs", breakdown="agent_name", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Tool usage by tool", + "Tool invocations grouped by safe tool_name.", + _trends_query(event="tool_invoked", name="Tool invocations", breakdown="tool_name", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Errors by category", + "Safe error events grouped 
by error_category.", + _trends_query(event="error", name="Errors", breakdown="error_category", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Errors by type", + "Safe error events grouped by exception class only.", + _trends_query(event="error", name="Errors", breakdown="error_type", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Error rate by day", + "Errors divided by completed swarm runs. Zero completed runs returns zero instead of null.", + _hogql_query( + "SELECT " + "toStartOfDay(timestamp) AS day, " + "if(countIf(event = 'swarm_run_completed') = 0, 0, " + "countIf(event = 'error') / countIf(event = 'swarm_run_completed')) AS error_rate " + "FROM events " + "WHERE event IN ('error', 'swarm_run_completed') " + "AND timestamp >= now() - INTERVAL 30 DAY " + "GROUP BY day " + "ORDER BY day ASC" ), - "dashboards": dashboards, - }, - { - "name": "Error rate", - "description": "Errors divided by completed swarm runs.", - "query": { - "kind": "DataVisualizationNode", - "source": { - "kind": "HogQLQuery", - "query": ( - "SELECT " - "toStartOfDay(timestamp) AS day, " - "countIf(event = 'error') / nullIf(countIf(event = 'swarm_run_completed'), 0) AS error_rate " - "FROM events " - "WHERE event IN ('error', 'swarm_run_completed') " - "AND timestamp >= now() - INTERVAL 30 DAY " - "GROUP BY day " - "ORDER BY day ASC" - ), - }, - }, - "dashboards": dashboards, - }, + dashboard_id, + ), + _insight( + "Recent telemetry samples", + "Recent OpenSwarm events with only safe structured metadata.", + _hogql_query( + "SELECT " + "timestamp, event, properties.agent_name AS agent_name, properties.message_role AS message_role, " + "properties.tool_name AS tool_name, properties.error_type AS error_type, " + "properties.error_category AS error_category, properties.status AS status " + "FROM events " + "WHERE event IN ('app_started', 'install_created', 'message_sent', 'swarm_run_started', " + "'swarm_run_completed', 'agent_run_started', 
'agent_run_completed', 'tool_invoked', 'error') " + "AND timestamp >= now() - INTERVAL 7 DAY " + "ORDER BY timestamp DESC " + "LIMIT 50" + ), + dashboard_id, + ), ] @@ -154,6 +216,8 @@ def create_dashboard(host: str, environment_id: str, personal_api_key: str) -> d build_dashboard_payload(), ) dashboard_id = dashboard["id"] + # PostHog inserts API-created dashboard tiles with the latest tile first. + # Create in reverse so the dashboard opens with the KPI cards at the top. insights = [ _post_json( host, @@ -161,7 +225,7 @@ def create_dashboard(host: str, environment_id: str, personal_api_key: str) -> d personal_api_key, insight_payload, ) - for insight_payload in build_insight_payloads(dashboard_id) + for insight_payload in reversed(build_insight_payloads(dashboard_id)) ] return {"dashboard": dashboard, "insights": insights} diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py index 051c688a..f5e31548 100644 --- a/tests/test_telemetry_scripts.py +++ b/tests/test_telemetry_scripts.py @@ -30,12 +30,15 @@ def test_config_writer_requires_key() -> None: def test_dashboard_payload_name() -> None: - assert build_dashboard_payload()["name"] == "OpenSwarm Product Analytics" + payload = build_dashboard_payload() + + assert payload["name"] == "OpenSwarm Product Analytics" + assert "OpenSwarm" in payload["tags"] def test_dashboard_agent_usage_groups_by_agent_name() -> None: payloads = build_insight_payloads(123) - agent_usage = next(payload for payload in payloads if payload["name"] == "Agent usage by agent") + agent_usage = next(payload for payload in payloads if payload["name"] == "OpenSwarm / Agent usage by agent") assert "display" not in agent_usage["query"] source = agent_usage["query"]["source"] @@ -43,6 +46,17 @@ def test_dashboard_agent_usage_groups_by_agent_name() -> None: assert source["breakdownFilter"]["breakdown"] == "agent_name" assert source["trendsFilter"]["display"] == "ActionsBarValue" assert agent_usage["dashboards"] == [123] + 
assert "OpenSwarm" in agent_usage["tags"] + + +def test_dashboard_has_useful_openswarm_sections() -> None: + payloads = build_insight_payloads(123) + names = {payload["name"] for payload in payloads} + + assert len(payloads) >= 10 + assert "OpenSwarm / Active installs today" in names + assert "OpenSwarm / Messages by role" in names + assert "OpenSwarm / Recent telemetry samples" in names def test_dashboard_dry_run_payload_has_no_secret_fields() -> None: @@ -55,7 +69,7 @@ def test_dashboard_dry_run_payload_has_no_secret_fields() -> None: def test_dashboard_error_rate_uses_hogql_rate_query() -> None: payloads = build_insight_payloads(123) - error_rate = next(payload for payload in payloads if payload["name"] == "Error rate") + error_rate = next(payload for payload in payloads if payload["name"] == "OpenSwarm / Error rate by day") assert error_rate["query"]["kind"] == "DataVisualizationNode" source = error_rate["query"]["source"] From 9776d75db7220eea8c5b03380f159977b74e3fa4 Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 20:21:58 +0300 Subject: [PATCH 08/10] Apply compact PostHog dashboard layout --- docs/telemetry.md | 2 +- scripts/create_posthog_dashboard.py | 73 +++++++++++++++++++++++++++-- tests/test_telemetry_scripts.py | 35 +++++++++++++- 3 files changed, 105 insertions(+), 5 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 661302e8..3bb79a73 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -113,7 +113,7 @@ Optional: POSTHOG_APP_HOST=https://us.posthog.com ``` -The dashboard is named `OpenSwarm Product Analytics`. It creates tagged `OpenSwarm / ...` insights for 24-hour KPI cards, product activity by day, messages by role, agent usage grouped by raw `agent_name`, tool usage, error category/type breakdowns, error rate, and a recent safe telemetry sample table. +The dashboard is named `OpenSwarm Product Analytics`. 
It creates tagged `OpenSwarm / ...` insights for compact 24-hour KPI cards, product activity by day, messages by role, agent usage grouped by raw `agent_name`, tool usage, error category/type breakdowns, error rate, and a recent safe telemetry sample table. The dashboard script also applies a compact layout so the KPI cards appear in one row. To inspect the exact API payloads without creating anything: diff --git a/scripts/create_posthog_dashboard.py b/scripts/create_posthog_dashboard.py index 73fc34c9..a3251f96 100644 --- a/scripts/create_posthog_dashboard.py +++ b/scripts/create_posthog_dashboard.py @@ -11,6 +11,20 @@ DEFAULT_POSTHOG_APP_HOST = "https://us.posthog.com" DASHBOARD_NAME = "OpenSwarm Product Analytics" DASHBOARD_TAGS = ["OpenSwarm", "Telemetry"] +DASHBOARD_LAYOUT = [ + ("OpenSwarm / Active installs today", 0, 0, 0, 3, 3), + ("OpenSwarm / Messages today", 1, 3, 0, 3, 3), + ("OpenSwarm / Agent runs today", 2, 6, 0, 3, 3), + ("OpenSwarm / Errors today", 3, 9, 0, 3, 3), + ("OpenSwarm / Product activity by day", 4, 0, 3, 12, 5), + ("OpenSwarm / Messages by role", 5, 0, 8, 6, 4), + ("OpenSwarm / Agent usage by agent", 6, 6, 8, 6, 4), + ("OpenSwarm / Tool usage by tool", 7, 0, 12, 6, 4), + ("OpenSwarm / Errors by category", 8, 6, 12, 6, 4), + ("OpenSwarm / Errors by type", 9, 0, 16, 6, 4), + ("OpenSwarm / Error rate by day", 10, 6, 16, 6, 4), + ("OpenSwarm / Recent telemetry samples", 11, 0, 20, 12, 5), +] def build_dashboard_payload() -> dict[str, Any]: @@ -169,6 +183,32 @@ def build_insight_payloads(dashboard_id: int | str) -> list[dict[str, Any]]: ] +def build_dashboard_tile_layouts(dashboard: dict[str, Any]) -> list[dict[str, Any]]: + tiles_by_name = { + tile.get("insight", {}).get("name"): tile + for tile in dashboard.get("tiles", []) + if tile.get("insight", {}).get("name") + } + layouts: list[dict[str, Any]] = [] + xs_y = 0 + for insight_name, order, x, y, width, height in DASHBOARD_LAYOUT: + tile = tiles_by_name.get(insight_name) + if not tile: + 
raise ValueError(f"Dashboard is missing tile for {insight_name}") + layouts.append( + { + "id": tile["id"], + "order": order, + "layouts": { + "sm": {"x": x, "y": y, "w": width, "h": height}, + "xs": {"x": 0, "y": xs_y, "w": 1, "h": height}, + }, + } + ) + xs_y += height + return layouts + + def build_dry_run_payload(environment_id: str = "POSTHOG_ENVIRONMENT_ID") -> dict[str, Any]: dashboard_id = "DRY_RUN_DASHBOARD_ID" return { @@ -189,12 +229,31 @@ def build_dry_run_payload(environment_id: str = "POSTHOG_ENVIRONMENT_ID") -> dic def _post_json(host: str, path: str, api_key: str, payload: dict[str, Any]) -> dict[str, Any]: + return _request_json(host, path, api_key, method="POST", payload=payload) + + +def _patch_json(host: str, path: str, api_key: str, payload: dict[str, Any]) -> dict[str, Any]: + return _request_json(host, path, api_key, method="PATCH", payload=payload) + + +def _get_json(host: str, path: str, api_key: str) -> dict[str, Any]: + return _request_json(host, path, api_key, method="GET") + + +def _request_json( + host: str, + path: str, + api_key: str, + *, + method: str, + payload: dict[str, Any] | None = None, +) -> dict[str, Any]: url = f"{host.rstrip('/')}{path}" - data = json.dumps(payload).encode("utf-8") + data = json.dumps(payload).encode("utf-8") if payload is not None else None request = urllib.request.Request( url, data=data, - method="POST", + method=method, headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", @@ -227,7 +286,15 @@ def create_dashboard(host: str, environment_id: str, personal_api_key: str) -> d ) for insight_payload in reversed(build_insight_payloads(dashboard_id)) ] - return {"dashboard": dashboard, "insights": insights} + dashboard = _get_json(host, f"/api/environments/{environment_id}/dashboards/{dashboard_id}/", personal_api_key) + layout = build_dashboard_tile_layouts(dashboard) + dashboard = _patch_json( + host, + f"/api/environments/{environment_id}/dashboards/{dashboard_id}/", + 
personal_api_key, + {"tiles": layout}, + ) + return {"dashboard": dashboard, "insights": insights, "layout": layout} def main() -> int: diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py index f5e31548..dd18a80b 100644 --- a/tests/test_telemetry_scripts.py +++ b/tests/test_telemetry_scripts.py @@ -11,7 +11,12 @@ import pytest from scripts.check_telemetry_artifact import contains_generated_config -from scripts.create_posthog_dashboard import build_dashboard_payload, build_dry_run_payload, build_insight_payloads +from scripts.create_posthog_dashboard import ( + build_dashboard_payload, + build_dashboard_tile_layouts, + build_dry_run_payload, + build_insight_payloads, +) from scripts.smoke_telemetry import SMOKE_EVENT, build_smoke_properties from scripts.write_telemetry_config import build_config_source @@ -59,6 +64,34 @@ def test_dashboard_has_useful_openswarm_sections() -> None: assert "OpenSwarm / Recent telemetry samples" in names +def test_dashboard_layout_puts_kpis_in_one_compact_row() -> None: + payloads = build_insight_payloads(123) + fake_dashboard = { + "tiles": [ + {"id": index + 1, "insight": {"name": payload["name"]}} + for index, payload in enumerate(payloads) + ] + } + + layouts = build_dashboard_tile_layouts(fake_dashboard) + + first_row = layouts[:4] + assert [tile["layouts"]["sm"] for tile in first_row] == [ + {"x": 0, "y": 0, "w": 3, "h": 3}, + {"x": 3, "y": 0, "w": 3, "h": 3}, + {"x": 6, "y": 0, "w": 3, "h": 3}, + {"x": 9, "y": 0, "w": 3, "h": 3}, + ] + + by_id = {tile["id"]: tile for tile in layouts} + messages_by_role_id = payloads.index(next(p for p in payloads if p["name"] == "OpenSwarm / Messages by role")) + 1 + agent_usage_id = payloads.index(next(p for p in payloads if p["name"] == "OpenSwarm / Agent usage by agent")) + 1 + assert by_id[messages_by_role_id]["order"] == 5 + assert by_id[messages_by_role_id]["layouts"]["sm"]["x"] == 0 + assert by_id[agent_usage_id]["order"] == 6 + assert 
by_id[agent_usage_id]["layouts"]["sm"]["x"] == 6 + + def test_dashboard_dry_run_payload_has_no_secret_fields() -> None: payload = build_dry_run_payload("env_123") From b264e47c0a4df34f1630f934273dc0f21b7372c7 Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 20:25:09 +0300 Subject: [PATCH 09/10] Enable filters for telemetry sample table --- docs/telemetry.md | 2 +- scripts/create_posthog_dashboard.py | 3 ++- tests/test_telemetry_scripts.py | 9 +++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 3bb79a73..48338f88 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -113,7 +113,7 @@ Optional: POSTHOG_APP_HOST=https://us.posthog.com ``` -The dashboard is named `OpenSwarm Product Analytics`. It creates tagged `OpenSwarm / ...` insights for compact 24-hour KPI cards, product activity by day, messages by role, agent usage grouped by raw `agent_name`, tool usage, error category/type breakdowns, error rate, and a recent safe telemetry sample table. The dashboard script also applies a compact layout so the KPI cards appear in one row. +The dashboard is named `OpenSwarm Product Analytics`. It creates tagged `OpenSwarm / ...` insights for compact 24-hour KPI cards, product activity by day, messages by role, agent usage grouped by raw `agent_name`, tool usage, error category/type breakdowns, error rate, and a recent safe telemetry sample table. The dashboard script also applies a compact layout so the KPI cards appear in one row. The recent samples table includes PostHog's dashboard `{filters}` placeholder, so dashboard-level filters can narrow it by date, event, agent, tool, role, error type, or status. 
To inspect the exact API payloads without creating anything: diff --git a/scripts/create_posthog_dashboard.py b/scripts/create_posthog_dashboard.py index a3251f96..ef13250e 100644 --- a/scripts/create_posthog_dashboard.py +++ b/scripts/create_posthog_dashboard.py @@ -172,7 +172,8 @@ def build_insight_payloads(dashboard_id: int | str) -> list[dict[str, Any]]: "properties.tool_name AS tool_name, properties.error_type AS error_type, " "properties.error_category AS error_category, properties.status AS status " "FROM events " - "WHERE event IN ('app_started', 'install_created', 'message_sent', 'swarm_run_started', " + "WHERE {filters} " + "AND event IN ('app_started', 'install_created', 'message_sent', 'swarm_run_started', " "'swarm_run_completed', 'agent_run_started', 'agent_run_completed', 'tool_invoked', 'error') " "AND timestamp >= now() - INTERVAL 7 DAY " "ORDER BY timestamp DESC " diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py index dd18a80b..48e21b18 100644 --- a/tests/test_telemetry_scripts.py +++ b/tests/test_telemetry_scripts.py @@ -64,6 +64,15 @@ def test_dashboard_has_useful_openswarm_sections() -> None: assert "OpenSwarm / Recent telemetry samples" in names +def test_recent_samples_table_supports_dashboard_filters() -> None: + payloads = build_insight_payloads(123) + samples = next(payload for payload in payloads if payload["name"] == "OpenSwarm / Recent telemetry samples") + + query = samples["query"]["source"]["query"] + assert "WHERE {filters}" in query + assert "ORDER BY timestamp DESC" in query + + def test_dashboard_layout_puts_kpis_in_one_compact_row() -> None: payloads = build_insight_payloads(123) fake_dashboard = { From 94d08212028799240a446e29122e2559068637d8 Mon Sep 17 00:00:00 2001 From: Mykhailo Shchuka Date: Wed, 13 May 2026 20:28:12 +0300 Subject: [PATCH 10/10] Apply dashboard filters to HogQL tiles --- docs/telemetry.md | 2 +- scripts/create_posthog_dashboard.py | 3 ++- tests/test_telemetry_scripts.py | 1 + 3 
files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/telemetry.md b/docs/telemetry.md index 48338f88..b58fd537 100644 --- a/docs/telemetry.md +++ b/docs/telemetry.md @@ -113,7 +113,7 @@ Optional: POSTHOG_APP_HOST=https://us.posthog.com ``` -The dashboard is named `OpenSwarm Product Analytics`. It creates tagged `OpenSwarm / ...` insights for compact 24-hour KPI cards, product activity by day, messages by role, agent usage grouped by raw `agent_name`, tool usage, error category/type breakdowns, error rate, and a recent safe telemetry sample table. The dashboard script also applies a compact layout so the KPI cards appear in one row. The recent samples table includes PostHog's dashboard `{filters}` placeholder, so dashboard-level filters can narrow it by date, event, agent, tool, role, error type, or status. +The dashboard is named `OpenSwarm Product Analytics`. It creates tagged `OpenSwarm / ...` insights for compact 24-hour KPI cards, product activity by day, messages by role, agent usage grouped by raw `agent_name`, tool usage, error category/type breakdowns, error rate, and a recent safe telemetry sample table. The dashboard script also applies a compact layout so the KPI cards appear in one row. HogQL dashboard tiles include PostHog's dashboard `{filters}` placeholder, so dashboard-level filters can narrow them by date, event, agent, tool, role, error type, or status. 
To inspect the exact API payloads without creating anything: diff --git a/scripts/create_posthog_dashboard.py b/scripts/create_posthog_dashboard.py index ef13250e..78232101 100644 --- a/scripts/create_posthog_dashboard.py +++ b/scripts/create_posthog_dashboard.py @@ -156,7 +156,8 @@ def build_insight_payloads(dashboard_id: int | str) -> list[dict[str, Any]]: "if(countIf(event = 'swarm_run_completed') = 0, 0, " "countIf(event = 'error') / countIf(event = 'swarm_run_completed')) AS error_rate " "FROM events " - "WHERE event IN ('error', 'swarm_run_completed') " + "WHERE {filters} " + "AND event IN ('error', 'swarm_run_completed') " "AND timestamp >= now() - INTERVAL 30 DAY " "GROUP BY day " "ORDER BY day ASC" diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py index 48e21b18..6ea68633 100644 --- a/tests/test_telemetry_scripts.py +++ b/tests/test_telemetry_scripts.py @@ -116,6 +116,7 @@ def test_dashboard_error_rate_uses_hogql_rate_query() -> None: assert error_rate["query"]["kind"] == "DataVisualizationNode" source = error_rate["query"]["source"] assert source["kind"] == "HogQLQuery" + assert "WHERE {filters}" in source["query"] assert "countIf(event = 'error')" in source["query"] assert "countIf(event = 'swarm_run_completed')" in source["query"] assert error_rate["dashboards"] == [123]