From 508a17f265ac338c922d4e91d97c2cac455eeb64 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 16 Jun 2026 22:57:11 +0000 Subject: [PATCH] Add stream --save-dir summarize-on-capture, sidecar metadata, and --auto-name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fold the post-capture index step into `assembly stream --save-dir` so a wrapper script no longer needs an index loop: - `--llm "…"` alongside `--save-dir` writes the final prompt-chain answer as a `.md` note next to the auto-named transcript (summarize-on-capture). - a `.aai.json` sidecar (title, date, duration, speakers, turns, file names) lands beside every recording so a list/browse UI needs no transcript parsing, and `--no-save-audio` keeps the transcript without the WAV. - `--auto-name` derives the filename slug from the transcript via the LLM and renames the files once the stream ends (mutually exclusive with `--name`). The --save-dir lifecycle lives in the new streaming/savedir.py (pure file I/O, unit-tested without a gateway); the batch driver moves to streaming/batch.py to keep session.py under the line limit. Docs updated in REFERENCE.md. 
Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01KNx966tACLPYX4B5jkcfqp --- REFERENCE.md | 20 ++ aai_cli/commands/stream/__init__.py | 18 ++ aai_cli/commands/stream/_exec.py | 53 ++++- aai_cli/streaming/batch.py | 99 ++++++++ aai_cli/streaming/naming.py | 37 ++- aai_cli/streaming/savedir.py | 156 +++++++++++++ aai_cli/streaming/session.py | 119 +++++----- .../test_snapshots_help_run.ambr | 7 + tests/_stream_helpers.py | 113 +++++++++ tests/test_stream_batch.py | 28 +++ tests/test_stream_exec.py | 169 +------------- tests/test_stream_save_dir.py | 134 +++++++++++ tests/test_stream_session.py | 215 +++++++++++++++++ tests/test_streaming_naming.py | 10 + tests/test_streaming_savedir.py | 218 ++++++++++++++++++ 15 files changed, 1161 insertions(+), 235 deletions(-) create mode 100644 aai_cli/streaming/batch.py create mode 100644 aai_cli/streaming/savedir.py create mode 100644 tests/_stream_helpers.py create mode 100644 tests/test_stream_save_dir.py create mode 100644 tests/test_streaming_savedir.py diff --git a/REFERENCE.md b/REFERENCE.md index 6a89200b..1d56fbf4 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -98,3 +98,23 @@ object per dataset (not NDJSON; a single dataset is therefore one object): the row's `llm` key (the WER score still uses the raw transcript), and `--llm-reduce` runs one prompt over every item's result and adds a top-level `reduce` (`{"model","prompts","output"}`) to the object. + +## Recording streams to disk + +`assembly stream --save-dir DIR` auto-names a capture under `DIR/YYYY-MM-DD/` +with a timestamped stem (`YYYY-MM-DD-HHMMSS[-slug]`) shared across every file it +writes: + +- `.txt` — the transcript, one finalized turn per line (flushed live). +- `.wav` — the recorded audio, 16-bit mono PCM. Suppress it with + `--no-save-audio` to keep only the text. +- `.md` — written when `--llm "…"` is also passed: the final answer of the + live prompt chain, captured as a note next to the transcript. 
+- `.aai.json` — a metadata sidecar so a list/browse UI needs no transcript + parsing: `{"title", "date", "duration_seconds", "speakers", "turns", + "transcript", "audio", "note"}` (`audio`/`note` are `null` when not written). + +`--name "Title"` slugs an explicit title into the stem; `--auto-name` instead +derives that title from the transcript via the LLM Gateway once the stream ends, +renaming the files to match (the timestamp stem is kept if the title is empty). +The two are mutually exclusive. diff --git a/aai_cli/commands/stream/__init__.py b/aai_cli/commands/stream/__init__.py index 7fb00569..1529b64e 100644 --- a/aai_cli/commands/stream/__init__.py +++ b/aai_cli/commands/stream/__init__.py @@ -39,6 +39,10 @@ "Auto-name the transcript + WAV under a dir", 'assembly stream --save-dir ~/recordings --name "Standup"', ), + ( + "Name from content + save a summary note", + 'assembly stream --save-dir ~/recordings --auto-name --llm "summarize as a note"', + ), ( "Boost domain terms with keyterm prompts", 'assembly stream --keyterms-prompt "AssemblyAI" --keyterms-prompt "Claude"', @@ -121,6 +125,18 @@ def stream( help="Title to slug into the --save-dir filename (e.g. 
a meeting title)", rich_help_panel=help_panels.OPT_SAVING, ), + auto_name: bool = typer.Option( + False, + "--auto-name", + help="With --save-dir, derive the filename from the transcript via the LLM", + rich_help_panel=help_panels.OPT_SAVING, + ), + no_save_audio: bool = typer.Option( + False, + "--no-save-audio", + help="With --save-dir, skip the WAV and save only the transcript", + rich_help_panel=help_panels.OPT_SAVING, + ), # model & input speech_model: SpeechModel = typer.Option( DEFAULT_SPEECH_MODEL, @@ -398,5 +414,7 @@ def stream( save_transcript=save_transcript, save_dir=save_dir, name=name, + auto_name=auto_name, + no_save_audio=no_save_audio, ) run_with_options(ctx, stream_exec.run_stream, opts, json=json_out) diff --git a/aai_cli/commands/stream/_exec.py b/aai_cli/commands/stream/_exec.py index c0bc8b43..409751f0 100644 --- a/aai_cli/commands/stream/_exec.py +++ b/aai_cli/commands/stream/_exec.py @@ -23,14 +23,14 @@ from aai_cli.core import choices, client, config_builder, stdio, youtube from aai_cli.core.errors import UsageError, mutually_exclusive from aai_cli.core.microphone import MicrophoneSource -from aai_cli.streaming import naming, record, transcript, turn_presets +from aai_cli.streaming import naming, record, savedir, transcript, turn_presets +from aai_cli.streaming.batch import stream_batch_sources from aai_cli.streaming.macos import MacSystemAudioSource from aai_cli.streaming.render import StreamRenderer from aai_cli.streaming.session import ( SourceOptions, StreamSession, resolve_output_modes, - stream_batch_sources, validate_sources, ) from aai_cli.streaming.sources import TARGET_RATE, FileSource, StdinSource @@ -90,6 +90,8 @@ class StreamOptions: save_transcript: Path | None save_dir: Path | None name: str | None + auto_name: bool + no_save_audio: bool def source_options(self) -> SourceOptions: """The audio-input subset, in the shape the validation/dispatch helpers read.""" @@ -200,12 +202,13 @@ def _reject_save_with_show_code(opts: 
StreamOptions) -> None: def _resolve_save_targets( opts: StreamOptions, sources: SourceOptions -) -> tuple[Path | None, Path | None]: - """Resolve the save flags into the (audio, transcript) paths the session writes. +) -> tuple[Path | None, Path | None, savedir.SaveDirPlan | None]: + """Resolve the save flags into the (audio, transcript, save-dir plan) the session uses. - ``--save-dir`` owns filename assembly — it auto-names both the transcript and a - matching WAV under ``DIR/YYYY-MM-DD/`` — so it can't be combined with the explicit - ``--save-audio``/``--save-transcript`` paths, and ``--name`` only feeds that assembly. + ``--save-dir`` owns filename assembly — it auto-names the transcript, a matching WAV, + a ``.aai.json`` sidecar, and (with ``--llm``) a ``.md`` note under ``DIR/YYYY-MM-DD/`` + — so it can't be combined with the explicit ``--save-audio``/``--save-transcript`` + paths, and ``--name``/``--auto-name``/``--no-save-audio`` only feed that assembly. Audio can't tee to a single WAV under ``--system-audio`` (two streams), which rejects both the explicit ``--save-audio`` and ``--save-dir``'s audio leg. """ @@ -216,6 +219,12 @@ def _resolve_save_targets( ("--save-transcript", opts.save_transcript is not None), suggestion="--save-dir names the files for you; drop the explicit path.", ) + mutually_exclusive( + ("--name", opts.name is not None), + ("--auto-name", opts.auto_name), + suggestion="Both set the title — pass --name for an explicit one or " + "--auto-name to derive it from the transcript.", + ) if sources.from_system_audio: raise UsageError( "--save-dir cannot be combined with --system-audio; the mic and system " @@ -225,15 +234,32 @@ def _resolve_save_targets( # Local wall-clock time (what a meeting filename wants); the explicit utc-then- # astimezone keeps the now() call timezone-aware for the linter. 
now = datetime.now(UTC).astimezone() - paths = naming.resolve(opts.save_dir, opts.name, now=now) - naming.ensure_dir(paths.transcript.parent) - return paths.audio, paths.transcript + plan = savedir.SaveDirPlan( + save_dir=opts.save_dir, + now=now, + name=opts.name, + auto_name=opts.auto_name, + save_audio=not opts.no_save_audio, + write_note=bool(opts.llm_prompt), + ) + naming.ensure_dir(plan.paths.directory) + return (plan.paths.audio if plan.save_audio else None), plan.paths.transcript, plan if opts.name is not None: raise UsageError( "--name applies only with --save-dir.", suggestion="Pass --save-dir DIR to auto-name the files, " "or --save-transcript PATH for an explicit path.", ) + if opts.auto_name: + raise UsageError( + "--auto-name applies only with --save-dir.", + suggestion="Pass --save-dir DIR so there's an auto-named file to title.", + ) + if opts.no_save_audio: + raise UsageError( + "--no-save-audio applies only with --save-dir.", + suggestion="Omit --save-audio to skip the WAV, or pass --save-dir DIR.", + ) if opts.save_audio is not None: if sources.from_system_audio: raise UsageError( @@ -244,7 +270,7 @@ def _resolve_save_targets( record.validate_target(opts.save_audio) if opts.save_transcript is not None: transcript.validate_target(opts.save_transcript) - return opts.save_audio, opts.save_transcript + return opts.save_audio, opts.save_transcript, None def _dispatch(session: StreamSession, opts: SourceOptions) -> None: @@ -320,6 +346,8 @@ def _collect_batch_sources(opts: StreamOptions, *, text_mode: bool) -> list[str] ("--save-transcript", opts.save_transcript is not None), ("--save-dir", opts.save_dir is not None), ("--name", opts.name is not None), + ("--auto-name", opts.auto_name), + ("--no-save-audio", opts.no_save_audio), suggestion="--from-stdin streams many sources; saving applies to a single run.", ) mutually_exclusive( @@ -389,7 +417,7 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None # Validate the 
requested sources (including that a local file exists) before # credentials, so a typo'd path reads as "file not found" — not as a login. validate_sources(sources, has_llm=bool(opts.llm_prompt), text_mode=text_mode) - save_audio, save_transcript = _resolve_save_targets(opts, sources) + save_audio, save_transcript, save_plan = _resolve_save_targets(opts, sources) if sources.from_file and not sources.from_stdin: client.resolve_audio_source(sources.source, sample=sources.sample) api_key = state.resolve_api_key() @@ -407,6 +435,7 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None max_tokens=opts.max_tokens, save_audio=save_audio, save_transcript=save_transcript, + save_plan=save_plan, llm_interval=opts.llm_interval, ) _dispatch(session, sources) diff --git a/aai_cli/streaming/batch.py b/aai_cli/streaming/batch.py new file mode 100644 index 00000000..4a3b629e --- /dev/null +++ b/aai_cli/streaming/batch.py @@ -0,0 +1,99 @@ +"""Drive an ``assembly stream --from-stdin`` list of sources, one realtime session each. + +The realtime API is one session at a time, so a list of files/URLs (read on stdin, +one per line) streams sequentially. This lives beside ``StreamSession`` rather than +inside it: a session owns *one* run, while this owns the sequence — fresh session per +source, per-source failure accounting, and the batch-wide Ctrl-C/pipe handling. +""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable + +import typer + +from aai_cli.core.errors import CLIError, NotAuthenticated +from aai_cli.streaming.render import StreamRenderer +from aai_cli.streaming.session import StreamSession +from aai_cli.ui import output + +# A batch source string resolved to its real-time audio chunks and declared rate. 
+_OpenedSource = tuple[Iterable[bytes], int] + + +def _stream_source( + source: str, + *, + index: int, + total: int, + make_session: Callable[[], StreamSession], + open_source: Callable[[str], _OpenedSource], + renderer: StreamRenderer, + json_mode: bool, +) -> bool: + """Stream one batch source in its own session; return True when it failed. + + A ``CLIError`` (bad path, missing ffmpeg, decode failure) is recorded as a warning + so the batch carries on — except ``NotAuthenticated``, which re-raises to abort the + whole batch (one rejected key fails every source identically, and auto-login should + trigger once). + """ + renderer.source(source, index=index, total=total) + try: + audio, rate = open_source(source) + make_session().run(audio, rate, handle_interrupt=False) + except NotAuthenticated: + raise + except CLIError as exc: + output.emit_warning(f"{source}: {exc.message}", json_mode=json_mode) + return True + else: + return False + + +def stream_batch_sources( + sources: list[str], + *, + make_session: Callable[[], StreamSession], + open_source: Callable[[str], _OpenedSource], + renderer: StreamRenderer, + json_mode: bool, +) -> None: + """Stream each source in ``sources`` in turn — the ``assembly stream --from-stdin`` + batch mode. + + The realtime API is one session at a time, so a list of files/URLs streams + sequentially: each source gets a fresh ``StreamSession`` from ``make_session`` (its + own transcript and ``--llm`` chain state) via ``_stream_source``. + + A Ctrl-C or a closed downstream pipe stops the batch cleanly (exit 0). When any + source failed, raises a ``CLIError`` at the end so a script can trust the exit code. 
+ """ + total = len(sources) + failures = 0 + try: + for index, source in enumerate(sources, start=1): + failures += _stream_source( + source, + index=index, + total=total, + make_session=make_session, + open_source=open_source, + renderer=renderer, + json_mode=json_mode, + ) + except KeyboardInterrupt: + # One Ctrl-C stops the whole batch, not just the current source -> exit 0. + renderer.stopped() + return + except BrokenPipeError: + # Downstream consumer (e.g. `| head`) closed the pipe; stop quietly. + raise typer.Exit(code=0) from None + finally: + renderer.close() + if failures: + raise CLIError( + f"{failures} of {total} sources failed.", + error_type="batch_failed", + suggestion="Check each failed path or URL, then re-run.", + ) diff --git a/aai_cli/streaming/naming.py b/aai_cli/streaming/naming.py index 24a4d4d7..b4271d50 100644 --- a/aai_cli/streaming/naming.py +++ b/aai_cli/streaming/naming.py @@ -41,24 +41,49 @@ def _stem(now: datetime, name: str | None) -> str: return f"{stamp}-{slug}" if slug else stamp +# The sidecar's extension, mirroring batch transcribe's ``.aai.json`` so a +# browse/list UI can recognize a stream recording's metadata file the same way. +SIDECAR_SUFFIX = ".aai.json" + + @dataclass(frozen=True) class SavePaths: - """The auto-assembled transcript path and its matching audio path (same stem).""" + """The auto-assembled output paths for one recording, all sharing a stem. + + The transcript ``.txt`` and matching audio ``.wav`` plus the optional ``--llm`` + note ``.md`` and the ``.aai.json`` metadata sidecar — each is derived from the + same ``directory``/``stem`` so they land together and stay matchable by name. 
+ """ + + directory: Path + stem: str + + @property + def transcript(self) -> Path: + return self.directory / f"{self.stem}.txt" + + @property + def audio(self) -> Path: + return self.directory / f"{self.stem}.wav" + + @property + def note(self) -> Path: + return self.directory / f"{self.stem}.md" - transcript: Path - audio: Path + @property + def sidecar(self) -> Path: + return self.directory / f"{self.stem}{SIDECAR_SUFFIX}" def resolve(save_dir: Path, name: str | None, *, now: datetime) -> SavePaths: - """Build ``DIR/YYYY-MM-DD/.{txt,wav}`` for ``now`` and ``--name``. + """Build the ``DIR/YYYY-MM-DD/`` paths for ``now`` and ``--name``. The date bucket and the stem both carry the date so a transcript stays self-describing if it is later moved out of its bucket. Path assembly only — creating the directory is the caller's job (see ``ensure_dir``). """ bucket = save_dir / now.strftime("%Y-%m-%d") - stem = _stem(now, name) - return SavePaths(transcript=bucket / f"{stem}.txt", audio=bucket / f"{stem}.wav") + return SavePaths(directory=bucket, stem=_stem(now, name)) def ensure_dir(path: Path) -> None: diff --git a/aai_cli/streaming/savedir.py b/aai_cli/streaming/savedir.py new file mode 100644 index 00000000..ef9114dc --- /dev/null +++ b/aai_cli/streaming/savedir.py @@ -0,0 +1,156 @@ +"""The `assembly stream --save-dir` capture lifecycle: auto-name, note, and sidecar. + +``--save-dir`` already auto-names a transcript + WAV under ``DIR/YYYY-MM-DD/`` +(see ``naming``). This module folds the steps a wrapper script used to bolt on +afterwards into capture time, so nothing downstream needs an index pass: + +- ``--auto-name`` derives the filename slug from the transcript itself (via the + LLM), so a recording is meaningfully named with no calendar or manual title. +- ``--llm`` alongside ``--save-dir`` writes its final answer as a ``.md`` note + next to the transcript — a summary produced as the audio is captured. 
+- a ``.aai.json`` sidecar (title, date, duration, speakers, turns) lands beside + every recording so a list/browse UI shows rich info without parsing transcripts. + +``write_outputs`` is pure file I/O (no network), so the rename/note/sidecar +behavior is unit-tested without a gateway; the LLM title call lives in +``derive_title`` and is injected past it. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +from aai_cli.core import llm +from aai_cli.core.errors import CLIError +from aai_cli.streaming import naming + +# Asks for a short headline only — kept terse so a small/cheap model returns a clean +# line we can slug, not a paragraph. The transcript is appended by build_messages. +TITLE_PROMPT = ( + "Write a short, descriptive title (3 to 7 words) for this transcript. " + "Reply with only the title — no quotes, no surrounding punctuation." +) + + +@dataclass(frozen=True) +class SaveDirPlan: + """The resolved ``--save-dir`` intent, the data ``write_outputs`` finalizes from. + + ``now`` is captured once up front so the live transcript file, the post-stream + rename, and the sidecar's ``date`` all agree on a single timestamp. + """ + + save_dir: Path + now: datetime + name: str | None + auto_name: bool + save_audio: bool + write_note: bool + + @property + def paths(self) -> naming.SavePaths: + """The provisional paths the live run writes to (before any ``--auto-name`` rename).""" + return naming.resolve(self.save_dir, self.name, now=self.now) + + +def derive_title(api_key: str, transcript_text: str, *, model: str, max_tokens: int) -> str: + """Ask the LLM for a short headline for ``transcript_text`` (the ``--auto-name`` title). + + Returns the raw title text; ``naming.resolve`` slugs it into the filename, so an + unusable answer (all punctuation) simply collapses to the bare timestamp stem. 
+ """ + return llm.run_chain( + api_key, + [TITLE_PROMPT], + transcript_text=transcript_text, + model=model, + max_tokens=max_tokens, + ).strip() + + +def _rename(src: Path, dst: Path) -> None: + """Move a provisional capture file to its final auto-named path, as a clean CLIError.""" + try: + src.rename(dst) + except OSError as exc: + raise CLIError( + f"Cannot rename {src} to {dst}: {exc}", + error_type="save_dir_path", + exit_code=2, + ) from exc + + +def _write(path: Path, text: str) -> None: + """Write ``text`` to ``path`` (the note or sidecar), as a clean CLIError on failure.""" + try: + path.write_text(text, encoding="utf-8") + except OSError as exc: + raise CLIError( + f"Cannot write {path}: {exc}", + error_type="save_dir_path", + exit_code=2, + ) from exc + + +def _sidecar_record( + paths: naming.SavePaths, + *, + plan: SaveDirPlan, + title: str | None, + note_written: bool, + speakers: list[str], + duration_seconds: int, + turns: int, +) -> dict[str, object]: + """The ``.aai.json`` metadata: enough for a browse UI without parsing the transcript.""" + return { + "title": title, + "date": plan.now.isoformat(), + "duration_seconds": duration_seconds, + "speakers": speakers, + "turns": turns, + "transcript": paths.transcript.name, + "audio": paths.audio.name if plan.save_audio else None, + "note": paths.note.name if note_written else None, + } + + +def write_outputs( + plan: SaveDirPlan, + *, + title: str | None, + note: str | None, + speakers: list[str], + duration_seconds: int, + turns: int, +) -> naming.SavePaths: + """Finalize a ``--save-dir`` capture: rename for ``--auto-name``, write note + sidecar. + + ``title`` is the ``--auto-name`` headline (None when not requested); when it slugs + to a non-empty stem the provisional transcript/WAV are renamed to carry it. ``note`` + is the final ``--llm`` answer, written as ``.md`` when present. The sidecar is + always written. Returns the final paths. 
+ """ + provisional = plan.paths + final_name = title if (plan.auto_name and title) else plan.name + final = naming.resolve(plan.save_dir, final_name, now=plan.now) + if final.stem != provisional.stem: + _rename(provisional.transcript, final.transcript) + if plan.save_audio and provisional.audio.exists(): + _rename(provisional.audio, final.audio) + if note is not None: + _write(final.note, note + "\n") + record = _sidecar_record( + final, + plan=plan, + title=final_name, + note_written=note is not None, + speakers=speakers, + duration_seconds=duration_seconds, + turns=turns, + ) + _write(final.sidecar, json.dumps(record, indent=2) + "\n") + return final diff --git a/aai_cli/streaming/session.py b/aai_cli/streaming/session.py index 71d0ce67..5618b530 100644 --- a/aai_cli/streaming/session.py +++ b/aai_cli/streaming/session.py @@ -13,11 +13,10 @@ from aai_cli.core.errors import ( APIError, CLIError, - NotAuthenticated, UsageError, mutually_exclusive, ) -from aai_cli.streaming import record +from aai_cli.streaming import record, savedir from aai_cli.streaming.render import StreamRenderer, speaker_prefix from aai_cli.streaming.transcript import TranscriptWriter from aai_cli.ui import output @@ -160,11 +159,22 @@ class StreamSession: # When set, write each finalized turn to this path as plain text (see TranscriptWriter). # Like save_audio, only the single-source path sets it; batch rejects --save-transcript. save_transcript: Path | None = None + # When set, run the --save-dir finalization (auto-name rename, --llm note, sidecar) + # once streaming ends. Only the single-source path sets it; batch rejects --save-dir. + save_plan: savedir.SaveDirPlan | None = None # Seconds between --llm summary refreshes; <=0 re-runs the chain on every turn. llm_interval: float = 0.0 # Monotonic clock, injectable so the interval throttle is deterministic in tests. 
clock: Callable[[], float] = time.monotonic transcript: list[str] = field(default_factory=list[str]) + # Finalized turn lines + diarized speakers, recorded for the --save-dir sidecar and + # --auto-name title regardless of --llm; only populated when save_plan is set. + _meta_lines: list[str] = field(default_factory=list[str]) + _meta_speakers: dict[str, None] = field(default_factory=dict[str, None]) + # Wall-clock capture window (via clock) → the sidecar's duration_seconds. + _capture_start: float | None = None + # The most recent --llm answer, written as the .md note at --save-dir finalization. + _last_answer: str | None = None # The open transcript-file writer for a single run; created/closed in _guarded so a # save target is opened once per session and a Ctrl-C still leaves a flushed file. _transcript_writer: TranscriptWriter | None = None @@ -201,6 +211,7 @@ def on_turn(self, event: object, *, source_label: str | None = None) -> None: line = _finalized_turn_line(event, source_label) if line is not None: self._save_line(line) + self._note_meta(event, line) else: # --llm mode locks only to record the turn; the chain re-runs (network) are # left unlocked so the other source's turns keep flowing during a refresh. @@ -215,6 +226,19 @@ def _save_line(self, line: str) -> None: if self._transcript_writer is not None: self._transcript_writer.write_turn(line) + def _note_meta(self, event: object, line: str) -> None: + """Record a finalized turn's text + speaker for the --save-dir sidecar/auto-name. + + A no-op unless --save-dir is active, so a plain run accumulates nothing. Called + under ``_callback_lock`` (like ``_save_line``) so the lists/sets stay consistent. 
+ """ + if self.save_plan is None: + return + self._meta_lines.append(line) + speaker = getattr(event, "speaker_label", None) + if speaker is not None: + self._meta_speakers[str(speaker)] = None + def _record_turn(self, event: object, source_label: str | None) -> None: """Append a finalized turn to the running transcript (and the saved file), then refresh the --llm answer if a refresh is due (every turn, or once per @@ -225,6 +249,7 @@ def _record_turn(self, event: object, source_label: str | None) -> None: with self._callback_lock: self.transcript.append(line) self._save_line(line) + self._note_meta(event, line) self._maybe_summarize() def _maybe_summarize(self, *, final: bool = False) -> None: @@ -273,11 +298,16 @@ def _maybe_summarize(self, *, final: bool = False) -> None: f"[aai.muted]--llm refresh failed: {exc.message}[/aai.muted]" ) return + # Hold the latest answer so --save-dir can write it as the .md note at the end. + self._last_answer = answer follow(answer, turns) def stream_one( self, audio: Iterable[bytes], rate: int, *, source_label: str | None = None ) -> None: + if self._capture_start is None: + # First source opened → start the wall-clock window for the sidecar duration. + self._capture_start = self.clock() if self.save_audio is not None: # Tee verbatim to disk at the source's true rate before it hits the wire. audio = record.tee_wav(audio, self.save_audio, rate=rate) @@ -364,6 +394,35 @@ def run( lambda: self.stream_one(audio, rate, source_label=source_label), handle_interrupt=handle_interrupt, ) + # _guarded re-raises a stream error (skipping finalize) but returns normally on a + # clean stop or a Ctrl-C, so a stopped recording is still named + sidecared. 
+ if self.save_plan is not None: + self._finalize_save_dir(self.save_plan) + + def _finalize_save_dir(self, plan: savedir.SaveDirPlan) -> None: + """Auto-name, write the --llm note, and drop the sidecar for a --save-dir capture.""" + transcript_text = " ".join(self._meta_lines) + title: str | None = None + if plan.auto_name and transcript_text: + try: + title = savedir.derive_title( + self.api_key, transcript_text, model=self.model, max_tokens=self.max_tokens + ) + except CLIError as exc: + # The recording is already saved under its timestamp stem; a failed title + # call shouldn't lose it, so warn and keep the timestamped name. + output.error_console.print( + f"[aai.muted]--auto-name failed: {exc.message}[/aai.muted]" + ) + duration = 0 if self._capture_start is None else round(self.clock() - self._capture_start) + savedir.write_outputs( + plan, + title=title, + note=self._last_answer if plan.write_note else None, + speakers=list(self._meta_speakers), + duration_seconds=duration, + turns=len(self._meta_lines), + ) def run_parallel(self, streams: _ParallelStreams) -> None: self._guarded(lambda: self._drive(streams)) @@ -401,59 +460,3 @@ def worker(source_label: str, audio: Iterable[bytes], rate: int) -> None: raise errors.get() if not errors.empty(): raise errors.get() - - -# A batch source string resolved to its real-time audio chunks and declared rate. -_OpenedSource = tuple[Iterable[bytes], int] - - -def stream_batch_sources( - sources: list[str], - *, - make_session: Callable[[], StreamSession], - open_source: Callable[[str], _OpenedSource], - renderer: StreamRenderer, - json_mode: bool, -) -> None: - """Stream each source in ``sources`` in turn — the ``assembly stream --from-stdin`` - batch mode. 
- - The realtime API is one session at a time, so a list of files/URLs streams - sequentially: each source gets a fresh ``StreamSession`` from ``make_session`` (its - own transcript and ``--llm`` chain state) and is announced via ``renderer.source`` - before its turns. ``open_source`` resolves a source string to ``(audio, rate)`` and - may raise ``CLIError`` (bad path, missing ffmpeg, decode failure), which is recorded - as a per-source failure so the batch carries on — except ``NotAuthenticated``, which - re-raises to abort the whole batch (one rejected key fails every source identically). - - A Ctrl-C or a closed downstream pipe stops the batch cleanly (exit 0). When any - source failed, raises a ``CLIError`` at the end so a script can trust the exit code. - """ - total = len(sources) - failures: list[str] = [] - try: - for index, source in enumerate(sources, start=1): - renderer.source(source, index=index, total=total) - try: - audio, rate = open_source(source) - make_session().run(audio, rate, handle_interrupt=False) - except NotAuthenticated: - raise - except CLIError as exc: - failures.append(source) - output.emit_warning(f"{source}: {exc.message}", json_mode=json_mode) - except KeyboardInterrupt: - # One Ctrl-C stops the whole batch, not just the current source -> exit 0. - renderer.stopped() - return - except BrokenPipeError: - # Downstream consumer (e.g. `| head`) closed the pipe; stop quietly. 
- raise typer.Exit(code=0) from None - finally: - renderer.close() - if failures: - raise CLIError( - f"{len(failures)} of {total} sources failed.", - error_type="batch_failed", - suggestion="Check each failed path or URL, then re-run.", - ) diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index ffc0ae1a..ea1cc5eb 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -761,6 +761,10 @@ │ timestamped file │ │ --name TEXT Title to slug into the --save-dir │ │ filename (e.g. a meeting title) │ + │ --auto-name With --save-dir, derive the filename │ + │ from the transcript via the LLM │ + │ --no-save-audio With --save-dir, skip the WAV and save │ + │ only the transcript │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Model & Language ───────────────────────────────────────────────────────────╮ │ --speech-model [universal-streaming-m Streaming speech model │ @@ -868,6 +872,9 @@ $ assembly stream --save-transcript notes.txt Auto-name the transcript + WAV under a dir $ assembly stream --save-dir ~/recordings --name "Standup" + Name from content + save a summary note + $ assembly stream --save-dir ~/recordings --auto-name --llm "summarize as a + note" Boost domain terms with keyterm prompts $ assembly stream --keyterms-prompt "AssemblyAI" --keyterms-prompt "Claude" Summarize action items live as you talk diff --git a/tests/_stream_helpers.py b/tests/_stream_helpers.py new file mode 100644 index 00000000..274ae2d9 --- /dev/null +++ b/tests/_stream_helpers.py @@ -0,0 +1,113 @@ +"""Shared building blocks for the `assembly stream` run-path tests. + +Split out of test_stream_exec.py so the save-flag suites (test_stream_exec.py and +test_stream_save_dir.py) share one set of fakes — a mic, turn events, a frozen clock, +and the StreamOptions defaults — instead of duplicating them per file. 
+""" + +from __future__ import annotations + +from datetime import datetime + +from aai_cli.commands.stream import DEFAULT_SPEECH_MODEL +from aai_cli.commands.stream import _exec as stream_exec +from aai_cli.core import llm + +# The CLI's flag defaults, as data. Tests override per-case with dataclasses.replace. +DEFAULTS = stream_exec.StreamOptions( + source=None, + sample=False, + from_stdin=False, + sample_rate=None, + device=None, + system_audio=False, + system_audio_only=False, + speech_model=DEFAULT_SPEECH_MODEL, + encoding=None, + language_detection=None, + domain=None, + prompt=None, + keyterms_prompt=None, + end_of_turn_confidence_threshold=None, + min_turn_silence=None, + max_turn_silence=None, + turn_detection=None, + vad_threshold=None, + format_turns=None, + include_partial_turns=None, + speaker_labels=None, + max_speakers=None, + voice_focus=None, + voice_focus_threshold=None, + inactivity_timeout=None, + filter_profanity=None, + redact_pii=None, + redact_pii_policy=None, + redact_pii_sub=None, + webhook_url=None, + webhook_auth_header=None, + llm_prompt=None, + llm_interval=10.0, + model=llm.DEFAULT_MODEL, + max_tokens=llm.DEFAULT_MAX_TOKENS, + config_kv=None, + config_file=None, + output_field=None, + show_code=False, + save_audio=None, + save_transcript=None, + save_dir=None, + name=None, + auto_name=False, + no_save_audio=False, +) + + +class FakeMic: + """Mirrors MicrophoneSource's keyword signature (see microphone.py).""" + + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + self.sample_rate = capture_rate or 16000 + self.device = device + + def __iter__(self): + return iter([b"\x00\x00"]) + + +class RecordingMic(FakeMic): + """A mic that yields known PCM so the tee'd WAV's contents can be asserted.""" + + PCM = b"\x01\x02\x03\x04\x05\x06\x07\x08" + + def __iter__(self): + return iter([self.PCM]) + + +class FakeTurn: + """A streaming turn event with just the attributes the session reads.""" + + def 
__init__(self, transcript, *, end_of_turn=True, speaker_label=None): + self.transcript = transcript + self.end_of_turn = end_of_turn + self.speaker_label = speaker_label + + +def emit_turns(*events): + """A fake client.stream_audio that drains the audio (driving any tee) then fires + each turn through the session's on_turn callback, like the real SDK reader.""" + + def _fake(api_key, source, *, params, on_turn, **_kwargs): + b"".join(source) # draining is what writes the tee'd WAV, if any + for event in events: + on_turn(event) + + return _fake + + +class FixedDatetime: + """Freezes datetime.now() so the auto-assembled filename is deterministic.""" + + @staticmethod + def now(*_args, **_kwargs): + # Naive local wall-clock; _exec's .astimezone() keeps the same 14:30:05. + return datetime(2026, 6, 16, 14, 30, 5) diff --git a/tests/test_stream_batch.py b/tests/test_stream_batch.py index da586b24..f97722e6 100644 --- a/tests/test_stream_batch.py +++ b/tests/test_stream_batch.py @@ -159,3 +159,31 @@ def fake_stream_audio(api_key, source, *, params, **_kwargs): result = runner.invoke(app, ["stream", "--from-stdin"], input="") assert result.exit_code == 2 assert "No sources received on stdin" in result.output + + +def test_stream_batch_sources_reports_exact_failure_count(): + # Every source failing raises a CLIError naming the exact count (pins the `failures +=` + # accumulator: a sign/operator slip would report a wrong total like "-2 of 2"). 
+ import io + + import pytest + + from aai_cli.core.errors import CLIError + from aai_cli.streaming.batch import stream_batch_sources + from aai_cli.streaming.render import StreamRenderer + + def open_source(source): + raise CLIError(f"nope: {source}", error_type="file_not_found", exit_code=2) + + def make_session(): + raise AssertionError("a failed open must short-circuit before a session opens") + + with pytest.raises(CLIError) as excinfo: + stream_batch_sources( + ["a.wav", "b.wav"], + make_session=make_session, + open_source=open_source, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + json_mode=True, + ) + assert excinfo.value.message == "2 of 2 sources failed." diff --git a/tests/test_stream_exec.py b/tests/test_stream_exec.py index 47af253a..5c5fb4ef 100644 --- a/tests/test_stream_exec.py +++ b/tests/test_stream_exec.py @@ -3,82 +3,24 @@ The command module only parses argv into a StreamOptions; everything after that is run_stream, a plain function of data. These tests drive validation, flag mapping, and session wiring by constructing options directly — no CliRunner argv round-trip, -no merged-stream output parsing. +no merged-stream output parsing. The --save-dir suite lives in test_stream_save_dir.py; +the shared fakes (mic, turns, defaults) live in tests/_stream_helpers.py. """ from __future__ import annotations import dataclasses import wave -from datetime import datetime from pathlib import Path import pytest from aai_cli.app.context import AppState -from aai_cli.commands.stream import DEFAULT_SPEECH_MODEL from aai_cli.commands.stream import _exec as stream_exec -from aai_cli.core import config, llm +from aai_cli.core import config from aai_cli.core.errors import CLIError, UsageError from aai_cli.streaming.turn_presets import TurnDetectionPreset - -# The CLI's flag defaults, as data. Tests override per-case with dataclasses.replace. 
-DEFAULTS = stream_exec.StreamOptions( - source=None, - sample=False, - from_stdin=False, - sample_rate=None, - device=None, - system_audio=False, - system_audio_only=False, - speech_model=DEFAULT_SPEECH_MODEL, - encoding=None, - language_detection=None, - domain=None, - prompt=None, - keyterms_prompt=None, - end_of_turn_confidence_threshold=None, - min_turn_silence=None, - max_turn_silence=None, - turn_detection=None, - vad_threshold=None, - format_turns=None, - include_partial_turns=None, - speaker_labels=None, - max_speakers=None, - voice_focus=None, - voice_focus_threshold=None, - inactivity_timeout=None, - filter_profanity=None, - redact_pii=None, - redact_pii_policy=None, - redact_pii_sub=None, - webhook_url=None, - webhook_auth_header=None, - llm_prompt=None, - llm_interval=10.0, - model=llm.DEFAULT_MODEL, - max_tokens=llm.DEFAULT_MAX_TOKENS, - config_kv=None, - config_file=None, - output_field=None, - show_code=False, - save_audio=None, - save_transcript=None, - save_dir=None, - name=None, -) - - -class FakeMic: - """Mirrors MicrophoneSource's keyword signature (see microphone.py).""" - - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - self.sample_rate = capture_rate or 16000 - self.device = device - - def __iter__(self): - return iter([b"\x00\x00"]) +from tests._stream_helpers import DEFAULTS, FakeMic, FakeTurn, RecordingMic, emit_turns def test_run_stream_maps_flags_to_params_without_cli(monkeypatch): @@ -181,6 +123,8 @@ def test_stream_options_are_immutable(): {"from_stdin": True, "save_transcript": Path("out.txt")}, # saves one transcript {"from_stdin": True, "save_dir": Path("rec")}, # auto-names one run {"from_stdin": True, "name": "Standup"}, # --name needs --save-dir + {"from_stdin": True, "auto_name": True}, # --auto-name names one run + {"from_stdin": True, "no_save_audio": True}, # --no-save-audio is a single-run flag ], ) def test_from_stdin_rejects_incompatible_flags(overrides): @@ -236,15 +180,6 @@ def 
fake_stream_batch(sources, *, make_session, open_source, renderer, json_mode # --- --save-audio (tee the streamed PCM to a WAV) -------------------------- -class RecordingMic(FakeMic): - """A mic that yields known PCM so the tee'd WAV's contents can be asserted.""" - - PCM = b"\x01\x02\x03\x04\x05\x06\x07\x08" - - def __iter__(self): - return iter([self.PCM]) - - def test_save_audio_tees_streamed_pcm_to_a_wav(monkeypatch, tmp_path): # The bytes the streaming API receives are also written to --save-audio, verbatim, # as a 16-bit mono WAV at the source's sample rate. @@ -316,28 +251,7 @@ def test_save_audio_rejects_missing_parent_dir(tmp_path): assert excinfo.value.error_type == "save_audio_path" -# --- --save-transcript / --save-dir (write the transcript text) ------------ -class FakeTurn: - """A streaming turn event with just the attributes the session reads.""" - - def __init__(self, transcript, *, end_of_turn=True, speaker_label=None): - self.transcript = transcript - self.end_of_turn = end_of_turn - self.speaker_label = speaker_label - - -def _emit_turns(*events): - """A fake client.stream_audio that drains the audio (driving any tee) then fires - each turn through the session's on_turn callback, like the real SDK reader.""" - - def _fake(api_key, source, *, params, on_turn, **_kwargs): - b"".join(source) # draining is what writes the tee'd WAV, if any - for event in events: - on_turn(event) - - return _fake - - +# --- --save-transcript (write the finalized turn text) --------------------- def test_save_transcript_writes_only_finalized_nonempty_turns(monkeypatch, tmp_path): # Each finalized, non-empty turn is one line; partials and empty turns are skipped. 
config.set_api_key("default", "sk_live") @@ -345,7 +259,7 @@ def test_save_transcript_writes_only_finalized_nonempty_turns(monkeypatch, tmp_p monkeypatch.setattr( stream_exec.client, "stream_audio", - _emit_turns( + emit_turns( FakeTurn("partial", end_of_turn=False), # not finalized -> skipped FakeTurn("hello world"), FakeTurn("", end_of_turn=True), # finalized but empty -> skipped @@ -366,7 +280,7 @@ def test_save_transcript_prefixes_diarized_speaker(monkeypatch, tmp_path): config.set_api_key("default", "sk_live") out = tmp_path / "notes.txt" monkeypatch.setattr( - stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi", speaker_label="A")) + stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi", speaker_label="A")) ) monkeypatch.setattr(stream_exec, "MicrophoneSource", FakeMic) @@ -381,7 +295,7 @@ def test_no_transcript_file_written_when_flag_unset(monkeypatch, tmp_path): # Without a save flag the default run leaves no stray .txt (kills a mutant that # writes unconditionally). config.set_api_key("default", "sk_live") - monkeypatch.setattr(stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi"))) + monkeypatch.setattr(stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi"))) monkeypatch.setattr(stream_exec, "MicrophoneSource", FakeMic) stream_exec.run_stream(DEFAULTS, AppState(), json_mode=True) @@ -389,69 +303,6 @@ def test_no_transcript_file_written_when_flag_unset(monkeypatch, tmp_path): assert list(tmp_path.glob("*.txt")) == [] -class _FixedDatetime: - """Freezes datetime.now() so the auto-assembled filename is deterministic.""" - - @staticmethod - def now(*_args, **_kwargs): - # Naive local wall-clock; _exec's .astimezone() keeps the same 14:30:05. - return datetime(2026, 6, 16, 14, 30, 5) - - -def test_save_dir_auto_names_transcript_and_matching_wav(monkeypatch, tmp_path): - # --save-dir buckets by date and shares one timestamp+slug stem across the .txt and - # the .wav, so both land together under DIR/YYYY-MM-DD/. 
- config.set_api_key("default", "sk_live") - monkeypatch.setattr(stream_exec, "datetime", _FixedDatetime) - monkeypatch.setattr(stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi there"))) - monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) - - stream_exec.run_stream( - dataclasses.replace(DEFAULTS, save_dir=tmp_path / "rec", name="My Meeting"), - AppState(), - json_mode=True, - ) - - bucket = tmp_path / "rec" / "2026-06-16" - txt = bucket / "2026-06-16-143005-my-meeting.txt" - wav = bucket / "2026-06-16-143005-my-meeting.wav" - assert txt.read_text(encoding="utf-8") == "hi there\n" - with wave.open(str(wav), "rb") as w: - assert w.readframes(w.getnframes()) == RecordingMic.PCM - - -@pytest.mark.parametrize( - "overrides", - [ - {"save_dir": Path("rec"), "save_audio": Path("a.wav")}, # save-dir owns the audio name - {"save_dir": Path("rec"), "save_transcript": Path("a.txt")}, # ...and the transcript - {"save_dir": Path("rec"), "system_audio": True}, # two streams can't share one wav - {"name": "Standup"}, # --name without --save-dir is meaningless - ], -) -def test_save_dir_rejects_incompatible_flags(overrides): - with pytest.raises(UsageError): - stream_exec.run_stream( - dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False - ) - - -@pytest.mark.parametrize( - "overrides", - [ - {"save_transcript": Path("a.txt"), "show_code": True}, - {"save_dir": Path("rec"), "show_code": True}, - ], -) -def test_save_flags_reject_show_code(overrides): - # The generated SDK code doesn't save to disk, so pairing a save flag with --show-code - # is a usage error rather than a silently-dropped save. 
- with pytest.raises(UsageError): - stream_exec.run_stream( - dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False - ) - - def test_save_transcript_rejects_missing_parent_dir(tmp_path): config.set_api_key("default", "sk_live") with pytest.raises(CLIError) as excinfo: diff --git a/tests/test_stream_save_dir.py b/tests/test_stream_save_dir.py new file mode 100644 index 00000000..4d4edfa5 --- /dev/null +++ b/tests/test_stream_save_dir.py @@ -0,0 +1,134 @@ +"""End-to-end tests of `assembly stream --save-dir` through run_stream. + +Split from test_stream_exec.py: this file drives the auto-name / note / sidecar / +--no-save-audio behavior over the real session + savedir finalization (only the LLM +gateway is faked). Shared fakes live in tests/_stream_helpers.py. +""" + +from __future__ import annotations + +import dataclasses +import json +import wave +from pathlib import Path + +import pytest + +from aai_cli.app.context import AppState +from aai_cli.commands.stream import _exec as stream_exec +from aai_cli.core import config +from aai_cli.core.errors import UsageError +from tests._stream_helpers import DEFAULTS, FakeTurn, FixedDatetime, RecordingMic, emit_turns + + +def test_save_dir_auto_names_transcript_and_matching_wav(monkeypatch, tmp_path): + # --save-dir buckets by date and shares one timestamp+slug stem across the .txt and + # the .wav, so both land together under DIR/YYYY-MM-DD/. 
+ config.set_api_key("default", "sk_live") + monkeypatch.setattr(stream_exec, "datetime", FixedDatetime) + monkeypatch.setattr(stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi there"))) + monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) + + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, save_dir=tmp_path / "rec", name="My Meeting"), + AppState(), + json_mode=True, + ) + + bucket = tmp_path / "rec" / "2026-06-16" + txt = bucket / "2026-06-16-143005-my-meeting.txt" + wav = bucket / "2026-06-16-143005-my-meeting.wav" + assert txt.read_text(encoding="utf-8") == "hi there\n" + with wave.open(str(wav), "rb") as w: + assert w.readframes(w.getnframes()) == RecordingMic.PCM + # The sidecar lands beside them with the same stem. + assert (bucket / "2026-06-16-143005-my-meeting.aai.json").is_file() + + +@pytest.mark.parametrize( + "overrides", + [ + {"save_dir": Path("rec"), "save_audio": Path("a.wav")}, # save-dir owns the audio name + {"save_dir": Path("rec"), "save_transcript": Path("a.txt")}, # ...and the transcript + {"save_dir": Path("rec"), "system_audio": True}, # two streams can't share one wav + {"save_dir": Path("rec"), "name": "X", "auto_name": True}, # both set the title + {"name": "Standup"}, # --name without --save-dir is meaningless + {"auto_name": True}, # --auto-name needs --save-dir + {"no_save_audio": True}, # --no-save-audio needs --save-dir + ], +) +def test_save_dir_rejects_incompatible_flags(overrides): + with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False + ) + + +@pytest.mark.parametrize( + "overrides", + [ + {"save_transcript": Path("a.txt"), "show_code": True}, + {"save_dir": Path("rec"), "show_code": True}, + ], +) +def test_save_flags_reject_show_code(overrides): + # The generated SDK code doesn't save to disk, so pairing a save flag with --show-code + # is a usage error rather than a silently-dropped save. 
+ with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False + ) + + +def test_no_save_audio_writes_transcript_and_sidecar_but_no_wav(monkeypatch, tmp_path): + # --save-dir --no-save-audio keeps the auto-named transcript + sidecar but writes no WAV. + config.set_api_key("default", "sk_live") + monkeypatch.setattr(stream_exec, "datetime", FixedDatetime) + monkeypatch.setattr(stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi there"))) + monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) + + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, save_dir=tmp_path / "rec", name="Talk", no_save_audio=True), + AppState(), + json_mode=True, + ) + + bucket = tmp_path / "rec" / "2026-06-16" + assert (bucket / "2026-06-16-143005-talk.txt").read_text(encoding="utf-8") == "hi there\n" + record = json.loads((bucket / "2026-06-16-143005-talk.aai.json").read_text(encoding="utf-8")) + assert record["audio"] is None + assert list(bucket.glob("*.wav")) == [] + + +def test_save_dir_auto_name_and_note_end_to_end(monkeypatch, tmp_path): + # --save-dir --auto-name --llm: the files are renamed from the LLM-derived title, the + # final answer lands as a .md note, and the sidecar records the title. 
+ config.set_api_key("default", "sk_live") + monkeypatch.setattr(stream_exec, "datetime", FixedDatetime) + monkeypatch.setattr(stream_exec.client, "stream_audio", emit_turns(FakeTurn("hi there"))) + monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) + + from aai_cli.streaming import savedir + + def fake_run_chain(api_key, prompts, *, transcript_text, model, max_tokens): + return "Cool Title" if prompts == [savedir.TITLE_PROMPT] else "the summary" + + monkeypatch.setattr("aai_cli.core.llm.run_chain", fake_run_chain) + + stream_exec.run_stream( + dataclasses.replace( + DEFAULTS, save_dir=tmp_path / "rec", auto_name=True, llm_prompt=["summarize"] + ), + AppState(), + json_mode=True, + ) + + bucket = tmp_path / "rec" / "2026-06-16" + stem = "2026-06-16-143005-cool-title" + assert (bucket / f"{stem}.txt").read_text(encoding="utf-8") == "hi there\n" + assert (bucket / f"{stem}.md").read_text(encoding="utf-8") == "the summary\n" + with wave.open(str(bucket / f"{stem}.wav"), "rb") as w: + assert w.readframes(w.getnframes()) == RecordingMic.PCM + record = json.loads((bucket / f"{stem}.aai.json").read_text(encoding="utf-8")) + assert record["title"] == "Cool Title" + assert record["turns"] == 1 diff --git a/tests/test_stream_session.py b/tests/test_stream_session.py index 702b39af..0cdebf15 100644 --- a/tests/test_stream_session.py +++ b/tests/test_stream_session.py @@ -6,6 +6,221 @@ """ import types +from datetime import datetime + + +def _turn(text, *, speaker_label=None): + return types.SimpleNamespace(transcript=text, end_of_turn=True, speaker_label=speaker_label) + + +def _save_plan( + tmp_path, *, auto_name=False, save_audio=True, write_note=False, name: str | None = "Meeting" +): + from aai_cli.streaming.savedir import SaveDirPlan + + return SaveDirPlan( + save_dir=tmp_path / "rec", + now=datetime(2026, 6, 16, 14, 30, 5), + name=name, + auto_name=auto_name, + save_audio=save_audio, + write_note=write_note, + ) + + +def 
test_save_dir_finalize_passes_recorded_metadata(monkeypatch, tmp_path): + # A --save-dir run records each finalized turn's text + diarized speaker and the + # wall-clock duration, then hands them to write_outputs once streaming ends. Pins + # the speaker dedupe, the turn count, and the injected-clock duration. + import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + captured: dict[str, object] = {} + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + monkeypatch.setattr( + session_mod.client, + "stream_audio", + lambda api_key, source, *, on_turn, **k: [ + b"".join(source), + on_turn(_turn("hello", speaker_label="A")), + on_turn(_turn("again", speaker_label="A")), # same speaker -> deduped + on_turn(_turn("bye", speaker_label="B")), + ], + ) + ticks = iter([100.0, 107.0]) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_plan=_save_plan(tmp_path), + clock=lambda: next(ticks), + ) + session.run([b"\x00\x00"], 16000) + + assert captured["speakers"] == ["A", "B"] + assert captured["turns"] == 3 + assert captured["duration_seconds"] == 7 # 107.0 - 100.0 + assert captured["title"] is None # no --auto-name + assert captured["note"] is None # no --llm note + + +def test_save_dir_finalize_derives_title_and_note(monkeypatch, tmp_path): + # --auto-name derives the title from the transcript via the LLM, and --llm's final + # answer is handed to write_outputs as the note. 
+ import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + from aai_cli.ui.follow import FollowRenderer + + captured: dict[str, object] = {} + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + monkeypatch.setattr(savedir_mod, "derive_title", lambda *a, **k: "Derived Title") + monkeypatch.setattr(session_mod.llm, "run_chain", lambda *a, **k: "the summary") + monkeypatch.setattr( + session_mod.client, + "stream_audio", + lambda api_key, source, *, on_turn, **k: [b"".join(source), on_turn(_turn("hi"))], + ) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=FollowRenderer(json_mode=True), + llm_prompts=["summarize"], + model="m", + max_tokens=1, + save_plan=_save_plan(tmp_path, auto_name=True, write_note=True, name=None), + llm_interval=0.0, + ) + session.run([b"\x00\x00"], 16000) + + assert captured["title"] == "Derived Title" + assert captured["note"] == "the summary" + + +def test_save_dir_skips_title_when_transcript_is_empty(monkeypatch, tmp_path): + # --auto-name with zero finalized turns has nothing to title, so derive_title is + # skipped and the file keeps its timestamp stem (pins the `auto_name and text` guard). 
+ import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + captured: dict[str, object] = {} + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + monkeypatch.setattr(savedir_mod, "derive_title", lambda *a, **k: "Should Not Be Used") + monkeypatch.setattr( + session_mod.client, + "stream_audio", + lambda api_key, source, *, on_turn, **k: b"".join(source), # no turns fired + ) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_plan=_save_plan(tmp_path, auto_name=True, name=None), + ) + session.run([b"\x00\x00"], 16000) + + assert captured["title"] is None # no transcript -> no LLM title call + assert captured["turns"] == 0 + + +def test_finalize_uses_zero_duration_when_capture_never_started(monkeypatch, tmp_path): + # If the capture window never opened (stream_one not reached), the sidecar duration is + # 0, not a bogus value (pins the `0 if _capture_start is None` literal). 
+ import io + + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + captured: dict[str, object] = {} + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + plan = _save_plan(tmp_path) + session = session_mod.StreamSession( + api_key="sk", + base_flags={}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_plan=plan, + ) + session._finalize_save_dir(plan) # no run() -> _capture_start stayed None + + assert captured["duration_seconds"] == 0 + + +def test_save_dir_auto_name_failure_keeps_recording(monkeypatch, tmp_path): + # A failed --auto-name title call must not lose the (already-saved) recording: the + # error is warned and write_outputs still runs, with no title. + import io + + from aai_cli.core.errors import APIError + from aai_cli.streaming import savedir as savedir_mod + from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + + def boom(*_a, **_k): + raise APIError("gateway down") + + captured: dict[str, object] = {} + monkeypatch.setattr(savedir_mod, "derive_title", boom) + monkeypatch.setattr( + savedir_mod, "write_outputs", lambda plan, **kw: captured.update(kw) or plan.paths + ) + monkeypatch.setattr( + session_mod.client, + "stream_audio", + lambda api_key, source, *, on_turn, **k: [b"".join(source), on_turn(_turn("hi"))], + ) + session = session_mod.StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_plan=_save_plan(tmp_path, auto_name=True, name=None), + ) + session.run([b"\x00\x00"], 16000) + + assert captured["title"] is 
None # finalize still ran, just without a derived title def test_stream_session_listening_notice_latches(monkeypatch): diff --git a/tests/test_streaming_naming.py b/tests/test_streaming_naming.py index a2d5bf48..da68143d 100644 --- a/tests/test_streaming_naming.py +++ b/tests/test_streaming_naming.py @@ -39,6 +39,16 @@ def test_resolve_buckets_by_date_with_slugged_name(): assert paths.audio == Path("rec/2026-06-16/2026-06-16-143005-my-meeting.wav") +def test_resolve_derives_note_and_sidecar_from_the_same_stem(): + # The .md note and .aai.json sidecar share the transcript's stem so a browse UI can + # match all four files of one recording by name (pins each suffix). + paths = naming.resolve(Path("rec"), "My Meeting", now=NOW) + bucket = Path("rec/2026-06-16") + assert paths.note == bucket / "2026-06-16-143005-my-meeting.md" + assert paths.sidecar == bucket / "2026-06-16-143005-my-meeting.aai.json" + assert paths.directory == bucket + + def test_resolve_without_name_is_just_the_timestamp(): # No --name (or a name that slugs to nothing) -> the stem is the bare timestamp, # never a trailing-hyphen filename. diff --git a/tests/test_streaming_savedir.py b/tests/test_streaming_savedir.py new file mode 100644 index 00000000..fbd9891a --- /dev/null +++ b/tests/test_streaming_savedir.py @@ -0,0 +1,218 @@ +"""Unit tests for aai_cli.streaming.savedir — the --save-dir finalization. + +``write_outputs`` is pure file I/O (no gateway), so the rename/note/sidecar behavior +is asserted directly; the LLM title call (``derive_title``) is exercised against a +patched ``llm.run_chain``. 
+""" + +from __future__ import annotations + +import dataclasses +import json +from datetime import datetime +from pathlib import Path + +import pytest + +from aai_cli.core.errors import CLIError +from aai_cli.streaming import naming, savedir + +NOW = datetime(2026, 6, 16, 14, 30, 5) + + +def test_save_dir_plan_is_immutable(tmp_path): + plan = savedir.SaveDirPlan( + save_dir=tmp_path, now=NOW, name=None, auto_name=True, save_audio=True, write_note=False + ) + field_name = "save_audio" + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(plan, field_name, False) + + +def _plan(tmp_path: Path, *, name=None, auto_name=False, save_audio=True, write_note=False): + return savedir.SaveDirPlan( + save_dir=tmp_path / "rec", + now=NOW, + name=name, + auto_name=auto_name, + save_audio=save_audio, + write_note=write_note, + ) + + +def _seed_capture(plan: savedir.SaveDirPlan) -> naming.SavePaths: + """Create the provisional .txt (+ .wav when audio is saved) a live run would leave.""" + paths = plan.paths + naming.ensure_dir(paths.directory) + paths.transcript.write_text("Speaker A: hello\n", encoding="utf-8") + if plan.save_audio: + paths.audio.write_bytes(b"RIFFFAKE") + return paths + + +def test_write_outputs_writes_sidecar_with_metadata(tmp_path): + # The sidecar carries title/date/duration/speakers/turns plus the file names, so a + # list UI needs no transcript parsing (a whole-dict assert kills key/value mutants). 
+ plan = _plan(tmp_path, name="My Meeting") + _seed_capture(plan) + + final = savedir.write_outputs( + plan, title=None, note=None, speakers=["A", "B"], duration_seconds=42, turns=3 + ) + + raw = final.sidecar.read_text(encoding="utf-8") + assert '\n "title"' in raw # pretty-printed with a 2-space indent + assert json.loads(raw) == { + "title": "My Meeting", + "date": "2026-06-16T14:30:05", + "duration_seconds": 42, + "speakers": ["A", "B"], + "turns": 3, + "transcript": "2026-06-16-143005-my-meeting.txt", + "audio": "2026-06-16-143005-my-meeting.wav", + "note": None, + } + + +def test_write_outputs_omits_audio_in_sidecar_when_not_saved(tmp_path): + # --no-save-audio: the sidecar's "audio" is null, not a phantom WAV name. + plan = _plan(tmp_path, name="Talk", save_audio=False) + _seed_capture(plan) + + final = savedir.write_outputs( + plan, title=None, note=None, speakers=[], duration_seconds=0, turns=0 + ) + + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["audio"] is None + + +def test_write_outputs_writes_llm_note_as_markdown(tmp_path): + # With --llm a note lands as .md next to the transcript, and the sidecar + # records its name. + plan = _plan(tmp_path, name="Talk", write_note=True) + _seed_capture(plan) + + final = savedir.write_outputs( + plan, title=None, note="- ship the release", speakers=[], duration_seconds=1, turns=1 + ) + + assert final.note.read_text(encoding="utf-8") == "- ship the release\n" + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["note"] == final.note.name + + +def test_write_outputs_skips_note_file_when_no_answer(tmp_path): + # write_note was requested but no answer was produced (no turns) -> no .md, null in sidecar. 
+ plan = _plan(tmp_path, name="Talk", write_note=True) + _seed_capture(plan) + + final = savedir.write_outputs( + plan, title=None, note=None, speakers=[], duration_seconds=0, turns=0 + ) + + assert not final.note.exists() + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["note"] is None + + +def test_write_outputs_renames_files_for_auto_name(tmp_path): + # --auto-name: the provisional timestamp-only files are renamed to carry the title slug; + # the WAV moves with the transcript and the old stem is gone. + plan = _plan(tmp_path, auto_name=True) + provisional = _seed_capture(plan) + + final = savedir.write_outputs( + plan, title="Quarterly Review", note=None, speakers=[], duration_seconds=5, turns=2 + ) + + assert final.transcript.name == "2026-06-16-143005-quarterly-review.txt" + assert final.transcript.read_text(encoding="utf-8") == "Speaker A: hello\n" + assert final.audio.name == "2026-06-16-143005-quarterly-review.wav" + assert final.audio.read_bytes() == b"RIFFFAKE" + assert not provisional.transcript.exists() + assert not provisional.audio.exists() + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["title"] == "Quarterly Review" + + +def test_write_outputs_keeps_timestamp_when_title_slugs_to_nothing(tmp_path): + # An auto-name title with no usable characters slugs to "" -> keep the timestamp stem + # rather than rename to a bare-hyphen name. + plan = _plan(tmp_path, auto_name=True) + provisional = _seed_capture(plan) + + final = savedir.write_outputs( + plan, title="???", note=None, speakers=[], duration_seconds=0, turns=1 + ) + + assert final.transcript == provisional.transcript + assert final.transcript.exists() + + +def test_write_outputs_skips_missing_wav_on_rename(tmp_path): + # --auto-name with a recording that left no WAV (e.g. interrupted before audio) renames + # the transcript without crashing on the absent audio file. 
+ plan = _plan(tmp_path, auto_name=True, save_audio=True) + paths = plan.paths + naming.ensure_dir(paths.directory) + paths.transcript.write_text("hi\n", encoding="utf-8") # no .wav written + + final = savedir.write_outputs( + plan, title="A Title", note=None, speakers=[], duration_seconds=0, turns=1 + ) + + assert final.transcript.read_text(encoding="utf-8") == "hi\n" + assert not final.audio.exists() + + +def test_write_outputs_ignores_title_without_auto_name(tmp_path): + # A title is only adopted under --auto-name; without it the explicit --name stem wins + # and the sidecar keeps that name (pins the `auto_name and title` selection). + plan = _plan(tmp_path, name="Orig", auto_name=False) + provisional = _seed_capture(plan) + + final = savedir.write_outputs( + plan, title="Ignored", note=None, speakers=[], duration_seconds=0, turns=0 + ) + + assert final.stem == provisional.stem + assert json.loads(final.sidecar.read_text(encoding="utf-8"))["title"] == "Orig" + + +def test_write_outputs_raises_clean_error_when_sidecar_write_blocked(tmp_path): + # A failed sidecar write (its directory was never created) is a save_dir_path CLIError, + # not a raw OSError — pins the _write error wrapper. + plan = _plan(tmp_path, name="Talk", save_audio=False) + # No ensure_dir / provisional files: the bucket directory doesn't exist, so writing + # the sidecar into it fails. + with pytest.raises(CLIError) as excinfo: + savedir.write_outputs(plan, title=None, note=None, speakers=[], duration_seconds=0, turns=0) + assert excinfo.value.error_type == "save_dir_path" + assert excinfo.value.exit_code == 2 + + +def test_write_outputs_raises_clean_error_when_rename_blocked(tmp_path): + # A failed rename surfaces as a save_dir_path CLIError, not a raw OSError. + plan = _plan(tmp_path, auto_name=True, save_audio=False) + naming.ensure_dir(plan.paths.directory) + # No provisional transcript exists, so the rename of a missing file fails. 
+ with pytest.raises(CLIError) as excinfo: + savedir.write_outputs( + plan, title="A Title", note=None, speakers=[], duration_seconds=0, turns=0 + ) + assert excinfo.value.error_type == "save_dir_path" + assert excinfo.value.exit_code == 2 + + +def test_derive_title_strips_and_prompts_over_transcript(monkeypatch): + # derive_title runs the title prompt over the transcript text and trims the answer. + seen = {} + + def fake_run_chain(api_key, prompts, *, transcript_text, model, max_tokens): + seen["prompts"] = prompts + seen["transcript_text"] = transcript_text + return " Quarterly Review \n" + + monkeypatch.setattr(savedir.llm, "run_chain", fake_run_chain) + title = savedir.derive_title("sk", "we shipped the release", model="m", max_tokens=32) + + assert title == "Quarterly Review" + assert seen["prompts"] == [savedir.TITLE_PROMPT] + assert seen["transcript_text"] == "we shipped the release"