From 858afe1dd0ae640966557b812bcf904e7cb97a6a Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 16 Jun 2026 21:06:52 +0000 Subject: [PATCH] Add transcript saving to `assembly stream` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds three save flags to `assembly stream`, grouped under a new "Saving" help panel alongside the existing --save-audio: - --save-transcript PATH: write finalized turns to a text file, one per line (the same plain text `-o text` emits), flushed per turn so a Ctrl-C leaves a complete transcript so far. - --save-dir DIR + --name TITLE: hand filename assembly to the CLI. Files land under DIR/YYYY-MM-DD/ with a timestamped, slugged stem (YYYY-MM-DD-HHMMSS[-slug]), and a matching WAV shares the stem — so a wrapper script's hand-rolled date(1) + tr/sed slug block can go away. New core pieces: streaming/transcript.py (the turn-by-turn text writer, parallel to record.tee_wav) and streaming/naming.py (slug + path assembly). StreamSession grows a transcript writer opened/closed in _guarded and fed from both the plain and --llm turn paths. Also splits the --system-audio tests out of test_stream_session.py into test_stream_system_audio.py to stay under the 500-line file gate. 
Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01UcH9QQmiZXviKzCpof7gJ4 --- aai_cli/commands/stream/__init__.py | 33 +- aai_cli/commands/stream/_exec.py | 93 +++- aai_cli/help_panels.py | 1 + aai_cli/streaming/naming.py | 74 +++ aai_cli/streaming/session.py | 54 ++- aai_cli/streaming/transcript.py | 58 +++ .../test_snapshots_help_run.ambr | 18 +- tests/test_stream_exec.py | 154 ++++++ tests/test_stream_session.py | 444 +++--------------- tests/test_stream_system_audio.py | 399 ++++++++++++++++ tests/test_streaming_naming.py | 61 +++ tests/test_streaming_transcript.py | 38 ++ 12 files changed, 1026 insertions(+), 401 deletions(-) create mode 100644 aai_cli/streaming/naming.py create mode 100644 aai_cli/streaming/transcript.py create mode 100644 tests/test_stream_system_audio.py create mode 100644 tests/test_streaming_naming.py create mode 100644 tests/test_streaming_transcript.py diff --git a/aai_cli/commands/stream/__init__.py b/aai_cli/commands/stream/__init__.py index 2eda6348..7fb00569 100644 --- a/aai_cli/commands/stream/__init__.py +++ b/aai_cli/commands/stream/__init__.py @@ -34,6 +34,11 @@ ("Stream the hosted sample", "assembly stream --sample"), ("Label speakers in the live transcript", "assembly stream --speaker-labels"), ("Save a WAV of the audio while streaming", "assembly stream --save-audio out.wav"), + ("Save the transcript text to a file", "assembly stream --save-transcript notes.txt"), + ( + "Auto-name the transcript + WAV under a dir", + 'assembly stream --save-dir ~/recordings --name "Standup"', + ), ( "Boost domain terms with keyterm prompts", 'assembly stream --keyterms-prompt "AssemblyAI" --keyterms-prompt "Claude"', @@ -83,16 +88,39 @@ def stream( help="macOS only: stream system/app audio without the microphone", rich_help_panel=help_panels.OPT_CAPTURE, ), + # saving save_audio: Path | None = typer.Option( None, "--save-audio", help="Tee the streamed PCM to PATH as a 16-bit mono WAV while transcribing", - 
rich_help_panel=help_panels.OPT_CAPTURE, + rich_help_panel=help_panels.OPT_SAVING, dir_okay=False, # Click guardrail; flipping it changes no behavior a unit test can observe # (and the writable check is a no-op under the test runner's root uid). writable=True, # pragma: no mutate ), + save_transcript: Path | None = typer.Option( + None, + "--save-transcript", + help="Write the finalized transcript to PATH, one turn per line", + rich_help_panel=help_panels.OPT_SAVING, + dir_okay=False, + writable=True, # pragma: no mutate + ), + save_dir: Path | None = typer.Option( + None, + "--save-dir", + help="Auto-name the transcript and a matching WAV under DIR/YYYY-MM-DD/ " + "with a timestamped file", + rich_help_panel=help_panels.OPT_SAVING, + file_okay=False, + ), + name: str | None = typer.Option( + None, + "--name", + help="Title to slug into the --save-dir filename (e.g. a meeting title)", + rich_help_panel=help_panels.OPT_SAVING, + ), # model & input speech_model: SpeechModel = typer.Option( DEFAULT_SPEECH_MODEL, @@ -367,5 +395,8 @@ def stream( output_field=output_field, show_code=show_code, save_audio=save_audio, + save_transcript=save_transcript, + save_dir=save_dir, + name=name, ) run_with_options(ctx, stream_exec.run_stream, opts, json=json_out) diff --git a/aai_cli/commands/stream/_exec.py b/aai_cli/commands/stream/_exec.py index af0e7a44..c0bc8b43 100644 --- a/aai_cli/commands/stream/_exec.py +++ b/aai_cli/commands/stream/_exec.py @@ -12,6 +12,7 @@ import tempfile from collections.abc import Iterable from dataclasses import dataclass +from datetime import UTC, datetime from pathlib import Path from assemblyai import PIISubstitutionPolicy @@ -22,7 +23,7 @@ from aai_cli.core import choices, client, config_builder, stdio, youtube from aai_cli.core.errors import UsageError, mutually_exclusive from aai_cli.core.microphone import MicrophoneSource -from aai_cli.streaming import record, turn_presets +from aai_cli.streaming import naming, record, transcript, turn_presets 
from aai_cli.streaming.macos import MacSystemAudioSource from aai_cli.streaming.render import StreamRenderer from aai_cli.streaming.session import ( @@ -86,6 +87,9 @@ class StreamOptions: output_field: choices.TextOrJson | None show_code: bool save_audio: Path | None + save_transcript: Path | None + save_dir: Path | None + name: str | None def source_options(self) -> SourceOptions: """The audio-input subset, in the shape the validation/dispatch helpers read.""" @@ -179,6 +183,70 @@ def _print_show_code( output.print_code(code_gen.stream(merged, llm=gateway, source=code_source)) +def _reject_save_with_show_code(opts: StreamOptions) -> None: + """Reject any save flag combined with --show-code: the generated SDK code never + writes audio or a transcript to disk, so silently dropping the save would mislead.""" + for flag, given in ( + ("--save-audio", opts.save_audio is not None), + ("--save-transcript", opts.save_transcript is not None), + ("--save-dir", opts.save_dir is not None), + ): + if given: + raise UsageError( + f"{flag} cannot be combined with --show-code; the generated SDK code " + "does not save to disk." + ) + + +def _resolve_save_targets( + opts: StreamOptions, sources: SourceOptions +) -> tuple[Path | None, Path | None]: + """Resolve the save flags into the (audio, transcript) paths the session writes. + + ``--save-dir`` owns filename assembly — it auto-names both the transcript and a + matching WAV under ``DIR/YYYY-MM-DD/`` — so it can't be combined with the explicit + ``--save-audio``/``--save-transcript`` paths, and ``--name`` only feeds that assembly. + Audio can't tee to a single WAV under ``--system-audio`` (two streams), which rejects + both the explicit ``--save-audio`` and ``--save-dir``'s audio leg. 
+ """ + if opts.save_dir is not None: + mutually_exclusive( + ("--save-dir", True), + ("--save-audio", opts.save_audio is not None), + ("--save-transcript", opts.save_transcript is not None), + suggestion="--save-dir names the files for you; drop the explicit path.", + ) + if sources.from_system_audio: + raise UsageError( + "--save-dir cannot be combined with --system-audio; the mic and system " + "streams can't share one recording.", + suggestion="Record a single source (mic, file, URL, or - on stdin).", + ) + # Local wall-clock time (what a meeting filename wants); the explicit utc-then- + # astimezone keeps the now() call timezone-aware for the linter. + now = datetime.now(UTC).astimezone() + paths = naming.resolve(opts.save_dir, opts.name, now=now) + naming.ensure_dir(paths.transcript.parent) + return paths.audio, paths.transcript + if opts.name is not None: + raise UsageError( + "--name applies only with --save-dir.", + suggestion="Pass --save-dir DIR to auto-name the files, " + "or --save-transcript PATH for an explicit path.", + ) + if opts.save_audio is not None: + if sources.from_system_audio: + raise UsageError( + "--save-audio cannot be combined with --system-audio; the mic and system " + "streams can't share one file.", + suggestion="Record a single source (mic, file, URL, or - on stdin).", + ) + record.validate_target(opts.save_audio) + if opts.save_transcript is not None: + transcript.validate_target(opts.save_transcript) + return opts.save_audio, opts.save_transcript + + def _dispatch(session: StreamSession, opts: SourceOptions) -> None: """Open the right audio source(s) for the flags and stream them.""" if opts.from_system_audio: @@ -249,7 +317,10 @@ def _collect_batch_sources(opts: StreamOptions, *, text_mode: bool) -> list[str] mutually_exclusive( ("--from-stdin", True), ("--save-audio", opts.save_audio is not None), - suggestion="--save-audio tees one stream; run a single source to record it.", + ("--save-transcript", opts.save_transcript is not 
None), + ("--save-dir", opts.save_dir is not None), + ("--name", opts.name is not None), + suggestion="--from-stdin streams many sources; saving applies to a single run.", ) mutually_exclusive( ("--llm", bool(opts.llm_prompt)), @@ -311,25 +382,14 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None base_flags = opts.base_flags() if opts.show_code: - if opts.save_audio is not None: - raise UsageError( - "--save-audio cannot be combined with --show-code; the generated SDK " - "code does not tee audio to disk." - ) + _reject_save_with_show_code(opts) _print_show_code(opts, sources, base_flags, text_mode=text_mode) return # Validate the requested sources (including that a local file exists) before # credentials, so a typo'd path reads as "file not found" — not as a login. validate_sources(sources, has_llm=bool(opts.llm_prompt), text_mode=text_mode) - if opts.save_audio is not None: - if sources.from_system_audio: - raise UsageError( - "--save-audio cannot be combined with --system-audio; the mic and system " - "streams can't share one file.", - suggestion="Record a single source (mic, file, URL, or - on stdin).", - ) - record.validate_target(opts.save_audio) + save_audio, save_transcript = _resolve_save_targets(opts, sources) if sources.from_file and not sources.from_stdin: client.resolve_audio_source(sources.source, sample=sources.sample) api_key = state.resolve_api_key() @@ -345,7 +405,8 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None llm_prompts=llm_prompts, model=opts.model, max_tokens=opts.max_tokens, - save_audio=opts.save_audio, + save_audio=save_audio, + save_transcript=save_transcript, llm_interval=opts.llm_interval, ) _dispatch(session, sources) diff --git a/aai_cli/help_panels.py b/aai_cli/help_panels.py index 218448f7..92ddd20b 100644 --- a/aai_cli/help_panels.py +++ b/aai_cli/help_panels.py @@ -47,3 +47,4 @@ OPT_CAPTURE = "Audio Capture" OPT_TURNS = "Turn Detection" OPT_FEATURES = 
"Features" +OPT_SAVING = "Saving" # write the audio/transcript to disk: --save-audio/-transcript/-dir, --name diff --git a/aai_cli/streaming/naming.py b/aai_cli/streaming/naming.py new file mode 100644 index 00000000..24a4d4d7 --- /dev/null +++ b/aai_cli/streaming/naming.py @@ -0,0 +1,74 @@ +"""Assemble auto-named output paths for `assembly stream --save-dir`. + +``--save-dir DIR`` turns filename assembly over to the CLI: the transcript and the +matching WAV land under ``DIR/YYYY-MM-DD/`` with a timestamped, slugged stem +(``YYYY-MM-DD-HHMMSS[-slug]``). That is the block a wrapper script otherwise hand-rolls +with ``date(1)`` plus a ``tr``/``sed`` slug — passing ``--name "TITLE"`` feeds e.g. a +calendar event title straight in, so the caller never slugs anything itself. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +from aai_cli.core.errors import CLIError + +# Each run of non-alphanumeric characters collapses to a single hyphen. +_NON_ALNUM = re.compile(r"[^a-z0-9]+") +# Cap the slug so a long title can't push the filename past a filesystem name limit. +_MAX_SLUG_LEN = 80 + + +def slugify(title: str) -> str: + """Fold ``title`` to a filename-safe slug. + + Lowercased, with every run of non-alphanumeric characters replaced by a single + hyphen, capped at 80 characters, and no leading/trailing hyphen. Returns "" when + the title has no usable characters (e.g. "!!!"), so the caller drops the slug + suffix rather than emitting a bare-hyphen filename. 
+ """ + slug = _NON_ALNUM.sub("-", title.lower()).strip("-") + return slug[:_MAX_SLUG_LEN].strip("-") + + +def _stem(now: datetime, name: str | None) -> str: + """The filename stem: a sortable timestamp, plus the slugged name when one survives.""" + stamp = now.strftime("%Y-%m-%d-%H%M%S") + slug = slugify(name) if name else "" + return f"{stamp}-{slug}" if slug else stamp + + +@dataclass(frozen=True) +class SavePaths: + """The auto-assembled transcript path and its matching audio path (same stem).""" + + transcript: Path + audio: Path + + +def resolve(save_dir: Path, name: str | None, *, now: datetime) -> SavePaths: + """Build ``DIR/YYYY-MM-DD/<stem>.{txt,wav}`` for ``now`` and ``--name``. + + The date bucket and the stem both carry the date so a transcript stays + self-describing if it is later moved out of its bucket. Path assembly only — + creating the directory is the caller's job (see ``ensure_dir``). + """ + bucket = save_dir / now.strftime("%Y-%m-%d") + stem = _stem(now, name) + return SavePaths(transcript=bucket / f"{stem}.txt", audio=bucket / f"{stem}.wav") + + +def ensure_dir(path: Path) -> None: + """Create ``path`` (and parents) for the auto-named files, as a clean CLIError on failure.""" + try: + path.mkdir(parents=True, exist_ok=True) + except OSError as exc: + raise CLIError( + f"Cannot create {path}: {exc}", + error_type="save_dir_path", + exit_code=2, + suggestion="Pass --save-dir under a writable location.", + ) from exc diff --git a/aai_cli/streaming/session.py b/aai_cli/streaming/session.py index 3953ade7..71d0ce67 100644 --- a/aai_cli/streaming/session.py +++ b/aai_cli/streaming/session.py @@ -19,6 +19,7 @@ ) from aai_cli.streaming import record from aai_cli.streaming.render import StreamRenderer, speaker_prefix +from aai_cli.streaming.transcript import TranscriptWriter from aai_cli.ui import output from aai_cli.ui.follow import FollowRenderer @@ -26,6 +27,21 @@ _ParallelStreams = list[tuple[str, Iterable[bytes], int]] +def 
_finalized_turn_line(event: object, source_label: str | None) -> str | None: + """The transcript line for a finalized, non-empty turn, or None for a partial/empty one. + + Prefixes the speaker/source label exactly as the text renderer does, so the saved + file and the ``--llm`` transcript record both read like the on-screen turns. + """ + if not getattr(event, "end_of_turn", False): + return None # partials don't belong in the transcript + text = getattr(event, "transcript", "") or "" + if not text: + return None + prefix = speaker_prefix(source_label, getattr(event, "speaker_label", None)) + return f"{prefix[0]}: {text}" if prefix is not None else text + + @dataclass(frozen=True) class SourceOptions: """Where the audio comes from, distilled from the CLI flags. @@ -141,11 +157,17 @@ class StreamSession: # When set, tee the streamed PCM to this path as a WAV (see record.tee_wav). Only # the single-source path sets it — the parallel/batch callers reject --save-audio. save_audio: Path | None = None + # When set, write each finalized turn to this path as plain text (see TranscriptWriter). + # Like save_audio, only the single-source path sets it; batch rejects --save-transcript. + save_transcript: Path | None = None # Seconds between --llm summary refreshes; <=0 re-runs the chain on every turn. llm_interval: float = 0.0 # Monotonic clock, injectable so the interval throttle is deterministic in tests. clock: Callable[[], float] = time.monotonic transcript: list[str] = field(default_factory=list[str]) + # The open transcript-file writer for a single run; created/closed in _guarded so a + # save target is opened once per session and a Ctrl-C still leaves a flushed file. 
+ _transcript_writer: TranscriptWriter | None = None _callback_lock: threading.RLock = field(default_factory=threading.RLock) _listening_lock: threading.Lock = field(default_factory=threading.Lock) _listening_started: bool = False @@ -176,23 +198,33 @@ def on_turn(self, event: object, *, source_label: str | None = None) -> None: if self.follow is None: with self._callback_lock: self.renderer.turn(event, source=source_label) + line = _finalized_turn_line(event, source_label) + if line is not None: + self._save_line(line) else: # --llm mode locks only to record the turn; the chain re-runs (network) are # left unlocked so the other source's turns keep flowing during a refresh. self._record_turn(event, source_label) + def _save_line(self, line: str) -> None: + """Append one finalized turn to the open --save-transcript/--save-dir file, if any. + + Always called under ``_callback_lock`` so parallel source threads can't interleave + a partial write into the file. + """ + if self._transcript_writer is not None: + self._transcript_writer.write_turn(line) + def _record_turn(self, event: object, source_label: str | None) -> None: - """Append a finalized turn to the running transcript, then refresh the --llm - answer if a refresh is due (every turn, or once per ``llm_interval`` seconds).""" - if not getattr(event, "end_of_turn", False): - return # partials don't change the transcript - text = getattr(event, "transcript", "") or "" - if not text: + """Append a finalized turn to the running transcript (and the saved file), then + refresh the --llm answer if a refresh is due (every turn, or once per + ``llm_interval`` seconds).""" + line = _finalized_turn_line(event, source_label) + if line is None: return - prefix = speaker_prefix(source_label, getattr(event, "speaker_label", None)) - line = f"{prefix[0]}: {text}" if prefix is not None else text with self._callback_lock: self.transcript.append(line) + self._save_line(line) self._maybe_summarize() def _maybe_summarize(self, *, 
final: bool = False) -> None: @@ -287,6 +319,10 @@ def _guarded(self, work: Callable[[], None], *, handle_interrupt: bool = True) - being swallowed here — the batch driver owns those signals across the whole ``--from-stdin`` sequence, so one Ctrl-C stops the batch rather than just advancing to the next source.""" + if self.save_transcript is not None: + # Open before streaming so a bad path fails up front; the finally closes it + # (flushing the turns so far) even on Ctrl-C or a worker error. + self._transcript_writer = TranscriptWriter(self.save_transcript) try: if self.follow is not None: with self.follow: @@ -311,6 +347,8 @@ def _guarded(self, work: Callable[[], None], *, handle_interrupt: bool = True) - # Downstream consumer (e.g. `| head`) closed the pipe; stop quietly. raise typer.Exit(code=0) from None finally: + if self._transcript_writer is not None: + self._transcript_writer.close() if self.follow is None: self.renderer.close() diff --git a/aai_cli/streaming/transcript.py b/aai_cli/streaming/transcript.py new file mode 100644 index 00000000..e084dc57 --- /dev/null +++ b/aai_cli/streaming/transcript.py @@ -0,0 +1,58 @@ +"""Write finalized turn text to a file — backs `assembly stream --save-transcript`/`--save-dir`. + +Parallels ``record.tee_wav`` (which keeps the raw PCM): this keeps the human-readable +transcript, one finalized turn per line — exactly the plain text ``-o text`` emits — +flushed as each turn lands so a Ctrl-C still leaves the transcript captured so far. +""" + +from __future__ import annotations + +from pathlib import Path + +from aai_cli.core.errors import CLIError + + +def validate_target(path: Path) -> None: + """Reject a transcript path whose parent directory is missing, before streaming. + + Run before credentials/audio are opened so a bad path reads as a path error up + front, not after a session has already started writing into the void. 
+ """ + parent = path.parent + if not parent.is_dir(): + raise CLIError( + f"Cannot save transcript to {path}: {parent} is not a directory.", + error_type="save_transcript_path", + exit_code=2, + suggestion="Create the directory first, or pass a path under an existing one.", + ) + + +class TranscriptWriter: + """Append finalized turn lines to a UTF-8 text file, flushing each one. + + Flushing per turn means a mid-stream stop (Ctrl-C) still leaves a complete + transcript of the turns seen so far, the same robustness ``record.tee_wav`` gives + the audio. + """ + + def __init__(self, path: Path) -> None: + try: + # Open the handle up front so a bad path fails here cleanly, mirroring + # record.tee_wav rather than discovering the error on the first turn. + self._handle = path.open("w", encoding="utf-8") + except OSError as exc: + raise CLIError( + f"Cannot open {path} for writing: {exc}", + error_type="save_transcript_path", + exit_code=2, + ) from exc + + def write_turn(self, line: str) -> None: + """Write one finalized turn as its own line and flush it to disk immediately.""" + self._handle.write(f"{line}\n") + self._handle.flush() + + def close(self) -> None: + """Close the underlying file handle.""" + self._handle.close() diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index 1d2d4cd6..106f47c0 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -731,9 +731,17 @@ │ --system-audio-only macOS only: stream │ │ system/app audio without │ │ the microphone │ - │ --save-audio FILE Tee the streamed PCM to │ - │ PATH as a 16-bit mono WAV │ - │ while transcribing │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ Saving ─────────────────────────────────────────────────────────────────────╮ + │ --save-audio FILE Tee the streamed PCM to PATH as a 16-bit │ + │ mono WAV while transcribing │ + │ 
--save-transcript FILE Write the finalized transcript to PATH, │ + │ one turn per line │ + │ --save-dir DIRECTORY Auto-name the transcript and a matching │ + │ WAV under DIR/YYYY-MM-DD/ with a │ + │ timestamped file │ + │ --name TEXT Title to slug into the --save-dir │ + │ filename (e.g. a meeting title) │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Model & Language ───────────────────────────────────────────────────────────╮ │ --speech-model [universal-streaming-m Streaming speech model │ @@ -837,6 +845,10 @@ $ assembly stream --speaker-labels Save a WAV of the audio while streaming $ assembly stream --save-audio out.wav + Save the transcript text to a file + $ assembly stream --save-transcript notes.txt + Auto-name the transcript + WAV under a dir + $ assembly stream --save-dir ~/recordings --name "Standup" Boost domain terms with keyterm prompts $ assembly stream --keyterms-prompt "AssemblyAI" --keyterms-prompt "Claude" Summarize action items live as you talk diff --git a/tests/test_stream_exec.py b/tests/test_stream_exec.py index 86663595..47af253a 100644 --- a/tests/test_stream_exec.py +++ b/tests/test_stream_exec.py @@ -10,6 +10,7 @@ import dataclasses import wave +from datetime import datetime from pathlib import Path import pytest @@ -63,6 +64,9 @@ output_field=None, show_code=False, save_audio=None, + save_transcript=None, + save_dir=None, + name=None, ) @@ -174,6 +178,9 @@ def test_stream_options_are_immutable(): {"from_stdin": True, "sample_rate": 44100}, {"from_stdin": True, "show_code": True}, # renders one source {"from_stdin": True, "save_audio": Path("out.wav")}, # tees one stream + {"from_stdin": True, "save_transcript": Path("out.txt")}, # saves one transcript + {"from_stdin": True, "save_dir": Path("rec")}, # auto-names one run + {"from_stdin": True, "name": "Standup"}, # --name needs --save-dir ], ) def test_from_stdin_rejects_incompatible_flags(overrides): @@ -307,3 +314,150 @@ def 
test_save_audio_rejects_missing_parent_dir(tmp_path): json_mode=False, ) assert excinfo.value.error_type == "save_audio_path" + + +# --- --save-transcript / --save-dir (write the transcript text) ------------ +class FakeTurn: + """A streaming turn event with just the attributes the session reads.""" + + def __init__(self, transcript, *, end_of_turn=True, speaker_label=None): + self.transcript = transcript + self.end_of_turn = end_of_turn + self.speaker_label = speaker_label + + +def _emit_turns(*events): + """A fake client.stream_audio that drains the audio (driving any tee) then fires + each turn through the session's on_turn callback, like the real SDK reader.""" + + def _fake(api_key, source, *, params, on_turn, **_kwargs): + b"".join(source) # draining is what writes the tee'd WAV, if any + for event in events: + on_turn(event) + + return _fake + + +def test_save_transcript_writes_only_finalized_nonempty_turns(monkeypatch, tmp_path): + # Each finalized, non-empty turn is one line; partials and empty turns are skipped. + config.set_api_key("default", "sk_live") + out = tmp_path / "notes.txt" + monkeypatch.setattr( + stream_exec.client, + "stream_audio", + _emit_turns( + FakeTurn("partial", end_of_turn=False), # not finalized -> skipped + FakeTurn("hello world"), + FakeTurn("", end_of_turn=True), # finalized but empty -> skipped + FakeTurn("goodbye"), + ), + ) + monkeypatch.setattr(stream_exec, "MicrophoneSource", FakeMic) + + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, save_transcript=out), AppState(), json_mode=True + ) + + assert out.read_text(encoding="utf-8") == "hello world\ngoodbye\n" + + +def test_save_transcript_prefixes_diarized_speaker(monkeypatch, tmp_path): + # A diarized turn is saved with the same "Speaker A:" prefix the text renderer uses. 
+ config.set_api_key("default", "sk_live") + out = tmp_path / "notes.txt" + monkeypatch.setattr( + stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi", speaker_label="A")) + ) + monkeypatch.setattr(stream_exec, "MicrophoneSource", FakeMic) + + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, save_transcript=out), AppState(), json_mode=True + ) + + assert out.read_text(encoding="utf-8") == "Speaker A: hi\n" + + +def test_no_transcript_file_written_when_flag_unset(monkeypatch, tmp_path): + # Without a save flag the default run leaves no stray .txt (kills a mutant that + # writes unconditionally). + config.set_api_key("default", "sk_live") + monkeypatch.setattr(stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi"))) + monkeypatch.setattr(stream_exec, "MicrophoneSource", FakeMic) + + stream_exec.run_stream(DEFAULTS, AppState(), json_mode=True) + + assert list(tmp_path.glob("*.txt")) == [] + + +class _FixedDatetime: + """Freezes datetime.now() so the auto-assembled filename is deterministic.""" + + @staticmethod + def now(*_args, **_kwargs): + # Naive local wall-clock; _exec's .astimezone() keeps the same 14:30:05. + return datetime(2026, 6, 16, 14, 30, 5) + + +def test_save_dir_auto_names_transcript_and_matching_wav(monkeypatch, tmp_path): + # --save-dir buckets by date and shares one timestamp+slug stem across the .txt and + # the .wav, so both land together under DIR/YYYY-MM-DD/. 
+ config.set_api_key("default", "sk_live") + monkeypatch.setattr(stream_exec, "datetime", _FixedDatetime) + monkeypatch.setattr(stream_exec.client, "stream_audio", _emit_turns(FakeTurn("hi there"))) + monkeypatch.setattr(stream_exec, "MicrophoneSource", RecordingMic) + + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, save_dir=tmp_path / "rec", name="My Meeting"), + AppState(), + json_mode=True, + ) + + bucket = tmp_path / "rec" / "2026-06-16" + txt = bucket / "2026-06-16-143005-my-meeting.txt" + wav = bucket / "2026-06-16-143005-my-meeting.wav" + assert txt.read_text(encoding="utf-8") == "hi there\n" + with wave.open(str(wav), "rb") as w: + assert w.readframes(w.getnframes()) == RecordingMic.PCM + + +@pytest.mark.parametrize( + "overrides", + [ + {"save_dir": Path("rec"), "save_audio": Path("a.wav")}, # save-dir owns the audio name + {"save_dir": Path("rec"), "save_transcript": Path("a.txt")}, # ...and the transcript + {"save_dir": Path("rec"), "system_audio": True}, # two streams can't share one wav + {"name": "Standup"}, # --name without --save-dir is meaningless + ], +) +def test_save_dir_rejects_incompatible_flags(overrides): + with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False + ) + + +@pytest.mark.parametrize( + "overrides", + [ + {"save_transcript": Path("a.txt"), "show_code": True}, + {"save_dir": Path("rec"), "show_code": True}, + ], +) +def test_save_flags_reject_show_code(overrides): + # The generated SDK code doesn't save to disk, so pairing a save flag with --show-code + # is a usage error rather than a silently-dropped save. 
+ with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False + ) + + +def test_save_transcript_rejects_missing_parent_dir(tmp_path): + config.set_api_key("default", "sk_live") + with pytest.raises(CLIError) as excinfo: + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, save_transcript=tmp_path / "nope" / "notes.txt"), + AppState(), + json_mode=False, + ) + assert excinfo.value.error_type == "save_transcript_path" diff --git a/tests/test_stream_session.py b/tests/test_stream_session.py index f25dc180..702b39af 100644 --- a/tests/test_stream_session.py +++ b/tests/test_stream_session.py @@ -1,22 +1,11 @@ -import json -import types -from collections.abc import Callable - -from typer.testing import CliRunner - -from aai_cli.core import config -from aai_cli.core.errors import APIError -from aai_cli.main import app - -runner = CliRunner() +"""`assembly stream` session lifecycle + transcript-saving tests. +The --system-audio family lives in test_stream_system_audio.py; this file covers the +shared StreamSession plumbing (the "Listening…" latch, renderer teardown) and the +--save-transcript/--save-dir file writer. 
+""" -def _capture_source(seen): - def fake(api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None): - seen["source"] = source - seen["rate"] = params.sample_rate - - return fake +import types def test_stream_session_listening_notice_latches(monkeypatch): @@ -81,374 +70,83 @@ def boom(*_args, **_kwargs): assert closed["n"] >= 1 -def test_stream_system_audio_uses_macos_source(monkeypatch) -> None: - config.set_api_key("default", "sk_live") - source_types: list[str] = [] - rates: list[int] = [] - mic_target_rate: list[int | None] = [None] - system_on_open: list[Callable[[], None] | None] = [None] - mic_on_open: list[Callable[[], None] | None] = [None] - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - system_on_open[0] = on_open - self.sample_rate = 16000 - - def __iter__(self): - if system_on_open[0] is not None: - system_on_open[0]() - return iter([b"system"]) - - class FakeMic: - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - mic_target_rate[0] = target_rate - mic_on_open[0] = on_open - self.sample_rate = 16000 - - def __iter__(self): - if mic_on_open[0] is not None: - mic_on_open[0]() - return iter([b"mic"]) - - def fake_stream_audio( - api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None - ): - source_type = type(source).__name__ - source_types.append(source_type) - rates.append(params.sample_rate) - if on_begin: - on_begin(types.SimpleNamespace(id=source_type)) - list(source) - if on_turn: - on_turn(types.SimpleNamespace(transcript=source_type, end_of_turn=True)) - - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) - monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) - result = runner.invoke(app, ["stream", "--system-audio", "--json"]) - assert result.exit_code == 0 - assert set(source_types) == 
{"FakeSystemAudio", "FakeMic"} - assert rates == [16000, 16000] - assert mic_target_rate[0] == 16000 - lines = [json.loads(x) for x in result.output.splitlines() if x.strip()] - assert { - "type": "turn", - "transcript": "FakeSystemAudio", - "end_of_turn": True, - "source": "system", - } in lines - assert {"type": "turn", "transcript": "FakeMic", "end_of_turn": True, "source": "you"} in lines - - -def test_stream_system_audio_only_disables_mic(monkeypatch): - config.set_api_key("default", "sk_live") - seen = {} - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - self.sample_rate = 16000 - - def __iter__(self): - return iter([b"\x00\x00"]) - - def fail_mic(**_kwargs): - raise AssertionError("system-audio-only must not open the microphone") - - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", fail_mic) - monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", _capture_source(seen)) - result = runner.invoke(app, ["stream", "--system-audio-only", "--json"]) - assert result.exit_code == 0 - assert type(seen["source"]).__name__ == "FakeSystemAudio" - - -def test_stream_system_audio_rejects_other_sources(): - config.set_api_key("default", "sk_live") - result = runner.invoke(app, ["stream", "--system-audio", "--sample"]) - assert result.exit_code == 2 - assert "cannot be combined" in result.output - - -def test_stream_system_audio_forwards_mic_device_flags(monkeypatch): - config.set_api_key("default", "sk_live") - seen = {} - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - self.sample_rate = 16000 - - def __iter__(self): - return iter([b"system"]) - - class FakeMic: - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - seen["target_rate"] = target_rate - seen["device"] = device - seen["capture_rate"] = capture_rate - self.sample_rate = target_rate - - def __iter__(self): - 
return iter([b"mic"]) +def _saving_session(out, *, follow=None, llm_prompts=None): + """A StreamSession wired with a transcript writer for the save-to-file tests.""" + import io - def fake_stream_audio( - api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None - ): - list(source) + from aai_cli.streaming.render import StreamRenderer + from aai_cli.streaming.session import StreamSession + from aai_cli.streaming.transcript import TranscriptWriter - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) - monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) - result = runner.invoke( - app, - ["stream", "--system-audio", "--device", "2", "--sample-rate", "44100", "--json"], + session = StreamSession( + api_key="sk", + base_flags={}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=follow, + llm_prompts=llm_prompts or [], + model="m", + max_tokens=1, + save_transcript=out, ) - assert result.exit_code == 0 - assert seen == {"target_rate": 16000, "device": 2, "capture_rate": 44100} - - -def test_stream_system_audio_llm_prefixes_sources(monkeypatch): - config.set_api_key("default", "sk_live") - transcript_inputs = [] - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - self.sample_rate = 16000 - - def __iter__(self): - return iter([b"system"]) - - class FakeMic: - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - self.sample_rate = target_rate - - def __iter__(self): - return iter([b"mic"]) - - def fake_stream_audio( - api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None - ): - if on_turn: - on_turn(types.SimpleNamespace(transcript="", end_of_turn=True)) - on_turn(types.SimpleNamespace(transcript=type(source).__name__, end_of_turn=True)) - - def 
fake_run_chain(api_key, prompts, *, transcript_text, model, max_tokens): - transcript_inputs.append(transcript_text) - return "summary" - - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) - monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) - monkeypatch.setattr("aai_cli.core.llm.run_chain", fake_run_chain) - result = runner.invoke(app, ["stream", "--system-audio", "--llm", "summarize", "--json"]) - assert result.exit_code == 0 - assert any("System: FakeSystemAudio" in value for value in transcript_inputs) - assert any("You: FakeMic" in value for value in transcript_inputs) - - -def test_stream_system_audio_speaker_labels_only_diarizes_system(monkeypatch): - # --speaker-labels diarizes the system audio but never the mic: the "you" session - # is forced to speaker_labels=False so the mic stays a single "You". - config.set_api_key("default", "sk_live") - speaker_labels_by_chunk = {} - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - self.sample_rate = 16000 - - def __iter__(self): - return iter([b"system"]) - - class FakeMic: - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - self.sample_rate = target_rate - - def __iter__(self): - return iter([b"mic"]) + session._transcript_writer = TranscriptWriter(out) + return session - def fake_stream_audio( - api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None - ): - chunk = next(iter(source)) - speaker_labels_by_chunk[chunk] = params.speaker_labels - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) - monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) - result = runner.invoke(app, ["stream", "--system-audio", 
"--speaker-labels", "--json"]) - assert result.exit_code == 0 - assert speaker_labels_by_chunk[b"system"] is True - assert speaker_labels_by_chunk[b"mic"] is False +def test_record_turn_saves_to_file_in_llm_mode(monkeypatch, tmp_path): + # In --llm (follow) mode on_turn routes through _record_turn, which must also append + # the finalized turn to the open --save-transcript file (pins the _save_line call there). + from aai_cli.streaming import session as session_mod + monkeypatch.setattr(session_mod.llm, "run_chain", lambda *a, **k: "answer") + out = tmp_path / "notes.txt" + summaries: list[str] = [] + session = _saving_session( + out, follow=lambda answer, turns: summaries.append(answer), llm_prompts=["go"] + ) -def test_stream_system_audio_parallel_final_worker_error_surfaces(monkeypatch): - config.set_api_key("default", "sk_live") - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - self.sample_rate = 16000 - - def __iter__(self): - return iter([b"system"]) - - class FakeMic: - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - self.sample_rate = target_rate - - def __iter__(self): - return iter([b"mic"]) - - daemons = [] - - class ImmediateThread: - def __init__(self, *, target, args, daemon): - self._target = target - self._args = args - daemons.append(daemon) - - def start(self): - self._target(*self._args) - - def is_alive(self): - return False - - def join(self, timeout=None): - return None - - def fake_stream_audio( - api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None - ): - raise APIError(f"{type(source).__name__} failed") - - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) - monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) - monkeypatch.setattr("aai_cli.streaming.session.threading.Thread", ImmediateThread) - 
result = runner.invoke(app, ["stream", "--system-audio", "--json"]) - assert result.exit_code == 1 - assert "failed" in result.output - # Both source workers run as daemons so a wedged stream can't block process exit. - assert daemons and all(d is True for d in daemons) - - -def test_stream_system_audio_parallel_unexpected_worker_error_fails_the_run(monkeypatch): - # A non-CLIError bug inside a worker must still fail the run with a clean error: - # uncaught, it would die with the daemon thread and the command would exit 0 - # for a stream that actually failed. - config.set_api_key("default", "sk_live") - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - self.sample_rate = 16000 - - def __iter__(self): - return iter([b"system"]) - - class FakeMic: - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - self.sample_rate = target_rate - - def __iter__(self): - return iter([b"mic"]) - - class ImmediateThread: - def __init__(self, *, target, args, daemon): - self._target = target - self._args = args - - def start(self): - self._target(*self._args) - - def is_alive(self): - return False - - def join(self, timeout=None): - return None - - def fake_stream_audio(api_key, source, *, params, **_kwargs): - raise RuntimeError("event parsing blew up") - - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) - monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) - monkeypatch.setattr("aai_cli.streaming.session.threading.Thread", ImmediateThread) - result = runner.invoke(app, ["stream", "--system-audio", "--json"]) - assert result.exit_code == 1 - # Normalized to a clean worker error that names the source and the cause. 
- assert "Streaming worker" in result.output - assert "event parsing blew up" in result.output - assert "Traceback" not in result.output - - -def test_stream_system_audio_parallel_keyboard_interrupt_exits_cleanly(monkeypatch): - config.set_api_key("default", "sk_live") - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - self.sample_rate = 16000 - - class FakeMic: - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - self.sample_rate = target_rate - - class InterruptingThread: - def __init__(self, *, target, args, daemon): - pass - - def start(self): - raise KeyboardInterrupt - - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) - monkeypatch.setattr("aai_cli.streaming.session.threading.Thread", InterruptingThread) - result = runner.invoke(app, ["stream", "--system-audio"]) - assert result.exit_code == 0 - assert "Stopped." 
in result.output - - -def test_stream_system_audio_parallel_broken_pipe_exits_zero(monkeypatch): - config.set_api_key("default", "sk_live") - - class FakeSystemAudio: - def __init__(self, *, on_open=None): - self.sample_rate = 16000 - - class FakeMic: - def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): - self.sample_rate = target_rate - - class BrokenPipeThread: - def __init__(self, *, target, args, daemon): - pass - - def start(self): - raise BrokenPipeError + session.on_turn(types.SimpleNamespace(transcript="hello", end_of_turn=True, speaker_label=None)) + writer = session._transcript_writer + assert writer is not None + writer.close() - monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) - monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) - monkeypatch.setattr("aai_cli.streaming.session.threading.Thread", BrokenPipeThread) - result = runner.invoke(app, ["stream", "--system-audio"]) - assert result.exit_code == 0 + assert out.read_text(encoding="utf-8") == "hello\n" + assert session.transcript == ["hello"] # still recorded for the --llm chain + assert summaries == ["answer"] # the chain refreshed off the saved turn -def test_stream_system_audio_only_rejects_mic_device_flags(): - config.set_api_key("default", "sk_live") - result = runner.invoke(app, ["stream", "--system-audio-only", "--device", "2"]) - assert result.exit_code == 2 - assert "--device" in result.output +def test_guarded_closes_transcript_writer(monkeypatch, tmp_path): + # The writer opened for a run is closed in _guarded's finally, even on a clean run + # (pins the close() in the finally block — flush-per-turn alone wouldn't release it). 
+ from aai_cli.streaming import session as session_mod + from aai_cli.streaming.render import StreamRenderer + from aai_cli.streaming.session import StreamSession - result = runner.invoke(app, ["stream", "--system-audio-only", "--sample-rate", "44100"]) - assert result.exit_code == 2 - assert "--sample-rate" in result.output + closed = {"n": 0} + real = session_mod.TranscriptWriter + class SpyWriter(real): + def close(self): + closed["n"] += 1 + super().close() -def test_stream_system_audio_rejects_both_modes(): - config.set_api_key("default", "sk_live") - result = runner.invoke(app, ["stream", "--system-audio", "--system-audio-only"]) - assert result.exit_code == 2 - assert "--system-audio and --system-audio-only can't be combined." in result.output + monkeypatch.setattr(session_mod, "TranscriptWriter", SpyWriter) + monkeypatch.setattr(session_mod.client, "stream_audio", lambda *a, **k: list(a[1])) + import io -def test_stream_show_code_rejects_system_audio(): - result = runner.invoke(app, ["stream", "--system-audio", "--show-code"]) - assert result.exit_code == 2 - assert "--show-code" in result.output + session = StreamSession( + api_key="sk", + base_flags={"speech_model": "u3-rt-pro"}, + overrides=None, + config_file=None, + renderer=StreamRenderer(json_mode=True, out=io.StringIO()), + follow=None, + llm_prompts=[], + model="m", + max_tokens=1, + save_transcript=tmp_path / "notes.txt", + ) + session.run([b"\x00\x00"], 16000) + assert closed["n"] == 1 diff --git a/tests/test_stream_system_audio.py b/tests/test_stream_system_audio.py new file mode 100644 index 00000000..d47bdc6e --- /dev/null +++ b/tests/test_stream_system_audio.py @@ -0,0 +1,399 @@ +"""`assembly stream` system-audio capture tests (macOS parallel mic + system sessions). + +Split out of test_stream_session.py: the --system-audio family is a cohesive block +(parallel sessions, worker-error propagation, the mic/system source wiring) large +enough to live on its own under the 500-line file gate. 
+""" + +import json +import types +from collections.abc import Callable + +from typer.testing import CliRunner + +from aai_cli.core import config +from aai_cli.core.errors import APIError +from aai_cli.main import app + +runner = CliRunner() + + +def _capture_source(seen): + def fake(api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None): + seen["source"] = source + seen["rate"] = params.sample_rate + + return fake + + +def test_stream_system_audio_uses_macos_source(monkeypatch) -> None: + config.set_api_key("default", "sk_live") + source_types: list[str] = [] + rates: list[int] = [] + mic_target_rate: list[int | None] = [None] + system_on_open: list[Callable[[], None] | None] = [None] + mic_on_open: list[Callable[[], None] | None] = [None] + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + system_on_open[0] = on_open + self.sample_rate = 16000 + + def __iter__(self): + if system_on_open[0] is not None: + system_on_open[0]() + return iter([b"system"]) + + class FakeMic: + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + mic_target_rate[0] = target_rate + mic_on_open[0] = on_open + self.sample_rate = 16000 + + def __iter__(self): + if mic_on_open[0] is not None: + mic_on_open[0]() + return iter([b"mic"]) + + def fake_stream_audio( + api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None + ): + source_type = type(source).__name__ + source_types.append(source_type) + rates.append(params.sample_rate) + if on_begin: + on_begin(types.SimpleNamespace(id=source_type)) + list(source) + if on_turn: + on_turn(types.SimpleNamespace(transcript=source_type, end_of_turn=True)) + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) + result = runner.invoke(app, 
["stream", "--system-audio", "--json"]) + assert result.exit_code == 0 + assert set(source_types) == {"FakeSystemAudio", "FakeMic"} + assert rates == [16000, 16000] + assert mic_target_rate[0] == 16000 + lines = [json.loads(x) for x in result.output.splitlines() if x.strip()] + assert { + "type": "turn", + "transcript": "FakeSystemAudio", + "end_of_turn": True, + "source": "system", + } in lines + assert {"type": "turn", "transcript": "FakeMic", "end_of_turn": True, "source": "you"} in lines + + +def test_stream_system_audio_only_disables_mic(monkeypatch): + config.set_api_key("default", "sk_live") + seen = {} + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + self.sample_rate = 16000 + + def __iter__(self): + return iter([b"\x00\x00"]) + + def fail_mic(**_kwargs): + raise AssertionError("system-audio-only must not open the microphone") + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", fail_mic) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", _capture_source(seen)) + result = runner.invoke(app, ["stream", "--system-audio-only", "--json"]) + assert result.exit_code == 0 + assert type(seen["source"]).__name__ == "FakeSystemAudio" + + +def test_stream_system_audio_rejects_other_sources(): + config.set_api_key("default", "sk_live") + result = runner.invoke(app, ["stream", "--system-audio", "--sample"]) + assert result.exit_code == 2 + assert "cannot be combined" in result.output + + +def test_stream_system_audio_forwards_mic_device_flags(monkeypatch): + config.set_api_key("default", "sk_live") + seen = {} + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + self.sample_rate = 16000 + + def __iter__(self): + return iter([b"system"]) + + class FakeMic: + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + seen["target_rate"] = target_rate + seen["device"] = 
device + seen["capture_rate"] = capture_rate + self.sample_rate = target_rate + + def __iter__(self): + return iter([b"mic"]) + + def fake_stream_audio( + api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None + ): + list(source) + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) + result = runner.invoke( + app, + ["stream", "--system-audio", "--device", "2", "--sample-rate", "44100", "--json"], + ) + assert result.exit_code == 0 + assert seen == {"target_rate": 16000, "device": 2, "capture_rate": 44100} + + +def test_stream_system_audio_llm_prefixes_sources(monkeypatch): + config.set_api_key("default", "sk_live") + transcript_inputs = [] + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + self.sample_rate = 16000 + + def __iter__(self): + return iter([b"system"]) + + class FakeMic: + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + self.sample_rate = target_rate + + def __iter__(self): + return iter([b"mic"]) + + def fake_stream_audio( + api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None + ): + if on_turn: + on_turn(types.SimpleNamespace(transcript="", end_of_turn=True)) + on_turn(types.SimpleNamespace(transcript=type(source).__name__, end_of_turn=True)) + + def fake_run_chain(api_key, prompts, *, transcript_text, model, max_tokens): + transcript_inputs.append(transcript_text) + return "summary" + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) + monkeypatch.setattr("aai_cli.core.llm.run_chain", fake_run_chain) + result = 
runner.invoke(app, ["stream", "--system-audio", "--llm", "summarize", "--json"]) + assert result.exit_code == 0 + assert any("System: FakeSystemAudio" in value for value in transcript_inputs) + assert any("You: FakeMic" in value for value in transcript_inputs) + + +def test_stream_system_audio_speaker_labels_only_diarizes_system(monkeypatch): + # --speaker-labels diarizes the system audio but never the mic: the "you" session + # is forced to speaker_labels=False so the mic stays a single "You". + config.set_api_key("default", "sk_live") + speaker_labels_by_chunk = {} + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + self.sample_rate = 16000 + + def __iter__(self): + return iter([b"system"]) + + class FakeMic: + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + self.sample_rate = target_rate + + def __iter__(self): + return iter([b"mic"]) + + def fake_stream_audio( + api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None + ): + chunk = next(iter(source)) + speaker_labels_by_chunk[chunk] = params.speaker_labels + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) + result = runner.invoke(app, ["stream", "--system-audio", "--speaker-labels", "--json"]) + assert result.exit_code == 0 + assert speaker_labels_by_chunk[b"system"] is True + assert speaker_labels_by_chunk[b"mic"] is False + + +def test_stream_system_audio_parallel_final_worker_error_surfaces(monkeypatch): + config.set_api_key("default", "sk_live") + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + self.sample_rate = 16000 + + def __iter__(self): + return iter([b"system"]) + + class FakeMic: + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + self.sample_rate = 
target_rate + + def __iter__(self): + return iter([b"mic"]) + + daemons = [] + + class ImmediateThread: + def __init__(self, *, target, args, daemon): + self._target = target + self._args = args + daemons.append(daemon) + + def start(self): + self._target(*self._args) + + def is_alive(self): + return False + + def join(self, timeout=None): + return None + + def fake_stream_audio( + api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None + ): + raise APIError(f"{type(source).__name__} failed") + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) + monkeypatch.setattr("aai_cli.streaming.session.threading.Thread", ImmediateThread) + result = runner.invoke(app, ["stream", "--system-audio", "--json"]) + assert result.exit_code == 1 + assert "failed" in result.output + # Both source workers run as daemons so a wedged stream can't block process exit. + assert daemons and all(d is True for d in daemons) + + +def test_stream_system_audio_parallel_unexpected_worker_error_fails_the_run(monkeypatch): + # A non-CLIError bug inside a worker must still fail the run with a clean error: + # uncaught, it would die with the daemon thread and the command would exit 0 + # for a stream that actually failed. 
+ config.set_api_key("default", "sk_live") + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + self.sample_rate = 16000 + + def __iter__(self): + return iter([b"system"]) + + class FakeMic: + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + self.sample_rate = target_rate + + def __iter__(self): + return iter([b"mic"]) + + class ImmediateThread: + def __init__(self, *, target, args, daemon): + self._target = target + self._args = args + + def start(self): + self._target(*self._args) + + def is_alive(self): + return False + + def join(self, timeout=None): + return None + + def fake_stream_audio(api_key, source, *, params, **_kwargs): + raise RuntimeError("event parsing blew up") + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) + monkeypatch.setattr("aai_cli.streaming.session.threading.Thread", ImmediateThread) + result = runner.invoke(app, ["stream", "--system-audio", "--json"]) + assert result.exit_code == 1 + # Normalized to a clean worker error that names the source and the cause. 
+ assert "Streaming worker" in result.output + assert "event parsing blew up" in result.output + assert "Traceback" not in result.output + + +def test_stream_system_audio_parallel_keyboard_interrupt_exits_cleanly(monkeypatch): + config.set_api_key("default", "sk_live") + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + self.sample_rate = 16000 + + class FakeMic: + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + self.sample_rate = target_rate + + class InterruptingThread: + def __init__(self, *, target, args, daemon): + pass + + def start(self): + raise KeyboardInterrupt + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) + monkeypatch.setattr("aai_cli.streaming.session.threading.Thread", InterruptingThread) + result = runner.invoke(app, ["stream", "--system-audio"]) + assert result.exit_code == 0 + assert "Stopped." 
in result.output + + +def test_stream_system_audio_parallel_broken_pipe_exits_zero(monkeypatch): + config.set_api_key("default", "sk_live") + + class FakeSystemAudio: + def __init__(self, *, on_open=None): + self.sample_rate = 16000 + + class FakeMic: + def __init__(self, *, target_rate=None, device=None, capture_rate=None, on_open=None): + self.sample_rate = target_rate + + class BrokenPipeThread: + def __init__(self, *, target, args, daemon): + pass + + def start(self): + raise BrokenPipeError + + monkeypatch.setattr("aai_cli.commands.stream._exec.MacSystemAudioSource", FakeSystemAudio) + monkeypatch.setattr("aai_cli.commands.stream._exec.MicrophoneSource", FakeMic) + monkeypatch.setattr("aai_cli.streaming.session.threading.Thread", BrokenPipeThread) + result = runner.invoke(app, ["stream", "--system-audio"]) + assert result.exit_code == 0 + + +def test_stream_system_audio_only_rejects_mic_device_flags(): + config.set_api_key("default", "sk_live") + result = runner.invoke(app, ["stream", "--system-audio-only", "--device", "2"]) + assert result.exit_code == 2 + assert "--device" in result.output + + result = runner.invoke(app, ["stream", "--system-audio-only", "--sample-rate", "44100"]) + assert result.exit_code == 2 + assert "--sample-rate" in result.output + + +def test_stream_system_audio_rejects_both_modes(): + config.set_api_key("default", "sk_live") + result = runner.invoke(app, ["stream", "--system-audio", "--system-audio-only"]) + assert result.exit_code == 2 + assert "--system-audio and --system-audio-only can't be combined." 
in result.output + + +def test_stream_show_code_rejects_system_audio(): + result = runner.invoke(app, ["stream", "--system-audio", "--show-code"]) + assert result.exit_code == 2 + assert "--show-code" in result.output diff --git a/tests/test_streaming_naming.py b/tests/test_streaming_naming.py new file mode 100644 index 00000000..a2d5bf48 --- /dev/null +++ b/tests/test_streaming_naming.py @@ -0,0 +1,61 @@ +"""Unit tests for aai_cli.streaming.naming — the --save-dir filename assembly.""" + +from __future__ import annotations + +from datetime import datetime +from pathlib import Path + +import pytest + +from aai_cli.core.errors import CLIError +from aai_cli.streaming import naming + +NOW = datetime(2026, 6, 16, 14, 30, 5) + + +@pytest.mark.parametrize( + ("title", "expected"), + [ + ("Weekly Standup", "weekly-standup"), + ("Weekly Standup!! (Eng)", "weekly-standup-eng"), # runs collapse, edges stripped + ("ALL CAPS", "all-caps"), # lowercased + ("---trim---", "trim"), # leading/trailing separators dropped + ("!!!", ""), # nothing usable -> empty, so the caller drops the suffix + ], +) +def test_slugify(title, expected): + assert naming.slugify(title) == expected + + +def test_slugify_caps_length_without_trailing_hyphen(): + # A long title is capped at 80 chars; a hyphen landing on the cut boundary is stripped. + slug = naming.slugify("a" * 79 + " more words here") + assert slug == "a" * 79 # the space would have become the 80th char's hyphen, trimmed + + +def test_resolve_buckets_by_date_with_slugged_name(): + paths = naming.resolve(Path("rec"), "My Meeting", now=NOW) + assert paths.transcript == Path("rec/2026-06-16/2026-06-16-143005-my-meeting.txt") + assert paths.audio == Path("rec/2026-06-16/2026-06-16-143005-my-meeting.wav") + + +def test_resolve_without_name_is_just_the_timestamp(): + # No --name (or a name that slugs to nothing) -> the stem is the bare timestamp, + # never a trailing-hyphen filename. 
+ assert naming.resolve(Path("rec"), None, now=NOW).transcript.name == "2026-06-16-143005.txt" + assert naming.resolve(Path("rec"), "!!!", now=NOW).transcript.name == "2026-06-16-143005.txt" + + +def test_ensure_dir_creates_nested_dirs(tmp_path): + target = tmp_path / "rec" / "2026-06-16" + naming.ensure_dir(target) + assert target.is_dir() + + +def test_ensure_dir_raises_clean_error_when_path_is_blocked(tmp_path): + # A parent component that is a file, not a directory, can't be turned into a dir. + blocker = tmp_path / "blocker" + blocker.write_text("not a dir") + with pytest.raises(CLIError) as excinfo: + naming.ensure_dir(blocker / "2026-06-16") + assert excinfo.value.error_type == "save_dir_path" diff --git a/tests/test_streaming_transcript.py b/tests/test_streaming_transcript.py new file mode 100644 index 00000000..303f80f6 --- /dev/null +++ b/tests/test_streaming_transcript.py @@ -0,0 +1,38 @@ +"""Unit tests for aai_cli.streaming.transcript — the --save-transcript text writer.""" + +from __future__ import annotations + +import pytest + +from aai_cli.core.errors import CLIError +from aai_cli.streaming.transcript import TranscriptWriter, validate_target + + +def test_validate_target_rejects_missing_parent_dir(tmp_path): + with pytest.raises(CLIError) as excinfo: + validate_target(tmp_path / "nope" / "notes.txt") + assert excinfo.value.error_type == "save_transcript_path" + + +def test_validate_target_accepts_existing_parent(tmp_path): + validate_target(tmp_path / "notes.txt") # parent exists -> no raise + + +def test_writer_appends_one_line_per_turn_and_flushes(tmp_path): + out = tmp_path / "notes.txt" + writer = TranscriptWriter(out) + try: + writer.write_turn("hello world") + # Flushed per turn: the line is on disk before close, so a Ctrl-C keeps it. 
+ assert out.read_text(encoding="utf-8") == "hello world\n" + writer.write_turn("Speaker A: second turn") + finally: + writer.close() + assert out.read_text(encoding="utf-8") == "hello world\nSpeaker A: second turn\n" + + +def test_writer_raises_clean_error_on_unopenable_path(tmp_path): + # The parent doesn't exist, so opening for write fails -> a clean CLIError, not OSError. + with pytest.raises(CLIError) as excinfo: + TranscriptWriter(tmp_path / "missing" / "notes.txt") + assert excinfo.value.error_type == "save_transcript_path"