From f7beda65d72d2b7eac8f8ed1ccc0a75c9ed380f1 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 12 Jun 2026 23:20:16 +0000 Subject: [PATCH 1/2] Deduplicate clip/dub media scaffolding into shared mediafile module The dub command (#135) copied clip_exec's local-media validation, ffmpeg discovery/invocation, ffmpeg-failure mapping, and diarized-transcript resolution near-verbatim, and speak_exec's sandbox guard. Hoist the shared scaffolding so the two commands can't drift apart: - aai_cli/mediafile.py: validate_local_media, require_ffmpeg, run_ffmpeg, ffmpeg_failure, resolve_diarized_transcript, used by both clip_exec and dub_exec (parameterized by command/purpose strings). - tts/session.require_available(command): one sandbox guard for speak and dub (speak's message now also names streaming TTS as the reason). - dub_exec: resolve the API key once per run instead of twice (config.resolve_api_key hits keyring IPC each call), pass the already-computed transcript id into _utterances_of, fold the single-use `starts` generator into the zip comprehension, and return assemble_timeline's bytearray directly instead of copying the whole dubbed track into bytes (write_wav accepts any buffer). Tests: ffmpeg fakes now patch mediafile.run_ffmpeg; the duplicated plain() ANSI-stripper in _dub_helpers imports from _clip_helpers; new dub status-message test; suggestion asserts pin the per-command parameterization of the shared helpers. https://claude.ai/code/session_018TuAQTvp9PVy5EdhsDWo2h --- .importlinter | 1 + aai_cli/clip_exec.py | 72 +++++------------------- aai_cli/dub_exec.py | 109 ++++++++----------------------------- aai_cli/mediafile.py | 93 +++++++++++++++++++++++++++++++ aai_cli/speak_exec.py | 13 +---- aai_cli/tts/audio.py | 2 +- aai_cli/tts/session.py | 13 +++++ tests/_clip_helpers.py | 4 +- tests/_dub_helpers.py | 13 +---- tests/test_clip_command.py | 4 +- tests/test_clip_exec.py | 40 +++++++------- tests/test_clip_sources.py | 28 +++------- tests/test_dub_command.py | 2 +- tests/test_dub_exec.py | 26 ++------- tests/test_dub_pipeline.py | 35 +++++++++--- 15 files changed, 217 insertions(+), 238 deletions(-) create mode 100644 aai_cli/mediafile.py diff --git a/.importlinter b/.importlinter index 9a666fdc..841117d0 100644 --- a/.importlinter +++ b/.importlinter @@ -31,6 +31,7 @@ source_modules = aai_cli.init aai_cli.llm aai_cli.llm_exec + aai_cli.mediafile aai_cli.microphone aai_cli.options aai_cli.output diff --git a/aai_cli/clip_exec.py b/aai_cli/clip_exec.py index aae96f48..d86e1208 100644 --- a/aai_cli/clip_exec.py +++ b/aai_cli/clip_exec.py @@ -20,17 +20,14 @@ from __future__ import annotations import json -import shutil -import subprocess import tempfile from dataclasses import dataclass from pathlib import Path from types import SimpleNamespace -import assemblyai as aai from rich.markup import escape -from aai_cli import client, clip_select, jsonshape, llm, output, stdio, youtube +from aai_cli import clip_select, jsonshape, llm, mediafile, output, stdio, youtube from aai_cli.clip_select import Segment from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError @@ -140,12 +137,14 @@ def _resolve_transcript( if text.startswith("{"): return _piped_transcript(text) transcript_id = text # a bare id (e.g. from `assembly transcribe … -o id`) - if transcript_id is not None: - return client.get_transcript(state.resolve_api_key(), transcript_id) - config = aai.TranscriptionConfig(speaker_labels=True) - api_key = state.resolve_api_key() - with output.status("Transcribing for clip selection…", json_mode=json_mode, quiet=state.quiet): - return client.transcribe(api_key, str(media), config=config) + return mediafile.resolve_diarized_transcript( + state.resolve_api_key(), + transcript_id, + media, + status_message="Transcribing for clip selection…", + json_mode=json_mode, + quiet=state.quiet, + ) def _transcript_segments( @@ -190,26 +189,6 @@ def _transcript_segments( return [clip_select.segment_of(utterance) for utterance in matched], transcript_id -def _validate_media(media: Path) -> None: - """Reject a missing local source before credential resolution, so a typo'd - path reads as "file not found", never as a login prompt or an opaque - ffmpeg error.""" - if not media.exists(): - raise CLIError( - f"File not found: {media}", - error_type="file_not_found", - exit_code=2, - suggestion="Check the path. assembly clip needs a local audio/video file.", - ) - if not media.is_file(): - raise CLIError( - f"Not a file: {media}", - error_type="not_a_file", - exit_code=2, - suggestion="Pass a media file, not a directory.", - ) - - def _validate_out_dir(out_dir: Path | None) -> None: if out_dir is not None and not out_dir.is_dir(): raise UsageError( @@ -235,23 +214,6 @@ def _validate_selection(opts: ClipOptions) -> None: ) -def _require_ffmpeg() -> str: - """The ffmpeg executable; checked before any (billed) transcription work.""" - path = shutil.which("ffmpeg") - if path is None: - raise CLIError( - "ffmpeg is required to cut media, but it isn't on PATH.", - error_type="missing_dependency", - suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", - ) - return path - - -def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: - """Boundary seam for tests: one ffmpeg invocation, output captured.""" - return subprocess.run(args, capture_output=True, text=True, check=False) - - # -30dB for at least 0.2s reads as a pause in normal speech recordings. _SILENCE_FILTER = "silencedetect=noise=-30dB:d=0.2" @@ -264,7 +226,7 @@ def _detect_silences(ffmpeg: str, media: Path) -> list[Segment]: silencedetect logs at info level on stderr, so the usual ``-loglevel error`` would silence the very lines this parses. """ - result = _run_ffmpeg( + result = mediafile.run_ffmpeg( [ ffmpeg, "-hide_banner", @@ -290,7 +252,7 @@ def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None: would snap to the nearest keyframe; ``-y`` makes a re-run overwrite its own earlier output instead of stalling on ffmpeg's prompt. """ - result = _run_ffmpeg( + result = mediafile.run_ffmpeg( [ ffmpeg, "-hide_banner", @@ -307,13 +269,7 @@ def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None: ] ) if result.returncode != 0: - detail = result.stderr.strip().splitlines() - reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" - raise CLIError( - f"Could not cut {dest.name}: {reason}", - error_type="clip_failed", - suggestion="Check that the input is a readable audio/video file.", - ) + raise mediafile.ffmpeg_failure(result, "cut", dest, error_type="clip_failed") def _clip_dest(media: Path, out_dir: Path | None, index: int) -> Path: @@ -348,7 +304,7 @@ def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: _validate_out_dir(opts.out_dir) _validate_selection(opts) explicit = [clip_select.parse_range(value) for value in opts.ranges] - ffmpeg = _require_ffmpeg() + ffmpeg = mediafile.require_ffmpeg("cut media") if youtube.is_downloadable_url(opts.media): # A media-page URL (YouTube, podcast page, …) is downloaded once and # clipped locally. The download dir is temporary, so the clips land in @@ -366,7 +322,7 @@ def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: suggestion="Download the media first, then clip the local copy.", ) media = Path(opts.media) - _validate_media(media) + mediafile.validate_local_media(media, "clip") _cut_and_emit(opts, media, opts.out_dir, explicit, ffmpeg, state, json_mode=json_mode) diff --git a/aai_cli/dub_exec.py b/aai_cli/dub_exec.py index 26040d49..413d265a 100644 --- a/aai_cli/dub_exec.py +++ b/aai_cli/dub_exec.py @@ -18,16 +18,13 @@ from __future__ import annotations import re -import shutil -import subprocess import tempfile from dataclasses import dataclass from pathlib import Path -import assemblyai as aai from rich.markup import escape -from aai_cli import client, environments, jsonshape, output +from aai_cli import jsonshape, mediafile, output from aai_cli import llm as gateway from aai_cli.context import AppState from aai_cli.errors import APIError, CLIError, UsageError @@ -104,7 +101,7 @@ def assemble_timeline( placed: list[tuple[int, bytes]], sample_rate: int, total_seconds: float | None, -) -> bytes: +) -> bytearray: """Lay each ``(start_ms, pcm)`` segment onto a silence timeline. Gaps before a segment's start are filled with silence; a segment whose @@ -122,7 +119,7 @@ def assemble_timeline( tail = total_seconds - _pcm_seconds(pcm, sample_rate) if tail > 0: pcm.extend(audio.silence(sample_rate, tail)) - return bytes(pcm) + return pcm def _pcm_seconds(pcm: bytes | bytearray, sample_rate: int) -> float: @@ -130,37 +127,6 @@ def _pcm_seconds(pcm: bytes | bytearray, sample_rate: int) -> float: return len(pcm) / 2 / sample_rate -def _require_sandbox() -> None: - """`assembly dub` synthesizes with streaming TTS, which is sandbox-only today.""" - if not session.is_available(): - raise CLIError( - "assembly dub is only available in the sandbox (it uses streaming TTS).", - error_type="unsupported_environment", - exit_code=2, - suggestion="Re-run as: assembly --sandbox dub … " - f"(--sandbox goes before the command; or use --env {environments.SANDBOX_ENV}).", - ) - - -def _validate_media(media: Path) -> None: - """Reject a missing local source before credential resolution, so a typo'd - path reads as "file not found", never as a login prompt or an ffmpeg error.""" - if not media.exists(): - raise CLIError( - f"File not found: {media}", - error_type="file_not_found", - exit_code=2, - suggestion="Check the path. assembly dub needs a local audio/video file.", - ) - if not media.is_file(): - raise CLIError( - f"Not a file: {media}", - error_type="not_a_file", - exit_code=2, - suggestion="Pass a media file, not a directory.", - ) - - def _validate_out(out: Path, media: Path) -> None: """The dub must never overwrite its own input: ffmpeg would read and write the same file concurrently, corrupting it.""" @@ -171,23 +137,6 @@ def _validate_out(out: Path, media: Path) -> None: ) -def _require_ffmpeg() -> str: - """The ffmpeg executable; checked before any (billed) transcription work.""" - path = shutil.which("ffmpeg") - if path is None: - raise CLIError( - "ffmpeg is required to write the dubbed file, but it isn't on PATH.", - error_type="missing_dependency", - suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", - ) - return path - - -def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: - """Boundary seam for tests: one ffmpeg invocation, output captured.""" - return subprocess.run(args, capture_output=True, text=True, check=False) - - def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: """Swap ``track`` in as the audio of ``media``, writing ``out``. @@ -196,7 +145,7 @@ def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: dubs both a video and a plain audio file. ``-y`` makes a re-run overwrite its own earlier output instead of stalling on ffmpeg's prompt. """ - result = _run_ffmpeg( + result = mediafile.run_ffmpeg( [ ffmpeg, "-hide_banner", @@ -217,13 +166,7 @@ def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: ] ) if result.returncode != 0: - detail = result.stderr.strip().splitlines() - reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" - raise CLIError( - f"Could not write {out.name}: {reason}", - error_type="dub_failed", - suggestion="Check that the input is a readable audio/video file.", - ) + raise mediafile.ffmpeg_failure(result, "write", out, error_type="dub_failed") @dataclass(frozen=True) @@ -235,21 +178,7 @@ class _Utterance: text: str -def _resolve_transcript( - opts: DubOptions, media: Path, state: AppState, *, json_mode: bool -) -> object: - """The diarized transcript driving the dub: fetched by id, or made fresh from - the (already local) media file — always with speaker labels, so each speaker - can keep a distinct voice in the dub.""" - if opts.transcript_id is not None: - return client.get_transcript(state.resolve_api_key(), opts.transcript_id) - config = aai.TranscriptionConfig(speaker_labels=True) - api_key = state.resolve_api_key() - with output.status("Transcribing for dubbing…", json_mode=json_mode, quiet=state.quiet): - return client.transcribe(api_key, str(media), config=config) - - -def _utterances_of(transcript: object) -> list[_Utterance]: +def _utterances_of(transcript: object, transcript_id: str) -> list[_Utterance]: """The transcript's spoken utterances, with empty-text ones dropped.""" utterances = [ _Utterance( @@ -261,7 +190,6 @@ def _utterances_of(transcript: object) -> list[_Utterance]: ] spoken = [utterance for utterance in utterances if utterance.text] if not spoken: - transcript_id = str(getattr(transcript, "id", "")) raise CLIError( f"Transcript {transcript_id} has no utterances to dub.", error_type="no_utterances", @@ -370,17 +298,24 @@ def _assign_voices( def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly dub` invocation from already-parsed flags.""" language = resolve_language(opts.language) - _require_sandbox() + session.require_available("dub") media = Path(opts.media) - _validate_media(media) + mediafile.validate_local_media(media, "dub") out = opts.out if opts.out is not None else default_out_path(media, language) _validate_out(out, media) - ffmpeg = _require_ffmpeg() + ffmpeg = mediafile.require_ffmpeg("write the dubbed file") - transcript = _resolve_transcript(opts, media, state, json_mode=json_mode) - transcript_id = str(getattr(transcript, "id", "")) - utterances = _utterances_of(transcript) api_key = state.resolve_api_key() + transcript = mediafile.resolve_diarized_transcript( + api_key, + opts.transcript_id, + media, + status_message="Transcribing for dubbing…", + json_mode=json_mode, + quiet=state.quiet, + ) + transcript_id = str(getattr(transcript, "id", "")) + utterances = _utterances_of(transcript, transcript_id) translations = _translate( api_key, utterances, language, opts, json_mode=json_mode, quiet=state.quiet ) @@ -390,8 +325,10 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: ) # strict=True is an invariant guard only: _synthesize returns one PCM per segment. - starts = (u.start_ms for u in utterances) - placed = list(zip(starts, pcm_segments, strict=True)) # pragma: no mutate + placed = [ + (utterance.start_ms, pcm) + for utterance, pcm in zip(utterances, pcm_segments, strict=True) # pragma: no mutate + ] track = assemble_timeline(placed, sample_rate, _total_seconds(transcript)) with tempfile.TemporaryDirectory(prefix="aai-dub-") as tmp: wav = Path(tmp) / "dub.wav" diff --git a/aai_cli/mediafile.py b/aai_cli/mediafile.py new file mode 100644 index 00000000..8fb3713f --- /dev/null +++ b/aai_cli/mediafile.py @@ -0,0 +1,93 @@ +"""Shared scaffolding for commands that operate on a local media file (clip, +dub): source validation, ffmpeg discovery/invocation, and resolution of the +diarized transcript that drives selection or dubbing. + +The helpers raise identical CLIErrors regardless of the calling command — only +the command-name/purpose strings differ — so the media-file UX of `assembly +clip` and `assembly dub` can't drift apart. +""" + +from __future__ import annotations + +import shutil +import subprocess +from pathlib import Path + +import assemblyai as aai + +from aai_cli import client, output +from aai_cli.errors import CLIError + + +def validate_local_media(media: Path, command: str) -> None: + """Reject a missing local source before credential resolution, so a typo'd + path reads as "file not found", never as a login prompt or an opaque + ffmpeg error.""" + if not media.exists(): + raise CLIError( + f"File not found: {media}", + error_type="file_not_found", + exit_code=2, + suggestion=f"Check the path. assembly {command} needs a local audio/video file.", + ) + if not media.is_file(): + raise CLIError( + f"Not a file: {media}", + error_type="not_a_file", + exit_code=2, + suggestion="Pass a media file, not a directory.", + ) + + +def require_ffmpeg(purpose: str) -> str: + """The ffmpeg executable; checked before any (billed) transcription work.""" + path = shutil.which("ffmpeg") + if path is None: + raise CLIError( + f"ffmpeg is required to {purpose}, but it isn't on PATH.", + error_type="missing_dependency", + suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", + ) + return path + + +def run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: + """Boundary seam for tests: one ffmpeg invocation, output captured.""" + return subprocess.run(args, capture_output=True, text=True, check=False) + + +def ffmpeg_failure( + result: subprocess.CompletedProcess[str], + action: str, + dest: Path, + *, + error_type: str, +) -> CLIError: + """A failed ffmpeg run as a clean CLIError: the reason is ffmpeg's last + stderr line (earlier noise dropped), or the exit code when it said nothing.""" + detail = result.stderr.strip().splitlines() + reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" + return CLIError( + f"Could not {action} {dest.name}: {reason}", + error_type=error_type, + suggestion="Check that the input is a readable audio/video file.", + ) + + +def resolve_diarized_transcript( + api_key: str, + transcript_id: str | None, + media: Path, + *, + status_message: str, + json_mode: bool, + quiet: bool, +) -> object: + """The diarized transcript driving the command: fetched by id, or made fresh + from the (already local) media file — always with speaker labels, so the + caller can select or voice content per speaker.""" + if transcript_id is not None: + return client.get_transcript(api_key, transcript_id) + config = aai.TranscriptionConfig(speaker_labels=True) + with output.status(status_message, json_mode=json_mode, quiet=quiet): + return client.transcribe(api_key, str(media), config=config) diff --git a/aai_cli/speak_exec.py b/aai_cli/speak_exec.py index 1dd16e58..56f3f082 100644 --- a/aai_cli/speak_exec.py +++ b/aai_cli/speak_exec.py @@ -11,9 +11,9 @@ from dataclasses import dataclass from pathlib import Path -from aai_cli import environments, output, stdio +from aai_cli import output, stdio from aai_cli.context import AppState -from aai_cli.errors import CLIError, UsageError +from aai_cli.errors import UsageError from aai_cli.tts import audio, dialogue, session # The streaming-TTS reference client defaults to the PocketTTS "jane" voice and @@ -177,14 +177,7 @@ def _speak_dialogue( def run_speak(opts: SpeakOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly speak` invocation from already-parsed flags.""" - if not session.is_available(): - raise CLIError( - "assembly speak is only available in the sandbox.", - error_type="unsupported_environment", - exit_code=2, - suggestion="Re-run as: assembly --sandbox speak … " - f"(--sandbox goes before the command; or use --env {environments.SANDBOX_ENV}).", - ) + session.require_available("speak") spoken = _read_text(opts.text) api_key = state.resolve_api_key() bare_voice, overrides = dialogue.parse_voice_overrides(opts.voice) diff --git a/aai_cli/tts/audio.py b/aai_cli/tts/audio.py index 315c1a30..eb0a4d75 100644 --- a/aai_cli/tts/audio.py +++ b/aai_cli/tts/audio.py @@ -36,7 +36,7 @@ def close(self) -> None: _PLAYBACK_CHUNK_BYTES = 4096 -def write_wav(path: Path, pcm: bytes, sample_rate: int) -> None: +def write_wav(path: Path, pcm: bytes | bytearray, sample_rate: int) -> None: """Write 16-bit mono PCM to a WAV file, creating parent dirs as needed.""" path.parent.mkdir(parents=True, exist_ok=True) with wave.open(str(path), "wb") as wav: diff --git a/aai_cli/tts/session.py b/aai_cli/tts/session.py index 7349ba2d..f54cd7c2 100644 --- a/aai_cli/tts/session.py +++ b/aai_cli/tts/session.py @@ -86,6 +86,19 @@ def is_available() -> bool: return bool(environments.active().streaming_tts_host) +def require_available(command: str) -> None: + """Refuse to run a streaming-TTS command (speak, dub) outside the sandbox, + pointing at the exact --sandbox re-invocation.""" + if not is_available(): + raise CLIError( + f"assembly {command} is only available in the sandbox (it uses streaming TTS).", + error_type="unsupported_environment", + exit_code=2, + suggestion=f"Re-run as: assembly --sandbox {command} … " + f"(--sandbox goes before the command; or use --env {environments.SANDBOX_ENV}).", + ) + + def ws_url(params: dict[str, str]) -> str: """The streaming-TTS socket URL for the active environment, with query params.""" base = f"wss://{environments.active().streaming_tts_host}/v1/ws/" diff --git a/tests/_clip_helpers.py b/tests/_clip_helpers.py index 86e969fe..250fc109 100644 --- a/tests/_clip_helpers.py +++ b/tests/_clip_helpers.py @@ -14,7 +14,7 @@ import pytest -from aai_cli import clip_exec, llm +from aai_cli import llm, mediafile from aai_cli.clip_exec import ClipOptions _ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") @@ -70,5 +70,5 @@ def run(args: list[str]) -> subprocess.CompletedProcess[str]: stderr = detect_log if "-af" in args else "" return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr=stderr) - monkeypatch.setattr(clip_exec, "_run_ffmpeg", run) + monkeypatch.setattr(mediafile, "run_ffmpeg", run) return calls diff --git a/tests/_dub_helpers.py b/tests/_dub_helpers.py index d1670d9e..32fdce0a 100644 --- a/tests/_dub_helpers.py +++ b/tests/_dub_helpers.py @@ -8,7 +8,6 @@ from __future__ import annotations -import re import subprocess import wave from pathlib import Path @@ -16,7 +15,7 @@ import pytest -from aai_cli import client, config, dub_exec, llm +from aai_cli import client, config, llm, mediafile from aai_cli.dub_exec import DubOptions from aai_cli.tts import session from aai_cli.tts.session import SpeakResult @@ -34,14 +33,6 @@ SAMPLE_RATE = 100 # tiny rate keeps the timeline byte math exact and readable -_ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") - - -def plain(text: str) -> str: - """Strip SGR color codes (CI forces color on, splitting flags like --lang - with style sequences) for substring assertions.""" - return _ANSI_SGR.sub("", text) - def utterance(start, speaker, text): return SimpleNamespace(start=start, end=None, speaker=speaker, text=text) @@ -125,5 +116,5 @@ def run(args: list[str]) -> subprocess.CompletedProcess[str]: recorded["wav_frames"] = wav.readframes(wav.getnframes()) return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") - monkeypatch.setattr(dub_exec, "_run_ffmpeg", run) + monkeypatch.setattr(mediafile, "run_ffmpeg", run) return recorded diff --git a/tests/test_clip_command.py b/tests/test_clip_command.py index 9cfafe0c..91529ff2 100644 --- a/tests/test_clip_command.py +++ b/tests/test_clip_command.py @@ -9,7 +9,7 @@ from typer.testing import CliRunner -from aai_cli import clip_exec, llm +from aai_cli import clip_exec, llm, mediafile from aai_cli.clip_exec import ClipOptions from aai_cli.main import app @@ -138,7 +138,7 @@ def fake_run(args: list[str]) -> subprocess.CompletedProcess[str]: calls.append(args) return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") - monkeypatch.setattr(clip_exec, "_run_ffmpeg", fake_run) + monkeypatch.setattr(mediafile, "run_ffmpeg", fake_run) result = runner.invoke(app, ["clip", str(media), "--range", "1-2", "--json"]) assert result.exit_code == 0, result.output # calls[0] is the silencedetect pass; calls[1] the cut. diff --git a/tests/test_clip_exec.py b/tests/test_clip_exec.py index 2f365071..65be0b9a 100644 --- a/tests/test_clip_exec.py +++ b/tests/test_clip_exec.py @@ -2,7 +2,7 @@ validation, ffmpeg orchestration, and transcript-backed --speaker/--search selection. Constructed-options tests (dataclasses.replace off the shared defaults) avoid any argv round-trip; the ffmpeg boundary is faked at -`clip_exec._run_ffmpeg`. The pure selection logic is covered in +`mediafile.run_ffmpeg`. The pure selection logic is covered in test_clip_select.py; YouTube/stdin/LLM sources in test_clip_sources.py.""" from __future__ import annotations @@ -16,7 +16,7 @@ import pytest -from aai_cli import clip_exec, config +from aai_cli import client, clip_exec, config, mediafile from aai_cli.clip_select import Segment from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError @@ -72,7 +72,8 @@ def test_run_clip_rejects_missing_file(tmp_path): assert exc.value.error_type == "file_not_found" assert exc.value.exit_code == 2 assert "File not found" in exc.value.message - assert "local audio/video file" in (exc.value.suggestion or "") + # The command name pins the shared helper's parameterization. + assert "assembly clip needs a local audio/video file" in (exc.value.suggestion or "") def test_run_clip_rejects_directory(tmp_path): @@ -126,7 +127,8 @@ def test_run_clip_requires_ffmpeg(media, monkeypatch): with pytest.raises(CLIError) as exc: clip_exec.run_clip(opts, AppState(), json_mode=False) assert exc.value.error_type == "missing_dependency" - assert "ffmpeg is required" in exc.value.message + # The purpose string pins the shared helper's parameterization. + assert "ffmpeg is required to cut media" in exc.value.message assert "Install it" in (exc.value.suggestion or "") @@ -257,7 +259,7 @@ def run(args: list[str]) -> subprocess.CompletedProcess[str]: ) return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") - monkeypatch.setattr(clip_exec, "_run_ffmpeg", run) + monkeypatch.setattr(mediafile, "run_ffmpeg", run) opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["5-12.5"]) clip_exec.run_clip(opts, AppState(), json_mode=True) clips = json.loads(capsys.readouterr().out)["clips"] @@ -284,7 +286,7 @@ def fail(args: list[str]) -> subprocess.CompletedProcess[str]: args=args, returncode=1, stdout="", stderr="noise\nInvalid data found\n" ) - monkeypatch.setattr(clip_exec, "_run_ffmpeg", fail) + monkeypatch.setattr(mediafile, "run_ffmpeg", fail) opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["1-2"]) with pytest.raises(CLIError) as exc: clip_exec.run_clip(opts, AppState(), json_mode=False) @@ -299,8 +301,8 @@ def fail(args: list[str]) -> subprocess.CompletedProcess[str]: def test_run_clip_reports_exit_code_when_ffmpeg_is_silent(media, monkeypatch): monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") monkeypatch.setattr( - clip_exec, - "_run_ffmpeg", + mediafile, + "run_ffmpeg", lambda args: subprocess.CompletedProcess(args=args, returncode=3, stdout="", stderr=""), ) opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["1-2"]) @@ -311,8 +313,8 @@ def test_run_clip_reports_exit_code_when_ffmpeg_is_silent(media, monkeypatch): def test_run_ffmpeg_captures_output_and_does_not_raise(): # The real boundary (not the fake): output is captured as text and a non-zero - # exit must not raise — _cut_clip turns the exit code into a CLIError itself. - result = clip_exec._run_ffmpeg( + # exit must not raise — the callers turn the exit code into a CLIError. + result = mediafile.run_ffmpeg( [ sys.executable, "-c", @@ -337,7 +339,7 @@ def fake_transcribe(api_key, audio, *, config): seen["config"] = config return fake_transcript(list(UTTERANCES)) - monkeypatch.setattr(clip_exec.client, "transcribe", fake_transcribe) + monkeypatch.setattr(client, "transcribe", fake_transcribe) opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["a"]) clip_exec.run_clip(opts, AppState(), json_mode=True) assert seen["api_key"] == "sk_test" @@ -359,9 +361,9 @@ def fake_get(api_key, transcript_id): seen["args"] = (api_key, transcript_id) return fake_transcript(list(UTTERANCES)) - monkeypatch.setattr(clip_exec.client, "get_transcript", fake_get) + monkeypatch.setattr(client, "get_transcript", fake_get) monkeypatch.setattr( - clip_exec.client, + client, "transcribe", lambda *a, **k: pytest.fail("must not re-transcribe when -t is given"), ) @@ -377,7 +379,7 @@ def test_run_clip_merges_transcript_matches_with_explicit_ranges( ): config.set_api_key("default", "sk_test") utterances = [utterance(5000, 8000, "A", "hello")] - monkeypatch.setattr(clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(utterances)) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(utterances)) opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["A"], ranges=["7-12"]) clip_exec.run_clip(opts, AppState(), json_mode=True) clips = json.loads(capsys.readouterr().out)["clips"] @@ -386,7 +388,7 @@ def test_run_clip_merges_transcript_matches_with_explicit_ranges( def test_run_clip_errors_when_transcript_has_no_utterances(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr(clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(None)) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(None)) opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["A"]) with pytest.raises(CLIError) as exc: clip_exec.run_clip(opts, AppState(), json_mode=False) @@ -398,9 +400,7 @@ def test_run_clip_errors_when_transcript_has_no_utterances(media, fake_ffmpeg, m def test_run_clip_errors_when_nothing_matches(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["Z"]) with pytest.raises(CLIError) as exc: clip_exec.run_clip(opts, AppState(), json_mode=False) @@ -411,9 +411,7 @@ def test_run_clip_errors_when_nothing_matches(media, fake_ffmpeg, monkeypatch): def test_run_clip_status_messages(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) messages: list[str] = [] @contextlib.contextmanager diff --git a/tests/test_clip_sources.py b/tests/test_clip_sources.py index 9db83d01..e6edeb4b 100644 --- a/tests/test_clip_sources.py +++ b/tests/test_clip_sources.py @@ -11,7 +11,7 @@ import pytest -from aai_cli import clip_exec, clip_select, config +from aai_cli import client, clip_exec, clip_select, config from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError from tests._clip_helpers import DEFAULTS, UTTERANCES, fake_transcript, record_ffmpeg @@ -87,7 +87,7 @@ def fake_transcribe(api_key, audio, *, config): seen["audio"] = audio return fake_transcript(list(UTTERANCES)) - monkeypatch.setattr(clip_exec.client, "transcribe", fake_transcribe) + monkeypatch.setattr(client, "transcribe", fake_transcribe) monkeypatch.setattr( clip_exec.llm, "transform_transcript", lambda *a, **k: '[{"start": 1, "end": 2}]' ) @@ -134,7 +134,7 @@ def test_run_clip_reads_transcript_json_from_stdin(media, fake_ffmpeg, capsys, m # No API key configured and no client call: the piped JSON is the transcript. monkeypatch.setattr(clip_exec.stdio, "piped_stdin_text", _piped_payload) monkeypatch.setattr( - clip_exec.client, + client, "get_transcript", lambda *a: pytest.fail("must not fetch when JSON is piped"), ) @@ -154,7 +154,7 @@ def fake_get(api_key, transcript_id): seen["args"] = (api_key, transcript_id) return fake_transcript(list(UTTERANCES)) - monkeypatch.setattr(clip_exec.client, "get_transcript", fake_get) + monkeypatch.setattr(client, "get_transcript", fake_get) opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="-", speakers=["B"]) clip_exec.run_clip(opts, AppState(), json_mode=True) assert seen["args"] == ("sk_test", "tr_999") @@ -182,9 +182,7 @@ def test_run_clip_stdin_transcript_rejects_bad_json(media, fake_ffmpeg, monkeypa def test_run_clip_llm_selection_drives_the_cut(media, fake_ffmpeg, capsys, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) seen = {} def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): @@ -222,9 +220,7 @@ def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): def test_run_clip_llm_composes_with_speaker_filter(media, fake_ffmpeg, capsys, monkeypatch): # --speaker narrows the utterances first; the LLM only sees what survived. config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) seen = {} def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): @@ -243,9 +239,7 @@ def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): def test_run_clip_llm_works_with_transcript_id(media, fake_ffmpeg, capsys, monkeypatch): # -t with --llm alone is a valid selection (no --speaker/--search needed). config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "get_transcript", lambda *a: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "get_transcript", lambda *a: fake_transcript(list(UTTERANCES))) monkeypatch.setattr( clip_exec.llm, "transform_transcript", @@ -259,9 +253,7 @@ def test_run_clip_llm_works_with_transcript_id(media, fake_ffmpeg, capsys, monke def test_run_clip_llm_parse_error_surfaces(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) monkeypatch.setattr(clip_exec.llm, "transform_transcript", lambda *a, **k: "no json, sorry") opts = dataclasses.replace(DEFAULTS, media=str(media), llm_prompt="x") with pytest.raises(CLIError) as exc: @@ -271,9 +263,7 @@ def test_run_clip_llm_parse_error_surfaces(media, fake_ffmpeg, monkeypatch): def test_run_clip_llm_status_message_names_the_model(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) monkeypatch.setattr( clip_exec.llm, "transform_transcript", lambda *a, **k: '[{"start": 1, "end": 2}]' ) diff --git a/tests/test_dub_command.py b/tests/test_dub_command.py index b30cb784..7b826f27 100644 --- a/tests/test_dub_command.py +++ b/tests/test_dub_command.py @@ -12,7 +12,7 @@ from aai_cli import dub_exec, llm from aai_cli.main import app -from tests._dub_helpers import plain +from tests._clip_helpers import plain runner = CliRunner() diff --git a/tests/test_dub_exec.py b/tests/test_dub_exec.py index 99cf99fc..1a0b54e8 100644 --- a/tests/test_dub_exec.py +++ b/tests/test_dub_exec.py @@ -8,7 +8,6 @@ from __future__ import annotations import dataclasses -import sys from pathlib import Path from types import SimpleNamespace @@ -140,7 +139,7 @@ def test_utterances_of_defaults_and_filtering(): utterance(4000, "C", " Bye "), ] ) - assert dub_exec._utterances_of(transcript) == [ + assert dub_exec._utterances_of(transcript, "tr_dub") == [ dub_exec._Utterance(start_ms=0, speaker="A", text="Hi"), dub_exec._Utterance(start_ms=4000, speaker="C", text="Bye"), ] @@ -153,7 +152,7 @@ def test_utterances_of_defaults_and_filtering(): ) def test_utterances_of_requires_spoken_utterances(utterances): with pytest.raises(CLIError) as exc: - dub_exec._utterances_of(SimpleNamespace(id="tr_x", utterances=utterances)) + dub_exec._utterances_of(SimpleNamespace(utterances=utterances), "tr_x") assert exc.value.error_type == "no_utterances" assert exc.value.exit_code == 2 assert "Transcript tr_x has no utterances to dub" in exc.value.message @@ -170,21 +169,6 @@ def test_total_seconds(duration, expected): assert dub_exec._total_seconds(transcript) == expected -def test_run_ffmpeg_captures_output_and_does_not_raise(): - # The real boundary (not the fake): output is captured as text and a non-zero - # exit must not raise — _mux turns the exit code into a CLIError itself. - result = dub_exec._run_ffmpeg( - [ - sys.executable, - "-c", - "import sys; print('out'); print('err', file=sys.stderr); sys.exit(3)", - ] - ) - assert result.returncode == 3 - assert result.stdout == "out\n" - assert result.stderr == "err\n" - - # --- validation order (cheap local checks before any credential or network) ---- @@ -210,7 +194,8 @@ def test_run_dub_rejects_missing_file(sandbox, tmp_path): dub_exec.run_dub(opts, AppState(), json_mode=False) assert exc.value.error_type == "file_not_found" assert exc.value.exit_code == 2 - assert "local audio/video file" in (exc.value.suggestion or "") + # The command name pins the shared helper's parameterization. + assert "assembly dub needs a local audio/video file" in (exc.value.suggestion or "") def test_run_dub_rejects_directory(sandbox, tmp_path): @@ -235,4 +220,5 @@ def test_run_dub_requires_ffmpeg(sandbox, media, monkeypatch): with pytest.raises(CLIError) as exc: dub_exec.run_dub(opts, AppState(), json_mode=False) assert exc.value.error_type == "missing_dependency" - assert "ffmpeg" in exc.value.message + # The purpose string pins the shared helper's parameterization. + assert "ffmpeg is required to write the dubbed file" in exc.value.message diff --git a/tests/test_dub_pipeline.py b/tests/test_dub_pipeline.py index 78990d0b..eac961b8 100644 --- a/tests/test_dub_pipeline.py +++ b/tests/test_dub_pipeline.py @@ -2,11 +2,12 @@ the transcribe → translate → synthesize → ffmpeg mux orchestration, voice assignment, and the failure modes of each boundary. The LLM Gateway, streaming TTS, and ffmpeg are faked at the modules dub_exec calls into (`llm.complete`, -`session.synthesize`, `client.transcribe`) and at `dub_exec._run_ffmpeg`; the +`session.synthesize`, `client.transcribe`) and at `mediafile.run_ffmpeg`; the pure helpers and validation order live in test_dub_exec.py.""" from __future__ import annotations +import contextlib import dataclasses import json import subprocess @@ -15,11 +16,12 @@ import pytest -from aai_cli import client, dub_exec, llm +from aai_cli import client, dub_exec, llm, mediafile from aai_cli.context import AppState from aai_cli.errors import APIError, CLIError from aai_cli.tts import session from aai_cli.tts.session import SpeakResult +from tests._clip_helpers import plain from tests._dub_helpers import ( DEFAULTS, SAMPLE_RATE, @@ -27,7 +29,6 @@ enable_sandbox, fake_transcript, patch_api_key, - plain, record_ffmpeg, record_synthesize, record_transcribe, @@ -160,6 +161,26 @@ def test_run_dub_human_summary( assert "A=jane, B=michael" in out +def test_run_dub_status_messages( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, monkeypatch +): + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(dub_exec.output, "status", fake_status) + _run(dataclasses.replace(DEFAULTS, media=str(media)), json_mode=False) + assert messages == [ + "Transcribing for dubbing…", + f"Translating 2 utterance(s) to German with {llm.DEFAULT_MODEL}…", + "Synthesizing 2 segment(s)…", + "Writing the dubbed file…", + ] + + def test_bare_voice_dubs_every_speaker( media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg ): @@ -249,8 +270,8 @@ def test_ffmpeg_failure_reports_last_stderr_line( ): monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") monkeypatch.setattr( - dub_exec, - "_run_ffmpeg", + mediafile, + "run_ffmpeg", lambda args: subprocess.CompletedProcess( args=args, returncode=1, stdout="", stderr="noise\nInvalid data found\n" ), @@ -271,8 +292,8 @@ def test_ffmpeg_silent_failure_reports_exit_code( ): monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") monkeypatch.setattr( - dub_exec, - "_run_ffmpeg", + mediafile, + "run_ffmpeg", lambda args: subprocess.CompletedProcess(args=args, returncode=3, stdout="", stderr=""), ) opts = dataclasses.replace(DEFAULTS, media=str(media)) From 300d4092ba18f67b6af846be183c7bf3f689d851 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 12 Jun 2026 23:43:38 +0000 Subject: [PATCH 2/2] Fix the assembly dub correctness bugs found in review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All confirmed findings from the dub (#135) code review: - Self-overwrite guard now also catches the same file under another spelling (samefile when --out exists): on case-insensitive filesystems (macOS APFS) `--out TALK.MP4` against talk.mp4 passed the path comparison and ffmpeg corrupted the input. - Fresh transcriptions auto-detect the source language (dub input is typically not English, which is the API default); a new --source-lang flag pins it instead. - --out viability is validated before the billed pipeline: existing directory, missing parent directory, and missing file extension (ffmpeg picks the container from it) now fail upfront, and a language that slugs to nothing (e.g. 中文) asks for an explicit --out instead of colliding every such dub onto ".dub..". - --voice is parsed before any billed work, and SPEAKER=VOICE pins for speakers absent from the diarized transcript warn instead of being dropped silently (mirrors assembly speak). - A --transcript-id that is queued/processing/errored is rejected with the real reason (shared resolve_diarized_transcript, so clip gets the same fix) instead of a misleading "no utterances" error. - Translations truncated at max_tokens (finish_reason length/max_tokens) raise instead of dubbing speech that stops mid-sentence. - The success line escapes user-controlled --lang/--voice text (an embedded "[/]" crashed with MarkupError after the dub succeeded). - URLs are rejected with the URL echoed intact (Path() collapsed "s3://…" to "s3:/…") and a download hint. - ffmpeg output paths starting with "-" are passed as "./-…" so they can't be parsed as ffmpeg options (clip's cut destinations too). https://claude.ai/code/session_018TuAQTvp9PVy5EdhsDWo2h --- aai_cli/clip_exec.py | 2 +- aai_cli/commands/dub.py | 6 + aai_cli/dub_exec.py | 82 +++++++++++-- aai_cli/mediafile.py | 50 +++++++- .../test_snapshots_help_run.ambr | 2 + tests/_dub_helpers.py | 9 +- tests/test_clip_exec.py | 11 ++ tests/test_dub_command.py | 4 + tests/test_dub_exec.py | 64 +++++++++++ tests/test_dub_pipeline.py | 108 +++++++++++++++++- 10 files changed, 318 insertions(+), 20 deletions(-) diff --git a/aai_cli/clip_exec.py b/aai_cli/clip_exec.py index d86e1208..b44130a2 100644 --- a/aai_cli/clip_exec.py +++ b/aai_cli/clip_exec.py @@ -265,7 +265,7 @@ def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None: f"{segment.start:.3f}", "-to", f"{segment.end:.3f}", - str(dest), + mediafile.path_arg(dest), ] ) if result.returncode != 0: diff --git a/aai_cli/commands/dub.py b/aai_cli/commands/dub.py index 1d3dc552..99604207 100644 --- a/aai_cli/commands/dub.py +++ b/aai_cli/commands/dub.py @@ -49,6 +49,11 @@ def dub( "-l", help="Target language: an ISO code (de, fr, es, …) or a language name (German).", ), + source_lang: str | None = typer.Option( + None, + "--source-lang", + help="ISO code of the source audio (e.g. de). Default: auto-detect the language.", + ), transcript_id: str | None = typer.Option( None, "--transcript-id", @@ -93,6 +98,7 @@ def dub( opts = dub_exec.DubOptions( media=media, language=lang, + source_language=source_lang, transcript_id=transcript_id, voice=voice, model=model, diff --git a/aai_cli/dub_exec.py b/aai_cli/dub_exec.py index 413d265a..27895700 100644 --- a/aai_cli/dub_exec.py +++ b/aai_cli/dub_exec.py @@ -72,6 +72,7 @@ class DubOptions: media: str language: str + source_language: str | None transcript_id: str | None voice: list[str] model: str @@ -94,6 +95,13 @@ def resolve_language(value: str) -> str: def default_out_path(media: Path, language: str) -> Path: """The default output file: ``.dub.`` next to the input.""" slug = re.sub(r"[^a-z0-9]+", "-", language.casefold()).strip("-") + if not slug: + # A name that slugs to nothing (e.g. 中文) would collide every such + # language onto one ".dub." file; make the user pick. + raise UsageError( + f"Can't derive a default output name for {language!r}.", + suggestion="Pass --out explicitly, e.g. --out dubbed.mp4.", + ) return media.parent / f"{media.stem}.dub.{slug}{media.suffix}" @@ -128,13 +136,33 @@ def _pcm_seconds(pcm: bytes | bytearray, sample_rate: int) -> float: def _validate_out(out: Path, media: Path) -> None: - """The dub must never overwrite its own input: ffmpeg would read and write the - same file concurrently, corrupting it.""" - if out.resolve() == media.resolve(): + """An unwritable or self-overwriting --out must fail here, before the billed + transcription/translation/synthesis pipeline runs. + + The samefile check catches what path comparison can't: on case-insensitive + filesystems (macOS APFS) ``--out TALK.MP4`` against ``talk.mp4`` — or a hard + link — is the same file under a different spelling, and ffmpeg would read + and write it concurrently, corrupting the input.""" + if out.resolve() == media.resolve() or (out.exists() and out.samefile(media)): raise UsageError( "--out would overwrite the input file.", suggestion="Pick a different output path.", ) + if out.is_dir(): + raise UsageError( + f"--out is a directory: {out}", + suggestion="Point --out at a file path, e.g. --out dubbed.mp4.", + ) + if not out.parent.is_dir(): + raise UsageError( + f"The output directory doesn't exist: {out.parent}", + suggestion="Create it first, or point --out somewhere that exists.", + ) + if not out.suffix: + raise UsageError( + f"The output file {out.name!r} has no extension.", + suggestion="ffmpeg picks the container from the extension; pass e.g. --out dubbed.mp4.", + ) def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: @@ -162,7 +190,7 @@ def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: "1:a", "-c:v", "copy", - str(out), + mediafile.path_arg(out), ] ) if result.returncode != 0: @@ -234,6 +262,14 @@ def _translate( api_key, model=opts.model, messages=messages, max_tokens=opts.max_tokens ) translated = gateway.content_of(response).strip() + # "length" is OpenAI's truncation marker; the gateway's Anthropic-flavored + # responses use "max_tokens". A clipped translation must never be dubbed. + if getattr(response.choices[0], "finish_reason", None) in {"length", "max_tokens"}: + raise APIError( + f"The translation of utterance {index} was cut off at --max-tokens " + f"({opts.max_tokens}).", + suggestion="Re-run with a higher --max-tokens.", + ) if not translated: raise APIError( f"The model returned an empty translation for utterance {index} " @@ -273,10 +309,26 @@ def _synthesize( return [result.pcm for result in results], results[0].sample_rate +def _warn_ignored_voice_pins( + overrides: dict[str, str], speakers: dict[str, str], *, json_mode: bool +) -> None: + """Mirror `assembly speak`: a requested --voice mapping is never dropped + silently, so a pin for a speaker the diarization didn't produce is called out.""" + present = {speaker.casefold() for speaker in speakers} + ignored = [speaker for speaker in overrides if speaker not in present] + if ignored: + output.emit_warning( + "Ignoring --voice mapping(s) for speaker(s) not in the transcript: " + f"{', '.join(ignored)}.", + json_mode=json_mode, + ) + + def _assign_voices( utterances: list[_Utterance], translations: list[str], - voice_values: list[str], + bare_voice: str | None, + overrides: dict[str, str], ) -> tuple[list[tuple[str, str]], dict[str, str]]: """Resolve each translated utterance to ``(voice, text)`` plus the speaker→voice map. @@ -284,7 +336,6 @@ def _assign_voices( mappings pin individual speakers; everyone else takes the rotation in first-appearance order (the same rules as `assembly speak`). """ - bare_voice, overrides = dialogue.parse_voice_overrides(voice_values) rotation = (bare_voice,) if bare_voice is not None else dialogue.DEFAULT_VOICE_ROTATION segments = [ dialogue.Segment(utterance.speaker, translated) @@ -299,6 +350,14 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly dub` invocation from already-parsed flags.""" language = resolve_language(opts.language) session.require_available("dub") + # Parse --voice now: a malformed mapping must fail before the billed pipeline. + bare_voice, overrides = dialogue.parse_voice_overrides(opts.voice) + if "://" in opts.media: + # Path() would collapse the "//" and report a corrupted echo of the URL. + raise UsageError( + f"assembly dub needs a local file, not a URL: {opts.media}", + suggestion="Download the media first, then dub the local copy.", + ) media = Path(opts.media) mediafile.validate_local_media(media, "dub") out = opts.out if opts.out is not None else default_out_path(media, language) @@ -313,13 +372,18 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: status_message="Transcribing for dubbing…", json_mode=json_mode, quiet=state.quiet, + language_code=opts.source_language, + # Dub input is typically not English (the API default), so a fresh + # transcription auto-detects the source language unless --source-lang pins it. + detect_language=opts.source_language is None, ) transcript_id = str(getattr(transcript, "id", "")) utterances = _utterances_of(transcript, transcript_id) translations = _translate( api_key, utterances, language, opts, json_mode=json_mode, quiet=state.quiet ) - resolved, speakers = _assign_voices(utterances, translations, opts.voice) + resolved, speakers = _assign_voices(utterances, translations, bare_voice, overrides) + _warn_ignored_voice_pins(overrides, speakers, json_mode=json_mode) pcm_segments, sample_rate = _synthesize( api_key, resolved, language, json_mode=json_mode, quiet=state.quiet ) @@ -350,8 +414,10 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: } output.emit( payload, + # language and voices carry user-typed text, so they need escaping too. lambda _: output.success( - f"{escape(str(out))} dubbed to {language} ({len(utterances)} utterances, {voices})" + f"{escape(str(out))} dubbed to {escape(language)} " + f"({len(utterances)} utterances, {escape(voices)})" ), json_mode=json_mode, ) diff --git a/aai_cli/mediafile.py b/aai_cli/mediafile.py index 8fb3713f..32783025 100644 --- a/aai_cli/mediafile.py +++ b/aai_cli/mediafile.py @@ -16,7 +16,7 @@ import assemblyai as aai from aai_cli import client, output -from aai_cli.errors import CLIError +from aai_cli.errors import APIError, CLIError def validate_local_media(media: Path, command: str) -> None: @@ -56,6 +56,13 @@ def run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: return subprocess.run(args, capture_output=True, text=True, check=False) +def path_arg(path: Path) -> str: + """``path`` as an ffmpeg argv token: a leading '-' is disambiguated with + ``./`` so a filename like ``-out.mp4`` can't be parsed as an option.""" + text = str(path) + return f"./{text}" if text.startswith("-") else text + + def ffmpeg_failure( result: subprocess.CompletedProcess[str], action: str, @@ -82,12 +89,43 @@ def resolve_diarized_transcript( status_message: str, json_mode: bool, quiet: bool, + language_code: str | None = None, + detect_language: bool = False, ) -> object: - """The diarized transcript driving the command: fetched by id, or made fresh - from the (already local) media file — always with speaker labels, so the - caller can select or voice content per speaker.""" + """The diarized transcript driving the command: fetched by id (and verified + usable), or made fresh from the (already local) media file — always with + speaker labels, so the caller can select or voice content per speaker.""" if transcript_id is not None: - return client.get_transcript(api_key, transcript_id) - config = aai.TranscriptionConfig(speaker_labels=True) + return _fetched_transcript(api_key, transcript_id) + config = aai.TranscriptionConfig( + speaker_labels=True, + language_code=language_code, + language_detection=detect_language or None, + ) with output.status(status_message, json_mode=json_mode, quiet=quiet): return client.transcribe(api_key, str(media), config=config) + + +def _fetched_transcript(api_key: str, transcript_id: str) -> object: + """A --transcript-id transcript, rejected unless it finished successfully — + a queued/processing/errored one would otherwise surface much later as a + misleading 'no utterances' failure.""" + transcript = client.get_transcript(api_key, transcript_id) + raw_status = getattr(transcript, "status", None) + status = str(getattr(raw_status, "value", raw_status) or "") + if status == "error": + raise APIError( + getattr(transcript, "error", None) or "Transcript failed.", + transcript_id=transcript_id, + ) + if status in {"queued", "processing"}: + raise CLIError( + f"Transcript {transcript_id} is still {status}.", + error_type="transcript_not_ready", + exit_code=2, + suggestion=( + f"Wait for it to finish (assembly transcripts get {transcript_id}), " + "or drop -t to transcribe the file fresh." + ), + ) + return transcript diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index b4476fdd..7eb60dd6 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -238,6 +238,8 @@ │ * --lang -l TEXT Target language: an ISO code (de, fr, es, │ │ …) or a language name (German). │ │ [required] │ + │ --source-lang TEXT ISO code of the source audio (e.g. de). │ + │ Default: auto-detect the language. │ │ --transcript-id -t TEXT Reuse an existing diarized transcript of │ │ this media instead of transcribing it │ │ again. │ diff --git a/tests/_dub_helpers.py b/tests/_dub_helpers.py index 32fdce0a..a247d015 100644 --- a/tests/_dub_helpers.py +++ b/tests/_dub_helpers.py @@ -24,6 +24,7 @@ DEFAULTS = DubOptions( media="talk.mp4", language="de", + source_language=None, transcript_id=None, voice=[], model=llm.DEFAULT_MODEL, @@ -42,9 +43,11 @@ def fake_transcript(utterances, *, audio_duration=5): return SimpleNamespace(id="tr_dub", utterances=utterances, audio_duration=audio_duration) -def completion(text): - """The slice of an OpenAI ChatCompletion that gateway.content_of reads.""" - return SimpleNamespace(choices=[SimpleNamespace(message=SimpleNamespace(content=text))]) +def completion(text, finish_reason=None): + """The slice of an OpenAI ChatCompletion that gateway.content_of and the + dub truncation check read.""" + choice = SimpleNamespace(message=SimpleNamespace(content=text), finish_reason=finish_reason) + return SimpleNamespace(choices=[choice]) def write_media(tmp_path: Path) -> Path: diff --git a/tests/test_clip_exec.py b/tests/test_clip_exec.py index 65be0b9a..efa1ab25 100644 --- a/tests/test_clip_exec.py +++ b/tests/test_clip_exec.py @@ -215,6 +215,15 @@ def test_run_clip_rounds_payload_times_to_milliseconds(media, fake_ffmpeg, capsy } +def test_dash_prefixed_clip_dest_is_disambiguated_for_ffmpeg(tmp_path, fake_ffmpeg, monkeypatch): + # A bare "-x.clip01.mp4" argv token would be parsed by ffmpeg as an option. + monkeypatch.chdir(tmp_path) + (tmp_path / "-x.mp4").write_bytes(b"\x00fake-media") + opts = dataclasses.replace(DEFAULTS, media="-x.mp4", ranges=["1-2"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert fake_ffmpeg[1][-1] == "./-x.clip01.mp4" + + def test_run_clip_honors_out_dir(media, tmp_path, fake_ffmpeg, capsys): out_dir = tmp_path / "clips" out_dir.mkdir() @@ -345,6 +354,8 @@ def fake_transcribe(api_key, audio, *, config): assert seen["api_key"] == "sk_test" assert seen["audio"] == str(media) assert seen["config"].speaker_labels is True + # Clip keeps the API's language defaults (only dub opts into detection). + assert seen["config"].language_detection is None payload = json.loads(capsys.readouterr().out) assert payload["transcript_id"] == "tr_123" # Speaker A's two utterances: 1.5-2.5s and 5-6s. diff --git a/tests/test_dub_command.py b/tests/test_dub_command.py index 7b826f27..3b96b5d8 100644 --- a/tests/test_dub_command.py +++ b/tests/test_dub_command.py @@ -53,6 +53,7 @@ def test_defaults_map_to_options(captured_run): assert captured_run["opts"] == dub_exec.DubOptions( media="talk.mp4", language="de", + source_language=None, transcript_id=None, voice=[], model=llm.DEFAULT_MODEL, @@ -69,6 +70,8 @@ def test_every_flag_maps_to_options(captured_run): "talk.mp4", "--lang", "German", + "--source-lang", + "fr", "-t", "tr_1", "--voice", @@ -89,6 +92,7 @@ def test_every_flag_maps_to_options(captured_run): assert captured_run["opts"] == dub_exec.DubOptions( media="talk.mp4", language="German", + source_language="fr", transcript_id="tr_1", voice=["A=jane", "paul"], model="gpt-5", diff --git a/tests/test_dub_exec.py b/tests/test_dub_exec.py index 1a0b54e8..d78884ea 100644 --- a/tests/test_dub_exec.py +++ b/tests/test_dub_exec.py @@ -8,6 +8,7 @@ from __future__ import annotations import dataclasses +import os from pathlib import Path from types import SimpleNamespace @@ -110,6 +111,15 @@ def test_default_out_path(language, expected): assert out == Path("/x") / expected +def test_default_out_path_rejects_unsluggable_language(): + # 中文 and 日本語 would both slug to "" and collide on "talk.dub..mp4", + # silently overwriting each other via ffmpeg's -y. + with pytest.raises(UsageError) as exc: + dub_exec.default_out_path(Path("/x/talk.mp4"), "中文") + assert "default output name" in exc.value.message + assert "--out" in (exc.value.suggestion or "") + + def test_assemble_timeline_fills_gaps_and_pads_tail(): # rate 1000: one second of 16-bit mono PCM is 2000 bytes. track = dub_exec.assemble_timeline([(500, b"\x01\x02")], 1000, total_seconds=1.0) @@ -214,6 +224,60 @@ def test_run_dub_refuses_to_overwrite_the_input(sandbox, media): assert "overwrite the input file" in exc.value.message +@pytest.mark.parametrize( + "url", ["https://youtube.com/watch?v=x", "s3://bucket/talk.mp4"], ids=["https", "s3"] +) +def test_run_dub_rejects_urls_with_the_url_intact(sandbox, url): + # Path() would collapse "//" and echo a corrupted "s3:/bucket/…" back. + opts = dataclasses.replace(DEFAULTS, media=url) + with pytest.raises(UsageError) as exc: + dub_exec.run_dub(opts, AppState(), json_mode=False) + assert url in exc.value.message + assert "Download the media first" in (exc.value.suggestion or "") + + +def test_run_dub_rejects_malformed_voice_before_any_network(sandbox, media): + # No transcription/ffmpeg fakes installed: pytest-socket would fail loudly + # if the malformed mapping survived to the billed pipeline. + opts = dataclasses.replace(DEFAULTS, media=str(media), voice=["A="]) + with pytest.raises(UsageError) as exc: + dub_exec.run_dub(opts, AppState(), json_mode=False) + assert "Invalid --voice mapping" in exc.value.message + + +def test_validate_out_rejects_the_input_via_hard_link(media): + # Two spellings of one file (mimics --out TALK.MP4 on a case-insensitive + # filesystem): path comparison passes, samefile must still catch it. + clone = media.parent / "TALK.MP4" + os.link(media, clone) + with pytest.raises(UsageError) as exc: + dub_exec._validate_out(clone, media) + assert "overwrite the input file" in exc.value.message + + +def test_validate_out_rejects_a_directory(media, tmp_path): + with pytest.raises(UsageError) as exc: + dub_exec._validate_out(tmp_path, media) + assert "--out is a directory" in exc.value.message + assert "file path" in (exc.value.suggestion or "") + + +def test_validate_out_rejects_a_missing_parent_directory(media, tmp_path): + with pytest.raises(UsageError) as exc: + dub_exec._validate_out(tmp_path / "missing" / "dub.mp4", media) + assert "output directory doesn't exist" in exc.value.message + assert "missing" in exc.value.message + + +def test_validate_out_rejects_an_extensionless_output(media, tmp_path): + # ffmpeg picks the container from the extension, so this would fail only + # after the whole billed pipeline (e.g. an extension-less input's default out). + with pytest.raises(UsageError) as exc: + dub_exec._validate_out(tmp_path / "noext", media) + assert "has no extension" in exc.value.message + assert ".mp4" in (exc.value.suggestion or "") + + def test_run_dub_requires_ffmpeg(sandbox, media, monkeypatch): monkeypatch.setattr("shutil.which", lambda name: None) opts = dataclasses.replace(DEFAULTS, media=str(media)) diff --git a/tests/test_dub_pipeline.py b/tests/test_dub_pipeline.py index eac961b8..22a04958 100644 --- a/tests/test_dub_pipeline.py +++ b/tests/test_dub_pipeline.py @@ -79,9 +79,12 @@ def test_run_dub_pipeline_end_to_end( opts = dataclasses.replace(DEFAULTS, media=str(media)) _run(opts, json_mode=True) - # Transcription: the local file, diarized so speakers keep distinct voices. + # Transcription: the local file, diarized so speakers keep distinct voices, + # source language auto-detected (dub input is typically not English). assert fake_transcribe["audio"] == str(media) assert fake_transcribe["config"].speaker_labels is True + assert fake_transcribe["config"].language_detection is True + assert fake_transcribe["config"].language_code is None # Translation: one gateway call per utterance, in order, with the dubbing # system prompt naming the resolved language ("de" -> "German"). @@ -161,6 +164,30 @@ def test_run_dub_human_summary( assert "A=jane, B=michael" in out +def test_human_summary_escapes_user_controlled_markup( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, capsys +): + # An unescaped "[/]" in --lang/--voice would raise rich.errors.MarkupError — + # after the whole billed pipeline succeeded and the file was written. + opts = dataclasses.replace( + DEFAULTS, media=str(media), language="Ger[/]man", voice=["[/]bad"], out=Path("dub.x.mp4") + ) + _run(opts, json_mode=False) + out = plain(capsys.readouterr().out) + assert "dubbed to Ger[/]man" in out + assert "A=[/]bad" in out + + +def test_dash_prefixed_out_is_disambiguated_for_ffmpeg( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, monkeypatch, tmp_path +): + # A bare "-dub.de.mp4" argv token would be parsed by ffmpeg as an option. + monkeypatch.chdir(tmp_path) + opts = dataclasses.replace(DEFAULTS, media=str(media), out=Path("-dub.de.mp4")) + _run(opts, json_mode=True) + assert fake_ffmpeg["args"][-1] == "./-dub.de.mp4" + + def test_run_dub_status_messages( media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, monkeypatch ): @@ -190,12 +217,35 @@ def test_bare_voice_dubs_every_speaker( def test_voice_overrides_pin_speakers_without_consuming_rotation( - media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, capsys ): opts = dataclasses.replace(DEFAULTS, media=str(media), voice=["A=mary"]) _run(opts, json_mode=True) # A is pinned; B still takes the first rotation voice (overrides don't consume slots). assert [cfg.voice for cfg in fake_synthesize] == ["mary", "jane"] + # Every mapping applied -> no "Ignoring" warning fires. + assert "Ignoring" not in capsys.readouterr().err + + +def test_voice_pin_for_absent_speaker_warns_instead_of_silently_dropping( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, capsys +): + # Mirrors `assembly speak`: a requested --voice mapping is never dropped silently. + opts = dataclasses.replace(DEFAULTS, media=str(media), voice=["Z=paul"]) + _run(opts, json_mode=False) + err = plain(capsys.readouterr().err) + assert "Ignoring --voice mapping(s) for speaker(s) not in the transcript: z." in err + # Human mode warns as prose, not as a {"warning": …} JSON object. + assert not err.lstrip().startswith("{") + + +def test_source_lang_pins_the_transcription_language( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg +): + opts = dataclasses.replace(DEFAULTS, media=str(media), source_language="fr") + _run(opts, json_mode=True) + assert fake_transcribe["config"].language_code == "fr" + assert fake_transcribe["config"].language_detection is None def test_transcript_id_reuses_existing_transcript( @@ -234,6 +284,60 @@ def get_transcript(api_key, transcript_id): assert payload["audio_duration_seconds"] == 3.333 +@pytest.mark.parametrize("status", ["queued", "processing"]) +def test_transcript_id_still_in_flight_is_a_clear_error(media, fake_ffmpeg, monkeypatch, status): + # Without the status check this would surface as a misleading "no utterances + # to dub … pass one created with --speaker-labels". + monkeypatch.setattr( + client, + "get_transcript", + lambda *a: SimpleNamespace(id="tr_q", status=status, utterances=None), + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="tr_q") + with pytest.raises(CLIError) as exc: + _run(opts, json_mode=False) + assert exc.value.error_type == "transcript_not_ready" + assert exc.value.exit_code == 2 + assert f"Transcript tr_q is still {status}" in exc.value.message + assert "assembly transcripts get tr_q" in (exc.value.suggestion or "") + + +@pytest.mark.parametrize( + ("stored_error", "expected"), + [("Audio file unreadable", "Audio file unreadable"), (None, "Transcript failed.")], + ids=["with-reason", "without-reason"], +) +def test_transcript_id_with_error_status_surfaces_the_real_error( + media, fake_ffmpeg, monkeypatch, stored_error, expected +): + monkeypatch.setattr( + client, + "get_transcript", + lambda *a: SimpleNamespace(id="tr_e", status="error", error=stored_error, utterances=None), + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="tr_e") + with pytest.raises(APIError) as exc: + _run(opts, json_mode=False) + assert expected in exc.value.message + + +@pytest.mark.parametrize("finish", ["length", "max_tokens"], ids=["openai", "anthropic"]) +def test_truncated_translation_is_an_api_error( + media, fake_transcribe, fake_synthesize, fake_ffmpeg, monkeypatch, finish +): + # A reply clipped by max_tokens is non-empty but incomplete; dubbing it would + # produce speech that stops mid-sentence with exit 0. + monkeypatch.setattr( + llm, "complete", lambda *a, **k: completion("Hallo, aber abgeschn", finish_reason=finish) + ) + opts = dataclasses.replace(DEFAULTS, media=str(media)) + with pytest.raises(APIError) as exc: + _run(opts, json_mode=False) + assert "utterance 1" in exc.value.message + assert f"cut off at --max-tokens ({llm.DEFAULT_MAX_TOKENS})" in exc.value.message + assert "higher --max-tokens" in (exc.value.suggestion or "") + + def test_empty_translation_is_an_api_error(media, fake_synthesize, fake_ffmpeg, monkeypatch): long_text = "a" * 50 + "TAIL!" transcript = fake_transcript([utterance(0, "A", "Hello."), utterance(1000, "B", long_text)])