diff --git a/.github/workflows/build-tui.yml b/.github/workflows/build-tui.yml index 2d99b16..0b635d6 100644 --- a/.github/workflows/build-tui.yml +++ b/.github/workflows/build-tui.yml @@ -288,5 +288,36 @@ jobs: - name: Install dependencies run: npm ci + - name: Clean Python build artifacts + shell: bash + run: rm -rf build dist *.egg-info + + - name: Validate no stale telemetry config + shell: bash + run: | + set -euo pipefail + if [[ -f openswarm_telemetry_config.py ]]; then + echo "openswarm_telemetry_config.py exists before npm CI injection; remove it before publishing." >&2 + exit 1 + fi + python3 scripts/check_telemetry_artifact.py build dist + + - name: Validate telemetry secrets + env: + POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} + shell: bash + run: | + set -euo pipefail + if [[ -z "${POSTHOG_API_KEY}" ]]; then + echo "POSTHOG_API_KEY GitHub secret is required before publishing the npm artifact." >&2 + exit 1 + fi + + - name: Generate npm telemetry config + env: + POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} + POSTHOG_HOST: ${{ secrets.POSTHOG_HOST }} + run: python3 scripts/write_telemetry_config.py + - name: Publish npm package run: npm publish --access public --tag "$NPM_TAG" diff --git a/.github/workflows/publish-npm-on-release.yml b/.github/workflows/publish-npm-on-release.yml index 4fd4914..724abf2 100644 --- a/.github/workflows/publish-npm-on-release.yml +++ b/.github/workflows/publish-npm-on-release.yml @@ -92,5 +92,36 @@ jobs: - name: Install dependencies run: npm ci + - name: Clean Python build artifacts + shell: bash + run: rm -rf build dist *.egg-info + + - name: Validate no stale telemetry config + shell: bash + run: | + set -euo pipefail + if [[ -f openswarm_telemetry_config.py ]]; then + echo "openswarm_telemetry_config.py exists before npm CI injection; remove it before publishing." >&2 + exit 1 + fi + python3 scripts/check_telemetry_artifact.py build dist + + - name: Validate telemetry secrets + env: + POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} + shell: bash + run: | + set -euo pipefail + if [[ -z "${POSTHOG_API_KEY}" ]]; then + echo "POSTHOG_API_KEY GitHub secret is required before publishing the npm artifact." >&2 + exit 1 + fi + + - name: Generate npm telemetry config + env: + POSTHOG_API_KEY: ${{ secrets.POSTHOG_API_KEY }} + POSTHOG_HOST: ${{ secrets.POSTHOG_HOST }} + run: python3 scripts/write_telemetry_config.py + - name: Publish npm package run: npm publish --access public --tag "$NPM_TAG" diff --git a/.gitignore b/.gitignore index a01337d..bd30599 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ node_modules/ mnt/ .bun-cache/ .playwright-browsers/ +openswarm_telemetry_config.py # TUI binaries — downloaded automatically on first run from GitHub Releases agency-windows-x64.exe @@ -183,4 +184,4 @@ cython_debug/ .agency_swarm/ third_party/ -.claude/ \ No newline at end of file +.claude/ diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..8ad67ee --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,3 @@ +exclude openswarm_telemetry_config.py +prune build +prune dist diff --git a/docs/telemetry.md b/docs/telemetry.md new file mode 100644 index 0000000..b58fd53 --- /dev/null +++ b/docs/telemetry.md @@ -0,0 +1,134 @@ +# OpenSwarm Telemetry + +OpenSwarm has backend-only PostHog telemetry for product analytics. It is enabled only when a PostHog capture key is available. + +## Distribution Behavior + +- npm releases can include a generated `openswarm_telemetry_config.py` file created by CI from `POSTHOG_API_KEY` and `POSTHOG_HOST` GitHub secrets. +- GitHub/source installs do not include a capture key and do not send telemetry unless the user explicitly sets `POSTHOG_API_KEY`. +- Python wheel/sdist artifacts intentionally exclude `openswarm_telemetry_config.py`; default telemetry injection is npm-release-only. +- Environment variables always win over the generated npm config. +- The runtime PostHog capture key shipped in npm is treated as public. Never use a personal API key for runtime telemetry. + +Runtime config: + +- `POSTHOG_API_KEY`: PostHog project capture key. +- `POSTHOG_HOST`: PostHog ingestion host. Defaults to `https://us.i.posthog.com`. + +## Opt Out + +Telemetry sends nothing when any of these are set: + +- `OPENSWARM_TELEMETRY=false` +- `OPENSWARM_TELEMETRY=0` +- `OPENSWARM_TELEMETRY=no` +- `OPENSWARM_TELEMETRY=off` +- `DO_NOT_TRACK=1` + +Telemetry is also disabled automatically under CI and pytest unless `OPENSWARM_TELEMETRY_ALLOW_TESTS=1` is set. + +## Runtime Status + +To inspect local telemetry status without printing any key: + +```bash +python - <<'PY' +import telemetry + +print({ + "enabled": telemetry.is_enabled(), + "config_source": telemetry.config_source() or "missing", + "has_capture_key": telemetry.has_posthog_key(), +}) +PY +``` + +`config_source` is `env`, `npm_config`, or `missing`. This check does not print the capture key. + +## Identity + +OpenSwarm stores an anonymous local install UUID and install secret in `~/.openswarm/telemetry.json` by default. Set `OPENSWARM_CONFIG_DIR` to change that location. + +Only derived identifiers are sent: + +- `user_id`: SHA-256 of the anonymous install UUID. +- `workspace_id`: HMAC of the workspace path using the local install secret. +- `agent_id` and `parent_agent_id`: HMACs derived from agent names. + +Raw workspace paths, raw user identifiers, API keys, emails, and Composio IDs are never sent. + +## Events + +- `app_started` +- `install_created` +- `onboarding_completed` +- `provider_configured` +- `message_sent` +- `swarm_run_started` +- `swarm_run_completed` +- `agent_run_started` +- `agent_run_completed` +- `llm_generation_completed` +- `tool_invoked` +- `handoff` +- `error` +- `telemetry_smoke_test` (manual smoke script only) + +`agent_name` is sent as a raw, first-class property on agent-relevant events so product analytics can group usage by agent. `agent_id` remains HMAC-derived. + +Runtime telemetry rejects event names outside this allowlist before initializing PostHog or building event properties. + +## Allowed Properties + +Only allowlisted scalar properties are sent: + +- IDs: `user_id`, `workspace_id`, `session_id`, `thread_id`, `run_trace_id`, `agent_run_id`, `parent_run_id` +- Agent/model: `agent_name`, `agent_id`, `parent_agent_id`, `caller_agent_name`, `model`, `provider`, `tool_name` +- Message metadata: `message_role`, `message_type` +- Usage/performance: `tokens_input`, `tokens_output`, `cost_usd`, `latency_ms`, `stop_reason`, `status`, `is_streaming` +- Setup/auth: `auth_method`, `provider`, `has_provider_key`, `install_source` +- Errors: `error_type`, `error_category`, `status`, `http_status` + +`model` and `provider` are additionally validated as short identifier-like values. Values that look like file paths, URLs, email addresses, API keys, tokens, or secrets are dropped. Numeric fields such as token counts, latency, HTTP status, and cost must be real numbers, and boolean fields must be real booleans. + +## Privacy Policy + +Telemetry must not include message contents, prompts, tool arguments, tool results, generated content, exception messages, stack traces, tracebacks, file paths, API keys, emails, Composio IDs, raw workspace paths, raw user IDs, or `telemetry_opted_out`. + +If a user opts out, OpenSwarm emits no telemetry event at all. + +## Dashboard + +Run this with a PostHog personal API key to create the default dashboard: + +```bash +POSTHOG_PERSONAL_API_KEY=phx_... \ +POSTHOG_ENVIRONMENT_ID=12345 \ +python scripts/create_posthog_dashboard.py +``` + +Optional: + +```bash +POSTHOG_APP_HOST=https://us.posthog.com +``` + +The dashboard is named `OpenSwarm Product Analytics`. It creates tagged `OpenSwarm / ...` insights for compact 24-hour KPI cards, product activity by day, messages by role, agent usage grouped by raw `agent_name`, tool usage, error category/type breakdowns, error rate, and a recent safe telemetry sample table. The dashboard script also applies a compact layout so the KPI cards appear in one row. HogQL dashboard tiles include PostHog's dashboard `{filters}` placeholder, so dashboard-level filters can narrow them by date, event, agent, tool, role, error type, or status. + +To inspect the exact API payloads without creating anything: + +```bash +python scripts/create_posthog_dashboard.py --dry-run +``` + +## Smoke Test + +To verify runtime telemetry can queue a safe manual event: + +```bash +POSTHOG_API_KEY=phc_... \ +POSTHOG_HOST=https://us.i.posthog.com \ +python scripts/smoke_telemetry.py +``` + +The smoke script sends `telemetry_smoke_test` with only allowlisted metadata. It does not send message contents, prompts, tool arguments/results, exception messages, file paths, or keys. diff --git a/onboard.py b/onboard.py index 3a838b8..2dc7848 100644 --- a/onboard.py +++ b/onboard.py @@ -6,6 +6,7 @@ """ import getpass +import os import sys from pathlib import Path @@ -290,6 +291,21 @@ def run_onboarding() -> None: # ── write .env ──────────────────────────────────────────────────────────── _write_env(updates) + for key, value in updates.items(): + if value: + os.environ[key] = value + + try: + import telemetry + + if updates.get(provider["env_key"]): + telemetry.capture_provider_configured(provider=provider["name"]) + for addon in selected_addons: + if any(updates.get(key_spec["env"]) for key_spec in addon["keys"]): + telemetry.capture_provider_configured(provider=addon["id"]) + telemetry.capture_onboarding_completed(provider=provider["name"]) + except Exception: + pass # ── summary ─────────────────────────────────────────────────────────────── console.print() diff --git a/package.json b/package.json index 9e7abc8..9f3a865 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,9 @@ "run_utils.py", "onboard.py", "swarm.py", + "telemetry.py", + "telemetry_hooks.py", + "openswarm_telemetry_config.py", "config.py", "helpers.py", "server.py", @@ -28,6 +31,10 @@ "video_generation_agent/", "virtual_assistant/", "patches/", + "docs/telemetry.md", + "scripts/", + "setup.py", + "MANIFEST.in", "pyproject.toml", "package.json", "package-lock.json" diff --git a/pyproject.toml b/pyproject.toml index 26e81b9..29e6c53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "agency-swarm[fastapi,jupyter,litellm]>=1.9.8", "questionary>=2.0.0", "python-dotenv", + "posthog>=7,<8", "rich", "fastapi", "uvicorn", @@ -60,11 +61,34 @@ dependencies = [ openswarm = "run_utils:main" [tool.setuptools] -py-modules = ["agency", "swarm", "helpers", "config", "onboard", "server"] +py-modules = [ + "agency", + "swarm", + "helpers", + "config", + "onboard", + "server", + "telemetry", + "telemetry_hooks", +] [tool.setuptools.packages.find] where = ["."] -exclude = ["agentswarm-cli*", "venv*", ".venv*", ".agency_swarm*", "node_modules*", "*.node_modules*", "activity*", "mnt*", "pptx*", "slides"] +exclude = [ + "agentswarm-cli*", + "venv*", + ".venv*", + ".agency_swarm*", + "node_modules*", + "*.node_modules*", + "activity*", + "mnt*", + "pptx*", + "slides", + "build*", + "dist*", + "*.egg-info*", +] [tool.setuptools.package-data] "*" = ["*.md", "*.json", "agency-*"] diff --git a/requirements.txt b/requirements.txt index 77db5c3..d3d574e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ agency-swarm[fastapi,jupyter,litellm]>=1.9.7 questionary>=2.0.0 +posthog>=7,<8 fastapi uvicorn composio==0.8.0 diff --git a/scripts/check_telemetry_artifact.py b/scripts/check_telemetry_artifact.py new file mode 100644 index 0000000..c596f6e --- /dev/null +++ b/scripts/check_telemetry_artifact.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import sys +import tarfile +import zipfile +from pathlib import Path + +GENERATED_CONFIG = "openswarm_telemetry_config.py" + + +def contains_generated_config(path: Path) -> bool: + if not path.exists(): + return False + if path.is_dir(): + return any(child.name == GENERATED_CONFIG for child in path.rglob(GENERATED_CONFIG)) + if path.name == GENERATED_CONFIG: + return True + if zipfile.is_zipfile(path): + with zipfile.ZipFile(path) as archive: + return any(Path(name).name == GENERATED_CONFIG for name in archive.namelist()) + if tarfile.is_tarfile(path): + with tarfile.open(path) as archive: + return any(Path(member.name).name == GENERATED_CONFIG for member in archive.getmembers()) + return False + + +def main() -> int: + parser = argparse.ArgumentParser(description="Guard against accidentally shipping generated telemetry config.") + parser.add_argument("paths", nargs="+", help="Artifact files or directories to inspect.") + parser.add_argument( + "--allow-generated-config", + action="store_true", + help="Allow openswarm_telemetry_config.py. Use only for intentional npm CI injection checks.", + ) + parser.add_argument( + "--require-generated-config", + action="store_true", + help="Fail unless at least one inspected path contains openswarm_telemetry_config.py.", + ) + args = parser.parse_args() + + matches = [str(Path(path)) for path in args.paths if contains_generated_config(Path(path))] + if matches and not args.allow_generated_config: + print( + "Refusing to ship generated telemetry config outside the intentional npm CI injection path:\n" + + "\n".join(f" - {match}" for match in matches), + file=sys.stderr, + ) + return 1 + if args.require_generated_config and not matches: + print("Expected generated telemetry config, but none was found.", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/create_posthog_dashboard.py b/scripts/create_posthog_dashboard.py new file mode 100644 index 0000000..7823210 --- /dev/null +++ b/scripts/create_posthog_dashboard.py @@ -0,0 +1,327 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import os +import urllib.error +import urllib.request +from typing import Any + +DEFAULT_POSTHOG_APP_HOST = "https://us.posthog.com" +DASHBOARD_NAME = "OpenSwarm Product Analytics" +DASHBOARD_TAGS = ["OpenSwarm", "Telemetry"] +DASHBOARD_LAYOUT = [ + ("OpenSwarm / Active installs today", 0, 0, 0, 3, 3), + ("OpenSwarm / Messages today", 1, 3, 0, 3, 3), + ("OpenSwarm / Agent runs today", 2, 6, 0, 3, 3), + ("OpenSwarm / Errors today", 3, 9, 0, 3, 3), + ("OpenSwarm / Product activity by day", 4, 0, 3, 12, 5), + ("OpenSwarm / Messages by role", 5, 0, 8, 6, 4), + ("OpenSwarm / Agent usage by agent", 6, 6, 8, 6, 4), + ("OpenSwarm / Tool usage by tool", 7, 0, 12, 6, 4), + ("OpenSwarm / Errors by category", 8, 6, 12, 6, 4), + ("OpenSwarm / Errors by type", 9, 0, 16, 6, 4), + ("OpenSwarm / Error rate by day", 10, 6, 16, 6, 4), + ("OpenSwarm / Recent telemetry samples", 11, 0, 20, 12, 5), +] + + +def build_dashboard_payload() -> dict[str, Any]: + return { + "name": DASHBOARD_NAME, + "description": "Privacy-safe OpenSwarm telemetry: installs, messages, agent usage, tools, and errors.", + "pinned": True, + "tags": DASHBOARD_TAGS, + } + + +def _trends_query( + *, + event: str | None = None, + name: str | None = None, + series: list[dict[str, Any]] | None = None, + math: str = "total", + breakdown: str | None = None, + display: str = "ActionsLineGraph", + date_from: str = "-30d", +) -> dict[str, Any]: + if series is None: + if event is None or name is None: + raise ValueError("event and name are required when series is not provided") + series = [{"kind": "EventsNode", "event": event, "name": name, "math": math}] + source: dict[str, Any] = { + "kind": "TrendsQuery", + "dateRange": {"date_from": date_from}, + "interval": "day", + "series": series, + "trendsFilter": {"display": display}, + } + if breakdown: + source["breakdownFilter"] = {"breakdown_type": "event", "breakdown": breakdown} + return {"kind": "InsightVizNode", "source": source} + + +def _hogql_query(query: str) -> dict[str, Any]: + return {"kind": "DataVisualizationNode", "source": {"kind": "HogQLQuery", "query": query}} + + +def _insight(name: str, description: str, query: dict[str, Any], dashboard_id: int | str) -> dict[str, Any]: + return { + "name": f"OpenSwarm / {name}", + "description": description, + "query": query, + "dashboards": [dashboard_id], + "tags": DASHBOARD_TAGS, + } + + +def build_insight_payloads(dashboard_id: int | str) -> list[dict[str, Any]]: + return [ + _insight( + "Active installs today", + "Unique anonymous installs that started OpenSwarm in the last 24 hours.", + _trends_query(event="app_started", name="Active installs", math="dau", display="BoldNumber", date_from="-24h"), + dashboard_id, + ), + _insight( + "Messages today", + "User and assistant messages captured in the last 24 hours.", + _trends_query(event="message_sent", name="Messages", display="BoldNumber", date_from="-24h"), + dashboard_id, + ), + _insight( + "Agent runs today", + "Completed agent runs in the last 24 hours.", + _trends_query(event="agent_run_completed", name="Completed agent runs", display="BoldNumber", date_from="-24h"), + dashboard_id, + ), + _insight( + "Errors today", + "Safe error events in the last 24 hours.", + _trends_query(event="error", name="Errors", display="BoldNumber", date_from="-24h"), + dashboard_id, + ), + _insight( + "Product activity by day", + "Core OpenSwarm events over time.", + _trends_query( + series=[ + {"kind": "EventsNode", "event": "app_started", "name": "App starts", "math": "total"}, + {"kind": "EventsNode", "event": "message_sent", "name": "Messages", "math": "total"}, + {"kind": "EventsNode", "event": "swarm_run_started", "name": "Swarm runs started", "math": "total"}, + {"kind": "EventsNode", "event": "agent_run_completed", "name": "Agent runs completed", "math": "total"}, + {"kind": "EventsNode", "event": "tool_invoked", "name": "Tools invoked", "math": "total"}, + {"kind": "EventsNode", "event": "error", "name": "Errors", "math": "total"}, + ], + ), + dashboard_id, + ), + _insight( + "Messages by role", + "Message volume split by user vs assistant.", + _trends_query(event="message_sent", name="Messages", breakdown="message_role", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Agent usage by agent", + "Completed agent runs grouped by raw agent_name.", + _trends_query(event="agent_run_completed", name="Agent runs", breakdown="agent_name", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Tool usage by tool", + "Tool invocations grouped by safe tool_name.", + _trends_query(event="tool_invoked", name="Tool invocations", breakdown="tool_name", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Errors by category", + "Safe error events grouped by error_category.", + _trends_query(event="error", name="Errors", breakdown="error_category", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Errors by type", + "Safe error events grouped by exception class only.", + _trends_query(event="error", name="Errors", breakdown="error_type", display="ActionsBarValue"), + dashboard_id, + ), + _insight( + "Error rate by day", + "Errors divided by completed swarm runs. Zero completed runs returns zero instead of null.", + _hogql_query( + "SELECT " + "toStartOfDay(timestamp) AS day, " + "if(countIf(event = 'swarm_run_completed') = 0, 0, " + "countIf(event = 'error') / countIf(event = 'swarm_run_completed')) AS error_rate " + "FROM events " + "WHERE {filters} " + "AND event IN ('error', 'swarm_run_completed') " + "AND timestamp >= now() - INTERVAL 30 DAY " + "GROUP BY day " + "ORDER BY day ASC" + ), + dashboard_id, + ), + _insight( + "Recent telemetry samples", + "Recent OpenSwarm events with only safe structured metadata.", + _hogql_query( + "SELECT " + "timestamp, event, properties.agent_name AS agent_name, properties.message_role AS message_role, " + "properties.tool_name AS tool_name, properties.error_type AS error_type, " + "properties.error_category AS error_category, properties.status AS status " + "FROM events " + "WHERE {filters} " + "AND event IN ('app_started', 'install_created', 'message_sent', 'swarm_run_started', " + "'swarm_run_completed', 'agent_run_started', 'agent_run_completed', 'tool_invoked', 'error') " + "AND timestamp >= now() - INTERVAL 7 DAY " + "ORDER BY timestamp DESC " + "LIMIT 50" + ), + dashboard_id, + ), + ] + + +def build_dashboard_tile_layouts(dashboard: dict[str, Any]) -> list[dict[str, Any]]: + tiles_by_name = { + tile.get("insight", {}).get("name"): tile + for tile in dashboard.get("tiles", []) + if tile.get("insight", {}).get("name") + } + layouts: list[dict[str, Any]] = [] + xs_y = 0 + for insight_name, order, x, y, width, height in DASHBOARD_LAYOUT: + tile = tiles_by_name.get(insight_name) + if not tile: + raise ValueError(f"Dashboard is missing tile for {insight_name}") + layouts.append( + { + "id": tile["id"], + "order": order, + "layouts": { + "sm": {"x": x, "y": y, "w": width, "h": height}, + "xs": {"x": 0, "y": xs_y, "w": 1, "h": height}, + }, + } + ) + xs_y += height + return layouts + + +def build_dry_run_payload(environment_id: str = "POSTHOG_ENVIRONMENT_ID") -> dict[str, Any]: + dashboard_id = "DRY_RUN_DASHBOARD_ID" + return { + "dashboard": { + "method": "POST", + "path": f"/api/environments/{environment_id}/dashboards/", + "payload": build_dashboard_payload(), + }, + "insights": [ + { + "method": "POST", + "path": f"/api/environments/{environment_id}/insights/", + "payload": payload, + } + for payload in build_insight_payloads(dashboard_id) + ], + } + + +def _post_json(host: str, path: str, api_key: str, payload: dict[str, Any]) -> dict[str, Any]: + return _request_json(host, path, api_key, method="POST", payload=payload) + + +def _patch_json(host: str, path: str, api_key: str, payload: dict[str, Any]) -> dict[str, Any]: + return _request_json(host, path, api_key, method="PATCH", payload=payload) + + +def _get_json(host: str, path: str, api_key: str) -> dict[str, Any]: + return _request_json(host, path, api_key, method="GET") + + +def _request_json( + host: str, + path: str, + api_key: str, + *, + method: str, + payload: dict[str, Any] | None = None, +) -> dict[str, Any]: + url = f"{host.rstrip('/')}{path}" + data = json.dumps(payload).encode("utf-8") if payload is not None else None + request = urllib.request.Request( + url, + data=data, + method=method, + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + ) + try: + with urllib.request.urlopen(request, timeout=30) as response: + return json.loads(response.read().decode("utf-8")) + except urllib.error.HTTPError as error: + body = error.read().decode("utf-8", errors="replace") + raise RuntimeError(f"PostHog API request failed with {error.code}: {body}") from error + + +def create_dashboard(host: str, environment_id: str, personal_api_key: str) -> dict[str, Any]: + dashboard = _post_json( + host, + f"/api/environments/{environment_id}/dashboards/", + personal_api_key, + build_dashboard_payload(), + ) + dashboard_id = dashboard["id"] + # PostHog inserts API-created dashboard tiles with the latest tile first. + # Create in reverse so the dashboard opens with the KPI cards at the top. + insights = [ + _post_json( + host, + f"/api/environments/{environment_id}/insights/", + personal_api_key, + insight_payload, + ) + for insight_payload in reversed(build_insight_payloads(dashboard_id)) + ] + dashboard = _get_json(host, f"/api/environments/{environment_id}/dashboards/{dashboard_id}/", personal_api_key) + layout = build_dashboard_tile_layouts(dashboard) + dashboard = _patch_json( + host, + f"/api/environments/{environment_id}/dashboards/{dashboard_id}/", + personal_api_key, + {"tiles": layout}, + ) + return {"dashboard": dashboard, "insights": insights, "layout": layout} + + +def main() -> int: + parser = argparse.ArgumentParser(description="Create the OpenSwarm PostHog product analytics dashboard.") + parser.add_argument("--dry-run", action="store_true", help="Print dashboard and insight API payloads without creating anything.") + args = parser.parse_args() + + environment_id = os.getenv("POSTHOG_ENVIRONMENT_ID") + if args.dry_run: + print(json.dumps(build_dry_run_payload(environment_id or "POSTHOG_ENVIRONMENT_ID"), indent=2, sort_keys=True)) + return 0 + + personal_api_key = os.getenv("POSTHOG_PERSONAL_API_KEY") + host = os.getenv("POSTHOG_APP_HOST", DEFAULT_POSTHOG_APP_HOST) + if not personal_api_key: + raise SystemExit("POSTHOG_PERSONAL_API_KEY is required") + if not environment_id: + raise SystemExit("POSTHOG_ENVIRONMENT_ID is required") + + result = create_dashboard(host, environment_id, personal_api_key) + dashboard = result["dashboard"] + dashboard_url = f"{host.rstrip('/')}/project/{environment_id}/dashboard/{dashboard['id']}" + print(f"Created {DASHBOARD_NAME}: {dashboard_url}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/smoke_telemetry.py b/scripts/smoke_telemetry.py new file mode 100644 index 0000000..964bf83 --- /dev/null +++ b/scripts/smoke_telemetry.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +import telemetry + +SMOKE_EVENT = "telemetry_smoke_test" + + +def build_smoke_properties() -> dict[str, str]: + return { + "install_source": telemetry.config_source() or "source", + "status": "manual", + } + + +def main() -> int: + if not telemetry.is_enabled(): + print( + "Telemetry smoke test not sent: telemetry is disabled, opted out, running in CI/test mode, " + "or no PostHog capture key is configured.", + file=sys.stderr, + ) + print( + "Set POSTHOG_API_KEY, optional POSTHOG_HOST, and for CI smoke checks OPENSWARM_TELEMETRY_ALLOW_TESTS=1.", + file=sys.stderr, + ) + return 1 + + sent = telemetry.capture(SMOKE_EVENT, build_smoke_properties()) + telemetry.shutdown() + if not sent: + print("Telemetry smoke test failed before the event could be queued.", file=sys.stderr) + return 1 + + print(f"Queued {SMOKE_EVENT}; check PostHog ingestion for this event.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/write_telemetry_config.py b/scripts/write_telemetry_config.py new file mode 100644 index 0000000..c879ea8 --- /dev/null +++ b/scripts/write_telemetry_config.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import os +from pathlib import Path + +DEFAULT_POSTHOG_HOST = "https://us.i.posthog.com" +HEADER = """# Generated during npm release packaging. +# Do not commit this file. The PostHog capture key is treated as public. +""" + + +def build_config_source(api_key: str, host: str | None = None) -> str: + if not api_key: + raise ValueError("POSTHOG_API_KEY is required") + resolved_host = host or DEFAULT_POSTHOG_HOST + return ( + HEADER + + f"POSTHOG_API_KEY = {api_key!r}\n" + + f"POSTHOG_HOST = {resolved_host!r}\n" + ) + + +def write_config(path: Path, api_key: str, host: str | None = None) -> None: + path.write_text(build_config_source(api_key, host), encoding="utf-8") + + +def main() -> int: + parser = argparse.ArgumentParser(description="Write npm-release-only OpenSwarm telemetry config.") + parser.add_argument( + "--output", + default=str(Path(__file__).resolve().parents[1] / "openswarm_telemetry_config.py"), + help="Generated Python module path.", + ) + args = parser.parse_args() + + api_key = os.getenv("POSTHOG_API_KEY", "") + host = os.getenv("POSTHOG_HOST") or DEFAULT_POSTHOG_HOST + if not api_key: + raise SystemExit("POSTHOG_API_KEY is required to generate npm telemetry config") + + output = Path(args.output) + write_config(output, api_key, host) + print(f"Wrote telemetry config to {output}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..3ee7fc1 --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import shutil +from pathlib import Path + +from setuptools import setup +from setuptools.command.build_py import build_py as _build_py + + +class build_py(_build_py): + def run(self) -> None: + self._clean_build_lib() + super().run() + self._remove_stale_telemetry_config() + + def _clean_build_lib(self) -> None: + build_lib = Path(self.build_lib) + if build_lib.exists(): + shutil.rmtree(build_lib) + + def _remove_stale_telemetry_config(self) -> None: + generated_config = Path(self.build_lib) / "openswarm_telemetry_config.py" + if generated_config.exists(): + generated_config.unlink() + + +setup(cmdclass={"build_py": build_py}) diff --git a/swarm.py b/swarm.py index 7ddfde4..af1658e 100644 --- a/swarm.py +++ b/swarm.py @@ -13,6 +13,8 @@ apply_file_attachment_reference_patch() apply_ipython_composio_context_patch() +_telemetry_app_started = False + _tracing_key = os.getenv("OPENAI_API_KEY") if _tracing_key: set_tracing_export_api_key(_tracing_key) @@ -20,9 +22,23 @@ set_tracing_disabled(True) +def _capture_app_started_once(): + global _telemetry_app_started + if _telemetry_app_started: + return + _telemetry_app_started = True + try: + import telemetry + + telemetry.capture_app_started() + except Exception: + pass + + def create_agency(load_threads_callback=None): from agency_swarm import Agency from agency_swarm.tools import Handoff, SendMessage + from telemetry_hooks import instrument_agency from orchestrator import create_orchestrator from virtual_assistant import create_virtual_assistant @@ -74,7 +90,8 @@ def create_agency(load_threads_callback=None): load_threads_callback=load_threads_callback, ) - return agency + _capture_app_started_once() + return instrument_agency(agency) if __name__ == "__main__": agency = create_agency() diff --git a/telemetry.py b/telemetry.py new file mode 100644 index 0000000..0b96813 --- /dev/null +++ b/telemetry.py @@ -0,0 +1,536 @@ +from __future__ import annotations + +import atexit +import hashlib +import hmac +import importlib.util +import json +import logging +import os +import re +import time +import uuid +from pathlib import Path +from typing import Any, Callable + +logger = logging.getLogger(__name__) + +DEFAULT_POSTHOG_HOST = "https://us.i.posthog.com" +FALSE_VALUES = {"0", "false", "no", "off"} +TRUE_VALUES = {"1", "true", "yes", "on"} +STATE_VERSION = 1 +_MODULE_DIR = Path(__file__).resolve().parent +_STATE_FILE_NAME = "telemetry.json" +_SESSION_ID = uuid.uuid4().hex + +ALLOWED_PROPERTIES = { + "agent_id", + "agent_name", + "agent_run_id", + "auth_method", + "caller_agent_name", + "cost_usd", + "error_category", + "error_type", + "event_version", + "has_provider_key", + "http_status", + "install_source", + "is_streaming", + "latency_ms", + "message_role", + "message_type", + "model", + "parent_agent_id", + "parent_run_id", + "provider", + "run_trace_id", + "session_id", + "status", + "stop_reason", + "thread_id", + "tokens_input", + "tokens_output", + "tool_name", + "user_id", + "workspace_id", +} + +ALLOWED_EVENTS = { + "agent_run_completed", + "agent_run_started", + "app_started", + "error", + "handoff", + "install_created", + "llm_generation_completed", + "message_sent", + "onboarding_completed", + "provider_configured", + "swarm_run_completed", + "swarm_run_started", + "telemetry_smoke_test", + "tool_invoked", +} + +BASE_PROPERTIES = {"event_version", "session_id", "user_id", "workspace_id"} +BOOLEAN_PROPERTIES = {"has_provider_key", "is_streaming"} +INTEGER_PROPERTIES = {"http_status", "latency_ms", "tokens_input", "tokens_output"} +FLOAT_PROPERTIES = {"cost_usd"} +MODEL_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._:/+-]{0,127}$") +SAFE_SLUG_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._+-]{0,63}$") +EMAIL_RE = re.compile(r"(?i)[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}") +SECRET_RE = re.compile( + r"(?i)(api[_-]?key|secret|token|password|bearer|sk-[A-Za-z0-9_-]{8,}|" + r"sk-proj-[A-Za-z0-9_-]{8,}|phc_[A-Za-z0-9_-]{8,}|phx_[A-Za-z0-9_-]{8,}|" + r"AIza[A-Za-z0-9_-]{8,}|xox[baprs]-[A-Za-z0-9_-]{8,}|gh[pousr]_[A-Za-z0-9_-]{8,})" +) +PATH_RE = re.compile(r"(?i)(^/|^\./|^\.\./|^~/|^[A-Za-z]:[\\/]|[\\]|/(Users|home|private|var|tmp)/|//)") + +CONTENT_LIKE_KEYS = { + "arguments", + "content", + "exception_message", + "file_path", + "input", + "input_text", + "message", + "output", + "output_text", + "path", + "prompt", + "stack", + "tool_args", + "tool_result", + "traceback", +} + +_client: Any | None = None +_client_config: dict[str, str | None] | None = None +_posthog_factory: Callable[[str, str], Any] | None = None +_state_cache: dict[str, Any] | None = None +_state_created = False + + +def _config_dir() -> Path: + custom = os.getenv("OPENSWARM_CONFIG_DIR") + if custom: + return Path(custom).expanduser() + return Path.home() / ".openswarm" + + +def _state_path() -> Path: + return _config_dir() / _STATE_FILE_NAME + + +def _load_state() -> dict[str, Any]: + global _state_cache, _state_created + if _state_cache is not None: + return _state_cache + + path = _state_path() + if path.exists(): + try: + data = json.loads(path.read_text(encoding="utf-8")) + if isinstance(data, dict) and data.get("install_id") and data.get("install_secret"): + _state_cache = data + return _state_cache + except Exception: + logger.debug("Could not read telemetry state", exc_info=True) + + _state_created = True + _state_cache = { + "version": STATE_VERSION, + "install_id": uuid.uuid4().hex, + "install_secret": uuid.uuid4().hex + uuid.uuid4().hex, + "install_created_captured": False, + } + _save_state() + return _state_cache + + +def _save_state() -> None: + if _state_cache is None: + return + try: + path = _state_path() + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(_state_cache, indent=2, sort_keys=True), encoding="utf-8") + except Exception: + logger.debug("Could not write telemetry state", exc_info=True) + + +def _sha256(value: str) -> str: + return hashlib.sha256(value.encode("utf-8")).hexdigest() + + +def _install_secret() -> bytes: + state = _load_state() + return str(state["install_secret"]).encode("utf-8") + + +def hashed_identifier(namespace: str, value: Any) -> str: + return hmac.new(_install_secret(), f"{namespace}:{value}".encode("utf-8"), hashlib.sha256).hexdigest() + + +def user_id() -> str: + state = _load_state() + return _sha256(f"openswarm:user:{state['install_id']}") + + +def workspace_id(path: str | Path | None = None) -> str: + candidate = Path(path or os.getcwd()).expanduser() + try: + normalized = str(candidate.resolve()) + except Exception: + normalized = str(candidate.absolute()) + return hashed_identifier("workspace", normalized) + + +def thread_id(value: Any) -> str: + return hashed_identifier("thread", value) + + +def agent_id(agent_name: str | None) -> str | None: + if not agent_name: + return None + return hashed_identifier("agent", agent_name) + + +def _load_generated_config() -> dict[str, str]: + config_path = _MODULE_DIR / "openswarm_telemetry_config.py" + if not config_path.exists(): + return {} + try: + spec = importlib.util.spec_from_file_location("_openswarm_telemetry_config", config_path) + if spec is None or spec.loader is None: + return {} + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return { + "POSTHOG_API_KEY": str(getattr(module, "POSTHOG_API_KEY", "") or ""), + "POSTHOG_HOST": str(getattr(module, "POSTHOG_HOST", "") or ""), + } + except Exception: + logger.debug("Could not load generated telemetry config", exc_info=True) + return {} + + +def _running_in_tests_or_ci() -> bool: + if os.getenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", "").strip().lower() in TRUE_VALUES: + return False + if os.getenv("CI", "").strip().lower() in TRUE_VALUES: + return True + if os.getenv("PYTEST_CURRENT_TEST") or os.getenv("PYTEST_VERSION"): + return True + return False + + +def _is_opted_out() -> bool: + telemetry_env = os.getenv("OPENSWARM_TELEMETRY") + if telemetry_env is not None and telemetry_env.strip().lower() in FALSE_VALUES: + return True + if os.getenv("DO_NOT_TRACK") == "1": + return True + return _running_in_tests_or_ci() + + +def _resolve_client_config() -> dict[str, str | None]: + global _client_config + if _client_config is not None: + return _client_config + + generated = _load_generated_config() + env_key = os.getenv("POSTHOG_API_KEY") + env_host = os.getenv("POSTHOG_HOST") + + api_key = env_key or generated.get("POSTHOG_API_KEY") or None + host = env_host or generated.get("POSTHOG_HOST") or DEFAULT_POSTHOG_HOST + source = "env" if env_key else ("npm_config" if generated.get("POSTHOG_API_KEY") else None) + + _client_config = { + "api_key": api_key, + "host": host, + "source": source, + } + return _client_config + + +def is_enabled() -> bool: + if _is_opted_out(): + return False + return bool(_resolve_client_config().get("api_key")) + + +def config_source() -> str | None: + return _resolve_client_config().get("source") + + +def has_posthog_key() -> bool: + return bool(_resolve_client_config().get("api_key")) + + +def _create_posthog_client(api_key: str, host: str) -> Any: + if _posthog_factory is not None: + return _posthog_factory(api_key, host) + + os.environ.setdefault("POSTHOG_ENABLE_EXCEPTION_AUTOCAPTURE", "false") + os.environ.setdefault("POSTHOG_CAPTURE_EXCEPTION_CODE_VARIABLES", "false") + + from posthog import Posthog # type: ignore + + try: + return Posthog( + api_key, + host=host, + disable_geoip=True, + enable_exception_autocapture=False, + capture_exception_code_variables=False, + ) + except TypeError: + pass + try: + return Posthog(api_key, host=host, disable_geoip=True) + except TypeError: + return Posthog(api_key, host=host) + + +def _get_client() -> Any | None: + global _client + if not is_enabled(): + return None + if _client is not None: + return _client + + config = _resolve_client_config() + api_key = config.get("api_key") + host = config.get("host") or DEFAULT_POSTHOG_HOST + if not api_key: + return None + + try: + _client = _create_posthog_client(str(api_key), str(host)) + atexit.register(shutdown) + except Exception: + logger.debug("Could not initialize PostHog telemetry client", exc_info=True) + _client = None + return _client + + +def provider_from_model(model: Any) -> str | None: + if model is None: + return None + name = str(model) + if "/" not in name: + safe_model = sanitize_model_id(name) + if safe_model and safe_model.startswith(("gpt-", "o")): + return "openai" + return None + parts = name.split("/") + provider: str | None + if parts[0] == "litellm" and len(parts) > 1: + provider = parts[1] + else: + provider = parts[0] + return sanitize_provider(provider) + + +def configured_provider() -> str | None: + model = os.getenv("DEFAULT_MODEL", "") + if model: + provider = provider_from_model(model) + if provider: + return provider + if os.getenv("OPENAI_API_KEY"): + return "openai" + if os.getenv("ANTHROPIC_API_KEY"): + return "anthropic" + if os.getenv("GOOGLE_API_KEY"): + return "google" + return None + + +def has_provider_key() -> bool: + return bool(os.getenv("OPENAI_API_KEY") or os.getenv("ANTHROPIC_API_KEY") or os.getenv("GOOGLE_API_KEY")) + + +def _looks_private(value: str) -> bool: + return bool(EMAIL_RE.search(value) or SECRET_RE.search(value) or PATH_RE.search(value)) + + +def sanitize_model_id(value: Any) -> str | None: + if not isinstance(value, str): + return None + model = value.strip() + if not model or len(model) > 128: + return None + if not MODEL_ID_RE.fullmatch(model): + return None + if _looks_private(model): + return None + return model + + +def sanitize_provider(value: Any) -> str | None: + if not isinstance(value, str): + return None + provider = value.strip() + if not provider or not SAFE_SLUG_RE.fullmatch(provider): + return None + if _looks_private(provider): + return None + return provider + + +def _safe_property(key: str, value: Any) -> str | int | float | bool | None: + if key in BASE_PROPERTIES: + return None + if key == "model": + return sanitize_model_id(value) + if key == "provider": + return sanitize_provider(value) + if key in BOOLEAN_PROPERTIES: + return value if isinstance(value, bool) else None + if key in INTEGER_PROPERTIES: + return value if isinstance(value, int) and not isinstance(value, bool) else None + if key in FLOAT_PROPERTIES: + return float(value) if isinstance(value, int | float) and not isinstance(value, bool) else None + if value is None: + return None + if isinstance(value, str): + return value[:256] + if isinstance(value, bool | int | float): + return value + return None + + +def sanitize_properties(properties: dict[str, Any] | None) -> dict[str, Any]: + safe: dict[str, Any] = { + "event_version": 1, + "user_id": user_id(), + "workspace_id": workspace_id(), + "session_id": _SESSION_ID, + } + if not properties: + return safe + for key, value in properties.items(): + if key not in ALLOWED_PROPERTIES: + continue + if key in CONTENT_LIKE_KEYS: + continue + scalar = _safe_property(key, value) + if scalar is not None: + safe[key] = scalar + return safe + + +def capture(event: str, properties: dict[str, Any] | None = None) -> bool: + if event not in ALLOWED_EVENTS: + logger.debug("Telemetry event %s is not allowlisted", event) + return False + client = _get_client() + if client is None: + return False + props = sanitize_properties(properties) + try: + client.capture(event=event, distinct_id=props["user_id"], properties=props) + return True + except TypeError: + try: + client.capture(distinct_id=props["user_id"], event=event, properties=props) + return True + except Exception: + logger.debug("PostHog capture failed for event %s", event, exc_info=True) + return False + except Exception: + logger.debug("PostHog capture failed for event %s", event, exc_info=True) + return False + + +def capture_error(error: BaseException, *, category: str, properties: dict[str, Any] | None = None) -> bool: + props = dict(properties or {}) + props.update( + { + "error_type": error.__class__.__name__, + "error_category": category, + "status": props.get("status") or "error", + } + ) + return capture("error", props) + + +def capture_app_started(*, install_source: str | None = None) -> None: + if not is_enabled(): + return + state = _load_state() + if not state.get("install_created_captured"): + if capture( + "install_created", + { + "install_source": install_source or config_source() or "source", + "has_provider_key": has_provider_key(), + "provider": configured_provider(), + }, + ): + state["install_created_captured"] = True + _save_state() + capture( + "app_started", + { + "install_source": install_source or config_source() or "source", + "has_provider_key": has_provider_key(), + "provider": configured_provider(), + }, + ) + + +def capture_onboarding_completed(*, provider: str | None, auth_method: str = "api_key") -> None: + capture( + "onboarding_completed", + { + "auth_method": auth_method, + "provider": provider, + "has_provider_key": has_provider_key(), + "install_source": config_source() or "source", + }, + ) + + +def capture_provider_configured(*, provider: str | None, auth_method: str = "api_key") -> None: + capture( + "provider_configured", + { + "auth_method": auth_method, + "provider": provider, + "has_provider_key": has_provider_key(), + "install_source": config_source() or "source", + }, + ) + + +def shutdown() -> None: + if _client is None: + return + for method_name in ("shutdown", "flush"): + method = getattr(_client, method_name, None) + if callable(method): + try: + method() + except Exception: + logger.debug("PostHog telemetry shutdown failed", exc_info=True) + break + + +def set_posthog_factory_for_tests(factory: Callable[[str, str], Any] | None) -> None: + global _posthog_factory, _client + _posthog_factory = factory + _client = None + + +def _reset_for_tests() -> None: + global _client, _client_config, _posthog_factory, _state_cache, _state_created + _client = None + _client_config = None + _posthog_factory = None + _state_cache = None + _state_created = False diff --git a/telemetry_hooks.py b/telemetry_hooks.py new file mode 100644 index 0000000..2d37635 --- /dev/null +++ b/telemetry_hooks.py @@ -0,0 +1,664 @@ +from __future__ import annotations + +import contextvars +import inspect +import logging +import time +import types +import uuid +from collections.abc import Iterable +from typing import Any + +from agents.lifecycle import AgentHooksBase, RunHooksBase + +import telemetry + +logger = logging.getLogger(__name__) +_trusted_message_context: contextvars.ContextVar[dict[str, Any] | None] = contextvars.ContextVar( + "openswarm_trusted_message_context", + default=None, +) +_trusted_thread_managers: dict[int, dict[str, Any]] = {} +_MESSAGE_TYPES = {"message"} + + +class CompositeRunHooks(RunHooksBase[Any, Any]): + """Run hook fan-out that preserves existing user or persistence hooks.""" + + def __init__(self, *hooks: RunHooksBase[Any, Any] | None) -> None: + self.hooks = [hook for hook in hooks if hook is not None and not isinstance(hook, OpenSwarmTelemetryRunHooks)] + self.telemetry_hooks = OpenSwarmTelemetryRunHooks() + + async def _invoke(self, hook: Any, method_name: str, *args: Any) -> None: + method = getattr(hook, method_name, None) + if method is None: + return + result = method(*args) + if inspect.isawaitable(result): + await result + + async def _call(self, method_name: str, *args: Any) -> None: + for hook in self.hooks: + await self._invoke(hook, method_name, *args) + try: + await self._invoke(self.telemetry_hooks, method_name, *args) + except Exception: + logger.debug("Telemetry run hook %s failed", method_name, exc_info=True) + + async def on_llm_start(self, context: Any, agent: Any, system_prompt: str | None, input_items: list[Any]) -> None: + await self._call("on_llm_start", context, agent, system_prompt, input_items) + + async def on_llm_end(self, context: Any, agent: Any, response: Any) -> None: + await self._call("on_llm_end", context, agent, response) + + async def on_agent_start(self, context: Any, agent: Any) -> None: + await self._call("on_agent_start", context, agent) + + async def on_agent_end(self, context: Any, agent: Any, output: Any) -> None: + await self._call("on_agent_end", context, agent, output) + + async def on_handoff(self, context: Any, from_agent: Any, to_agent: Any) -> None: + await self._call("on_handoff", context, from_agent, to_agent) + + async def on_tool_start(self, context: Any, agent: Any, tool: Any) -> None: + await self._call("on_tool_start", context, agent, tool) + + async def on_tool_end(self, context: Any, agent: Any, tool: Any, result: str) -> None: + await self._call("on_tool_end", context, agent, tool, result) + + +class CompositeAgentHooks(AgentHooksBase[Any, Any]): + """Agent hook fan-out that keeps any pre-existing per-agent hooks active.""" + + def __init__(self, *hooks: AgentHooksBase[Any, Any] | None) -> None: + self.hooks = [hook for hook in hooks if hook is not None and not isinstance(hook, OpenSwarmTelemetryAgentHooks)] + self.telemetry_hooks = OpenSwarmTelemetryAgentHooks() + + async def _invoke(self, hook: Any, method_name: str, *args: Any) -> None: + method = getattr(hook, method_name, None) + if method is None: + return + result = method(*args) + if inspect.isawaitable(result): + await result + + async def _call(self, method_name: str, *args: Any) -> None: + for hook in self.hooks: + await self._invoke(hook, method_name, *args) + try: + await self._invoke(self.telemetry_hooks, method_name, *args) + except Exception: + logger.debug("Telemetry agent hook %s failed", method_name, exc_info=True) + + async def on_start(self, context: Any, agent: Any) -> None: + await self._call("on_start", context, agent) + + async def on_end(self, context: Any, agent: Any, output: Any) -> None: + await self._call("on_end", context, agent, output) + + async def on_handoff(self, context: Any, agent: Any, source: Any) -> None: + await self._call("on_handoff", context, agent, source) + + async def on_tool_start(self, context: Any, agent: Any, tool: Any) -> None: + await self._call("on_tool_start", context, agent, tool) + + async def on_tool_end(self, context: Any, agent: Any, tool: Any, result: str) -> None: + await self._call("on_tool_end", context, agent, tool, result) + + async def on_llm_start(self, context: Any, agent: Any, system_prompt: str | None, input_items: list[Any]) -> None: + await self._call("on_llm_start", context, agent, system_prompt, input_items) + + async def on_llm_end(self, context: Any, agent: Any, response: Any) -> None: + await self._call("on_llm_end", context, agent, response) + + +class OpenSwarmTelemetryRunHooks(RunHooksBase[Any, Any]): + """Run hook placeholder so composition remains explicit and future-safe.""" + + +class OpenSwarmTelemetryAgentHooks(AgentHooksBase[Any, Any]): + def __init__(self) -> None: + self._agent_start_times: dict[tuple[int, str], float] = {} + self._tool_start_times: dict[tuple[int, str, str], float] = {} + self._llm_start_times: dict[tuple[int, str], float] = {} + + async def on_start(self, context: Any, agent: Any) -> None: + key = _context_agent_key(context, agent) + self._agent_start_times[key] = time.monotonic() + props = _agent_props(context, agent, status="started") + _set_trusted_message_context(props) + _safe_capture("agent_run_started", props) + + async def on_end(self, context: Any, agent: Any, output: Any) -> None: + key = _context_agent_key(context, agent) + started = self._agent_start_times.pop(key, None) + _safe_capture( + "agent_run_completed", + _agent_props( + context, + agent, + status="completed", + latency_ms=_elapsed_ms(started), + ), + ) + + async def on_tool_start(self, context: Any, agent: Any, tool: Any) -> None: + tool_name = _tool_name(tool) + self._tool_start_times[_context_tool_key(context, agent, tool_name)] = time.monotonic() + + async def on_tool_end(self, context: Any, agent: Any, tool: Any, result: str) -> None: + tool_name = _tool_name(tool) + started = self._tool_start_times.pop(_context_tool_key(context, agent, tool_name), None) + props = _agent_props(context, agent, status="completed", latency_ms=_elapsed_ms(started)) + props["tool_name"] = tool_name + _safe_capture("tool_invoked", props) + + async def on_handoff(self, context: Any, agent: Any, source: Any) -> None: + source_name = _agent_name(source) + props = _agent_props(context, agent, status="completed") + _set_trusted_message_context(props) + if source_name: + props["caller_agent_name"] = source_name + props["parent_agent_id"] = telemetry.agent_id(source_name) + _safe_capture("handoff", props) + + async def on_llm_start(self, context: Any, agent: Any, system_prompt: str | None, input_items: list[Any]) -> None: + self._llm_start_times[_context_agent_key(context, agent)] = time.monotonic() + + async def on_llm_end(self, context: Any, agent: Any, response: Any) -> None: + key = _context_agent_key(context, agent) + started = self._llm_start_times.pop(key, None) + props = _agent_props(context, agent, status="completed", latency_ms=_elapsed_ms(started)) + props.update(_usage_props_from_response(response)) + stop_reason = _stop_reason(response) + if stop_reason: + props["stop_reason"] = stop_reason + _safe_capture("llm_generation_completed", props) + + +class TelemetryStreamingRunResponse: + """Proxy that records completion/error only after a stream is consumed.""" + + def __init__(self, stream: Any, base_props: dict[str, Any], started_at: float) -> None: + self._stream = stream + self._base_props = base_props + self._started_at = started_at + self._finished = False + self._cancelled = False + + def __getattr__(self, name: str) -> Any: + return getattr(self._stream, name) + + def __aiter__(self) -> Any: + return self + + async def __anext__(self) -> Any: + token = _set_trusted_message_context(self._base_props) + try: + return await self._stream.__anext__() + except StopAsyncIteration: + self._complete(status="cancelled" if self._cancelled else "completed") + raise + except BaseException as exc: + self._error(exc) + raise + finally: + _reset_trusted_message_context(token) + + async def asend(self, value: Any) -> Any: + token = _set_trusted_message_context(self._base_props) + try: + return await self._stream.asend(value) + except StopAsyncIteration: + self._complete(status="cancelled" if self._cancelled else "completed") + raise + except BaseException as exc: + self._error(exc) + raise + finally: + _reset_trusted_message_context(token) + + async def athrow(self, typ: Any, val: Any = None, tb: Any = None) -> Any: + token = _set_trusted_message_context(self._base_props) + try: + return await self._stream.athrow(typ, val, tb) + except BaseException as exc: + self._error(exc) + raise + finally: + _reset_trusted_message_context(token) + + async def aclose(self) -> None: + token = _set_trusted_message_context(self._base_props) + try: + await self._stream.aclose() + except BaseException as exc: + self._error(exc) + raise + finally: + self._complete(status="cancelled" if self._cancelled else "closed") + _reset_trusted_message_context(token) + + def cancel(self, mode: str = "immediate") -> None: + self._cancelled = True + cancel = getattr(self._stream, "cancel", None) + if callable(cancel): + cancel(mode=mode) + + async def wait_final_result(self) -> Any: + token = _set_trusted_message_context(self._base_props) + try: + result = await self._stream.wait_final_result() + except BaseException as exc: + self._error(exc) + raise + finally: + _reset_trusted_message_context(token) + self._complete(status="cancelled" if self._cancelled else "completed", run_result=result) + return result + + @property + def final_result(self) -> Any: + return getattr(self._stream, "final_result", None) + + @property + def final_output(self) -> Any: + return getattr(self._stream, "final_output", None) + + def _adopt_stream(self, other: Any) -> None: + adopt = getattr(self._stream, "_adopt_stream", None) + if callable(adopt): + adopt(other) + + def _complete(self, *, status: str, run_result: Any | None = None) -> None: + if self._finished: + return + self._finished = True + props = dict(self._base_props) + props.update({"status": status, "latency_ms": _elapsed_ms(self._started_at)}) + props.update(_usage_props_from_run_result(run_result if run_result is not None else self.final_result)) + _safe_capture("swarm_run_completed", props) + + def _error(self, exc: BaseException) -> None: + if self._finished: + return + self._finished = True + props = dict(self._base_props) + props.update({"status": "error", "latency_ms": _elapsed_ms(self._started_at)}) + _safe_capture_error(exc, category="swarm_run", properties=props) + + +def instrument_agency(agency: Any) -> Any: + """Attach OpenSwarm telemetry to an Agency instance without replacing hooks.""" + + install_thread_manager_telemetry() + _register_thread_manager_context(agency) + _attach_agent_hooks(_iter_agents(agency)) + _wrap_agency_methods(agency) + if getattr(agency, "persistence_hooks", None) is not None: + agency.persistence_hooks = compose_run_hooks(agency.persistence_hooks) + return agency + + +def compose_run_hooks(hooks: RunHooksBase[Any, Any] | None) -> RunHooksBase[Any, Any] | None: + if hooks is None: + return None + if isinstance(hooks, CompositeRunHooks): + return hooks + return CompositeRunHooks(hooks) + + +def compose_agent_hooks(hooks: AgentHooksBase[Any, Any] | None) -> AgentHooksBase[Any, Any]: + if isinstance(hooks, CompositeAgentHooks): + return hooks + return CompositeAgentHooks(hooks) + + +def install_thread_manager_telemetry() -> None: + from agency_swarm.utils.thread import ThreadManager + + if getattr(ThreadManager, "_openswarm_telemetry_wrapped", False): + return + + original_add_message = ThreadManager.add_message + original_add_messages = ThreadManager.add_messages + + def add_message(self: Any, message: Any) -> Any: + result = original_add_message(self, message) + try: + _capture_message_sent(message, self) + except Exception: + logger.debug("Telemetry message capture failed", exc_info=True) + return result + + def add_messages(self: Any, messages: list[Any]) -> Any: + result = original_add_messages(self, messages) + for message in messages or []: + try: + _capture_message_sent(message, self) + except Exception: + logger.debug("Telemetry message capture failed", exc_info=True) + return result + + ThreadManager.add_message = add_message + ThreadManager.add_messages = add_messages + ThreadManager._openswarm_telemetry_wrapped = True + + +def _wrap_agency_methods(agency: Any) -> None: + if getattr(agency, "_openswarm_telemetry_wrapped", False): + return + + original_get_response = agency.get_response + original_get_response_stream = agency.get_response_stream + + async def get_response_with_telemetry(self: Any, *args: Any, **kwargs: Any) -> Any: + args = _compose_hooks_override(args, kwargs) + props = _swarm_props(self, args, kwargs, is_streaming=False, status="started") + started_at = time.monotonic() + _safe_capture("swarm_run_started", props) + token = _set_trusted_message_context(props) + try: + result = await original_get_response(*args, **kwargs) + except BaseException as exc: + error_props = dict(props) + error_props.update({"status": "error", "latency_ms": _elapsed_ms(started_at)}) + _safe_capture_error(exc, category="swarm_run", properties=error_props) + raise + finally: + _reset_trusted_message_context(token) + completed_props = dict(props) + completed_props.update({"status": "completed", "latency_ms": _elapsed_ms(started_at)}) + completed_props.update(_usage_props_from_run_result(result)) + _safe_capture("swarm_run_completed", completed_props) + return result + + def get_response_stream_with_telemetry(self: Any, *args: Any, **kwargs: Any) -> Any: + args = _compose_hooks_override(args, kwargs) + props = _swarm_props(self, args, kwargs, is_streaming=True, status="started") + started_at = time.monotonic() + _safe_capture("swarm_run_started", props) + try: + stream = original_get_response_stream(*args, **kwargs) + except BaseException as exc: + error_props = dict(props) + error_props.update({"status": "error", "latency_ms": _elapsed_ms(started_at)}) + _safe_capture_error(exc, category="swarm_run", properties=error_props) + raise + return TelemetryStreamingRunResponse(stream, props, started_at) + + agency.get_response = types.MethodType(get_response_with_telemetry, agency) + agency.get_response_stream = types.MethodType(get_response_stream_with_telemetry, agency) + agency._openswarm_telemetry_wrapped = True + + +def _compose_hooks_override(args: tuple[Any, ...], kwargs: dict[str, Any]) -> tuple[Any, ...]: + if "hooks_override" in kwargs and kwargs["hooks_override"] is not None: + kwargs["hooks_override"] = compose_run_hooks(kwargs["hooks_override"]) + return args + if len(args) <= 3 or args[3] is None: + return args + mutable_args = list(args) + mutable_args[3] = compose_run_hooks(mutable_args[3]) + return tuple(mutable_args) + + +def _attach_agent_hooks(agents: Iterable[Any]) -> None: + for agent in agents: + try: + agent.hooks = compose_agent_hooks(getattr(agent, "hooks", None)) + except Exception: + logger.debug("Could not attach telemetry hooks to agent", exc_info=True) + + +def _iter_agents(agency: Any) -> list[Any]: + raw_agents = getattr(agency, "agents", None) + if isinstance(raw_agents, dict): + return list(raw_agents.values()) + if raw_agents is not None: + return list(raw_agents) + entry_points = getattr(agency, "entry_points", None) + return list(entry_points or []) + + +def _register_thread_manager_context(agency: Any) -> None: + manager = getattr(agency, "thread_manager", None) + if manager is None: + return + agent_names = {name for name in (_agent_name(agent) for agent in _iter_agents(agency)) if name} + _trusted_thread_managers[id(manager)] = {"agent_names": agent_names} + + +def _capture_message_sent(message: Any, manager: Any) -> None: + if not isinstance(message, dict): + return + role = message.get("role") + if role not in {"user", "assistant"}: + return + trusted_context = _message_context_for(manager) + known_agent_names = trusted_context.get("agent_names") or set() + agent_name = _trusted_agent_name(trusted_context, known_agent_names) + caller_agent_name = _trusted_message_agent_name(message.get("callerAgent"), known_agent_names) + props: dict[str, Any] = { + "message_role": role, + "message_type": _safe_message_type(message.get("type")), + "thread_id": telemetry.thread_id(id(manager)), + "session_id": telemetry._SESSION_ID, + "is_streaming": bool(trusted_context.get("is_streaming", False)), + } + if agent_name: + props["agent_name"] = agent_name + props["agent_id"] = telemetry.agent_id(agent_name) + if caller_agent_name: + props["caller_agent_name"] = caller_agent_name + for key in ("agent_run_id", "parent_run_id", "run_trace_id"): + value = _safe_text(trusted_context.get(key)) + if value: + props[key] = value + _safe_capture("message_sent", props) + + +def _message_context_for(manager: Any) -> dict[str, Any]: + context = dict(_trusted_message_context.get() or {}) + manager_context = _trusted_thread_managers.get(id(manager), {}) + agent_names = set(manager_context.get("agent_names") or set()) + agent_names.update(context.get("agent_names") or set()) + context["agent_names"] = agent_names + return context + + +def _trusted_agent_name(context: dict[str, Any], known_agent_names: set[str]) -> str | None: + agent_name = _safe_text(context.get("agent_name")) + if not agent_name: + return None + if known_agent_names and agent_name not in known_agent_names: + return None + return agent_name + + +def _trusted_message_agent_name(value: Any, known_agent_names: set[str]) -> str | None: + candidate = _safe_text(value) + if not candidate or not known_agent_names: + return None + return candidate if candidate in known_agent_names else None + + +def _safe_message_type(value: Any) -> str: + candidate = _safe_text(value) + if candidate in _MESSAGE_TYPES: + return candidate + return "message" + + +def _set_trusted_message_context(props: dict[str, Any]) -> contextvars.Token[dict[str, Any] | None]: + current = dict(_trusted_message_context.get() or {}) + merged = {**current, **props} + if "agent_names" in current and "agent_names" not in props: + merged["agent_names"] = current["agent_names"] + return _trusted_message_context.set(merged) + + +def _reset_trusted_message_context(token: contextvars.Token[dict[str, Any] | None]) -> None: + try: + _trusted_message_context.reset(token) + except Exception: + logger.debug("Could not reset telemetry message context", exc_info=True) + + +def _swarm_props(agency: Any, args: tuple[Any, ...], kwargs: dict[str, Any], *, is_streaming: bool, status: str) -> dict[str, Any]: + agent_name = _resolve_recipient_agent_name(agency, args, kwargs) + props: dict[str, Any] = { + "status": status, + "is_streaming": is_streaming, + "run_trace_id": uuid.uuid4().hex, + } + if agent_name: + props["agent_name"] = agent_name + props["agent_id"] = telemetry.agent_id(agent_name) + return props + + +def _resolve_recipient_agent_name(agency: Any, args: tuple[Any, ...], kwargs: dict[str, Any]) -> str | None: + recipient = kwargs.get("recipient_agent") + if recipient is None and len(args) >= 2: + recipient = args[1] + if recipient is None: + entry_points = getattr(agency, "entry_points", None) or [] + recipient = entry_points[0] if entry_points else None + return _agent_name(recipient) + + +def _agent_props(context: Any, agent: Any, **extra: Any) -> dict[str, Any]: + name = _agent_name(agent) + props: dict[str, Any] = { + "agent_name": name, + "agent_id": telemetry.agent_id(name), + } + model = _model_name(agent) + if model: + props["model"] = model + props["provider"] = telemetry.provider_from_model(model) + run_context = getattr(context, "context", None) + for source_key, target_key in ( + ("_current_agent_run_id", "agent_run_id"), + ("_parent_run_id", "parent_run_id"), + ("_run_trace_id", "run_trace_id"), + ): + value = _safe_text(getattr(run_context, source_key, None)) + if value: + props[target_key] = value + props.update({key: value for key, value in extra.items() if value is not None}) + return props + + +def _safe_capture(event: str, properties: dict[str, Any] | None = None) -> bool: + try: + return telemetry.capture(event, properties) + except Exception: + logger.debug("Telemetry capture failed for event %s", event, exc_info=True) + return False + + +def _safe_capture_error(error: BaseException, *, category: str, properties: dict[str, Any] | None = None) -> bool: + try: + return telemetry.capture_error(error, category=category, properties=properties) + except Exception: + logger.debug("Telemetry error capture failed for category %s", category, exc_info=True) + return False + + +def _model_name(agent: Any) -> str | None: + try: + from agency_swarm.agent.execution import get_model_name + + return get_model_name(getattr(agent, "model", None)) + except Exception: + model = getattr(agent, "model", None) + return str(model) if model is not None else None + + +def _usage_props_from_run_result(run_result: Any) -> dict[str, Any]: + if run_result is None: + return {} + try: + from agency_swarm.utils.usage_tracking import calculate_usage_with_cost, extract_usage_from_run_result + + usage = extract_usage_from_run_result(run_result) + if usage is None: + return {} + usage = calculate_usage_with_cost(usage, run_result=run_result) + return _usage_props(usage.input_tokens, usage.output_tokens, usage.total_cost) + except Exception: + logger.debug("Could not extract run usage for telemetry", exc_info=True) + return {} + + +def _usage_props_from_response(response: Any) -> dict[str, Any]: + usage = getattr(response, "usage", None) + if usage is None: + return {} + return _usage_props( + getattr(usage, "input_tokens", None), + getattr(usage, "output_tokens", None), + None, + ) + + +def _usage_props(tokens_input: Any, tokens_output: Any, cost: Any) -> dict[str, Any]: + props: dict[str, Any] = {} + if isinstance(tokens_input, int): + props["tokens_input"] = tokens_input + if isinstance(tokens_output, int): + props["tokens_output"] = tokens_output + if isinstance(cost, int | float): + props["cost_usd"] = float(cost) + return props + + +def _stop_reason(response: Any) -> str | None: + raw_items = getattr(response, "output", None) or getattr(response, "items", None) or [] + if not isinstance(raw_items, list): + return None + for item in raw_items: + status = _safe_text(getattr(item, "status", None)) + if status: + return status + if isinstance(item, dict): + status = _safe_text(item.get("status")) + if status: + return status + return None + + +def _tool_name(tool: Any) -> str: + return _safe_text(getattr(tool, "name", None) or getattr(tool, "__name__", None) or tool.__class__.__name__) or "unknown" + + +def _agent_name(agent: Any) -> str | None: + if agent is None: + return None + if isinstance(agent, str): + return agent + return _safe_text(getattr(agent, "name", None)) + + +def _context_agent_key(context: Any, agent: Any) -> tuple[int, str]: + return (id(context), _agent_name(agent) or "unknown") + + +def _context_tool_key(context: Any, agent: Any, tool_name: str) -> tuple[int, str, str]: + return (id(context), _agent_name(agent) or "unknown", tool_name) + + +def _elapsed_ms(started_at: float | None) -> int | None: + if started_at is None: + return None + return max(0, int((time.monotonic() - started_at) * 1000)) + + +def _safe_text(value: Any) -> str | None: + if value is None: + return None + if isinstance(value, str): + return value[:256] + return str(value)[:256] diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..f96b4d8 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,8 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 0000000..6277a20 --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,589 @@ +from __future__ import annotations + +import hashlib +import asyncio +import os +from types import SimpleNamespace + +import pytest + +import telemetry +import telemetry_hooks +from agents.lifecycle import AgentHooksBase, RunHooksBase + + +class FakePostHog: + def __init__(self, api_key: str, host: str, events: list[dict]) -> None: + self.api_key = api_key + self.host = host + self.events = events + + def capture(self, *, event: str, distinct_id: str, properties: dict) -> None: + self.events.append( + { + "event": event, + "distinct_id": distinct_id, + "properties": properties, + "api_key": self.api_key, + "host": self.host, + } + ) + + +@pytest.fixture +def telemetry_events(monkeypatch: pytest.MonkeyPatch, tmp_path): + for key in ( + "POSTHOG_API_KEY", + "POSTHOG_HOST", + "OPENSWARM_TELEMETRY", + "DO_NOT_TRACK", + "CI", + "PYTEST_VERSION", + ): + monkeypatch.delenv(key, raising=False) + monkeypatch.setenv("OPENSWARM_CONFIG_DIR", str(tmp_path / "config")) + monkeypatch.setenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", "1") + monkeypatch.setattr(telemetry, "_MODULE_DIR", tmp_path) + telemetry._reset_for_tests() + telemetry_hooks._trusted_thread_managers.clear() + telemetry_hooks._trusted_message_context.set(None) + events: list[dict] = [] + telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, events)) + yield events + telemetry._reset_for_tests() + telemetry_hooks._trusted_thread_managers.clear() + telemetry_hooks._trusted_message_context.set(None) + + +def test_env_key_enables_telemetry(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + + assert telemetry.capture("app_started", {"install_source": "env"}) + + assert telemetry_events[0]["event"] == "app_started" + assert telemetry_events[0]["api_key"] == "ph_env" + assert telemetry_events[0]["host"] == telemetry.DEFAULT_POSTHOG_HOST + + +def test_generated_npm_config_enables_when_env_absent(tmp_path, telemetry_events: list[dict]) -> None: + (tmp_path / "openswarm_telemetry_config.py").write_text( + "POSTHOG_API_KEY = 'ph_generated'\nPOSTHOG_HOST = 'https://eu.i.posthog.com'\n", + encoding="utf-8", + ) + + assert telemetry.capture("app_started", {"install_source": "npm_config"}) + + assert telemetry_events[0]["api_key"] == "ph_generated" + assert telemetry_events[0]["host"] == "https://eu.i.posthog.com" + + +def test_env_key_overrides_generated_config(monkeypatch: pytest.MonkeyPatch, tmp_path, telemetry_events: list[dict]) -> None: + (tmp_path / "openswarm_telemetry_config.py").write_text( + "POSTHOG_API_KEY = 'ph_generated'\nPOSTHOG_HOST = 'https://generated.example'\n", + encoding="utf-8", + ) + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + monkeypatch.setenv("POSTHOG_HOST", "https://env.example") + + assert telemetry.capture("app_started") + + assert telemetry_events[0]["api_key"] == "ph_env" + assert telemetry_events[0]["host"] == "https://env.example" + + +def test_source_install_without_key_is_noop(telemetry_events: list[dict]) -> None: + assert not telemetry.capture("app_started") + assert telemetry_events == [] + + +def test_unknown_event_is_rejected_before_client_or_state_creation( + monkeypatch: pytest.MonkeyPatch, + tmp_path, + telemetry_events: list[dict], +) -> None: + config_dir = tmp_path / "unknown-event-config" + monkeypatch.setenv("OPENSWARM_CONFIG_DIR", str(config_dir)) + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + telemetry._reset_for_tests() + telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, telemetry_events)) + + assert not telemetry.capture("user_supplied_event_name", {"agent_name": "Docs Agent"}) + assert telemetry_events == [] + assert not (config_dir / "telemetry.json").exists() + + +@pytest.mark.parametrize( + ("env_key", "env_value"), + [ + ("OPENSWARM_TELEMETRY", "false"), + ("OPENSWARM_TELEMETRY", "0"), + ("OPENSWARM_TELEMETRY", "no"), + ("OPENSWARM_TELEMETRY", "off"), + ("DO_NOT_TRACK", "1"), + ("CI", "1"), + ], +) +def test_opt_out_and_ci_suppress_captures( + monkeypatch: pytest.MonkeyPatch, + telemetry_events: list[dict], + env_key: str, + env_value: str, +) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + if env_key == "CI": + monkeypatch.delenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", raising=False) + monkeypatch.setenv(env_key, env_value) + telemetry._reset_for_tests() + telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, telemetry_events)) + + assert not telemetry.capture("app_started") + assert telemetry_events == [] + + +def test_pytest_mode_suppresses_without_explicit_allow(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + monkeypatch.delenv("OPENSWARM_TELEMETRY_ALLOW_TESTS", raising=False) + telemetry._reset_for_tests() + telemetry.set_posthog_factory_for_tests(lambda api_key, host: FakePostHog(api_key, host, telemetry_events)) + + assert not telemetry.capture("app_started") + assert telemetry_events == [] + + +def test_agent_name_is_raw_on_relevant_events(telemetry_events: list[dict], monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + relevant_events = [ + "message_sent", + "agent_run_started", + "agent_run_completed", + "llm_generation_completed", + "tool_invoked", + "handoff", + "error", + ] + + for event in relevant_events: + telemetry.capture(event, {"agent_name": "Docs Agent", "agent_id": telemetry.agent_id("Docs Agent")}) + + assert [entry["event"] for entry in telemetry_events] == relevant_events + for entry in telemetry_events: + props = entry["properties"] + assert props["agent_name"] == "Docs Agent" + assert props["agent_id"] != "Docs Agent" + + +def test_safe_model_and_provider_are_sent(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + model = "litellm/anthropic/claude-sonnet-4-6" + + telemetry.capture( + "agent_run_started", + { + "agent_name": "Research", + "model": model, + "provider": telemetry.provider_from_model(model), + }, + ) + + props = telemetry_events[0]["properties"] + assert props["model"] == model + assert props["provider"] == "anthropic" + + +def test_unsafe_model_values_are_dropped(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + cases = [ + ("/Users/mike/private-model", None), + ("model user@example.com", None), + ("C:\\Users\\mike\\private-model", None), + ("https://example.com/private-model", None), + ("litellm/openai/sk-secret1234567890", "openai"), + ("gpt-5.2 api_key=secret", None), + ] + + for model, expected_provider in cases: + telemetry.capture( + "agent_run_started", + { + "agent_name": "Research", + "model": model, + "provider": telemetry.provider_from_model(model), + }, + ) + props = telemetry_events[-1]["properties"] + assert "model" not in props + if expected_provider: + assert props["provider"] == expected_provider + else: + assert "provider" not in props + assert "@" not in str(props) + assert "/Users" not in str(props) + assert "sk-secret" not in str(props) + assert "api_key" not in str(props) + + +def test_configured_provider_ignores_unsafe_default_model( + monkeypatch: pytest.MonkeyPatch, + telemetry_events: list[dict], +) -> None: + monkeypatch.setenv("DEFAULT_MODEL", "gpt-5.2 api_key=secret") + monkeypatch.delenv("OPENAI_API_KEY", raising=False) + + assert telemetry.configured_provider() is None + + +def test_numeric_and_boolean_properties_are_type_gated( + monkeypatch: pytest.MonkeyPatch, + telemetry_events: list[dict], +) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + + telemetry.capture( + "error", + { + "error_type": "ValueError", + "error_category": "test", + "http_status": "429", + "latency_ms": "/Users/mike/private", + "tokens_input": "10", + "tokens_output": "20", + "cost_usd": "1.25", + "is_streaming": "true", + "has_provider_key": "true", + }, + ) + unsafe_props = telemetry_events[0]["properties"] + for key in ( + "http_status", + "latency_ms", + "tokens_input", + "tokens_output", + "cost_usd", + "is_streaming", + "has_provider_key", + ): + assert key not in unsafe_props + + telemetry.capture( + "error", + { + "error_type": "ValueError", + "error_category": "test", + "http_status": 429, + "latency_ms": 12, + "tokens_input": 10, + "tokens_output": 20, + "cost_usd": 1.25, + "is_streaming": False, + "has_provider_key": True, + }, + ) + safe_props = telemetry_events[1]["properties"] + assert safe_props["http_status"] == 429 + assert safe_props["latency_ms"] == 12 + assert safe_props["tokens_input"] == 10 + assert safe_props["tokens_output"] == 20 + assert safe_props["cost_usd"] == 1.25 + assert safe_props["is_streaming"] is False + assert safe_props["has_provider_key"] is True + + +def test_workspace_id_is_hmac_not_plain_path_hash(telemetry_events: list[dict], tmp_path) -> None: + workspace = tmp_path / "secret-project" + workspace.mkdir() + derived = telemetry.workspace_id(workspace) + plain_hash = hashlib.sha256(str(workspace.resolve()).encode("utf-8")).hexdigest() + + assert derived != str(workspace.resolve()) + assert derived != plain_hash + assert derived == telemetry.workspace_id(workspace) + + +def test_error_event_excludes_messages_traces_and_paths(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + error = ValueError("secret prompt in /Users/person/project") + + telemetry.capture_error( + error, + category="swarm_run", + properties={ + "agent_name": "Orchestrator", + "message": "secret", + "traceback": "stack", + "file_path": "/Users/person/project/file.py", + "http_status": 429, + }, + ) + + props = telemetry_events[0]["properties"] + assert props["error_type"] == "ValueError" + assert props["error_category"] == "swarm_run" + assert props["http_status"] == 429 + assert "secret prompt" not in str(props) + assert "traceback" not in props + assert "file_path" not in props + assert "message" not in props + + +def test_agent_hook_composition_preserves_existing_hook(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + calls: list[str] = [] + + class ExistingHooks(AgentHooksBase): + async def on_start(self, context, agent): + calls.append("existing") + + async def run_hook() -> None: + hooks = telemetry_hooks.compose_agent_hooks(ExistingHooks()) + await hooks.on_start(SimpleNamespace(context=SimpleNamespace()), SimpleNamespace(name="Research", model="gpt-5.2")) + + asyncio.run(run_hook()) + + assert calls == ["existing"] + assert telemetry_events[0]["event"] == "agent_run_started" + assert telemetry_events[0]["properties"]["agent_name"] == "Research" + + +def test_run_hook_composition_preserves_existing_hook() -> None: + calls: list[str] = [] + + class ExistingRunHooks(RunHooksBase): + async def on_agent_start(self, context, agent): + calls.append("existing") + + async def run_hook() -> None: + hooks = telemetry_hooks.compose_run_hooks(ExistingRunHooks()) + await hooks.on_agent_start(None, SimpleNamespace(name="Research")) + + asyncio.run(run_hook()) + + assert calls == ["existing"] + + +def test_positional_hooks_override_is_composed() -> None: + class ExistingRunHooks(RunHooksBase): + pass + + hook = ExistingRunHooks() + args = ("message", None, None, hook) + kwargs = {} + + composed_args = telemetry_hooks._compose_hooks_override(args, kwargs) + + assert composed_args[:3] == args[:3] + assert isinstance(composed_args[3], telemetry_hooks.CompositeRunHooks) + + +class TwoItemStream: + def __init__(self) -> None: + self.items = iter(["first", "second"]) + self.final_result = SimpleNamespace() + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.items) + except StopIteration: + raise StopAsyncIteration + + async def aclose(self): + return None + + async def wait_final_result(self): + return self.final_result + + +def test_streaming_completion_fires_on_consumption(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + stream = telemetry_hooks.TelemetryStreamingRunResponse( + TwoItemStream(), + {"agent_name": "Orchestrator", "is_streaming": True}, + telemetry_hooks.time.monotonic(), + ) + + async def consume_stream() -> list[str]: + consumed = [] + async for item in stream: + consumed.append(item) + assert telemetry_events == [] + return consumed + + consumed = asyncio.run(consume_stream()) + + assert consumed == ["first", "second"] + assert telemetry_events[0]["event"] == "swarm_run_completed" + assert telemetry_events[0]["properties"]["agent_name"] == "Orchestrator" + + +class ErrorStream: + final_result = None + + def __aiter__(self): + return self + + async def __anext__(self): + raise RuntimeError("private path /Users/person/project") + + async def aclose(self): + return None + + +def test_streaming_error_fires_on_consumption(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + stream = telemetry_hooks.TelemetryStreamingRunResponse( + ErrorStream(), + {"agent_name": "Orchestrator", "is_streaming": True}, + telemetry_hooks.time.monotonic(), + ) + + async def consume_stream() -> None: + await stream.__anext__() + + with pytest.raises(RuntimeError): + asyncio.run(consume_stream()) + + assert telemetry_events[0]["event"] == "error" + props = telemetry_events[0]["properties"] + assert props["error_type"] == "RuntimeError" + assert "private path" not in str(props) + + +def test_message_sent_extracts_only_safe_fields(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + token = telemetry_hooks._trusted_message_context.set( + { + "agent_name": "Docs Agent", + "agent_run_id": "trusted_run", + "run_trace_id": "trusted_trace", + "agent_names": {"Docs Agent", "Orchestrator"}, + } + ) + message = { + "role": "assistant", + "type": "message", + "content": [{"type": "output_text", "text": "do not send"}], + "agent": "user controlled secret", + "callerAgent": "Orchestrator", + "agent_run_id": "/Users/private/run", + "run_trace_id": "secret_trace", + } + + try: + telemetry_hooks._capture_message_sent(message, manager=object()) + finally: + telemetry_hooks._trusted_message_context.reset(token) + + props = telemetry_events[0]["properties"] + assert props["agent_name"] == "Docs Agent" + assert props["caller_agent_name"] == "Orchestrator" + assert props["agent_run_id"] == "trusted_run" + assert props["run_trace_id"] == "trusted_trace" + assert props["message_role"] == "assistant" + assert "content" not in props + assert "do not send" not in str(props) + assert "user controlled secret" not in str(props) + assert "/Users/private" not in str(props) + + +def test_message_sent_without_trusted_agent_context_drops_message_agent( + monkeypatch: pytest.MonkeyPatch, + telemetry_events: list[dict], +) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + + telemetry_hooks._capture_message_sent( + { + "role": "assistant", + "type": "arbitrary-private-type", + "agent": "private user text", + "callerAgent": "private caller text", + "agent_run_id": "private run", + }, + manager=object(), + ) + + props = telemetry_events[0]["properties"] + assert props["message_type"] == "message" + assert "agent_name" not in props + assert "caller_agent_name" not in props + assert "agent_run_id" not in props + assert "private" not in str(props) + + +def test_thread_manager_wrapper_ignores_message_telemetry_failures(monkeypatch: pytest.MonkeyPatch) -> None: + telemetry_hooks.install_thread_manager_telemetry() + from agency_swarm.utils.thread import ThreadManager + + def fail_capture(*args, **kwargs): + raise RuntimeError("telemetry failure") + + monkeypatch.setattr(telemetry_hooks, "_capture_message_sent", fail_capture) + manager = ThreadManager() + + manager.add_message({"role": "user", "type": "message", "content": "product path still works"}) + + +def test_streaming_completion_ignores_telemetry_capture_failure(monkeypatch: pytest.MonkeyPatch) -> None: + def fail_capture(*args, **kwargs): + raise RuntimeError("telemetry failure") + + monkeypatch.setattr(telemetry_hooks.telemetry, "capture", fail_capture) + stream = telemetry_hooks.TelemetryStreamingRunResponse( + TwoItemStream(), + {"agent_name": "Orchestrator", "is_streaming": True}, + telemetry_hooks.time.monotonic(), + ) + + async def consume_stream() -> list[str]: + consumed = [] + async for item in stream: + consumed.append(item) + return consumed + + assert asyncio.run(consume_stream()) == ["first", "second"] + + +def test_agency_run_wrapper_ignores_telemetry_capture_failure(monkeypatch: pytest.MonkeyPatch) -> None: + class FakeAgency: + entry_points = [SimpleNamespace(name="Orchestrator")] + + async def get_response(self, *args, **kwargs): + return SimpleNamespace() + + def get_response_stream(self, *args, **kwargs): + return TwoItemStream() + + def fail_capture(*args, **kwargs): + raise RuntimeError("telemetry failure") + + agency = FakeAgency() + monkeypatch.setattr(telemetry_hooks.telemetry, "capture", fail_capture) + telemetry_hooks._wrap_agency_methods(agency) + + result = asyncio.run(agency.get_response("hello")) + + assert isinstance(result, SimpleNamespace) + + +def test_tool_invoked_payload_drops_content_like_properties(monkeypatch: pytest.MonkeyPatch, telemetry_events: list[dict]) -> None: + monkeypatch.setenv("POSTHOG_API_KEY", "ph_env") + + telemetry.capture( + "tool_invoked", + { + "agent_name": "Slides Agent", + "tool_name": "create_slide", + "tool_args": {"prompt": "secret"}, + "tool_result": "secret output", + "latency_ms": 5, + }, + ) + + props = telemetry_events[0]["properties"] + assert props["agent_name"] == "Slides Agent" + assert props["tool_name"] == "create_slide" + assert "tool_args" not in props + assert "tool_result" not in props + assert "secret" not in str(props) diff --git a/tests/test_telemetry_scripts.py b/tests/test_telemetry_scripts.py new file mode 100644 index 0000000..6ea6863 --- /dev/null +++ b/tests/test_telemetry_scripts.py @@ -0,0 +1,214 @@ +from __future__ import annotations + +import json +import os +import shutil +import subprocess +import sys +import zipfile +from pathlib import Path + +import pytest + +from scripts.check_telemetry_artifact import contains_generated_config +from scripts.create_posthog_dashboard import ( + build_dashboard_payload, + build_dashboard_tile_layouts, + build_dry_run_payload, + build_insight_payloads, +) +from scripts.smoke_telemetry import SMOKE_EVENT, build_smoke_properties +from scripts.write_telemetry_config import build_config_source + + +def test_config_writer_builds_generated_module() -> None: + source = build_config_source("ph_test", "https://eu.i.posthog.com") + + assert "POSTHOG_API_KEY = 'ph_test'" in source + assert "POSTHOG_HOST = 'https://eu.i.posthog.com'" in source + assert "Do not commit this file" in source + + +def test_config_writer_requires_key() -> None: + with pytest.raises(ValueError): + build_config_source("") + + +def test_dashboard_payload_name() -> None: + payload = build_dashboard_payload() + + assert payload["name"] == "OpenSwarm Product Analytics" + assert "OpenSwarm" in payload["tags"] + + +def test_dashboard_agent_usage_groups_by_agent_name() -> None: + payloads = build_insight_payloads(123) + agent_usage = next(payload for payload in payloads if payload["name"] == "OpenSwarm / Agent usage by agent") + + assert "display" not in agent_usage["query"] + source = agent_usage["query"]["source"] + assert source["series"][0]["event"] == "agent_run_completed" + assert source["breakdownFilter"]["breakdown"] == "agent_name" + assert source["trendsFilter"]["display"] == "ActionsBarValue" + assert agent_usage["dashboards"] == [123] + assert "OpenSwarm" in agent_usage["tags"] + + +def test_dashboard_has_useful_openswarm_sections() -> None: + payloads = build_insight_payloads(123) + names = {payload["name"] for payload in payloads} + + assert len(payloads) >= 10 + assert "OpenSwarm / Active installs today" in names + assert "OpenSwarm / Messages by role" in names + assert "OpenSwarm / Recent telemetry samples" in names + + +def test_recent_samples_table_supports_dashboard_filters() -> None: + payloads = build_insight_payloads(123) + samples = next(payload for payload in payloads if payload["name"] == "OpenSwarm / Recent telemetry samples") + + query = samples["query"]["source"]["query"] + assert "WHERE {filters}" in query + assert "ORDER BY timestamp DESC" in query + + +def test_dashboard_layout_puts_kpis_in_one_compact_row() -> None: + payloads = build_insight_payloads(123) + fake_dashboard = { + "tiles": [ + {"id": index + 1, "insight": {"name": payload["name"]}} + for index, payload in enumerate(payloads) + ] + } + + layouts = build_dashboard_tile_layouts(fake_dashboard) + + first_row = layouts[:4] + assert [tile["layouts"]["sm"] for tile in first_row] == [ + {"x": 0, "y": 0, "w": 3, "h": 3}, + {"x": 3, "y": 0, "w": 3, "h": 3}, + {"x": 6, "y": 0, "w": 3, "h": 3}, + {"x": 9, "y": 0, "w": 3, "h": 3}, + ] + + by_id = {tile["id"]: tile for tile in layouts} + messages_by_role_id = payloads.index(next(p for p in payloads if p["name"] == "OpenSwarm / Messages by role")) + 1 + agent_usage_id = payloads.index(next(p for p in payloads if p["name"] == "OpenSwarm / Agent usage by agent")) + 1 + assert by_id[messages_by_role_id]["order"] == 5 + assert by_id[messages_by_role_id]["layouts"]["sm"]["x"] == 0 + assert by_id[agent_usage_id]["order"] == 6 + assert by_id[agent_usage_id]["layouts"]["sm"]["x"] == 6 + + +def test_dashboard_dry_run_payload_has_no_secret_fields() -> None: + payload = build_dry_run_payload("env_123") + + assert payload["dashboard"]["path"] == "/api/environments/env_123/dashboards/" + assert payload["insights"][0]["path"] == "/api/environments/env_123/insights/" + assert "POSTHOG_PERSONAL_API_KEY" not in str(payload) + + +def test_dashboard_error_rate_uses_hogql_rate_query() -> None: + payloads = build_insight_payloads(123) + error_rate = next(payload for payload in payloads if payload["name"] == "OpenSwarm / Error rate by day") + + assert error_rate["query"]["kind"] == "DataVisualizationNode" + source = error_rate["query"]["source"] + assert source["kind"] == "HogQLQuery" + assert "WHERE {filters}" in source["query"] + assert "countIf(event = 'error')" in source["query"] + assert "countIf(event = 'swarm_run_completed')" in source["query"] + assert error_rate["dashboards"] == [123] + + +def test_smoke_script_uses_manual_smoke_event() -> None: + props = build_smoke_properties() + + assert SMOKE_EVENT == "telemetry_smoke_test" + assert props["status"] == "manual" + assert "POSTHOG_API_KEY" not in str(props) + + +def test_artifact_checker_detects_stale_generated_config(tmp_path) -> None: + build_dir = tmp_path / "build" / "lib" + build_dir.mkdir(parents=True) + (build_dir / "openswarm_telemetry_config.py").write_text("POSTHOG_API_KEY = 'ph_stale'\n", encoding="utf-8") + + assert contains_generated_config(tmp_path / "build") + + +def test_python_wheel_excludes_stale_generated_config(tmp_path) -> None: + repo = Path(__file__).resolve().parents[1] + root_generated_config = repo / "openswarm_telemetry_config.py" + stale_build_config = repo / "build" / "lib" / "openswarm_telemetry_config.py" + root_generated_config.write_text("POSTHOG_API_KEY = 'ph_root'\n", encoding="utf-8") + stale_build_config.parent.mkdir(parents=True, exist_ok=True) + stale_build_config.write_text("POSTHOG_API_KEY = 'ph_stale'\n", encoding="utf-8") + + try: + result = subprocess.run( + [ + sys.executable, + "-m", + "pip", + "wheel", + "--no-build-isolation", + "--no-deps", + "-w", + str(tmp_path), + ".", + ], + cwd=repo, + env={**os.environ, "OPENSWARM_TELEMETRY_ALLOW_TESTS": "1"}, + capture_output=True, + text=True, + timeout=120, + ) + assert result.returncode == 0, result.stderr + wheel = next(tmp_path.glob("*.whl")) + with zipfile.ZipFile(wheel) as archive: + names = archive.namelist() + assert "openswarm_telemetry_config.py" not in names + assert not any(name.startswith("build/") for name in names) + finally: + root_generated_config.unlink(missing_ok=True) + stale_build_config.unlink(missing_ok=True) + + +def test_npm_pack_includes_generated_config_after_writer() -> None: + if not shutil.which("npm"): + pytest.skip("npm is not installed") + + repo = Path(__file__).resolve().parents[1] + root_generated_config = repo / "openswarm_telemetry_config.py" + + try: + writer = subprocess.run( + [sys.executable, "scripts/write_telemetry_config.py"], + cwd=repo, + env={ + **os.environ, + "POSTHOG_API_KEY": "ph_pack_test", + "POSTHOG_HOST": "https://example.i.posthog.com", + }, + capture_output=True, + text=True, + timeout=30, + ) + assert writer.returncode == 0, writer.stderr + + packed = subprocess.run( + ["npm", "pack", "--dry-run", "--json"], + cwd=repo, + capture_output=True, + text=True, + timeout=120, + ) + assert packed.returncode == 0, packed.stderr + package = json.loads(packed.stdout)[0] + paths = {entry["path"] for entry in package["files"]} + + assert "openswarm_telemetry_config.py" in paths + finally: + root_generated_config.unlink(missing_ok=True)