Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion aai_cli/commands/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
("Stream the hosted sample", "assembly stream --sample"),
("Label speakers in the live transcript", "assembly stream --speaker-labels"),
("Save a WAV of the audio while streaming", "assembly stream --save-audio out.wav"),
("Save the transcript text to a file", "assembly stream --save-transcript notes.txt"),
(
"Auto-name the transcript + WAV under a dir",
'assembly stream --save-dir ~/recordings --name "Standup"',
),
(
"Boost domain terms with keyterm prompts",
'assembly stream --keyterms-prompt "AssemblyAI" --keyterms-prompt "Claude"',
Expand Down Expand Up @@ -83,16 +88,39 @@ def stream(
help="macOS only: stream system/app audio without the microphone",
rich_help_panel=help_panels.OPT_CAPTURE,
),
# saving
save_audio: Path | None = typer.Option(
None,
"--save-audio",
help="Tee the streamed PCM to PATH as a 16-bit mono WAV while transcribing",
rich_help_panel=help_panels.OPT_CAPTURE,
rich_help_panel=help_panels.OPT_SAVING,
dir_okay=False,
# Click guardrail; flipping it changes no behavior a unit test can observe
# (and the writable check is a no-op under the test runner's root uid).
writable=True, # pragma: no mutate
),
save_transcript: Path | None = typer.Option(
None,
"--save-transcript",
help="Write the finalized transcript to PATH, one turn per line",
rich_help_panel=help_panels.OPT_SAVING,
dir_okay=False,
writable=True, # pragma: no mutate
),
save_dir: Path | None = typer.Option(
None,
"--save-dir",
help="Auto-name the transcript and a matching WAV under DIR/YYYY-MM-DD/ "
"with a timestamped file",
rich_help_panel=help_panels.OPT_SAVING,
file_okay=False,
),
name: str | None = typer.Option(
None,
"--name",
help="Title to slug into the --save-dir filename (e.g. a meeting title)",
rich_help_panel=help_panels.OPT_SAVING,
),
# model & input
speech_model: SpeechModel = typer.Option(
DEFAULT_SPEECH_MODEL,
Expand Down Expand Up @@ -367,5 +395,8 @@ def stream(
output_field=output_field,
show_code=show_code,
save_audio=save_audio,
save_transcript=save_transcript,
save_dir=save_dir,
name=name,
)
run_with_options(ctx, stream_exec.run_stream, opts, json=json_out)
93 changes: 77 additions & 16 deletions aai_cli/commands/stream/_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import tempfile
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path

from assemblyai import PIISubstitutionPolicy
Expand All @@ -22,7 +23,7 @@
from aai_cli.core import choices, client, config_builder, stdio, youtube
from aai_cli.core.errors import UsageError, mutually_exclusive
from aai_cli.core.microphone import MicrophoneSource
from aai_cli.streaming import record, turn_presets
from aai_cli.streaming import naming, record, transcript, turn_presets
from aai_cli.streaming.macos import MacSystemAudioSource
from aai_cli.streaming.render import StreamRenderer
from aai_cli.streaming.session import (
Expand Down Expand Up @@ -86,6 +87,9 @@ class StreamOptions:
output_field: choices.TextOrJson | None
show_code: bool
save_audio: Path | None
save_transcript: Path | None
save_dir: Path | None
name: str | None

def source_options(self) -> SourceOptions:
"""The audio-input subset, in the shape the validation/dispatch helpers read."""
Expand Down Expand Up @@ -179,6 +183,70 @@ def _print_show_code(
output.print_code(code_gen.stream(merged, llm=gateway, source=code_source))


def _reject_save_with_show_code(opts: StreamOptions) -> None:
"""Reject any save flag combined with --show-code: the generated SDK code never
writes audio or a transcript to disk, so silently dropping the save would mislead."""
for flag, given in (
("--save-audio", opts.save_audio is not None),
("--save-transcript", opts.save_transcript is not None),
("--save-dir", opts.save_dir is not None),
):
if given:
raise UsageError(
f"{flag} cannot be combined with --show-code; the generated SDK code "
"does not save to disk."
)


def _resolve_save_targets(
opts: StreamOptions, sources: SourceOptions
) -> tuple[Path | None, Path | None]:
"""Resolve the save flags into the (audio, transcript) paths the session writes.

``--save-dir`` owns filename assembly — it auto-names both the transcript and a
matching WAV under ``DIR/YYYY-MM-DD/`` — so it can't be combined with the explicit
``--save-audio``/``--save-transcript`` paths, and ``--name`` only feeds that assembly.
Audio can't tee to a single WAV under ``--system-audio`` (two streams), which rejects
both the explicit ``--save-audio`` and ``--save-dir``'s audio leg.
"""
if opts.save_dir is not None:
mutually_exclusive(
("--save-dir", True),
("--save-audio", opts.save_audio is not None),
("--save-transcript", opts.save_transcript is not None),
suggestion="--save-dir names the files for you; drop the explicit path.",
)
if sources.from_system_audio:
raise UsageError(
"--save-dir cannot be combined with --system-audio; the mic and system "
"streams can't share one recording.",
suggestion="Record a single source (mic, file, URL, or - on stdin).",
)
# Local wall-clock time (what a meeting filename wants); the explicit utc-then-
# astimezone keeps the now() call timezone-aware for the linter.
now = datetime.now(UTC).astimezone()
paths = naming.resolve(opts.save_dir, opts.name, now=now)
naming.ensure_dir(paths.transcript.parent)
return paths.audio, paths.transcript
if opts.name is not None:
raise UsageError(
"--name applies only with --save-dir.",
suggestion="Pass --save-dir DIR to auto-name the files, "
"or --save-transcript PATH for an explicit path.",
)
if opts.save_audio is not None:
if sources.from_system_audio:
raise UsageError(
"--save-audio cannot be combined with --system-audio; the mic and system "
"streams can't share one file.",
suggestion="Record a single source (mic, file, URL, or - on stdin).",
)
record.validate_target(opts.save_audio)
if opts.save_transcript is not None:
transcript.validate_target(opts.save_transcript)
return opts.save_audio, opts.save_transcript


def _dispatch(session: StreamSession, opts: SourceOptions) -> None:
"""Open the right audio source(s) for the flags and stream them."""
if opts.from_system_audio:
Expand Down Expand Up @@ -249,7 +317,10 @@ def _collect_batch_sources(opts: StreamOptions, *, text_mode: bool) -> list[str]
mutually_exclusive(
("--from-stdin", True),
("--save-audio", opts.save_audio is not None),
suggestion="--save-audio tees one stream; run a single source to record it.",
("--save-transcript", opts.save_transcript is not None),
("--save-dir", opts.save_dir is not None),
("--name", opts.name is not None),
suggestion="--from-stdin streams many sources; saving applies to a single run.",
)
mutually_exclusive(
("--llm", bool(opts.llm_prompt)),
Expand Down Expand Up @@ -311,25 +382,14 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None
base_flags = opts.base_flags()

if opts.show_code:
if opts.save_audio is not None:
raise UsageError(
"--save-audio cannot be combined with --show-code; the generated SDK "
"code does not tee audio to disk."
)
_reject_save_with_show_code(opts)
_print_show_code(opts, sources, base_flags, text_mode=text_mode)
return

# Validate the requested sources (including that a local file exists) before
# credentials, so a typo'd path reads as "file not found" — not as a login.
validate_sources(sources, has_llm=bool(opts.llm_prompt), text_mode=text_mode)
if opts.save_audio is not None:
if sources.from_system_audio:
raise UsageError(
"--save-audio cannot be combined with --system-audio; the mic and system "
"streams can't share one file.",
suggestion="Record a single source (mic, file, URL, or - on stdin).",
)
record.validate_target(opts.save_audio)
save_audio, save_transcript = _resolve_save_targets(opts, sources)
if sources.from_file and not sources.from_stdin:
client.resolve_audio_source(sources.source, sample=sources.sample)
api_key = state.resolve_api_key()
Expand All @@ -345,7 +405,8 @@ def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None
llm_prompts=llm_prompts,
model=opts.model,
max_tokens=opts.max_tokens,
save_audio=opts.save_audio,
save_audio=save_audio,
save_transcript=save_transcript,
llm_interval=opts.llm_interval,
)
_dispatch(session, sources)
1 change: 1 addition & 0 deletions aai_cli/help_panels.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@
OPT_CAPTURE = "Audio Capture"
OPT_TURNS = "Turn Detection"
OPT_FEATURES = "Features"
# Options that write the audio/transcript to disk:
# --save-audio, --save-transcript, --save-dir, --name.
OPT_SAVING = "Saving"
74 changes: 74 additions & 0 deletions aai_cli/streaming/naming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Assemble auto-named output paths for `assembly stream --save-dir`.

``--save-dir DIR`` turns filename assembly over to the CLI: the transcript and the
matching WAV land under ``DIR/YYYY-MM-DD/`` with a timestamped, slugged stem
(``YYYY-MM-DD-HHMMSS[-slug]``). That is the block a wrapper script otherwise hand-rolls
with ``date(1)`` plus a ``tr``/``sed`` slug — passing ``--name "<title>"`` feeds e.g. a
calendar event title straight in, so the caller never slugs anything itself.
"""

from __future__ import annotations

import re
import unicodedata
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

from aai_cli.core.errors import CLIError

# Each run of non-alphanumeric characters collapses to a single hyphen.
_NON_ALNUM = re.compile(r"[^a-z0-9]+")
# Cap the slug so a long title can't push the filename past a filesystem name limit.
_MAX_SLUG_LEN = 80


def slugify(title: str) -> str:
    """Fold ``title`` to a filename-safe ASCII slug.

    The title is case-folded and accented letters are transliterated to their ASCII
    base letter ("Café" -> "cafe", "Straße" -> "strasse") instead of being dropped.
    Every run of remaining non-alphanumeric characters becomes a single hyphen, the
    result is capped at ``_MAX_SLUG_LEN`` characters, and leading/trailing hyphens are
    removed. Plain-ASCII titles produce the same slug as a simple lowercase-and-replace.

    Returns "" when the title has no usable characters (e.g. "!!!", or text with no
    ASCII equivalent), so the caller drops the slug suffix rather than emitting a
    bare-hyphen filename.
    """
    # casefold() lowercases and expands e.g. "ß" to "ss"; NFKD then splits an accented
    # letter into its base letter plus combining marks.
    decomposed = unicodedata.normalize("NFKD", title.casefold())
    # The ASCII round-trip discards the combining marks (and anything with no ASCII
    # form); lower() catches capitals that compatibility decomposition can introduce.
    ascii_text = decomposed.encode("ascii", "ignore").decode("ascii").lower()
    slug = _NON_ALNUM.sub("-", ascii_text).strip("-")
    # Strip again: the cut can land right after a hyphen.
    return slug[:_MAX_SLUG_LEN].strip("-")


def _stem(now: datetime, name: str | None) -> str:
"""The filename stem: a sortable timestamp, plus the slugged name when one survives."""
stamp = now.strftime("%Y-%m-%d-%H%M%S")
slug = slugify(name) if name else ""
return f"{stamp}-{slug}" if slug else stamp


@dataclass(frozen=True)
class SavePaths:
    """An immutable pair of output paths: the transcript and its matching audio file."""

    # Destination for the transcript text.
    transcript: Path
    # Destination for the audio recording that goes with that transcript.
    audio: Path


def resolve(save_dir: Path, name: str | None, *, now: datetime) -> SavePaths:
    """Assemble ``DIR/YYYY-MM-DD/<stem>.txt`` and ``.wav`` for ``now`` and ``--name``.

    Both the date directory and the stem carry the date, so a transcript is still
    self-describing after it is moved out of its bucket. This only builds paths;
    nothing is created on disk (the caller does that, see ``ensure_dir``).
    """
    stem = _stem(now, name)
    day_dir = save_dir / now.strftime("%Y-%m-%d")
    return SavePaths(
        transcript=day_dir / f"{stem}.txt",
        audio=day_dir / f"{stem}.wav",
    )


def ensure_dir(path: Path) -> None:
    """Make sure the directory ``path`` exists, creating missing parents as needed.

    An already-existing directory is not an error.

    Raises:
        CLIError: (``error_type="save_dir_path"``, exit code 2) wrapping any
            ``OSError`` raised while creating the directory.
    """
    try:
        path.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        message = f"Cannot create {path}: {exc}"
        raise CLIError(
            message,
            error_type="save_dir_path",
            exit_code=2,
            suggestion="Pass --save-dir under a writable location.",
        ) from exc
54 changes: 46 additions & 8 deletions aai_cli/streaming/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,29 @@
)
from aai_cli.streaming import record
from aai_cli.streaming.render import StreamRenderer, speaker_prefix
from aai_cli.streaming.transcript import TranscriptWriter
from aai_cli.ui import output
from aai_cli.ui.follow import FollowRenderer

# Sources that can be transcribed in parallel sessions: (label, audio chunks, sample rate).
_ParallelStreams = list[tuple[str, Iterable[bytes], int]]


def _finalized_turn_line(event: object, source_label: str | None) -> str | None:
"""The transcript line for a finalized, non-empty turn, or None for a partial/empty one.

Prefixes the speaker/source label exactly as the text renderer does, so the saved
file and the ``--llm`` transcript record both read like the on-screen turns.
"""
if not getattr(event, "end_of_turn", False):
return None # partials don't belong in the transcript
text = getattr(event, "transcript", "") or ""
if not text:
return None
prefix = speaker_prefix(source_label, getattr(event, "speaker_label", None))
return f"{prefix[0]}: {text}" if prefix is not None else text


@dataclass(frozen=True)
class SourceOptions:
"""Where the audio comes from, distilled from the CLI flags.
Expand Down Expand Up @@ -141,11 +157,17 @@ class StreamSession:
# When set, tee the streamed PCM to this path as a WAV (see record.tee_wav). Only
# the single-source path sets it — the parallel/batch callers reject --save-audio.
save_audio: Path | None = None
# When set, write each finalized turn to this path as plain text (see TranscriptWriter).
# Like save_audio, only the single-source path sets it; batch rejects --save-transcript.
save_transcript: Path | None = None
# Seconds between --llm summary refreshes; <=0 re-runs the chain on every turn.
llm_interval: float = 0.0
# Monotonic clock, injectable so the interval throttle is deterministic in tests.
clock: Callable[[], float] = time.monotonic
transcript: list[str] = field(default_factory=list[str])
# The open transcript-file writer for a single run; created/closed in _guarded so a
# save target is opened once per session and a Ctrl-C still leaves a flushed file.
_transcript_writer: TranscriptWriter | None = None
_callback_lock: threading.RLock = field(default_factory=threading.RLock)
_listening_lock: threading.Lock = field(default_factory=threading.Lock)
_listening_started: bool = False
Expand Down Expand Up @@ -176,23 +198,33 @@ def on_turn(self, event: object, *, source_label: str | None = None) -> None:
if self.follow is None:
with self._callback_lock:
self.renderer.turn(event, source=source_label)
line = _finalized_turn_line(event, source_label)
if line is not None:
self._save_line(line)
else:
# --llm mode locks only to record the turn; the chain re-runs (network) are
# left unlocked so the other source's turns keep flowing during a refresh.
self._record_turn(event, source_label)

def _save_line(self, line: str) -> None:
"""Append one finalized turn to the open --save-transcript/--save-dir file, if any.

Always called under ``_callback_lock`` so parallel source threads can't interleave
a partial write into the file.
"""
if self._transcript_writer is not None:
self._transcript_writer.write_turn(line)

def _record_turn(self, event: object, source_label: str | None) -> None:
    """Record a finalized turn, then refresh the --llm answer if a refresh is due.

    The turn is appended to the running transcript and to the saved transcript
    file (when one is open). Partial and empty turns are ignored. A refresh is due
    on every turn, or once per ``llm_interval`` seconds.
    """
    # One shared helper decides whether the turn counts and formats its line, so the
    # saved file and the --llm record can't drift apart.
    line = _finalized_turn_line(event, source_label)
    if line is None:
        return
    # Hold the lock only for the record itself; the summary refresh (network) runs
    # unlocked so another source's turns keep flowing meanwhile.
    with self._callback_lock:
        self.transcript.append(line)
        self._save_line(line)
    self._maybe_summarize()

def _maybe_summarize(self, *, final: bool = False) -> None:
Expand Down Expand Up @@ -287,6 +319,10 @@ def _guarded(self, work: Callable[[], None], *, handle_interrupt: bool = True) -
being swallowed here — the batch driver owns those signals across the whole
``--from-stdin`` sequence, so one Ctrl-C stops the batch rather than just
advancing to the next source."""
if self.save_transcript is not None:
# Open before streaming so a bad path fails up front; the finally closes it
# (flushing the turns so far) even on Ctrl-C or a worker error.
self._transcript_writer = TranscriptWriter(self.save_transcript)
try:
if self.follow is not None:
with self.follow:
Expand All @@ -311,6 +347,8 @@ def _guarded(self, work: Callable[[], None], *, handle_interrupt: bool = True) -
# Downstream consumer (e.g. `| head`) closed the pipe; stop quietly.
raise typer.Exit(code=0) from None
finally:
if self._transcript_writer is not None:
self._transcript_writer.close()
if self.follow is None:
self.renderer.close()

Expand Down
Loading
Loading