diff --git a/.importlinter b/.importlinter index aef764a2..6fde2af9 100644 --- a/.importlinter +++ b/.importlinter @@ -38,6 +38,7 @@ source_modules = aai_cli.transcribe_batch aai_cli.transcribe_exec aai_cli.transcribe_render + aai_cli.webhook_listen aai_cli.wer aai_cli.ws aai_cli.youtube @@ -66,6 +67,7 @@ modules = aai_cli.commands.telemetry aai_cli.commands.transcribe aai_cli.commands.transcripts + aai_cli.commands.webhooks [importlinter:contract:3] name = Library layers do not depend on Rich rendering diff --git a/AGENTS.md b/AGENTS.md index 03e45bc8..d38fc0b0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -162,7 +162,7 @@ A Typer CLI. `aai_cli/main.py` builds the `app`, registers each command sub-app, ### Command layer -Each file in `aai_cli/commands/` is a Typer sub-app (`transcribe`, `stream`, `agent`, `speak`, `llm`, `transcripts`, `login` (login/logout/whoami), `doctor`, `init`, `dev`, `share`, `deploy`, `setup`, `onboard`, `account` (balance/usage/limits), `keys`, `sessions`, `audit`, `telemetry` (status/enable/disable)). Command bodies run through `context.run_command(ctx, fn, json=...)`, which maps any `CLIError` to clean stderr output + the error's exit code. Commands never print tracebacks for expected failures. +Each file in `aai_cli/commands/` is a Typer sub-app (`transcribe`, `stream`, `agent`, `speak`, `llm`, `transcripts`, `login` (login/logout/whoami), `doctor`, `init`, `dev`, `share`, `deploy`, `setup`, `onboard`, `account` (balance/usage/limits), `keys`, `sessions`, `audit`, `telemetry` (status/enable/disable), `webhooks` (listen)). Command bodies run through `context.run_command(ctx, fn, json=...)`, which maps any `CLIError` to clean stderr output + the error's exit code. Commands never print tracebacks for expected failures. **Options/run split for flag-heavy commands** (gh-CLI style): the Typer function only parses argv into a frozen `Options` dataclass and hands it to a module-level `run_(opts, state, *, json_mode)` through a thin lambda adapter in `run_command(ctx, ..., json=...)`. The five run commands follow it — `aai_cli/stream_exec.py` (the reference implementation), `transcribe_exec.py`, `agent_exec.py`, `speak_exec.py`, `llm_exec.py`. Because the run path is a plain function of data, tests construct options directly (`dataclasses.replace` off a defaults instance, see `tests/test_stream_exec.py` and `tests/test_command_options_seam.py`) instead of round-tripping argv through `CliRunner` — which is also the cheap way to kill mutation-gate mutants on orchestration lines. Follow this for new or heavily-reworked commands with long bodies; small commands keep the inline `body()` closure — the dataclass is pure ceremony there. diff --git a/README.md b/README.md index ff7a53dc..3c37c727 100644 --- a/README.md +++ b/README.md @@ -58,12 +58,13 @@ neither. ## 📋 Key Features - **Transcription**: `assembly transcribe` handles files, URLs, and YouTube/podcast pages, with flags for speaker labels, PII redaction, summarization, sentiment, chapters, and more. -- **Batch transcription**: point `assembly transcribe` at a directory or glob (or pipe paths with `--from-stdin`) to transcribe everything concurrently, with sidecar files that make re-runs resumable. +- **Batch transcription**: point `assembly transcribe` at a directory or glob (or pipe paths with `--from-stdin`) to transcribe everything concurrently, with sidecar files that make re-runs resumable. Add `--llm "prompt"` to run an LLM prompt over each finished transcript, saved into the sidecars. - **Real-time streaming**: `assembly stream` transcribes the microphone, a file, or a URL live — on macOS it can capture system audio too. - **Voice agent**: `assembly agent` runs a full-duplex spoken conversation in your terminal (use headphones). - **LLM Gateway**: `assembly llm` prompts an LLM over a transcript, stdin, or a live stream (`assembly stream --llm "summarize as I talk"`). - **Model evaluation**: `assembly eval` transcribes a Hugging Face dataset (with built-in aliases for common benchmarks: `assembly eval tedlium`) or a local `.csv`/`.jsonl` manifest and scores WER against its references — handy for picking a speech model. - **Starter apps**: `assembly init` scaffolds a self-contained FastAPI + HTML app (`audio-transcription`, `live-captions`, `voice-agent`). +- **Webhook testing**: `assembly webhooks listen` opens a public dev URL (cloudflared quick tunnel) that prints webhook deliveries as they arrive and can forward them to your local app with `--forward-to`. - **Code generation**: add `--show-code` to `transcribe`/`stream`/`agent` to print the equivalent Python SDK script instead of running. - **Account self-service**: `assembly keys` / `balance` / `usage` / `limits` / `sessions` / `audit` via browser login. diff --git a/aai_cli/commands/share.py b/aai_cli/commands/share.py index 488e4a4e..83291380 100644 --- a/aai_cli/commands/share.py +++ b/aai_cli/commands/share.py @@ -2,16 +2,12 @@ from __future__ import annotations import os -import shutil -import subprocess -import sys -import tempfile from pathlib import Path import typer from rich.markup import escape -from aai_cli import config, help_panels, options, output, steps +from aai_cli import help_panels, options, output, steps from aai_cli.context import AppState, run_command from aai_cli.errors import CLIError from aai_cli.help_text import examples_epilog @@ -21,30 +17,6 @@ app = typer.Typer() -# brew exists only on macOS; everywhere else point at Cloudflare's install docs. -_CLOUDFLARED_DOCS = ( - "https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/downloads/" -) - - -def _cloudflared_install_hint() -> str: - # A ternary (not an if/return) so neither branch reads as unreachable under - # mypy --warn-unreachable, which targets one platform at a time: on macOS the - # second return looked dead, on Linux the first would. - hint = "brew install cloudflared" if sys.platform == "darwin" else _CLOUDFLARED_DOCS - return f"Install it: {hint}" - - -def _require_cloudflared() -> None: - if shutil.which(tunnel.CLOUDFLARED) is None: - raise CLIError( - "cloudflared is required to share a public link.", - error_type="missing_dependency", - exit_code=1, - suggestion=_cloudflared_install_hint(), - ) - - def _render_share(data: dict[str, object]) -> str: return ( f"[aai.heading]Sharing[/aai.heading] [aai.url]{escape(str(data['url']))}[/aai.url]\n" @@ -53,11 +25,6 @@ def _render_share(data: dict[str, object]) -> str: ) -def _terminate(proc: subprocess.Popen[str] | None) -> None: - if proc is not None and proc.poll() is None: - proc.terminate() - - def run_share(*, port: int, no_install: bool, json_mode: bool, quiet: bool) -> None: """Boot the app and expose it on a public cloudflared quick-tunnel URL.""" target = Path.cwd() @@ -67,7 +34,7 @@ def run_share(*, port: int, no_install: bool, json_mode: bool, quiet: bool) -> N devserver.notify_port_change(port, chosen_port, json_mode=json_mode, quiet=quiet) env = {**os.environ, "PORT": str(chosen_port)} web = procfile.web_argv(target, env=env) # validates we're in a scaffolded project - _require_cloudflared() + tunnel.require_cloudflared("share a public link") report: list[steps.Step] = [ devserver.install_step(target, no_install=no_install, use_uv=use_uv) @@ -77,7 +44,7 @@ def run_share(*, port: int, no_install: bool, json_mode: bool, quiet: bool) -> N raise typer.Exit(code=1) server = runner.spawn(devserver.dev_command(target, web, use_uv=use_uv), cwd=target, env=env) - proxy: subprocess.Popen[str] | None = None + proxy = None log_path: Path | None = None keep_log = False try: @@ -87,16 +54,7 @@ def run_share(*, port: int, no_install: bool, json_mode: bool, quiet: bool) -> N error_type="server_error", exit_code=1, ) - fd, name = tempfile.mkstemp(prefix="aai-tunnel-", suffix=".log") - os.close(fd) - log_path = Path(name) - # The tunnel binary only proxies the port; don't hand it the API key the - # dev server needs (keeps the secret out of cloudflared's logs/diagnostics). - tunnel_env = {k: v for k, v in os.environ.items() if k != config.ENV_API_KEY} - proxy = runner.spawn( - tunnel.tunnel_command(chosen_port), cwd=target, env=tunnel_env, log_path=log_path - ) - public = tunnel.await_url(log_path) + proxy, public, log_path = tunnel.open_quick_tunnel(chosen_port, cwd=target) if public is None: # Keep the captured cloudflared output: it's the only evidence of why # the tunnel never came up. @@ -119,8 +77,8 @@ def run_share(*, port: int, no_install: bool, json_mode: bool, quiet: bool) -> N # block below tears down the tunnel and server. pass finally: - _terminate(proxy) - _terminate(server) + tunnel.terminate(proxy) + tunnel.terminate(server) if log_path is not None and not keep_log: log_path.unlink(missing_ok=True) diff --git a/aai_cli/commands/transcribe.py b/aai_cli/commands/transcribe.py index 21a0320b..5e6873f0 100644 --- a/aai_cli/commands/transcribe.py +++ b/aai_cli/commands/transcribe.py @@ -26,6 +26,7 @@ ("Redact PII for compliance", "assembly transcribe call.mp3 --redact-pii"), ("Summarize a recording", "assembly transcribe call.mp3 --summarization"), ("Ask about the transcript", 'assembly transcribe call.mp3 --llm "List action items"'), + ("Summarize a whole folder", 'assembly transcribe ./calls --llm "Summarize this call"'), ] ), ) @@ -257,7 +258,7 @@ def transcribe( webhook_url: str | None = typer.Option( None, "--webhook-url", - help="Webhook URL for completion.", + help="Webhook URL for completion (get a dev URL with `assembly webhooks listen`).", rich_help_panel=help_panels.OPT_WEBHOOKS, ), webhook_auth_header: str | None = typer.Option( @@ -342,7 +343,9 @@ def transcribe( Batch mode: pass a directory or glob (or pipe a list with --from-stdin) to transcribe many sources concurrently. Each source gets a .aai.json sidecar - with the full result, and a re-run skips sources already transcribed. + with the full result (including any --llm responses), and a re-run skips + sources already transcribed — with changed --llm prompts it replays just + the LLM step, never a second transcription. Curated flags cover common features; --config KEY=VALUE and --config-file reach every other field. Analysis (summary, chapters, ...) renders in human mode. """ diff --git a/aai_cli/commands/webhooks.py b/aai_cli/commands/webhooks.py new file mode 100644 index 00000000..4aa9af1e --- /dev/null +++ b/aai_cli/commands/webhooks.py @@ -0,0 +1,71 @@ +# aai_cli/commands/webhooks.py +from __future__ import annotations + +import typer + +from aai_cli import options, webhook_listen +from aai_cli.context import AppState, run_command +from aai_cli.help_text import examples_epilog + +app = typer.Typer(help="Receive webhook deliveries on a public dev URL.", no_args_is_help=True) + + +@app.command( + epilog=examples_epilog( + [ + ("Listen on a public URL", "assembly webhooks listen"), + ( + "Deliver a transcription to it", + "assembly transcribe call.mp3 --webhook-url https://….trycloudflare.com", + ), + ( + "Forward deliveries to your app", + "assembly webhooks listen --forward-to http://localhost:8000/webhook", + ), + ("Local-only sink (no tunnel)", "assembly webhooks listen --no-tunnel"), + ("Stop after the first delivery", "assembly webhooks listen --max-events 1"), + ] + ), +) +def listen( + ctx: typer.Context, + port: int = typer.Option( + 8989, "--port", help="Local listener port (the first free port from here)." + ), + forward_to: str | None = typer.Option( + None, "--forward-to", help="Re-POST each delivery to this URL (e.g. your local app)." + ), + no_tunnel: bool = typer.Option( + False, "--no-tunnel", help="Local-only: skip the cloudflared public URL." + ), + max_events: int = typer.Option( + 0, # pragma: no mutate (0 = serve forever; only observable by never exiting) + "--max-events", + min=0, + help="Exit after this many deliveries (0 = run until Ctrl-C).", + ), + json_out: bool = options.json_option("One NDJSON record per delivery."), +) -> None: + """Receive AssemblyAI webhooks on a public URL and watch them arrive. + + Opens a cloudflared quick tunnel to a local listener and prints the public + URL to pass as --webhook-url (or webhook_url in the API). Each delivery is + acknowledged with HTTP 200 and printed; --forward-to relays it to your app, + so you can build webhook handlers without deploying a public endpoint. + + Requires cloudflared unless --no-tunnel (macOS: `brew install cloudflared`; other + platforms: https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/downloads/). + """ + + def body(_state: AppState, json_mode: bool) -> None: + webhook_listen.run_listen( + webhook_listen.ListenOptions( + port=port, + forward_to=forward_to, + public=not no_tunnel, + max_events=max_events, + ), + json_mode=json_mode, + ) + + run_command(ctx, body, json=json_out) diff --git a/aai_cli/init/tunnel.py b/aai_cli/init/tunnel.py index 27f45fc1..2a9e6faa 100644 --- a/aai_cli/init/tunnel.py +++ b/aai_cli/init/tunnel.py @@ -1,23 +1,78 @@ # aai_cli/init/tunnel.py from __future__ import annotations +import os import re +import shutil +import subprocess +import sys +import tempfile import time from collections.abc import Callable from pathlib import Path +from aai_cli import config +from aai_cli.errors import CLIError +from aai_cli.init import runner + # cloudflared binary name; resolved via shutil.which by callers. CLOUDFLARED = "cloudflared" +# brew exists only on macOS; everywhere else point at Cloudflare's install docs. +_CLOUDFLARED_DOCS = ( + "https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/downloads/" +) + # A cloudflared quick tunnel prints an ephemeral https://.trycloudflare.com URL. _URL = re.compile(r"https://[a-z0-9-]+\.trycloudflare\.com") +def install_hint() -> str: + # A ternary (not an if/return) so neither branch reads as unreachable under + # mypy --warn-unreachable, which targets one platform at a time: on macOS the + # second return looked dead, on Linux the first would. + hint = "brew install cloudflared" if sys.platform == "darwin" else _CLOUDFLARED_DOCS + return f"Install it: {hint}" + + +def require_cloudflared(purpose: str) -> None: + """Raise a clean missing-dependency error when cloudflared isn't on PATH.""" + if shutil.which(CLOUDFLARED) is None: + raise CLIError( + f"cloudflared is required to {purpose}.", + error_type="missing_dependency", + exit_code=1, + suggestion=install_hint(), + ) + + def tunnel_command(port: int) -> list[str]: """The cloudflared quick-tunnel command pointing at the local server.""" return [CLOUDFLARED, "tunnel", "--url", f"http://localhost:{port}"] +def open_quick_tunnel(port: int, *, cwd: Path) -> tuple[subprocess.Popen[str], str | None, Path]: + """Spawn a cloudflared quick tunnel for ``port``: (process, URL or None, log path). + + The tunnel binary only proxies the port, so the API key is stripped from its + environment (keeps the secret out of cloudflared's logs/diagnostics). A None + URL means cloudflared never reported one — the caller should keep the log + file (the only evidence of why) and name it; on success it should unlink it. + """ + fd, name = tempfile.mkstemp(prefix="aai-tunnel-", suffix=".log") + os.close(fd) + log_path = Path(name) + env = {k: v for k, v in os.environ.items() if k != config.ENV_API_KEY} + process = runner.spawn(tunnel_command(port), cwd=cwd, env=env, log_path=log_path) + return process, await_url(log_path), log_path + + +def terminate(process: subprocess.Popen[str] | None) -> None: + """Terminate a spawned process if it's still running (None / exited: no-op).""" + if process is not None and process.poll() is None: + process.terminate() + + def find_url(text: str) -> str | None: """The first trycloudflare.com URL in `text`, or None.""" match = _URL.search(text) diff --git a/aai_cli/main.py b/aai_cli/main.py index 2a92f9ad..5c02aa3b 100644 --- a/aai_cli/main.py +++ b/aai_cli/main.py @@ -41,6 +41,7 @@ telemetry, transcribe, transcripts, + webhooks, ) from aai_cli.context import AppState, env_override_warning, resolve_environment from aai_cli.errors import CLIError, NotAuthenticated, UsageError @@ -67,6 +68,7 @@ "speak", "llm", "eval", + "webhooks", # Setup & Tools — get set up & maintain "doctor", "setup", @@ -399,6 +401,7 @@ def main( app.add_typer(setup.app, name="setup", rich_help_panel=help_panels.SETUP) app.add_typer(telemetry.app, name="telemetry", rich_help_panel=help_panels.SETUP) app.add_typer(keys.app, name="keys", rich_help_panel=help_panels.ACCOUNT) +app.add_typer(webhooks.app, name="webhooks", rich_help_panel=help_panels.TRANSCRIPTION) @app.command( diff --git a/aai_cli/transcribe_batch.py b/aai_cli/transcribe_batch.py index e4dbedfe..fef03c22 100644 --- a/aai_cli/transcribe_batch.py +++ b/aai_cli/transcribe_batch.py @@ -7,6 +7,11 @@ the resume marker — a re-run skips any source whose sidecar records a completed transcription of the same bytes — so retrying a partly-failed batch only pays for what's missing (``--force`` re-transcribes everything). + +``--llm`` prompts run per source once its transcription is recorded, landing under +the sidecar's ``transform`` key. The chain is resumable on its own: a re-run with +missing or changed prompts replays just the LLM step against the recorded +transcript id, never a second transcription. """ from __future__ import annotations @@ -24,7 +29,7 @@ from rich.live import Live from rich.markup import escape -from aai_cli import client, jsonshape, output, stdio, theme, transcribe_exec +from aai_cli import client, jsonshape, llm, output, stdio, theme, transcribe_exec from aai_cli.errors import CLIError, NotAuthenticated, UsageError, mutually_exclusive if TYPE_CHECKING: @@ -132,10 +137,13 @@ def reject_single_source_flags( *, out: Path | None, output_field: object | None, - llm_prompt: list[str] | None, show_code: bool, ) -> None: - """Batch mode writes one sidecar per source; the single-result flags don't apply.""" + """Batch mode writes one sidecar per source; the single-result flags don't apply. + + ``--llm`` is deliberately not here: in batch mode the chain runs per source and + its steps land in each sidecar. + """ mutually_exclusive( ("--show-code", show_code), ("multiple sources", True), @@ -144,7 +152,6 @@ def reject_single_source_flags( mutually_exclusive( ("--out", out), ("-o/--output", output_field), - ("--llm", llm_prompt), ("multiple sources", True), suggestion=f"Each source gets a '{SIDECAR_SUFFIX}' sidecar with the full result.", ) @@ -185,9 +192,13 @@ def resumable_record(sidecar: Path, *, digest: str | None) -> dict[str, object] return record +def _dump_sidecar(sidecar: Path, record: dict[str, object]) -> None: + sidecar.write_text(json.dumps(record, indent=2, default=str) + "\n") + + def _write_sidecar( sidecar: Path, *, source: str, transcript: aai.Transcript, digest: str | None -) -> None: +) -> dict[str, object]: record: dict[str, object] = { "source": source, "id": transcript.id, @@ -196,7 +207,35 @@ def _write_sidecar( } if digest is not None: record["source_sha256"] = digest - sidecar.write_text(json.dumps(record, indent=2, default=str) + "\n") + _dump_sidecar(sidecar, record) + return record + + +def _transform_record( + api_key: str, transform: transcribe_exec.TransformOptions, *, transcript_id: str +) -> dict[str, object]: + """Run the ``--llm`` chain server-side over the transcript; the sidecar entry.""" + steps = llm.run_chain_steps( + api_key, + transform.prompts, + transcript_id=transcript_id, + model=transform.model, + max_tokens=transform.max_tokens, + ) + return {"model": transform.model, "prompts": transform.prompts, "steps": steps} + + +def _transform_satisfied( + record: dict[str, object], transform: transcribe_exec.TransformOptions +) -> bool: + """True when no ``--llm`` chain was requested, or the sidecar already records this + exact chain (same prompts against the same gateway model).""" + if not transform.prompts: + return True + existing = jsonshape.as_mapping(record.get("transform")) + if existing is None: + return False + return existing.get("prompts") == transform.prompts and existing.get("model") == transform.model @dataclasses.dataclass @@ -218,11 +257,47 @@ def record(self) -> dict[str, str]: return rec +def _resume_one( + api_key: str, + item: _Item, + record: dict[str, object], + sidecar: Path, + *, + transform: transcribe_exec.TransformOptions, +) -> bool: + """Finish a source whose completed transcription the sidecar already holds. + + Skips outright when the recorded ``transform`` satisfies the requested chain; + otherwise replays just the chain against the recorded transcript id. Returns + False (transcribe again) when the record has no id to anchor the chain on. + """ + item.transcript_id = str(record.get("id") or "") + if _transform_satisfied(record, transform): + item.status, item.detail = "skipped", str(sidecar) + return True + if not item.transcript_id: + return False + item.status = "processing" + transformed = _transform_record(api_key, transform, transcript_id=item.transcript_id) + _dump_sidecar(sidecar, dict(record) | {"transform": transformed}) + item.status, item.detail = "completed", str(sidecar) + return True + + def _transcribe_one( - api_key: str, item: _Item, *, transcription_config: aai.TranscriptionConfig, force: bool + api_key: str, + item: _Item, + *, + transcription_config: aai.TranscriptionConfig, + force: bool, + transform: transcribe_exec.TransformOptions, ) -> None: """Worker body: resume from the sidecar, or transcribe and write one. + The ``--llm`` chain runs only after the sidecar records the completed + transcription, so a failed chain leaves a resumable transcription and the + retry pays only for the LLM step. + A per-source failure is recorded on the item and the batch carries on — except NotAuthenticated, which re-raises so ``_drain`` aborts the batch (one rejected key fails every source identically, and auto-login should trigger once). @@ -230,16 +305,18 @@ def _transcribe_one( try: sidecar = sidecar_path(item.source) digest = _source_digest(item.source) - if not force and (record := resumable_record(sidecar, digest=digest)) is not None: - item.transcript_id = str(record.get("id") or "") - item.status, item.detail = "skipped", str(sidecar) + record = None if force else resumable_record(sidecar, digest=digest) + if record is not None and _resume_one(api_key, item, record, sidecar, transform=transform): return item.status = "processing" transcript = transcribe_exec.run_transcription( api_key, item.source, sample=False, transcription_config=transcription_config ) - _write_sidecar(sidecar, source=item.source, transcript=transcript, digest=digest) + fresh = _write_sidecar(sidecar, source=item.source, transcript=transcript, digest=digest) item.transcript_id = transcript.id or "" + if transform.prompts: + transformed = _transform_record(api_key, transform, transcript_id=item.transcript_id) + _dump_sidecar(sidecar, fresh | {"transform": transformed}) item.status, item.detail = "completed", str(sidecar) except CLIError as err: item.status, item.detail = "failed", err.message @@ -285,6 +362,7 @@ def _drain( transcription_config: aai.TranscriptionConfig, concurrency: int, force: bool, + transform: transcribe_exec.TransformOptions, json_mode: bool, ) -> None: """Run the workers, emitting one NDJSON record per finished source under ``--json``. @@ -300,6 +378,7 @@ def _drain( item, transcription_config=transcription_config, force=force, + transform=transform, ): item for item in items } @@ -333,6 +412,7 @@ def run_batch( transcription_config: aai.TranscriptionConfig, concurrency: int, force: bool, + transform: transcribe_exec.TransformOptions, json_mode: bool, quiet: bool, ) -> None: @@ -349,6 +429,7 @@ def run_batch( transcription_config=transcription_config, concurrency=concurrency, force=force, + transform=transform, json_mode=json_mode, ) _summarize(items, json_mode=json_mode, quiet=quiet) diff --git a/aai_cli/transcribe_exec.py b/aai_cli/transcribe_exec.py index 5c602e8a..712d1cc3 100644 --- a/aai_cli/transcribe_exec.py +++ b/aai_cli/transcribe_exec.py @@ -397,7 +397,6 @@ def run_transcribe(opts: TranscribeOptions, state: AppState, *, json_mode: bool) transcribe_batch.reject_single_source_flags( out=opts.out, output_field=opts.output_field, - llm_prompt=opts.llm_prompt, show_code=opts.show_code, ) transcribe_batch.run_batch( @@ -406,6 +405,9 @@ def run_transcribe(opts: TranscribeOptions, state: AppState, *, json_mode: bool) transcription_config=config_builder.construct_transcription_config(merged), concurrency=opts.concurrency, force=opts.force, + transform=TransformOptions( + prompts=list(opts.llm_prompt or []), model=opts.model, max_tokens=opts.max_tokens + ), json_mode=json_mode, quiet=state.quiet, ) diff --git a/aai_cli/webhook_listen.py b/aai_cli/webhook_listen.py new file mode 100644 index 00000000..25e8b7c2 --- /dev/null +++ b/aai_cli/webhook_listen.py @@ -0,0 +1,191 @@ +"""`assembly webhooks listen` engine: a local sink for AssemblyAI webhook deliveries. + +Binds a threaded HTTP server on 127.0.0.1 and exposes it through a cloudflared +quick tunnel — the printed public URL is what ``--webhook-url`` wants. Each +delivery is acknowledged with HTTP 200 immediately and printed as it arrives +(one NDJSON record per delivery under ``--json``); ``--forward-to`` re-POSTs +the body to a local app, with forwarding failures reported on the event rather +than to AssemblyAI. Ctrl-C (or ``--max-events``) stops the listener. +""" + +from __future__ import annotations + +import contextlib +import json +import threading +from collections.abc import Callable +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import TYPE_CHECKING + +from rich.markup import escape + +from aai_cli import jsonshape, output +from aai_cli.errors import CLIError +from aai_cli.init import runner, tunnel + +if TYPE_CHECKING: + import subprocess + + +@dataclass(frozen=True) +class ListenOptions: + """Every `assembly webhooks listen` flag as plain data (options/run split).""" + + port: int + forward_to: str | None + public: bool # False = --no-tunnel (local sink only) + max_events: int # 0 = serve until Ctrl-C + + +def shape_event(path: str, body: bytes) -> dict[str, object]: + """The emitted record for one delivery: the parsed payload, plus pulled-up + ``transcript_id``/``status`` when the body looks like an AssemblyAI webhook.""" + event: dict[str, object] = {"path": path} + try: + payload = json.loads(body.decode()) + except ValueError: # includes UnicodeDecodeError: non-JSON bodies ride as raw text + event["raw"] = body.decode(errors="replace") + return event + event["payload"] = payload + record = jsonshape.as_mapping(payload) + if record is not None and "transcript_id" in record: + event["transcript_id"] = record["transcript_id"] + event["status"] = record.get("status") + return event + + +def forward(url: str, body: bytes, content_type: str) -> dict[str, object]: + """Re-POST a delivery to ``url``; the per-event forwarding-outcome record. + + A refused/failed forward is data on the event, never an exception — the + delivery was already acknowledged to AssemblyAI. + """ + import httpx2 as httpx # deferred: imported per delivery, keeps CLI startup light + + try: + response = httpx.post( + url, + content=body, + headers={"content-type": content_type}, + timeout=10, # pragma: no mutate (tuning constant; no unit-observable behavior) + ) + except httpx.HTTPError as err: + return {"url": url, "error": str(err)} + return {"url": url, "status_code": response.status_code} + + +def _render_event(event: dict[str, object]) -> str: + parts = [f"[aai.heading]→ POST[/aai.heading] {escape(str(event['path']))}"] + if "transcript_id" in event: + parts.append(f"transcript_id={escape(str(event['transcript_id']))}") + parts.append(f"status={escape(str(event['status']))}") + elif "raw" in event: + parts.append(escape(str(event["raw"]))) + fwd = jsonshape.as_mapping(event.get("forward")) + if fwd is not None: + outcome = fwd.get("error") if "error" in fwd else fwd.get("status_code") + parts.append( + f"[aai.muted]→ forwarded to {escape(str(fwd.get('url')))}: " + f"{escape(str(outcome))}[/aai.muted]" + ) + return " ".join(parts) + + +class _EventSink: + """Serializes deliveries from handler threads into one printed record each.""" + + def __init__(self, *, forward_to: str | None, max_events: int, json_mode: bool) -> None: + self._forward_to = forward_to + self._max_events = max_events + self._json_mode = json_mode + self._lock = threading.Lock() + self._count = 0 + self.on_limit: Callable[[], None] = lambda: None # the listen loop's shutdown + + def handle(self, path: str, body: bytes, content_type: str) -> None: + event = shape_event(path, body) + if self._forward_to is not None: + event["forward"] = forward(self._forward_to, body, content_type) + with self._lock: + self._count += 1 + output.emit(event, _render_event, json_mode=self._json_mode) + if self._max_events and self._count >= self._max_events: + self.on_limit() + + +def _make_handler(sink: _EventSink) -> type[BaseHTTPRequestHandler]: + class Handler(BaseHTTPRequestHandler): + def do_POST(self) -> None: + body = self.rfile.read(int(self.headers.get("Content-Length", "0"))) + # Acknowledge before printing/forwarding: AssemblyAI only needs a 2xx. + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(b'{"ok": true}') + sink.handle(self.path, body, self.headers.get("Content-Type") or "application/json") + + def log_request(self, code: int | str = "-", size: int | str = "-") -> None: + """BaseHTTPRequestHandler logs every request to stderr; stay quiet.""" + del code, size + + return Handler + + +def _render_listening(data: dict[str, object]) -> str: + return ( + f"[aai.heading]Listening for webhooks[/aai.heading] " + f"[aai.url]{escape(str(data['url']))}[/aai.url]\n" + f"[aai.muted]→ receiving on[/aai.muted] [aai.url]{escape(str(data['local']))}[/aai.url]" + " [aai.muted](Ctrl-C to stop)[/aai.muted]\n" + f"[aai.muted]Try:[/aai.muted] assembly transcribe --sample " + f"--webhook-url {escape(str(data['url']))}" + ) + + +def _announce(public_url: str | None, port: int, *, json_mode: bool) -> None: + local = f"http://127.0.0.1:{port}" + payload: dict[str, object] = {"url": public_url or local, "local": local, "port": port} + output.emit(payload, _render_listening, json_mode=json_mode) + + +def run_listen(opts: ListenOptions, *, json_mode: bool) -> None: + """Bind the sink, open the tunnel, and serve deliveries until stopped.""" + if opts.public: + tunnel.require_cloudflared("expose a public webhook URL") + port = runner.find_free_port(opts.port) + sink = _EventSink(forward_to=opts.forward_to, max_events=opts.max_events, json_mode=json_mode) + server = ThreadingHTTPServer(("127.0.0.1", port), _make_handler(sink)) + # shutdown() is called from a handler thread; serve_forever (main thread) + # notices within its poll interval and returns. + sink.on_limit = server.shutdown + proxy: subprocess.Popen[str] | None = None + log_path: Path | None = None + keep_log = False + try: + public_url: str | None = None + if opts.public: + proxy, public_url, log_path = tunnel.open_quick_tunnel(port, cwd=Path.cwd()) + if public_url is None: + # Keep the captured cloudflared output: it's the only evidence of + # why the tunnel never came up. + keep_log = True + raise CLIError( + "cloudflared didn't report a tunnel URL in time.", + error_type="tunnel_error", + exit_code=1, + suggestion=f"cloudflared's output was kept at {log_path} — " + "check it for errors.", + ) + _announce(public_url, port, json_mode=json_mode) + # Ctrl-C is the expected way to stop a foreground listener. + with contextlib.suppress(KeyboardInterrupt): + server.serve_forever() + finally: + # Close here, not via `with server:` — a tunnel failure raises before the + # serve block, and the bound listening socket must not outlive the command. + server.server_close() + tunnel.terminate(proxy) + if log_path is not None and not keep_log: + log_path.unlink(missing_ok=True) diff --git a/tests/__snapshots__/test_cli_output_snapshots.ambr b/tests/__snapshots__/test_cli_output_snapshots.ambr index 18b8ede0..1a3bad51 100644 --- a/tests/__snapshots__/test_cli_output_snapshots.ambr +++ b/tests/__snapshots__/test_cli_output_snapshots.ambr @@ -1055,7 +1055,9 @@ Batch mode: pass a directory or glob (or pipe a list with --from-stdin) to transcribe many sources concurrently. Each source gets a .aai.json sidecar - with the full result, and a re-run skips sources already transcribed. + with the full result (including any --llm responses), and a re-run skips + sources already transcribed — with changed --llm prompts it replays just + the LLM step, never a second transcription. Curated flags cover common features; --config KEY=VALUE and --config-file reach every other field. Analysis (summary, chapters, ...) renders in human @@ -1175,7 +1177,9 @@ │ repeatable). │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Webhooks ───────────────────────────────────────────────────────────────────╮ - │ --webhook-url TEXT Webhook URL for completion. │ + │ --webhook-url TEXT Webhook URL for completion (get a │ + │ dev URL with `assembly webhooks │ + │ listen`). │ │ --webhook-auth-header NAME:VALUE Webhook auth header as NAME:VALUE. │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Translation ────────────────────────────────────────────────────────────────╮ @@ -1217,6 +1221,8 @@ $ assembly transcribe call.mp3 --summarization Ask about the transcript $ assembly transcribe call.mp3 --llm "List action items" + Summarize a whole folder + $ assembly transcribe ./calls --llm "Summarize this call" @@ -1312,6 +1318,55 @@ + ''' +# --- +# name: test_command_help_matches_snapshot[webhooks_listen] + ''' + + Usage: assembly webhooks listen [OPTIONS] + + Receive AssemblyAI webhooks on a public URL and watch them arrive. + + Opens a cloudflared quick tunnel to a local listener and prints the public + URL to pass as --webhook-url (or webhook_url in the API). Each delivery is + acknowledged with HTTP 200 and printed; --forward-to relays it to your app, + so you can build webhook handlers without deploying a public endpoint. + + Requires cloudflared unless --no-tunnel (macOS: `brew install cloudflared`; + other + platforms: + https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/ + downloads/). + + ╭─ Options ────────────────────────────────────────────────────────────────────╮ + │ --port INTEGER Local listener port (the first │ + │ free port from here). │ + │ [default: 8989] │ + │ --forward-to TEXT Re-POST each delivery to this │ + │ URL (e.g. your local app). │ + │ --no-tunnel Local-only: skip the cloudflared │ + │ public URL. │ + │ --max-events INTEGER RANGE [x>=0] Exit after this many deliveries │ + │ (0 = run until Ctrl-C). │ + │ [default: 0] │ + │ --json -j One NDJSON record per delivery. │ + │ --help Show this message and exit. │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + + Examples + Listen on a public URL + $ assembly webhooks listen + Deliver a transcription to it + $ assembly transcribe call.mp3 --webhook-url https://….trycloudflare.com + Forward deliveries to your app + $ assembly webhooks listen --forward-to http://localhost:8000/webhook + Local-only sink (no tunnel) + $ assembly webhooks listen --no-tunnel + Stop after the first delivery + $ assembly webhooks listen --max-events 1 + + + ''' # --- # name: test_command_help_matches_snapshot[whoami] diff --git a/tests/test_share.py b/tests/test_share.py index ae9e0b6a..e1f463cb 100644 --- a/tests/test_share.py +++ b/tests/test_share.py @@ -100,12 +100,12 @@ def test_share_missing_cloudflared_errors_with_docs_url_on_linux(tmp_path, monke def test_cloudflared_install_hint_per_platform(monkeypatch): - from aai_cli.commands import share as share_cmd + from aai_cli.init import tunnel monkeypatch.setattr("sys.platform", "darwin") - assert share_cmd._cloudflared_install_hint() == "Install it: brew install cloudflared" + assert tunnel.install_hint() == "Install it: brew install cloudflared" monkeypatch.setattr("sys.platform", "linux") - assert share_cmd._cloudflared_install_hint() == ( + assert tunnel.install_hint() == ( "Install it: " "https://developers.cloudflare.com/cloudflare-one/connections/connect-networks/downloads/" ) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index a495f581..a1ec7119 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -157,6 +157,7 @@ def test_help_lists_commands_in_workflow_order(): "speak", "llm", "eval", + "webhooks", # Setup & Tools "doctor", "setup", diff --git a/tests/test_transcribe_batch_llm.py b/tests/test_transcribe_batch_llm.py new file mode 100644 index 00000000..4da1e984 --- /dev/null +++ b/tests/test_transcribe_batch_llm.py @@ -0,0 +1,260 @@ +"""Batch-mode `assembly transcribe --llm`: the per-source LLM chain, its sidecar +`transform` record, and chain-only resume (see test_transcribe_batch.py for the +core batch/sidecar behavior). +""" + +import hashlib +import json + +import pytest +from typer.testing import CliRunner + +from aai_cli import config, transcribe_batch +from aai_cli.main import app + +runner = CliRunner() + +_TRANSCRIBE = "aai_cli.transcribe_exec.client.transcribe" + + +@pytest.fixture(autouse=True) +def workdir(tmp_path, monkeypatch): + # Batch sources and sidecars are resolved relative to the working directory; + # isolate each test in its own tmp cwd. + monkeypatch.chdir(tmp_path) + + +def _auth(): + config.set_api_key("default", "sk_live") + + +def _fake_transcript(mocker, source="x"): + t = mocker.MagicMock() + t.id = f"t_{source}" + t.text = f"text of {source}" + t.status = "completed" + t.json_response = {"id": t.id, "text": t.text, "status": "completed"} + return t + + +def _patch_transcribe(mocker, monkeypatch): + """Patch client.transcribe with a fake that records the audio args it saw.""" + seen = [] + + def fake(api_key, audio, *, config): + seen.append(audio) + return _fake_transcript(mocker, audio) + + monkeypatch.setattr(_TRANSCRIBE, fake) + return seen + + +def _ndjson(result): + return [json.loads(line) for line in result.output.splitlines() if line.startswith("{")] + + +# --- the --llm chain in batch mode ------------------------------------------------ + +_TRANSFORM = "aai_cli.llm.transform_transcript" + + +def _patch_transform(monkeypatch): + """Patch the gateway call under run_chain_steps, recording each call's args.""" + calls = [] + + def fake(api_key, *, prompt, model, max_tokens, transcript_id=None, transcript_text=None): + calls.append( + { + "prompt": prompt, + "model": model, + "max_tokens": max_tokens, + "transcript_id": transcript_id, + } + ) + return f"resp:{prompt}" + + monkeypatch.setattr(_TRANSFORM, fake) + return calls + + +def _sidecar_record(tmp_path, name, data, transform=None): + record = { + "source": name, + "id": "t_old", + "status": "completed", + "transcript": {"id": "t_old"}, + "source_sha256": hashlib.sha256(data).hexdigest(), + } + if transform is not None: + record["transform"] = transform + (tmp_path / f"{name}.aai.json").write_text(json.dumps(record)) + + +def test_batch_llm_stores_chain_steps_in_each_sidecar(tmp_path, mocker, monkeypatch): + _auth() + (tmp_path / "a.mp3").write_bytes(b"aaa") + (tmp_path / "b.mp3").write_bytes(b"bbb") + _patch_transcribe(mocker, monkeypatch) + calls = _patch_transform(monkeypatch) + result = runner.invoke( + app, + [ + "transcribe", + "*.mp3", + "--llm", + "Summarize", + "--model", + "gw-x", + "--max-tokens", + "7", + "--json", + ], + ) + assert result.exit_code == 0 + # One chain per source, against that source's transcript id, with the gateway + # flags passed through (non-default on purpose). + assert {(c["transcript_id"], c["prompt"], c["model"], c["max_tokens"]) for c in calls} == { + ("t_a.mp3", "Summarize", "gw-x", 7), + ("t_b.mp3", "Summarize", "gw-x", 7), + } + sidecar = json.loads((tmp_path / "a.mp3.aai.json").read_text()) + assert sidecar["transform"] == { + "model": "gw-x", + "prompts": ["Summarize"], + "steps": [{"prompt": "Summarize", "output": "resp:Summarize"}], + } + assert sidecar["transcript"]["id"] == "t_a.mp3" # transcription payload kept alongside + records = {r["source"]: r for r in _ndjson(result)} + assert records["a.mp3"] == { + "source": "a.mp3", + "status": "completed", + "id": "t_a.mp3", + "sidecar": "a.mp3.aai.json", + } + + +def test_failed_llm_chain_leaves_resumable_transcription(tmp_path, mocker, monkeypatch): + from aai_cli.errors import APIError + + _auth() + (tmp_path / "a.mp3").write_bytes(b"aaa") + seen = _patch_transcribe(mocker, monkeypatch) + + def boom(api_key, **kwargs): + raise APIError("gateway exploded") + + monkeypatch.setattr(_TRANSFORM, boom) + result = runner.invoke(app, ["transcribe", "*.mp3", "--llm", "Summarize", "--json"]) + assert result.exit_code == 1 + records = {r["source"]: r for r in _ndjson(result) if "source" in r} + assert records["a.mp3"]["status"] == "failed" + assert records["a.mp3"]["error"] == "gateway exploded" + # The transcription itself was recorded before the chain ran... + sidecar = json.loads((tmp_path / "a.mp3.aai.json").read_text()) + assert sidecar["status"] == "completed" + assert "transform" not in sidecar + + # ...so the retry pays only for the LLM step: no second transcription, the + # chain anchored on the recorded transcript id, and the transcription payload + # kept alongside the new transform. + seen.clear() + calls = _patch_transform(monkeypatch) + result = runner.invoke(app, ["transcribe", "*.mp3", "--llm", "Summarize", "--json"]) + assert result.exit_code == 0 + assert seen == [] + assert [c["transcript_id"] for c in calls] == ["t_a.mp3"] + sidecar = json.loads((tmp_path / "a.mp3.aai.json").read_text()) + assert sidecar["transform"]["steps"] == [{"prompt": "Summarize", "output": "resp:Summarize"}] + assert sidecar["transcript"] == { + "id": "t_a.mp3", + "text": "text of a.mp3", + "status": "completed", + } + assert sidecar["source_sha256"] == hashlib.sha256(b"aaa").hexdigest() + assert {r["source"]: r["status"] for r in _ndjson(result)} == {"a.mp3": "completed"} + + +def test_rerun_with_same_llm_chain_skips_entirely(tmp_path, mocker, monkeypatch): + _auth() + (tmp_path / "a.mp3").write_bytes(b"aaa") + _sidecar_record( + tmp_path, + "a.mp3", + b"aaa", + transform={"model": "gw-x", "prompts": ["Summarize"], "steps": []}, + ) + seen = _patch_transcribe(mocker, monkeypatch) + calls = _patch_transform(monkeypatch) + result = runner.invoke( + app, ["transcribe", "*.mp3", "--llm", "Summarize", "--model", "gw-x", "--json"] + ) + assert result.exit_code == 0 + assert seen == [] + assert calls == [] + assert _ndjson(result) == [ + {"source": "a.mp3", "status": "skipped", "id": "t_old", "sidecar": "a.mp3.aai.json"} + ] + + +@pytest.mark.parametrize( + "stored", + [ + {"model": "gw-x", "prompts": ["old prompt"], "steps": []}, # prompts differ + {"model": "gw-other", "prompts": ["Summarize"], "steps": []}, # model differs + None, # pre---llm sidecar with no transform at all + ], +) +def test_changed_llm_chain_replays_llm_only(tmp_path, mocker, monkeypatch, stored): + _auth() + (tmp_path / "a.mp3").write_bytes(b"aaa") + _sidecar_record(tmp_path, "a.mp3", b"aaa", transform=stored) + seen = _patch_transcribe(mocker, monkeypatch) + calls = _patch_transform(monkeypatch) + result = runner.invoke( + app, ["transcribe", "*.mp3", "--llm", "Summarize", "--model", "gw-x", "--json"] + ) + assert result.exit_code == 0 + assert seen == [] # the transcription is already paid for + assert [c["transcript_id"] for c in calls] == ["t_old"] + sidecar = json.loads((tmp_path / "a.mp3.aai.json").read_text()) + assert sidecar["transform"] == { + "model": "gw-x", + "prompts": ["Summarize"], + "steps": [{"prompt": "Summarize", "output": "resp:Summarize"}], + } + assert sidecar["transcript"] == {"id": "t_old"} # the resumed record's payload survives + + +def test_resumed_record_without_id_retranscribes_for_llm(tmp_path, mocker, monkeypatch): + # A completed sidecar that never recorded a transcript id can't anchor a + # server-side chain: the source is transcribed again. + _auth() + url = "https://example.com/ep.mp3" + transcribe_batch.sidecar_path(url).write_text(json.dumps({"status": "completed"})) + seen = _patch_transcribe(mocker, monkeypatch) + calls = _patch_transform(monkeypatch) + result = runner.invoke( + app, ["transcribe", "--from-stdin", "--llm", "Summarize", "--json"], input=url + "\n" + ) + assert result.exit_code == 0 + assert seen == [url] + assert [c["transcript_id"] for c in calls] == [f"t_{url}"] + + +def test_force_with_llm_retranscribes_and_reruns_chain(tmp_path, mocker, monkeypatch): + _auth() + (tmp_path / "a.mp3").write_bytes(b"aaa") + _sidecar_record( + tmp_path, + "a.mp3", + b"aaa", + transform={"model": "gw-x", "prompts": ["Summarize"], "steps": []}, + ) + seen = _patch_transcribe(mocker, monkeypatch) + calls = _patch_transform(monkeypatch) + result = runner.invoke( + app, ["transcribe", "*.mp3", "--force", "--llm", "Summarize", "--model", "gw-x", "--json"] + ) + assert result.exit_code == 0 + assert seen == ["a.mp3"] + assert [c["transcript_id"] for c in calls] == ["t_a.mp3"] diff --git a/tests/test_transcribe_batch_sources.py b/tests/test_transcribe_batch_sources.py index 5553e993..8c835b62 100644 --- a/tests/test_transcribe_batch_sources.py +++ b/tests/test_transcribe_batch_sources.py @@ -185,7 +185,6 @@ def test_expand_sources_directory_error_message_names_the_path(tmp_path): [ (["--out", "x.txt"], "--out", "sidecar with the full result"), (["-o", "text"], "-o/--output", "sidecar with the full result"), - (["--llm", "summarize"], "--llm", "sidecar with the full result"), (["--show-code"], "--show-code", "Pass one file or URL"), ], ) diff --git a/tests/test_webhook_listen.py b/tests/test_webhook_listen.py new file mode 100644 index 00000000..41843264 --- /dev/null +++ b/tests/test_webhook_listen.py @@ -0,0 +1,357 @@ +"""`assembly webhooks listen`: the local sink, event shaping, forwarding, and the +cloudflared tunnel wiring. + +The serving tests bind a real loopback server (tight allow_hosts opt-in) and POST +to it from a background thread; pytest-timeout bounds them so a listener that +never shuts down fails instead of wedging the run. +""" + +import json +import socket +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +import pytest +from typer.testing import CliRunner + +from aai_cli import webhook_listen +from aai_cli.main import app + +runner = CliRunner() + + +def _free_port(): + with socket.socket() as s: + s.bind(("127.0.0.1", 0)) + return int(s.getsockname()[1]) + + +def _post_when_up(port, payload, results): + import httpx2 as httpx + + from aai_cli.init import runner as init_runner + + assert init_runner.wait_for_port(port, timeout=15) + results.append(httpx.post(f"http://127.0.0.1:{port}/", json=payload, timeout=10)) + + +def _ndjson(result): + return [json.loads(line) for line in result.output.splitlines() if line.startswith("{")] + + +# --- receiving deliveries ---------------------------------------------------------- + + +@pytest.mark.allow_hosts(["127.0.0.1"]) +@pytest.mark.timeout(60) +def test_listen_no_tunnel_emits_banner_and_event_ndjson(): + port = _free_port() + results = [] + payload = {"transcript_id": "t_1", "status": "completed"} + poster = threading.Thread(target=_post_when_up, args=(port, payload, results)) + poster.start() + try: + result = runner.invoke( + app, + ["webhooks", "listen", "--no-tunnel", "--port", str(port), "--max-events", "1", "-j"], + ) + finally: + poster.join() + assert result.exit_code == 0, result.output + local = f"http://127.0.0.1:{port}" + banner = next(o for o in _ndjson(result) if "local" in o) + assert banner == {"url": local, "local": local, "port": port} + event = next(o for o in _ndjson(result) if "payload" in o) + assert event["path"] == "/" + assert event["payload"] == payload + assert event["transcript_id"] == "t_1" + assert event["status"] == "completed" + assert "forward" not in event + # The delivery was acknowledged the way AssemblyAI expects: 200 + JSON body. + assert results[0].status_code == 200 + assert results[0].json() == {"ok": True} + + +@pytest.mark.allow_hosts(["127.0.0.1"]) +@pytest.mark.timeout(60) +def test_listen_human_mode_prints_hint_and_event_line(): + port = _free_port() + results = [] + poster = threading.Thread( + target=_post_when_up, args=(port, {"transcript_id": "t_9", "status": "error"}, results) + ) + poster.start() + try: + result = runner.invoke( + app, ["webhooks", "listen", "--no-tunnel", "--port", str(port), "--max-events", "1"] + ) + finally: + poster.join() + assert result.exit_code == 0, result.output + assert "Listening for webhooks" in result.output + assert f"http://127.0.0.1:{port}" in result.output + assert "--webhook-url" in result.output # the copy-paste hint + assert "t_9" in result.output + assert "status=error" in result.output + assert "{" not in result.output.replace('{"ok": true}', "") # no NDJSON in human mode + + +# --- forwarding -------------------------------------------------------------------- + + +class _Receiver(ThreadingHTTPServer): + """A target app double for --forward-to: records each POST, answers 204.""" + + def __init__(self): + self.seen = [] + + outer = self + + class Handler(BaseHTTPRequestHandler): + def do_POST(self): + body = self.rfile.read(int(self.headers.get("Content-Length", "0"))) + outer.seen.append((self.path, body, self.headers.get("Content-Type"))) + self.send_response(204) + self.end_headers() + + def log_request(self, code="-", size="-"): + pass + + super().__init__(("127.0.0.1", 0), Handler) + + +@pytest.mark.allow_hosts(["127.0.0.1"]) +@pytest.mark.timeout(60) +def test_forward_to_relays_the_body_and_reports_the_status(): + receiver = _Receiver() + receiver_thread = threading.Thread(target=receiver.serve_forever, daemon=True) + receiver_thread.start() + forward_url = f"http://127.0.0.1:{receiver.server_address[1]}/hook" + port = _free_port() + results = [] + payload = {"transcript_id": "t_fw", "status": "completed"} + poster = threading.Thread(target=_post_when_up, args=(port, payload, results)) + poster.start() + try: + result = runner.invoke( + app, + [ + "webhooks", + "listen", + "--no-tunnel", + "--port", + str(port), + "--forward-to", + forward_url, + "--max-events", + "1", + "--json", + ], + ) + finally: + poster.join() + receiver.shutdown() + receiver_thread.join() + receiver.server_close() + assert result.exit_code == 0, result.output + event = next(o for o in _ndjson(result) if "payload" in o) + assert event["forward"] == {"url": forward_url, "status_code": 204} + # The body is relayed byte-for-byte (httpx serializes json= compactly). + compact = json.dumps(payload, separators=(",", ":")).encode() + assert receiver.seen == [("/hook", compact, "application/json")] + + +@pytest.mark.allow_hosts(["127.0.0.1"]) +@pytest.mark.timeout(60) +def test_forward_failure_is_reported_on_the_event_not_fatal(): + closed = _free_port() # nothing listens here: the forward is refused + port = _free_port() + results = [] + poster = threading.Thread(target=_post_when_up, args=(port, {"transcript_id": "x"}, results)) + poster.start() + try: + result = runner.invoke( + app, + [ + "webhooks", + "listen", + "--no-tunnel", + "--port", + str(port), + "--forward-to", + f"http://127.0.0.1:{closed}/hook", + "--max-events", + "1", + "--json", + ], + ) + finally: + poster.join() + assert result.exit_code == 0, result.output # delivery was still acknowledged + event = next(o for o in _ndjson(result) if "payload" in o) + assert event["forward"]["url"] == f"http://127.0.0.1:{closed}/hook" + assert event["forward"]["error"] + assert results[0].status_code == 200 + + +# --- the sink and event shaping (no sockets) --------------------------------------- + + +def test_shape_event_pulls_up_transcript_fields(): + event = webhook_listen.shape_event("/", b'{"transcript_id": "t", "status": "completed"}') + assert event == { + "path": "/", + "payload": {"transcript_id": "t", "status": "completed"}, + "transcript_id": "t", + "status": "completed", + } + + +def test_shape_event_non_json_body_rides_as_raw_text(): + assert webhook_listen.shape_event("/x", b"\xffnot json") == { + "path": "/x", + "raw": "�not json", + } + + +def test_shape_event_non_dict_payload_has_no_transcript_fields(): + assert webhook_listen.shape_event("/", b"[1, 2]") == {"path": "/", "payload": [1, 2]} + + +def test_shape_event_dict_without_transcript_id(): + assert webhook_listen.shape_event("/", b'{"hello": 1}') == { + "path": "/", + "payload": {"hello": 1}, + } + + +def test_render_event_shows_forward_status_and_error(): + line = webhook_listen._render_event( + { + "path": "/", + "transcript_id": "t_r", + "status": "completed", + "forward": {"url": "http://app/hook", "status_code": 200}, + } + ) + assert "t_r" in line + assert "status=completed" in line + assert "http://app/hook" in line + assert "200" in line + line = webhook_listen._render_event( + {"path": "/", "raw": "plain", "forward": {"url": "u", "error": "boom"}} + ) + assert "plain" in line + assert "boom" in line + + +def _sink_events(capsys): + return [json.loads(line) for line in capsys.readouterr().out.splitlines()] + + +def test_sink_max_events_zero_never_fires_the_limit(capsys): + sink = webhook_listen._EventSink(forward_to=None, max_events=0, json_mode=True) + fired = [] + sink.on_limit = lambda: fired.append(1) + sink.handle("/", b"{}", "application/json") + sink.handle("/", b"{}", "application/json") + assert fired == [] + assert len(_sink_events(capsys)) == 2 # both deliveries still emitted + + +def test_sink_fires_the_limit_exactly_at_max_events(capsys): + sink = webhook_listen._EventSink(forward_to=None, max_events=2, json_mode=True) + fired = [] + sink.on_limit = lambda: fired.append(1) + sink.handle("/", b"{}", "application/json") + assert fired == [] # one short of the limit: keep serving + sink.handle("/", b"{}", "application/json") + assert fired == [1] + assert len(_sink_events(capsys)) == 2 + + +# --- tunnel wiring (cloudflared faked) ---------------------------------------------- + + +class _FakeProc: + def __init__(self): + self.terminated = False + + def poll(self): + return None + + def terminate(self): + self.terminated = True + + +def _raise_interrupt(self): + raise KeyboardInterrupt + + +def _stub_tunnel(monkeypatch, tmp_path, *, url): + proc = _FakeProc() + log = tmp_path / "tunnel.log" + log.write_text("cloudflared output") + seen = {} + real_port = _free_port() + + def fake_find_free_port(preferred, **kwargs): + seen["preferred"] = preferred + return real_port + + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/cloudflared") + monkeypatch.setattr("aai_cli.init.runner.find_free_port", fake_find_free_port) + monkeypatch.setattr( + "aai_cli.init.tunnel.open_quick_tunnel", lambda port, *, cwd: (proc, url, log) + ) + # Stop immediately: the listener loop isn't under test here. + monkeypatch.setattr( + "aai_cli.webhook_listen.ThreadingHTTPServer.serve_forever", _raise_interrupt + ) + return proc, log, seen, real_port + + +@pytest.mark.allow_hosts(["127.0.0.1"]) +def test_listen_public_prints_tunnel_url_and_cleans_up(tmp_path, monkeypatch): + proc, log, seen, real_port = _stub_tunnel( + monkeypatch, tmp_path, url="https://hook-slug.trycloudflare.com" + ) + result = runner.invoke(app, ["webhooks", "listen"]) + assert result.exit_code == 0, result.output + assert seen["preferred"] == 8989 # the documented default port + assert "Listening for webhooks https://hook-slug.trycloudflare.com" in result.output + # Rich wraps the long hint line mid-token; compare with whitespace removed. + assert "--webhook-urlhttps://hook-slug.trycloudflare.com" in "".join(result.output.split()) + assert f"127.0.0.1:{real_port}" in result.output + assert proc.terminated is True + assert not log.exists() # a clean exit must not leave aai-tunnel-*.log litter + + +@pytest.mark.allow_hosts(["127.0.0.1"]) +def test_listen_tunnel_url_timeout_errors_and_keeps_the_log(tmp_path, monkeypatch): + proc, log, _seen, _port = _stub_tunnel(monkeypatch, tmp_path, url=None) + result = runner.invoke(app, ["webhooks", "listen"]) + assert result.exit_code == 1 + assert "didn't report a tunnel URL" in result.output + assert str(log) in "".join(result.output.split()) # Rich may wrap the path + assert log.exists() # kept: it's the only evidence of why the tunnel failed + assert proc.terminated is True + + +@pytest.mark.allow_hosts(["127.0.0.1"]) +def test_listen_accepts_explicit_max_events_zero(monkeypatch): + # 0 is the documented "until Ctrl-C" value; the option's floor must not reject it. + monkeypatch.setattr( + "aai_cli.webhook_listen.ThreadingHTTPServer.serve_forever", _raise_interrupt + ) + result = runner.invoke(app, ["webhooks", "listen", "--no-tunnel", "--max-events", "0"]) + assert result.exit_code == 0, result.output + assert "Listening for webhooks" in result.output + + +def test_listen_missing_cloudflared_errors_before_binding(monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: None) + result = runner.invoke(app, ["webhooks", "listen"]) + assert result.exit_code == 1 + assert "cloudflared is required to expose a public webhook URL." in result.output + assert "Install it:" in result.output