diff --git a/.importlinter b/.importlinter index a25a96b7..a463f903 100644 --- a/.importlinter +++ b/.importlinter @@ -11,6 +11,8 @@ source_modules = aai_cli.argscan aai_cli.auth aai_cli.client + aai_cli.clip_exec + aai_cli.clip_select aai_cli.code_gen aai_cli.coding_agent aai_cli.config @@ -53,6 +55,7 @@ modules = aai_cli.commands.account aai_cli.commands.agent aai_cli.commands.audit + aai_cli.commands.clip aai_cli.commands.deploy aai_cli.commands.dev aai_cli.commands.doctor @@ -76,6 +79,7 @@ type = forbidden source_modules = aai_cli.argscan aai_cli.client + aai_cli.clip_select aai_cli.config aai_cli.config_builder aai_cli.environments diff --git a/AGENTS.md b/AGENTS.md index 812f327a..291415f5 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -162,9 +162,9 @@ A Typer CLI. `aai_cli/main.py` builds the `app`, registers each command sub-app, ### Command layer -Each file in `aai_cli/commands/` is a Typer sub-app (`transcribe`, `stream`, `agent`, `speak`, `llm`, `transcripts`, `login` (login/logout/whoami), `doctor`, `init`, `dev`, `share`, `deploy`, `setup`, `onboard`, `account` (balance/usage/limits), `keys`, `sessions`, `audit`, `telemetry` (status/enable/disable), `webhooks` (listen)). Command bodies run through `context.run_command(ctx, fn, json=...)`, which maps any `CLIError` to clean stderr output + the error's exit code. Commands never print tracebacks for expected failures. +Each file in `aai_cli/commands/` is a Typer sub-app (`transcribe`, `stream`, `agent`, `speak`, `llm`, `clip`, `transcripts`, `login` (login/logout/whoami), `doctor`, `init`, `dev`, `share`, `deploy`, `setup`, `onboard`, `account` (balance/usage/limits), `keys`, `sessions`, `audit`, `telemetry` (status/enable/disable), `webhooks` (listen)). Command bodies run through `context.run_command(ctx, fn, json=...)`, which maps any `CLIError` to clean stderr output + the error's exit code. Commands never print tracebacks for expected failures. 
-**Options/run split for flag-heavy commands** (gh-CLI style): the Typer function only parses argv into a frozen `Options` dataclass and hands it to a module-level `run_(opts, state, *, json_mode)` through a thin lambda adapter in `run_command(ctx, ..., json=...)`. The five run commands follow it — `aai_cli/stream_exec.py` (the reference implementation), `transcribe_exec.py`, `agent_exec.py`, `speak_exec.py`, `llm_exec.py`. Because the run path is a plain function of data, tests construct options directly (`dataclasses.replace` off a defaults instance, see `tests/test_stream_exec.py` and `tests/test_command_options_seam.py`) instead of round-tripping argv through `CliRunner` — which is also the cheap way to kill mutation-gate mutants on orchestration lines. Follow this for new or heavily-reworked commands with long bodies; small commands keep the inline `body()` closure — the dataclass is pure ceremony there. +**Options/run split for flag-heavy commands** (gh-CLI style): the Typer function only parses argv into a frozen `Options` dataclass and hands it to a module-level `run_(opts, state, *, json_mode)` through a thin lambda adapter in `run_command(ctx, ..., json=...)`. The six run commands follow it — `aai_cli/stream_exec.py` (the reference implementation), `transcribe_exec.py`, `agent_exec.py`, `speak_exec.py`, `llm_exec.py`, `clip_exec.py`. Because the run path is a plain function of data, tests construct options directly (`dataclasses.replace` off a defaults instance, see `tests/test_stream_exec.py` and `tests/test_command_options_seam.py`) instead of round-tripping argv through `CliRunner` — which is also the cheap way to kill mutation-gate mutants on orchestration lines. Follow this for new or heavily-reworked commands with long bodies; small commands keep the inline `body()` closure — the dataclass is pure ceremony there. 
### Cross-cutting state (resolution order matters) diff --git a/README.md b/README.md index 9279a73f..b7a0f0a5 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,7 @@ assembly init # scaffold a starter app - **Real-time streaming**: `assembly stream` transcribes the microphone, a file, or a URL live — on macOS it can capture system audio too. - **Voice agent**: `assembly agent` runs a full-duplex spoken conversation in your terminal. - **LLM Gateway**: `assembly llm` prompts an LLM over a transcript, stdin, or a live stream (`assembly stream --llm "summarize as I talk"`). +- **Transcript-driven clipping**: `assembly clip` cuts an audio/video file (or a YouTube/podcast URL) with ffmpeg by diarized speaker (`--speaker A`), text match (`--search "pricing"`), LLM pick (`--llm "the three best moments"`), or explicit time range (`--range 1:30-2:45`) — transcribing on the fly, reusing a finished transcript with `-t ID`, or reading one from a pipe (`assembly transcribe x.mp4 --speaker-labels --json | assembly clip x.mp4 -t - --llm "…"`). - **Model evaluation**: `assembly eval` transcribes a Hugging Face dataset (with built-in aliases for common benchmarks: `assembly eval tedlium`) or a local `.csv`/`.jsonl` manifest and scores WER against its references — handy for picking a speech model. - **Starter apps**: `assembly init` scaffolds a self-contained FastAPI + HTML app (`audio-transcription`, `live-captions`, `voice-agent`); `assembly dev` runs it, `assembly share` exposes it on a public URL, and `assembly deploy` ships it to Vercel, Railway, or Fly.io. - **Webhook testing**: `assembly webhooks listen` opens a public dev URL (cloudflared quick tunnel) that prints webhook deliveries as they arrive and can forward them to your local app with `--forward-to`. 
diff --git a/aai_cli/clip_exec.py b/aai_cli/clip_exec.py new file mode 100644 index 00000000..090f2c5d --- /dev/null +++ b/aai_cli/clip_exec.py @@ -0,0 +1,369 @@ +"""Run logic for `assembly clip`: cut a media file by transcript content. + +The command module (aai_cli/commands/clip.py) only parses argv — it builds a +``ClipOptions`` and hands it to ``run_clip`` via ``context.run_command`` (the +options/run split, see AGENTS.md), so tests drive transcript resolution and the +ffmpeg orchestration by constructing options directly. The pure selection logic +(range parsing, utterance filtering, LLM reply parsing, merging) lives in +``clip_select``. + +Selection composes four sources: ``--speaker`` and ``--search`` filter the +diarized utterances of a transcript (made on the fly, reused via +``--transcript-id``, or piped on stdin with ``-t -``), ``--llm`` hands the +timestamped utterances to the LLM Gateway and lets the model pick the windows, +and ``--range`` adds explicit ones. The selected segments are padded, merged +where they touch, and each surviving segment is re-encoded into its own file +with ffmpeg. +""" + +from __future__ import annotations + +import json +import shutil +import subprocess +import tempfile +from dataclasses import dataclass +from pathlib import Path +from types import SimpleNamespace + +import assemblyai as aai +from rich.markup import escape + +from aai_cli import client, clip_select, jsonshape, llm, output, stdio, youtube +from aai_cli.clip_select import Segment +from aai_cli.context import AppState +from aai_cli.errors import CLIError, UsageError + + +@dataclass(frozen=True) +class ClipOptions: + """Every `assembly clip` flag as plain data (``--json`` excluded: run_command + resolves it into the ``json_mode`` argument).""" + + # The raw source as typed: a local path, or a downloadable media-page URL + # (a pathlib.Path would collapse the "//" in "https://"). 
def _stdin_transcript_text() -> str:
    """Return the text piped on stdin for ``-t -``, stripped of surrounding whitespace.

    ``_resolve_transcript`` treats any text that does not start with ``{`` as a
    bare transcript id and fetches it, so blank input has to be rejected here:
    otherwise an empty pipe would become a lookup of the transcript id ``""``.

    Returns:
        The non-empty, stripped stdin text (a transcript id or transcript JSON).

    Raises:
        UsageError: when ``stdio.piped_stdin_text()`` returns ``None`` or the
            piped text is empty / whitespace-only.
    """
    piped = stdio.piped_stdin_text()
    text = piped.strip() if piped is not None else ""
    if not text:
        raise UsageError(
            "-t - expects a transcript id or transcript JSON on stdin.",
            suggestion=_PIPE_SUGGESTION,
        )
    return text
