diff --git a/docs/telemetry.md b/docs/telemetry.md
new file mode 100644
index 00000000..094b5ac9
--- /dev/null
+++ b/docs/telemetry.md
@@ -0,0 +1,248 @@
+# Ops Telemetry
+
+dws can emit one **anonymous, dimensions-only** ops metric per **command
+invocation**, used to monitor error rate, latency, command distribution, and
+version/platform health. It is the ops-side counterpart of [audit](./audit.md),
+but deliberately **far smaller**:
+
+- Collects **coarse dimensions only** — never object names, free text, peer ids,
+  device fingerprints, or natural-language input. There is no "redaction profile"
+  because there are no sensitive fields to redact in the first place.
+- **Independent of audit**: unrelated to `DWS_AUDIT_*`; you can enable telemetry
+  without enabling compliance audit.
+- **Default posture depends on the build** (see [Default posture](#default-posture)):
+  the open-source build is **off** (pure opt-in); a downstream distribution may
+  bake in a default endpoint and ship **on by default**, with a one-time
+  disclosure and an opt-out.
+
+> This is an open-source CLI: the **public build reports nothing by default and
+> never hardcodes an endpoint**. Any on-by-default behavior lives only in a downstream
+> build that injects its own endpoint — and even then it is disclosed once and
+> can be opted out of.
+
+## Enabling
+
+| Environment variable | Description | Example |
+|---|---|---|
+| `DWS_TELEMETRY_ENABLED` | Explicitly enable/disable; overrides the build default either way | `true` / `false` |
+| `DWS_TELEMETRY_DISABLED` | Hard opt-out; wins over everything (the off switch for on-by-default builds) | `true` |
+| `DWS_TELEMETRY_FILE` | **Local file sink** — append each event as one JSON line here instead of POSTing (no server, no network). Takes precedence over URL | `~/.dws/telemetry.jsonl` |
+| `DWS_TELEMETRY_URL` | Ingest endpoint; overrides the build-time default; one JSON event POSTed per invocation | `https://telemetry.example.com/dws` |
+| `DWS_TELEMETRY_TOKEN` | Bearer auth for the endpoint (optional) | `xxxxx` |
+| `DWS_TELEMETRY_TIMEOUT_MS` | Per-report timeout cap, in ms (default 1500) | `1500` |
+
+## Default posture
+
+`Enabled()` resolves like this:
+
+1. `DWS_TELEMETRY_DISABLED=true` → **off** (always wins).
+2. No destination (no `DWS_TELEMETRY_URL` and no baked-in default) → **off**.
+3. `DWS_TELEMETRY_ENABLED` set → its value wins (`true`/`false`).
+4. Otherwise → **on only if the build baked in a default endpoint**; a bare env
+   URL in the open-source build stays opt-in (off until `DWS_TELEMETRY_ENABLED=true`).
+
+**Open-source build** → off; an operator opts in with `DWS_TELEMETRY_ENABLED=true`
+plus a `DWS_TELEMETRY_URL`.
+
+**Downstream "fleet" build (on by default)** → inject a default endpoint at build
+time via `-ldflags`, so every install of that distribution reports to the
+operator's own ingest out of the box (users opt out with
+`DWS_TELEMETRY_DISABLED=true`):
+
+```bash
+go build -ldflags "\
+  -X github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/telemetry.defaultURL=https:///dws \
+  -X github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/telemetry.defaultToken=" ./cmd
+```
+
+The public repo never hardcodes a real endpoint — only your build does. This
+keeps "code is open source" and "data lands in the operator's own sink"
+decoupled.
+
+### One-time disclosure
+
+The first time telemetry is active on a machine, dws prints a one-time notice to
+stderr and writes a marker (`~/.dws/.telemetry_notice_shown`) so it never repeats:
+
+```
+ℹ️ dws reports anonymous operational telemetry (command, outcome, latency, version
+   — no content, no identity) to help monitor stability. Opt out anytime with
+   DWS_TELEMETRY_DISABLED=true. Details: docs/telemetry.md
+```
+
+## Local monitoring (lightest — no server, no SLS)
+
+The smallest possible setup: point telemetry at a **local file**. No receiver, no
+FC, no SLS — each machine appends its own events; aggregate the file whenever you like.
+
+```bash
+# turn it on (file sink alone enables telemetry)
+export DWS_TELEMETRY_FILE=~/.dws/telemetry.jsonl
+
+# ... use dws normally ...
+
+# one-line stability view (per command: calls / errors / avg latency)
+python3 - <<'PY'
+import json, collections, os
+rows=[json.loads(l) for l in open(os.path.expanduser('~/.dws/telemetry.jsonl')) if l.strip()]
+by=collections.defaultdict(lambda:{'n':0,'err':0,'dur':[]})
+for r in rows:
+    k=f"{r.get('command')}.{r.get('subcommand')}"; b=by[k]
+    b['n']+=1; b['err']+=(r.get('outcome')!='ok'); b['dur'].append(r.get('duration_ms',0))
+print(f"{'command':<28}{'calls':>6}{'err':>5}{'avg_ms':>8}")
+for k,v in sorted(by.items(),key=lambda x:-x[1]['n']):
+    d=v['dur'] or [0]; print(f"{k:<28}{v['n']:>6}{v['err']:>5}{sum(d)//len(d):>8}")
+PY
+```
+
+For a small fleet, collect each machine's `telemetry.jsonl` (rsync/scp) and run
+the same aggregation over the combined files. Scale to the URL→ingest path only
+when you outgrow this.
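The fleet-wide aggregation described above could be scripted roughly as follows. This is a minimal sketch, not part of dws: the function names `iter_lines` and `aggregate`, and the `collected/*/telemetry.jsonl` layout mentioned afterwards, are assumptions made for illustration. It reads the same `command`, `subcommand`, `outcome`, and `duration_ms` fields as the one-liner and skips lines that fail to parse, which can happen when a file is copied while dws is still appending to it.

```python
import collections
import json


def iter_lines(paths):
    """Yield every line from each collected JSONL file, in order."""
    for path in paths:
        with open(path, encoding="utf-8") as fh:
            yield from fh


def aggregate(lines):
    """Return per-command call count, error count, and average latency (ms)."""
    stats = collections.defaultdict(lambda: {"calls": 0, "errors": 0, "total_ms": 0})
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip a line truncated by an interrupted copy
        if not isinstance(event, dict):
            continue
        entry = stats[f"{event.get('command')}.{event.get('subcommand')}"]
        entry["calls"] += 1
        entry["errors"] += event.get("outcome") != "ok"
        entry["total_ms"] += event.get("duration_ms") or 0
    return {
        key: {
            "calls": s["calls"],
            "errors": s["errors"],
            "avg_ms": s["total_ms"] // s["calls"],
        }
        for key, s in stats.items()
    }
```

With the per-machine files gathered under a hypothetical `collected/` directory, `aggregate(iter_lines(glob.glob("collected/*/telemetry.jsonl")))` (after `import glob`) returns one entry per `command.subcommand` pair. Keeping the parsing separate from the file handling makes the aggregation easy to check against a handful of literal lines.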
+
+## Reported fields (complete)
+
+```json
+{
+  "schema_version": "1",
+  "ts": "2026-06-04T11:38:24+08:00",
+  "trace_id": "76a04f9eba0ad00c",  // == transport execution_id, joinable with server-side logs
+  "corp_id": "ding...",  // tenant dimension, best-effort (from the login token)
+  "cli_version": "1.0.34",  // version health: "did this release break a command"
+  "channel": "openclaw",  // which agent/integration drove the call (DWS_CHANNEL)
+  "os": "darwin",  // coarse platform, not PII
+  "module": "doc",
+  "command": "doc",
+  "subcommand": "create_document",
+  "outcome": "ok",  // ok | error
+  "err_class": "",  // error category when outcome=error
+  "exit_code": 0,
+  "duration_ms": 73  // wall-clock latency of the call, used for P99
+}
+```
+
+**Deliberately not collected** (verify the privacy boundary by reading the
+struct): user identity (user_id / name), object names/ids, free text, device
+id/serial, request/response body.
+
+## Receiver contract
+
+Any HTTP service can receive it:
+
+```
+POST /
+Content-Type: application/json
+Authorization: Bearer  # matches DWS_TELEMETRY_TOKEN
+X-Dws-Telemetry-Schema: 1
+Body: one telemetry event JSON
+Return 2xx for success
+```
+
+## Local testing (zero dependencies, no SLS)
+
+Before going to SLS, run the whole pipeline locally. Use
+`fc-sls-ingest/localsink.py` (pure Python standard library, no `pip install`
+needed) as the receiver:
+
+```bash
+# 1. Start the local receiver (with a test token)
+cd docs/telemetry/fc-sls-ingest
+TOKEN=dev python3 localsink.py # listens on 127.0.0.1:8799, writes /tmp/dws_telemetry.jsonl
+
+# 2. In another terminal, point dws at it
+export DWS_TELEMETRY_ENABLED=true
+export DWS_TELEMETRY_URL=http://127.0.0.1:8799
+export DWS_TELEMETRY_TOKEN=dev
+
+# 3. Run a few commands (--mock needs no network or real backend, still emits telemetry)
+dws doc create --title test --mock
+dws drive list --mock
+```
+
+The receiver prints each event in real time and appends to
+`/tmp/dws_telemetry.jsonl`. Things to verify:
+
+- Events carry dimensions such as `command/outcome/duration_ms/cli_version/channel/os`;
+- Compare command arguments (e.g. `--title test`) against the payload and confirm
+  the **content does not appear in the payload**;
+- A POST without the token must be rejected (401).
+
+Once written to disk, you can locally simulate the kind of metrics a dashboard
+would compute:
+
+```bash
+python3 - <<'PY'
+import json, collections
+rows=[json.loads(l) for l in open('/tmp/dws_telemetry.jsonl') if l.strip()]
+by=collections.defaultdict(lambda:{'n':0,'err':0,'dur':[]})
+for r in rows:
+    k=f"{r['command']} {r['subcommand']}"; b=by[k]
+    b['n']+=1; b['err']+=(r['outcome']!='ok'); b['dur'].append(r.get('duration_ms',0))
+for k,v in sorted(by.items(), key=lambda x:-x[1]['n']):
+    d=v['dur']; print(f"{k:<26}calls{v['n']:>4} err{v['err']:>3} avg{sum(d)//len(d):>5}ms max{max(d):>5}ms")
+PY
+```
+
+> Note: telemetry is only emitted once a command actually reaches the MCP-call
+> stage. If a command fails at argument parsing (before the call), no telemetry is
+> produced — this is expected behavior.
+
+## Boundary between open-source code and internal resources (public/private split)
+
+dws is an open-source repository, but **which SLS the telemetry lands in and which
+internal app it binds to is the deployer's own concern and never goes into the
+repo**. This boundary is by design, not accident:
+
+| | Where | Contains | In repo? |
+|---|---|---|---|
+| dws binary + the FC/local reference code in this dir | Public repo | Only POSTs to `DWS_TELEMETRY_URL`; **no endpoint, no secret, no app name** | ✅ |
+| SLS Project / FC instance / real URL+token | Deployer's internal infra | Real address, auth, logstore; inside Alibaba it also binds to an internal app | ❌ Never in the repo; injected via env vars or build-time `-ldflags` |
+
+The code **never hardcodes any vendor reporting address**; the URL comes from an
+environment variable at runtime or from a default a downstream build injects via
+`-ldflags`. So "code is public" and "data lands in the deployer's internal SLS"
+are naturally decoupled: switching deployers is just a different set of env vars
+or build flags, the repo needs no change, and no party's real config is visible.
+
+> Inside Alibaba: the SLS Project must hang under an AONE app (resource-governance
+> requirement). Bind it to the app that owns the dws backend (e.g. the DingTalk
+> MCP gateway app); that binding, the real URL, and the token all stay internal —
+> the public repo is unaware of them.
+
+## Wiring up Alibaba Cloud SLS (recommended for production)
+
+SLS (Log Service) ships with ingest / storage / search / dashboards / alerting —
+a standard choice for ops monitoring:
+
+1. **Create the store**: in the SLS console create a Project + Logstore (e.g.
+   `dws-telemetry`), set retention days; index the fields `command` /
+   `subcommand` / `outcome` / `cli_version` / `corp_id` / `channel`, and set
+   `duration_ms` as a long-typed index (needed for P99).
+2. **Create the receiver endpoint**: a **Function Compute (FC)** HTTP trigger is
+   the lowest-ops option — after validating the Bearer, write the body as a single
+   log via `PutLogs` into the Logstore (put the whole JSON in an `event` field and
+   also extract `command`/`outcome`/`duration_ms`/`cli_version` as indexed
+   columns).
+3. **Roll out**: set the FC address as `DWS_TELEMETRY_URL` on each dws endpoint.
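The receiver logic in step 2 can be sketched as two small pure functions, independent of any web framework or SDK. This is an illustrative sketch under stated assumptions: the names `bearer_ok`, `to_log_contents`, and `INDEX_FIELDS` are hypothetical, and rejecting an empty token is a choice made here, not something step 2 prescribes. The output is a list of string key/value pairs, the shape an SLS log item takes, with the full event under `event` plus one column per indexed field.

```python
import hmac
import json

# Columns to index in the Logstore, taken from steps 1 and 2 above (assumed list).
INDEX_FIELDS = (
    "command", "subcommand", "outcome", "cli_version",
    "corp_id", "channel", "duration_ms",
)


def bearer_ok(header, expected_token):
    """Compare an Authorization header with the shared token in constant time."""
    if not expected_token:
        return False  # assumption: a receiver with no token configured rejects everything
    expected = f"Bearer {expected_token}".encode("utf-8")
    return hmac.compare_digest(header.encode("utf-8"), expected)


def to_log_contents(event):
    """Flatten one telemetry event into (key, value) string pairs for a single log."""
    contents = [("event", json.dumps(event, ensure_ascii=False))]
    for field in INDEX_FIELDS:
        value = event.get(field)
        if value is not None:
            contents.append((field, str(value)))  # SLS stores values as strings
    return contents
```

A handler would call `bearer_ok(request_header, token)` first, return 401 on failure, and pass `to_log_contents(event)` to whatever writes the log. The typed index configured in step 1 is what lets SLS treat the stringified `duration_ms` as a number.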
+
+### Four ready-to-use alerts (SLS alert rules)
+
+| Alert | SLS query (illustrative) | Trigger |
+|---|---|---|
+| Error-rate spike | `* \| select count_if(outcome='error')*1.0/count(*) as err_rate` | err_rate > 0.05 (5%) |
+| P99 latency over budget | `* \| select approx_percentile(duration_ms, 0.99) as p99` | p99 > 3000 |
+| One command failing broadly | `* \| select command, count_if(outcome='error') c group by command order by c desc` | c spikes for a single command |
+| Call volume drops to zero | `* \| select count(*)` | == 0 within 5 minutes |
+
+The alert notification channel can be a DingTalk bot directly.
+
+## Where the data lands / two flows
+
+- **Off = never leaves the machine.** dws ships no default vendor reporting address.
+- **Enterprise self-hosted monitoring**: point `DWS_TELEMETRY_URL` at the
+  enterprise's own SLS ingest.
+- **Platform-side unified monitoring**: point the URL at DingTalk's telemetry
+  ingest — technically possible, but must be opt-in + disclosed. Because this
+  telemetry **contains only anonymous dimensions**, the privacy boundary is clean
+  by construction, suitable for a platform ops dashboard.
+- Full compliance trails are a separate track — use the enterprise's own sink via
+  [audit](./audit.md); don't mix it with telemetry.
diff --git a/docs/telemetry/fc-sls-ingest/.dockerignore b/docs/telemetry/fc-sls-ingest/.dockerignore
new file mode 100644
index 00000000..5d75e3ff
--- /dev/null
+++ b/docs/telemetry/fc-sls-ingest/.dockerignore
@@ -0,0 +1,6 @@
+localsink.py
+README.md
+s.yaml
+deploy.sh
+Dockerfile
+.dockerignore
diff --git a/docs/telemetry/fc-sls-ingest/Dockerfile b/docs/telemetry/fc-sls-ingest/Dockerfile
new file mode 100644
index 00000000..79df921d
--- /dev/null
+++ b/docs/telemetry/fc-sls-ingest/Dockerfile
@@ -0,0 +1,22 @@
+# dws telemetry ingest — container image (AONE-deployable / any container platform).
+#
+# Build: docker build -t dws-telemetry-ingest .
+# Run: docker run -p 9000:9000 -e INGEST_TOKEN= dws-telemetry-ingest # dry-run
+#      docker run -p 9000:9000 -e INGEST_TOKEN= \
+#        -e SLS_ENDPOINT=... -e SLS_PROJECT=... -e SLS_LOGSTORE=... dws-telemetry-ingest # -> SLS
+#
+# For AONE: point the app's build at this Dockerfile; expose port 9000; set the
+# env vars (INGEST_TOKEN required; SLS_* to write to an internal SLS Logstore).
+# Grant the running identity SLS PutLogs so app.py uses injected creds.
+FROM python:3.10-slim
+
+WORKDIR /app
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+COPY app.py .
+
+ENV PORT=9000
+EXPOSE 9000
+
+# gunicorn web server; app.py auto-detects dry-run vs SLS mode from env.
+CMD ["gunicorn", "-b", "0.0.0.0:9000", "--workers", "2", "--timeout", "30", "app:app"]
diff --git a/docs/telemetry/fc-sls-ingest/README.md b/docs/telemetry/fc-sls-ingest/README.md
new file mode 100644
index 00000000..1e18e4cb
--- /dev/null
+++ b/docs/telemetry/fc-sls-ingest/README.md
@@ -0,0 +1,118 @@
+# dws Telemetry Receiver (Function Compute FC → SLS)
+
+This is the **reference receiver** for [Ops Telemetry](../../telemetry.md): dws
+POSTs one telemetry JSON over, but SLS cannot accept a raw POST directly (writes
+must be signed), so this minimal HTTP service sits in between — it validates the
+token and writes to SLS via `PutLogs`. Deploy it as a Function Compute (FC) **web
+function**; you don't have to deal with the FC handler signature.
+
+```
+dws ──POST one JSON──▶ this service (FC web fn) ──PutLogs──▶ SLS Logstore ──▶ dashboard/alerts
+```
+
+## Files
+
+- `app.py` — Flask service: `POST /` validates Bearer → parses JSON → writes SLS; `GET /` health check
+- `requirements.txt` — dependencies (flask / gunicorn / aliyun-log-python-sdk)
+
+## 1. Create the store in SLS first (a few clicks in the console)
+
+1. Create a **Project** (e.g. `dws-ops`) and a **Logstore** (e.g.
+   `dws-telemetry`), set retention days.
+2. Enable indexes: set `command` / `subcommand` / `outcome` / `cli_version` /
+   `corp_id` / `channel` as **text**; set `duration_ms` / `exit_code` as **long**
+   (needed for P99 and aggregation).
+
+## Two run modes (auto-detected)
+
+`app.py` switches automatically by environment variables — **no code change**:
+
+| Mode | Trigger | Behavior |
+|---|---|---|
+| **dry-run** | Any SLS variable missing, or `TELEMETRY_DRYRUN=true` | Received events are printed to stdout (FC captures this in function logs) and return 204. **Does not require the aliyun-log SDK** — good for validating the pipeline first |
+| **SLS** | `SLS_ENDPOINT`+`SLS_PROJECT`+`SLS_LOGSTORE` all set | After validation, `PutLogs` writes into the Logstore |
+
+The `GET /` health check echoes the current mode (`mode=dry-run` / `mode=sls`),
+so it's obvious right after deploy.
+
+## 2. Deploy this service as an FC web function
+
+1. Function Compute console → Create function → **Web function** → Python runtime.
+2. Upload this directory's code (incl. `requirements.txt`; FC installs deps
+   automatically).
+3. **Startup command**: `gunicorn -b 0.0.0.0:9000 app:app`, **listen port** `9000`.
+4. **Dry-run validation first (strongly recommended)**: on the first deploy set
+   only `INGEST_TOKEN` and **leave the SLS variables unset** (or add
+   `TELEMETRY_DRYRUN=true`). After deploy, `GET /` should show `mode=dry-run`;
+   point dws at it, run a few commands, and you'll see `DRYRUN {...}` lines in FC's
+   **function logs** — proving the "client → FC" leg works. This step **needs no
+   SLS, no store, no SDK**.
+5. **Then wire up SLS**: **bind a service role** to the function and grant
+   `AliyunLogFullAccess` (or a narrower PutLogs permission) — this way you don't
+   put an AccessKey in env vars; FC injects STS temporary credentials and `app.py`
+   reads them first. Then add the SLS env vars; once `GET /` becomes `mode=sls`
+   it's live:
+
+   | Variable | Value | Note |
+   |---|---|---|
+   | `SLS_ENDPOINT` | `cn-hangzhou.log.aliyuncs.com` | change to your region |
+   | `SLS_PROJECT` | `dws-ops` | the Project from step 1 |
+   | `SLS_LOGSTORE` | `dws-telemetry` | the Logstore from step 1 |
+   | `INGEST_TOKEN` | a random string you generate | must match dws-side `DWS_TELEMETRY_TOKEN` |
+
+6. After deploy, grab the function's HTTP trigger address (like
+   `https://xxx.cn-hangzhou.fcapp.run`).
+
+## 3. Wire dws up
+
+In the environment where dws runs (or injected by the host agent):
+
+```bash
+export DWS_TELEMETRY_ENABLED=true
+export DWS_TELEMETRY_URL="https://xxx.cn-hangzhou.fcapp.run" # the function address from above
+export DWS_TELEMETRY_TOKEN=""
+```
+
+Run a few commands and you'll see records appear in the SLS Logstore query page.
+
+## 4. Validate locally first (optional, no FC / SLS needed)
+
+The simplest local validation uses `localsink.py` (pure standard library, zero
+deps), see [the "Local testing" section in telemetry.md](../../telemetry.md#local-testing-zero-dependencies-no-sls).
+
+You can also run this service's **dry-run mode** locally (no SLS, no aliyun-log):
+
+```bash
+cd docs/telemetry/fc-sls-ingest
+pip install flask # dry-run only needs flask; aliyun-log is only for SLS mode
+INGEST_TOKEN=dev python3 app.py # no SLS_* -> auto dry-run, listens on :9000
+# in another terminal:
+curl -s localhost:9000/ # should echo mode=dry-run
+curl -XPOST localhost:9000/ -H 'Authorization: Bearer dev' \
+  -H 'Content-Type: application/json' \
+  -d '{"schema_version":"1","command":"doc","outcome":"ok","duration_ms":42}'
+# returns 204; the event prints as DRYRUN {...} in the app.py terminal.
+```
+
+To validate against real SLS locally, add `SLS_ENDPOINT/SLS_PROJECT/SLS_LOGSTORE`
+and an AccessKey (`pip install -r requirements.txt` to install aliyun-log), and
+`GET /` will become `mode=sls`.
+
+## 5. Configure alerts (SLS console → Alerts)
+
+| Alert | Query (illustrative) | Trigger |
+|---|---|---|
+| Error-rate spike | `* \| select count_if(outcome='error')*1.0/count(*) as err_rate` | err_rate > 0.05 |
+| P99 latency over budget | `* \| select approx_percentile(duration_ms, 0.99) as p99` | p99 > 3000 |
+| One command failing broadly | `* \| select command, count_if(outcome='error') c group by command order by c desc` | c spikes for a single command |
+| Call volume drops to zero | `* \| select count(*) as n` | n == 0 (5-minute window) |
+
+The notification channel can be a DingTalk bot webhook directly.
+
+## Security notes
+
+- Use a strong random string for `INGEST_TOKEN`, keep it in sync with the dws
+  side, and never leave it empty.
+- Prefer the FC service role (STS); do not put a long-lived AccessKey in env vars.
+- This service only accepts **anonymous dimension** data — no user content or
+  identity; the privacy boundary is guaranteed by the dws client.
diff --git a/docs/telemetry/fc-sls-ingest/app.py b/docs/telemetry/fc-sls-ingest/app.py
new file mode 100644
index 00000000..f97e38ac
--- /dev/null
+++ b/docs/telemetry/fc-sls-ingest/app.py
@@ -0,0 +1,145 @@
+# Copyright 2026 Alibaba Group
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+#
+# Reference telemetry ingest for dws (DingTalk Workspace CLI).
+#
+# Role: the "translator" between dws and SLS. dws POSTs one telemetry Event as
+# JSON; SLS cannot accept that raw POST (its write API must be signed), so this
+# tiny HTTP service verifies the bearer token, then writes the event into an SLS
+# Logstore via PutLogs.
+#
+# Two modes (auto-detected):
+#   - SLS mode: all of SLS_ENDPOINT / SLS_PROJECT / SLS_LOGSTORE are set
+#               (and TELEMETRY_DRYRUN is not truthy) -> writes to SLS.
+#   - dry-run mode: otherwise -> just logs the event to stdout and returns 204.
+#                   Lets you deploy to Function Compute and verify the pipeline
+#                   end-to-end BEFORE wiring up SLS. The aliyun-log SDK is only
+#                   imported in SLS mode, so dry-run needs no extra dependency.
+#
+# Deploy as a Function Compute (FC) "Web Function":
+#   startup command: gunicorn -b 0.0.0.0:9000 app:app
+#   listen port: 9000
+# See README.md in this directory for the full walkthrough.
+
+import json
+import os
+import sys
+import time
+
+from flask import Flask, request, abort
+
+app = Flask(__name__)
+
+# Shared secret the CLI sends as `Authorization: Bearer `. This must
+# match DWS_TELEMETRY_TOKEN on the dws side. Empty disables auth (NOT advised).
+INGEST_TOKEN = os.environ.get("INGEST_TOKEN", "")
+
+# Fields lifted out of the event into their own SLS columns so they are
+# query/aggregation-friendly (the full event is also stored verbatim).
+_INDEX_FIELDS = (
+    "trace_id",
+    "corp_id",
+    "cli_version",
+    "channel",
+    "os",
+    "module",
+    "command",
+    "subcommand",
+    "outcome",
+    "err_class",
+    "exit_code",
+    "duration_ms",
+)
+
+
+def _truthy(v):
+    return str(v).strip().lower() in ("1", "true", "yes", "on")
+
+
+def _sls_mode():
+    """SLS mode requires the three SLS vars and no explicit dry-run override."""
+    if _truthy(os.environ.get("TELEMETRY_DRYRUN", "")):
+        return False
+    return all(os.environ.get(k) for k in ("SLS_ENDPOINT", "SLS_PROJECT", "SLS_LOGSTORE"))
+
+
+# Lazily-built SLS client (only constructed once, only in SLS mode). Kept module
+# global so warm FC invocations reuse it.
+_client = None
+
+
+def _get_client():
+    global _client
+    if _client is None:
+        # Imported here (not at module load) so dry-run works without the SDK.
+        from aliyun.log import LogClient
+
+        _client = LogClient(
+            os.environ["SLS_ENDPOINT"],
+            os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID", ""),
+            os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET", ""),
+            securityToken=os.environ.get("ALIBABA_CLOUD_SECURITY_TOKEN", "") or None,
+        )
+    return _client
+
+
+def _write_sls(event):
+    from aliyun.log import LogItem, PutLogsRequest
+
+    item = LogItem()
+    item.set_time(int(time.time()))
+    contents = [("event", json.dumps(event, ensure_ascii=False))]
+    for k in _INDEX_FIELDS:
+        if k in event and event[k] is not None:
+            contents.append((k, str(event[k])))
+    item.set_contents(contents)
+
+    req = PutLogsRequest(
+        project=os.environ["SLS_PROJECT"],
+        logstore=os.environ["SLS_LOGSTORE"],
+        topic=event.get("schema_version", ""),
+        source="dws-telemetry",
+        logitems=[item],
+    )
+    _get_client().put_logs(req)
+
+
+@app.get("/")
+def health():
+    mode = "sls" if _sls_mode() else "dry-run"
+    return f"dws telemetry ingest ok (mode={mode})\n", 200
+
+
+@app.post("/")
+def ingest():
+    # 1) Auth: bearer check.
+    if INGEST_TOKEN:
+        if request.headers.get("Authorization", "") != f"Bearer {INGEST_TOKEN}":
+            abort(401)
+
+    # 2) Parse one telemetry Event.
+    try:
+        event = request.get_json(force=True)
+        if not isinstance(event, dict):
+            raise ValueError("body is not a JSON object")
+    except Exception as e:  # noqa: BLE001 - reject any malformed body
+        abort(400, description=f"bad json: {e}")
+
+    # 3) Dispatch by mode.
+    if _sls_mode():
+        try:
+            _write_sls(event)
+        except Exception as e:  # noqa: BLE001 - surface SLS errors, never crash
+            abort(502, description=f"sls put_logs failed: {e}")
+    else:
+        # dry-run: emit to stdout (captured by FC function logs) so you can
+        # confirm the pipeline before SLS is wired up.
+        print("DRYRUN " + json.dumps(event, ensure_ascii=False), file=sys.stdout, flush=True)
+
+    return "", 204
+
+
+if __name__ == "__main__":
+    # Local dev: python app.py, then POST to http://127.0.0.1:9000/
+    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", "9000")))
diff --git a/docs/telemetry/fc-sls-ingest/deploy.sh b/docs/telemetry/fc-sls-ingest/deploy.sh
new file mode 100755
index 00000000..013ae0b7
--- /dev/null
+++ b/docs/telemetry/fc-sls-ingest/deploy.sh
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+# Deploy the dws telemetry ingest to Alibaba Cloud Function Compute via Serverless Devs.
+#
+# Prereqs (one-time):
+#   npm i -g @serverless-devs/s
+#   s config add # add an Aliyun credential alias named "default"
+#
+# Usage:
+#   INGEST_TOKEN= ./deploy.sh # dry-run-first deploy (no SLS)
+#   INGEST_TOKEN= SLS_ENDPOINT=cn-hangzhou.log.aliyuncs.com \
+#     SLS_PROJECT=dws-ops SLS_LOGSTORE=dws-telemetry ./deploy.sh # write to SLS
+set -euo pipefail
+cd "$(dirname "$0")"
+
+: "${INGEST_TOKEN:?set INGEST_TOKEN to a random shared secret (must match dws-side DWS_TELEMETRY_TOKEN)}"
+export SLS_ENDPOINT="${SLS_ENDPOINT:-}" SLS_PROJECT="${SLS_PROJECT:-}" SLS_LOGSTORE="${SLS_LOGSTORE:-}"
+
+echo "==> building (installs flask/gunicorn/aliyun-log from requirements.txt)"
+s build --use-docker || s build
+
+echo "==> deploying"
+s deploy -y
+
+echo "==> function info (look for the http trigger URL)"
+s info
+
+echo
+echo "Verify:"
+echo "  curl / # expect: mode=dry-run (or mode=sls once SLS_* set)"
+echo "  curl -XPOST / -H \"Authorization: Bearer \$INGEST_TOKEN\" \\"
+echo "    -H 'Content-Type: application/json' -d '{\"schema_version\":\"1\",\"command\":\"doc\",\"outcome\":\"ok\",\"duration_ms\":42}'"
+echo "Then set DWS_TELEMETRY_URL= + DWS_TELEMETRY_TOKEN=\$INGEST_TOKEN (or bake via ldflags)."
diff --git a/docs/telemetry/fc-sls-ingest/localsink.py b/docs/telemetry/fc-sls-ingest/localsink.py
new file mode 100644
index 00000000..f25ed712
--- /dev/null
+++ b/docs/telemetry/fc-sls-ingest/localsink.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+# Copyright 2026 Alibaba Group
+# Licensed under the Apache License, Version 2.0 (the "License").
+#
+# Local telemetry sink for testing dws telemetry WITHOUT SLS or Function Compute.
+#
+# Zero dependencies (Python standard library only). It mimics the ingest
+# contract: accepts `POST /` with a JSON body, optionally checks the bearer
+# token, pretty-prints each event and appends the raw line to a JSONL file.
+# Use this to validate the whole pipeline (dws -> HTTP) before touching any
+# cloud infrastructure.
+#
+# Usage:
+#   python3 localsink.py # listen on :8799, no auth
+#   PORT=9000 TOKEN=dev python3 localsink.py
+#
+# Then point dws at it:
+#   export DWS_TELEMETRY_ENABLED=true
+#   export DWS_TELEMETRY_URL=http://127.0.0.1:8799
+#   # export DWS_TELEMETRY_TOKEN=dev # only if you set TOKEN above
+
+import json
+import os
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+
+PORT = int(os.environ.get("PORT", "8799"))
+# HOST: 127.0.0.1 (default, local-only/safe). Set HOST=0.0.0.0 to accept POSTs
+# from other machines on your LAN — e.g. to use THIS computer as a small central
+# collector that teammates' dws report into (token auth strongly recommended then).
+HOST = os.environ.get("HOST", "127.0.0.1")
+TOKEN = os.environ.get("TOKEN", "")
+OUTFILE = os.environ.get("OUTFILE", "/tmp/dws_telemetry.jsonl")
+# APPEND=1 keeps history across restarts (central collector); default truncates.
+APPEND = os.environ.get("APPEND", "") not in ("", "0", "false", "no")
+
+_count = 0
+
+
+class Handler(BaseHTTPRequestHandler):
+    def do_GET(self):  # health check
+        self._send(200, "dws local telemetry sink ok\n")
+
+    def do_POST(self):
+        global _count
+        if TOKEN and self.headers.get("Authorization", "") != f"Bearer {TOKEN}":
+            self._send(401, "unauthorized\n")
+            return
+        n = int(self.headers.get("Content-Length", "0"))
+        raw = self.rfile.read(n)
+        try:
+            event = json.loads(raw)
+        except Exception as e:  # noqa: BLE001
+            self._send(400, f"bad json: {e}\n")
+            return
+
+        _count += 1
+        with open(OUTFILE, "ab") as f:
+            f.write(raw + b"\n")
+        print(f"\n#{_count} ({len(raw)} bytes) -> {OUTFILE}")
+        print(json.dumps(event, indent=2, ensure_ascii=False), flush=True)
+        self._send(204, "")
+
+    def _send(self, code, body):
+        data = body.encode("utf-8")
+        self.send_response(code)
+        self.send_header("Content-Length", str(len(data)))
+        self.end_headers()
+        if data:
+            self.wfile.write(data)
+
+    def log_message(self, *args):  # silence default access logging
+        pass
+
+
+if __name__ == "__main__":
+    if not APPEND:
+        open(OUTFILE, "w").close()  # truncate previous run (test default)
+    auth = f"(bearer required: {TOKEN!r})" if TOKEN else "(no auth — set TOKEN!)"
+    print(f"dws local telemetry sink listening on http://{HOST}:{PORT} {auth}")
+    print(f"capturing to {OUTFILE} (append={APPEND})\n")
+    ThreadingHTTPServer((HOST, PORT), Handler).serve_forever()
diff --git a/docs/telemetry/fc-sls-ingest/requirements.txt b/docs/telemetry/fc-sls-ingest/requirements.txt
new file mode 100644
index 00000000..7c26998c
--- /dev/null
+++ b/docs/telemetry/fc-sls-ingest/requirements.txt
@@ -0,0 +1,4 @@
+flask>=3.0
+gunicorn>=21.0
+# aliyun-log is only needed in SLS mode; dry-run mode (no SLS_* env) runs without it.
+aliyun-log-python-sdk>=0.9.0
diff --git a/docs/telemetry/fc-sls-ingest/s.yaml b/docs/telemetry/fc-sls-ingest/s.yaml
new file mode 100644
index 00000000..6dd8a733
--- /dev/null
+++ b/docs/telemetry/fc-sls-ingest/s.yaml
@@ -0,0 +1,58 @@
+# Serverless Devs (s) deploy spec for the dws telemetry ingest (FC web function).
+#
+# One-time prep:
+#   s config add # add an Aliyun credential alias named "default"
+#   export INGEST_TOKEN= # shared secret (must equal dws-side DWS_TELEMETRY_TOKEN)
+#   # optional — omit for a dry-run-first deploy, add later to write to SLS:
+#   # export SLS_ENDPOINT=cn-hangzhou.log.aliyuncs.com
+#   # export SLS_PROJECT=dws-ops
+#   # export SLS_LOGSTORE=dws-telemetry
+#
+# Deploy: s build && s deploy -y (or just ./deploy.sh)
+# URL: s info (look for the http trigger url)
+edition: 3.0.0
+name: dws-telemetry-ingest
+access: default
+
+vars:
+  region: cn-hangzhou # change to your region
+
+resources:
+  ingest:
+    component: fc3
+    props:
+      region: ${vars.region}
+      functionName: dws-telemetry-ingest
+      description: dws ops telemetry ingest (FC web function -> SLS)
+      runtime: python3.10
+      code: ./
+      handler: app.app # unused by web functions, but a value is required
+      timeout: 30
+      memorySize: 512
+      cpu: 0.35
+      diskSize: 512
+      instanceConcurrency: 20
+      # Grant SLS write so app.py uses FC-injected STS creds (no AccessKey in env).
+      # Fill in your account uid + a role that has AliyunLogFullAccess (or PutLogs):
+      # role: acs:ram:::role/
+      environmentVariables:
+        INGEST_TOKEN: ${env('INGEST_TOKEN')}
+        SLS_ENDPOINT: ${env('SLS_ENDPOINT')}
+        SLS_PROJECT: ${env('SLS_PROJECT')}
+        SLS_LOGSTORE: ${env('SLS_LOGSTORE')}
+      customRuntimeConfig:
+        command:
+          - gunicorn
+        args:
+          - "-b"
+          - "0.0.0.0:9000"
+          - "app:app"
+        port: 9000
+      triggers:
+        - triggerName: http
+          triggerType: http
+          triggerConfig:
+            authType: anonymous # app.py enforces the Bearer token itself
+            methods:
+              - GET
+              - POST
diff --git a/internal/app/runner.go b/internal/app/runner.go
index eb026b15..188d9ff6 100644
--- a/internal/app/runner.go
+++ b/internal/app/runner.go
@@ -299,9 +299,12 @@ func (r *runtimeRunner) executeInvocation(ctx context.Context, endpoint string,
 				errReason = retErr.Error()
 			}
 		}
+		dur := time.Since(invokeStart)
 		logging.LogCommandEnd(fl, execID, invocation.CanonicalProduct, invocation.Tool,
-			retErr == nil, time.Since(invokeStart), errCat, errReason)
+			retErr == nil, dur, errCat, errReason)
+		// Anonymous ops telemetry (opt-in, dimensions only). No-op when disabled.
+		emitTelemetry(execID, invocation, retErr == nil, errCat, dur)
 	}()
 
 	// Check if this product has plugin-level auth credentials registered.
diff --git a/internal/app/telemetry_runtime.go b/internal/app/telemetry_runtime.go
new file mode 100644
index 00000000..32f5a7d3
--- /dev/null
+++ b/internal/app/telemetry_runtime.go
@@ -0,0 +1,68 @@
+// Copyright 2026 Alibaba Group
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package app
+
+import (
+	"os"
+	"runtime"
+	"time"
+
+	authpkg "github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/auth"
+	"github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/executor"
+	"github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/telemetry"
+)
+
+// emitTelemetry ships one anonymous operational metric for a finished
+// invocation. It is cheap to skip: telemetry.NewForwarderFromEnv returns nil
+// (after a single env read) when telemetry is disabled, so the hot path pays
+// nothing and never loads the token or touches request content.
+//
+// Only coarse dimensions are collected here — there is intentionally no path
+// that reads param values, object names, or natural-language input.
+func emitTelemetry(execID string, inv executor.Invocation, ok bool, errClass string, dur time.Duration) {
+	fwd := telemetry.NewForwarderFromEnv()
+	if fwd == nil {
+		return
+	}
+
+	// Disclosure: print the telemetry notice once per machine the first time it
+	// becomes active (required because a downstream build may ship it on by
+	// default). Best-effort; never blocks the command.
+	telemetry.ShowNoticeOnce(defaultConfigDir())
+
+	ev := telemetry.New(time.Now(), execID)
+	ev.CLIVersion = version
+	ev.Channel = os.Getenv(envDWSChannel)
+	ev.OS = runtime.GOOS
+	ev.Module = inv.CanonicalProduct
+	ev.Command = inv.CanonicalProduct
+	ev.Subcommand = inv.Tool
+	ev.DurationMS = dur.Milliseconds()
+
+	// corp_id is the only identity-adjacent dimension, kept for per-tenant
+	// health. Best-effort: a missing/locked token simply omits it.
+ if td, err := authpkg.LoadTokenData(defaultConfigDir()); err == nil && td != nil { + ev.CorpID = td.CorpID + } + + if ok { + ev.Outcome = "ok" + } else { + ev.Outcome = "error" + ev.ErrClass = errClass + ev.ExitCode = 1 + } + + _ = fwd.Emit(ev) +} diff --git a/internal/app/telemetry_runtime_test.go b/internal/app/telemetry_runtime_test.go new file mode 100644 index 00000000..7b21876e --- /dev/null +++ b/internal/app/telemetry_runtime_test.go @@ -0,0 +1,113 @@ +// Copyright 2026 Alibaba Group +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/executor" + "github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/telemetry" +) + +// TestEmitTelemetryWiresEvent proves the app-layer hook assembles a correct, +// content-free event and ships it when telemetry is enabled. +func TestEmitTelemetryWiresEvent(t *testing.T) { + var body []byte + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ = io.ReadAll(r.Body) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + t.Setenv(telemetry.EnvEnabled, "true") + t.Setenv(telemetry.EnvURL, srv.URL) + t.Setenv(envDWSChannel, "openclaw") + + inv := executor.Invocation{ + CanonicalProduct: "doc", + Tool: "create", + // Params carry content; telemetry must NOT read any of it. 
+ Params: map[string]any{"title": "Q3 Earnings", "doc_id": "doc-secret-123"}, + } + + emitTelemetry("trace-xyz", inv, false, "validation", 123*time.Millisecond) + + if len(body) == 0 { + t.Fatal("no telemetry was POSTed") + } + var ev telemetry.Event + if err := json.Unmarshal(body, &ev); err != nil { + t.Fatalf("non-JSON body %q: %v", body, err) + } + + if ev.TraceID != "trace-xyz" { + t.Errorf("trace_id=%q", ev.TraceID) + } + if ev.Command != "doc" || ev.Subcommand != "create" { + t.Errorf("command/subcommand=%q/%q", ev.Command, ev.Subcommand) + } + if ev.Outcome != "error" || ev.ErrClass != "validation" || ev.ExitCode != 1 { + t.Errorf("outcome wiring wrong: %+v", ev) + } + if ev.DurationMS != 123 { + t.Errorf("duration_ms=%d, want 123", ev.DurationMS) + } + if ev.Channel != "openclaw" { + t.Errorf("channel=%q, want openclaw", ev.Channel) + } + if ev.OS == "" { + t.Error("os dimension should be set") + } + + // Privacy boundary: no param content may ever leak into the wire payload. + raw := string(body) + for _, secret := range []string{"Q3 Earnings", "doc-secret-123", "title"} { + if contains(raw, secret) { + t.Errorf("telemetry payload leaked content %q: %s", secret, raw) + } + } +} + +// TestEmitTelemetryNoopWhenDisabled proves the hot path is silent when off. 
+func TestEmitTelemetryNoopWhenDisabled(t *testing.T) { + called := false + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + t.Setenv(telemetry.EnvEnabled, "") + t.Setenv(telemetry.EnvURL, srv.URL) + + emitTelemetry("t", executor.Invocation{CanonicalProduct: "doc", Tool: "get"}, true, "", time.Second) + + if called { + t.Fatal("telemetry was sent while disabled") + } +} + +func contains(s, sub string) bool { + for i := 0; i+len(sub) <= len(s); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false +} diff --git a/internal/telemetry/event.go b/internal/telemetry/event.go new file mode 100644 index 00000000..94a2aa2f --- /dev/null +++ b/internal/telemetry/event.go @@ -0,0 +1,68 @@ +// Copyright 2026 Alibaba Group +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package telemetry emits anonymous, dimensions-only operational metrics for +// each dws command invocation — error rate, latency, command distribution and +// version/platform health. It is the ops-monitoring counterpart to the audit +// package, but deliberately MUCH smaller: +// +// - It carries ONLY coarse dimensions: never object names, free text, peer +// ids, device fingerprints, or the user's natural-language intent. There is +// nothing to redact because nothing sensitive is ever collected. 
+// - It is independent of the audit package and its DWS_AUDIT_* switches, so +// an operator can run ops telemetry without enabling compliance auditing. +// - It is OFF by default in the open-source build. The CLI emits nothing +// unless the operator opts in via DWS_TELEMETRY_ENABLED plus a destination, +// or sets a local DWS_TELEMETRY_FILE sink. A downstream build that bakes in +// a default endpoint may ship on by default (see Enabled). +// +// What it deliberately does NOT collect (so reviewers can verify the privacy +// boundary by inspecting this struct alone): actor identity (user id/name), +// target object names/ids, free-text intent, device id / serial number, and +// request/response bodies. +package telemetry + +import "time" + +// SchemaVersion is bumped on any breaking change to Event's JSON shape. +const SchemaVersion = "1" + +// Event is the full operational record for one dws command. Every field is a +// low-cardinality (or trace) dimension safe to ship to a central ops sink. +type Event struct { + SchemaVersion string `json:"schema_version"` + Timestamp time.Time `json:"ts"` + TraceID string `json:"trace_id"` // == transport execution_id, for joining with server-side logs + + CorpID string `json:"corp_id,omitempty"` // tenant dimension, for per-org health; best-effort + CLIVersion string `json:"cli_version,omitempty"` // "did this release break a command at scale" + Channel string `json:"channel,omitempty"` // which integration/agent drove dws (DWS_CHANNEL) + OS string `json:"os,omitempty"` // runtime.GOOS: coarse platform, not PII + + Module string `json:"module"` // operated product, e.g. "doc" + Command string `json:"command"` // skill command + Subcommand string `json:"subcommand"` // skill subcommand, e.g. "create" + + Outcome string `json:"outcome"` // "ok" | "error" + ErrClass string `json:"err_class,omitempty"` // error category when outcome=error + ExitCode int `json:"exit_code"` + DurationMS int64 `json:"duration_ms"` // wall-clock latency of the invocation +} + +// New stamps the schema version, timestamp and trace id.
The caller supplies +// the wall clock so callers stay testable and deterministic. +func New(ts time.Time, traceID string) *Event { + return &Event{ + SchemaVersion: SchemaVersion, + Timestamp: ts, + TraceID: traceID, + } +} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go new file mode 100644 index 00000000..ef93f4aa --- /dev/null +++ b/internal/telemetry/telemetry.go @@ -0,0 +1,258 @@ +// Copyright 2026 Alibaba Group +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + "github.com/DingTalk-Real-AI/dingtalk-workspace-cli/pkg/configmeta" +) + +// Environment variables that drive telemetry. +// +// Posture depends on the build (see the defaultURL build-time var and Enabled): +// - Open-source build: OFF; the operator must opt in with EnvEnabled + EnvURL. +// - Downstream build with a baked-in default endpoint: ON by default, so a +// fleet reports to the operator's ingest out of the box; users opt out with +// EnvDisabled. +const ( + // EnvEnabled explicitly turns telemetry on/off ("true"/"1" or "false"/"0"), + // overriding the build posture either way. + EnvEnabled = "DWS_TELEMETRY_ENABLED" + // EnvDisabled is a hard opt-out. When truthy it disables telemetry no matter + // what the build default or EnvEnabled says. 
+ EnvDisabled = "DWS_TELEMETRY_DISABLED" + // EnvURL is the ingest endpoint that receives one JSON Event per POST. It + // overrides the build-time default endpoint when set. + EnvURL = "DWS_TELEMETRY_URL" + // EnvToken is an optional bearer for the ingest endpoint. Overrides the + // build-time default token when set. + EnvToken = "DWS_TELEMETRY_TOKEN" + // EnvTimeoutMS bounds how long a single POST may block command exit. + EnvTimeoutMS = "DWS_TELEMETRY_TIMEOUT_MS" + // EnvFile is the lightest sink: when set, each event is appended as one JSON + // line to this local file instead of being POSTed — no server, no network. + // Ideal for local/per-machine stability monitoring. Takes precedence over URL. + EnvFile = "DWS_TELEMETRY_FILE" +) + +// Build-time defaults, empty in the open-source build so telemetry stays opt-in +// and OFF. A downstream distribution may inject these via -ldflags to ship +// telemetry on-by-default to its own ingest, e.g.: +// +// go build -ldflags "\ +// -X github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/telemetry.defaultURL=https:///dws \ +// -X github.com/DingTalk-Real-AI/dingtalk-workspace-cli/internal/telemetry.defaultToken=" ./cmd +// +// The public repo never hardcodes a real endpoint; only a downstream build does. +// This keeps "code is open source" and "data lands in the operator's own sink" +// decoupled. +var ( + defaultURL string + defaultToken string +) + +// defaultTimeout caps how long telemetry may delay command exit. Telemetry is a +// side effect, never a gate: a slow or dead sink must not punish the user. 
+const defaultTimeout = 1500 * time.Millisecond + +func init() { + for _, it := range []configmeta.ConfigItem{ + {Name: EnvEnabled, Category: configmeta.CategoryDebug, Description: "Explicitly enable/disable ops telemetry (overrides the build default)", Example: "true"}, + {Name: EnvDisabled, Category: configmeta.CategoryDebug, Description: "Hard opt-out of ops telemetry (wins over everything)", Example: "true"}, + {Name: EnvURL, Category: configmeta.CategoryDebug, Description: "Telemetry ingest endpoint (overrides the build-time default; one JSON event POSTed per invocation)", Example: "https://telemetry.example.com/dws"}, + {Name: EnvToken, Category: configmeta.CategoryDebug, Description: "Bearer auth for the telemetry endpoint (optional)", Sensitive: true, Example: "xxxxx"}, + {Name: EnvTimeoutMS, Category: configmeta.CategoryDebug, Description: "Per-report timeout cap in milliseconds (default 1500)", Example: "1500"}, + {Name: EnvFile, Category: configmeta.CategoryDebug, Description: "Local file sink: append each event as one JSON line here instead of POSTing (no server). Takes precedence over URL", Example: "~/.dws/telemetry.jsonl"}, + } { + configmeta.Register(it) + } +} + +// resolvedFile returns the local file sink path (env only). When set, events are +// appended to this file instead of POSTed — the lightest, server-less sink. +func resolvedFile() string { + return expandHome(strings.TrimSpace(os.Getenv(EnvFile))) +} + +// expandHome resolves a leading ~ to the user's home directory. +func expandHome(p string) string { + if p == "~" || strings.HasPrefix(p, "~/") { + if home, err := os.UserHomeDir(); err == nil { + return filepath.Join(home, strings.TrimPrefix(p, "~")) + } + } + return p +} + +// resolvedURL returns the effective ingest endpoint: the env override if set, +// otherwise the build-time default (empty in the open-source build). 
+func resolvedURL() string { + if u := strings.TrimSpace(os.Getenv(EnvURL)); u != "" { + return u + } + return strings.TrimSpace(defaultURL) +} + +// resolvedToken returns the effective bearer token: env override, else default. +func resolvedToken() string { + if t := strings.TrimSpace(os.Getenv(EnvToken)); t != "" { + return t + } + return strings.TrimSpace(defaultToken) +} + +// Enabled reports whether telemetry should run. +// +// - EnvDisabled (hard opt-out) always wins. +// - With no destination (no env URL, no baked-in default, and no EnvFile sink) +// nothing is sent. +// - EnvEnabled, when set, is an explicit operator override either way. +// - Otherwise: ON only when a default endpoint is baked into the build +// (downstream distribution) or a local EnvFile sink is set. A bare env URL +// in the open-source build stays opt-in (off until EnvEnabled is also set). +func Enabled() bool { + if truthy(os.Getenv(EnvDisabled)) { + return false + } + if resolvedURL() == "" && resolvedFile() == "" { + return false + } + if v := strings.TrimSpace(os.Getenv(EnvEnabled)); v != "" { + return truthy(v) + } + // On when a default endpoint is baked into the build (downstream distribution) + // or a local file sink is explicitly set (user opted into local monitoring). + return strings.TrimSpace(defaultURL) != "" || resolvedFile() != "" +} + +// noticeText is the one-time disclosure shown when telemetry is active. Keep it +// short, factual, and actionable (how to opt out). +const noticeText = "ℹ️ dws reports anonymous operational telemetry (command, outcome, latency, version — no content, no identity) to help monitor stability. Opt out anytime with DWS_TELEMETRY_DISABLED=true. Details: docs/telemetry.md" + +// ShowNoticeOnce prints the telemetry disclosure to stderr the first time +// telemetry is active on this machine, then writes a marker so it never repeats. +// Best-effort: any filesystem error is silently ignored, because telemetry, +// including its disclosure, must never disrupt the command.
+func ShowNoticeOnce(configDir string) { + if strings.TrimSpace(configDir) == "" { + return + } + marker := filepath.Join(configDir, ".telemetry_notice_shown") + if _, err := os.Stat(marker); err == nil { + return + } + fmt.Fprintln(os.Stderr, noticeText) + _ = os.WriteFile(marker, []byte(time.Now().UTC().Format(time.RFC3339)+"\n"), 0o644) +} + +// Forwarder ships events to the configured endpoint. Best-effort: a transport +// error or non-2xx is returned for logging but never blocks beyond the timeout, +// and the command's own result is unaffected. +type Forwarder struct { + URL string + Token string + File string // local file sink; when set, append JSONL instead of POSTing + Client *http.Client +} + +// NewForwarderFromEnv builds a Forwarder using the effective URL/token, or +// returns nil when telemetry is disabled. A nil *Forwarder's Emit is a safe +// no-op. +func NewForwarderFromEnv() *Forwarder { + if !Enabled() { + return nil + } + return &Forwarder{ + URL: resolvedURL(), + Token: resolvedToken(), + File: resolvedFile(), + Client: &http.Client{Timeout: timeoutFromEnv()}, + } +} + +// Emit ships e as a single JSON object. With a file sink configured it appends +// one JSON line locally (no network); otherwise it POSTs to the URL. A nil +// receiver is a no-op so callers never need a guard. Errors are returned +// (best-effort) but never block command exit past the configured timeout. +func (f *Forwarder) Emit(e *Event) error { + if f == nil || e == nil { + return nil + } + data, err := json.Marshal(e) + if err != nil { + return err + } + + // Local file sink (lightest path): append one JSON line, no network. 
+ if f.File != "" { + fh, openErr := os.OpenFile(f.File, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + if openErr != nil { + return openErr + } + defer fh.Close() + _, writeErr := fh.Write(append(data, '\n')) + return writeErr + } + + ctx, cancel := context.WithTimeout(context.Background(), f.Client.Timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.URL, bytes.NewReader(data)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Dws-Telemetry-Schema", SchemaVersion) + if f.Token != "" { + req.Header.Set("Authorization", "Bearer "+f.Token) + } + resp, err := f.Client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("telemetry: sink returned %d", resp.StatusCode) + } + return nil +} + +func timeoutFromEnv() time.Duration { + raw := strings.TrimSpace(os.Getenv(EnvTimeoutMS)) + if raw == "" { + return defaultTimeout + } + var ms int + if _, err := fmt.Sscanf(raw, "%d", &ms); err != nil || ms <= 0 { + return defaultTimeout + } + return time.Duration(ms) * time.Millisecond +} + +func truthy(s string) bool { + switch strings.ToLower(strings.TrimSpace(s)) { + case "1", "true", "yes", "on": + return true + default: + return false + } +} diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go new file mode 100644 index 00000000..f7648da6 --- /dev/null +++ b/internal/telemetry/telemetry_test.go @@ -0,0 +1,205 @@ +// Copyright 2026 Alibaba Group +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestEnabledRequiresBothSwitchAndURL(t *testing.T) { + cases := []struct { + name, enabled, url string + want bool + }{ + {"both set", "true", "https://x.example/dws", true}, + {"only switch", "true", "", false}, + {"only url", "", "https://x.example/dws", false}, + {"neither", "", "", false}, + {"falsey switch", "0", "https://x.example/dws", false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + t.Setenv(EnvEnabled, c.enabled) + t.Setenv(EnvURL, c.url) + // Clear the other switches so an ambient DWS_TELEMETRY_FILE or + // DWS_TELEMETRY_DISABLED in the developer's shell cannot change the result. + t.Setenv(EnvFile, "") + t.Setenv(EnvDisabled, "") + if got := Enabled(); got != c.want { + t.Fatalf("Enabled()=%v, want %v", got, c.want) + } + }) + } +} + +func TestEnabledWithBakedInDefaultEndpoint(t *testing.T) { + // Simulate a downstream build that injected a default endpoint via -ldflags.
+ orig := defaultURL + defaultURL = "https://fleet.example/dws" + t.Cleanup(func() { defaultURL = orig }) + + cases := []struct { + name, enabled, disabled, url string + want bool + }{ + {"default on (no env)", "", "", "", true}, + {"hard opt-out wins", "", "true", "", false}, + {"hard opt-out beats explicit enable", "true", "true", "", false}, + {"explicit disable via enabled=false", "false", "", "", false}, + {"explicit enable", "true", "", "", true}, + {"env url overrides default, still on", "", "", "https://other.example/dws", true}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + t.Setenv(EnvEnabled, c.enabled) + t.Setenv(EnvDisabled, c.disabled) + t.Setenv(EnvURL, c.url) + if got := Enabled(); got != c.want { + t.Fatalf("Enabled()=%v, want %v", got, c.want) + } + }) + } +} + +func TestFileSinkAppendsAndEnables(t *testing.T) { + path := filepath.Join(t.TempDir(), "telemetry.jsonl") + t.Setenv(EnvEnabled, "") + t.Setenv(EnvURL, "") + t.Setenv(EnvFile, path) + + // A file sink alone is a destination -> enabled (local monitoring opt-in). + if !Enabled() { + t.Fatal("Enabled()=false, want true when DWS_TELEMETRY_FILE is set") + } + fwd := NewForwarderFromEnv() + if fwd == nil { + t.Fatal("expected a forwarder when file sink is set") + } + if fwd.File != path { + t.Fatalf("forwarder File=%q, want %q", fwd.File, path) + } + + // Two events -> two JSON lines, no network. 
+ for _, oc := range []string{"ok", "error"} { + ev := New(time.Unix(0, 0), "t") + ev.Command = "doc" + ev.Outcome = oc + if err := fwd.Emit(ev); err != nil { + t.Fatalf("Emit: %v", err) + } + } + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read sink file: %v", err) + } + lines := 0 + for _, l := range strings.Split(strings.TrimSpace(string(data)), "\n") { + if strings.TrimSpace(l) == "" { + continue + } + var ev Event + if err := json.Unmarshal([]byte(l), &ev); err != nil { + t.Fatalf("line not JSON: %q (%v)", l, err) + } + lines++ + } + if lines != 2 { + t.Fatalf("file has %d JSON lines, want 2", lines) + } + + // Hard opt-out still wins over a file sink. + t.Setenv(EnvDisabled, "true") + if Enabled() { + t.Fatal("Enabled()=true with DWS_TELEMETRY_DISABLED set, want false") + } +} + +func TestNewForwarderFromEnvNilWhenDisabled(t *testing.T) { + t.Setenv(EnvEnabled, "") + t.Setenv(EnvURL, "") + if f := NewForwarderFromEnv(); f != nil { + t.Fatalf("expected nil forwarder when disabled, got %+v", f) + } + // Emit on a nil forwarder must be a safe no-op. 
+ var nilFwd *Forwarder + if err := nilFwd.Emit(New(time.Unix(0, 0), "t")); err != nil { + t.Fatalf("nil Emit should be no-op, got %v", err) + } +} + +func TestForwarderEmitPostsJSON(t *testing.T) { + var gotBody []byte + var gotAuth, gotSchema string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotBody, _ = io.ReadAll(r.Body) + gotAuth = r.Header.Get("Authorization") + gotSchema = r.Header.Get("X-Dws-Telemetry-Schema") + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + t.Setenv(EnvEnabled, "true") + t.Setenv(EnvURL, srv.URL) + t.Setenv(EnvToken, "secret-token") + + fwd := NewForwarderFromEnv() + if fwd == nil { + t.Fatal("expected a forwarder when enabled") + } + + ev := New(time.Unix(1700000000, 0).UTC(), "trace-123") + ev.CLIVersion = "1.2.3" + ev.Command = "doc" + ev.Subcommand = "create" + ev.Outcome = "ok" + ev.DurationMS = 42 + + if err := fwd.Emit(ev); err != nil { + t.Fatalf("Emit: %v", err) + } + + if gotAuth != "Bearer secret-token" { + t.Errorf("Authorization=%q, want Bearer secret-token", gotAuth) + } + if gotSchema != SchemaVersion { + t.Errorf("schema header=%q, want %q", gotSchema, SchemaVersion) + } + + var decoded Event + if err := json.Unmarshal(gotBody, &decoded); err != nil { + t.Fatalf("server got non-JSON body %q: %v", gotBody, err) + } + if decoded.TraceID != "trace-123" || decoded.Command != "doc" || + decoded.Subcommand != "create" || decoded.Outcome != "ok" || decoded.DurationMS != 42 { + t.Errorf("decoded event mismatch: %+v", decoded) + } +} + +func TestForwarderEmitReturnsErrorOnNon2xx(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer srv.Close() + + t.Setenv(EnvEnabled, "true") + t.Setenv(EnvURL, srv.URL) + fwd := NewForwarderFromEnv() + if err := fwd.Emit(New(time.Unix(0, 0), "t")); err == nil { + t.Fatal("expected error on 500 response") + } +} diff --git 
a/scripts/dev/telemetry_smoke.sh b/scripts/dev/telemetry_smoke.sh new file mode 100755 index 00000000..55b6abb6 --- /dev/null +++ b/scripts/dev/telemetry_smoke.sh @@ -0,0 +1,94 @@ +#!/usr/bin/env bash +# Copyright 2026 Alibaba Group +# Licensed under the Apache License, Version 2.0 (the "License"). +# +# One-shot local smoke test for dws telemetry — NO SLS, NO cloud, NO login. +# Builds dws, starts the zero-dependency local sink, fires a few --mock +# commands, then asserts the pipeline end-to-end: +# - events are received with the expected dimensions +# - command argument content never leaks into the payload (privacy boundary) +# - the bearer token is enforced (401 without it) +# Exits non-zero on any failure, so it is safe to wire into CI / pre-push. +# +# Usage: bash scripts/dev/telemetry_smoke.sh + +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +PORT="${PORT:-8799}" +TOKEN="dev" +SENTINEL="PRIVATE_SENTINEL_$$" # unique per run; must NOT appear in any event +BIN="$(mktemp -t dws-smoke.XXXXXX)" +OUT="$(mktemp -t dws-tel.XXXXXX.jsonl)" +SINK_PID="" + +cleanup() { + if [ -n "$SINK_PID" ]; then + kill "$SINK_PID" 2>/dev/null || true + wait "$SINK_PID" 2>/dev/null || true # absorb the job-control "Terminated" notice + fi + rm -f "$BIN" "$OUT" +} +trap cleanup EXIT + +say() { printf '\n\033[1m%s\033[0m\n' "$*"; } +fail() { printf '\033[31mFAIL: %s\033[0m\n' "$*" >&2; exit 1; } + +say "[1/5] build dws" +( cd "$ROOT" && go build -o "$BIN" ./cmd ) +echo " -> $BIN" + +say "[2/5] start local sink on :$PORT" +TOKEN="$TOKEN" PORT="$PORT" OUTFILE="$OUT" \ + python3 "$ROOT/docs/telemetry/fc-sls-ingest/localsink.py" >/dev/null 2>&1 & +SINK_PID=$! 
+sleep 1.2 +curl -fsS "http://127.0.0.1:$PORT/" >/dev/null || fail "sink not responding on :$PORT" +echo " -> sink up (pid $SINK_PID)" + +say "[3/5] auth enforced (POST without token must be 401)" +code="$(curl -s -o /dev/null -w '%{http_code}' -XPOST "http://127.0.0.1:$PORT/" -d '{}')" +[ "$code" = "401" ] || fail "expected 401 without token, got $code" +echo " -> 401 OK" + +say "[4/5] run --mock commands (telemetry on)" +export DWS_TELEMETRY_ENABLED=true +export DWS_TELEMETRY_URL="http://127.0.0.1:$PORT" +export DWS_TELEMETRY_TOKEN="$TOKEN" +export DWS_CHANNEL="smoke-test" +"$BIN" doc create --title "$SENTINEL" --mock >/dev/null 2>&1 || true +"$BIN" doc create --title other --mock >/dev/null 2>&1 || true +"$BIN" drive list --mock >/dev/null 2>&1 || true +sleep 1 + +say "[5/5] assert captured events" +python3 - "$OUT" "$SENTINEL" <<'PY' +import json, sys, collections +path, sentinel = sys.argv[1], sys.argv[2] +rows = [json.loads(l) for l in open(path) if l.strip()] +if len(rows) < 3: + print(f"FAIL: expected >=3 events, got {len(rows)}", file=sys.stderr); sys.exit(1) + +required = ("schema_version","trace_id","cli_version","os","command","subcommand","outcome","duration_ms") +for r in rows: + miss = [k for k in required if k not in r] + if miss: + print(f"FAIL: event missing fields {miss}: {r}", file=sys.stderr); sys.exit(1) + # channel is omitempty on the wire, so use .get() to avoid a KeyError when it is absent + if r.get("channel") != "smoke-test": + print(f"FAIL: channel not propagated: {r.get('channel')!r}", file=sys.stderr); sys.exit(1) + +# privacy boundary: the sentinel title must never appear anywhere in the payload +raw = open(path, encoding="utf-8").read() +if sentinel in raw: + print("FAIL: command content LEAKED into telemetry payload", file=sys.stderr); sys.exit(1) + +by = collections.defaultdict(lambda: {"n":0,"err":0,"d":[]}) +for r in rows: + k=f"{r['command']}/{r['subcommand']}"; b=by[k] + b["n"]+=1; b["err"]+=(r["outcome"]!="ok"); b["d"].append(r["duration_ms"]) +print(f" {len(rows)} events, all dimensions present, no content leak") +for
k,v in sorted(by.items(), key=lambda x:-x[1]['n']): + d=v["d"]; print(f" {k:<26} calls {v['n']} err {v['err']} avg {sum(d)//len(d)}ms max {max(d)}ms") +PY + +say "PASS — telemetry pipeline healthy"