Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 16 additions & 31 deletions aai_cli/app/transcribe/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from aai_cli.app.transcribe import run as transcribe_exec
from aai_cli.app.transcribe.sources import SIDECAR_SUFFIX, URL_PREFIXES
from aai_cli.app.transform import emit_reduce
from aai_cli.core import client, jsonshape, llm, remotefs
from aai_cli.core.errors import CLIError, NotAuthenticated
from aai_cli.ui import output, theme
Expand Down Expand Up @@ -317,17 +318,19 @@ def _reduce_input(record: dict[str, object]) -> str:
return ""


def _gather_reduce_inputs(items: list[_Item]) -> str:
"""Concatenate each completed/skipped source's reduce input under a header."""
blocks: list[str] = []
def _gather_reduce_inputs(items: list[_Item]) -> list[tuple[str, str]]:
"""Each completed/skipped source's ``(source, reduce-input-text)`` contribution.

Failed/queued sources are dropped; empty-text contributions are kept and filtered
by `emit_reduce`, which owns the "nothing to reduce" policy.
"""
contributions: list[tuple[str, str]] = []
for item in items:
if item.status not in ("completed", "skipped"):
continue
record = resumable_record(sidecar_path(item.source), digest=None)
text = _reduce_input(record) if record is not None else ""
if text:
blocks.append(f"### Source: {item.source}\n{text}")
return "\n\n".join(blocks)
contributions.append((item.source, _reduce_input(record) if record is not None else ""))
return contributions