def _transcript_segments(
    opts: ClipOptions, media: Path, state: AppState, *, json_mode: bool
) -> tuple[list[Segment], str | None]:
    """Segments selected from the transcript, paired with the transcript id.

    Returns ``([], None)`` without touching any transcript when no
    transcript-backed selector (--speaker/--search/--llm) was requested.
    Otherwise the transcript is resolved, its utterances are narrowed by
    --speaker/--search, and then either each surviving utterance becomes a
    segment or — when --llm is set — the LLM picks windows from the survivors.

    Raises:
        CLIError: ``no_utterances`` (exit code 2) when the transcript carries no
            utterances, ``no_match`` when the filters leave nothing.
    """
    if not _needs_transcript(opts):
        return [], None

    source = _resolve_transcript(opts, media, state, json_mode=json_mode)
    source_id = str(getattr(source, "id", ""))
    candidates = jsonshape.object_list(getattr(source, "utterances", None))
    if not candidates:
        diarize_hint = (
            "--speaker/--search/--llm need a diarized transcript. Pass a --transcript-id "
            "created with --speaker-labels, or drop -t to let clip transcribe the file."
        )
        raise CLIError(
            f"Transcript {source_id} has no utterances to select from.",
            error_type="no_utterances",
            exit_code=2,
            suggestion=diarize_hint,
        )

    kept = clip_select.matching_utterances(candidates, opts.speakers, opts.search)
    if not kept:
        inspect_hint = (
            "Inspect who said what with 'assembly transcribe --speaker-labels "
            "-o utterances', then adjust --speaker/--search."
        )
        raise CLIError(
            "No transcript segments matched the selection.",
            error_type="no_match",
            suggestion=inspect_hint,
        )

    if opts.llm_prompt is None:
        # No LLM pick requested: every surviving utterance is its own window.
        return [clip_select.segment_of(item) for item in kept], source_id

    api_key = state.resolve_api_key()
    picked = _llm_segments(api_key, kept, opts, json_mode=json_mode, quiet=state.quiet)
    return picked, source_id
assembly clip needs a local audio/video file.", + ) + if not media.is_file(): + raise CLIError( + f"Not a file: {media}", + error_type="not_a_file", + exit_code=2, + suggestion="Pass a media file, not a directory.", + ) + + +def _validate_out_dir(out_dir: Path | None) -> None: + if out_dir is not None and not out_dir.is_dir(): + raise UsageError( + f"--out-dir doesn't exist: {out_dir}", + suggestion="Create it first, or point --out-dir at an existing directory.", + ) + + +def _validate_selection(opts: ClipOptions) -> None: + if _needs_transcript(opts): + return + if not opts.ranges: + raise UsageError( + "Nothing selects a segment to clip.", + suggestion="Pass --speaker, --search, --llm, and/or --range.", + ) + if opts.transcript_id is not None: + # -t feeds the transcript-backed selectors; with only --range it would be + # ignored, and a requested flag is never dropped silently. + raise UsageError( + "--transcript-id only applies with --speaker/--search/--llm.", + suggestion="Add a --speaker/--search/--llm selector, or drop --transcript-id.", + ) + + +def _require_ffmpeg() -> str: + """The ffmpeg executable; checked before any (billed) transcription work.""" + path = shutil.which("ffmpeg") + if path is None: + raise CLIError( + "ffmpeg is required to cut media, but it isn't on PATH.", + error_type="missing_dependency", + suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", + ) + return path + + +def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: + """Boundary seam for tests: one ffmpeg invocation, output captured.""" + return subprocess.run(args, capture_output=True, text=True, check=False) + + +def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None: + """Re-encode one segment of ``media`` into ``dest``. 
+ + Re-encoding (no ``-c copy``) keeps cuts frame-accurate where stream copy + would snap to the nearest keyframe; ``-y`` makes a re-run overwrite its own + earlier output instead of stalling on ffmpeg's prompt. + """ + result = _run_ffmpeg( + [ + ffmpeg, + "-hide_banner", + "-loglevel", + "error", + "-y", + "-i", + str(media), + "-ss", + f"{segment.start:.3f}", + "-to", + f"{segment.end:.3f}", + str(dest), + ] + ) + if result.returncode != 0: + detail = result.stderr.strip().splitlines() + reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" + raise CLIError( + f"Could not cut {dest.name}: {reason}", + error_type="clip_failed", + suggestion="Check that the input is a readable audio/video file.", + ) + + +def _clip_dest(media: Path, out_dir: Path | None, index: int) -> Path: + directory = out_dir if out_dir is not None else media.parent + return directory / f"{media.stem}.clip{index:02d}{media.suffix}" + + +@dataclass(frozen=True) +class WrittenClip: + """One output file and the source window it was cut from.""" + + path: Path + segment: Segment + + def payload(self) -> dict[str, object]: + return { + "path": str(self.path), + "start": round(self.segment.start, 3), + "end": round(self.segment.end, 3), + "duration": round(self.segment.end - self.segment.start, 3), + } + + def human_line(self) -> str: + start = clip_select.format_clock(self.segment.start) + end = clip_select.format_clock(self.segment.end) + duration = round(self.segment.end - self.segment.start, 3) + return output.success(f"{escape(str(self.path))} {start} - {end} ({duration}s)") + + +def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: + """Execute one `assembly clip` invocation from already-parsed flags.""" + _validate_out_dir(opts.out_dir) + _validate_selection(opts) + explicit = [clip_select.parse_range(value) for value in opts.ranges] + ffmpeg = _require_ffmpeg() + if youtube.is_downloadable_url(opts.media): + # A media-page URL (YouTube, 
def _cut_and_emit(
    opts: ClipOptions,
    media: Path,
    out_dir: Path | None,
    explicit: list[Segment],
    ffmpeg: str,
    state: AppState,
    *,
    json_mode: bool,
) -> None:
    """Select the windows of an already-local ``media`` file, cut them, report them.

    Transcript-backed segments and the explicit --range segments are merged
    (with ``opts.padding``), each merged window is cut into its own file, and
    the result is emitted as JSON or as one human-readable line per clip.
    """
    from_transcript, transcript_id = _transcript_segments(
        opts, media, state, json_mode=json_mode
    )
    windows = clip_select.merge_segments([*from_transcript, *explicit], opts.padding)

    clips: list[WrittenClip] = []
    progress = f"Cutting {len(windows)} clip(s)…"
    with output.status(progress, json_mode=json_mode, quiet=state.quiet):
        # Clip numbering is 1-based so the first file is ".clip01".
        for number, window in enumerate(windows, start=1):
            target = _clip_dest(media, out_dir, number)
            _cut_clip(ffmpeg, media, window, target)
            clips.append(WrittenClip(path=target, segment=window))

    def render(_: object) -> str:
        # Human view: one success line per written clip.
        return "\n".join(clip.human_line() for clip in clips)

    report: dict[str, object] = {
        "source": opts.media,
        "transcript_id": transcript_id,
        "clips": [clip.payload() for clip in clips],
    }
    output.emit(report, render, json_mode=json_mode)
def _parse_point(token: str, flag_value: str) -> float:
    """Seconds for one ``--range`` endpoint: bare seconds or colon-separated clock time.

    Args:
        token: One side of the ``START-END`` value, e.g. ``"90"`` or ``"1:30"``.
        flag_value: The whole flag value, quoted in the error message.

    Returns:
        The endpoint in seconds; each colon field is worth 60x the next one.

    Raises:
        UsageError: when the token has more than ``_MAX_CLOCK_FIELDS`` fields or
            any field is not a finite, non-negative number.
    """
    parts = token.strip().split(":")
    if len(parts) > _MAX_CLOCK_FIELDS:
        raise _bad_range(flag_value)
    try:
        values = [float(part) for part in parts]
    except ValueError:
        raise _bad_range(flag_value) from None
    # float() also accepts "nan", "inf" and signed numbers. A sign can still
    # reach us inside the END token ("0-1:-30" splits on the first "-" only),
    # and would silently subtract from the clock time instead of being rejected.
    if any(not math.isfinite(value) or value < 0 for value in values):
        raise _bad_range(flag_value)
    seconds = 0.0
    for value in values:
        seconds = seconds * 60 + value
    return seconds
+ + Negative endpoints can't be expressed (``-`` is the separator), so the only + validations are shape, finiteness, and end-after-start. + """ + head, sep, tail = flag_value.partition("-") + if not sep: + raise _bad_range(flag_value) + segment = Segment(_parse_point(head, flag_value), _parse_point(tail, flag_value)) + if segment.end <= segment.start: + raise UsageError( + f"--range end must be after its start: {flag_value!r}.", + suggestion=f"Use {_RANGE_FORMAT}.", + ) + return segment + + +def merge_segments(segments: list[Segment], padding: float) -> list[Segment]: + """Padded segments, sorted and coalesced where they touch or overlap. + + Padding widens each segment on both sides (clamped at 0); overlapping or + back-to-back selections fold into one clip so a speaker's consecutive + utterances don't shatter into per-sentence files. + """ + padded = sorted( + (Segment(max(0.0, seg.start - padding), seg.end + padding) for seg in segments), + key=lambda seg: seg.start, + ) + merged: list[Segment] = [] + for seg in padded: + if merged and seg.start <= merged[-1].end: + merged[-1] = Segment(merged[-1].start, max(merged[-1].end, seg.end)) + else: + merged.append(seg) + return merged + + +def matching_utterances( + utterances: list[object], speakers: list[str], search: str | None +) -> list[object]: + """The utterances passing the ``--speaker``/``--search`` filters. + + Both filters are case-insensitive and combine with AND; an unset filter + passes everything. 
+ """ + wanted = {speaker.upper() for speaker in speakers} + needle = search.lower() if search is not None else None + matched: list[object] = [] + for utterance in utterances: + speaker = str(getattr(utterance, "speaker", "") or "") + text = str(getattr(utterance, "text", "") or "") + if wanted and speaker.upper() not in wanted: + continue + if needle is not None and needle not in text.lower(): + continue + matched.append(utterance) + return matched + + +def segment_of(utterance: object) -> Segment: + """The utterance's time window in seconds (the API reports milliseconds).""" + start_ms = jsonshape.as_float(getattr(utterance, "start", None)) + end_ms = jsonshape.as_float(getattr(utterance, "end", None)) + return Segment(start_ms / 1000.0, end_ms / 1000.0) + + +def utterance_listing(utterances: list[object]) -> str: + """The timestamped transcript view the LLM selects from, one utterance per line.""" + lines: list[str] = [] + for utterance in utterances: + seg = segment_of(utterance) + speaker = str(getattr(utterance, "speaker", "") or "") + text = str(getattr(utterance, "text", "") or "") + lines.append(f"[{seg.start:.3f}-{seg.end:.3f}] {speaker}: {text}") + return "\n".join(lines) + + +# Prefixed to the user's --llm instruction; the reply contract ("only a JSON +# array") is what parse_llm_segments depends on. +LLM_INSTRUCTIONS = ( + "Select the time ranges to cut from the timestamped transcript below. " + 'Reply with only a JSON array like [{"start": 12.5, "end": 30.0}] — ' + "start/end in seconds within the transcript, no prose, no code fences. " + "Selection instruction: " +) + + +def _llm_range_items(reply: str) -> list[dict[str, object]] | None: + """The JSON array of range objects in the model's reply, or None. + + Tolerates prose or code fences around the array by slicing from the first + ``[`` to the last ``]``; anything that doesn't decode to a list of objects + is a parse failure. 
def _segment_from_item(item: dict[str, object]) -> Segment | None:
    """A Segment from one model-returned range object, or None for a malformed entry.

    Malformed means: ``start``/``end`` missing or not a real number (JSON
    ``true``/``false`` included), too large to be a float, non-finite, negative,
    or inverted (end not after start). One bad entry is dropped rather than
    failing the whole selection.
    """
    start, end = item.get("start"), item.get("end")
    # bool is a subclass of int, so JSON true/false would otherwise slip
    # through the numeric check below as 1.0 / 0.0 seconds.
    if isinstance(start, bool) or isinstance(end, bool):
        return None
    if not isinstance(start, int | float) or not isinstance(end, int | float):
        return None
    try:
        segment = Segment(float(start), float(end))
    except OverflowError:
        # JSON integers are unbounded; one too large for a float is just
        # another malformed entry, not a reason to crash.
        return None
    if not math.isfinite(segment.start) or not math.isfinite(segment.end):
        return None
    if segment.start < 0 or segment.end <= segment.start:
        return None
    return segment
