Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 2 additions & 101 deletions aai_cli/commands/evaluate/_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
from enum import StrEnum

import assemblyai as aai
from rich.console import RenderableType
from rich.markup import escape

from aai_cli.app.context import AppState
from aai_cli.commands.evaluate import _data as eval_data
from aai_cli.commands.evaluate import _render
from aai_cli.core import client, jsonshape, wer
from aai_cli.core import llm as gateway
from aai_cli.core.errors import CLIError, NotAuthenticated
Expand Down Expand Up @@ -79,15 +78,6 @@ class _LlmOptions:
max_tokens: int


def _pct(value: object) -> str:
return f"{jsonshape.as_float(value):.2%}"


def _secs(value: object) -> str:
"""A latency in seconds, formatted for display."""
return f"{jsonshape.as_float(value):.2f}s"


def _percentile(values: list[float], q: float) -> float:
"""The q-quantile (q in [0, 1]) of ``values``, linearly interpolated between
the two closest ranks (numpy's default method). ``values`` must be non-empty."""
Expand Down Expand Up @@ -334,95 +324,6 @@ def _payload(
return payload


def _summary(payload: dict[str, object]) -> str:
parts: list[str] = []
if "wer" in payload:
errors = jsonshape.as_int(payload.get("errors"))
noun = "error" if errors == 1 else "errors"
parts.append(
f"WER {_pct(payload.get('wer'))} ({errors} {noun} / {payload.get('words')} words)"
)
if "latency_p50" in payload:
parts.append(
f"latency p50 {_secs(payload.get('latency_p50'))}"
f" · p90 {_secs(payload.get('latency_p90'))}"
)
return output.heading(" ".join(parts))


def _cell(row: dict[str, object], key: str) -> str:
"""The row's value as table text — blank when absent (e.g. a failed row's scores)."""
return str(row[key]) if key in row else ""


def _pct_cell(row: dict[str, object], key: str) -> str:
return _pct(row[key]) if key in row else ""


def _secs_cell(row: dict[str, object], key: str) -> str:
return _secs(row[key]) if key in row else ""


def _final_llm_output(row: dict[str, object]) -> str | None:
"""A row's last ``--llm`` step output, or ``None`` when no chain ran on it."""
llm_data = jsonshape.as_mapping(row.get("llm"))
if llm_data is None:
return None
steps = jsonshape.mapping_list(llm_data.get("steps"))
return str(steps[-1].get("output", "") or "") if steps else ""


def _llm_block(payload: dict[str, object]) -> str | None:
"""The per-item ``--llm`` outputs as a heading + one ``item: output`` line each,
or ``None`` when no ``--llm`` chain ran."""
lines: list[str] = []
for row in jsonshape.mapping_list(payload.get("rows")):
final = _final_llm_output(row)
if final is not None:
lines.append(f"{escape(str(row.get('item')))}: {escape(final)}")
if not lines:
return None
return "\n".join([output.heading("--llm"), *lines])


def _reduce_block(payload: dict[str, object]) -> str | None:
"""The ``--llm-reduce`` aggregate as a heading + the output, or ``None`` when unset."""
reduce = jsonshape.as_mapping(payload.get("reduce"))
if reduce is None:
return None
return f"{output.heading('--llm-reduce')}\n{escape(str(reduce.get('output', '')))}"


def _render(payload: dict[str, object]) -> RenderableType:
has_wer = "wer" in payload
has_failed = "failed" in payload
has_latency = "latency_p50" in payload
columns = [
"ITEM",
*(["WORDS", "ERRORS", "WER"] if has_wer else []),
*(["LATENCY"] if has_latency else []),
*(["ERROR"] if has_failed else []),
]
table = output.data_table(*columns)
for row in jsonshape.mapping_list(payload.get("rows")):
cells = [str(row.get("item"))]
if has_wer:
cells += [_cell(row, "words"), _cell(row, "errors"), _pct_cell(row, "wer")]
if has_latency:
cells.append(_secs_cell(row, "latency"))
if has_failed:
cells.append(_cell(row, "error"))
table.add_row(*cells)
model = payload.get("speech_model") or "default model"
return output.stack(
output.muted(f"{payload.get('dataset')} · {model}"),
table,
_summary(payload),
_llm_block(payload),
_reduce_block(payload),
)


def _evaluate_one(
dataset: str, api_key: str, opts: EvalOptions, state: AppState, *, json_mode: bool
) -> dict[str, object]:
Expand Down Expand Up @@ -477,7 +378,7 @@ def run_evaluate(opts: EvalOptions, state: AppState, *, json_mode: bool) -> None
total = 0
for dataset in opts.datasets:
payload = _evaluate_one(dataset, api_key, opts, state, json_mode=json_mode)
output.emit(payload, _render, json_mode=json_mode)
output.emit(payload, _render.render, json_mode=json_mode)
failed += jsonshape.as_int(payload.get("failed"))
total += jsonshape.as_int(payload.get("items"))
if failed:
Expand Down
113 changes: 113 additions & 0 deletions aai_cli/commands/evaluate/_render.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Human-mode rendering for `assembly eval`: turn an emitted payload into a table.

Split out of ``_exec`` so the scoring/orchestration path and the Rich rendering
stay in separate files. Every function here reads only the already-emitted payload
dict (plus its rows), so it shares no state with the run path — the `--json`
output is the payload verbatim, and this module is what `-o`/human mode renders.
"""

from __future__ import annotations

from rich.console import RenderableType
from rich.markup import escape

from aai_cli.core import jsonshape
from aai_cli.ui import output


def _pct(value: object) -> str:
return f"{jsonshape.as_float(value):.2%}"


def _secs(value: object) -> str:
"""A latency in seconds, formatted for display."""
return f"{jsonshape.as_float(value):.2f}s"


def _summary(payload: dict[str, object]) -> str:
parts: list[str] = []
if "wer" in payload:
errors = jsonshape.as_int(payload.get("errors"))
noun = "error" if errors == 1 else "errors"
parts.append(
f"WER {_pct(payload.get('wer'))} ({errors} {noun} / {payload.get('words')} words)"
)
if "latency_p50" in payload:
parts.append(
f"latency p50 {_secs(payload.get('latency_p50'))}"
f" · p90 {_secs(payload.get('latency_p90'))}"
)
return output.heading(" ".join(parts))


def _cell(row: dict[str, object], key: str) -> str:
"""The row's value as table text — blank when absent (e.g. a failed row's scores)."""
return str(row[key]) if key in row else ""


def _pct_cell(row: dict[str, object], key: str) -> str:
return _pct(row[key]) if key in row else ""


def _secs_cell(row: dict[str, object], key: str) -> str:
return _secs(row[key]) if key in row else ""


def _final_llm_output(row: dict[str, object]) -> str | None:
"""A row's last ``--llm`` step output, or ``None`` when no chain ran on it."""
llm_data = jsonshape.as_mapping(row.get("llm"))
if llm_data is None:
return None
steps = jsonshape.mapping_list(llm_data.get("steps"))
return str(steps[-1].get("output", "") or "") if steps else ""


