From 1276bd7aa3cb4fa4b4c65abcefe239418123fbae Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 08:02:59 -0700 Subject: [PATCH 01/12] Design spec: --llm-reduce map-reduce step for batch transcribe Co-Authored-By: Claude Opus 4.8 (1M context) --- ...6-16-llm-reduce-batch-transcribe-design.md | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 docs/superpowers/specs/2026-06-16-llm-reduce-batch-transcribe-design.md diff --git a/docs/superpowers/specs/2026-06-16-llm-reduce-batch-transcribe-design.md b/docs/superpowers/specs/2026-06-16-llm-reduce-batch-transcribe-design.md new file mode 100644 index 00000000..e824e8fc --- /dev/null +++ b/docs/superpowers/specs/2026-06-16-llm-reduce-batch-transcribe-design.md @@ -0,0 +1,105 @@ +# `--llm-reduce`: map-reduce LLM step for batch transcribe + +Date: 2026-06-16 + +## Problem + +`assembly transcribe --from-stdin --llm 'PROMPT'` runs an LLM prompt over *each* +transcript (a per-source "map") and writes the result into each source's sidecar +under the `transform` key. There is no built-in way to run a single LLM prompt +over *all* of a batch's results to produce one aggregate answer. + +Piping the batch into `assembly llm` does not achieve this: batch mode writes a +Rich status table to stdout (human mode) or per-source `{type:"result"}` NDJSON +records (`--json` mode) — neither contains the transcript text or the per-source +`--llm` output, which live only in the sidecar files. So +`assembly transcribe --from-stdin --llm '...' | assembly llm 'summarize'` would +summarize a status table, not the results. + +## Goal + +Add a repeatable `--llm-reduce 'PROMPT'` flag to `assembly transcribe` that runs +one LLM Gateway call (a "reduce") over the combined per-source results and prints +the aggregate answer to stdout — making the command itself the cleanly pipeable +unit, with no second `assembly llm` needed. 
+ +## Design + +### Concept: map-reduce + +- `--llm` (existing) = the per-source **map**: runs over each transcript. +- `--llm-reduce` (new) = the **reduce**: one chain over all sources' results. + +`--llm-reduce` is repeatable (a prompt chain, mirroring `--llm`) and reuses the +same `--model` / `--max-tokens` already carried for the map step. + +### Reduce input + +For each **completed or skipped** source, the reduce input is: + +- the **last `--llm` map step's output** if a map chain ran, otherwise +- the source's **transcript text**. + +Items are concatenated with a per-source header (`### Source: <source>`) so the +reduce prompt can attribute findings. Failed sources are excluded. + +The combined text is sent inline via the existing `llm.run_chain(..., +transcript_text=combined)` path — the same inlining `stream --llm` already uses +(there is no single transcript id to inject server-side for an aggregate). + +### Components / data flow + +1. **`commands/transcribe.py`** — add a repeatable `--llm-reduce` Typer option in + the existing LLM help panel (terse, period-less help). Parse into the existing + `TransformOptions` via a new `reduce_prompts: list[str]` field. +2. **`app/transcribe/batch.py`** — after `_drain`/`_summarize`, when + `reduce_prompts` is non-empty: gather reduce inputs from the sidecars the batch + already wrote, build the combined text, call `llm.run_chain(...)` once, and + print the result to **stdout** (`output.console`). +3. **Output routing** — when `--llm-reduce` is set, render the batch progress + table to **stderr** (`error_console`) instead of stdout, so stdout carries only + the reduce result and the command pipes cleanly. Gated on the flag: with no + `--llm-reduce`, existing batch output (and its snapshots) is unchanged. +4. **`--json` mode** — keep the per-source `{type:"result"}` NDJSON, then emit a + final additive `{type:"reduce", model, prompts, output}` record. 
Additive to + the documented "every NDJSON line carries a `type`" contract. +5. **Single source (non-batch)** — there is nothing to aggregate, so + `reduce_prompts` are appended to the `--llm` chain over the one transcript in + `app/transcribe/run.py` (effectively `prompts + reduce_prompts` run as one + chain). No error, predictable behavior. + +### End state (one command, not a pipe) + +```sh +assembly transcribe --from-stdin --concurrency 3 --speaker-labels \ + --llm 'Judge diarization quality; output JSON {speaker_count, issues, score}' \ + --llm-reduce 'Rank these videos worst-to-best and summarize the failure modes' +``` + +## Testing (to clear the gates) + +- Batch reduce: assert the reduce LLM call receives the concatenated per-source + map outputs (with headers) and the result is written to stdout. +- Reduce-input fallback: a source with no `--llm` map contributes its transcript + text. +- Single source: `--llm-reduce` appends to the `--llm` chain over the one + transcript (no aggregation path). +- `--json`: a final `{type:"reduce"}` record is emitted after the per-source + `{type:"result"}` records. +- Routing: the progress table goes to stderr only when `--llm-reduce` is set; + unchanged otherwise. +- Regenerate the `transcribe --help` snapshot; add a REFERENCE.md / README entry + (docs-consistency gate). + +Tests construct `TransformOptions` / run-path data directly (per the repo's +options/run seam) and assert behavior, so the mutation gate's changed lines are +killed by failing-on-break assertions, not mere coverage. + +## Scope / non-goals + +- Touches only `transcribe`'s own modules (`commands/transcribe.py`, + `app/transcribe/*`) plus docs/snapshots — no shared-file edits, consistent with + the additive-command convention. +- No change to the existing `--llm` map semantics or to batch output when + `--llm-reduce` is absent. +- Dependency set unchanged (`uv.lock` untouched). 
From 761fd531f8bfa2207854d9c251b1907ce4ab9807 Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 08:11:04 -0700 Subject: [PATCH 02/12] Plan: --llm-reduce map-reduce step for batch transcribe Co-Authored-By: Claude Opus 4.8 (1M context) --- .../plans/2026-06-16-llm-reduce.md | 619 ++++++++++++++++++ 1 file changed, 619 insertions(+) create mode 100644 docs/superpowers/plans/2026-06-16-llm-reduce.md diff --git a/docs/superpowers/plans/2026-06-16-llm-reduce.md b/docs/superpowers/plans/2026-06-16-llm-reduce.md new file mode 100644 index 00000000..28676c4c --- /dev/null +++ b/docs/superpowers/plans/2026-06-16-llm-reduce.md @@ -0,0 +1,619 @@ +# `--llm-reduce` Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a repeatable `--llm-reduce 'PROMPT'` flag to `assembly transcribe` that runs one LLM-Gateway call (a "reduce") over all of a batch's per-source results and prints the aggregate answer to stdout. + +**Architecture:** `--llm` is the per-source *map* (existing); `--llm-reduce` is the *reduce*. In batch mode, after every source is transcribed, the reduce step concatenates each source's last `--llm` output (or its transcript text if no `--llm` ran) and runs the reduce prompt chain over the combined text via the existing `llm.run_chain(..., transcript_text=...)` path. When `--llm-reduce` is set, the progress table is routed to stderr so stdout carries only the reduce result. In single-source mode there is nothing to aggregate, so the reduce prompts are appended to the `--llm` chain over the one transcript. + +**Tech Stack:** Python 3.12+, Typer, the `assemblyai` SDK, the OpenAI-compatible LLM Gateway, pytest + pytest-mock, syrupy snapshots. All tooling runs via `uv run`. 
+ +--- + +## File structure + +- `aai_cli/app/transcribe/run.py` — add `reduce_prompts` to `TransformOptions`, a `chain()` helper, `llm_reduce` to `TranscribeOptions`, wire `transform_options()`, single-source delivery, show-code, and `--out` validation. +- `aai_cli/commands/transcribe.py` — add the `--llm-reduce` Typer option and pass it into `TranscribeOptions`. +- `aai_cli/app/transcribe/batch.py` — add `_reduce_input`, `_gather_reduce_inputs`, `_run_reduce`; route the progress table to stderr when reduce is active; call `_run_reduce` after a successful batch. +- `tests/test_transcribe_reduce.py` — new, self-contained test module for all `--llm-reduce` behavior (no edits to shared test files). +- `REFERENCE.md`, `README.md`, `tests/__snapshots__/` — docs + regenerated `transcribe --help` golden. + +Run the targeted tests with `uv run pytest tests/test_transcribe_reduce.py -q` during development; run `./scripts/check.sh` once at the end. + +--- + +### Task 1: Plumb the `--llm-reduce` flag as data + +**Files:** +- Modify: `aai_cli/app/transcribe/run.py` (`TransformOptions`, `TranscribeOptions`, `transform_options`) +- Modify: `aai_cli/commands/transcribe.py` (option + construction) +- Test: `tests/test_transcribe_reduce.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_transcribe_reduce.py`: + +```python +"""`assembly transcribe --llm-reduce`: the batch map-reduce step, single-source +chain behavior, and output routing.""" + +import json + +import pytest +from typer.testing import CliRunner + +from aai_cli.app.transcribe import batch as transcribe_batch +from aai_cli.app.transcribe import run as transcribe_run +from aai_cli.core import config +from aai_cli.main import app + +runner = CliRunner() + +_TRANSCRIBE = "aai_cli.app.transcribe.run.client.transcribe" +_TRANSFORM = "aai_cli.core.llm.transform_transcript" + + +@pytest.fixture(autouse=True) +def workdir(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + +def _auth(): + 
config.set_api_key("default", "sk_live") + + +def _defaults(**overrides): + """A minimal TranscribeOptions for seam tests; override only what matters.""" + base = dict( + source=None, sample=False, from_stdin=False, concurrency=2, force=False, + speech_model=None, language_code=None, language_detection=None, + keyterms_prompt=None, temperature=None, prompt=None, punctuate=None, + format_text=None, disfluencies=None, speaker_labels=False, + speakers_expected=None, multichannel=None, redact_pii=None, + redact_pii_policy=None, redact_pii_sub=None, redact_pii_audio=None, + filter_profanity=None, content_safety=None, content_safety_confidence=None, + speech_threshold=None, summarization=None, summary_model=None, + summary_type=None, auto_chapters=None, sentiment_analysis=None, + entity_detection=None, auto_highlights=None, topic_detection=None, + word_boost=None, custom_spelling_file=None, audio_start=None, + audio_end=None, download_sections=None, webhook_url=None, + webhook_auth_header=None, translate_to=None, config_kv=None, + config_file=None, llm_prompt=None, model="claude-haiku-4-5-20251001", + max_tokens=1000, output_field=None, chars_per_caption=None, out=None, + show_code=False, llm_reduce=None, + ) + base.update(overrides) + return transcribe_run.TranscribeOptions(**base) + + +def test_transform_options_carries_reduce_prompts(): + opts = _defaults(llm_prompt=["judge"], llm_reduce=["rank", "summarize"]) + transform = opts.transform_options() + assert transform.prompts == ["judge"] + assert transform.reduce_prompts == ["rank", "summarize"] + + +def test_chain_appends_reduce_to_map(): + transform = transcribe_run.TransformOptions( + prompts=["a"], model="m", max_tokens=10, reduce_prompts=["b"] + ) + assert transform.chain() == ["a", "b"] +``` + +- [ ] **Step 2: Run the tests to verify they fail** + +Run: `uv run pytest tests/test_transcribe_reduce.py -q` +Expected: FAIL — `TranscribeOptions` has no `llm_reduce`, `TransformOptions` has no `reduce_prompts`/`chain`. 
+ +- [ ] **Step 3: Extend `TransformOptions` in `aai_cli/app/transcribe/run.py`** + +Replace the `TransformOptions` class (currently lines 97-102) with: + +```python +class TransformOptions(NamedTuple): + """The ``--llm`` chain options: the prompts plus the gateway model settings. + + ``reduce_prompts`` is the ``--llm-reduce`` chain — the aggregate step run over + all batch results (or appended to the per-transcript chain for a single source). + """ + + prompts: list[str] + model: str + max_tokens: int + reduce_prompts: list[str] + + def chain(self) -> list[str]: + """The full single-source chain: the map prompts followed by the reduce ones. + + With one source there is nothing to aggregate, so the reduce prompts simply + extend the ``--llm`` chain over that transcript. + """ + return self.prompts + self.reduce_prompts +``` + +- [ ] **Step 4: Update `transform_options()` and add the `TranscribeOptions` field** + +In `aai_cli/app/transcribe/run.py`, add the field to the `TranscribeOptions` dataclass right after `llm_prompt` (line 215): + +```python + llm_prompt: list[str] | None + llm_reduce: list[str] | None + model: str + max_tokens: int +``` + +And update `transform_options()` (lines 272-276) to: + +```python + def transform_options(self) -> TransformOptions: + """The post-transcription LLM transform spec built from the `--llm` flags.""" + return TransformOptions( + prompts=list(self.llm_prompt or []), + model=self.model, + max_tokens=self.max_tokens, + reduce_prompts=list(self.llm_reduce or []), + ) +``` + +- [ ] **Step 5: Check for other `TransformOptions(` constructors** + +Run: `grep -rn "TransformOptions(" aai_cli tests` +Expected: only `transform_options()` (just updated) and the test from Step 1 (which already passes `reduce_prompts`). If any other source constructor exists, add `reduce_prompts=[]` to it. 
+ +- [ ] **Step 6: Add the Typer option in `aai_cli/commands/transcribe.py`** + +Insert immediately after the `llm_prompt` option block (after line 312, before `model`): + +```python + llm_reduce: list[str] | None = typer.Option( + None, + "--llm-reduce", + help="Run one LLM-Gateway prompt over all batch results (a reduce). " + "Repeatable: each runs on the previous one's output. For a single source it " + "extends the --llm chain over that transcript.", + rich_help_panel=help_panels.OPT_LLM, + ), +``` + +And pass it into the `TranscribeOptions(...)` construction, right after `llm_prompt=llm_prompt,` (line 412): + +```python + llm_prompt=llm_prompt, + llm_reduce=llm_reduce, + model=model, +``` + +- [ ] **Step 7: Run the tests to verify they pass** + +Run: `uv run pytest tests/test_transcribe_reduce.py -q` +Expected: PASS (2 tests). + +- [ ] **Step 8: Commit** + +```bash +git add aai_cli/app/transcribe/run.py aai_cli/commands/transcribe.py tests/test_transcribe_reduce.py +git commit -m "feat(transcribe): plumb --llm-reduce flag as data" +``` + +--- + +### Task 2: Single-source reduce (append to the `--llm` chain) + +**Files:** +- Modify: `aai_cli/app/transcribe/run.py` (`deliver_result`, `_print_show_code`, `run_transcribe` validation) +- Test: `tests/test_transcribe_reduce.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_transcribe_reduce.py`: + +```python +def test_single_source_runs_reduce_as_chain_step(mocker): + _auth() + mocker.patch(_TRANSCRIBE, return_value=mocker.MagicMock( + id="t1", text="hello", status="completed", + json_response={"id": "t1", "text": "hello", "status": "completed"}, + )) + transform = mocker.patch(_TRANSFORM, side_effect=["mapped", "reduced"]) + result = runner.invoke(app, ["transcribe", "--sample", "--llm", "map", "--llm-reduce", "red"]) + assert result.exit_code == 0, result.output + # Two chain steps ran: --llm then --llm-reduce, over the one transcript. 
+ assert transform.call_count == 2 + assert "reduced" in result.output +``` + +- [ ] **Step 2: Run it to verify it fails** + +Run: `uv run pytest tests/test_transcribe_reduce.py::test_single_source_runs_reduce_as_chain_step -q` +Expected: FAIL — only one chain step runs (`--llm-reduce` is ignored in single-source mode). + +- [ ] **Step 3: Use the combined chain in `deliver_result`** + +In `aai_cli/app/transcribe/run.py`, replace the `if transform.prompts:` branch (lines 140-156) with: + +```python + chain = transform.chain() + if chain: + # Chain the prompts: the first runs over the transcript (injected server-side + # via transcript_id); each subsequent prompt runs over the prior response. + # --llm-reduce prompts extend the chain here — a single source has nothing to + # aggregate, so reduce is just more chain steps over this one transcript. + steps = llm.run_chain_steps( + api_key, + chain, + transcript_id=transcript.id, + model=transform.model, + max_tokens=transform.max_tokens, + ) + output.emit( + client.transcript_summary(transcript) + | {"transform": {"model": transform.model, "steps": steps}}, + render_transform_steps, + json_mode=json_mode, + ) + return +``` + +- [ ] **Step 4: Include reduce prompts in `--out` validation and `--show-code`** + +In `run_transcribe` (line 320), change the `--out`/`--llm` guard to also cover reduce: + +```python + transcribe_validate.validate_out_with_llm( + opts.out, (opts.llm_prompt or []) + (opts.llm_reduce or []) or None + ) +``` + +In `_print_show_code` (line 295), include reduce prompts in the generated gateway chain: + +```python + gateway = code_gen.gateway_options( + list(opts.llm_prompt or []) + list(opts.llm_reduce or []), opts.model, opts.max_tokens + ) +``` + +- [ ] **Step 5: Run the test to verify it passes** + +Run: `uv run pytest tests/test_transcribe_reduce.py -q` +Expected: PASS (3 tests). 
+ +- [ ] **Step 6: Commit** + +```bash +git add aai_cli/app/transcribe/run.py tests/test_transcribe_reduce.py +git commit -m "feat(transcribe): single-source --llm-reduce extends the chain" +``` + +--- + +### Task 3: Batch reduce (gather, run, route table to stderr) + +**Files:** +- Modify: `aai_cli/app/transcribe/batch.py` +- Test: `tests/test_transcribe_reduce.py` + +- [ ] **Step 1: Write the failing tests** + +Append to `tests/test_transcribe_reduce.py`: + +```python +def _fake_transcript(mocker, source): + t = mocker.MagicMock() + t.id = f"t_{source}" + t.text = f"text of {source}" + t.status = "completed" + t.json_response = {"id": t.id, "text": t.text, "status": "completed"} + return t + + +def _ndjson(result): + return [json.loads(line) for line in result.output.splitlines() if line.startswith("{")] + + +def test_batch_reduce_feeds_map_outputs(mocker, monkeypatch): + _auth() + monkeypatch.setattr(_TRANSCRIBE, lambda api_key, audio, *, config: _fake_transcript(mocker, audio)) + # map step returns one output per source; reduce returns the final answer. + mocker.patch(_TRANSFORM, side_effect=["JUDGED a", "JUDGED b", "FINAL"]) + captured = {} + + real_run_chain = transcribe_batch.llm.run_chain + + def spy(api_key, prompts, *, transcript_text, model, max_tokens): + captured["text"] = transcript_text + captured["prompts"] = prompts + return "FINAL" + + monkeypatch.setattr(transcribe_batch.llm, "run_chain", spy) + result = runner.invoke( + app, + ["transcribe", "--from-stdin", "--llm", "judge", "--llm-reduce", "rank"], + input="https://a\nhttps://b\n", + ) + assert result.exit_code == 0, result.output + # Reduce saw both sources' map outputs, each under a source header. 
+ assert "### Source: https://a" in captured["text"] + assert "JUDGED a" in captured["text"] and "JUDGED b" in captured["text"] + assert captured["prompts"] == ["rank"] + assert "FINAL" in result.output + + +def test_batch_reduce_falls_back_to_transcript_text(mocker, monkeypatch): + _auth() + monkeypatch.setattr(_TRANSCRIBE, lambda api_key, audio, *, config: _fake_transcript(mocker, audio)) + captured = {} + + def spy(api_key, prompts, *, transcript_text, model, max_tokens): + captured["text"] = transcript_text + return "FINAL" + + monkeypatch.setattr(transcribe_batch.llm, "run_chain", spy) + result = runner.invoke( + app, + ["transcribe", "--from-stdin", "--llm-reduce", "summarize"], + input="https://a\n", + ) + assert result.exit_code == 0, result.output + # No --llm map ran, so the transcript text is fed to the reduce. + assert "text of https://a" in captured["text"] + + +def test_batch_reduce_emits_json_record(mocker, monkeypatch): + _auth() + monkeypatch.setattr(_TRANSCRIBE, lambda api_key, audio, *, config: _fake_transcript(mocker, audio)) + monkeypatch.setattr( + transcribe_batch.llm, "run_chain", + lambda api_key, prompts, *, transcript_text, model, max_tokens: "FINAL", + ) + result = runner.invoke( + app, + ["transcribe", "--from-stdin", "--llm-reduce", "summarize", "--json"], + input="https://a\n", + ) + assert result.exit_code == 0, result.output + records = _ndjson(result) + reduce_records = [r for r in records if r.get("type") == "reduce"] + assert len(reduce_records) == 1 + assert reduce_records[0]["output"] == "FINAL" + assert reduce_records[0]["prompts"] == ["summarize"] +``` + +- [ ] **Step 2: Run them to verify they fail** + +Run: `uv run pytest tests/test_transcribe_reduce.py -q` +Expected: FAIL — `--llm-reduce` produces no reduce call / record in batch mode. 
+ +- [ ] **Step 3: Add the reduce helpers in `aai_cli/app/transcribe/batch.py`** + +Add these functions above `run_batch` (after `_summarize`, line 296): + +```python +def _reduce_input(record: dict[str, object]) -> str: + """A source's contribution to the reduce: its last --llm output, else its text.""" + transform = jsonshape.as_mapping(record.get("transform")) + if transform is not None: + steps = transform.get("steps") + if isinstance(steps, list) and steps: + last = jsonshape.as_mapping(steps[-1]) + if last is not None: + return str(last.get("output", "") or "") + transcript = jsonshape.as_mapping(record.get("transcript")) + if transcript is not None: + return str(transcript.get("text", "") or "") + return "" + + +def _gather_reduce_inputs(items: list[_Item]) -> str: + """Concatenate each completed/skipped source's reduce input under a header.""" + blocks: list[str] = [] + for item in items: + if item.status not in ("completed", "skipped"): + continue + record = resumable_record(sidecar_path(item.source), digest=None) + text = _reduce_input(record) if record is not None else "" + if text: + blocks.append(f"### Source: {item.source}\n{text}") + return "\n\n".join(blocks) + + +def _run_reduce( + api_key: str, + items: list[_Item], + *, + transform: transcribe_exec.TransformOptions, + json_mode: bool, +) -> None: + """Run the --llm-reduce chain once over every source's result; print to stdout.""" + combined = _gather_reduce_inputs(items) + result = llm.run_chain( + api_key, + transform.reduce_prompts, + transcript_text=combined, + model=transform.model, + max_tokens=transform.max_tokens, + ) + if json_mode: + # Additive NDJSON event after the per-source {"type":"result"} records. 
+ output.emit_ndjson( + { + "type": "reduce", + "model": transform.model, + "prompts": transform.reduce_prompts, + "output": result, + } + ) + else: + output.emit_text(result) +``` + +- [ ] **Step 4: Route the table to stderr and call the reduce in `run_batch`** + +Change `_progress_table` (line 230) to accept a `reduce_active` flag and pick the console: + +```python +@contextmanager +def _progress_table(items: list[_Item], *, json_mode: bool, reduce_active: bool = False) -> Generator[None]: + """Render the batch as a live-updating table (human mode). + + Rich renders nothing while running on a non-interactive console and prints the + final frame once on stop, so piped/agent runs still get the result table. JSON + mode skips Rich entirely — NDJSON per source is the output. When a --llm-reduce + step will print the aggregate to stdout, the table goes to stderr so stdout + carries only the reduce result. + """ + if json_mode: + yield + return + console = output.error_console if reduce_active else output.console + with Live( + get_renderable=lambda: _render_table(items), + console=console, + refresh_per_second=4, # pragma: no mutate (cosmetic refresh cadence) + ): + yield +``` + +Update `run_batch` (lines 314-325) to: + +```python + items = [_Item(source) for source in sources] + reduce_active = bool(transform.reduce_prompts) + with _progress_table(items, json_mode=json_mode, reduce_active=reduce_active): + _drain( + api_key, + items, + transcription_config=transcription_config, + concurrency=concurrency, + force=force, + transform=transform, + json_mode=json_mode, + ) + _summarize(items, json_mode=json_mode, quiet=quiet) + if reduce_active: + _run_reduce(api_key, items, transform=transform, json_mode=json_mode) +``` + +- [ ] **Step 5: Run the tests to verify they pass** + +Run: `uv run pytest tests/test_transcribe_reduce.py -q` +Expected: PASS (6 tests). 
+ +- [ ] **Step 6: Add a routing test (table on stderr, result on stdout)** + +Append to `tests/test_transcribe_reduce.py`: + +```python +def test_batch_reduce_routes_table_to_stderr(mocker, monkeypatch, capsys): + """run_batch sends the reduce result to stdout and the progress table to stderr.""" + import assemblyai as aai + + _auth() + monkeypatch.setattr(_TRANSCRIBE, lambda api_key, audio, *, config: _fake_transcript(mocker, audio)) + monkeypatch.setattr( + transcribe_batch.llm, "run_chain", + lambda api_key, prompts, *, transcript_text, model, max_tokens: "AGGREGATE", + ) + transform = transcribe_run.TransformOptions( + prompts=[], model="m", max_tokens=10, reduce_prompts=["summarize"] + ) + transcribe_batch.run_batch( + "sk_live", + ["https://a"], + transcription_config=aai.TranscriptionConfig(), + concurrency=1, + force=False, + transform=transform, + json_mode=False, + quiet=False, + ) + out, err = capsys.readouterr() + assert "AGGREGATE" in out # reduce result → stdout + assert "AGGREGATE" not in err + assert "https://a" in err # progress table → stderr +``` + +- [ ] **Step 7: Run it to verify it passes** + +Run: `uv run pytest tests/test_transcribe_reduce.py::test_batch_reduce_routes_table_to_stderr -q` +Expected: PASS. + +- [ ] **Step 8: Commit** + +```bash +git add aai_cli/app/transcribe/batch.py tests/test_transcribe_reduce.py +git commit -m "feat(transcribe): batch --llm-reduce aggregates results to stdout" +``` + +--- + +### Task 4: Docs + help snapshot + +**Files:** +- Modify: `REFERENCE.md`, `README.md` +- Modify: `tests/__snapshots__/` (regenerated, not hand-edited) + +- [ ] **Step 1: Locate the transcribe flag + NDJSON sections in REFERENCE.md** + +Run: `grep -n "\-\-llm\b\|\"type\"\|result.*sidecar\|## .*transcribe" REFERENCE.md` +Read the surrounding lines to match the existing format. 
+ +- [ ] **Step 2: Document `--llm-reduce` and the `reduce` event in REFERENCE.md** + +Add a `--llm-reduce` entry beside the existing `--llm` documentation, worded: "Run one LLM-Gateway prompt over all batch results (a reduce); repeatable. For a single source it extends the `--llm` chain." Where the NDJSON `type` values are listed, add `reduce` — `{"type":"reduce","model","prompts","output"}`, emitted once after the per-source `result` records when `--llm-reduce` is set. + +- [ ] **Step 3: Update the README example** + +In `README.md`, find the "Score diarization quality across several videos" example (added earlier) and extend it to show the reduce step, replacing the trailing `--llm` line so the block ends with: + +```sh +| assembly transcribe --from-stdin --concurrency 3 --speaker-labels \ + --llm 'Judge diarization quality; output JSON {speaker_count, issues, score}' \ + --llm-reduce 'Rank these videos worst-to-best and summarize the failure modes' +``` + +Update the prose to mention that `--llm-reduce` runs one prompt over all results. + +- [ ] **Step 4: Regenerate the `transcribe --help` snapshot** + +Run: `uv run pytest tests/ -k "snapshot and transcribe" --snapshot-update -q` +Then inspect the diff: `git diff tests/__snapshots__/` — it must show only the new `--llm-reduce` help line. Never hand-edit `.ambr` files. + +- [ ] **Step 5: Verify docs + snapshots** + +Run: `uv run pytest tests/ -k "snapshot and transcribe" -q && uv run python scripts/docs_consistency_gate.py` +Expected: PASS / no consistency errors. (If `scripts/docs_consistency_gate.py` fails to start under the sandbox with an EPERM from safe-chain, re-run the single command with the sandbox disabled.) 
+ +- [ ] **Step 6: Commit** + +```bash +git add REFERENCE.md README.md tests/__snapshots__ +git commit -m "docs(transcribe): document --llm-reduce and the reduce NDJSON event" +``` + +--- + +### Task 5: Full gate + +- [ ] **Step 1: Run the authoritative gate** + +Run: `./scripts/check.sh` +Expected: it prints `All checks passed.` This enforces lint, types, vulture/deptry/import-linter, **100% patch coverage vs origin/main**, the **diff-scoped mutation gate**, the "no new escape hatches" gate, CodeQL, and the build. Do not claim done until it prints that line. + +- [ ] **Step 2: Fix and re-run as needed** + +If patch coverage flags an uncovered line, add an assertion that would fail if that line broke (not just a call). If a mutant survives on a changed line, add the assertion that kills it. Re-run `./scripts/check.sh` after any edit (the commit-gate hook requires a passing run for the current tree). + +- [ ] **Step 3: Final commit (if the gate produced fixups)** + +```bash +git add -A +git commit -m "test(transcribe): close coverage/mutation gaps for --llm-reduce" +``` + +--- + +## Self-review + +- **Spec coverage:** flag + repeatable chain (Task 1); reduce-input = last map output else transcript text (Task 3 `_reduce_input`); concatenation with `### Source:` headers (Task 3 `_gather_reduce_inputs`); reduce via `llm.run_chain` inline text (Task 3 `_run_reduce`); table→stderr routing gated on the flag (Task 3 `_progress_table`); additive `{type:"reduce"}` NDJSON (Task 3); single-source = append to chain (Task 2); docs + snapshot (Task 4); gates (Task 5). All spec sections map to a task. +- **Placeholder scan:** no TBD/TODO; every code step shows complete code; the only "find the spot" steps (Task 4 REFERENCE.md) give exact grep commands because the file's layout isn't quoted here. 
+- **Type consistency:** `reduce_prompts: list[str]` and `chain()` defined in Task 1 are the names used in Tasks 2-3; `_run_reduce`/`_gather_reduce_inputs`/`_reduce_input` signatures match their call sites; `run_chain(..., transcript_text=...)` matches `aai_cli/core/llm.py`. From e1d73729a8bca14fc22f3dafa2d170f0502f2d0d Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 08:13:52 -0700 Subject: [PATCH 03/12] feat(transcribe): plumb --llm-reduce flag as data Co-Authored-By: Claude Opus 4.8 (1M context) --- aai_cli/app/transcribe/run.py | 21 ++++++++- aai_cli/commands/transcribe.py | 9 ++++ tests/test_transcribe_reduce.py | 84 +++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 tests/test_transcribe_reduce.py diff --git a/aai_cli/app/transcribe/run.py b/aai_cli/app/transcribe/run.py index 4b1d5c97..197d7f5d 100644 --- a/aai_cli/app/transcribe/run.py +++ b/aai_cli/app/transcribe/run.py @@ -95,11 +95,24 @@ def run_transcription( class TransformOptions(NamedTuple): - """The ``--llm`` chain options: the prompts plus the gateway model settings.""" + """The ``--llm`` chain options: the prompts plus the gateway model settings. + + ``reduce_prompts`` is the ``--llm-reduce`` chain — the aggregate step run over + all batch results (or appended to the per-transcript chain for a single source). + """ prompts: list[str] model: str max_tokens: int + reduce_prompts: list[str] + + def chain(self) -> list[str]: + """The full single-source chain: the map prompts followed by the reduce ones. + + With one source there is nothing to aggregate, so the reduce prompts simply + extend the ``--llm`` chain over that transcript. 
+ """ + return self.prompts + self.reduce_prompts def deliver_result( @@ -213,6 +226,7 @@ class TranscribeOptions: config_kv: list[str] | None config_file: Path | None llm_prompt: list[str] | None + llm_reduce: list[str] | None model: str max_tokens: int output_field: choices.TranscriptOutput | None @@ -272,7 +286,10 @@ def flags(self, pii_policies: list[str] | None) -> dict[str, object]: def transform_options(self) -> TransformOptions: """The post-transcription LLM transform spec built from the `--llm` flags.""" return TransformOptions( - prompts=list(self.llm_prompt or []), model=self.model, max_tokens=self.max_tokens + prompts=list(self.llm_prompt or []), + model=self.model, + max_tokens=self.max_tokens, + reduce_prompts=list(self.llm_reduce or []), ) diff --git a/aai_cli/commands/transcribe.py b/aai_cli/commands/transcribe.py index 20633476..83f8365b 100644 --- a/aai_cli/commands/transcribe.py +++ b/aai_cli/commands/transcribe.py @@ -310,6 +310,14 @@ def transcribe( "prompt runs on the previous one's response (a chain), the first on the transcript.", rich_help_panel=help_panels.OPT_LLM, ), + llm_reduce: list[str] | None = typer.Option( + None, + "--llm-reduce", + help="Run one LLM-Gateway prompt over all batch results (a reduce). " + "Repeatable: each runs on the previous one's output. For a single source it " + "extends the --llm chain over that transcript.", + rich_help_panel=help_panels.OPT_LLM, + ), model: str = typer.Option( llm.DEFAULT_MODEL, "--model", @@ -410,6 +418,7 @@ def transcribe( config_kv=config_kv, config_file=config_file, llm_prompt=llm_prompt, + llm_reduce=llm_reduce, model=model, max_tokens=max_tokens, output_field=output_field, diff --git a/tests/test_transcribe_reduce.py b/tests/test_transcribe_reduce.py new file mode 100644 index 00000000..789b654c --- /dev/null +++ b/tests/test_transcribe_reduce.py @@ -0,0 +1,84 @@ +"""`assembly transcribe --llm-reduce`: the map-reduce LLM step. 
+ +Task 1 covers the data plumbing (the flag and TransformOptions); later tasks add +the single-source chain and batch-reduce behavior tests to this file. +""" + +from __future__ import annotations + +import dataclasses + +from aai_cli.app.transcribe import run as transcribe_run + +_DEFAULT_OPTS = transcribe_run.TranscribeOptions( + source=None, + sample=False, + from_stdin=False, + concurrency=2, + force=False, + speech_model=None, + language_code=None, + language_detection=None, + keyterms_prompt=None, + temperature=None, + prompt=None, + punctuate=None, + format_text=None, + disfluencies=None, + speaker_labels=False, + speakers_expected=None, + multichannel=None, + redact_pii=None, + redact_pii_policy=None, + redact_pii_sub=None, + redact_pii_audio=None, + filter_profanity=None, + content_safety=None, + content_safety_confidence=None, + speech_threshold=None, + summarization=None, + summary_model=None, + summary_type=None, + auto_chapters=None, + sentiment_analysis=None, + entity_detection=None, + auto_highlights=None, + topic_detection=None, + word_boost=None, + custom_spelling_file=None, + audio_start=None, + audio_end=None, + download_sections=None, + webhook_url=None, + webhook_auth_header=None, + translate_to=None, + config_kv=None, + config_file=None, + llm_prompt=None, + llm_reduce=None, + model="claude-haiku-4-5-20251001", + max_tokens=1000, + output_field=None, + chars_per_caption=None, + out=None, + show_code=False, +) + + +def _defaults(**overrides: object) -> transcribe_run.TranscribeOptions: + """A minimal TranscribeOptions for seam tests; override only what matters.""" + return dataclasses.replace(_DEFAULT_OPTS, **overrides) + + +def test_transform_options_carries_reduce_prompts() -> None: + opts = _defaults(llm_prompt=["judge"], llm_reduce=["rank", "summarize"]) + transform = opts.transform_options() + assert transform.prompts == ["judge"] + assert transform.reduce_prompts == ["rank", "summarize"] + + +def test_chain_appends_reduce_to_map() -> 
None: + transform = transcribe_run.TransformOptions( + prompts=["a"], model="m", max_tokens=10, reduce_prompts=["b"] + ) + assert transform.chain() == ["a", "b"] From 4c950302210c6283983025ab5d23aade14b8c17e Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 08:23:56 -0700 Subject: [PATCH 04/12] feat(transcribe): single-source --llm-reduce extends the chain Co-Authored-By: Claude Opus 4.8 (1M context) --- aai_cli/app/transcribe/run.py | 15 +++++++++---- tests/test_transcribe_reduce.py | 40 +++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/aai_cli/app/transcribe/run.py b/aai_cli/app/transcribe/run.py index 197d7f5d..f02d8358 100644 --- a/aai_cli/app/transcribe/run.py +++ b/aai_cli/app/transcribe/run.py @@ -150,12 +150,15 @@ def deliver_result( ) return - if transform.prompts: + chain = transform.chain() + if chain: # Chain the prompts: the first runs over the transcript (injected server-side # via transcript_id); each subsequent prompt runs over the prior response. + # --llm-reduce prompts extend the chain here — a single source has nothing to + # aggregate, so reduce is just more chain steps over this one transcript. 
steps = llm.run_chain_steps( api_key, - transform.prompts, + chain, transcript_id=transcript.id, model=transform.model, max_tokens=transform.max_tokens, @@ -309,7 +312,9 @@ def _print_show_code(opts: TranscribeOptions, merged: dict[str, object]) -> None if opts.source or opts.sample else "your-audio-file.mp3" ) - gateway = code_gen.gateway_options(list(opts.llm_prompt or []), opts.model, opts.max_tokens) + gateway = code_gen.gateway_options( + list(opts.llm_prompt or []) + list(opts.llm_reduce or []), opts.model, opts.max_tokens + ) output.print_code( render_transcribe_code( merged, @@ -334,7 +339,9 @@ def run_transcribe(opts: TranscribeOptions, state: AppState, *, json_mode: bool) transcribe_validate.validate_pii_policies(pii_policies) flags = opts.flags(pii_policies) - transcribe_validate.validate_out_with_llm(opts.out, opts.llm_prompt) + transcribe_validate.validate_out_with_llm( + opts.out, (opts.llm_prompt or []) + (opts.llm_reduce or []) or None + ) transcribe_validate.validate_out_path(opts.out) transcribe_validate.validate_json_with_output(opts.output_field, json_mode=json_mode) client.validate_chars_per_caption(opts.chars_per_caption, opts.output_field) diff --git a/tests/test_transcribe_reduce.py b/tests/test_transcribe_reduce.py index 789b654c..c3061afc 100644 --- a/tests/test_transcribe_reduce.py +++ b/tests/test_transcribe_reduce.py @@ -8,7 +8,28 @@ import dataclasses +import pytest +from typer.testing import CliRunner + from aai_cli.app.transcribe import run as transcribe_run +from aai_cli.core import config +from aai_cli.main import app + +runner = CliRunner() + +_TRANSCRIBE = "aai_cli.app.transcribe.run.client.transcribe" +_TRANSFORM = "aai_cli.core.llm.transform_transcript" + + +@pytest.fixture(autouse=True) +def workdir(tmp_path, monkeypatch): + # Batch sources and sidecars resolve relative to cwd; isolate each test. 
+ monkeypatch.chdir(tmp_path) + + +def _auth() -> None: + config.set_api_key("default", "sk_live") + _DEFAULT_OPTS = transcribe_run.TranscribeOptions( source=None, @@ -82,3 +103,22 @@ def test_chain_appends_reduce_to_map() -> None: prompts=["a"], model="m", max_tokens=10, reduce_prompts=["b"] ) assert transform.chain() == ["a", "b"] + + +def test_single_source_runs_reduce_as_chain_step(mocker): + _auth() + mocker.patch( + _TRANSCRIBE, + return_value=mocker.MagicMock( + id="t1", + text="hello", + status="completed", + json_response={"id": "t1", "text": "hello", "status": "completed"}, + ), + ) + transform = mocker.patch(_TRANSFORM, side_effect=["mapped", "reduced"]) + result = runner.invoke(app, ["transcribe", "--sample", "--llm", "map", "--llm-reduce", "red"]) + assert result.exit_code == 0, result.output + # Two chain steps ran: --llm then --llm-reduce, over the one transcript. + assert transform.call_count == 2 + assert "reduced" in result.output From 55cde54cfc3d4a77d6d7fd0c561a2e8e01c309b4 Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 08:29:49 -0700 Subject: [PATCH 05/12] feat(transcribe): batch --llm-reduce aggregates results to stdout Co-Authored-By: Claude Opus 4.8 (1M context) --- aai_cli/app/transcribe/batch.py | 72 ++++++++++++++++++-- tests/test_transcribe_reduce.py | 117 ++++++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 4 deletions(-) diff --git a/aai_cli/app/transcribe/batch.py b/aai_cli/app/transcribe/batch.py index 0e4124ee..c547c29e 100644 --- a/aai_cli/app/transcribe/batch.py +++ b/aai_cli/app/transcribe/batch.py @@ -227,19 +227,24 @@ def _render_table(items: list[_Item]) -> Table: @contextmanager -def _progress_table(items: list[_Item], *, json_mode: bool) -> Generator[None]: +def _progress_table( + items: list[_Item], *, json_mode: bool, reduce_active: bool = False +) -> Generator[None]: """Render the batch as a live-updating table (human mode). 
Rich renders nothing while running on a non-interactive console and prints the final frame once on stop, so piped/agent runs still get the result table. JSON - mode skips Rich entirely — NDJSON per source is the output. + mode skips Rich entirely — NDJSON per source is the output. When a --llm-reduce + step will print the aggregate to stdout, the table goes to stderr so stdout + carries only the reduce result. """ if json_mode: yield return + console = output.error_console if reduce_active else output.console with Live( get_renderable=lambda: _render_table(items), - console=output.console, + console=console, refresh_per_second=4, # pragma: no mutate (cosmetic refresh cadence) ): yield @@ -295,6 +300,62 @@ def _summarize(items: list[_Item], *, json_mode: bool, quiet: bool) -> None: output.error_console.print(output.success(f"Transcribed {completed}, skipped {skipped}.")) +def _reduce_input(record: dict[str, object]) -> str: + """A source's contribution to the reduce: its last --llm output, else its text.""" + transform = jsonshape.as_mapping(record.get("transform")) + if transform is not None: + steps = jsonshape.mapping_list(transform.get("steps")) + if steps: + return str(steps[-1].get("output", "") or "") + transcript = jsonshape.as_mapping(record.get("transcript")) + if transcript is not None: + return str(transcript.get("text", "") or "") + return "" + + +def _gather_reduce_inputs(items: list[_Item]) -> str: + """Concatenate each completed/skipped source's reduce input under a header.""" + blocks: list[str] = [] + for item in items: + if item.status not in ("completed", "skipped"): + continue + record = resumable_record(sidecar_path(item.source), digest=None) + text = _reduce_input(record) if record is not None else "" + if text: + blocks.append(f"### Source: {item.source}\n{text}") + return "\n\n".join(blocks) + + +def _run_reduce( + api_key: str, + items: list[_Item], + *, + transform: transcribe_exec.TransformOptions, + json_mode: bool, +) -> None: + """Run 
the --llm-reduce chain once over every source's result; print to stdout.""" + combined = _gather_reduce_inputs(items) + result = llm.run_chain( + api_key, + transform.reduce_prompts, + transcript_text=combined, + model=transform.model, + max_tokens=transform.max_tokens, + ) + if json_mode: + # Additive NDJSON event after the per-source {"type":"result"} records. + output.emit_ndjson( + { + "type": "reduce", + "model": transform.model, + "prompts": transform.reduce_prompts, + "output": result, + } + ) + else: + output.emit_text(result) + + def run_batch( api_key: str, sources: list[str], @@ -312,7 +373,8 @@ def run_batch( code; a re-run resumes from the sidecars and retries only the failures. """ items = [_Item(source) for source in sources] - with _progress_table(items, json_mode=json_mode): + reduce_active = bool(transform.reduce_prompts) + with _progress_table(items, json_mode=json_mode, reduce_active=reduce_active): _drain( api_key, items, @@ -323,3 +385,5 @@ def run_batch( json_mode=json_mode, ) _summarize(items, json_mode=json_mode, quiet=quiet) + if reduce_active: + _run_reduce(api_key, items, transform=transform, json_mode=json_mode) diff --git a/tests/test_transcribe_reduce.py b/tests/test_transcribe_reduce.py index c3061afc..5b0f54ae 100644 --- a/tests/test_transcribe_reduce.py +++ b/tests/test_transcribe_reduce.py @@ -7,10 +7,12 @@ from __future__ import annotations import dataclasses +import json import pytest from typer.testing import CliRunner +from aai_cli.app.transcribe import batch as transcribe_batch from aai_cli.app.transcribe import run as transcribe_run from aai_cli.core import config from aai_cli.main import app @@ -31,6 +33,19 @@ def _auth() -> None: config.set_api_key("default", "sk_live") +def _fake_transcript(mocker, source): + t = mocker.MagicMock() + t.id = f"t_{source}" + t.text = f"text of {source}" + t.status = "completed" + t.json_response = {"id": t.id, "text": t.text, "status": "completed"} + return t + + +def _ndjson(result): + 
return [json.loads(line) for line in result.output.splitlines() if line.startswith("{")] + + _DEFAULT_OPTS = transcribe_run.TranscribeOptions( source=None, sample=False, @@ -122,3 +137,105 @@ def test_single_source_runs_reduce_as_chain_step(mocker): # Two chain steps ran: --llm then --llm-reduce, over the one transcript. assert transform.call_count == 2 assert "reduced" in result.output + + +def test_batch_reduce_feeds_map_outputs(mocker, monkeypatch): + _auth() + monkeypatch.setattr( + _TRANSCRIBE, lambda api_key, audio, *, config: _fake_transcript(mocker, audio) + ) + mocker.patch(_TRANSFORM, side_effect=["JUDGED a", "JUDGED b", "FINAL"]) + captured = {} + + def spy(api_key, prompts, *, transcript_text, model, max_tokens): + captured["text"] = transcript_text + captured["prompts"] = prompts + return "FINAL" + + monkeypatch.setattr(transcribe_batch.llm, "run_chain", spy) + result = runner.invoke( + app, + ["transcribe", "--from-stdin", "--llm", "judge", "--llm-reduce", "rank"], + input="https://a\nhttps://b\n", + ) + assert result.exit_code == 0, result.output + assert "### Source: https://a" in captured["text"] + assert "JUDGED a" in captured["text"] and "JUDGED b" in captured["text"] + assert captured["prompts"] == ["rank"] + assert "FINAL" in result.output + + +def test_batch_reduce_falls_back_to_transcript_text(mocker, monkeypatch): + _auth() + monkeypatch.setattr( + _TRANSCRIBE, lambda api_key, audio, *, config: _fake_transcript(mocker, audio) + ) + captured = {} + + def spy(api_key, prompts, *, transcript_text, model, max_tokens): + captured["text"] = transcript_text + return "FINAL" + + monkeypatch.setattr(transcribe_batch.llm, "run_chain", spy) + result = runner.invoke( + app, + ["transcribe", "--from-stdin", "--llm-reduce", "summarize"], + input="https://a\n", + ) + assert result.exit_code == 0, result.output + assert "text of https://a" in captured["text"] + + +def test_batch_reduce_emits_json_record(mocker, monkeypatch): + _auth() + monkeypatch.setattr( 
+ _TRANSCRIBE, lambda api_key, audio, *, config: _fake_transcript(mocker, audio) + ) + monkeypatch.setattr( + transcribe_batch.llm, + "run_chain", + lambda api_key, prompts, *, transcript_text, model, max_tokens: "FINAL", + ) + result = runner.invoke( + app, + ["transcribe", "--from-stdin", "--llm-reduce", "summarize", "--json"], + input="https://a\n", + ) + assert result.exit_code == 0, result.output + records = _ndjson(result) + reduce_records = [r for r in records if r.get("type") == "reduce"] + assert len(reduce_records) == 1 + assert reduce_records[0]["output"] == "FINAL" + assert reduce_records[0]["prompts"] == ["summarize"] + + +def test_batch_reduce_routes_table_to_stderr(mocker, monkeypatch, capsys): + """run_batch sends the reduce result to stdout and the progress table to stderr.""" + import assemblyai as aai + + _auth() + monkeypatch.setattr( + _TRANSCRIBE, lambda api_key, audio, *, config: _fake_transcript(mocker, audio) + ) + monkeypatch.setattr( + transcribe_batch.llm, + "run_chain", + lambda api_key, prompts, *, transcript_text, model, max_tokens: "AGGREGATE", + ) + transform = transcribe_run.TransformOptions( + prompts=[], model="m", max_tokens=10, reduce_prompts=["summarize"] + ) + transcribe_batch.run_batch( + "sk_live", + ["https://a"], + transcription_config=aai.TranscriptionConfig(), + concurrency=1, + force=False, + transform=transform, + json_mode=False, + quiet=False, + ) + out, err = capsys.readouterr() + assert "AGGREGATE" in out + assert "AGGREGATE" not in err + assert "https://a" in err From c2a04fafb67ddc7e11f913e16da302b2e4270d69 Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 08:56:45 -0700 Subject: [PATCH 06/12] docs(transcribe): document --llm-reduce and the reduce NDJSON event Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 12 ++++++++++++ REFERENCE.md | 10 +++++++++- tests/__snapshots__/test_snapshots_help_run.ambr | 5 +++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/README.md 
b/README.md index 027bf02a..150fedde 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,18 @@ assembly stream -o text | grep --line-buffered -i alex \ assembly transcribe --sample --llm "summarize" --llm "translate the summary to French" ``` +**Score diarization quality across several videos** — pipe a list of URLs into batch mode (`--from-stdin`), transcribe them in parallel with speaker labels, have an LLM judge each transcript, then use `--llm-reduce` to run one prompt over all the results for a single aggregate verdict: + +```sh +printf '%s\n' \ + https://youtu.be/RC5zRvqnRm8 \ + https://youtu.be/u9S41Kplsbs \ + https://youtu.be/mP31CdpGzUY \ +| assembly transcribe --from-stdin --concurrency 3 --speaker-labels \ + --llm 'Judge diarization quality; output JSON {speaker_count, issues, score}' \ + --llm-reduce 'Rank these videos worst-to-best and summarize the failure modes' +``` + **Talk to a voice agent in your terminal** — full-duplex, around 20 voices: ```sh diff --git a/REFERENCE.md b/REFERENCE.md index 0e13314c..b45f7643 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -79,7 +79,15 @@ each carrying a `"type"` field to dispatch on: | `assembly agent-cascade --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` | | `assembly dictate --json` | `utterance` | | `assembly llm --follow --json` | `answer` | -| `assembly transcribe --json` | `result` (one per source) | +| `assembly transcribe --json` | `result` (one per source), then `reduce` if `--llm-reduce` is set | New event types may be added; existing fields are stable. Consumers should ignore types they don't recognize. + +With `--llm-reduce`, batch mode emits one final +`{"type":"reduce","model","prompts","output"}` record after the per-source +`result` records — the aggregate prompt(s) run once over every result, with the +output printed to stdout (the progress table is routed to stderr so stdout stays +clean for piping). 
`--llm-reduce` is repeatable, each prompt running on the +previous one's output; for a single source it extends the `--llm` chain over +that transcript. diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index 1e5b4073..489fb8d7 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -1155,6 +1155,11 @@ │ Gateway. Repeatable: each prompt runs on the │ │ previous one's response (a chain), the first on │ │ the transcript. │ + │ --llm-reduce TEXT Run one LLM-Gateway prompt over all batch │ + │ results (a reduce). Repeatable: each runs on │ + │ the previous one's output. For a single source │ + │ it extends the --llm chain over that │ + │ transcript. │ │ --model TEXT LLM Gateway model │ │ [default: claude-haiku-4-5-20251001] │ │ --max-tokens INTEGER Max tokens [default: 1000] │ From 2c00d3d8564803719de0b40c7b2d5e1a65ad917e Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 08:57:59 -0700 Subject: [PATCH 07/12] test: add llm_reduce to TranscribeOptions seam defaults Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/test_command_options_seam.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_command_options_seam.py b/tests/test_command_options_seam.py index d2088394..cd623599 100644 --- a/tests/test_command_options_seam.py +++ b/tests/test_command_options_seam.py @@ -71,6 +71,7 @@ config_kv=None, config_file=None, llm_prompt=None, + llm_reduce=None, model=llm.DEFAULT_MODEL, max_tokens=llm.DEFAULT_MAX_TOKENS, output_field=None, From b51a9b70028dfcdf0cf64b7c1f943a9a6e2e59df Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 09:06:25 -0700 Subject: [PATCH 08/12] test: use explicit replace kwargs for mypy in reduce seam test Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/test_transcribe_reduce.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git 
a/tests/test_transcribe_reduce.py b/tests/test_transcribe_reduce.py index 5b0f54ae..4e8ca5e6 100644 --- a/tests/test_transcribe_reduce.py +++ b/tests/test_transcribe_reduce.py @@ -101,13 +101,10 @@ def _ndjson(result): ) -def _defaults(**overrides: object) -> transcribe_run.TranscribeOptions: - """A minimal TranscribeOptions for seam tests; override only what matters.""" - return dataclasses.replace(_DEFAULT_OPTS, **overrides) - - def test_transform_options_carries_reduce_prompts() -> None: - opts = _defaults(llm_prompt=["judge"], llm_reduce=["rank", "summarize"]) + opts = dataclasses.replace( + _DEFAULT_OPTS, llm_prompt=["judge"], llm_reduce=["rank", "summarize"] + ) transform = opts.transform_options() assert transform.prompts == ["judge"] assert transform.reduce_prompts == ["rank", "summarize"] From 1158455b3bda5d927964cfbf6ac8e01ffcd061a0 Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 09:25:16 -0700 Subject: [PATCH 09/12] test: regenerate t-alias help snapshot for --llm-reduce Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/__snapshots__/test_snapshots_help_run.ambr | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index 489fb8d7..74c27476 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -977,6 +977,11 @@ │ Gateway. Repeatable: each prompt runs on the │ │ previous one's response (a chain), the first on │ │ the transcript. │ + │ --llm-reduce TEXT Run one LLM-Gateway prompt over all batch │ + │ results (a reduce). Repeatable: each runs on │ + │ the previous one's output. For a single source │ + │ it extends the --llm chain over that │ + │ transcript. 
│ │ --model TEXT LLM Gateway model │ │ [default: claude-haiku-4-5-20251001] │ │ --max-tokens INTEGER Max tokens [default: 1000] │ From bb6ce6ef6613d8e3f2c59c93738223f821da0adc Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 09:31:44 -0700 Subject: [PATCH 10/12] test: cover reduce helper edge branches (empty input, status skip) Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/test_transcribe_reduce.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/test_transcribe_reduce.py b/tests/test_transcribe_reduce.py index 4e8ca5e6..41425f0b 100644 --- a/tests/test_transcribe_reduce.py +++ b/tests/test_transcribe_reduce.py @@ -117,6 +117,39 @@ def test_chain_appends_reduce_to_map() -> None: assert transform.chain() == ["a", "b"] +def test_reduce_input_prefers_map_output_then_text_then_empty() -> None: + # The last --llm step's output wins over the transcript text. + assert ( + transcribe_batch._reduce_input( + {"transform": {"steps": [{"output": "judged"}]}, "transcript": {"text": "raw"}} + ) + == "judged" + ) + # Falls back to transcript text when no --llm step ran. + assert transcribe_batch._reduce_input({"transcript": {"text": "raw text"}}) == "raw text" + # Empty when the record carries neither. + assert transcribe_batch._reduce_input({}) == "" + + +def test_gather_reduce_inputs_skips_non_completed_items() -> None: + done = transcribe_batch._Item("https://a", status="completed") + failed = transcribe_batch._Item("https://b", status="failed") + transcribe_batch._dump_sidecar( + transcribe_batch.sidecar_path("https://a"), + {"status": "completed", "transcript": {"text": "alpha text"}}, + ) + # `b` has a perfectly valid completed sidecar; only its item status excludes it, + # so dropping the status guard would wrongly pull "beta text" into the reduce. 
+ transcribe_batch._dump_sidecar( + transcribe_batch.sidecar_path("https://b"), + {"status": "completed", "transcript": {"text": "beta text"}}, + ) + combined = transcribe_batch._gather_reduce_inputs([done, failed]) + assert "### Source: https://a" in combined + assert "alpha text" in combined + assert "beta text" not in combined + + def test_single_source_runs_reduce_as_chain_step(mocker): _auth() mocker.patch( From 11a47c92be7922d8018779fde0a0d725b0a4ff58 Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 09:45:07 -0700 Subject: [PATCH 11/12] refactor(transcribe): make _progress_table reduce_active explicit Kills a mutation-gate survivor: the default was dead since run_batch always passes reduce_active. Co-Authored-By: Claude Opus 4.8 (1M context) --- aai_cli/app/transcribe/batch.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/aai_cli/app/transcribe/batch.py b/aai_cli/app/transcribe/batch.py index c547c29e..76cd6c7a 100644 --- a/aai_cli/app/transcribe/batch.py +++ b/aai_cli/app/transcribe/batch.py @@ -227,9 +227,7 @@ def _render_table(items: list[_Item]) -> Table: @contextmanager -def _progress_table( - items: list[_Item], *, json_mode: bool, reduce_active: bool = False -) -> Generator[None]: +def _progress_table(items: list[_Item], *, json_mode: bool, reduce_active: bool) -> Generator[None]: """Render the batch as a live-updating table (human mode). Rich renders nothing while running on a non-interactive console and prints the From 7d722a98f3db3215089f046b12fd7e46b451701d Mon Sep 17 00:00:00 2001 From: Alex Kroman Date: Tue, 16 Jun 2026 09:50:08 -0700 Subject: [PATCH 12/12] fix(transcribe): skip --llm-reduce call when there is nothing to reduce Avoids a billable LLM-Gateway call (and junk on stdout) when every source's transcript text and --llm output are empty; warns on stderr instead. Closes the empty-combined-text gap from final review. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- aai_cli/app/transcribe/batch.py | 8 +++++++ tests/test_transcribe_reduce.py | 41 +++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/aai_cli/app/transcribe/batch.py b/aai_cli/app/transcribe/batch.py index 76cd6c7a..14a12a0e 100644 --- a/aai_cli/app/transcribe/batch.py +++ b/aai_cli/app/transcribe/batch.py @@ -333,6 +333,14 @@ def _run_reduce( ) -> None: """Run the --llm-reduce chain once over every source's result; print to stdout.""" combined = _gather_reduce_inputs(items) + if not combined: + # Every source had empty transcript text and no --llm output, so there is + # nothing to aggregate — skip the (billable) Gateway call rather than prompt + # it over an empty transcript and print a meaningless answer to stdout. + output.emit_warning( + "Nothing to reduce: no transcript text across sources.", json_mode=json_mode + ) + return result = llm.run_chain( api_key, transform.reduce_prompts, diff --git a/tests/test_transcribe_reduce.py b/tests/test_transcribe_reduce.py index 41425f0b..25dedce6 100644 --- a/tests/test_transcribe_reduce.py +++ b/tests/test_transcribe_reduce.py @@ -269,3 +269,44 @@ def test_batch_reduce_routes_table_to_stderr(mocker, monkeypatch, capsys): assert "AGGREGATE" in out assert "AGGREGATE" not in err assert "https://a" in err + + +def test_batch_reduce_skips_when_nothing_to_reduce(mocker, monkeypatch, capsys): + """An all-empty batch result must not fire a (billable) reduce call.""" + import assemblyai as aai + + _auth() + + def _empty(api_key, audio, *, config): + t = mocker.MagicMock() + t.id = "t_a" + t.text = "" + t.status = "completed" + t.json_response = {"id": "t_a", "text": "", "status": "completed"} + return t + + monkeypatch.setattr(_TRANSCRIBE, _empty) + calls = {"n": 0} + + def _spy(*args, **kwargs): + calls["n"] += 1 + return "SHOULD NOT RUN" + + monkeypatch.setattr(transcribe_batch.llm, "run_chain", _spy) + transform = transcribe_run.TransformOptions( + 
prompts=[], model="m", max_tokens=10, reduce_prompts=["summarize"] + ) + transcribe_batch.run_batch( + "sk_live", + ["https://a"], + transcription_config=aai.TranscriptionConfig(), + concurrency=1, + force=False, + transform=transform, + json_mode=False, + quiet=False, + ) + out, err = capsys.readouterr() + assert calls["n"] == 0 + assert "SHOULD NOT RUN" not in out + assert "Nothing to reduce" in err