def _run_reduce(
Expand All @@ -338,34 +341,16 @@ def _run_reduce(
json_mode: bool,
) -> None:
"""Run the --llm-reduce chain once over every source's result; print to stdout."""
combined = _gather_reduce_inputs(items)
if not combined:
# Every source had empty transcript text and no --llm output, so there is
# nothing to aggregate — skip the (billable) Gateway call rather than prompt
# it over an empty transcript and print a meaningless answer to stdout.
output.emit_warning(
"Nothing to reduce: no transcript text across sources.", json_mode=json_mode
)
return
result = llm.run_chain(
emit_reduce(
api_key,
transform.reduce_prompts,
transcript_text=combined,
_gather_reduce_inputs(items),
prompts=transform.reduce_prompts,
model=transform.model,
max_tokens=transform.max_tokens,
block_label="Source",
empty_noun="sources",
json_mode=json_mode,
)
if json_mode:
# Additive NDJSON event after the per-source {"type":"result"} records.
output.emit_ndjson(
{
"type": "reduce",
"model": transform.model,
"prompts": transform.reduce_prompts,
"output": result,
}
)
else:
output.emit_text(result)


def run_batch(
Expand Down
18 changes: 4 additions & 14 deletions aai_cli/app/transcribe/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, NamedTuple
from typing import NamedTuple

import assemblyai as aai
from rich.markup import escape

from aai_cli import code_gen
from aai_cli.app import transform as transform_delivery
from aai_cli.app.context import AppState
from aai_cli.app.transcribe import render as transcribe_render
from aai_cli.app.transcribe import sources as transcribe_sources
Expand All @@ -26,14 +27,6 @@
from aai_cli.ui import output


def render_transform_steps(d: dict[str, Any]) -> str:
"""Human view of chained LLM-Gateway steps: the lone output, or each step labeled."""
steps = d["transform"]["steps"]
if len(steps) == 1:
return str(steps[0]["output"])
return "\n\n".join(f"Step {i} — {s['prompt']}:\n{s['output']}" for i, s in enumerate(steps, 1))


def out_payload(
transcript: aai.Transcript,
output_field: choices.TranscriptOutput | None,
Expand Down Expand Up @@ -163,11 +156,8 @@ def deliver_result(
model=transform.model,
max_tokens=transform.max_tokens,
)
output.emit(
client.transcript_summary(transcript)
| {"transform": {"model": transform.model, "steps": steps}},
render_transform_steps,
json_mode=json_mode,
transform_delivery.emit_transform(
transcript, model=transform.model, steps=steps, json_mode=json_mode
)
return

Expand Down
85 changes: 85 additions & 0 deletions aai_cli/app/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""Shared LLM-Gateway map/reduce delivery for `transcribe` and `transcripts get`.

Both commands post-process transcripts through the LLM Gateway the same way: a
per-transcript ``--llm`` *map* chain, then an optional ``--llm-reduce`` chain run
*once* over every result. The map render (the ``transform`` record shape and the
human/NDJSON split) and the reduce policy (concatenate each contribution under a
header, skip the billable call when there's nothing to reduce, emit the additive
``{"type": "reduce", …}`` record) used to be duplicated across
``commands/transcripts.py`` and ``app/transcribe/{run,batch}.py``. Centralized here
so the two callers can't drift on the record shapes, the headers, or the empty-input
guard. The low-level chain primitives stay in ``core.llm``; this is only the
delivery/rendering policy that sits above them.
"""

from __future__ import annotations

from typing import Any

from aai_cli.core import client, llm
from aai_cli.ui import output


def render_transform_steps(d: dict[str, Any]) -> str:
"""Human view of chained LLM-Gateway steps: the lone output, or each step labeled."""
steps = d["transform"]["steps"]
if len(steps) == 1:
return str(steps[0]["output"])
return "\n\n".join(f"Step {i} — {s['prompt']}:\n{s['output']}" for i, s in enumerate(steps, 1))


def emit_transform(
transcript: object,
*,
model: str,
steps: list[dict[str, str]],
json_mode: bool,
batch: bool = False,
) -> None:
"""Emit a transcript's ``--llm`` chain result.

One NDJSON ``{"type": "transcript", …}`` record per id under ``--json`` in a batch
(so a downstream stage can map over the stream), otherwise the same JSON/human render
`transcribe` gives a single source.
"""
record = client.transcript_summary(transcript) | {"transform": {"model": model, "steps": steps}}
if json_mode and batch:
output.emit_ndjson({"type": "transcript", **record})
else:
output.emit(record, render_transform_steps, json_mode=json_mode)


def emit_reduce(
api_key: str,
contributions: list[tuple[str, str]],
*,
prompts: list[str],
model: str,
max_tokens: int,
block_label: str,
empty_noun: str,
json_mode: bool,
) -> None:
"""Run the ``--llm-reduce`` chain once over every contribution; print to stdout.

``contributions`` is one ``(id, text)`` per source — each non-empty one becomes a
``### {block_label}: {id}`` block. When every contribution is empty the (billable)
Gateway call is skipped with a ``Nothing to reduce … across {empty_noun}`` warning
instead of prompting over nothing. Under ``--json`` the result is the additive
``{"type": "reduce", …}`` record; otherwise the plain aggregate text.
"""
combined = "\n\n".join(
f"### {block_label}: {cid}\n{text}" for cid, text in contributions if text
)
if not combined:
output.emit_warning(
f"Nothing to reduce: no transcript text across {empty_noun}.", json_mode=json_mode
)
return
result = llm.run_chain(
api_key, prompts, transcript_text=combined, model=model, max_tokens=max_tokens
)
if json_mode:
output.emit_ndjson({"type": "reduce", "model": model, "prompts": prompts, "output": result})
else:
output.emit_text(result)
4 changes: 2 additions & 2 deletions aai_cli/auth/flow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import json
import sys
import webbrowser
from dataclasses import dataclass
Expand All @@ -9,6 +8,7 @@
from rich.markup import escape

from aai_cli.auth import ams, discovery, endpoints, loopback
from aai_cli.core import jsonshape
from aai_cli.core.errors import STDIN_KEY_RECIPE, APIError, NotAuthenticated
from aai_cli.ui import output

Expand Down Expand Up @@ -96,7 +96,7 @@ def _note(*, json_mode: bool, human: str, hint: str, url: str | None = None) ->
payload: dict[str, object] = {"hint": hint}
if url is not None:
payload["url"] = url
sys.stderr.write(json.dumps(payload) + "\n")
sys.stderr.write(jsonshape.dumps(payload) + "\n")
else:
output.error_console.print(human)

Expand Down
16 changes: 3 additions & 13 deletions aai_cli/commands/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from aai_cli import command_registry, help_panels, options
from aai_cli.app.context import AppState, run_command
from aai_cli.auth import ams
from aai_cli.core import jsonshape, timeparse
from aai_cli.core import choices, jsonshape, timeparse
from aai_cli.core.errors import UsageError
from aai_cli.ui import output
from aai_cli.ui.help_text import examples_epilog
Expand All @@ -34,11 +34,6 @@ def _utc_day_start(day: date) -> str:
return datetime(day.year, day.month, day.day, tzinfo=UTC).isoformat()


# The AMS usage endpoint's recognized window sizes; anything else is silently
# misinterpreted server-side, so reject it client-side as a usage error.
_USAGE_WINDOWS = ("day", "week", "month")


def _format_usage_number(value: object) -> str:
number = jsonshape.as_float(value)
if number.is_integer():
Expand Down Expand Up @@ -186,8 +181,8 @@ def usage(
None, "--start", help="Start date (YYYY-MM-DD). Default: 30d ago."
),
end: str | None = typer.Option(None, "--end", help="End date (YYYY-MM-DD). Default: today."),
window: str | None = typer.Option(
None, "--window", help="Window size: 'day', 'week', or 'month'"
window: choices.UsageWindow | None = typer.Option(
None, "--window", help="Aggregate usage by this window size"
),
include_zero: bool = typer.Option(
False,
Expand All @@ -211,11 +206,6 @@ def body(state: AppState, json_mode: bool) -> None:
f"--end {end_day.isoformat()} is before --start {start_day.isoformat()}.",
suggestion="Pick an end date on or after the start date.",
)
if window is not None and window not in _USAGE_WINDOWS:
raise UsageError(
f"Invalid --window {window!r}.",
suggestion=f"Use one of: {', '.join(_USAGE_WINDOWS)}.",
)
start_date = _utc_day_start(start_day)
end_date = _utc_day_start(end_day)
_, jwt = state.resolve_session()
Expand Down
2 changes: 0 additions & 2 deletions aai_cli/commands/audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def _is_login(entry: Mapping[str, object]) -> bool:
def _actor_label(entry: Mapping[str, object]) -> str:
actor_type = str(entry.get("actor_type") or "system")
actor_id = entry.get("actor_id")
if actor_type == "system":
return "system" if actor_id is None else f"system #{actor_id}"
return actor_type if actor_id is None else f"{actor_type} #{actor_id}"


Expand Down
54 changes: 5 additions & 49 deletions aai_cli/commands/transcripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from aai_cli import command_registry, help_panels, options
from aai_cli.app.context import AppState, run_command
from aai_cli.app.transcribe.run import render_transform_steps
from aai_cli.app.transform import emit_reduce, emit_transform
from aai_cli.core import choices, client, llm, stdio, timeparse
from aai_cli.core.errors import APIError, UsageError
from aai_cli.ui import output, theme
Expand Down Expand Up @@ -119,22 +119,6 @@ def _text_of(transcript: object) -> str:
return str(getattr(transcript, "text", "") or "")


def _emit_transform(
transcript: object,
model: str,
steps: list[dict[str, str]],
*,
json_mode: bool,
batch: bool,
) -> None:
"""Emit a transcript's ``--llm`` chain result: NDJSON per id in batch, else like `transcribe`."""
record = client.transcript_summary(transcript) | {"transform": {"model": model, "steps": steps}}
if json_mode and batch:
output.emit_ndjson({"type": "transcript", **record})
else:
output.emit(record, render_transform_steps, json_mode=json_mode)


def _deliver_transcript(
transcript: object,
api_key: str,
Expand Down Expand Up @@ -165,43 +149,13 @@ def _deliver_transcript(
api_key, chain, transcript_id=_id_of(transcript), model=model, max_tokens=max_tokens
)
if not suppress:
_emit_transform(transcript, model, steps, json_mode=json_mode, batch=batch)
emit_transform(transcript, model=model, steps=steps, json_mode=json_mode, batch=batch)
return steps[-1]["output"] if steps else ""
if not suppress:
_emit_transcript(transcript, json_mode=json_mode, batch=batch)
return _text_of(transcript)


def _run_reduce(
api_key: str,
contributions: list[tuple[str, str]],
*,
prompts: list[str],
model: str,
max_tokens: int,
json_mode: bool,
) -> None:
"""Run the ``--llm-reduce`` chain once over every fetched transcript; print to stdout.

Mirrors `transcribe`'s reduce: concatenate each id's contribution under a header,
skip the billable call when there's nothing to reduce, and emit the same additive
``{"type": "reduce", …}`` NDJSON record under --json.
"""
combined = "\n\n".join(f"### Transcript: {tid}\n{text}" for tid, text in contributions if text)
if not combined:
output.emit_warning(
"Nothing to reduce: no transcript text across ids.", json_mode=json_mode
)
return
result = llm.run_chain(
api_key, prompts, transcript_text=combined, model=model, max_tokens=max_tokens
)
if json_mode:
output.emit_ndjson({"type": "reduce", "model": model, "prompts": prompts, "output": result})
else:
output.emit_text(result)


@app.command(
epilog=examples_epilog(
[
Expand Down Expand Up @@ -312,12 +266,14 @@ def body(state: AppState, json_mode: bool) -> None:
)
contributions.append((tid, contribution))
if do_reduce:
_run_reduce(
emit_reduce(
api_key,
contributions,
prompts=reduce_prompts,
model=model,
max_tokens=max_tokens,
block_label="Transcript",
empty_noun="ids",
json_mode=json_mode,
)

Expand Down
8 changes: 8 additions & 0 deletions aai_cli/core/choices.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ class ConfigKey(enum.StrEnum):
telemetry_enabled = "telemetry_enabled"


class UsageWindow(enum.StrEnum):
"""Window sizes the AMS usage endpoint aggregates by (`account usage --window`)."""

day = "day"
week = "week"
month = "month"


class ColorMode(enum.StrEnum):
"""The conventional tri-state for ANSI color (`--color`), matching git/gh/cargo."""

Expand Down
Loading
Loading