def _llm_block(payload: dict[str, object]) -> str | None:
"""The per-item ``--llm`` outputs as a heading + one ``item: output`` line each,
or ``None`` when no ``--llm`` chain ran."""
lines: list[str] = []
for row in jsonshape.mapping_list(payload.get("rows")):
final = _final_llm_output(row)
if final is not None:
lines.append(f"{escape(str(row.get('item')))}: {escape(final)}")
if not lines:
return None
return "\n".join([output.heading("--llm"), *lines])


def _reduce_block(payload: dict[str, object]) -> str | None:
"""The ``--llm-reduce`` aggregate as a heading + the output, or ``None`` when unset."""
reduce = jsonshape.as_mapping(payload.get("reduce"))
if reduce is None:
return None
return f"{output.heading('--llm-reduce')}\n{escape(str(reduce.get('output', '')))}"


def render(payload: dict[str, object]) -> RenderableType:
has_wer = "wer" in payload
has_failed = "failed" in payload
has_latency = "latency_p50" in payload
columns = [
"ITEM",
*(["WORDS", "ERRORS", "WER"] if has_wer else []),
*(["LATENCY"] if has_latency else []),
*(["ERROR"] if has_failed else []),
]
table = output.data_table(*columns)
for row in jsonshape.mapping_list(payload.get("rows")):
cells = [str(row.get("item"))]
if has_wer:
cells += [_cell(row, "words"), _cell(row, "errors"), _pct_cell(row, "wer")]
if has_latency:
cells.append(_secs_cell(row, "latency"))
if has_failed:
cells.append(_cell(row, "error"))
table.add_row(*cells)
model = payload.get("speech_model") or "default model"
return output.stack(
output.muted(f"{payload.get('dataset')} · {model}"),
table,
_summary(payload),
_llm_block(payload),
_reduce_block(payload),
)
118 changes: 3 additions & 115 deletions aai_cli/commands/stream/_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@
import tempfile
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path

from assemblyai import PIISubstitutionPolicy
from assemblyai.streaming.v3 import Encoding, NoiseSuppressionModel, SpeechModel

from aai_cli import code_gen
from aai_cli.app.context import AppState
from aai_cli.commands.stream import _save
from aai_cli.core import choices, client, config_builder, signals, stdio, youtube
from aai_cli.core.errors import UsageError, mutually_exclusive
from aai_cli.core.microphone import MicrophoneSource
from aai_cli.streaming import naming, record, savedir, transcript, turn_presets
from aai_cli.streaming import turn_presets
from aai_cli.streaming.batch import stream_batch_sources
from aai_cli.streaming.macos import MacSystemAudioSource
from aai_cli.streaming.render import StreamRenderer
Expand Down Expand Up @@ -196,118 +196,6 @@ def _reject_save_with_show_code(opts: StreamOptions) -> None:
)


