diff --git a/REFERENCE.md b/REFERENCE.md index 84ff8365..e9226be4 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -116,3 +116,26 @@ object per dataset (not NDJSON; a single dataset is therefore one object): the row's `llm` key (the WER score still uses the raw transcript), and `--llm-reduce` runs one prompt over every item's result and adds a top-level `reduce` (`{"model","prompts","output"}`) to the object. + +## Recording streams to disk + +`assembly stream --save-dir DIR` auto-names a capture under `DIR/YYYY-MM-DD/` +with a timestamped stem (`YYYY-MM-DD-HHMMSS[-slug]`) shared across every file it +writes: + +- `.txt` — the transcript, one finalized turn per line (flushed live). +- `.wav` — the recorded audio, 16-bit mono PCM. Suppress it with + `--no-save-audio` to keep only the text. Under `--system-audio` the two channels + can't share a file, so each gets its own `-you.wav` / `-system.wav`. +- `.md` — written when `--llm "…"` is also passed: the final answer of the + live prompt chain, captured as a note next to the transcript. +- `.aai.json` — a metadata sidecar so a list/browse UI needs no transcript + parsing: `{"title", "date", "duration_seconds", "speakers", "turns", + "transcript", "audio", "note"}`. `audio` is the list of WAV file names (empty + under `--no-save-audio`, two entries under `--system-audio`); `note` is `null` + when no `--llm` note was written. + +`--name "Title"` slugs an explicit title into the stem; `--auto-name` instead +derives that title from the transcript via the LLM Gateway once the stream ends, +renaming the files to match (the timestamp stem is kept if the title is empty). +The two are mutually exclusive. diff --git a/aai_cli/commands/agent/_exec.py b/aai_cli/commands/agent/_exec.py index 9434e918..814154c8 100644 --- a/aai_cli/commands/agent/_exec.py +++ b/aai_cli/commands/agent/_exec.py @@ -24,8 +24,8 @@ from aai_cli.app.context import AppState from aai_cli.core import choices, client, errors, signals from aai_cli.core.errors import UsageError -from aai_cli.streaming.session import resolve_output_modes from aai_cli.streaming.sources import FileSource +from aai_cli.streaming.validate import resolve_output_modes from aai_cli.ui import output diff --git a/aai_cli/commands/agent_cascade/_exec.py b/aai_cli/commands/agent_cascade/_exec.py index b2340511..022affd3 100644 --- a/aai_cli/commands/agent_cascade/_exec.py +++ b/aai_cli/commands/agent_cascade/_exec.py @@ -25,8 +25,8 @@ from aai_cli.core import choices, client, config_builder, errors, llm, signals from aai_cli.core.errors import UsageError from aai_cli.streaming import turn_presets -from aai_cli.streaming.session import resolve_output_modes from aai_cli.streaming.sources import FileSource +from aai_cli.streaming.validate import resolve_output_modes from aai_cli.tts import session as tts_session from aai_cli.ui import output diff --git a/aai_cli/commands/dictate/_exec.py b/aai_cli/commands/dictate/_exec.py index 73c6307a..bcdb0f67 100644 --- a/aai_cli/commands/dictate/_exec.py +++ b/aai_cli/commands/dictate/_exec.py @@ -19,7 +19,7 @@ from aai_cli.core.config_builder import split_csv from aai_cli.core.hotkey import CTRL_C, CTRL_D, ESC, TerminalKeys from aai_cli.core.microphone import MicrophoneSource -from aai_cli.streaming.session import resolve_output_modes +from aai_cli.streaming.validate import resolve_output_modes from aai_cli.ui import output # Capture is resampled to one rate the Sync API accepts; 16 kHz mono PCM16 keeps diff --git a/aai_cli/commands/stream/__init__.py b/aai_cli/commands/stream/__init__.py index fcfeb5d4..64b4b38b 100644 --- a/aai_cli/commands/stream/__init__.py +++ b/aai_cli/commands/stream/__init__.py @@ -39,6 +39,10 @@ "Auto-name the transcript + WAV under a dir", 'assembly stream --save-dir ~/recordings --name "Standup"', ), + ( + "Name from content + save a summary note", + 'assembly stream --save-dir ~/recordings --auto-name --llm "summarize as a note"', + ), ( "Boost domain terms with keyterm prompts", 'assembly stream --keyterms-prompt "AssemblyAI" --keyterms-prompt "Claude"', @@ -121,6 +125,18 @@ def stream( help="Title to slug into the --save-dir filename (e.g. a meeting title)", rich_help_panel=help_panels.OPT_SAVING, ), + auto_name: bool = typer.Option( + False, + "--auto-name", + help="With --save-dir, derive the filename from the transcript via the LLM", + rich_help_panel=help_panels.OPT_SAVING, + ), + no_save_audio: bool = typer.Option( + False, + "--no-save-audio", + help="With --save-dir, skip the WAV and save only the transcript", + rich_help_panel=help_panels.OPT_SAVING, + ), # model & input speech_model: SpeechModel = typer.Option( DEFAULT_SPEECH_MODEL, @@ -398,5 +414,7 @@ def stream( save_transcript=save_transcript, save_dir=save_dir, name=name, + auto_name=auto_name, + no_save_audio=no_save_audio, ) run_with_options(ctx, stream_exec.run_stream, opts, json=json_out) diff --git a/aai_cli/commands/stream/_exec.py b/aai_cli/commands/stream/_exec.py index 0f2d10b5..9327574b 100644 --- a/aai_cli/commands/stream/_exec.py +++ b/aai_cli/commands/stream/_exec.py @@ -23,18 +23,14 @@ from aai_cli.core import choices, client, config_builder, signals, stdio, youtube from aai_cli.core.errors import UsageError, mutually_exclusive from aai_cli.core.microphone import MicrophoneSource -from aai_cli.streaming import naming, record, transcript, turn_presets +from aai_cli.streaming import naming, record, savedir, transcript, turn_presets +from aai_cli.streaming.batch import stream_batch_sources from aai_cli.streaming.macos import MacSystemAudioSource from aai_cli.streaming.render import StreamRenderer -from aai_cli.streaming.session import ( - SourceOptions, - StreamSession, - resolve_output_modes, - stream_batch_sources, - validate_sources, -) +from aai_cli.streaming.session import StreamSession from aai_cli.streaming.sources import TARGET_RATE, FileSource, StdinSource from aai_cli.streaming.turn_presets import TurnDetectionPreset +from aai_cli.streaming.validate import SourceOptions, resolve_output_modes, validate_sources from aai_cli.ui import output from aai_cli.ui.follow import FollowRenderer @@ -90,6 +86,8 @@ class StreamOptions: save_transcript: Path | None save_dir: Path | None name: str | None + auto_name: bool + no_save_audio: bool def source_options(self) -> SourceOptions: """The audio-input subset, in the shape the validation/dispatch helpers read.""" @@ -205,57 +203,97 @@ class SaveTargets: ``audio`` tees a single source to one WAV; ``audio_by_label`` instead maps each parallel ``--system-audio`` channel ("you", "system") to its own WAV when the two streams can't share a file. At most one of the two is set; ``transcript`` is the - single shared transcript either way. + single shared transcript either way. ``plan`` is set only under ``--save-dir`` and + carries the post-stream finalization (auto-name rename, ``--llm`` note, sidecar). """ transcript: Path | None = None audio: Path | None = None audio_by_label: dict[str, Path] | None = None + plan: savedir.SaveDirPlan | None = None + + +def _save_dir_targets(opts: StreamOptions, sources: SourceOptions, save_dir: Path) -> SaveTargets: + """Resolve ``--save-dir`` into auto-named targets plus the finalization plan. + + ``--save-dir`` owns filename assembly, so it rejects the explicit + ``--save-audio``/``--save-transcript`` paths and the conflicting ``--name``/ + ``--auto-name`` title pair. Two parallel ``--system-audio`` streams can't tee to one + WAV, so each channel gets its own ``-{you,system}.wav`` (one shared transcript); + ``--no-save-audio`` drops the WAV(s) entirely. + """ + mutually_exclusive( + ("--save-dir", True), + ("--save-audio", opts.save_audio is not None), + ("--save-transcript", opts.save_transcript is not None), + suggestion="--save-dir names the files for you; drop the explicit path.", + ) + mutually_exclusive( + ("--name", opts.name is not None), + ("--auto-name", opts.auto_name), + suggestion="Both set the title — pass --name for an explicit one or " + "--auto-name to derive it from the transcript.", + ) + # Local wall-clock time (what a meeting filename wants); the explicit utc-then- + # astimezone keeps the now() call timezone-aware for the linter. + now = datetime.now(UTC).astimezone() + plan = savedir.SaveDirPlan( + save_dir=save_dir, + now=now, + name=opts.name, + auto_name=opts.auto_name, + write_note=bool(opts.llm_prompt), + ) + paths = plan.paths + naming.ensure_dir(paths.directory) + if opts.no_save_audio: + # Transcript + sidecar (+ note) only; no WAV teed for any source. + return SaveTargets(transcript=paths.transcript, plan=plan) + if sources.system_audio: + # Parallel mic + system: one WAV per channel beside the shared transcript. + return SaveTargets( + transcript=paths.transcript, + audio_by_label={ + "you": naming.channel_audio(paths.audio, "you"), + "system": naming.channel_audio(paths.audio, "system"), + }, + plan=plan, + ) + if sources.system_audio_only: + # A lone system-audio stream; label its single WAV so it reads like the pair. + return SaveTargets( + transcript=paths.transcript, + audio=naming.channel_audio(paths.audio, "system"), + plan=plan, + ) + return SaveTargets(transcript=paths.transcript, audio=paths.audio, plan=plan) def _resolve_save_targets(opts: StreamOptions, sources: SourceOptions) -> SaveTargets: """Resolve the save flags into the destinations the session writes. - ``--save-dir`` owns filename assembly — it auto-names the transcript and a matching - WAV under ``DIR/YYYY-MM-DD/`` — so it can't be combined with the explicit - ``--save-audio``/``--save-transcript`` paths, and ``--name`` only feeds that assembly. - Two parallel ``--system-audio`` streams can't tee to one WAV, so under ``--save-dir`` - each channel gets its own ``-{you,system}.wav`` (one shared transcript), and the - explicit single-path ``--save-audio`` is rejected outright. + ``--save-dir`` owns filename assembly (see ``_save_dir_targets``); the explicit + ``--save-audio``/``--save-transcript`` paths are the fallback, with the save-dir-only + ``--name``/``--auto-name``/``--no-save-audio`` flags rejected outside it. """ if opts.save_dir is not None: - mutually_exclusive( - ("--save-dir", True), - ("--save-audio", opts.save_audio is not None), - ("--save-transcript", opts.save_transcript is not None), - suggestion="--save-dir names the files for you; drop the explicit path.", - ) - # Local wall-clock time (what a meeting filename wants); the explicit utc-then- - # astimezone keeps the now() call timezone-aware for the linter. - now = datetime.now(UTC).astimezone() - paths = naming.resolve(opts.save_dir, opts.name, now=now) - naming.ensure_dir(paths.transcript.parent) - if sources.system_audio: - # Parallel mic + system: one WAV per channel beside the shared transcript. - return SaveTargets( - transcript=paths.transcript, - audio_by_label={ - "you": naming.channel_audio(paths.audio, "you"), - "system": naming.channel_audio(paths.audio, "system"), - }, - ) - if sources.system_audio_only: - # A lone system-audio stream; label its single WAV so it reads like the pair. - return SaveTargets( - transcript=paths.transcript, audio=naming.channel_audio(paths.audio, "system") - ) - return SaveTargets(transcript=paths.transcript, audio=paths.audio) + return _save_dir_targets(opts, sources, opts.save_dir) if opts.name is not None: raise UsageError( "--name applies only with --save-dir.", suggestion="Pass --save-dir DIR to auto-name the files, " "or --save-transcript PATH for an explicit path.", ) + if opts.auto_name: + raise UsageError( + "--auto-name applies only with --save-dir.", + suggestion="Pass --save-dir DIR so there's an auto-named file to title.", + ) + if opts.no_save_audio: + raise UsageError( + "--no-save-audio applies only with --save-dir.", + suggestion="Omit --save-audio to skip the WAV, or pass --save-dir DIR.", + ) if opts.save_audio is not None: if sources.system_audio: raise UsageError( @@ -343,6 +381,8 @@ def _collect_batch_sources(opts: StreamOptions, *, text_mode: bool) -> list[str] ("--save-transcript", opts.save_transcript is not None), ("--save-dir", opts.save_dir is not None), ("--name", opts.name is not None), + ("--auto-name", opts.auto_name), + ("--no-save-audio", opts.no_save_audio), suggestion="--from-stdin streams many sources; saving applies to a single run.", ) mutually_exclusive( @@ -434,6 +474,7 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None save_audio=targets.audio, save_audio_by_label=targets.audio_by_label, save_transcript=targets.transcript, + save_plan=targets.plan, llm_interval=opts.llm_interval, ) with signals.terminate_as_interrupt(): diff --git a/aai_cli/streaming/batch.py b/aai_cli/streaming/batch.py new file mode 100644 index 00000000..851984e5 --- /dev/null +++ b/aai_cli/streaming/batch.py @@ -0,0 +1,107 @@ +"""Drive a ``assembly stream --from-stdin`` list of sources, one realtime session each. + +The realtime API is one session at a time, so a list of files/URLs (read on stdin, +one per line) streams sequentially. This lives beside ``StreamSession`` rather than +inside it: a session owns *one* run, while this owns the sequence — fresh session per +source, per-source failure accounting, and the batch-wide Ctrl-C/pipe handling. +""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable + +import typer + +from aai_cli.core.errors import CANCELLED_EXIT_CODE, CLIError, NotAuthenticated +from aai_cli.streaming.render import StreamRenderer +from aai_cli.streaming.session import StreamSession +from aai_cli.ui import output + +# A batch source string resolved to its real-time audio chunks and declared rate. +_OpenedSource = tuple[Iterable[bytes], int] + + +def _stream_source( + source: str, + *, + index: int, + total: int, + make_session: Callable[[], StreamSession], + open_source: Callable[[str], _OpenedSource], + renderer: StreamRenderer, + json_mode: bool, +) -> bool: + """Stream one batch source in its own session; return True when it failed. + + A ``CLIError`` (bad path, missing ffmpeg, decode failure) is recorded as a warning + so the batch carries on — except ``NotAuthenticated``, which re-raises to abort the + whole batch (one rejected key fails every source identically, and auto-login should + trigger once). + """ + renderer.source(source, index=index, total=total) + try: + audio, rate = open_source(source) + # handle_interrupt=False: let a Ctrl-C/pipe close bubble to the batch loop below so + # one interrupt stops the whole sequence. Flipping it to True is behavior-equivalent + # here (the session would convert the same interrupt to the same Exit(130)/Exit(0)), + # so no test can distinguish it. + make_session().run(audio, rate, handle_interrupt=False) # pragma: no mutate + except NotAuthenticated: + raise + except CLIError as exc: + # Flatten newlines so a crafted path/URL can't inject extra log lines (CR/LF). + detail = f"{source}: {exc.message}".replace("\n", " ").replace("\r", " ") + output.emit_warning(detail, json_mode=json_mode) + return True + else: + return False + + +def stream_batch_sources( + sources: list[str], + *, + make_session: Callable[[], StreamSession], + open_source: Callable[[str], _OpenedSource], + renderer: StreamRenderer, + json_mode: bool, +) -> None: + """Stream each source in ``sources`` in turn — the ``assembly stream --from-stdin`` + batch mode. + + The realtime API is one session at a time, so a list of files/URLs streams + sequentially: each source gets a fresh ``StreamSession`` from ``make_session`` (its + own transcript and ``--llm`` chain state) via ``_stream_source``. + + A Ctrl-C stops the batch with the cancel code (exit 130); a closed downstream pipe + stops it quietly (exit 0). When any source failed, raises a ``CLIError`` at the end + so a script can trust the exit code. + """ + total = len(sources) + failures = 0 + try: + for index, source in enumerate(sources, start=1): + failures += _stream_source( + source, + index=index, + total=total, + make_session=make_session, + open_source=open_source, + renderer=renderer, + json_mode=json_mode, + ) + except KeyboardInterrupt: + # One Ctrl-C stops the whole batch, not just the current source. Exit 130 + # (cancel) so the interrupt isn't mistaken for a clean run of every source. + renderer.stopped() + raise typer.Exit(code=CANCELLED_EXIT_CODE) from None + except BrokenPipeError: + # Downstream consumer (e.g. `| head`) closed the pipe; stop quietly. + raise typer.Exit(code=0) from None + finally: + renderer.close() + if failures: + raise CLIError( + f"{failures} of {total} sources failed.", + error_type="batch_failed", + suggestion="Check each failed path or URL, then re-run.", + ) diff --git a/aai_cli/streaming/naming.py b/aai_cli/streaming/naming.py index c1d2fc13..29bb0b6a 100644 --- a/aai_cli/streaming/naming.py +++ b/aai_cli/streaming/naming.py @@ -41,24 +41,49 @@ def _stem(now: datetime, name: str | None) -> str: return f"{stamp}-{slug}" if slug else stamp +# The sidecar's extension, mirroring batch transcribe's ``.aai.json`` so a +# browse/list UI can recognize a stream recording's metadata file the same way. +SIDECAR_SUFFIX = ".aai.json" + + @dataclass(frozen=True) class SavePaths: - """The auto-assembled transcript path and its matching audio path (same stem).""" + """The auto-assembled output paths for one recording, all sharing a stem. + + The transcript ``.txt`` and matching audio ``.wav`` plus the optional ``--llm`` + note ``.md`` and the ``.aai.json`` metadata sidecar — each is derived from the + same ``directory``/``stem`` so they land together and stay matchable by name. + """ + + directory: Path + stem: str + + @property + def transcript(self) -> Path: + return self.directory / f"{self.stem}.txt" + + @property + def audio(self) -> Path: + return self.directory / f"{self.stem}.wav" + + @property + def note(self) -> Path: + return self.directory / f"{self.stem}.md" - transcript: Path - audio: Path + @property + def sidecar(self) -> Path: + return self.directory / f"{self.stem}{SIDECAR_SUFFIX}" def resolve(save_dir: Path, name: str | None, *, now: datetime) -> SavePaths: - """Build ``DIR/YYYY-MM-DD/.{txt,wav}`` for ``now`` and ``--name``. + """Build the ``DIR/YYYY-MM-DD/`` paths for ``now`` and ``--name``. The date bucket and the stem both carry the date so a transcript stays self-describing if it is later moved out of its bucket. Path assembly only — creating the directory is the caller's job (see ``ensure_dir``). """ bucket = save_dir / now.strftime("%Y-%m-%d") - stem = _stem(now, name) - return SavePaths(transcript=bucket / f"{stem}.txt", audio=bucket / f"{stem}.wav") + return SavePaths(directory=bucket, stem=_stem(now, name)) def channel_audio(audio: Path, channel: str) -> Path: diff --git a/aai_cli/streaming/savedir.py b/aai_cli/streaming/savedir.py new file mode 100644 index 00000000..a3886ff9 --- /dev/null +++ b/aai_cli/streaming/savedir.py @@ -0,0 +1,184 @@ +"""The `assembly stream --save-dir` capture lifecycle: auto-name, note, and sidecar. + +``--save-dir`` already auto-names a transcript + WAV under ``DIR/YYYY-MM-DD/`` +(see ``naming``). This module folds the steps a wrapper script used to bolt on +afterwards into capture time, so nothing downstream needs an index pass: + +- ``--auto-name`` derives the filename slug from the transcript itself (via the + LLM), so a recording is meaningfully named with no calendar or manual title. +- ``--llm`` alongside ``--save-dir`` writes its final answer as a ``.md`` note + next to the transcript — a summary produced as the audio is captured. +- a ``.aai.json`` sidecar (title, date, duration, speakers, turns) lands beside + every recording so a list/browse UI shows rich info without parsing transcripts. + +``write_outputs`` is pure file I/O (no network), so the rename/note/sidecar +behavior is unit-tested without a gateway; the LLM title call lives in +``derive_title`` and is injected past it. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +from aai_cli.core import llm +from aai_cli.core.errors import CLIError +from aai_cli.streaming import naming + +# Asks for a short headline only — kept terse so a small/cheap model returns a clean +# line we can slug, not a paragraph. The transcript is appended by build_messages. +TITLE_PROMPT = ( + "Write a short, descriptive title (3 to 7 words) for this transcript. " + "Reply with only the title — no quotes, no surrounding punctuation." +) + + +@dataclass(frozen=True) +class SaveDirPlan: + """The resolved ``--save-dir`` intent, the data ``write_outputs`` finalizes from. + + ``now`` is captured once up front so the live transcript file, the post-stream + rename, and the sidecar's ``date`` all agree on a single timestamp. + """ + + save_dir: Path + now: datetime + name: str | None + auto_name: bool + write_note: bool + + @property + def paths(self) -> naming.SavePaths: + """The provisional paths the live run writes to (before any ``--auto-name`` rename).""" + return naming.resolve(self.save_dir, self.name, now=self.now) + + +def derive_title(api_key: str, transcript_text: str, *, model: str, max_tokens: int) -> str: + """Ask the LLM for a short headline for ``transcript_text`` (the ``--auto-name`` title). + + Returns the raw title text; ``naming.resolve`` slugs it into the filename, so an + unusable answer (all punctuation) simply collapses to the bare timestamp stem. + """ + return llm.run_chain( + api_key, + [TITLE_PROMPT], + transcript_text=transcript_text, + model=model, + max_tokens=max_tokens, + ).strip() + + +def _rename(src: Path, dst: Path) -> None: + """Move a provisional capture file to its final auto-named path, as a clean CLIError.""" + try: + src.rename(dst) + except OSError as exc: + raise CLIError( + f"Cannot rename {src} to {dst}: {exc}", + error_type="save_dir_path", + exit_code=2, + ) from exc + + +def _restem(path: Path, old_stem: str, new_stem: str) -> Path: + """Swap a capture file's stem prefix for the auto-name rename. + + Every capture file is named ```` (``.txt``, ``.wav``, ``-you.wav``, …), + so re-stemming preserves the per-channel/extension suffix while adopting the new + auto-named stem. + """ + return path.with_name(new_stem + path.name[len(old_stem) :]) + + +def _move_restem(src: Path, old_stem: str, new_stem: str) -> Path: + """Re-stem ``src`` to the auto-named stem and move it there, returning the new path. + + A stream interrupted before any audio leaves no WAV to move, so a missing source is + skipped (its intended new name is still reported in the sidecar). + """ + dst = _restem(src, old_stem, new_stem) + if src.exists(): + _rename(src, dst) + return dst + + +def _write(path: Path, text: str) -> None: + """Write ``text`` to ``path`` (the note or sidecar), as a clean CLIError on failure.""" + try: + path.write_text(text, encoding="utf-8") + except OSError as exc: + raise CLIError( + f"Cannot write {path}: {exc}", + error_type="save_dir_path", + exit_code=2, + ) from exc + + +def _sidecar_record( + paths: naming.SavePaths, + *, + plan: SaveDirPlan, + title: str | None, + note_written: bool, + audio_names: list[str], + speakers: list[str], + duration_seconds: int, + turns: int, +) -> dict[str, object]: + """The ``.aai.json`` metadata: enough for a browse UI without parsing the transcript. + + ``audio`` is a list so the ``--system-audio`` two-WAV case (one per channel) reads the + same shape as the single-WAV case; it is empty under ``--no-save-audio``. + """ + return { + "title": title, + "date": plan.now.isoformat(), + "duration_seconds": duration_seconds, + "speakers": speakers, + "turns": turns, + "transcript": paths.transcript.name, + "audio": audio_names, + "note": paths.note.name if note_written else None, + } + + +def write_outputs( + plan: SaveDirPlan, + *, + title: str | None, + note: str | None, + audio: list[Path], + speakers: list[str], + duration_seconds: int, + turns: int, +) -> naming.SavePaths: + """Finalize a ``--save-dir`` capture: rename for ``--auto-name``, write note + sidecar. + + ``title`` is the ``--auto-name`` headline (None when not requested); when it slugs to a + non-empty stem the provisional transcript and every WAV in ``audio`` are re-stemmed to + carry it. ``note`` is the final ``--llm`` answer, written as ``.md`` when present. + The sidecar is always written. Returns the final paths. + """ + provisional = plan.paths + final_name = title if (plan.auto_name and title) else plan.name + final = naming.resolve(plan.save_dir, final_name, now=plan.now) + final_audio = list(audio) + if final.stem != provisional.stem: + _rename(provisional.transcript, final.transcript) + final_audio = [_move_restem(src, provisional.stem, final.stem) for src in audio] + if note is not None: + _write(final.note, note + "\n") + record = _sidecar_record( + final, + plan=plan, + title=final_name, + note_written=note is not None, + audio_names=[p.name for p in final_audio], + speakers=speakers, + duration_seconds=duration_seconds, + turns=turns, + ) + _write(final.sidecar, json.dumps(record, indent=2) + "\n") + return final diff --git a/aai_cli/streaming/session.py b/aai_cli/streaming/session.py index fdc7c766..7e77ccd2 100644 --- a/aai_cli/streaming/session.py +++ b/aai_cli/streaming/session.py @@ -9,15 +9,9 @@ import typer -from aai_cli.core import choices, client, config_builder, errors, llm -from aai_cli.core.errors import ( - APIError, - CLIError, - NotAuthenticated, - UsageError, - mutually_exclusive, -) -from aai_cli.streaming import record +from aai_cli.core import client, config_builder, errors, llm +from aai_cli.core.errors import APIError, CLIError +from aai_cli.streaming import record, savedir from aai_cli.streaming.render import StreamRenderer, speaker_prefix from aai_cli.streaming.transcript import TranscriptWriter from aai_cli.ui import output @@ -42,99 +36,6 @@ def _finalized_turn_line(event: object, source_label: str | None) -> str | None: return f"{prefix[0]}: {text}" if prefix is not None else text -@dataclass(frozen=True) -class SourceOptions: - """Where the audio comes from, distilled from the CLI flags. - - Centralizes the "which input?" predicates so the validation and dispatch helpers - in the command module read off one object instead of re-deriving the same booleans. - """ - - source: str | None - sample: bool - sample_rate: int | None - device: int | None - system_audio: bool - system_audio_only: bool - - @property - def from_stdin(self) -> bool: - return self.source == "-" - - @property - def from_file(self) -> bool: - return bool(self.source) or self.sample - - @property - def from_system_audio(self) -> bool: - return self.system_audio or self.system_audio_only - - @property - def has_capture_overrides(self) -> bool: - """Whether a microphone-only flag (--sample-rate or --device) was given.""" - return self.sample_rate is not None or self.device is not None - - -def validate_output_flags(*, json_mode: bool, output_field: choices.TextOrJson | None) -> None: - """Reject --json combined with -o text, shared by `stream` and `agent`. - - Same precedent as --llm + -o text: contradictory output shapes are a clean - usage error, not a silent coin-flip between plain text and NDJSON. - """ - mutually_exclusive( - ("--json", json_mode), - ("-o text", output_field is choices.TextOrJson.text), - suggestion="Pick one output format.", - ) - - -def resolve_output_modes( - output_field: choices.TextOrJson | None, *, json_mode: bool -) -> tuple[bool, bool]: - """Validate the -o/--json combination, then fold it into (text_mode, json_mode). - - The two steps always run together for the realtime commands (`stream`, `agent`), - so pairing them here keeps a caller from resolving the modes without first - rejecting a contradictory pair. - """ - validate_output_flags(json_mode=json_mode, output_field=output_field) - return output.stream_output_modes(output_field, json_mode=json_mode) - - -def validate_sources(opts: SourceOptions, *, has_llm: bool, text_mode: bool) -> None: - """Reject flag combinations that can't be honored, before any audio is opened.""" - mutually_exclusive( - ("--system-audio", opts.system_audio), - ("--system-audio-only", opts.system_audio_only), - ) - _validate_input_source(opts) - mutually_exclusive( - ("--llm", has_llm), - ("-o text", text_mode), - suggestion="--llm renders a live panel (or NDJSON when piped).", - ) - - -def _validate_input_source(opts: SourceOptions) -> None: - """Reject --sample-rate/--device/source combinations the chosen input can't accept.""" - if opts.from_system_audio: - if opts.from_file: - raise UsageError("--system-audio cannot be combined with an audio source or --sample.") - if opts.system_audio_only and opts.has_capture_overrides: - raise UsageError( - "--sample-rate and --device require microphone input; use --system-audio." - ) - elif opts.from_stdin: - if opts.sample: - # The stdin branch wins dispatch over --sample, so without this the - # hosted clip would be silently ignored in favor of the pipe. - raise UsageError("- (stdin) cannot be combined with --sample.") - if opts.device is not None: - raise UsageError("--device applies only to microphone input.") - elif opts.from_file and opts.has_capture_overrides: - raise UsageError("--sample-rate and --device apply only to microphone input.") - - @dataclass class StreamSession: """Owns one streaming run: the renderers, the LLM-chain state, and the audio @@ -164,11 +65,22 @@ class StreamSession: # When set, write each finalized turn to this path as plain text (see TranscriptWriter). # One shared transcript even under --system-audio (both channels); batch rejects it. save_transcript: Path | None = None + # When set, run the --save-dir finalization (auto-name rename, --llm note, sidecar) + # once streaming ends. Only the single-source path sets it; batch rejects --save-dir. + save_plan: savedir.SaveDirPlan | None = None # Seconds between --llm summary refreshes; <=0 re-runs the chain on every turn. llm_interval: float = 0.0 # Monotonic clock, injectable so the interval throttle is deterministic in tests. clock: Callable[[], float] = time.monotonic transcript: list[str] = field(default_factory=list[str]) + # Finalized turn lines + diarized speakers, recorded for the --save-dir sidecar and + # --auto-name title regardless of --llm; only populated when save_plan is set. + _meta_lines: list[str] = field(default_factory=list[str]) + _meta_speakers: dict[str, None] = field(default_factory=dict[str, None]) + # Wall-clock capture window (via clock) → the sidecar's duration_seconds. + _capture_start: float | None = None + # The most recent --llm answer, written as the .md note at --save-dir finalization. + _last_answer: str | None = None # The open transcript-file writer for a single run; created/closed in _guarded so a # save target is opened once per session and a Ctrl-C still leaves a flushed file. _transcript_writer: TranscriptWriter | None = None @@ -205,6 +117,7 @@ def on_turn(self, event: object, *, source_label: str | None = None) -> None: line = _finalized_turn_line(event, source_label) if line is not None: self._save_line(line) + self._note_meta(event, line) else: # --llm mode locks only to record the turn; the chain re-runs (network) are # left unlocked so the other source's turns keep flowing during a refresh. @@ -219,6 +132,19 @@ def _save_line(self, line: str) -> None: if self._transcript_writer is not None: self._transcript_writer.write_turn(line) + def _note_meta(self, event: object, line: str) -> None: + """Record a finalized turn's text + speaker for the --save-dir sidecar/auto-name. + + A no-op unless --save-dir is active, so a plain run accumulates nothing. Called + under ``_callback_lock`` (like ``_save_line``) so the lists/sets stay consistent. + """ + if self.save_plan is None: + return + self._meta_lines.append(line) + speaker = getattr(event, "speaker_label", None) + if speaker is not None: + self._meta_speakers[str(speaker)] = None + def _record_turn(self, event: object, source_label: str | None) -> None: """Append a finalized turn to the running transcript (and the saved file), then refresh the --llm answer if a refresh is due (every turn, or once per @@ -229,6 +155,7 @@ def _record_turn(self, event: object, source_label: str | None) -> None: with self._callback_lock: self.transcript.append(line) self._save_line(line) + self._note_meta(event, line) self._maybe_summarize() def _maybe_summarize(self, *, final: bool = False) -> None: @@ -277,6 +204,8 @@ def _maybe_summarize(self, *, final: bool = False) -> None: f"[aai.muted]--llm refresh failed: {exc.message}[/aai.muted]" ) return + # Hold the latest answer so --save-dir can write it as the .md note at the end. + self._last_answer = answer follow(answer, turns) def _audio_target(self, source_label: str | None) -> Path | None: @@ -292,6 +221,9 @@ def _audio_target(self, source_label: str | None) -> Path | None: def stream_one( self, audio: Iterable[bytes], rate: int, *, source_label: str | None = None ) -> None: + if self._capture_start is None: + # First source opened → start the wall-clock window for the sidecar duration. + self._capture_start = self.clock() target = self._audio_target(source_label) if target is not None: # Tee verbatim to disk at the source's true rate before it hits the wire. @@ -378,13 +310,68 @@ def run( source_label: str | None = None, handle_interrupt: bool = True, ) -> None: - self._guarded( + self._guarded_then_finalize( lambda: self.stream_one(audio, rate, source_label=source_label), handle_interrupt=handle_interrupt, ) + def _guarded_then_finalize( + self, work: Callable[[], None], *, handle_interrupt: bool = True + ) -> None: + """Run ``work`` under ``_guarded``, then finalize a ``--save-dir`` capture. + + A user Ctrl-C (or SIGTERM) ends a recording via ``typer.Exit`` with the cancel + code, which is exactly when a stopped capture still wants its auto-name/note/ + sidecar — so finalize before re-raising that exit. A stream error (``CLIError``) + or a quiet broken-pipe exit (code 0) propagates without finalizing. + """ + try: + self._guarded(work, handle_interrupt=handle_interrupt) + except typer.Exit as stop: + if self.save_plan is not None and stop.exit_code == errors.CANCELLED_EXIT_CODE: + self._finalize_save_dir(self.save_plan) + raise + if self.save_plan is not None: + self._finalize_save_dir(self.save_plan) + + def _finalize_save_dir(self, plan: savedir.SaveDirPlan) -> None: + """Auto-name, write the --llm note, and drop the sidecar for a --save-dir capture.""" + transcript_text = " ".join(self._meta_lines) + title: str | None = None + if plan.auto_name and transcript_text: + try: + title = savedir.derive_title( + self.api_key, transcript_text, model=self.model, max_tokens=self.max_tokens + ) + except CLIError as exc: + # The recording is already saved under its timestamp stem; a failed title + # call shouldn't lose it, so warn and keep the timestamped name. + output.error_console.print( + f"[aai.muted]--auto-name failed: {exc.message}[/aai.muted]" + ) + duration = 0 if self._capture_start is None else round(self.clock() - self._capture_start) + savedir.write_outputs( + plan, + title=title, + note=self._last_answer if plan.write_note else None, + audio=self._audio_files(), + speakers=list(self._meta_speakers), + duration_seconds=duration, + turns=len(self._meta_lines), + ) + + def _audio_files(self) -> list[Path]: + """The WAV file(s) this run teed to: one per ``--system-audio`` channel, the lone + ``save_audio`` otherwise, or none under ``--no-save-audio``.""" + if self.save_audio is not None: + return [self.save_audio] + if self.save_audio_by_label is not None: + return list(self.save_audio_by_label.values()) + return [] + def run_parallel(self, streams: _ParallelStreams) -> None: - self._guarded(lambda: self._drive(streams)) + # Same finalize-on-stop handling as run(), for the --save-dir --system-audio pair. + self._guarded_then_finalize(lambda: self._drive(streams)) def _drive(self, streams: _ParallelStreams) -> None: """Stream every source concurrently, surfacing the first worker error.""" @@ -419,61 +406,3 @@ def worker(source_label: str, audio: Iterable[bytes], rate: int) -> None: raise errors.get() if not errors.empty(): raise errors.get() - - -# A batch source string resolved to its real-time audio chunks and declared rate. -_OpenedSource = tuple[Iterable[bytes], int] - - -def stream_batch_sources( - sources: list[str], - *, - make_session: Callable[[], StreamSession], - open_source: Callable[[str], _OpenedSource], - renderer: StreamRenderer, - json_mode: bool, -) -> None: - """Stream each source in ``sources`` in turn — the ``assembly stream --from-stdin`` - batch mode. - - The realtime API is one session at a time, so a list of files/URLs streams - sequentially: each source gets a fresh ``StreamSession`` from ``make_session`` (its - own transcript and ``--llm`` chain state) and is announced via ``renderer.source`` - before its turns. ``open_source`` resolves a source string to ``(audio, rate)`` and - may raise ``CLIError`` (bad path, missing ffmpeg, decode failure), which is recorded - as a per-source failure so the batch carries on — except ``NotAuthenticated``, which - re-raises to abort the whole batch (one rejected key fails every source identically). - - A Ctrl-C stops the batch with the cancel code (exit 130); a closed downstream pipe - stops it quietly (exit 0). When any source failed, raises a ``CLIError`` at the end - so a script can trust the exit code. - """ - total = len(sources) - failures: list[str] = [] - try: - for index, source in enumerate(sources, start=1): - renderer.source(source, index=index, total=total) - try: - audio, rate = open_source(source) - make_session().run(audio, rate, handle_interrupt=False) - except NotAuthenticated: - raise - except CLIError as exc: - failures.append(source) - output.emit_warning(f"{source}: {exc.message}", json_mode=json_mode) - except KeyboardInterrupt: - # One Ctrl-C stops the whole batch, not just the current source. Exit 130 - # (cancel) so the interrupt isn't mistaken for a clean run of every source. - renderer.stopped() - raise typer.Exit(code=errors.CANCELLED_EXIT_CODE) from None - except BrokenPipeError: - # Downstream consumer (e.g. `| head`) closed the pipe; stop quietly. - raise typer.Exit(code=0) from None - finally: - renderer.close() - if failures: - raise CLIError( - f"{len(failures)} of {total} sources failed.", - error_type="batch_failed", - suggestion="Check each failed path or URL, then re-run.", - ) diff --git a/aai_cli/streaming/validate.py b/aai_cli/streaming/validate.py new file mode 100644 index 00000000..c1f0c77c --- /dev/null +++ b/aai_cli/streaming/validate.py @@ -0,0 +1,109 @@ +"""Input + output-mode validation for the realtime commands (`stream`, `agent`, …). + +The ``StreamSession`` execution engine (``session.py``) doesn't use any of this — only +the command layer does, ahead of opening audio or credentials. Keeping the "which input, +which output shape, is this flag combo legal?" logic here (the frozen ``SourceOptions`` +carrier plus the validate/resolve helpers) leaves ``session.py`` to the run machinery, +mirroring the ``app/transcribe`` run/validate split. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from aai_cli.core import choices +from aai_cli.core.errors import UsageError, mutually_exclusive +from aai_cli.ui import output + + +@dataclass(frozen=True) +class SourceOptions: + """Where the audio comes from, distilled from the CLI flags. + + Centralizes the "which input?" predicates so the validation and dispatch helpers + in the command module read off one object instead of re-deriving the same booleans. + """ + + source: str | None + sample: bool + sample_rate: int | None + device: int | None + system_audio: bool + system_audio_only: bool + + @property + def from_stdin(self) -> bool: + return self.source == "-" + + @property + def from_file(self) -> bool: + return bool(self.source) or self.sample + + @property + def from_system_audio(self) -> bool: + return self.system_audio or self.system_audio_only + + @property + def has_capture_overrides(self) -> bool: + """Whether a microphone-only flag (--sample-rate or --device) was given.""" + return self.sample_rate is not None or self.device is not None + + +def validate_output_flags(*, json_mode: bool, output_field: choices.TextOrJson | None) -> None: + """Reject --json combined with -o text, shared by `stream` and `agent`. + + Same precedent as --llm + -o text: contradictory output shapes are a clean + usage error, not a silent coin-flip between plain text and NDJSON. + """ + mutually_exclusive( + ("--json", json_mode), + ("-o text", output_field is choices.TextOrJson.text), + suggestion="Pick one output format.", + ) + + +def resolve_output_modes( + output_field: choices.TextOrJson | None, *, json_mode: bool +) -> tuple[bool, bool]: + """Validate the -o/--json combination, then fold it into (text_mode, json_mode). + + The two steps always run together for the realtime commands (`stream`, `agent`), + so pairing them here keeps a caller from resolving the modes without first + rejecting a contradictory pair. + """ + validate_output_flags(json_mode=json_mode, output_field=output_field) + return output.stream_output_modes(output_field, json_mode=json_mode) + + +def validate_sources(opts: SourceOptions, *, has_llm: bool, text_mode: bool) -> None: + """Reject flag combinations that can't be honored, before any audio is opened.""" + mutually_exclusive( + ("--system-audio", opts.system_audio), + ("--system-audio-only", opts.system_audio_only), + ) + _validate_input_source(opts) + mutually_exclusive( + ("--llm", has_llm), + ("-o text", text_mode), + suggestion="--llm renders a live panel (or NDJSON when piped).", + ) + + +def _validate_input_source(opts: SourceOptions) -> None: + """Reject --sample-rate/--device/source combinations the chosen input can't accept.""" + if opts.from_system_audio: + if opts.from_file: + raise UsageError("--system-audio cannot be combined with an audio source or --sample.") + if opts.system_audio_only and opts.has_capture_overrides: + raise UsageError( + "--sample-rate and --device require microphone input; use --system-audio." + ) + elif opts.from_stdin: + if opts.sample: + # The stdin branch wins dispatch over --sample, so without this the + # hosted clip would be silently ignored in favor of the pipe. + raise UsageError("- (stdin) cannot be combined with --sample.") + if opts.device is not None: + raise UsageError("--device applies only to microphone input.") + elif opts.from_file and opts.has_capture_overrides: + raise UsageError("--sample-rate and --device apply only to microphone input.") diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index b9af0c88..5b336190 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -774,6 +774,10 @@ │ one WAV per channel │ │ --name TEXT Title to slug into the --save-dir │ │ filename (e.g. a meeting title) │ + │ --auto-name With --save-dir, derive the filename │ + │ from the transcript via the LLM │ + │ --no-save-audio With --save-dir, skip the WAV and save │ + │ only the transcript │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Model & Language ───────────────────────────────────────────────────────────╮ │ --speech-model [universal-streaming-m Streaming speech model │ @@ -881,6 +885,9 @@ $ assembly stream --save-transcript notes.txt Auto-name the transcript + WAV under a dir $ assembly stream --save-dir ~/recordings --name "Standup" + Name from content + save a summary note + $ assembly stream --save-dir ~/recordings --auto-name --llm "summarize as a + note" Boost domain terms with keyterm prompts $ assembly stream --keyterms-prompt "AssemblyAI" --keyterms-prompt "Claude" Summarize action items live as you talk diff --git a/tests/_stream_helpers.py b/tests/_stream_helpers.py new file mode 100644 index 00000000..274ae2d9 --- /dev/null +++ b/tests/_stream_helpers.py @@ -0,0 +1,113 @@ +"""Shared building blocks for the `assembly stream` run-path tests. + +Split out of test_stream_exec.py so the save-flag suites (test_stream_exec.py and +test_stream_save_dir.py) share one set of fakes — a mic, turn events, a frozen clock, +and the StreamOptions defaults — instead of duplicating them per file. +""" + +from __future__ import annotations + +from datetime import datetime + +from aai_cli.commands.stream import DEFAULT_SPEECH_MODEL +from aai_cli.commands.stream import _exec as stream_exec +from aai_cli.core import llm + +# The CLI's flag defaults, as data. Tests override per-case with dataclasses.replace. +DEFAULTS = stream_exec.StreamOptions( + source=None, + sample=False, + from_stdin=False, + sample_rate=None, + device=None, + system_audio=False, + system_audio_only=False, + speech_model=DEFAULT_SPEECH_MODEL, + encoding=None, + language_detection=None, + domain=None, + prompt=None, + keyterms_prompt=None, + end_of_turn_confidence_threshold=None, + min_turn_silence=None, + max_turn_silence=None, + turn_detection=None, + vad_threshold=None, + format_turns=None, + include_partial_turns=None, + speaker_labels=None, + max_speakers=None, + voice_focus=None, + voice_focus_threshold=None, + inactivity_timeout=None, + filter_profanity=None, + redact_pii=None, + redact_pii_policy=None, + redact_pii_sub=None, + webhook_url=None, + webhook_auth_header=None, + llm_prompt=None, + llm_interval=10.0, + model=llm.DEFAULT_MODEL, + max_tokens=llm.DEFAULT_MAX_TOKENS, + config_kv=None, + config_file=None, + output_field=None, + show_code=False, + save_audio=None, + save_transcript=None, + save_dir=None, + name=None, + auto_name=False, + no_save_audio=False, +) + + +class FakeMic: + """Mirrors MicrophoneSource's keyword signature (see microphone.py).""" + + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + self.sample_rate = capture_rate or 16000 + self.device = device + + def __iter__(self): + return iter([b"\x00\x00"]) + + +class RecordingMic(FakeMic): + """A mic that yields known PCM so the tee'd WAV's contents can be asserted.""" + + PCM = b"\x01\x02\x03\x04\x05\x06\x07\x08" + + def __iter__(self): + return iter([self.PCM]) + + +class FakeTurn: + """A streaming turn event with just the attributes the session reads.""" + + def __init__(self, transcript, *, end_of_turn=True, speaker_label=None): + self.transcript = transcript + self.end_of_turn = end_of_turn + self.speaker_label = speaker_label + + +def emit_turns(*events): + """A fake client.stream_audio that drains the audio (driving any tee) then fires + each turn through the session's on_turn callback, like the real SDK reader.""" + + def _fake(api_key, source, *, params, on_turn, **_kwargs): + b"".join(source) # draining is what writes the tee'd WAV, if any + for event in events: + on_turn(event) + + return _fake + + +class FixedDatetime: + """Freezes datetime.now() so the auto-assembled filename is deterministic.""" + + @staticmethod + def now(*_args, **_kwargs): + # Naive local wall-clock; _exec's .astimezone() keeps the same 14:30:05. + return datetime(2026, 6, 16, 14, 30, 5) diff --git a/tests/test_stream_batch.py b/tests/test_stream_batch.py index 6a58ecc4..0ce2efbc 100644 --- a/tests/test_stream_batch.py +++ b/tests/test_stream_batch.py @@ -159,3 +159,59 @@ def fake_stream_audio(api_key, source, *, params, **_kwargs): result = runner.invoke(app, ["stream", "--from-stdin"], input="") assert result.exit_code == 2 assert "No sources received on stdin" in result.output + + +def test_stream_batch_sources_reports_exact_failure_count(): + # Every source failing raises a CLIError naming the exact count (pins the `failures +=` + # accumulator: a sign/operator slip would report a wrong total like "-2 of 2"). + import io + + import pytest + + from aai_cli.core.errors import CLIError + from aai_cli.streaming.batch import stream_batch_sources + from aai_cli.streaming.render import StreamRenderer + + def open_source(source): + raise CLIError(f"nope: {source}", error_type="file_not_found", exit_code=2) + + def make_session(): + raise AssertionError("a failed open must short-circuit before a session opens") + + with pytest.raises(CLIError) as excinfo: + stream_batch_sources( + ["a.wav", "b.wav"], + make_session=make_session, + open_source=open_source, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + json_mode=True, + ) + assert excinfo.value.message == "2 of 2 sources failed." + + +def test_stream_source_strips_newlines_from_failure_warning(capsys): + # A crafted source/error with embedded CR/LF must not inject extra log lines: the + # emitted warning is flattened to a single line (pins the newline sanitization). + import io + import json as _json + + from aai_cli.core.errors import CLIError + from aai_cli.streaming import batch + from aai_cli.streaming.render import StreamRenderer + + def open_source(source): + raise CLIError("bad\ninjected line") + + failed = batch._stream_source( + "a\nb.wav", + index=1, + total=1, + make_session=lambda: (_ for _ in ()).throw(AssertionError("never")), + open_source=open_source, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + json_mode=True, + ) + assert failed is True + warning = _json.loads(capsys.readouterr().err.strip())["warning"] + assert warning == "a b.wav: bad injected line" + assert "\n" not in warning diff --git a/tests/test_stream_exec.py b/tests/test_stream_exec.py index d4cf5bb4..db8e1773 100644 --- a/tests/test_stream_exec.py +++ b/tests/test_stream_exec.py @@ -3,82 +3,24 @@ The command module only parses argv into a StreamOptions; everything after that is run_stream, a plain function of data. These tests drive validation, flag mapping, and session wiring by constructing options directly — no CliRunner argv round-trip, -no merged-stream output parsing. +no merged-stream output parsing. The --save-dir suite lives in test_stream_save_dir.py; +the shared fakes (mic, turns, defaults) live in tests/_stream_helpers.py. """ from __future__ import annotations import dataclasses import wave -from datetime import datetime from pathlib import Path import pytest from aai_cli.app.context import AppState -from aai_cli.commands.stream import DEFAULT_SPEECH_MODEL from aai_cli.commands.stream import _exec as stream_exec -from aai_cli.core import config, llm +from aai_cli.core import config from aai_cli.core.errors import CLIError, UsageError from aai_cli.streaming.turn_presets import TurnDetectionPreset - -# The CLI's flag defaults, as data. Tests override per-case with dataclasses.replace. -DEFAULTS = stream_exec.StreamOptions( - source=None, - sample=False, - from_stdin=False, - sample_rate=None, - device=None, - system_audio=False, - system_audio_only=False, - speech_model=DEFAULT_SPEECH_MODEL, - encoding=None, - language_detection=None, - domain=None, - prompt=None, - keyterms_prompt=None, - end_of_turn_confidence_threshold=None, - min_turn_silence=None, - max_turn_silence=None, - turn_detection=None, - vad_threshold=None, - format_turns=None, - include_partial_turns=None, - speaker_labels=None, - max_speakers=None, - voice_focus=None, - voice_focus_threshold=None, - inactivity_timeout=None, - filter_profanity=None, - redact_pii=None, - redact_pii_policy=None, - redact_pii_sub=None, - webhook_url=None, - webhook_auth_header=None, - llm_prompt=None, - llm_interval=10.0, - model=llm.DEFAULT_MODEL, - max_tokens=llm.DEFAULT_MAX_TOKENS, - config_kv=None, - config_file=None, - output_field=None, - show_code=False, - save_audio=None, - save_transcript=None, - save_dir=None, - name=None, -) - - -class FakeMic: - """Mirrors MicrophoneSource's keyword signature (see microphone.py).""" - - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - self.sample_rate = capture_rate or 16000 - self.device = device - - def __iter__(self): - return iter([b"\x00\x00"]) +from tests._stream_helpers import DEFAULTS, FakeMic, FakeTurn, RecordingMic, emit_turns def test_run_stream_maps_flags_to_params_without_cli(monkeypatch): @@ -165,6 +107,24 @@ def test_stream_options_are_immutable(): setattr(DEFAULTS, field_name, True) +def test_source_options_are_immutable(): + # The input carrier is frozen too, so a validation/dispatch step can't mutate which + # source a run reads from after the flags are resolved. + from aai_cli.streaming.validate import SourceOptions + + opts = SourceOptions( + source=None, + sample=False, + sample_rate=None, + device=None, + system_audio=False, + system_audio_only=False, + ) + field_name = "system_audio" + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(opts, field_name, True) + + def test_save_targets_are_immutable(): # The resolved save destinations are a frozen carrier (like StreamOptions), so a # later step can't quietly retarget a file mid-run. @@ -189,6 +149,8 @@ def test_save_targets_are_immutable(): {"from_stdin": True, "save_transcript": Path("out.txt")}, # saves one transcript {"from_stdin": True, "save_dir": Path("rec")}, # auto-names one run {"from_stdin": True, "name": "Standup"}, # --name needs --save-dir + {"from_stdin": True, "auto_name": True}, # --auto-name names one run + {"from_stdin": True, "no_save_audio": True}, # --no-save-audio is a single-run flag ], ) def test_from_stdin_rejects_incompatible_flags(overrides): @@ -244,15 +206,6 @@ def fake_stream_batch(sources, *, make_session, open_source, renderer, json_mode # --- --save-audio (tee the streamed PCM to a WAV) -------------------------- -class RecordingMic(FakeMic): - """A mic that yields known PCM so the tee'd WAV's contents can be asserted.""" - - PCM = b"\x01\x02\x03\x04\x05\x06\x07\x08" - - def __iter__(self): - return iter([self.PCM]) - - def test_save_audio_tees_streamed_pcm_to_a_wav(monkeypatch, tmp_path): # The bytes the streaming API receives are also written to --save-audio, verbatim, # as a 16-bit mono WAV at the source's sample rate. @@ -353,28 +306,7 @@ def test_save_audio_rejects_missing_parent_dir(tmp_path): assert excinfo.value.error_type == "save_audio_path" -# --- --save-transcript / --save-dir (write the transcript text) ------------ -class FakeTurn: - """A streaming turn event with just the attributes the session reads.""" - - def __init__(self, transcript, *, end_of_turn=True, speaker_label=None): - self.transcript = transcript - self.end_of_turn = end_of_turn - self.speaker_label = speaker_label - - -def _emit_turns(*events): - """A fake client.stream_audio that drains the audio (driving any tee) then fires - each turn through the session's on_turn callback, like the real SDK reader.""" - - def _fake(api_key, source, *, params, on_turn, **_kwargs): - b"".join(source) # draining is what writes the tee'd WAV, if any - for event in events: - on_turn(event) - - return _fake - - +# --- --save-transcript (write the finalized turn text) --------------------- def test_save_transcript_writes_only_finalized_nonempty_turns(monkeypatch, tmp_path): # Each finalized, non-empty turn is one line; partials and empty turns are skipped. config.set_api_key("default", "sk_live") @@ -382,7 +314,7 @@ def test_save_transcript_writes_only_finalized_nonempty_turns(monkeypatch, tmp_p monkeypatch.setattr( stream_exec.client, "stream_audio", - _emit_turns( + emit_turns( FakeTurn("partial", end_of_turn=False), # not finalized -> skipped FakeTurn("hello world"), FakeTurn("", end_of_turn=True), # finalized but empty -> skipped @@ -403,7 +335,7 @@ def test_save_transcript_prefixes_diarized_speaker(monkeypatch, tmp_path): config.set_api_key("default", "sk_live") out = tmp_path / "notes.txt" monkeypatch.setattr( - stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi", speaker_label="A")) + stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi", speaker_label="A")) ) monkeypatch.setattr(stream_exec, "MicrophoneSource", FakeMic) @@ -418,7 +350,7 @@ def test_no_transcript_file_written_when_flag_unset(monkeypatch, tmp_path): # Without a save flag the default run leaves no stray .txt (kills a mutant that # writes unconditionally). config.set_api_key("default", "sk_live") - monkeypatch.setattr(stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi"))) + monkeypatch.setattr(stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi"))) monkeypatch.setattr(stream_exec, "MicrophoneSource", FakeMic) stream_exec.run_stream(DEFAULTS, AppState(), json_mode=True) @@ -426,68 +358,6 @@ def test_no_transcript_file_written_when_flag_unset(monkeypatch, tmp_path): assert list(tmp_path.glob("*.txt")) == [] -class _FixedDatetime: - """Freezes datetime.now() so the auto-assembled filename is deterministic.""" - - @staticmethod - def now(*_args, **_kwargs): - # Naive local wall-clock; _exec's .astimezone() keeps the same 14:30:05. - return datetime(2026, 6, 16, 14, 30, 5) - - -def test_save_dir_auto_names_transcript_and_matching_wav(monkeypatch, tmp_path): - # --save-dir buckets by date and shares one timestamp+slug stem across the .txt and - # the .wav, so both land together under DIR/YYYY-MM-DD/. - config.set_api_key("default", "sk_live") - monkeypatch.setattr(stream_exec, "datetime", _FixedDatetime) - monkeypatch.setattr(stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi there"))) - monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) - - stream_exec.run_stream( - dataclasses.replace(DEFAULTS, save_dir=tmp_path / "rec", name="My Meeting"), - AppState(), - json_mode=True, - ) - - bucket = tmp_path / "rec" / "2026-06-16" - txt = bucket / "2026-06-16-143005-my-meeting.txt" - wav = bucket / "2026-06-16-143005-my-meeting.wav" - assert txt.read_text(encoding="utf-8") == "hi there\n" - with wave.open(str(wav), "rb") as w: - assert w.readframes(w.getnframes()) == RecordingMic.PCM - - -@pytest.mark.parametrize( - "overrides", - [ - {"save_dir": Path("rec"), "save_audio": Path("a.wav")}, # save-dir owns the audio name - {"save_dir": Path("rec"), "save_transcript": Path("a.txt")}, # ...and the transcript - {"name": "Standup"}, # --name without --save-dir is meaningless - ], -) -def test_save_dir_rejects_incompatible_flags(overrides): - with pytest.raises(UsageError): - stream_exec.run_stream( - dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False - ) - - -@pytest.mark.parametrize( - "overrides", - [ - {"save_transcript": Path("a.txt"), "show_code": True}, - {"save_dir": Path("rec"), "show_code": True}, - ], -) -def test_save_flags_reject_show_code(overrides): - # The generated SDK code doesn't save to disk, so pairing a save flag with --show-code - # is a usage error rather than a silently-dropped save. - with pytest.raises(UsageError): - stream_exec.run_stream( - dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False - ) - - def test_save_transcript_rejects_missing_parent_dir(tmp_path): config.set_api_key("default", "sk_live") with pytest.raises(CLIError) as excinfo: diff --git a/tests/test_stream_save_dir.py b/tests/test_stream_save_dir.py new file mode 100644 index 00000000..cb5c4f6e --- /dev/null +++ b/tests/test_stream_save_dir.py @@ -0,0 +1,133 @@ +"""End-to-end tests of `assembly stream --save-dir` through run_stream. + +Split from test_stream_exec.py: this file drives the auto-name / note / sidecar / +--no-save-audio behavior over the real session + savedir finalization (only the LLM +gateway is faked). Shared fakes live in tests/_stream_helpers.py. +""" + +from __future__ import annotations + +import dataclasses +import json +import wave +from pathlib import Path + +import pytest + +from aai_cli.app.context import AppState +from aai_cli.commands.stream import _exec as stream_exec +from aai_cli.core import config +from aai_cli.core.errors import UsageError +from tests._stream_helpers import DEFAULTS, FakeTurn, FixedDatetime, RecordingMic, emit_turns + + +def test_save_dir_auto_names_transcript_and_matching_wav(monkeypatch, tmp_path): + # --save-dir buckets by date and shares one timestamp+slug stem across the .txt and + # the .wav, so both land together under DIR/YYYY-MM-DD/. + config.set_api_key("default", "sk_live") + monkeypatch.setattr(stream_exec, "datetime", FixedDatetime) + monkeypatch.setattr(stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi there"))) + monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) + + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, save_dir=tmp_path / "rec", name="My Meeting"), + AppState(), + json_mode=True, + ) + + bucket = tmp_path / "rec" / "2026-06-16" + txt = bucket / "2026-06-16-143005-my-meeting.txt" + wav = bucket / "2026-06-16-143005-my-meeting.wav" + assert txt.read_text(encoding="utf-8") == "hi there\n" + with wave.open(str(wav), "rb") as w: + assert w.readframes(w.getnframes()) == RecordingMic.PCM + # The sidecar lands beside them with the same stem. + assert (bucket / "2026-06-16-143005-my-meeting.aai.json").is_file() + + +@pytest.mark.parametrize( + "overrides", + [ + {"save_dir": Path("rec"), "save_audio": Path("a.wav")}, # save-dir owns the audio name + {"save_dir": Path("rec"), "save_transcript": Path("a.txt")}, # ...and the transcript + {"save_dir": Path("rec"), "name": "X", "auto_name": True}, # both set the title + {"name": "Standup"}, # --name without --save-dir is meaningless + {"auto_name": True}, # --auto-name needs --save-dir + {"no_save_audio": True}, # --no-save-audio needs --save-dir + ], +) +def test_save_dir_rejects_incompatible_flags(overrides): + with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False + ) + + +@pytest.mark.parametrize( + "overrides", + [ + {"save_transcript": Path("a.txt"), "show_code": True}, + {"save_dir": Path("rec"), "show_code": True}, + ], +) +def test_save_flags_reject_show_code(overrides): + # The generated SDK code doesn't save to disk, so pairing a save flag with --show-code + # is a usage error rather than a silently-dropped save. + with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False + ) + + +def test_no_save_audio_writes_transcript_and_sidecar_but_no_wav(monkeypatch, tmp_path): + # --save-dir --no-save-audio keeps the auto-named transcript + sidecar but writes no WAV. + config.set_api_key("default", "sk_live") + monkeypatch.setattr(stream_exec, "datetime", FixedDatetime) + monkeypatch.setattr(stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi there"))) + monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) + + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, save_dir=tmp_path / "rec", name="Talk", no_save_audio=True), + AppState(), + json_mode=True, + ) + + bucket = tmp_path / "rec" / "2026-06-16" + assert (bucket / "2026-06-16-143005-talk.txt").read_text(encoding="utf-8") == "hi there\n" + record = json.loads((bucket / "2026-06-16-143005-talk.aai.json").read_text(encoding="utf-8")) + assert record["audio"] == [] + assert list(bucket.glob("*.wav")) == [] + + +def test_save_dir_auto_name_and_note_end_to_end(monkeypatch, tmp_path): + # --save-dir --auto-name --llm: the files are renamed from the LLM-derived title, the + # final answer lands as a .md note, and the sidecar records the title. + config.set_api_key("default", "sk_live") + monkeypatch.setattr(stream_exec, "datetime", FixedDatetime) + monkeypatch.setattr(stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi there"))) + monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) + + from aai_cli.streaming import savedir + + def fake_run_chain(api_key, prompts, *, transcript_text, model, max_tokens): + return "Cool Title" if prompts == [savedir.TITLE_PROMPT] else "the summary" + + monkeypatch.setattr("aai_cli.core.llm.run_chain", fake_run_chain) + + stream_exec.run_stream( + dataclasses.replace( + DEFAULTS, save_dir=tmp_path / "rec", auto_name=True, llm_prompt=["summarize"] + ), + AppState(), + json_mode=True, + ) + + bucket = tmp_path / "rec" / "2026-06-16" + stem = "2026-06-16-143005-cool-title" + assert (bucket / f"{stem}.txt").read_text(encoding="utf-8") == "hi there\n" + assert (bucket / f"{stem}.md").read_text(encoding="utf-8") == "the summary\n" + with wave.open(str(bucket / f"{stem}.wav"), "rb") as w: + assert w.readframes(w.getnframes()) == RecordingMic.PCM + record = json.loads((bucket / f"{stem}.aai.json").read_text(encoding="utf-8")) + assert record["title"] == "Cool Title" + assert record["turns"] == 1 diff --git a/tests/test_stream_session.py b/tests/test_stream_session.py index 702b39af..3f76da43 100644 --- a/tests/test_stream_session.py +++ b/tests/test_stream_session.py @@ -6,6 +6,338 @@ """ import types +from datetime import datetime + +import pytest + + +def _turn(text, *, speaker_label=None): + return types.SimpleNamespace(transcript=text, end_of_turn=True, speaker_label=speaker_label) + + +def _save_plan(tmp_path, *, auto_name=False, write_note=False, name: str | None = "Meeting"): + from aai_cli.streaming.savedir import SaveDirPlan + + return SaveDirPlan( + save_dir=tmp_path / "rec", + now=datetime(2026, 6, 16, 14, 30, 5), + name=name, + auto_name=auto_name, + write_note=write_note, + ) + + +def test_save_dir_finalize_passes_recorded_metadata(monkeypatch, tmp_path): + # A --save-dir run records each finalized turn's text + diarized speaker and the + # wall-clock duration, then hands them to write_outputs once streaming ends. Pins + # the speaker dedupe, the turn count, and the injected-clock duration. + import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + captured: dict[str, object] = {} + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + monkeypatch.setattr( + session_mod.client, + "stream_audio", + lambda api_key, source, *, on_turn, **k: [ + b"".join(source), + on_turn(_turn("hello", speaker_label="A")), + on_turn(_turn("again", speaker_label="A")), # same speaker -> deduped + on_turn(_turn("bye", speaker_label="B")), + ], + ) + ticks = iter([100.0, 107.0]) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_audio=tmp_path / "out.wav", + save_plan=_save_plan(tmp_path), + clock=lambda: next(ticks), + ) + session.run([b"\x00\x00"], 16000) + + assert captured["speakers"] == ["A", "B"] + assert captured["turns"] == 3 + assert captured["duration_seconds"] == 7 # 107.0 - 100.0 + assert captured["title"] is None # no --auto-name + assert captured["note"] is None # no --llm note + assert captured["audio"] == [tmp_path / "out.wav"] # the single teed WAV is handed on + + +def test_audio_files_lists_per_channel_or_single_or_none(tmp_path): + # _audio_files reports what finalize should rename/record: the per-channel WAVs under + # --system-audio, the lone save_audio otherwise, or nothing under --no-save-audio. + import io + + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + def _session(**kw): + return session_mod.StreamSession( + api_key="sk", + base_flags={}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + **kw, + ) + + you, system = tmp_path / "you.wav", tmp_path / "system.wav" + assert _session(save_audio=tmp_path / "a.wav")._audio_files() == [tmp_path / "a.wav"] + assert _session(save_audio_by_label={"you": you, "system": system})._audio_files() == [ + you, + system, + ] + assert _session()._audio_files() == [] + + +def test_save_dir_finalize_derives_title_and_note(monkeypatch, tmp_path): + # --auto-name derives the title from the transcript via the LLM, and --llm's final + # answer is handed to write_outputs as the note. + import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + from aai_cli.ui.follow import FollowRenderer + + captured: dict[str, object] = {} + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + monkeypatch.setattr(savedir_mod, "derive_title", lambda *a, **k: "Derived Title") + monkeypatch.setattr(session_mod.llm, "run_chain", lambda *a, **k: "the summary") + monkeypatch.setattr( + session_mod.client, + "stream_audio", + lambda api_key, source, *, on_turn, **k: [b"".join(source), on_turn(_turn("hi"))], + ) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=FollowRenderer(json_mode=True), + llm_prompts=["summarize"], + model="m", + max_tokens=1, + save_plan=_save_plan(tmp_path, auto_name=True, write_note=True, name=None), + llm_interval=0.0, + ) + session.run([b"\x00\x00"], 16000) + + assert captured["title"] == "Derived Title" + assert captured["note"] == "the summary" + + +def test_save_dir_skips_title_when_transcript_is_empty(monkeypatch, tmp_path): + # --auto-name with zero finalized turns has nothing to title, so derive_title is + # skipped and the file keeps its timestamp stem (pins the `auto_name and text` guard). + import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + captured: dict[str, object] = {} + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + monkeypatch.setattr(savedir_mod, "derive_title", lambda *a, **k: "Should Not Be Used") + monkeypatch.setattr( + session_mod.client, + "stream_audio", + lambda api_key, source, *, on_turn, **k: b"".join(source), # no turns fired + ) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_plan=_save_plan(tmp_path, auto_name=True, name=None), + ) + session.run([b"\x00\x00"], 16000) + + assert captured["title"] is None # no transcript -> no LLM title call + assert captured["turns"] == 0 + + +def test_finalize_uses_zero_duration_when_capture_never_started(monkeypatch, tmp_path): + # If the capture window never opened (stream_one not reached), the sidecar duration is + # 0, not a bogus value (pins the `0 if _capture_start is None` literal). + import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + captured: dict[str, object] = {} + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + plan = _save_plan(tmp_path) + session = session_mod.StreamSession( + api_key="sk", + base_flags={}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_plan=plan, + ) + session._finalize_save_dir(plan) # no run() -> _capture_start stayed None + + assert captured["duration_seconds"] == 0 + + +def test_save_dir_auto_name_failure_keeps_recording(monkeypatch, tmp_path): + # A failed --auto-name title call must not lose the (already-saved) recording: the + # error is warned and write_outputs still runs, with no title. + import io + + from aai_cli.core.errors import APIError + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + def boom(*_a, **_k): + raise APIError("gateway down") + + captured: dict[str, object] = {} + monkeypatch.setattr(savedir_mod, "derive_title", boom) + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + monkeypatch.setattr( + session_mod.client, + "stream_audio", + lambda api_key, source, *, on_turn, **k: [b"".join(source), on_turn(_turn("hi"))], + ) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_plan=_save_plan(tmp_path, auto_name=True, name=None), + ) + session.run([b"\x00\x00"], 16000) + + assert captured["title"] is None # finalize still ran, just without a derived title + + +def _finalize_probe_session(monkeypatch, *, stream_audio, save_plan=None): + """A StreamSession whose write_outputs is replaced with a call-recorder. + + Returns ``(session, calls)`` where ``calls`` flips to ``[True]`` if finalize ran. + """ + import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + calls: list[bool] = [] + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: calls.append(True) or plan.paths + ) + monkeypatch.setattr(session_mod.client, "stream_audio", stream_audio) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_plan=save_plan, + ) + return session, calls + + +def test_save_dir_finalizes_on_ctrl_c(monkeypatch, tmp_path): + # Ctrl-C ends a recording via typer.Exit(130); a --save-dir capture is still + # finalized (sidecar/auto-name/note) before that cancel exit propagates. + import typer + + from aai_cli.core import errors + + def interrupt(api_key, source, *, on_turn, **k): + on_turn(_turn("hi")) + raise KeyboardInterrupt + + session, calls = _finalize_probe_session( + monkeypatch, stream_audio=interrupt, save_plan=_save_plan(tmp_path) + ) + with pytest.raises(typer.Exit) as excinfo: + session.run([b"\x00\x00"], 16000) + assert excinfo.value.exit_code == errors.CANCELLED_EXIT_CODE + assert calls == [True] # finalize ran despite the interrupt + + +def test_save_dir_skips_finalize_on_broken_pipe(monkeypatch, tmp_path): + # A closed downstream pipe (exit 0, not a user stop) is not a recording the user + # chose to keep, so finalize is skipped — only the cancel exit triggers it. + import typer + + def pipe_closed(api_key, source, *, on_turn, **k): + raise BrokenPipeError + + session, calls = _finalize_probe_session( + monkeypatch, stream_audio=pipe_closed, save_plan=_save_plan(tmp_path) + ) + with pytest.raises(typer.Exit) as excinfo: + session.run([b"\x00\x00"], 16000) + assert excinfo.value.exit_code == 0 + assert calls == [] # no finalize on a broken pipe + + +def test_ctrl_c_without_save_dir_does_not_finalize(monkeypatch): + # With no --save-dir plan, Ctrl-C just exits 130 — the finalize guard must not fire + # (pins the `save_plan is not None and …` short-circuit). + import typer + + from aai_cli.core import errors + + def interrupt(api_key, source, *, on_turn, **k): + raise KeyboardInterrupt + + session, calls = _finalize_probe_session(monkeypatch, stream_audio=interrupt, save_plan=None) + with pytest.raises(typer.Exit) as excinfo: + session.run([b"\x00\x00"], 16000) + assert excinfo.value.exit_code == errors.CANCELLED_EXIT_CODE + assert calls == [] # no plan -> no finalize, and no crash def test_stream_session_listening_notice_latches(monkeypatch): diff --git a/tests/test_streaming_naming.py b/tests/test_streaming_naming.py index 7f135a25..29248dc4 100644 --- a/tests/test_streaming_naming.py +++ b/tests/test_streaming_naming.py @@ -39,6 +39,16 @@ def test_resolve_buckets_by_date_with_slugged_name(): assert paths.audio == Path("rec/2026-06-16/2026-06-16-143005-my-meeting.wav") +def test_resolve_derives_note_and_sidecar_from_the_same_stem(): + # The .md note and .aai.json sidecar share the transcript's stem so a browse UI can + # match all four files of one recording by name (pins each suffix). + paths = naming.resolve(Path("rec"), "My Meeting", now=NOW) + bucket = Path("rec/2026-06-16") + assert paths.note == bucket / "2026-06-16-143005-my-meeting.md" + assert paths.sidecar == bucket / "2026-06-16-143005-my-meeting.aai.json" + assert paths.directory == bucket + + def test_resolve_without_name_is_just_the_timestamp(): # No --name (or a name that slugs to nothing) -> the stem is the bare timestamp, # never a trailing-hyphen filename. diff --git a/tests/test_streaming_savedir.py b/tests/test_streaming_savedir.py new file mode 100644 index 00000000..888671ea --- /dev/null +++ b/tests/test_streaming_savedir.py @@ -0,0 +1,273 @@ +"""Unit tests for aai_cli.streaming.savedir — the --save-dir finalization. + +``write_outputs`` is pure file I/O (no gateway), so the rename/note/sidecar behavior +is asserted directly; the LLM title call (``derive_title``) is exercised against a +patched ``llm.run_chain``. +""" + +from __future__ import annotations + +import dataclasses +import json +from datetime import datetime +from pathlib import Path + +import pytest + +from aai_cli.core.errors import CLIError +from aai_cli.streaming import naming, savedir + +NOW = datetime(2026, 6, 16, 14, 30, 5) + + +def _plan(tmp_path: Path, *, name=None, auto_name=False, write_note=False): + return savedir.SaveDirPlan( + save_dir=tmp_path / "rec", + now=NOW, + name=name, + auto_name=auto_name, + write_note=write_note, + ) + + +def _seed(plan: savedir.SaveDirPlan, *, audio=True) -> tuple[naming.SavePaths, list[Path]]: + """Create the provisional .txt (+ .wav when audio is saved) a live run would leave. + + Returns the provisional paths and the audio-file list to hand ``write_outputs`` (the + session passes the WAV(s) it actually teed, or none under ``--no-save-audio``). + """ + paths = plan.paths + naming.ensure_dir(paths.directory) + paths.transcript.write_text("Speaker A: hello\n", encoding="utf-8") + if not audio: + return paths, [] + paths.audio.write_bytes(b"RIFFFAKE") + return paths, [paths.audio] + + +def test_save_dir_plan_is_immutable(tmp_path): + plan = _plan(tmp_path, auto_name=True) + field_name = "auto_name" + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(plan, field_name, False) + + +def test_write_outputs_writes_sidecar_with_metadata(tmp_path): + # The sidecar carries title/date/duration/speakers/turns plus the file names, so a + # list UI needs no transcript parsing (a whole-dict assert kills key/value mutants). + plan = _plan(tmp_path, name="My Meeting") + _, audio = _seed(plan) + + final = savedir.write_outputs( + plan, title=None, note=None, audio=audio, speakers=["A", "B"], duration_seconds=42, turns=3 + ) + + raw = final.sidecar.read_text(encoding="utf-8") + assert '\n "title"' in raw # pretty-printed with a 2-space indent + assert json.loads(raw) == { + "title": "My Meeting", + "date": "2026-06-16T14:30:05", + "duration_seconds": 42, + "speakers": ["A", "B"], + "turns": 3, + "transcript": "2026-06-16-143005-my-meeting.txt", + "audio": ["2026-06-16-143005-my-meeting.wav"], + "note": None, + } + + +def test_write_outputs_empty_audio_list_when_not_saved(tmp_path): + # --no-save-audio: the sidecar's "audio" is an empty list, not a phantom WAV name. + plan = _plan(tmp_path, name="Talk") + _seed(plan, audio=False) + + final = savedir.write_outputs( + plan, title=None, note=None, audio=[], speakers=[], duration_seconds=0, turns=0 + ) + + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["audio"] == [] + + +def test_write_outputs_lists_each_channel_wav_for_system_audio(tmp_path): + # --save-dir --system-audio tees one WAV per channel; the sidecar lists both, and + # --auto-name re-stems each alongside the transcript. + plan = _plan(tmp_path, auto_name=True) + paths = plan.paths + naming.ensure_dir(paths.directory) + paths.transcript.write_text("You: hi\nSystem: hi\n", encoding="utf-8") + you = naming.channel_audio(paths.audio, "you") + system = naming.channel_audio(paths.audio, "system") + you.write_bytes(b"YOU0") + system.write_bytes(b"SYS0") + + final = savedir.write_outputs( + plan, + title="Sync Up", + note=None, + audio=[you, system], + speakers=[], + duration_seconds=3, + turns=2, + ) + + record = json.loads(final.sidecar.read_text(encoding="utf-8")) + assert record["audio"] == [ + "2026-06-16-143005-sync-up-you.wav", + "2026-06-16-143005-sync-up-system.wav", + ] + # Both channel WAVs moved to the new stem; the old-stem files are gone. + assert (final.directory / "2026-06-16-143005-sync-up-you.wav").read_bytes() == b"YOU0" + assert (final.directory / "2026-06-16-143005-sync-up-system.wav").read_bytes() == b"SYS0" + assert not you.exists() + + +def test_write_outputs_writes_llm_note_as_markdown(tmp_path): + # With --llm a note lands as .md next to the transcript, and the sidecar + # records its name. + plan = _plan(tmp_path, name="Talk", write_note=True) + _, audio = _seed(plan) + + final = savedir.write_outputs( + plan, + title=None, + note="- ship the release", + audio=audio, + speakers=[], + duration_seconds=1, + turns=1, + ) + + assert final.note.read_text(encoding="utf-8") == "- ship the release\n" + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["note"] == final.note.name + + +def test_write_outputs_skips_note_file_when_no_answer(tmp_path): + # write_note was requested but no answer was produced (no turns) -> no .md, null in sidecar. + plan = _plan(tmp_path, name="Talk", write_note=True) + _, audio = _seed(plan) + + final = savedir.write_outputs( + plan, title=None, note=None, audio=audio, speakers=[], duration_seconds=0, turns=0 + ) + + assert not final.note.exists() + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["note"] is None + + +def test_write_outputs_renames_files_for_auto_name(tmp_path): + # --auto-name: the provisional timestamp-only files are renamed to carry the title slug; + # the WAV moves with the transcript and the old stem is gone. + plan = _plan(tmp_path, auto_name=True) + provisional, audio = _seed(plan) + + final = savedir.write_outputs( + plan, + title="Quarterly Review", + note=None, + audio=audio, + speakers=[], + duration_seconds=5, + turns=2, + ) + + assert final.transcript.name == "2026-06-16-143005-quarterly-review.txt" + assert final.transcript.read_text(encoding="utf-8") == "Speaker A: hello\n" + assert final.audio.name == "2026-06-16-143005-quarterly-review.wav" + assert final.audio.read_bytes() == b"RIFFFAKE" + assert not provisional.transcript.exists() + assert not provisional.audio.exists() + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["title"] == "Quarterly Review" + + +def test_write_outputs_keeps_timestamp_when_title_slugs_to_nothing(tmp_path): + # An auto-name title with no usable characters slugs to "" -> keep the timestamp stem + # rather than rename to a bare-hyphen name. + plan = _plan(tmp_path, auto_name=True) + provisional, audio = _seed(plan) + + final = savedir.write_outputs( + plan, title="???", note=None, audio=audio, speakers=[], duration_seconds=0, turns=1 + ) + + assert final.transcript == provisional.transcript + assert final.transcript.exists() + + +def test_write_outputs_skips_missing_wav_on_rename(tmp_path): + # --auto-name with a recording that left no WAV (e.g. interrupted before audio) renames + # the transcript without crashing on the absent audio file. + plan = _plan(tmp_path, auto_name=True) + paths = plan.paths + naming.ensure_dir(paths.directory) + paths.transcript.write_text("hi\n", encoding="utf-8") # no .wav written + + final = savedir.write_outputs( + plan, + title="A Title", + note=None, + audio=[paths.audio], + speakers=[], + duration_seconds=0, + turns=1, + ) + + assert final.transcript.read_text(encoding="utf-8") == "hi\n" + assert not final.audio.exists() + + +def test_write_outputs_ignores_title_without_auto_name(tmp_path): + # A title is only adopted under --auto-name; without it the explicit --name stem wins + # and the sidecar keeps that name (pins the `auto_name and title` selection). + plan = _plan(tmp_path, name="Orig", auto_name=False) + provisional, audio = _seed(plan) + + final = savedir.write_outputs( + plan, title="Ignored", note=None, audio=audio, speakers=[], duration_seconds=0, turns=0 + ) + + assert final.stem == provisional.stem + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["title"] == "Orig" + + +def test_write_outputs_raises_clean_error_when_sidecar_write_blocked(tmp_path): + # A failed sidecar write (its directory was never created) is a save_dir_path CLIError, + # not a raw OSError — pins the _write error wrapper. + plan = _plan(tmp_path, name="Talk") + # No ensure_dir / provisional files: the bucket directory doesn't exist, so writing + # the sidecar into it fails. + with pytest.raises(CLIError) as excinfo: + savedir.write_outputs( + plan, title=None, note=None, audio=[], speakers=[], duration_seconds=0, turns=0 + ) + assert excinfo.value.error_type == "save_dir_path" + assert excinfo.value.exit_code == 2 + + +def test_write_outputs_raises_clean_error_when_rename_blocked(tmp_path): + # A failed rename surfaces as a save_dir_path CLIError, not a raw OSError. + plan = _plan(tmp_path, auto_name=True) + naming.ensure_dir(plan.paths.directory) + # No provisional transcript exists, so the rename of a missing file fails. + with pytest.raises(CLIError) as excinfo: + savedir.write_outputs( + plan, title="A Title", note=None, audio=[], speakers=[], duration_seconds=0, turns=0 + ) + assert excinfo.value.error_type == "save_dir_path" + assert excinfo.value.exit_code == 2 + + +def test_derive_title_strips_and_prompts_over_transcript(monkeypatch): + # derive_title runs the title prompt over the transcript text and trims the answer. + seen = {} + + def fake_run_chain(api_key, prompts, *, transcript_text, model, max_tokens): + seen["prompts"] = prompts + seen["transcript_text"] = transcript_text + return " Quarterly Review \n" + + monkeypatch.setattr(savedir.llm, "run_chain", fake_run_chain) + title = savedir.derive_title("sk", "we shipped the release", model="m", max_tokens=32) + + assert title == "Quarterly Review" + assert seen["prompts"] == [savedir.TITLE_PROMPT] + assert seen["transcript_text"] == "we shipped the release"