diff --git a/aai_cli/app/transcribe/batch.py b/aai_cli/app/transcribe/batch.py index 0e192193..4745cd91 100644 --- a/aai_cli/app/transcribe/batch.py +++ b/aai_cli/app/transcribe/batch.py @@ -34,6 +34,7 @@ from aai_cli.app.transcribe import run as transcribe_exec from aai_cli.app.transcribe.sources import SIDECAR_SUFFIX, URL_PREFIXES +from aai_cli.app.transform import emit_reduce from aai_cli.core import client, jsonshape, llm, remotefs from aai_cli.core.errors import CLIError, NotAuthenticated from aai_cli.ui import output, theme @@ -317,17 +318,19 @@ def _reduce_input(record: dict[str, object]) -> str: return "" -def _gather_reduce_inputs(items: list[_Item]) -> str: - """Concatenate each completed/skipped source's reduce input under a header.""" - blocks: list[str] = [] +def _gather_reduce_inputs(items: list[_Item]) -> list[tuple[str, str]]: + """Each completed/skipped source's ``(source, reduce-input-text)`` contribution. + + Failed/queued sources are dropped; empty-text contributions are kept and filtered + by `emit_reduce`, which owns the "nothing to reduce" policy. + """ + contributions: list[tuple[str, str]] = [] for item in items: if item.status not in ("completed", "skipped"): continue record = resumable_record(sidecar_path(item.source), digest=None) - text = _reduce_input(record) if record is not None else "" - if text: - blocks.append(f"### Source: {item.source}\n{text}") - return "\n\n".join(blocks) + contributions.append((item.source, _reduce_input(record) if record is not None else "")) + return contributions def _run_reduce( @@ -338,34 +341,16 @@ def _run_reduce( json_mode: bool, ) -> None: """Run the --llm-reduce chain once over every source's result; print to stdout.""" - combined = _gather_reduce_inputs(items) - if not combined: - # Every source had empty transcript text and no --llm output, so there is - # nothing to aggregate — skip the (billable) Gateway call rather than prompt - # it over an empty transcript and print a meaningless answer to stdout. - output.emit_warning( - "Nothing to reduce: no transcript text across sources.", json_mode=json_mode - ) - return - result = llm.run_chain( + emit_reduce( api_key, - transform.reduce_prompts, - transcript_text=combined, + _gather_reduce_inputs(items), + prompts=transform.reduce_prompts, model=transform.model, max_tokens=transform.max_tokens, + block_label="Source", + empty_noun="sources", + json_mode=json_mode, ) - if json_mode: - # Additive NDJSON event after the per-source {"type":"result"} records. - output.emit_ndjson( - { - "type": "reduce", - "model": transform.model, - "prompts": transform.reduce_prompts, - "output": result, - } - ) - else: - output.emit_text(result) def run_batch( diff --git a/aai_cli/app/transcribe/run.py b/aai_cli/app/transcribe/run.py index 2b190dc5..06224572 100644 --- a/aai_cli/app/transcribe/run.py +++ b/aai_cli/app/transcribe/run.py @@ -10,12 +10,13 @@ import tempfile from dataclasses import dataclass from pathlib import Path -from typing import Any, NamedTuple +from typing import NamedTuple import assemblyai as aai from rich.markup import escape from aai_cli import code_gen +from aai_cli.app import transform as transform_delivery from aai_cli.app.context import AppState from aai_cli.app.transcribe import render as transcribe_render from aai_cli.app.transcribe import sources as transcribe_sources @@ -26,14 +27,6 @@ from aai_cli.ui import output -def render_transform_steps(d: dict[str, Any]) -> str: - """Human view of chained LLM-Gateway steps: the lone output, or each step labeled.""" - steps = d["transform"]["steps"] - if len(steps) == 1: - return str(steps[0]["output"]) - return "\n\n".join(f"Step {i} — {s['prompt']}:\n{s['output']}" for i, s in enumerate(steps, 1)) - - def out_payload( transcript: aai.Transcript, output_field: choices.TranscriptOutput | None, @@ -163,11 +156,8 @@ def deliver_result( model=transform.model, max_tokens=transform.max_tokens, ) - output.emit( - client.transcript_summary(transcript) - | {"transform": {"model": transform.model, "steps": steps}}, - render_transform_steps, - json_mode=json_mode, + transform_delivery.emit_transform( + transcript, model=transform.model, steps=steps, json_mode=json_mode ) return diff --git a/aai_cli/app/transform.py b/aai_cli/app/transform.py new file mode 100644 index 00000000..804c480e --- /dev/null +++ b/aai_cli/app/transform.py @@ -0,0 +1,85 @@ +"""Shared LLM-Gateway map/reduce delivery for `transcribe` and `transcripts get`. + +Both commands post-process transcripts through the LLM Gateway the same way: a +per-transcript ``--llm`` *map* chain, then an optional ``--llm-reduce`` chain run +*once* over every result. The map render (the ``transform`` record shape and the +human/NDJSON split) and the reduce policy (concatenate each contribution under a +header, skip the billable call when there's nothing to reduce, emit the additive +``{"type": "reduce", …}`` record) used to be duplicated across +``commands/transcripts.py`` and ``app/transcribe/{run,batch}.py``. Centralized here +so the two callers can't drift on the record shapes, the headers, or the empty-input +guard. The low-level chain primitives stay in ``core.llm``; this is only the +delivery/rendering policy that sits above them. +""" + +from __future__ import annotations + +from typing import Any + +from aai_cli.core import client, llm +from aai_cli.ui import output + + +def render_transform_steps(d: dict[str, Any]) -> str: + """Human view of chained LLM-Gateway steps: the lone output, or each step labeled.""" + steps = d["transform"]["steps"] + if len(steps) == 1: + return str(steps[0]["output"]) + return "\n\n".join(f"Step {i} — {s['prompt']}:\n{s['output']}" for i, s in enumerate(steps, 1)) + + +def emit_transform( + transcript: object, + *, + model: str, + steps: list[dict[str, str]], + json_mode: bool, + batch: bool = False, +) -> None: + """Emit a transcript's ``--llm`` chain result. + + One NDJSON ``{"type": "transcript", …}`` record per id under ``--json`` in a batch + (so a downstream stage can map over the stream), otherwise the same JSON/human render + `transcribe` gives a single source. + """ + record = client.transcript_summary(transcript) | {"transform": {"model": model, "steps": steps}} + if json_mode and batch: + output.emit_ndjson({"type": "transcript", **record}) + else: + output.emit(record, render_transform_steps, json_mode=json_mode) + + +def emit_reduce( + api_key: str, + contributions: list[tuple[str, str]], + *, + prompts: list[str], + model: str, + max_tokens: int, + block_label: str, + empty_noun: str, + json_mode: bool, +) -> None: + """Run the ``--llm-reduce`` chain once over every contribution; print to stdout. + + ``contributions`` is one ``(id, text)`` per source — each non-empty one becomes a + ``### {block_label}: {id}`` block. When every contribution is empty the (billable) + Gateway call is skipped with a ``Nothing to reduce … across {empty_noun}`` warning + instead of prompting over nothing. Under ``--json`` the result is the additive + ``{"type": "reduce", …}`` record; otherwise the plain aggregate text. + """ + combined = "\n\n".join( + f"### {block_label}: {cid}\n{text}" for cid, text in contributions if text + ) + if not combined: + output.emit_warning( + f"Nothing to reduce: no transcript text across {empty_noun}.", json_mode=json_mode + ) + return + result = llm.run_chain( + api_key, prompts, transcript_text=combined, model=model, max_tokens=max_tokens + ) + if json_mode: + output.emit_ndjson({"type": "reduce", "model": model, "prompts": prompts, "output": result}) + else: + output.emit_text(result) diff --git a/aai_cli/auth/flow.py b/aai_cli/auth/flow.py index 35a174c5..b936dea7 100644 --- a/aai_cli/auth/flow.py +++ b/aai_cli/auth/flow.py @@ -1,6 +1,5 @@ from __future__ import annotations -import json import sys import webbrowser from dataclasses import dataclass @@ -9,6 +8,7 @@ from rich.markup import escape from aai_cli.auth import ams, discovery, endpoints, loopback +from aai_cli.core import jsonshape from aai_cli.core.errors import STDIN_KEY_RECIPE, APIError, NotAuthenticated from aai_cli.ui import output @@ -96,7 +96,7 @@ def _note(*, json_mode: bool, human: str, hint: str, url: str | None = None) -> payload: dict[str, object] = {"hint": hint} if url is not None: payload["url"] = url - sys.stderr.write(json.dumps(payload) + "\n") + sys.stderr.write(jsonshape.dumps(payload) + "\n") else: output.error_console.print(human) diff --git a/aai_cli/commands/account.py b/aai_cli/commands/account.py index 6db08d17..35eaac3c 100644 --- a/aai_cli/commands/account.py +++ b/aai_cli/commands/account.py @@ -11,7 +11,7 @@ from aai_cli import command_registry, help_panels, options from aai_cli.app.context import AppState, run_command from aai_cli.auth import ams -from aai_cli.core import jsonshape, timeparse +from aai_cli.core import choices, jsonshape, timeparse from aai_cli.core.errors import UsageError from aai_cli.ui import output from aai_cli.ui.help_text import examples_epilog @@ -34,11 +34,6 @@ def _utc_day_start(day: date) -> str: return datetime(day.year, day.month, day.day, tzinfo=UTC).isoformat() -# The AMS usage endpoint's recognized window sizes; anything else is silently -# misinterpreted server-side, so reject it client-side as a usage error. -_USAGE_WINDOWS = ("day", "week", "month") - - def _format_usage_number(value: object) -> str: number = jsonshape.as_float(value) if number.is_integer(): @@ -186,8 +181,8 @@ def usage( None, "--start", help="Start date (YYYY-MM-DD). Default: 30d ago." ), end: str | None = typer.Option(None, "--end", help="End date (YYYY-MM-DD). Default: today."), - window: str | None = typer.Option( - None, "--window", help="Window size: 'day', 'week', or 'month'" + window: choices.UsageWindow | None = typer.Option( + None, "--window", help="Aggregate usage by this window size" ), include_zero: bool = typer.Option( False, @@ -211,11 +206,6 @@ def body(state: AppState, json_mode: bool) -> None: f"--end {end_day.isoformat()} is before --start {start_day.isoformat()}.", suggestion="Pick an end date on or after the start date.", ) - if window is not None and window not in _USAGE_WINDOWS: - raise UsageError( - f"Invalid --window {window!r}.", - suggestion=f"Use one of: {', '.join(_USAGE_WINDOWS)}.", - ) start_date = _utc_day_start(start_day) end_date = _utc_day_start(end_day) _, jwt = state.resolve_session() diff --git a/aai_cli/commands/audit.py b/aai_cli/commands/audit.py index 60bffd68..0b6bd90d 100644 --- a/aai_cli/commands/audit.py +++ b/aai_cli/commands/audit.py @@ -63,8 +63,6 @@ def _is_login(entry: Mapping[str, object]) -> bool: def _actor_label(entry: Mapping[str, object]) -> str: actor_type = str(entry.get("actor_type") or "system") actor_id = entry.get("actor_id") - if actor_type == "system": - return "system" if actor_id is None else f"system #{actor_id}" return actor_type if actor_id is None else f"{actor_type} #{actor_id}" diff --git a/aai_cli/commands/transcripts.py b/aai_cli/commands/transcripts.py index ece29e66..4234cfc2 100644 --- a/aai_cli/commands/transcripts.py +++ b/aai_cli/commands/transcripts.py @@ -5,7 +5,7 @@ from aai_cli import command_registry, help_panels, options from aai_cli.app.context import AppState, run_command -from aai_cli.app.transcribe.run import render_transform_steps +from aai_cli.app.transform import emit_reduce, emit_transform from aai_cli.core import choices, client, llm, stdio, timeparse from aai_cli.core.errors import APIError, UsageError from aai_cli.ui import output, theme @@ -119,22 +119,6 @@ def _text_of(transcript: object) -> str: return str(getattr(transcript, "text", "") or "") -def _emit_transform( - transcript: object, - model: str, - steps: list[dict[str, str]], - *, - json_mode: bool, - batch: bool, -) -> None: - """Emit a transcript's ``--llm`` chain result: NDJSON per id in batch, else like `transcribe`.""" - record = client.transcript_summary(transcript) | {"transform": {"model": model, "steps": steps}} - if json_mode and batch: - output.emit_ndjson({"type": "transcript", **record}) - else: - output.emit(record, render_transform_steps, json_mode=json_mode) - - def _deliver_transcript( transcript: object, api_key: str, @@ -165,43 +149,13 @@ def _deliver_transcript( api_key, chain, transcript_id=_id_of(transcript), model=model, max_tokens=max_tokens ) if not suppress: - _emit_transform(transcript, model, steps, json_mode=json_mode, batch=batch) + emit_transform(transcript, model=model, steps=steps, json_mode=json_mode, batch=batch) return steps[-1]["output"] if steps else "" if not suppress: _emit_transcript(transcript, json_mode=json_mode, batch=batch) return _text_of(transcript) -def _run_reduce( - api_key: str, - contributions: list[tuple[str, str]], - *, - prompts: list[str], - model: str, - max_tokens: int, - json_mode: bool, -) -> None: - """Run the ``--llm-reduce`` chain once over every fetched transcript; print to stdout. - - Mirrors `transcribe`'s reduce: concatenate each id's contribution under a header, - skip the billable call when there's nothing to reduce, and emit the same additive - ``{"type": "reduce", …}`` NDJSON record under --json. - """ - combined = "\n\n".join(f"### Transcript: {tid}\n{text}" for tid, text in contributions if text) - if not combined: - output.emit_warning( - "Nothing to reduce: no transcript text across ids.", json_mode=json_mode - ) - return - result = llm.run_chain( - api_key, prompts, transcript_text=combined, model=model, max_tokens=max_tokens - ) - if json_mode: - output.emit_ndjson({"type": "reduce", "model": model, "prompts": prompts, "output": result}) - else: - output.emit_text(result) - - @app.command( epilog=examples_epilog( [ @@ -312,12 +266,14 @@ def body(state: AppState, json_mode: bool) -> None: ) contributions.append((tid, contribution)) if do_reduce: - _run_reduce( + emit_reduce( api_key, contributions, prompts=reduce_prompts, model=model, max_tokens=max_tokens, + block_label="Transcript", + empty_noun="ids", json_mode=json_mode, ) diff --git a/aai_cli/core/choices.py b/aai_cli/core/choices.py index fe3d4da6..4800497b 100644 --- a/aai_cli/core/choices.py +++ b/aai_cli/core/choices.py @@ -51,6 +51,14 @@ class ConfigKey(enum.StrEnum): telemetry_enabled = "telemetry_enabled" +class UsageWindow(enum.StrEnum): + """Window sizes the AMS usage endpoint aggregates by (`account usage --window`).""" + + day = "day" + week = "week" + month = "month" + + class ColorMode(enum.StrEnum): """The conventional tri-state for ANSI color (`--color`), matching git/gh/cargo.""" diff --git a/tests/__snapshots__/test_snapshots_help_account.ambr b/tests/__snapshots__/test_snapshots_help_account.ambr index 254c96ea..c6bbafdd 100644 --- a/tests/__snapshots__/test_snapshots_help_account.ambr +++ b/tests/__snapshots__/test_snapshots_help_account.ambr @@ -216,16 +216,20 @@ Show usage over a date range (default: last 30 days) ╭─ Options ────────────────────────────────────────────────────────────────────╮ - │ --start TEXT Start date (YYYY-MM-DD). Default: 30d │ - │ ago. │ - │ --end TEXT End date (YYYY-MM-DD). Default: today. │ - │ --window TEXT Window size: 'day', 'week', or 'month' │ - │ --include-zero,--all Include zero-usage windows (matches │ - │ --include-logins on `assembly audit`) │ - │ --json -j Output raw JSON │ - │ --output -o FIELDS Project fields from the JSON result │ - │ (comma-separated, e.g. id,status) │ - │ --help Show this message and exit. │ + │ --start TEXT Start date (YYYY-MM-DD). │ + │ Default: 30d ago. │ + │ --end TEXT End date (YYYY-MM-DD). │ + │ Default: today. │ + │ --window [day|week|month] Aggregate usage by this │ + │ window size │ + │ --include-zero,--all Include zero-usage windows │ + │ (matches --include-logins on │ + │ `assembly audit`) │ + │ --json -j Output raw JSON │ + │ --output -o FIELDS Project fields from the JSON │ + │ result (comma-separated, │ + │ e.g. id,status) │ + │ --help Show this message and exit. │ ╰──────────────────────────────────────────────────────────────────────────────╯ Examples diff --git a/tests/test_account_command.py b/tests/test_account_command.py index 7157a696..9efa56ed 100644 --- a/tests/test_account_command.py +++ b/tests/test_account_command.py @@ -53,7 +53,7 @@ def test_usage_defaults_date_range_and_renders(mocker): captured = {} def fake_usage(jwt, start, end, window): - captured["start"], captured["end"] = start, end + captured["start"], captured["end"], captured["window"] = start, end, window return { "usage_items": [ { @@ -68,6 +68,8 @@ def fake_usage(jwt, start, end, window): mocker.patch("aai_cli.commands.account.ams.get_usage", autospec=True, side_effect=fake_usage) result = runner.invoke(app, ["usage", "--json"]) assert result.exit_code == 0 + # --window defaults to None when omitted, so AMS aggregates over the whole range. + assert captured["window"] is None # both bounds are tz-aware UTC ISO-8601 timestamps, defaulted when not passed # (AMS rejects naive datetimes with a 400). for bound in (captured["start"], captured["end"]): @@ -415,8 +417,8 @@ def test_usage_allows_equal_start_and_end(mocker): def test_usage_rejects_unknown_window(monkeypatch, mocker): - # --window was free text silently misinterpreted server-side; now it's validated - # client-side, before session resolution or any AMS call. + # --window is a closed choice set (choices.UsageWindow), so Typer rejects an + # unknown value at parse time — before session resolution or any AMS call. monkeypatch.setattr("aai_cli.app.context._interactive_session", lambda: True) monkeypatch.setattr( "aai_cli.auth.run_login_flow", @@ -425,8 +427,8 @@ def test_usage_rejects_unknown_window(monkeypatch, mocker): get_usage = mocker.patch("aai_cli.commands.account.ams.get_usage", autospec=True) result = runner.invoke(app, ["usage", "--window", "fortnight"]) assert result.exit_code == 2 - assert "Invalid --window" in result.output - assert "day, week, month" in result.output + assert "'fortnight' is not one of" in result.output + assert "'day'" in result.output # the valid choices are listed for the user get_usage.assert_not_called() diff --git a/tests/test_transcribe_reduce.py b/tests/test_transcribe_reduce.py index 410a99b3..18440eec 100644 --- a/tests/test_transcribe_reduce.py +++ b/tests/test_transcribe_reduce.py @@ -12,6 +12,7 @@ import pytest from typer.testing import CliRunner +from aai_cli.app import transform from aai_cli.app.transcribe import batch as transcribe_batch from aai_cli.app.transcribe import run as transcribe_run from aai_cli.core import config @@ -101,6 +102,22 @@ def _ndjson(result): ) +def test_emit_transform_defaults_to_single_source_render_not_batch_ndjson(mocker, capsys) -> None: + # `transcribe` calls emit_transform without `batch`, relying on the default False: + # a single source's --llm result renders via output.emit (the transcript payload + # under a "transform" key), NOT the batch-only `{"type": "transcript", …}` NDJSON + # stream record. Pins the batch=False default (flipping it would tag the output). + mocker.patch( + "aai_cli.app.transform.client.transcript_summary", return_value={"id": "t1", "text": "hi"} + ) + transform.emit_transform( + object(), model="m", steps=[{"prompt": "p", "output": "o"}], json_mode=True + ) + payload = json.loads(capsys.readouterr().out) + assert "type" not in payload # not the batch NDJSON record + assert payload["transform"]["steps"][0]["output"] == "o" + + def test_transform_options_carries_reduce_prompts() -> None: opts = dataclasses.replace( _DEFAULT_OPTS, llm_prompt=["judge"], llm_reduce=["rank", "summarize"] @@ -144,10 +161,11 @@ def test_gather_reduce_inputs_skips_non_completed_items() -> None: transcribe_batch.sidecar_path("https://b"), {"status": "completed", "transcript": {"text": "beta text"}}, ) - combined = transcribe_batch._gather_reduce_inputs([done, failed]) - assert "### Source: https://a" in combined - assert "alpha text" in combined - assert "beta text" not in combined + contributions = transcribe_batch._gather_reduce_inputs([done, failed]) + # Only the completed source contributes; the failed one is dropped despite its + # valid sidecar (so "beta text" can't leak into the reduce). The "### Source:" + # header is now added by `emit_reduce`, asserted in test_batch_reduce_feeds_map_outputs. + assert contributions == [("https://a", "alpha text")] def test_single_source_runs_reduce_as_chain_step(mocker):