@dataclass(frozen=True)
class SaveTargets:
"""Resolved save destinations for one streaming run.

``audio`` tees a single source to one WAV; ``audio_by_label`` instead maps each
parallel ``--system-audio`` channel ("you", "system") to its own WAV when the two
streams can't share a file. At most one of the two is set; ``transcript`` is the
single shared transcript either way. ``plan`` is set only under ``--save-dir`` and
carries the post-stream finalization (auto-name rename, ``--llm`` note, sidecar).
"""

transcript: Path | None = None
audio: Path | None = None
audio_by_label: dict[str, Path] | None = None
plan: savedir.SaveDirPlan | None = None


def _save_dir_targets(opts: StreamOptions, sources: SourceOptions, save_dir: Path) -> SaveTargets:
"""Resolve ``--save-dir`` into auto-named targets plus the finalization plan.

``--save-dir`` owns filename assembly, so it rejects the explicit
``--save-audio``/``--save-transcript`` paths and the conflicting ``--name``/
``--auto-name`` title pair. Two parallel ``--system-audio`` streams can't tee to one
WAV, so each channel gets its own ``<stem>-{you,system}.wav`` (one shared transcript);
``--no-save-audio`` drops the WAV(s) entirely.
"""
mutually_exclusive(
("--save-dir", True),
("--save-audio", opts.save_audio is not None),
("--save-transcript", opts.save_transcript is not None),
suggestion="--save-dir names the files for you; drop the explicit path.",
)
mutually_exclusive(
("--name", opts.name is not None),
("--auto-name", opts.auto_name),
suggestion="Both set the title — pass --name for an explicit one or "
"--auto-name to derive it from the transcript.",
)
# Local wall-clock time (what a meeting filename wants); the explicit utc-then-
# astimezone keeps the now() call timezone-aware for the linter.
now = datetime.now(UTC).astimezone()
plan = savedir.SaveDirPlan(
save_dir=save_dir,
now=now,
name=opts.name,
auto_name=opts.auto_name,
write_note=bool(opts.llm_prompt),
)
paths = plan.paths
naming.ensure_dir(paths.directory)
if opts.no_save_audio:
# Transcript + sidecar (+ note) only; no WAV teed for any source.
return SaveTargets(transcript=paths.transcript, plan=plan)
if sources.system_audio:
# Parallel mic + system: one WAV per channel beside the shared transcript.
return SaveTargets(
transcript=paths.transcript,
audio_by_label={
"you": naming.channel_audio(paths.audio, "you"),
"system": naming.channel_audio(paths.audio, "system"),
},
plan=plan,
)
if sources.system_audio_only:
# A lone system-audio stream; label its single WAV so it reads like the pair.
return SaveTargets(
transcript=paths.transcript,
audio=naming.channel_audio(paths.audio, "system"),
plan=plan,
)
return SaveTargets(transcript=paths.transcript, audio=paths.audio, plan=plan)


def _resolve_save_targets(opts: StreamOptions, sources: SourceOptions) -> SaveTargets:
"""Resolve the save flags into the destinations the session writes.

``--save-dir`` owns filename assembly (see ``_save_dir_targets``); the explicit
``--save-audio``/``--save-transcript`` paths are the fallback, with the save-dir-only
``--name``/``--auto-name``/``--no-save-audio`` flags rejected outside it.
"""
if opts.save_dir is not None:
return _save_dir_targets(opts, sources, opts.save_dir)
if opts.name is not None:
raise UsageError(
"--name applies only with --save-dir.",
suggestion="Pass --save-dir DIR to auto-name the files, "
"or --save-transcript PATH for an explicit path.",
)
if opts.auto_name:
raise UsageError(
"--auto-name applies only with --save-dir.",
suggestion="Pass --save-dir DIR so there's an auto-named file to title.",
)
if opts.no_save_audio:
raise UsageError(
"--no-save-audio applies only with --save-dir.",
suggestion="Omit --save-audio to skip the WAV, or pass --save-dir DIR.",
)
if opts.save_audio is not None:
if sources.system_audio:
raise UsageError(
"--save-audio cannot be combined with --system-audio; the mic and system "
"streams can't share one file.",
suggestion="Pass --save-dir DIR to save one WAV per channel, "
"or record a single source.",
)
record.validate_target(opts.save_audio)
if opts.save_transcript is not None:
transcript.validate_target(opts.save_transcript)
return SaveTargets(transcript=opts.save_transcript, audio=opts.save_audio)


def _dispatch(session: StreamSession, opts: SourceOptions) -> None:
"""Open the right audio source(s) for the flags and stream them."""
if opts.from_system_audio:
Expand Down Expand Up @@ -455,7 +343,7 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None
# Validate the requested sources (including that a local file exists) before
# credentials, so a typo'd path reads as "file not found" — not as a login.
validate_sources(sources, has_llm=bool(opts.llm_prompt), text_mode=text_mode)
targets = _resolve_save_targets(opts, sources)
targets = _save.resolve_save_targets(opts, sources)
if sources.from_file and not sources.from_stdin:
client.resolve_audio_source(sources.source, sample=sources.sample)
api_key = state.resolve_api_key()
Expand Down
Loading
Loading