def format_clock(seconds: float) -> str:
    """``M:SS.t`` (or ``H:MM:SS.t``) for the human view of a clip window.

    Args:
        seconds: A non-negative offset into the media, in seconds.

    Returns:
        The offset rounded to a tenth of a second; the hours field appears only
        when it is non-zero.
    """
    # Round to the displayed precision BEFORE splitting into fields, so the
    # rounding can carry into minutes/hours: 59.96 is "1:00.0", not "0:60.0".
    minutes, secs = divmod(round(seconds, 1), 60)
    hours, minutes = divmod(int(minutes), 60)
    if hours:
        return f"{hours}:{minutes:02d}:{secs:04.1f}"
    return f"{minutes}:{secs:04.1f}"
directory", + "assembly clip meeting.mp4 --speaker A --padding 0.5 --out-dir clips", + ), + ] + ), +) +def clip( + ctx: typer.Context, + media: str = typer.Argument( + ..., + help="Audio/video to cut clips from: a local file, or a YouTube/media-page " + "URL (audio downloaded via yt-dlp).", + ), + transcript_id: str | None = typer.Option( + None, + "--transcript-id", + "-t", + help="Reuse an existing transcript of this media instead of transcribing it again: " + "an id, or '-' to read an id or 'transcribe --json' output from stdin.", + ), + speaker: list[str] = typer.Option( + [], + "--speaker", + help="Keep segments spoken by this diarized speaker label (repeatable, e.g. --speaker A).", + ), + search: str | None = typer.Option( + None, "--search", help="Keep segments whose text contains this (case-insensitive)." + ), + llm_prompt: str | None = typer.Option( + None, + "--llm", + help="Let an LLM Gateway model pick the windows to clip from the timestamped " + 'transcript (e.g. --llm "the funniest moments"). Composes with --speaker/--search.', + rich_help_panel=help_panels.OPT_LLM, + ), + model: str = typer.Option( + llm.DEFAULT_MODEL, + "--model", + help="LLM Gateway model for --llm.", + rich_help_panel=help_panels.OPT_LLM, + autocompletion=llm.complete_model, + ), + max_tokens: int = typer.Option( + llm.DEFAULT_MAX_TOKENS, + "--max-tokens", + help="Max tokens for the --llm selection reply.", + rich_help_panel=help_panels.OPT_LLM, + ), + ranges: list[str] = typer.Option( + [], + "--range", + help="Keep an explicit START-END window (seconds or [HH:]MM:SS; repeatable).", + ), + padding: float = typer.Option( + 0.0, "--padding", min=0.0, help="Seconds of padding to add around each clip." + ), + out_dir: Path | None = typer.Option( + None, "--out-dir", help="Directory for the clip files (default: next to the input)." 
+ ), + json_out: bool = options.json_option("Emit JSON describing the clips written."), +) -> None: + """Cut clips out of a media file by speaker, text match, LLM pick, or time range. + + --speaker and --search select from a diarized transcript (made on the fly, + or reused with --transcript-id); --llm has an LLM Gateway model pick the + windows; --range adds explicit ones. Overlapping selections merge, and each + surviving segment is written as .clipNN using ffmpeg (which must + be installed). A YouTube/media-page source is downloaded first; its clips + land in --out-dir or the current directory. + """ + opts = clip_exec.ClipOptions( + media=media, + transcript_id=transcript_id, + speakers=speaker, + search=search, + llm_prompt=llm_prompt, + model=model, + max_tokens=max_tokens, + ranges=ranges, + padding=padding, + out_dir=out_dir, + ) + run_command( + ctx, + lambda state, json_mode: clip_exec.run_clip(opts, state, json_mode=json_mode), + json=json_out, + ) diff --git a/aai_cli/main.py b/aai_cli/main.py index 179a86ca..987cb7d2 100644 --- a/aai_cli/main.py +++ b/aai_cli/main.py @@ -24,6 +24,7 @@ account, agent, audit, + clip, deploy, dev, doctor, @@ -67,6 +68,7 @@ "agent", "speak", "llm", + "clip", "eval", "webhooks", # Setup & Tools — get set up & maintain @@ -391,6 +393,7 @@ def main( app.add_typer(agent.app) app.add_typer(speak.app) app.add_typer(llm.app) +app.add_typer(clip.app) app.add_typer(evaluate.app) # eval app.add_typer(account.app) # balance, usage, limits app.add_typer(login.app) # login, logout, whoami diff --git a/aai_cli/skills/aai-cli/references/transcription.md b/aai_cli/skills/aai-cli/references/transcription.md index 12ef2994..c16bbc65 100644 --- a/aai_cli/skills/aai-cli/references/transcription.md +++ b/aai_cli/skills/aai-cli/references/transcription.md @@ -1,8 +1,9 @@ # Transcription & AI -Four commands. All accept `--json` (auto-enabled when piped) and `-o/--output` -to print a single field. 
`transcribe`, `stream`, and `agent` accept -`--show-code` to print equivalent Python SDK code without calling the API. +Five commands. All accept `--json` (auto-enabled when piped); `transcribe`, +`stream`, `agent`, and `llm` accept `-o/--output` to print a single field. +`transcribe`, `stream`, and `agent` accept `--show-code` to print equivalent +Python SDK code without calling the API. ## `assembly transcribe [SOURCE]` — file / URL / YouTube / podcast page @@ -112,3 +113,35 @@ echo "meeting notes" | assembly llm "turn into action items" assembly stream -o text | assembly llm -f "summarize action items as I talk" assembly llm --list-models ``` + +## `assembly clip MEDIA` — cut a media file by transcript content + +Cuts clips out of an audio/video file with ffmpeg (must be installed). `MEDIA` +is a local file or a YouTube/media-page URL (audio downloaded via yt-dlp; the +clips then land in `--out-dir` or the current directory). `--speaker`/`--search` +select diarized utterances — the file is transcribed with speaker labels on the +fly, or pass `-t/--transcript-id` (an id, or `-` to read an id or +`transcribe --json` output from stdin). `--llm "instruction"` sends the +timestamped utterances to LLM Gateway and the model picks the windows. +`--range START-END` adds explicit windows (seconds or `[HH:]MM:SS`). +Overlapping selections merge; each surviving segment is written as +`.clipNN`. + +High-value flags: + +- Selection: `--speaker A` (repeatable), `--search "topic"` (case-insensitive), + `--llm "the best moments"` (composes with the filters), `--range 1:30-2:45` + (repeatable). +- LLM: `--model` (default `claude-haiku-4-5-20251001`), `--max-tokens N`. +- Shaping: `--padding 0.5` (seconds around each clip), `--out-dir clips/`. +- Output: `--json` (paths + start/end/duration of each clip written). 
+ +Examples: + +```bash +assembly clip meeting.mp4 --speaker A +assembly clip call.mp3 --search "pricing" --padding 0.5 +assembly clip talk.mp4 --range 1:30-2:45 --range 10:00-10:30 +assembly clip "https://youtube.com/watch?v=ID" --llm "the strongest quote" +assembly transcribe meeting.mp4 --speaker-labels --json | assembly clip meeting.mp4 -t - --llm "the funniest exchange" +``` diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index 7cf3763d..92288607 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -65,6 +65,85 @@ + ''' +# --- +# name: test_command_help_matches_snapshot[clip] + ''' + + Usage: assembly clip [OPTIONS] MEDIA + + Cut clips out of a media file by speaker, text match, LLM pick, or time range. + + --speaker and --search select from a diarized transcript (made on the fly, + or reused with --transcript-id); --llm has an LLM Gateway model pick the + windows; --range adds explicit ones. Overlapping selections merge, and each + surviving segment is written as .clipNN using ffmpeg (which must + be installed). A YouTube/media-page source is downloaded first; its clips + land in --out-dir or the current directory. + + ╭─ Arguments ──────────────────────────────────────────────────────────────────╮ + │ * media TEXT Audio/video to cut clips from: a local file, or a │ + │ YouTube/media-page URL (audio downloaded via yt-dlp). │ + │ [required] │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ Options ────────────────────────────────────────────────────────────────────╮ + │ --transcript-id -t TEXT Reuse an existing transcript │ + │ of this media instead of │ + │ transcribing it again: an id, │ + │ or '-' to read an id or │ + │ 'transcribe --json' output │ + │ from stdin. │ + │ --speaker TEXT Keep segments spoken by this │ + │ diarized speaker label │ + │ (repeatable, e.g. --speaker │ + │ A). 
│ + │ --search TEXT Keep segments whose text │ + │ contains this │ + │ (case-insensitive). │ + │ --range TEXT Keep an explicit START-END │ + │ window (seconds or │ + │ [HH:]MM:SS; repeatable). │ + │ --padding FLOAT RANGE [x>=0.0] Seconds of padding to add │ + │ around each clip. │ + │ [default: 0.0] │ + │ --out-dir PATH Directory for the clip files │ + │ (default: next to the input). │ + │ --json -j Emit JSON describing the │ + │ clips written. │ + │ --help Show this message and exit. │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ LLM Transform ──────────────────────────────────────────────────────────────╮ + │ --llm TEXT Let an LLM Gateway model pick the windows to │ + │ clip from the timestamped transcript (e.g. │ + │ --llm "the funniest moments"). Composes with │ + │ --speaker/--search. │ + │ --model TEXT LLM Gateway model for --llm. │ + │ [default: claude-haiku-4-5-20251001] │ + │ --max-tokens INTEGER Max tokens for the --llm selection reply. 
│ + │ [default: 1000] │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + + Examples + Clip everything speaker A says + $ assembly clip meeting.mp4 --speaker A + Clip the sentences that mention a topic + $ assembly clip call.mp3 --search "pricing" + Cut an explicit time range + $ assembly clip talk.mp4 --range 1:30-2:45 + Let an LLM pick the moments worth clipping + $ assembly clip meeting.mp4 --llm "the three strongest customer objections" + Clip a YouTube video's audio with an LLM + $ assembly clip "https://youtube.com/watch?v=ID" --llm "the best quote" + Reuse a finished transcript instead of re-transcribing + $ assembly clip meeting.mp4 -t TRANSCRIPT_ID --speaker B + Pipe transcribe straight into clip + $ assembly transcribe meeting.mp4 --speaker-labels --json | assembly clip + meeting.mp4 -t - --llm "the funniest exchange" + Pad each clip and collect them in a directory + $ assembly clip meeting.mp4 --speaker A --padding 0.5 --out-dir clips + + + ''' # --- # name: test_command_help_matches_snapshot[eval] diff --git a/tests/_clip_helpers.py b/tests/_clip_helpers.py new file mode 100644 index 00000000..8dcdba6f --- /dev/null +++ b/tests/_clip_helpers.py @@ -0,0 +1,67 @@ +"""Shared builders for the `assembly clip` test modules. + +The clip suite is split across test_clip_select.py (pure selection logic), +test_clip_exec.py (validation + cutting), test_clip_sources.py (YouTube, stdin +pipe, LLM selection), and test_clip_command.py (argv parsing); the option +defaults and transcript fakes they all share live here. +""" + +from __future__ import annotations + +import re +import subprocess +from types import SimpleNamespace + +import pytest + +from aai_cli import clip_exec, llm +from aai_cli.clip_exec import ClipOptions + +_ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") + +# The CLI's flag defaults, as data. Tests override per-case with dataclasses.replace. 
+DEFAULTS = ClipOptions( + media="meeting.mp4", + transcript_id=None, + speakers=[], + search=None, + llm_prompt=None, + model=llm.DEFAULT_MODEL, + max_tokens=llm.DEFAULT_MAX_TOKENS, + ranges=[], + padding=0.0, + out_dir=None, +) + + +def plain(text: str) -> str: + """Strip SGR color codes (CI forces color on) for substring assertions.""" + return _ANSI_SGR.sub("", text) + + +def utterance(start, end, speaker, text): + return SimpleNamespace(start=start, end=end, speaker=speaker, text=text) + + +UTTERANCES = [ + utterance(1500, 2500, "A", "Let's talk pricing today."), + utterance(3000, 4000, "B", "Sounds good."), + utterance(5000, 6000, "A", "Moving on to hiring."), +] + + +def fake_transcript(utterances): + return SimpleNamespace(id="tr_123", utterances=utterances) + + +def record_ffmpeg(monkeypatch: pytest.MonkeyPatch) -> list[list[str]]: + """Resolve ffmpeg and record every invocation, succeeding with no output.""" + monkeypatch.setattr("shutil.which", lambda name: f"/usr/bin/{name}") + calls: list[list[str]] = [] + + def run(args: list[str]) -> subprocess.CompletedProcess[str]: + calls.append(args) + return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") + + monkeypatch.setattr(clip_exec, "_run_ffmpeg", run) + return calls diff --git a/tests/_snapshot_surface.py b/tests/_snapshot_surface.py index d28cd75c..a329c1c3 100644 --- a/tests/_snapshot_surface.py +++ b/tests/_snapshot_surface.py @@ -23,7 +23,7 @@ # ``tests/test_snapshots_help_.py`` module suffixes. 
HELP_GROUPS: dict[str, frozenset[str]] = { "build": frozenset({"onboard", "init", "dev", "share", "deploy"}), - "run": frozenset({"transcribe", "stream", "agent", "speak", "llm", "eval", "webhooks"}), + "run": frozenset({"transcribe", "stream", "agent", "speak", "llm", "clip", "eval", "webhooks"}), "tools": frozenset({"doctor", "setup", "telemetry", "_update-check"}), "history": frozenset({"transcripts", "sessions"}), "account": frozenset( diff --git a/tests/test_clip_command.py b/tests/test_clip_command.py new file mode 100644 index 00000000..9c7d0454 --- /dev/null +++ b/tests/test_clip_command.py @@ -0,0 +1,158 @@ +"""CLI-level tests for `assembly clip`: argv → ClipOptions parsing, error rendering, +and the command's placement in the root help.""" + +from __future__ import annotations + +import json +import re +import subprocess + +from typer.testing import CliRunner + +from aai_cli import clip_exec, llm +from aai_cli.clip_exec import ClipOptions +from aai_cli.main import app + +runner = CliRunner() + +_ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") + + +def _plain(text: str) -> str: + return _ANSI_SGR.sub("", text) + + +def _capture_run_clip(monkeypatch): + captured = {} + + def fake_run_clip(opts, state, *, json_mode): + captured["opts"] = opts + captured["json_mode"] = json_mode + + monkeypatch.setattr(clip_exec, "run_clip", fake_run_clip) + return captured + + +def test_clip_parses_every_flag_into_options(monkeypatch, tmp_path): + captured = _capture_run_clip(monkeypatch) + result = runner.invoke( + app, + [ + "clip", + "meeting.mp4", + "-t", + "tr_abc", + "--speaker", + "A", + "--speaker", + "B", + "--search", + "pricing", + "--llm", + "best moments", + "--model", + "gpt-5", + "--max-tokens", + "64", + "--range", + "5-10", + "--range", + "1:30-2:00", + "--padding", + "0.5", + "--out-dir", + str(tmp_path), + "--json", + ], + ) + assert result.exit_code == 0, result.output + assert captured["opts"] == ClipOptions( + media="meeting.mp4", + transcript_id="tr_abc", + 
speakers=["A", "B"], + search="pricing", + llm_prompt="best moments", + model="gpt-5", + max_tokens=64, + ranges=["5-10", "1:30-2:00"], + padding=0.5, + out_dir=tmp_path, + ) + assert captured["json_mode"] is True + + +def test_clip_defaults_when_only_media_is_given(monkeypatch): + captured = _capture_run_clip(monkeypatch) + result = runner.invoke(app, ["clip", "meeting.mp4"]) + assert result.exit_code == 0, result.output + assert captured["opts"] == ClipOptions( + media="meeting.mp4", + transcript_id=None, + speakers=[], + search=None, + llm_prompt=None, + model=llm.DEFAULT_MODEL, + max_tokens=llm.DEFAULT_MAX_TOKENS, + ranges=[], + padding=0.0, + out_dir=None, + ) + assert captured["json_mode"] is False + + +def test_clip_requires_the_media_argument(): + result = runner.invoke(app, ["clip"]) + assert result.exit_code == 2 + + +def test_clip_rejects_negative_padding(): + result = runner.invoke(app, ["clip", "meeting.mp4", "--padding", "-1"]) + assert result.exit_code == 2 + + +def test_clip_missing_file_renders_clean_error(tmp_path): + result = runner.invoke(app, ["clip", str(tmp_path / "nope.mp4"), "--range", "1-2"]) + assert result.exit_code == 2 + plain = _plain(result.output) + assert "File not found" in plain + assert "Traceback" not in plain + + +def test_clip_json_error_shape(tmp_path): + result = runner.invoke(app, ["clip", str(tmp_path / "nope.mp4"), "--range", "1-2", "--json"]) + assert result.exit_code == 2 + err = json.loads(_plain(result.output).strip()) + assert err["error"]["type"] == "file_not_found" + + +def test_clip_end_to_end_range_cut_via_cli(tmp_path, monkeypatch): + media = tmp_path / "talk.mp3" + media.write_bytes(b"\x00") + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + calls: list[list[str]] = [] + + def fake_run(args: list[str]) -> subprocess.CompletedProcess[str]: + calls.append(args) + return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") + + monkeypatch.setattr(clip_exec, 
"_run_ffmpeg", fake_run) + result = runner.invoke(app, ["clip", str(media), "--range", "1-2", "--json"]) + assert result.exit_code == 0, result.output + assert calls[0][-1] == str(tmp_path / "talk.clip01.mp3") + payload = json.loads(result.output.strip().splitlines()[-1]) + assert payload["clips"][0]["duration"] == 1.0 + + +def test_clip_is_listed_between_llm_and_eval_in_root_help(): + # Pins clip's slot in _COMMAND_ORDER: it renders in the "Run AssemblyAI" + # panel after llm, not alphabetically at the end of the help. + result = runner.invoke(app, ["--help"]) + assert result.exit_code == 0 + plain = _plain(result.output) + + def row(name: str) -> int: + match = re.search(rf"^[│|\s]*{name}\s", plain, flags=re.MULTILINE) + assert match is not None, f"{name} not in root help" + return match.start() + + assert row("llm") < row("clip") < row("eval") diff --git a/tests/test_clip_exec.py b/tests/test_clip_exec.py new file mode 100644 index 00000000..0e399495 --- /dev/null +++ b/tests/test_clip_exec.py @@ -0,0 +1,362 @@ +"""Direct tests of the `assembly clip` options/run seam (aai_cli/clip_exec.py): +validation, ffmpeg orchestration, and transcript-backed --speaker/--search +selection. Constructed-options tests (dataclasses.replace off the shared +defaults) avoid any argv round-trip; the ffmpeg boundary is faked at +`clip_exec._run_ffmpeg`. 
The pure selection logic is covered in +test_clip_select.py; YouTube/stdin/LLM sources in test_clip_sources.py.""" + +from __future__ import annotations + +import contextlib +import dataclasses +import json +import subprocess +import sys +from pathlib import Path + +import pytest + +from aai_cli import clip_exec, config +from aai_cli.clip_select import Segment +from aai_cli.context import AppState +from aai_cli.errors import CLIError, UsageError +from tests._clip_helpers import ( + DEFAULTS, + UTTERANCES, + fake_transcript, + plain, + record_ffmpeg, + utterance, +) + + +@pytest.fixture +def media(tmp_path: Path) -> Path: + path = tmp_path / "meeting.mp4" + path.write_bytes(b"\x00fake-media") + return path + + +@pytest.fixture +def fake_ffmpeg(monkeypatch): + return record_ffmpeg(monkeypatch) + + +def test_options_are_immutable(): + field_name = dataclasses.fields(DEFAULTS)[0].name + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(DEFAULTS, field_name, None) + + +@pytest.mark.parametrize( + "instance", + [ + clip_exec.WrittenClip(path=Path("x.mp4"), segment=Segment(0.0, 1.0)), + clip_exec._PipedTranscript(id="tr_1", utterances=[]), + ], + ids=["written_clip", "piped_transcript"], +) +def test_result_records_are_immutable(instance): + field_name = dataclasses.fields(instance)[0].name + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(instance, field_name, None) + + +# --- validation errors ------------------------------------------------------- + + +def test_run_clip_rejects_missing_file(tmp_path): + opts = dataclasses.replace(DEFAULTS, media=str(tmp_path / "nope.mp4"), ranges=["1-2"]) + with pytest.raises(CLIError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert exc.value.error_type == "file_not_found" + assert exc.value.exit_code == 2 + assert "File not found" in exc.value.message + assert "local audio/video file" in (exc.value.suggestion or "") + + +def test_run_clip_rejects_directory(tmp_path): + opts = 
dataclasses.replace(DEFAULTS, media=str(tmp_path), ranges=["1-2"]) + with pytest.raises(CLIError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert exc.value.error_type == "not_a_file" + assert exc.value.exit_code == 2 + assert "Not a file" in exc.value.message + assert "not a directory" in (exc.value.suggestion or "") + + +def test_run_clip_rejects_non_downloadable_url(): + opts = dataclasses.replace(DEFAULTS, media="https://x.test/a.mp4", ranges=["1-2"]) + with pytest.raises(UsageError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert "can't fetch this URL" in exc.value.message + assert "Download the media first" in (exc.value.suggestion or "") + + +def test_run_clip_requires_a_selector(media): + with pytest.raises(UsageError) as exc: + clip_exec.run_clip( + dataclasses.replace(DEFAULTS, media=str(media)), AppState(), json_mode=False + ) + assert "Nothing selects a segment" in exc.value.message + assert "--range" in (exc.value.suggestion or "") + + +def test_run_clip_rejects_transcript_id_without_a_transcript_selector(media): + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="tr_1", ranges=["1-2"]) + with pytest.raises(UsageError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert "--transcript-id only applies with --speaker/--search" in exc.value.message + assert "drop --transcript-id" in (exc.value.suggestion or "") + + +def test_run_clip_rejects_missing_out_dir(media, tmp_path): + opts = dataclasses.replace( + DEFAULTS, media=str(media), ranges=["1-2"], out_dir=tmp_path / "missing" + ) + with pytest.raises(UsageError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert "--out-dir doesn't exist" in exc.value.message + assert "Create it first" in (exc.value.suggestion or "") + + +def test_run_clip_requires_ffmpeg(media, monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: None) + opts = dataclasses.replace(DEFAULTS, media=str(media), 
ranges=["1-2"]) + with pytest.raises(CLIError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert exc.value.error_type == "missing_dependency" + assert "ffmpeg is required" in exc.value.message + assert "Install it" in (exc.value.suggestion or "") + + +# --- range-only cutting (no transcript, no network) -------------------------- + + +def test_run_clip_range_only_cuts_and_emits_json(media, fake_ffmpeg, capsys): + opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["5-12.5"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + dest = media.parent / "meeting.clip01.mp4" + assert fake_ffmpeg == [ + [ + "/usr/bin/ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-y", + "-i", + str(media), + "-ss", + "5.000", + "-to", + "12.500", + str(dest), + ] + ] + payload = json.loads(capsys.readouterr().out) + assert payload == { + "source": str(media), + "transcript_id": None, + "clips": [{"path": str(dest), "start": 5.0, "end": 12.5, "duration": 7.5}], + } + + +def test_run_clip_human_mode_prints_one_line_per_clip(tmp_path, fake_ffmpeg, capsys, monkeypatch): + # A relative source keeps each rendered line under the 80-column console + # width — an absolute tmp_path would wrap and split the asserted text. + monkeypatch.chdir(tmp_path) + (tmp_path / "meeting.mp4").write_bytes(b"\x00fake-media") + opts = dataclasses.replace(DEFAULTS, media="meeting.mp4", ranges=["5-12.5678", "90-100"]) + clip_exec.run_clip(opts, AppState(), json_mode=False) + out = plain(capsys.readouterr().out) + assert "meeting.clip01.mp4" in out + assert "0:05.0 - 0:12.6" in out + # The duration rounds at 3 decimals (7.5678 -> 7.568). + assert "(7.568s)" in out + assert "meeting.clip02.mp4" in out + assert "1:30.0 - 1:40.0" in out + # Human mode prints lines, not a JSON object. 
+ assert not out.lstrip().startswith("{") + + +def test_run_clip_applies_padding_to_explicit_ranges(media, fake_ffmpeg, capsys): + opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["5-10"], padding=1.0) + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert fake_ffmpeg[0][7:11] == ["-ss", "4.000", "-to", "11.000"] + clips = json.loads(capsys.readouterr().out)["clips"] + assert (clips[0]["start"], clips[0]["end"]) == (4.0, 11.0) + + +def test_run_clip_rounds_payload_times_to_milliseconds(media, fake_ffmpeg, capsys): + # Both endpoints carry sub-millisecond noise, so every rounded field + # (start, end, duration) actually changes at 3 decimal places. + opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["0.1234-1.5678"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + clips = json.loads(capsys.readouterr().out)["clips"] + assert clips[0] == { + "path": str(media.parent / "meeting.clip01.mp4"), + "start": 0.123, + "end": 1.568, + "duration": 1.444, + } + + +def test_run_clip_honors_out_dir(media, tmp_path, fake_ffmpeg, capsys): + out_dir = tmp_path / "clips" + out_dir.mkdir() + opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["1-2"], out_dir=out_dir) + clip_exec.run_clip(opts, AppState(), json_mode=True) + dest = out_dir / "meeting.clip01.mp4" + assert fake_ffmpeg[0][-1] == str(dest) + assert json.loads(capsys.readouterr().out)["clips"][0]["path"] == str(dest) + + +def test_run_clip_surfaces_ffmpeg_failure(media, monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + + def fail(args: list[str]) -> subprocess.CompletedProcess[str]: + return subprocess.CompletedProcess( + args=args, returncode=1, stdout="", stderr="noise\nInvalid data found\n" + ) + + monkeypatch.setattr(clip_exec, "_run_ffmpeg", fail) + opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["1-2"]) + with pytest.raises(CLIError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert 
exc.value.error_type == "clip_failed" + assert "Could not cut meeting.clip01.mp4" in exc.value.message + # The last stderr line is the reason ffmpeg gives; earlier noise is dropped. + assert "Invalid data found" in exc.value.message + assert "noise" not in exc.value.message + assert "readable audio/video file" in (exc.value.suggestion or "") + + +def test_run_clip_reports_exit_code_when_ffmpeg_is_silent(media, monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + monkeypatch.setattr( + clip_exec, + "_run_ffmpeg", + lambda args: subprocess.CompletedProcess(args=args, returncode=3, stdout="", stderr=""), + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["1-2"]) + with pytest.raises(CLIError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert "ffmpeg exited with code 3" in exc.value.message + + +def test_run_ffmpeg_captures_output_and_does_not_raise(): + # The real boundary (not the fake): output is captured as text and a non-zero + # exit must not raise — _cut_clip turns the exit code into a CLIError itself. 
+ result = clip_exec._run_ffmpeg( + [ + sys.executable, + "-c", + "import sys; print('out'); print('err', file=sys.stderr); sys.exit(3)", + ] + ) + assert result.returncode == 3 + assert result.stdout == "out\n" + assert result.stderr == "err\n" + + +# --- transcript-backed selection --------------------------------------------- + + +def test_run_clip_transcribes_with_speaker_labels(media, fake_ffmpeg, capsys, monkeypatch): + config.set_api_key("default", "sk_test") + seen = {} + + def fake_transcribe(api_key, audio, *, config): + seen["api_key"] = api_key + seen["audio"] = audio + seen["config"] = config + return fake_transcript(list(UTTERANCES)) + + monkeypatch.setattr(clip_exec.client, "transcribe", fake_transcribe) + opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["a"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert seen["api_key"] == "sk_test" + assert seen["audio"] == str(media) + assert seen["config"].speaker_labels is True + payload = json.loads(capsys.readouterr().out) + assert payload["transcript_id"] == "tr_123" + # Speaker A's two utterances: 1.5-2.5s and 5-6s. 
+ assert [(c["start"], c["end"]) for c in payload["clips"]] == [(1.5, 2.5), (5.0, 6.0)] + assert fake_ffmpeg[0][-1] == str(media.parent / "meeting.clip01.mp4") + assert fake_ffmpeg[1][-1] == str(media.parent / "meeting.clip02.mp4") + + +def test_run_clip_reuses_transcript_by_id(media, fake_ffmpeg, capsys, monkeypatch): + config.set_api_key("default", "sk_test") + seen = {} + + def fake_get(api_key, transcript_id): + seen["args"] = (api_key, transcript_id) + return fake_transcript(list(UTTERANCES)) + + monkeypatch.setattr(clip_exec.client, "get_transcript", fake_get) + monkeypatch.setattr( + clip_exec.client, + "transcribe", + lambda *a, **k: pytest.fail("must not re-transcribe when -t is given"), + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="tr_123", search="pricing") + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert seen["args"] == ("sk_test", "tr_123") + payload = json.loads(capsys.readouterr().out) + assert [(c["start"], c["end"]) for c in payload["clips"]] == [(1.5, 2.5)] + + +def test_run_clip_merges_transcript_matches_with_explicit_ranges( + media, fake_ffmpeg, capsys, monkeypatch +): + config.set_api_key("default", "sk_test") + utterances = [utterance(5000, 8000, "A", "hello")] + monkeypatch.setattr(clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(utterances)) + opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["A"], ranges=["7-12"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + clips = json.loads(capsys.readouterr().out)["clips"] + assert [(c["start"], c["end"]) for c in clips] == [(5.0, 12.0)] + + +def test_run_clip_errors_when_transcript_has_no_utterances(media, fake_ffmpeg, monkeypatch): + config.set_api_key("default", "sk_test") + monkeypatch.setattr(clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(None)) + opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["A"]) + with pytest.raises(CLIError) as exc: + clip_exec.run_clip(opts, AppState(), 
json_mode=False) + assert exc.value.error_type == "no_utterances" + assert exc.value.exit_code == 2 + assert "tr_123 has no utterances" in exc.value.message + assert "--speaker-labels" in (exc.value.suggestion or "") + + +def test_run_clip_errors_when_nothing_matches(media, fake_ffmpeg, monkeypatch): + config.set_api_key("default", "sk_test") + monkeypatch.setattr( + clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["Z"]) + with pytest.raises(CLIError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert exc.value.error_type == "no_match" + assert "No transcript segments matched" in exc.value.message + assert "-o utterances" in (exc.value.suggestion or "") + + +def test_run_clip_status_messages(media, fake_ffmpeg, monkeypatch): + config.set_api_key("default", "sk_test") + monkeypatch.setattr( + clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) + ) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(clip_exec.output, "status", fake_status) + opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["A"]) + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert messages == ["Transcribing for clip selection…", "Cutting 2 clip(s)…"] diff --git a/tests/test_clip_select.py b/tests/test_clip_select.py new file mode 100644 index 00000000..7754ec7b --- /dev/null +++ b/tests/test_clip_select.py @@ -0,0 +1,209 @@ +"""Tests for the pure clip selection logic (aai_cli/clip_select.py): --range +parsing, segment merging, utterance filtering, the LLM listing/reply contract, +and clock formatting.""" + +from __future__ import annotations + +import pytest + +from aai_cli import clip_select +from aai_cli.clip_select import Segment +from aai_cli.errors import CLIError, UsageError +from tests._clip_helpers 
import UTTERANCES + +# --- range parsing ----------------------------------------------------------- + + +@pytest.mark.parametrize( + ("flag_value", "start", "end"), + [ + ("5-12.5", 5.0, 12.5), + ("90-120", 90.0, 120.0), + ("1:30-2:45", 90.0, 165.0), + ("1:00:03.5-1:00:04", 3603.5, 3604.0), + ("5 - 10", 5.0, 10.0), + ("0-0.5", 0.0, 0.5), + ], +) +def test_parse_range_accepts_seconds_and_clock_times(flag_value, start, end): + assert clip_select.parse_range(flag_value) == Segment(start, end) + + +@pytest.mark.parametrize( + "flag_value", + ["5", "5-", "-5", "abc-10", "5-10-15", "1:2:3:4-5", "inf-10", "nan-10", "1e400-2e400"], +) +def test_parse_range_rejects_malformed_values(flag_value): + with pytest.raises(UsageError) as exc: + clip_select.parse_range(flag_value) + # Specifically the malformed-shape error — "1:2:3:4" must not parse as a + # huge clock value and fall through to the end-before-start error instead. + assert "Invalid --range" in exc.value.message + assert flag_value in exc.value.message + assert "START-END" in (exc.value.suggestion or "") + + +@pytest.mark.parametrize("flag_value", ["10-5", "5-5"]) +def test_parse_range_rejects_end_not_after_start(flag_value): + with pytest.raises(UsageError) as exc: + clip_select.parse_range(flag_value) + assert "end must be after its start" in exc.value.message + + +# --- segment merging --------------------------------------------------------- + + +def test_merge_segments_sorts_disjoint_segments(): + segs = [Segment(10.0, 11.0), Segment(0.0, 1.0)] + assert clip_select.merge_segments(segs, 0.0) == [Segment(0.0, 1.0), Segment(10.0, 11.0)] + + +def test_merge_segments_coalesces_overlapping_and_touching(): + assert clip_select.merge_segments([Segment(0.0, 5.0), Segment(4.0, 8.0)], 0.0) == [ + Segment(0.0, 8.0) + ] + # Back-to-back (start == previous end) folds too — `<=`, not `<`. 
+ assert clip_select.merge_segments([Segment(0.0, 5.0), Segment(5.0, 8.0)], 0.0) == [ + Segment(0.0, 8.0) + ] + + +def test_merge_segments_keeps_outer_end_for_contained_segment(): + assert clip_select.merge_segments([Segment(0.0, 10.0), Segment(2.0, 3.0)], 0.0) == [ + Segment(0.0, 10.0) + ] + + +def test_merge_segments_merges_against_the_last_segment_only(): + segs = [Segment(0.0, 1.0), Segment(5.0, 6.0), Segment(10.0, 11.0), Segment(10.5, 12.0)] + assert clip_select.merge_segments(segs, 0.0) == [ + Segment(0.0, 1.0), + Segment(5.0, 6.0), + Segment(10.0, 12.0), + ] + + +def test_merge_segments_padding_widens_and_clamps_at_zero(): + assert clip_select.merge_segments([Segment(0.2, 1.0)], 0.5) == [Segment(0.0, 1.5)] + + +def test_merge_segments_padding_bridges_a_small_gap(): + merged = clip_select.merge_segments([Segment(0.0, 1.0), Segment(1.5, 2.0)], 0.3) + assert merged == [Segment(0.0, 2.0 + 0.3)] + + +# --- utterance selection ----------------------------------------------------- + + +def _filtered_segments(utterances, speakers, search): + matched = clip_select.matching_utterances(utterances, speakers, search) + return [clip_select.segment_of(u) for u in matched] + + +def test_utterance_segments_converts_milliseconds_to_seconds(): + segs = _filtered_segments(list(UTTERANCES), [], "sounds") + assert segs == [Segment(3.0, 4.0)] + + +def test_utterance_segments_speaker_filter_is_case_insensitive(): + segs = _filtered_segments(list(UTTERANCES), ["a"], None) + assert segs == [Segment(1.5, 2.5), Segment(5.0, 6.0)] + + +def test_utterance_segments_search_is_case_insensitive(): + segs = _filtered_segments(list(UTTERANCES), [], "PRICING") + assert segs == [Segment(1.5, 2.5)] + + +def test_utterance_segments_speaker_and_search_combine_with_and(): + segs = _filtered_segments(list(UTTERANCES), ["A"], "hiring") + assert segs == [Segment(5.0, 6.0)] + + +def test_utterance_segments_excludes_unselected_speakers(): + segs = _filtered_segments(list(UTTERANCES), ["B"], None) + 
assert segs == [Segment(3.0, 4.0)] + + +# --- the LLM listing / reply contract ------------------------------------------ + + +def test_utterance_listing_renders_timestamped_lines(): + listing = clip_select.utterance_listing(list(UTTERANCES)) + assert listing == ( + "[1.500-2.500] A: Let's talk pricing today.\n" + "[3.000-4.000] B: Sounds good.\n" + "[5.000-6.000] A: Moving on to hiring." + ) + + +@pytest.mark.parametrize( + "reply", + [ + '[{"start": 5, "end": 9.5}]', + '```json\n[{"start": 5, "end": 9.5}]\n```', + 'Here are the ranges: [{"start": 5, "end": 9.5}] - enjoy!', + # The slice must stop exactly at the closing "]" — the next char would + # break the JSON. + '[{"start": 5, "end": 9.5}], thanks', + ], +) +def test_parse_llm_segments_reads_the_array_through_noise(reply): + assert clip_select.parse_llm_segments(reply) == [Segment(5.0, 9.5)] + + +@pytest.mark.parametrize("reply", ["no ranges here", "", "[1, 2, 3]", "[{]"]) +def test_parse_llm_segments_rejects_unreadable_replies(reply): + with pytest.raises(CLIError) as exc: + clip_select.parse_llm_segments(reply) + assert exc.value.error_type == "llm_parse_error" + assert "could not be read as clip ranges" in exc.value.message + assert "JSON array" in (exc.value.suggestion or "") + + +def test_parse_llm_segments_errors_when_model_selects_nothing(): + with pytest.raises(CLIError) as exc: + clip_select.parse_llm_segments("[]") + assert exc.value.error_type == "no_match" + assert "The model selected no segments" in exc.value.message + assert "--speaker/--search/--range" in (exc.value.suggestion or "") + + +def test_parse_llm_segments_drops_malformed_entries(): + reply = ( + '[{"start": 0.5, "end": 0.9},' + ' {"start": "x", "end": 2},' + ' {"start": 3},' + ' {"start": -1, "end": 2},' + ' {"start": 4, "end": 4},' + ' {"start": 9, "end": 5},' + ' {"start": Infinity, "end": 10},' + ' {"start": 1, "end": Infinity},' + ' {"start": 6, "end": 7.5}]' + ) + assert clip_select.parse_llm_segments(reply) == [Segment(0.5, 
0.9), Segment(6.0, 7.5)] + + +def test_segment_is_immutable(): + import dataclasses + + segment = Segment(0.0, 1.0) + field_name = dataclasses.fields(segment)[0].name + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(segment, field_name, 5.0) + + +# --- clock formatting -------------------------------------------------------- + + +@pytest.mark.parametrize( + ("seconds", "rendered"), + [ + (5.0, "0:05.0"), + (90.5, "1:30.5"), + (3723.5, "1:02:03.5"), + (0.0, "0:00.0"), + ], +) +def test_format_clock(seconds, rendered): + assert clip_select.format_clock(seconds) == rendered diff --git a/tests/test_clip_sources.py b/tests/test_clip_sources.py new file mode 100644 index 00000000..2601ae2e --- /dev/null +++ b/tests/test_clip_sources.py @@ -0,0 +1,294 @@ +"""Tests for `assembly clip`'s alternative sources and LLM-driven selection: +YouTube/media-page downloads, the `-t -` stdin transcript pipe, and `--llm` +segment selection through the LLM Gateway (all boundaries faked).""" + +from __future__ import annotations + +import contextlib +import dataclasses +import json +from pathlib import Path + +import pytest + +from aai_cli import clip_exec, clip_select, config +from aai_cli.context import AppState +from aai_cli.errors import CLIError, UsageError +from tests._clip_helpers import DEFAULTS, UTTERANCES, fake_transcript, record_ffmpeg + + +@pytest.fixture +def media(tmp_path: Path) -> Path: + path = tmp_path / "meeting.mp4" + path.write_bytes(b"\x00fake-media") + return path + + +@pytest.fixture +def fake_ffmpeg(monkeypatch): + return record_ffmpeg(monkeypatch) + + +# --- YouTube / media-page sources --------------------------------------------- + + +@pytest.fixture +def fake_download(monkeypatch): + """Stand in for yt-dlp: 'download' a fixed audio file into the temp dir.""" + seen: dict[str, object] = {} + + def download(url, dest_dir): + seen["url"] = url + path = dest_dir / "vid123.m4a" + path.write_bytes(b"\x00audio") + seen["path"] = path + return path + + 
monkeypatch.setattr(clip_exec.youtube, "download_audio", download) + return seen + + +YT_URL = "https://www.youtube.com/watch?v=abc123" + + +def test_run_clip_downloads_youtube_audio_into_cwd( + tmp_path, fake_ffmpeg, fake_download, capsys, monkeypatch +): + monkeypatch.chdir(tmp_path) + opts = dataclasses.replace(DEFAULTS, media=YT_URL, ranges=["1-2"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert fake_download["url"] == YT_URL + # ffmpeg reads the downloaded temp file; the clip lands in the cwd, named + # after the download (the temp dir is gone after the run). + assert fake_ffmpeg[0][6] == str(fake_download["path"]) + dest = tmp_path / "vid123.clip01.m4a" + assert fake_ffmpeg[0][-1] == str(dest) + payload = json.loads(capsys.readouterr().out) + assert payload["source"] == YT_URL + assert payload["clips"][0]["path"] == str(dest) + + +def test_run_clip_youtube_honors_out_dir(tmp_path, fake_ffmpeg, fake_download, capsys): + out_dir = tmp_path / "clips" + out_dir.mkdir() + opts = dataclasses.replace(DEFAULTS, media=YT_URL, ranges=["1-2"], out_dir=out_dir) + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert fake_ffmpeg[0][-1] == str(out_dir / "vid123.clip01.m4a") + + +def test_run_clip_youtube_transcribes_the_downloaded_file( + tmp_path, fake_ffmpeg, fake_download, capsys, monkeypatch +): + monkeypatch.chdir(tmp_path) + config.set_api_key("default", "sk_test") + seen = {} + + def fake_transcribe(api_key, audio, *, config): + seen["audio"] = audio + return fake_transcript(list(UTTERANCES)) + + monkeypatch.setattr(clip_exec.client, "transcribe", fake_transcribe) + monkeypatch.setattr( + clip_exec.llm, "transform_transcript", lambda *a, **k: '[{"start": 1, "end": 2}]' + ) + opts = dataclasses.replace(DEFAULTS, media=YT_URL, llm_prompt="best moment") + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert seen["audio"] == str(fake_download["path"]) + payload = json.loads(capsys.readouterr().out) + assert [(c["start"], c["end"]) for c 
in payload["clips"]] == [(1.0, 2.0)] + + +def test_run_clip_youtube_download_status_message( + tmp_path, fake_ffmpeg, fake_download, capsys, monkeypatch +): + monkeypatch.chdir(tmp_path) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(clip_exec.output, "status", fake_status) + opts = dataclasses.replace(DEFAULTS, media=YT_URL, ranges=["1-2"]) + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert messages == ["Downloading audio…", "Cutting 1 clip(s)…"] + + +# --- transcript piped on stdin (-t -) ------------------------------------------- + + +def _piped_payload(): + return json.dumps( + { + "id": "tr_piped", + "utterances": [ + {"start": 1500, "end": 2500, "speaker": "A", "text": "Let's talk pricing."}, + {"start": 3000, "end": 4000, "speaker": "B", "text": "Sounds good."}, + ], + } + ) + + +def test_run_clip_reads_transcript_json_from_stdin(media, fake_ffmpeg, capsys, monkeypatch): + # No API key configured and no client call: the piped JSON is the transcript. 
+ monkeypatch.setattr(clip_exec.stdio, "piped_stdin_text", _piped_payload) + monkeypatch.setattr( + clip_exec.client, + "get_transcript", + lambda *a: pytest.fail("must not fetch when JSON is piped"), + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="-", speakers=["A"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + payload = json.loads(capsys.readouterr().out) + assert payload["transcript_id"] == "tr_piped" + assert [(c["start"], c["end"]) for c in payload["clips"]] == [(1.5, 2.5)] + + +def test_run_clip_reads_transcript_id_from_stdin(media, fake_ffmpeg, capsys, monkeypatch): + config.set_api_key("default", "sk_test") + monkeypatch.setattr(clip_exec.stdio, "piped_stdin_text", lambda: "tr_999\n") + seen = {} + + def fake_get(api_key, transcript_id): + seen["args"] = (api_key, transcript_id) + return fake_transcript(list(UTTERANCES)) + + monkeypatch.setattr(clip_exec.client, "get_transcript", fake_get) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="-", speakers=["B"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert seen["args"] == ("sk_test", "tr_999") + + +def test_run_clip_stdin_transcript_requires_piped_input(media, fake_ffmpeg, monkeypatch): + monkeypatch.setattr(clip_exec.stdio, "piped_stdin_text", lambda: None) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="-", speakers=["A"]) + with pytest.raises(UsageError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert "-t - expects a transcript id or transcript JSON on stdin" in exc.value.message + assert "assembly clip -t -" in (exc.value.suggestion or "") + + +def test_run_clip_stdin_transcript_rejects_bad_json(media, fake_ffmpeg, monkeypatch): + monkeypatch.setattr(clip_exec.stdio, "piped_stdin_text", lambda: '{"id": ') + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="-", speakers=["A"]) + with pytest.raises(UsageError) as exc: + clip_exec.run_clip(opts, AppState(), 
json_mode=False) + assert "Couldn't parse the transcript JSON on stdin" in exc.value.message + + +# --- LLM-driven selection ----------------------------------------------------- + + +def test_run_clip_llm_selection_drives_the_cut(media, fake_ffmpeg, capsys, monkeypatch): + config.set_api_key("default", "sk_test") + monkeypatch.setattr( + clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) + ) + seen = {} + + def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): + seen.update( + api_key=api_key, + prompt=prompt, + transcript_text=transcript_text, + model=model, + max_tokens=max_tokens, + ) + return ' [{"start": 1.5, "end": 4.0}] ' + + monkeypatch.setattr(clip_exec.llm, "transform_transcript", fake_transform) + opts = dataclasses.replace( + DEFAULTS, + media=str(media), + llm_prompt="the pricing discussion", + model="gpt-5", + max_tokens=64, + ) + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert seen["api_key"] == "sk_test" + # The reply contract is prefixed; the user's instruction closes the prompt. + assert "Reply with only a JSON array" in seen["prompt"] + assert seen["prompt"].endswith("Selection instruction: the pricing discussion") + assert seen["transcript_text"] == clip_select.utterance_listing(list(UTTERANCES)) + assert seen["model"] == "gpt-5" + assert seen["max_tokens"] == 64 + payload = json.loads(capsys.readouterr().out) + assert payload["transcript_id"] == "tr_123" + assert [(c["start"], c["end"]) for c in payload["clips"]] == [(1.5, 4.0)] + assert fake_ffmpeg[0][7:11] == ["-ss", "1.500", "-to", "4.000"] + + +def test_run_clip_llm_composes_with_speaker_filter(media, fake_ffmpeg, capsys, monkeypatch): + # --speaker narrows the utterances first; the LLM only sees what survived. 
+ config.set_api_key("default", "sk_test") + monkeypatch.setattr( + clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) + ) + seen = {} + + def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): + seen["transcript_text"] = transcript_text + return '[{"start": 5.0, "end": 6.0}]' + + monkeypatch.setattr(clip_exec.llm, "transform_transcript", fake_transform) + opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["A"], llm_prompt="hiring talk") + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert "B: Sounds good." not in seen["transcript_text"] + assert "A: Moving on to hiring." in seen["transcript_text"] + payload = json.loads(capsys.readouterr().out) + assert [(c["start"], c["end"]) for c in payload["clips"]] == [(5.0, 6.0)] + + +def test_run_clip_llm_works_with_transcript_id(media, fake_ffmpeg, capsys, monkeypatch): + # -t with --llm alone is a valid selection (no --speaker/--search needed). + config.set_api_key("default", "sk_test") + monkeypatch.setattr( + clip_exec.client, "get_transcript", lambda *a: fake_transcript(list(UTTERANCES)) + ) + monkeypatch.setattr( + clip_exec.llm, + "transform_transcript", + lambda *a, **k: '[{"start": 3.0, "end": 4.0}]', + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="tr_123", llm_prompt="x") + clip_exec.run_clip(opts, AppState(), json_mode=True) + payload = json.loads(capsys.readouterr().out) + assert [(c["start"], c["end"]) for c in payload["clips"]] == [(3.0, 4.0)] + + +def test_run_clip_llm_parse_error_surfaces(media, fake_ffmpeg, monkeypatch): + config.set_api_key("default", "sk_test") + monkeypatch.setattr( + clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) + ) + monkeypatch.setattr(clip_exec.llm, "transform_transcript", lambda *a, **k: "no json, sorry") + opts = dataclasses.replace(DEFAULTS, media=str(media), llm_prompt="x") + with pytest.raises(CLIError) as exc: + clip_exec.run_clip(opts, 
AppState(), json_mode=False) + assert exc.value.error_type == "llm_parse_error" + + +def test_run_clip_llm_status_message_names_the_model(media, fake_ffmpeg, monkeypatch): + config.set_api_key("default", "sk_test") + monkeypatch.setattr( + clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) + ) + monkeypatch.setattr( + clip_exec.llm, "transform_transcript", lambda *a, **k: '[{"start": 1, "end": 2}]' + ) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(clip_exec.output, "status", fake_status) + opts = dataclasses.replace(DEFAULTS, media=str(media), llm_prompt="best bits", model="gpt-5") + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert messages == [ + "Transcribing for clip selection…", + "Selecting segments with gpt-5…", + "Cutting 1 clip(s)…", + ] diff --git a/tests/test_smoke.py b/tests/test_smoke.py index a1ec7119..9e32a297 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -156,6 +156,7 @@ def test_help_lists_commands_in_workflow_order(): "agent", "speak", "llm", + "clip", "eval", "webhooks", # Setup & Tools