diff --git a/.importlinter b/.importlinter index 4df29cae..3f12f8d0 100644 --- a/.importlinter +++ b/.importlinter @@ -36,6 +36,7 @@ source_modules = aai_cli.init_exec aai_cli.llm aai_cli.llm_exec + aai_cli.mediafile aai_cli.microphone aai_cli.options aai_cli.output diff --git a/aai_cli/caption_exec.py b/aai_cli/caption_exec.py index e5986650..cbe25442 100644 --- a/aai_cli/caption_exec.py +++ b/aai_cli/caption_exec.py @@ -15,8 +15,6 @@ from __future__ import annotations -import shutil -import subprocess import tempfile from dataclasses import dataclass from pathlib import Path @@ -24,7 +22,7 @@ import assemblyai as aai from rich.markup import escape -from aai_cli import client, output, youtube +from aai_cli import client, mediafile, output, youtube from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError @@ -62,52 +60,6 @@ def subtitles_filter(srt: Path, font_size: int | None) -> str: return spec -def _validate_media(media: Path) -> None: - """Reject a missing local source before credential resolution, so a typo'd - path reads as "file not found", never as a login prompt or an ffmpeg error.""" - if not media.exists(): - raise CLIError( - f"File not found: {media}", - error_type="file_not_found", - exit_code=2, - suggestion="Check the path. assembly caption needs a local video file.", - ) - if not media.is_file(): - raise CLIError( - f"Not a file: {media}", - error_type="not_a_file", - exit_code=2, - suggestion="Pass a video file, not a directory.", - ) - - -def _validate_out(out: Path, media: Path) -> None: - """The captioned file must never overwrite its own input: ffmpeg would read - and write the same file concurrently, corrupting it.""" - if out.resolve() == media.resolve(): - raise UsageError( - "--out would overwrite the input file.", - suggestion="Pick a different output path.", - ) - - -def _require_ffmpeg() -> str: - """The ffmpeg executable; checked before any (billed) transcription work.""" - path = shutil.which("ffmpeg") - if path is None: - raise CLIError( - "ffmpeg is required to burn captions into video, but it isn't on PATH.", - error_type="missing_dependency", - suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", - ) - return path - - -def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: - """Boundary seam for tests: one ffmpeg invocation, output captured.""" - return subprocess.run(args, capture_output=True, text=True, check=False) - - def _burn(ffmpeg: str, media: Path, srt: Path, out: Path, font_size: int | None) -> None: """Burn the ``srt`` captions into ``media``'s video stream, writing ``out``. @@ -118,7 +70,7 @@ def _burn(ffmpeg: str, media: Path, srt: Path, out: Path, font_size: int | None) re-run overwrite its own earlier output instead of stalling on ffmpeg's prompt. """ - result = _run_ffmpeg( + result = mediafile.run_ffmpeg( [ ffmpeg, "-hide_banner", @@ -135,32 +87,20 @@ def _burn(ffmpeg: str, media: Path, srt: Path, out: Path, font_size: int | None) "0:a?", "-c:a", "copy", - str(out), + mediafile.path_arg(out), ] ) if result.returncode != 0: - detail = result.stderr.strip().splitlines() - reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" - raise CLIError( - f"Could not write {out.name}: {reason}", + raise mediafile.ffmpeg_failure( + result, + "write", + out, error_type="caption_failed", suggestion="Check that the input is a readable video file — captions " "can't be burned into audio-only media.", ) -def _resolve_transcript( - opts: CaptionOptions, media: Path, state: AppState, *, json_mode: bool -) -> object: - """The transcript whose captions are burned in: fetched by id, or made fresh - from the (already local) media file.""" - if opts.transcript_id is not None: - return client.get_transcript(state.resolve_api_key(), opts.transcript_id) - api_key = state.resolve_api_key() - with output.status("Transcribing for captions…", json_mode=json_mode, quiet=state.quiet): - return client.transcribe(api_key, str(media), config=aai.TranscriptionConfig()) - - def _fetch_srt(transcript: object, opts: CaptionOptions, *, json_mode: bool, quiet: bool) -> str: """The transcript's SRT captions from the export endpoint; empty is an error.""" with output.status("Fetching captions…", json_mode=json_mode, quiet=quiet): @@ -181,7 +121,7 @@ def _fetch_srt(transcript: object, opts: CaptionOptions, *, json_mode: bool, qui def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly caption` invocation from already-parsed flags.""" - ffmpeg = _require_ffmpeg() + ffmpeg = mediafile.require_ffmpeg("burn captions into video") if youtube.is_downloadable_url(opts.media): # A media-page URL (YouTube, …) is downloaded once — always the full # video, since the captions are burned into it. The download dir is @@ -190,7 +130,7 @@ def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> No with output.status("Downloading video…", json_mode=json_mode, quiet=state.quiet): local = youtube.download_media(opts.media, Path(td), video=True) out = opts.out if opts.out is not None else Path.cwd() / default_out_path(local).name - _validate_out(out, local) + mediafile.validate_out(out, local) _caption_and_emit(opts, local, out, ffmpeg, state, json_mode=json_mode) return if opts.media.startswith(("http://", "https://")): @@ -199,10 +139,16 @@ def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> No "media-page URL yt-dlp can download (YouTube, …).", suggestion="Download the video first, then caption the local copy.", ) + if "://" in opts.media: + # Path() would collapse the "//" and report a corrupted echo of the URL. + raise UsageError( + f"assembly caption needs a local file, not a URL: {opts.media}", + suggestion="Download the video first, then caption the local copy.", + ) media = Path(opts.media) - _validate_media(media) + mediafile.validate_local_media(media, "caption", kind="video") out = opts.out if opts.out is not None else default_out_path(media) - _validate_out(out, media) + mediafile.validate_out(out, media) _caption_and_emit(opts, media, out, ffmpeg, state, json_mode=json_mode) @@ -216,7 +162,15 @@ def _caption_and_emit( json_mode: bool, ) -> None: """Caption an already-local video file into ``out`` and report the result.""" - transcript = _resolve_transcript(opts, media, state, json_mode=json_mode) + transcript = mediafile.resolve_transcript( + state.resolve_api_key(), + opts.transcript_id, + media, + status_message="Transcribing for captions…", + json_mode=json_mode, + quiet=state.quiet, + config=aai.TranscriptionConfig(), + ) transcript_id = str(getattr(transcript, "id", "")) srt = _fetch_srt(transcript, opts, json_mode=json_mode, quiet=state.quiet) captions = srt.count("-->") # one arrow per SRT cue timing line diff --git a/aai_cli/clip_exec.py b/aai_cli/clip_exec.py index 0e46f954..14ff69c5 100644 --- a/aai_cli/clip_exec.py +++ b/aai_cli/clip_exec.py @@ -20,17 +20,14 @@ from __future__ import annotations import json -import shutil -import subprocess import tempfile from dataclasses import dataclass from pathlib import Path from types import SimpleNamespace -import assemblyai as aai from rich.markup import escape -from aai_cli import client, clip_select, jsonshape, llm, output, stdio, youtube +from aai_cli import clip_select, jsonshape, llm, mediafile, output, stdio, youtube from aai_cli.clip_select import Segment from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError @@ -141,12 +138,14 @@ def _resolve_transcript( if text.startswith("{"): return _piped_transcript(text) transcript_id = text # a bare id (e.g. from `assembly transcribe … -o id`) - if transcript_id is not None: - return client.get_transcript(state.resolve_api_key(), transcript_id) - config = aai.TranscriptionConfig(speaker_labels=True) - api_key = state.resolve_api_key() - with output.status("Transcribing for clip selection…", json_mode=json_mode, quiet=state.quiet): - return client.transcribe(api_key, str(media), config=config) + return mediafile.resolve_diarized_transcript( + state.resolve_api_key(), + transcript_id, + media, + status_message="Transcribing for clip selection…", + json_mode=json_mode, + quiet=state.quiet, + ) def _transcript_segments( @@ -191,26 +190,6 @@ def _transcript_segments( return [clip_select.segment_of(utterance) for utterance in matched], transcript_id -def _validate_media(media: Path) -> None: - """Reject a missing local source before credential resolution, so a typo'd - path reads as "file not found", never as a login prompt or an opaque - ffmpeg error.""" - if not media.exists(): - raise CLIError( - f"File not found: {media}", - error_type="file_not_found", - exit_code=2, - suggestion="Check the path. assembly clip needs a local audio/video file.", - ) - if not media.is_file(): - raise CLIError( - f"Not a file: {media}", - error_type="not_a_file", - exit_code=2, - suggestion="Pass a media file, not a directory.", - ) - - def _validate_out_dir(out_dir: Path | None) -> None: if out_dir is not None and not out_dir.is_dir(): raise UsageError( @@ -236,23 +215,6 @@ def _validate_selection(opts: ClipOptions) -> None: ) -def _require_ffmpeg() -> str: - """The ffmpeg executable; checked before any (billed) transcription work.""" - path = shutil.which("ffmpeg") - if path is None: - raise CLIError( - "ffmpeg is required to cut media, but it isn't on PATH.", - error_type="missing_dependency", - suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", - ) - return path - - -def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: - """Boundary seam for tests: one ffmpeg invocation, output captured.""" - return subprocess.run(args, capture_output=True, text=True, check=False) - - # -30dB for at least 0.2s reads as a pause in normal speech recordings. _SILENCE_FILTER = "silencedetect=noise=-30dB:d=0.2" @@ -265,7 +227,7 @@ def _detect_silences(ffmpeg: str, media: Path) -> list[Segment]: silencedetect logs at info level on stderr, so the usual ``-loglevel error`` would silence the very lines this parses. """ - result = _run_ffmpeg( + result = mediafile.run_ffmpeg( [ ffmpeg, "-hide_banner", @@ -291,7 +253,7 @@ def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None: would snap to the nearest keyframe; ``-y`` makes a re-run overwrite its own earlier output instead of stalling on ffmpeg's prompt. """ - result = _run_ffmpeg( + result = mediafile.run_ffmpeg( [ ffmpeg, "-hide_banner", @@ -304,17 +266,11 @@ def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None: f"{segment.start:.3f}", "-to", f"{segment.end:.3f}", - str(dest), + mediafile.path_arg(dest), ] ) if result.returncode != 0: - detail = result.stderr.strip().splitlines() - reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" - raise CLIError( - f"Could not cut {dest.name}: {reason}", - error_type="clip_failed", - suggestion="Check that the input is a readable audio/video file.", - ) + raise mediafile.ffmpeg_failure(result, "cut", dest, error_type="clip_failed") def _clip_dest(media: Path, out_dir: Path | None, index: int) -> Path: @@ -350,7 +306,7 @@ def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: _validate_selection(opts) youtube.validate_video_flag(opts.media, video=opts.video) explicit = [clip_select.parse_range(value) for value in opts.ranges] - ffmpeg = _require_ffmpeg() + ffmpeg = mediafile.require_ffmpeg("cut media") if youtube.is_downloadable_url(opts.media): # A media-page URL (YouTube, podcast page, …) is downloaded once — the # audio track by default, the full video with --video so the clips carry @@ -371,7 +327,7 @@ def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: suggestion="Download the media first, then clip the local copy.", ) media = Path(opts.media) - _validate_media(media) + mediafile.validate_local_media(media, "clip") _cut_and_emit(opts, media, opts.out_dir, explicit, ffmpeg, state, json_mode=json_mode) diff --git a/aai_cli/commands/dub.py b/aai_cli/commands/dub.py index 9db04c36..ef96f89a 100644 --- a/aai_cli/commands/dub.py +++ b/aai_cli/commands/dub.py @@ -59,6 +59,11 @@ def dub( "-l", help="Target language: an ISO code (de, fr, es, …) or a language name (German).", ), + source_lang: str | None = typer.Option( + None, + "--source-lang", + help="ISO code of the source audio (e.g. de). Default: auto-detect the language.", + ), transcript_id: str | None = typer.Option( None, "--transcript-id", @@ -119,6 +124,7 @@ def dub( opts = dub_exec.DubOptions( media=media, language=lang, + source_language=source_lang, transcript_id=transcript_id, voice=voice, model=model, diff --git a/aai_cli/dub_exec.py b/aai_cli/dub_exec.py index 5440fcc0..14df95a1 100644 --- a/aai_cli/dub_exec.py +++ b/aai_cli/dub_exec.py @@ -19,16 +19,13 @@ from __future__ import annotations import re -import shutil -import subprocess import tempfile from dataclasses import dataclass from pathlib import Path -import assemblyai as aai from rich.markup import escape -from aai_cli import client, environments, jsonshape, output, youtube +from aai_cli import jsonshape, mediafile, output, youtube from aai_cli import llm as gateway from aai_cli.context import AppState from aai_cli.errors import APIError, CLIError, UsageError @@ -76,6 +73,7 @@ class DubOptions: media: str language: str + source_language: str | None transcript_id: str | None voice: list[str] model: str @@ -100,6 +98,13 @@ def resolve_language(value: str) -> str: def default_out_path(media: Path, language: str) -> Path: """The default output file: ``.dub.`` next to the input.""" slug = re.sub(r"[^a-z0-9]+", "-", language.casefold()).strip("-") + if not slug: + # A name that slugs to nothing (e.g. 中文) would collide every such + # language onto one ".dub." file; make the user pick. + raise UsageError( + f"Can't derive a default output name for {language!r}.", + suggestion="Pass --out explicitly, e.g. --out dubbed.mp4.", + ) return media.parent / f"{media.stem}.dub.{slug}{media.suffix}" @@ -107,7 +112,7 @@ def assemble_timeline( placed: list[tuple[int, bytes]], sample_rate: int, total_seconds: float | None, -) -> bytes: +) -> bytearray: """Lay each ``(start_ms, pcm)`` segment onto a silence timeline. Gaps before a segment's start are filled with silence; a segment whose @@ -125,7 +130,7 @@ def assemble_timeline( tail = total_seconds - _pcm_seconds(pcm, sample_rate) if tail > 0: pcm.extend(audio.silence(sample_rate, tail)) - return bytes(pcm) + return pcm def _pcm_seconds(pcm: bytes | bytearray, sample_rate: int) -> float: @@ -133,64 +138,6 @@ def _pcm_seconds(pcm: bytes | bytearray, sample_rate: int) -> float: return len(pcm) / 2 / sample_rate -def _require_sandbox() -> None: - """`assembly dub` synthesizes with streaming TTS, which is sandbox-only today.""" - if not session.is_available(): - raise CLIError( - "assembly dub is only available in the sandbox (it uses streaming TTS).", - error_type="unsupported_environment", - exit_code=2, - suggestion="Re-run as: assembly --sandbox dub … " - f"(--sandbox goes before the command; or use --env {environments.SANDBOX_ENV}).", - ) - - -def _validate_media(media: Path) -> None: - """Reject a missing local source before credential resolution, so a typo'd - path reads as "file not found", never as a login prompt or an ffmpeg error.""" - if not media.exists(): - raise CLIError( - f"File not found: {media}", - error_type="file_not_found", - exit_code=2, - suggestion="Check the path. assembly dub needs a local audio/video file.", - ) - if not media.is_file(): - raise CLIError( - f"Not a file: {media}", - error_type="not_a_file", - exit_code=2, - suggestion="Pass a media file, not a directory.", - ) - - -def _validate_out(out: Path, media: Path) -> None: - """The dub must never overwrite its own input: ffmpeg would read and write the - same file concurrently, corrupting it.""" - if out.resolve() == media.resolve(): - raise UsageError( - "--out would overwrite the input file.", - suggestion="Pick a different output path.", - ) - - -def _require_ffmpeg() -> str: - """The ffmpeg executable; checked before any (billed) transcription work.""" - path = shutil.which("ffmpeg") - if path is None: - raise CLIError( - "ffmpeg is required to write the dubbed file, but it isn't on PATH.", - error_type="missing_dependency", - suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", - ) - return path - - -def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: - """Boundary seam for tests: one ffmpeg invocation, output captured.""" - return subprocess.run(args, capture_output=True, text=True, check=False) - - def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: """Swap ``track`` in as the audio of ``media``, writing ``out``. @@ -199,7 +146,7 @@ def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: dubs both a video and a plain audio file. ``-y`` makes a re-run overwrite its own earlier output instead of stalling on ffmpeg's prompt. """ - result = _run_ffmpeg( + result = mediafile.run_ffmpeg( [ ffmpeg, "-hide_banner", @@ -216,17 +163,11 @@ def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: "1:a", "-c:v", "copy", - str(out), + mediafile.path_arg(out), ] ) if result.returncode != 0: - detail = result.stderr.strip().splitlines() - reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" - raise CLIError( - f"Could not write {out.name}: {reason}", - error_type="dub_failed", - suggestion="Check that the input is a readable audio/video file.", - ) + raise mediafile.ffmpeg_failure(result, "write", out, error_type="dub_failed") @dataclass(frozen=True) @@ -238,21 +179,7 @@ class _Utterance: text: str -def _resolve_transcript( - opts: DubOptions, media: Path, state: AppState, *, json_mode: bool -) -> object: - """The diarized transcript driving the dub: fetched by id, or made fresh from - the (already local) media file — always with speaker labels, so each speaker - can keep a distinct voice in the dub.""" - if opts.transcript_id is not None: - return client.get_transcript(state.resolve_api_key(), opts.transcript_id) - config = aai.TranscriptionConfig(speaker_labels=True) - api_key = state.resolve_api_key() - with output.status("Transcribing for dubbing…", json_mode=json_mode, quiet=state.quiet): - return client.transcribe(api_key, str(media), config=config) - - -def _utterances_of(transcript: object) -> list[_Utterance]: +def _utterances_of(transcript: object, transcript_id: str) -> list[_Utterance]: """The transcript's spoken utterances, with empty-text ones dropped.""" utterances = [ _Utterance( @@ -264,7 +191,6 @@ def _utterances_of(transcript: object) -> list[_Utterance]: ] spoken = [utterance for utterance in utterances if utterance.text] if not spoken: - transcript_id = str(getattr(transcript, "id", "")) raise CLIError( f"Transcript {transcript_id} has no utterances to dub.", error_type="no_utterances", @@ -309,6 +235,14 @@ def _translate( api_key, model=opts.model, messages=messages, max_tokens=opts.max_tokens ) translated = gateway.content_of(response).strip() + # "length" is OpenAI's truncation marker; the gateway's Anthropic-flavored + # responses use "max_tokens". A clipped translation must never be dubbed. + if getattr(response.choices[0], "finish_reason", None) in {"length", "max_tokens"}: + raise APIError( + f"The translation of utterance {index} was cut off at --max-tokens " + f"({opts.max_tokens}).", + suggestion="Re-run with a higher --max-tokens.", + ) if not translated: raise APIError( f"The model returned an empty translation for utterance {index} " @@ -348,10 +282,36 @@ def _synthesize( return [result.pcm for result in results], results[0].sample_rate +def _warn_ignored_voice_pins( + overrides: dict[str, str], speakers: dict[str, str], *, json_mode: bool +) -> None: + """Mirror `assembly speak`: a requested --voice mapping is never dropped + silently, so a pin for a speaker the diarization didn't produce is called out.""" + present = {speaker.casefold() for speaker in speakers} + ignored = [speaker for speaker in overrides if speaker not in present] + if ignored: + output.emit_warning( + "Ignoring --voice mapping(s) for speaker(s) not in the transcript: " + f"{', '.join(ignored)}.", + json_mode=json_mode, + ) + + +@dataclass(frozen=True) +class _VoicePlan: + """The parsed --voice flags: the bare voice (if any) plus SPEAKER=VOICE pins. + + Parsed in run_dub — before the billed pipeline, so a malformed mapping + fails fast — and carried as one value through _dub_and_emit.""" + + bare: str | None + overrides: dict[str, str] + + def _assign_voices( utterances: list[_Utterance], translations: list[str], - voice_values: list[str], + plan: _VoicePlan, language: str, ) -> tuple[list[tuple[str, str]], dict[str, str]]: """Resolve each translated utterance to ``(voice, text)`` plus the speaker→voice map. @@ -362,21 +322,22 @@ def _assign_voices( each voice speaks one language, so a non-English dub switches to that language's native voice(s). """ - bare_voice, overrides = dialogue.parse_voice_overrides(voice_values) - rotation = (bare_voice,) if bare_voice is not None else voices.rotation_for(language) + rotation = (plan.bare,) if plan.bare is not None else voices.rotation_for(language) segments = [ dialogue.Segment(utterance.speaker, translated) # strict=True is an invariant guard only: _translate returns exactly one # translation per utterance, so the lengths can never differ. for utterance, translated in zip(utterances, translations, strict=True) # pragma: no mutate ] - return dialogue.assign_voices(segments, rotation, overrides) + return dialogue.assign_voices(segments, rotation, plan.overrides) def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly dub` invocation from already-parsed flags.""" language = resolve_language(opts.language) - _require_sandbox() + session.require_available("dub") + # Parse --voice now: a malformed mapping must fail before the billed pipeline. + voice_plan = _VoicePlan(*dialogue.parse_voice_overrides(opts.voice)) youtube.validate_video_flag(opts.media, video=opts.video) youtube.validate_sections_flag(opts.media, opts.download_sections) if youtube.is_downloadable_url(opts.media): @@ -385,7 +346,7 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: # the picture, only the --download-sections slices when given — and # dubbed locally. ffmpeg is checked before the download so a missing # dependency fails before any fetch. - ffmpeg = _require_ffmpeg() + ffmpeg = mediafile.require_ffmpeg("write the dubbed file") downloading = "Downloading video…" if opts.video else "Downloading audio…" with tempfile.TemporaryDirectory(prefix="aai-dub-src-") as td: with output.status(downloading, json_mode=json_mode, quiet=state.quiet): @@ -402,8 +363,10 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: if opts.out is not None else Path.cwd() / default_out_path(local, language).name ) - _validate_out(out, local) - _dub_and_emit(opts, local, out, language, ffmpeg, state, json_mode=json_mode) + mediafile.validate_out(out, local) + _dub_and_emit( + opts, local, out, language, ffmpeg, voice_plan, state, json_mode=json_mode + ) return if opts.media.startswith(("http://", "https://")): raise UsageError( @@ -411,12 +374,18 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: "media-page URL yt-dlp can download (YouTube, podcasts, …).", suggestion="Download the media first, then dub the local copy.", ) + if "://" in opts.media: + # Path() would collapse the "//" and report a corrupted echo of the URL. + raise UsageError( + f"assembly dub needs a local file, not a URL: {opts.media}", + suggestion="Download the media first, then dub the local copy.", + ) media = Path(opts.media) - _validate_media(media) + mediafile.validate_local_media(media, "dub") out = opts.out if opts.out is not None else default_out_path(media, language) - _validate_out(out, media) - ffmpeg = _require_ffmpeg() - _dub_and_emit(opts, media, out, language, ffmpeg, state, json_mode=json_mode) + mediafile.validate_out(out, media) + ffmpeg = mediafile.require_ffmpeg("write the dubbed file") + _dub_and_emit(opts, media, out, language, ffmpeg, voice_plan, state, json_mode=json_mode) def _dub_and_emit( @@ -425,26 +394,41 @@ def _dub_and_emit( out: Path, language: str, ffmpeg: str, + voice_plan: _VoicePlan, state: AppState, *, json_mode: bool, ) -> None: """Dub an already-local media file into ``out`` and report the result.""" - transcript = _resolve_transcript(opts, media, state, json_mode=json_mode) - transcript_id = str(getattr(transcript, "id", "")) - utterances = _utterances_of(transcript) api_key = state.resolve_api_key() + transcript = mediafile.resolve_diarized_transcript( + api_key, + opts.transcript_id, + media, + status_message="Transcribing for dubbing…", + json_mode=json_mode, + quiet=state.quiet, + language_code=opts.source_language, + # Dub input is typically not English (the API default), so a fresh + # transcription auto-detects the source language unless --source-lang pins it. + detect_language=opts.source_language is None, + ) + transcript_id = str(getattr(transcript, "id", "")) + utterances = _utterances_of(transcript, transcript_id) translations = _translate( api_key, utterances, language, opts, json_mode=json_mode, quiet=state.quiet ) - resolved, speakers = _assign_voices(utterances, translations, opts.voice, language) + resolved, speakers = _assign_voices(utterances, translations, voice_plan, language) + _warn_ignored_voice_pins(voice_plan.overrides, speakers, json_mode=json_mode) pcm_segments, sample_rate = _synthesize( api_key, resolved, language, json_mode=json_mode, quiet=state.quiet ) # strict=True is an invariant guard only: _synthesize returns one PCM per segment. - starts = (u.start_ms for u in utterances) - placed = list(zip(starts, pcm_segments, strict=True)) # pragma: no mutate + placed = [ + (utterance.start_ms, pcm) + for utterance, pcm in zip(utterances, pcm_segments, strict=True) # pragma: no mutate + ] track = assemble_timeline(placed, sample_rate, _total_seconds(transcript)) with tempfile.TemporaryDirectory(prefix="aai-dub-") as tmp: wav = Path(tmp) / "dub.wav" @@ -453,7 +437,8 @@ def _dub_and_emit( _mux(ffmpeg, media, wav, out) duration = round(_pcm_seconds(track, sample_rate), 3) - voices = ", ".join(f"{speaker}={voice}" for speaker, voice in speakers.items()) + # Not named `voices`: that would shadow the tts.voices module imported above. + voices_text = ", ".join(f"{speaker}={voice}" for speaker, voice in speakers.items()) payload: dict[str, object] = { "source": opts.media, "out": str(out), @@ -466,8 +451,10 @@ def _dub_and_emit( } output.emit( payload, + # language and voices carry user-typed text, so they need escaping too. lambda _: output.success( - f"{escape(str(out))} dubbed to {language} ({len(utterances)} utterances, {voices})" + f"{escape(str(out))} dubbed to {escape(language)} " + f"({len(utterances)} utterances, {escape(voices_text)})" ), json_mode=json_mode, ) diff --git a/aai_cli/mediafile.py b/aai_cli/mediafile.py new file mode 100644 index 00000000..69ee503d --- /dev/null +++ b/aai_cli/mediafile.py @@ -0,0 +1,184 @@ +"""Shared scaffolding for commands that operate on a local media file (clip, +dub): source validation, ffmpeg discovery/invocation, and resolution of the +diarized transcript that drives selection or dubbing. + +The helpers raise identical CLIErrors regardless of the calling command — only +the command-name/purpose strings differ — so the media-file UX of `assembly +clip` and `assembly dub` can't drift apart. +""" + +from __future__ import annotations + +import shutil +import subprocess +from pathlib import Path + +import assemblyai as aai + +from aai_cli import client, output +from aai_cli.errors import APIError, CLIError, UsageError + + +def validate_local_media(media: Path, command: str, *, kind: str = "audio/video") -> None: + """Reject a missing local source before credential resolution, so a typo'd + path reads as "file not found", never as a login prompt or an opaque + ffmpeg error.""" + if not media.exists(): + raise CLIError( + f"File not found: {media}", + error_type="file_not_found", + exit_code=2, + suggestion=f"Check the path. assembly {command} needs a local {kind} file.", + ) + if not media.is_file(): + raise CLIError( + f"Not a file: {media}", + error_type="not_a_file", + exit_code=2, + suggestion="Pass a media file, not a directory.", + ) + + +def validate_out(out: Path, media: Path) -> None: + """An unwritable or self-overwriting output file must fail here, before the + billed transcription/translation/synthesis pipeline runs. + + The samefile check catches what path comparison can't: on case-insensitive + filesystems (macOS APFS) ``--out TALK.MP4`` against ``talk.mp4`` — or a hard + link — is the same file under a different spelling, and ffmpeg would read + and write it concurrently, corrupting the input.""" + if out.resolve() == media.resolve() or (out.exists() and out.samefile(media)): + raise UsageError( + "--out would overwrite the input file.", + suggestion="Pick a different output path.", + ) + if out.is_dir(): + raise UsageError( + f"--out is a directory: {out}", + suggestion="Point --out at a file path, e.g. --out dubbed.mp4.", + ) + if not out.parent.is_dir(): + raise UsageError( + f"The output directory doesn't exist: {out.parent}", + suggestion="Create it first, or point --out somewhere that exists.", + ) + if not out.suffix: + raise UsageError( + f"The output file {out.name!r} has no extension.", + suggestion="ffmpeg picks the container from the extension; pass e.g. --out dubbed.mp4.", + ) + + +def require_ffmpeg(purpose: str) -> str: + """The ffmpeg executable; checked before any (billed) transcription work.""" + path = shutil.which("ffmpeg") + if path is None: + raise CLIError( + f"ffmpeg is required to {purpose}, but it isn't on PATH.", + error_type="missing_dependency", + suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", + ) + return path + + +def run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: + """Boundary seam for tests: one ffmpeg invocation, output captured.""" + return subprocess.run(args, capture_output=True, text=True, check=False) + + +def path_arg(path: Path) -> str: + """``path`` as an ffmpeg argv token: a leading '-' is disambiguated with + ``./`` so a filename like ``-out.mp4`` can't be parsed as an option.""" + text = str(path) + return f"./{text}" if text.startswith("-") else text + + +def ffmpeg_failure( + result: subprocess.CompletedProcess[str], + action: str, + dest: Path, + *, + error_type: str, + suggestion: str = "Check that the input is a readable audio/video file.", +) -> CLIError: + """A failed ffmpeg run as a clean CLIError: the reason is ffmpeg's last + stderr line (earlier noise dropped), or the exit code when it said nothing.""" + detail = result.stderr.strip().splitlines() + reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" + return CLIError( + f"Could not {action} {dest.name}: {reason}", + error_type=error_type, + suggestion=suggestion, + ) + + +def resolve_transcript( + api_key: str, + transcript_id: str | None, + media: Path, + *, + status_message: str, + json_mode: bool, + quiet: bool, + config: aai.TranscriptionConfig, +) -> object: + """The transcript driving the command: fetched by id (and verified usable), + or made fresh from the (already local) media file with ``config``.""" + if transcript_id is not None: + return _fetched_transcript(api_key, transcript_id) + with output.status(status_message, json_mode=json_mode, quiet=quiet): + return client.transcribe(api_key, str(media), config=config) + + +def resolve_diarized_transcript( + api_key: str, + transcript_id: str | None, + media: Path, + *, + status_message: str, + json_mode: bool, + quiet: bool, + language_code: str | None = None, + detect_language: bool = False, +) -> object: + """The diarized transcript driving the command — always with speaker labels, + so the caller can select or voice content per speaker.""" + config = aai.TranscriptionConfig( + speaker_labels=True, + language_code=language_code, + language_detection=detect_language or None, + ) + return resolve_transcript( + api_key, + transcript_id, + media, + status_message=status_message, + json_mode=json_mode, + quiet=quiet, + config=config, + ) + + +def _fetched_transcript(api_key: str, transcript_id: str) -> object: + """A --transcript-id transcript, rejected unless it finished successfully — + a queued/processing/errored one would otherwise surface much later as a + misleading 'no utterances' failure.""" + transcript = client.get_transcript(api_key, transcript_id) + raw_status = getattr(transcript, "status", None) + status = str(getattr(raw_status, "value", raw_status) or "") + if status == "error": + raise APIError( + getattr(transcript, "error", None) or "Transcript failed.", + transcript_id=transcript_id, + ) + if status in {"queued", "processing"}: + raise CLIError( + f"Transcript {transcript_id} is still {status}.", + error_type="transcript_not_ready", + exit_code=2, + suggestion=( + f"Wait for it to finish (assembly transcripts get {transcript_id}), " + "or drop -t to transcribe the file fresh." + ), + ) + return transcript diff --git a/aai_cli/speak_exec.py b/aai_cli/speak_exec.py index 56395d15..6b9059f4 100644 --- a/aai_cli/speak_exec.py +++ b/aai_cli/speak_exec.py @@ -11,9 +11,9 @@ from dataclasses import dataclass from pathlib import Path -from aai_cli import environments, output, stdio +from aai_cli import output, stdio from aai_cli.context import AppState -from aai_cli.errors import CLIError, UsageError +from aai_cli.errors import UsageError from aai_cli.tts import audio, dialogue, session, voices # The streaming-TTS reference client defaults to English, so the CLI does the @@ -177,14 +177,7 @@ def _speak_dialogue( def run_speak(opts: SpeakOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly speak` invocation from already-parsed flags.""" - if not session.is_available(): - raise CLIError( - "assembly speak is only available in the sandbox.", - error_type="unsupported_environment", - exit_code=2, - suggestion="Re-run as: assembly --sandbox speak … " - f"(--sandbox goes before the command; or use --env {environments.SANDBOX_ENV}).", - ) + session.require_available("speak") spoken = _read_text(opts.text) api_key = state.resolve_api_key() bare_voice, overrides = dialogue.parse_voice_overrides(opts.voice) diff --git a/aai_cli/tts/audio.py b/aai_cli/tts/audio.py index 315c1a30..eb0a4d75 100644 --- a/aai_cli/tts/audio.py +++ b/aai_cli/tts/audio.py @@ -36,7 +36,7 @@ def close(self) -> None: _PLAYBACK_CHUNK_BYTES = 4096 -def write_wav(path: Path, pcm: bytes, sample_rate: int) -> None: +def write_wav(path: Path, pcm: bytes | bytearray, sample_rate: int) -> None: """Write 16-bit mono PCM to a WAV file, creating parent dirs as needed.""" path.parent.mkdir(parents=True, exist_ok=True) with wave.open(str(path), "wb") as wav: diff --git a/aai_cli/tts/session.py b/aai_cli/tts/session.py index 7349ba2d..f54cd7c2 100644 --- a/aai_cli/tts/session.py +++ b/aai_cli/tts/session.py @@ -86,6 +86,19 @@ def is_available() -> bool: return bool(environments.active().streaming_tts_host) +def require_available(command: str) -> None: + """Refuse to run a streaming-TTS command (speak, dub) outside the sandbox, + pointing at the exact --sandbox re-invocation.""" + if not is_available(): + raise CLIError( + f"assembly {command} is only available in the sandbox (it uses streaming TTS).", + error_type="unsupported_environment", + exit_code=2, + suggestion=f"Re-run as: assembly --sandbox {command} … " + f"(--sandbox goes before the command; or use --env {environments.SANDBOX_ENV}).", + ) + + def ws_url(params: dict[str, str]) -> str: """The streaming-TTS socket URL for the active environment, with query params.""" base = f"wss://{environments.active().streaming_tts_host}/v1/ws/" diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index c0bdd962..241e1f11 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -309,6 +309,9 @@ │ * --lang -l TEXT Target language: an ISO code (de, fr, │ │ es, …) or a language name (German). │ │ [required] │ + │ --source-lang TEXT ISO code of the source audio (e.g. │ + │ de). Default: auto-detect the │ + │ language. │ │ --transcript-id -t TEXT Reuse an existing diarized transcript │ │ of this media instead of transcribing │ │ it again. │ diff --git a/tests/_clip_helpers.py b/tests/_clip_helpers.py index 9b5ee48d..6e05a43a 100644 --- a/tests/_clip_helpers.py +++ b/tests/_clip_helpers.py @@ -14,7 +14,7 @@ import pytest -from aai_cli import clip_exec, llm +from aai_cli import llm, mediafile from aai_cli.clip_exec import ClipOptions _ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") @@ -71,5 +71,5 @@ def run(args: list[str]) -> subprocess.CompletedProcess[str]: stderr = detect_log if "-af" in args else "" return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr=stderr) - monkeypatch.setattr(clip_exec, "_run_ffmpeg", run) + monkeypatch.setattr(mediafile, "run_ffmpeg", run) return calls diff --git a/tests/_dub_helpers.py b/tests/_dub_helpers.py index 1845be24..c8ddb2b0 100644 --- a/tests/_dub_helpers.py +++ b/tests/_dub_helpers.py @@ -8,7 +8,6 @@ from __future__ import annotations -import re import subprocess import wave from pathlib import Path @@ -16,7 +15,7 @@ import pytest -from aai_cli import client, config, dub_exec, llm +from aai_cli import client, config, llm, mediafile from aai_cli.dub_exec import DubOptions from aai_cli.tts import session from aai_cli.tts.session import SpeakResult @@ -25,6 +24,7 @@ DEFAULTS = DubOptions( media="talk.mp4", language="de", + source_language=None, transcript_id=None, voice=[], model=llm.DEFAULT_MODEL, @@ -36,14 +36,6 @@ SAMPLE_RATE = 100 # tiny rate keeps the timeline byte math exact and readable -_ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") - - -def plain(text: str) -> str: - """Strip SGR color codes (CI forces color on, splitting flags like --lang - with style sequences) for substring assertions.""" - return _ANSI_SGR.sub("", text) - def utterance(start, speaker, text): return SimpleNamespace(start=start, end=None, speaker=speaker, text=text) @@ -53,9 +45,11 @@ def fake_transcript(utterances, *, audio_duration=5): return SimpleNamespace(id="tr_dub", utterances=utterances, audio_duration=audio_duration) -def completion(text): - """The slice of an OpenAI ChatCompletion that gateway.content_of reads.""" - return SimpleNamespace(choices=[SimpleNamespace(message=SimpleNamespace(content=text))]) +def completion(text, finish_reason=None): + """The slice of an OpenAI ChatCompletion that gateway.content_of and the + dub truncation check read.""" + choice = SimpleNamespace(message=SimpleNamespace(content=text), finish_reason=finish_reason) + return SimpleNamespace(choices=[choice]) def write_media(tmp_path: Path) -> Path: @@ -127,5 +121,5 @@ def run(args: list[str]) -> subprocess.CompletedProcess[str]: recorded["wav_frames"] = wav.readframes(wav.getnframes()) return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") - monkeypatch.setattr(dub_exec, "_run_ffmpeg", run) + monkeypatch.setattr(mediafile, "run_ffmpeg", run) return recorded diff --git a/tests/test_caption_exec.py b/tests/test_caption_exec.py index 372b411e..e88954bd 100644 --- a/tests/test_caption_exec.py +++ b/tests/test_caption_exec.py @@ -2,7 +2,7 @@ the pure helpers (output naming, filtergraph escaping), validation order, and the faked transcribe → SRT export → ffmpeg burn-in runs. The boundaries are faked at the modules caption_exec calls into (`client.transcribe`, `client.get_transcript`, -`youtube.download_media`) and at `caption_exec._run_ffmpeg`; argv parsing lives in +`youtube.download_media`) and at `mediafile.run_ffmpeg`; argv parsing lives in test_caption_command.py.""" from __future__ import annotations @@ -10,18 +10,17 @@ import contextlib import dataclasses import json -import re import subprocess -import sys from pathlib import Path from types import SimpleNamespace import pytest -from aai_cli import caption_exec, client, config, youtube +from aai_cli import caption_exec, client, config, mediafile, youtube from aai_cli.caption_exec import CaptionOptions from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError +from tests._clip_helpers import plain # The CLI's flag defaults, as data. Tests override per-case with dataclasses.replace. DEFAULTS = CaptionOptions( @@ -34,13 +33,6 @@ SRT = "1\n00:00:00,500 --> 00:00:01,500\nHello.\n\n2\n00:00:02,000 --> 00:00:03,000\nWorld.\n" -_ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") - - -def plain(text: str) -> str: - """Strip SGR color codes (CI forces color on) for substring assertions.""" - return _ANSI_SGR.sub("", text) - def fake_transcript(srt: str = SRT, transcript_id: str = "tr_cap"): """A transcript double whose SRT export records the chars_per_caption it got.""" @@ -70,7 +62,7 @@ def run(args: list[str]) -> subprocess.CompletedProcess[str]: args=args, returncode=returncode, stdout="", stderr=stderr ) - monkeypatch.setattr(caption_exec, "_run_ffmpeg", run) + monkeypatch.setattr(mediafile, "run_ffmpeg", run) return recorded @@ -141,21 +133,6 @@ def test_subtitles_filter_escapes_filtergraph_metacharacters(): assert spec == "subtitles=/tmp/a\\'b\\:c\\,d\\;e\\[f\\]g.srt" -def test_run_ffmpeg_captures_output_and_does_not_raise(): - # The real boundary (not the fake): output is captured as text and a non-zero - # exit must not raise — _burn turns the exit code into a CLIError itself. - result = caption_exec._run_ffmpeg( - [ - sys.executable, - "-c", - "import sys; print('out'); print('err', file=sys.stderr); sys.exit(3)", - ] - ) - assert result.returncode == 3 - assert result.stdout == "out\n" - assert result.stderr == "err\n" - - # --- validation order (cheap local checks before any credential or network) ---- @@ -164,7 +141,8 @@ def test_run_caption_requires_ffmpeg(monkeypatch): with pytest.raises(CLIError) as exc: _run(DEFAULTS, json_mode=False) assert exc.value.error_type == "missing_dependency" - assert "ffmpeg" in exc.value.message + # The purpose string pins the shared helper's parameterization. + assert "ffmpeg is required to burn captions into video" in exc.value.message def test_run_caption_rejects_missing_file(fake_ffmpeg, tmp_path): @@ -173,7 +151,8 @@ def test_run_caption_rejects_missing_file(fake_ffmpeg, tmp_path): _run(opts, json_mode=False) assert exc.value.error_type == "file_not_found" assert exc.value.exit_code == 2 - assert "local video file" in (exc.value.suggestion or "") + # The command name + kind pin the shared helper's parameterization. + assert "assembly caption needs a local video file" in (exc.value.suggestion or "") def test_run_caption_rejects_directory(fake_ffmpeg, tmp_path): @@ -200,6 +179,15 @@ def test_run_caption_rejects_non_downloadable_url(fake_ffmpeg): assert "Download the video first" in (exc.value.suggestion or "") +def test_run_caption_rejects_remote_urls_with_the_url_intact(fake_ffmpeg): + # Path() would collapse "//" and echo a corrupted "s3:/bucket/…" back. + opts = dataclasses.replace(DEFAULTS, media="s3://bucket/talk.mp4") + with pytest.raises(UsageError) as exc: + _run(opts, json_mode=False) + assert "s3://bucket/talk.mp4" in exc.value.message + assert "Download the video first" in (exc.value.suggestion or "") + + # --- the faked pipeline --------------------------------------------------------- @@ -207,9 +195,11 @@ def test_run_caption_end_to_end(media, fake_transcribe, fake_ffmpeg, capsys): opts = dataclasses.replace(DEFAULTS, media=str(media)) _run(opts, json_mode=True) - # Transcription: the local file, with the resolved key. + # Transcription: the local file, with the resolved key, no diarization + # (captions don't need speaker labels). assert fake_transcribe["api_key"] == "test-key" assert fake_transcribe["audio"] == str(media) + assert fake_transcribe["config"].speaker_labels is None # No --chars-per-caption: the export endpoint gets None (its own default). assert fake_transcribe["transcript"].export_calls == [None] @@ -270,6 +260,16 @@ def fake_status(message, *, json_mode, quiet): assert messages == ["Transcribing for captions…", "Fetching captions…", "Burning captions…"] +def test_dash_prefixed_out_is_disambiguated_for_ffmpeg( + media, fake_transcribe, fake_ffmpeg, monkeypatch, tmp_path +): + # A bare "-cap.mp4" argv token would be parsed by ffmpeg as an option. + monkeypatch.chdir(tmp_path) + opts = dataclasses.replace(DEFAULTS, media=str(media), out=Path("-cap.mp4")) + _run(opts, json_mode=True) + assert fake_ffmpeg["args"][-1] == "./-cap.mp4" + + def test_run_caption_forwards_chars_per_caption(media, fake_transcribe, fake_ffmpeg): opts = dataclasses.replace(DEFAULTS, media=str(media), chars_per_caption=32) _run(opts, json_mode=True) diff --git a/tests/test_clip_command.py b/tests/test_clip_command.py index bb51ef3f..b7847e40 100644 --- a/tests/test_clip_command.py +++ b/tests/test_clip_command.py @@ -9,7 +9,7 @@ from typer.testing import CliRunner -from aai_cli import clip_exec, llm +from aai_cli import clip_exec, llm, mediafile from aai_cli.clip_exec import ClipOptions from aai_cli.main import app @@ -141,7 +141,7 @@ def fake_run(args: list[str]) -> subprocess.CompletedProcess[str]: calls.append(args) return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") - monkeypatch.setattr(clip_exec, "_run_ffmpeg", fake_run) + monkeypatch.setattr(mediafile, "run_ffmpeg", fake_run) result = runner.invoke(app, ["clip", str(media), "--range", "1-2", "--json"]) assert result.exit_code == 0, result.output # calls[0] is the silencedetect pass; calls[1] the cut. diff --git a/tests/test_clip_exec.py b/tests/test_clip_exec.py index 2f365071..efa1ab25 100644 --- a/tests/test_clip_exec.py +++ b/tests/test_clip_exec.py @@ -2,7 +2,7 @@ validation, ffmpeg orchestration, and transcript-backed --speaker/--search selection. Constructed-options tests (dataclasses.replace off the shared defaults) avoid any argv round-trip; the ffmpeg boundary is faked at -`clip_exec._run_ffmpeg`. The pure selection logic is covered in +`mediafile.run_ffmpeg`. The pure selection logic is covered in test_clip_select.py; YouTube/stdin/LLM sources in test_clip_sources.py.""" from __future__ import annotations @@ -16,7 +16,7 @@ import pytest -from aai_cli import clip_exec, config +from aai_cli import client, clip_exec, config, mediafile from aai_cli.clip_select import Segment from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError @@ -72,7 +72,8 @@ def test_run_clip_rejects_missing_file(tmp_path): assert exc.value.error_type == "file_not_found" assert exc.value.exit_code == 2 assert "File not found" in exc.value.message - assert "local audio/video file" in (exc.value.suggestion or "") + # The command name pins the shared helper's parameterization. + assert "assembly clip needs a local audio/video file" in (exc.value.suggestion or "") def test_run_clip_rejects_directory(tmp_path): @@ -126,7 +127,8 @@ def test_run_clip_requires_ffmpeg(media, monkeypatch): with pytest.raises(CLIError) as exc: clip_exec.run_clip(opts, AppState(), json_mode=False) assert exc.value.error_type == "missing_dependency" - assert "ffmpeg is required" in exc.value.message + # The purpose string pins the shared helper's parameterization. + assert "ffmpeg is required to cut media" in exc.value.message assert "Install it" in (exc.value.suggestion or "") @@ -213,6 +215,15 @@ def test_run_clip_rounds_payload_times_to_milliseconds(media, fake_ffmpeg, capsy } +def test_dash_prefixed_clip_dest_is_disambiguated_for_ffmpeg(tmp_path, fake_ffmpeg, monkeypatch): + # A bare "-x.clip01.mp4" argv token would be parsed by ffmpeg as an option. + monkeypatch.chdir(tmp_path) + (tmp_path / "-x.mp4").write_bytes(b"\x00fake-media") + opts = dataclasses.replace(DEFAULTS, media="-x.mp4", ranges=["1-2"]) + clip_exec.run_clip(opts, AppState(), json_mode=True) + assert fake_ffmpeg[1][-1] == "./-x.clip01.mp4" + + def test_run_clip_honors_out_dir(media, tmp_path, fake_ffmpeg, capsys): out_dir = tmp_path / "clips" out_dir.mkdir() @@ -257,7 +268,7 @@ def run(args: list[str]) -> subprocess.CompletedProcess[str]: ) return subprocess.CompletedProcess(args=args, returncode=0, stdout="", stderr="") - monkeypatch.setattr(clip_exec, "_run_ffmpeg", run) + monkeypatch.setattr(mediafile, "run_ffmpeg", run) opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["5-12.5"]) clip_exec.run_clip(opts, AppState(), json_mode=True) clips = json.loads(capsys.readouterr().out)["clips"] @@ -284,7 +295,7 @@ def fail(args: list[str]) -> subprocess.CompletedProcess[str]: args=args, returncode=1, stdout="", stderr="noise\nInvalid data found\n" ) - monkeypatch.setattr(clip_exec, "_run_ffmpeg", fail) + monkeypatch.setattr(mediafile, "run_ffmpeg", fail) opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["1-2"]) with pytest.raises(CLIError) as exc: clip_exec.run_clip(opts, AppState(), json_mode=False) @@ -299,8 +310,8 @@ def fail(args: list[str]) -> subprocess.CompletedProcess[str]: def test_run_clip_reports_exit_code_when_ffmpeg_is_silent(media, monkeypatch): monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") monkeypatch.setattr( - clip_exec, - "_run_ffmpeg", + mediafile, + "run_ffmpeg", lambda args: subprocess.CompletedProcess(args=args, returncode=3, stdout="", stderr=""), ) opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["1-2"]) @@ -311,8 +322,8 @@ def test_run_clip_reports_exit_code_when_ffmpeg_is_silent(media, monkeypatch): def test_run_ffmpeg_captures_output_and_does_not_raise(): # The real boundary (not the fake): output is captured as text and a non-zero - # exit must not raise — _cut_clip turns the exit code into a CLIError itself. - result = clip_exec._run_ffmpeg( + # exit must not raise — the callers turn the exit code into a CLIError. + result = mediafile.run_ffmpeg( [ sys.executable, "-c", @@ -337,12 +348,14 @@ def fake_transcribe(api_key, audio, *, config): seen["config"] = config return fake_transcript(list(UTTERANCES)) - monkeypatch.setattr(clip_exec.client, "transcribe", fake_transcribe) + monkeypatch.setattr(client, "transcribe", fake_transcribe) opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["a"]) clip_exec.run_clip(opts, AppState(), json_mode=True) assert seen["api_key"] == "sk_test" assert seen["audio"] == str(media) assert seen["config"].speaker_labels is True + # Clip keeps the API's language defaults (only dub opts into detection). + assert seen["config"].language_detection is None payload = json.loads(capsys.readouterr().out) assert payload["transcript_id"] == "tr_123" # Speaker A's two utterances: 1.5-2.5s and 5-6s. @@ -359,9 +372,9 @@ def fake_get(api_key, transcript_id): seen["args"] = (api_key, transcript_id) return fake_transcript(list(UTTERANCES)) - monkeypatch.setattr(clip_exec.client, "get_transcript", fake_get) + monkeypatch.setattr(client, "get_transcript", fake_get) monkeypatch.setattr( - clip_exec.client, + client, "transcribe", lambda *a, **k: pytest.fail("must not re-transcribe when -t is given"), ) @@ -377,7 +390,7 @@ def test_run_clip_merges_transcript_matches_with_explicit_ranges( ): config.set_api_key("default", "sk_test") utterances = [utterance(5000, 8000, "A", "hello")] - monkeypatch.setattr(clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(utterances)) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(utterances)) opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["A"], ranges=["7-12"]) clip_exec.run_clip(opts, AppState(), json_mode=True) clips = json.loads(capsys.readouterr().out)["clips"] @@ -386,7 +399,7 @@ def test_run_clip_merges_transcript_matches_with_explicit_ranges( def test_run_clip_errors_when_transcript_has_no_utterances(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr(clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(None)) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(None)) opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["A"]) with pytest.raises(CLIError) as exc: clip_exec.run_clip(opts, AppState(), json_mode=False) @@ -398,9 +411,7 @@ def test_run_clip_errors_when_transcript_has_no_utterances(media, fake_ffmpeg, m def test_run_clip_errors_when_nothing_matches(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) opts = dataclasses.replace(DEFAULTS, media=str(media), speakers=["Z"]) with pytest.raises(CLIError) as exc: clip_exec.run_clip(opts, AppState(), json_mode=False) @@ -411,9 +422,7 @@ def test_run_clip_errors_when_nothing_matches(media, fake_ffmpeg, monkeypatch): def test_run_clip_status_messages(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) messages: list[str] = [] @contextlib.contextmanager diff --git a/tests/test_clip_sources.py b/tests/test_clip_sources.py index a08b3174..ab32007c 100644 --- a/tests/test_clip_sources.py +++ b/tests/test_clip_sources.py @@ -11,7 +11,7 @@ import pytest -from aai_cli import clip_exec, clip_select, config +from aai_cli import client, clip_exec, clip_select, config from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError from tests._clip_helpers import DEFAULTS, UTTERANCES, fake_transcript, record_ffmpeg @@ -88,7 +88,7 @@ def fake_transcribe(api_key, audio, *, config): seen["audio"] = audio return fake_transcript(list(UTTERANCES)) - monkeypatch.setattr(clip_exec.client, "transcribe", fake_transcribe) + monkeypatch.setattr(client, "transcribe", fake_transcribe) monkeypatch.setattr( clip_exec.llm, "transform_transcript", lambda *a, **k: '[{"start": 1, "end": 2}]' ) @@ -168,7 +168,7 @@ def test_run_clip_reads_transcript_json_from_stdin(media, fake_ffmpeg, capsys, m # No API key configured and no client call: the piped JSON is the transcript. monkeypatch.setattr(clip_exec.stdio, "piped_stdin_text", _piped_payload) monkeypatch.setattr( - clip_exec.client, + client, "get_transcript", lambda *a: pytest.fail("must not fetch when JSON is piped"), ) @@ -188,7 +188,7 @@ def fake_get(api_key, transcript_id): seen["args"] = (api_key, transcript_id) return fake_transcript(list(UTTERANCES)) - monkeypatch.setattr(clip_exec.client, "get_transcript", fake_get) + monkeypatch.setattr(client, "get_transcript", fake_get) opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="-", speakers=["B"]) clip_exec.run_clip(opts, AppState(), json_mode=True) assert seen["args"] == ("sk_test", "tr_999") @@ -216,9 +216,7 @@ def test_run_clip_stdin_transcript_rejects_bad_json(media, fake_ffmpeg, monkeypa def test_run_clip_llm_selection_drives_the_cut(media, fake_ffmpeg, capsys, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) seen = {} def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): @@ -256,9 +254,7 @@ def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): def test_run_clip_llm_composes_with_speaker_filter(media, fake_ffmpeg, capsys, monkeypatch): # --speaker narrows the utterances first; the LLM only sees what survived. config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) seen = {} def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): @@ -277,9 +273,7 @@ def fake_transform(api_key, *, prompt, transcript_text, model, max_tokens): def test_run_clip_llm_works_with_transcript_id(media, fake_ffmpeg, capsys, monkeypatch): # -t with --llm alone is a valid selection (no --speaker/--search needed). config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "get_transcript", lambda *a: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "get_transcript", lambda *a: fake_transcript(list(UTTERANCES))) monkeypatch.setattr( clip_exec.llm, "transform_transcript", @@ -293,9 +287,7 @@ def test_run_clip_llm_works_with_transcript_id(media, fake_ffmpeg, capsys, monke def test_run_clip_llm_parse_error_surfaces(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) monkeypatch.setattr(clip_exec.llm, "transform_transcript", lambda *a, **k: "no json, sorry") opts = dataclasses.replace(DEFAULTS, media=str(media), llm_prompt="x") with pytest.raises(CLIError) as exc: @@ -305,9 +297,7 @@ def test_run_clip_llm_parse_error_surfaces(media, fake_ffmpeg, monkeypatch): def test_run_clip_llm_status_message_names_the_model(media, fake_ffmpeg, monkeypatch): config.set_api_key("default", "sk_test") - monkeypatch.setattr( - clip_exec.client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES)) - ) + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(list(UTTERANCES))) monkeypatch.setattr( clip_exec.llm, "transform_transcript", lambda *a, **k: '[{"start": 1, "end": 2}]' ) diff --git a/tests/test_dub_command.py b/tests/test_dub_command.py index 644d6fac..af64bef2 100644 --- a/tests/test_dub_command.py +++ b/tests/test_dub_command.py @@ -12,7 +12,7 @@ from aai_cli import dub_exec, llm from aai_cli.main import app -from tests._dub_helpers import plain +from tests._clip_helpers import plain runner = CliRunner() @@ -53,6 +53,7 @@ def test_defaults_map_to_options(captured_run): assert captured_run["opts"] == dub_exec.DubOptions( media="talk.mp4", language="de", + source_language=None, transcript_id=None, voice=[], model=llm.DEFAULT_MODEL, @@ -71,6 +72,8 @@ def test_every_flag_maps_to_options(captured_run): "talk.mp4", "--lang", "German", + "--source-lang", + "fr", "-t", "tr_1", "--voice", @@ -96,6 +99,7 @@ def test_every_flag_maps_to_options(captured_run): assert captured_run["opts"] == dub_exec.DubOptions( media="talk.mp4", language="German", + source_language="fr", transcript_id="tr_1", voice=["A=jane", "paul"], model="gpt-5", diff --git a/tests/test_dub_exec.py b/tests/test_dub_exec.py index 99cf99fc..81719a54 100644 --- a/tests/test_dub_exec.py +++ b/tests/test_dub_exec.py @@ -8,13 +8,13 @@ from __future__ import annotations import dataclasses -import sys +import os from pathlib import Path from types import SimpleNamespace import pytest -from aai_cli import dub_exec +from aai_cli import dub_exec, mediafile from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError from tests._dub_helpers import ( @@ -47,8 +47,12 @@ def _fake_key(monkeypatch: pytest.MonkeyPatch): @pytest.mark.parametrize( "instance", - [DEFAULTS, dub_exec._Utterance(start_ms=0, speaker="A", text="hi")], - ids=["options", "utterance"], + [ + DEFAULTS, + dub_exec._Utterance(start_ms=0, speaker="A", text="hi"), + dub_exec._VoicePlan(bare=None, overrides={}), + ], + ids=["options", "utterance", "voice_plan"], ) def test_records_are_immutable(instance): field_name = dataclasses.fields(instance)[0].name @@ -111,6 +115,15 @@ def test_default_out_path(language, expected): assert out == Path("/x") / expected +def test_default_out_path_rejects_unsluggable_language(): + # 中文 and 日本語 would both slug to "" and collide on "talk.dub..mp4", + # silently overwriting each other via ffmpeg's -y. + with pytest.raises(UsageError) as exc: + dub_exec.default_out_path(Path("/x/talk.mp4"), "中文") + assert "default output name" in exc.value.message + assert "--out" in (exc.value.suggestion or "") + + def test_assemble_timeline_fills_gaps_and_pads_tail(): # rate 1000: one second of 16-bit mono PCM is 2000 bytes. track = dub_exec.assemble_timeline([(500, b"\x01\x02")], 1000, total_seconds=1.0) @@ -140,7 +153,7 @@ def test_utterances_of_defaults_and_filtering(): utterance(4000, "C", " Bye "), ] ) - assert dub_exec._utterances_of(transcript) == [ + assert dub_exec._utterances_of(transcript, "tr_dub") == [ dub_exec._Utterance(start_ms=0, speaker="A", text="Hi"), dub_exec._Utterance(start_ms=4000, speaker="C", text="Bye"), ] @@ -153,7 +166,7 @@ def test_utterances_of_defaults_and_filtering(): ) def test_utterances_of_requires_spoken_utterances(utterances): with pytest.raises(CLIError) as exc: - dub_exec._utterances_of(SimpleNamespace(id="tr_x", utterances=utterances)) + dub_exec._utterances_of(SimpleNamespace(utterances=utterances), "tr_x") assert exc.value.error_type == "no_utterances" assert exc.value.exit_code == 2 assert "Transcript tr_x has no utterances to dub" in exc.value.message @@ -170,21 +183,6 @@ def test_total_seconds(duration, expected): assert dub_exec._total_seconds(transcript) == expected -def test_run_ffmpeg_captures_output_and_does_not_raise(): - # The real boundary (not the fake): output is captured as text and a non-zero - # exit must not raise — _mux turns the exit code into a CLIError itself. - result = dub_exec._run_ffmpeg( - [ - sys.executable, - "-c", - "import sys; print('out'); print('err', file=sys.stderr); sys.exit(3)", - ] - ) - assert result.returncode == 3 - assert result.stdout == "out\n" - assert result.stderr == "err\n" - - # --- validation order (cheap local checks before any credential or network) ---- @@ -210,7 +208,8 @@ def test_run_dub_rejects_missing_file(sandbox, tmp_path): dub_exec.run_dub(opts, AppState(), json_mode=False) assert exc.value.error_type == "file_not_found" assert exc.value.exit_code == 2 - assert "local audio/video file" in (exc.value.suggestion or "") + # The command name pins the shared helper's parameterization. + assert "assembly dub needs a local audio/video file" in (exc.value.suggestion or "") def test_run_dub_rejects_directory(sandbox, tmp_path): @@ -229,10 +228,65 @@ def test_run_dub_refuses_to_overwrite_the_input(sandbox, media): assert "overwrite the input file" in exc.value.message +def test_run_dub_rejects_remote_urls_with_the_url_intact(sandbox): + # http(s) URLs are downloaded (or rejected by the yt-dlp branch); a bucket + # URL would otherwise reach Path(), which collapses "//" and echoes a + # corrupted "s3:/bucket/…" back. + url = "s3://bucket/talk.mp4" + opts = dataclasses.replace(DEFAULTS, media=url) + with pytest.raises(UsageError) as exc: + dub_exec.run_dub(opts, AppState(), json_mode=False) + assert url in exc.value.message + assert "Download the media first" in (exc.value.suggestion or "") + + +def test_run_dub_rejects_malformed_voice_before_any_network(sandbox, media): + # No transcription/ffmpeg fakes installed: pytest-socket would fail loudly + # if the malformed mapping survived to the billed pipeline. + opts = dataclasses.replace(DEFAULTS, media=str(media), voice=["A="]) + with pytest.raises(UsageError) as exc: + dub_exec.run_dub(opts, AppState(), json_mode=False) + assert "Invalid --voice mapping" in exc.value.message + + +def test_validate_out_rejects_the_input_via_hard_link(media): + # Two spellings of one file (mimics --out TALK.MP4 on a case-insensitive + # filesystem): path comparison passes, samefile must still catch it. + clone = media.parent / "TALK.MP4" + os.link(media, clone) + with pytest.raises(UsageError) as exc: + mediafile.validate_out(clone, media) + assert "overwrite the input file" in exc.value.message + + +def test_validate_out_rejects_a_directory(media, tmp_path): + with pytest.raises(UsageError) as exc: + mediafile.validate_out(tmp_path, media) + assert "--out is a directory" in exc.value.message + assert "file path" in (exc.value.suggestion or "") + + +def test_validate_out_rejects_a_missing_parent_directory(media, tmp_path): + with pytest.raises(UsageError) as exc: + mediafile.validate_out(tmp_path / "missing" / "dub.mp4", media) + assert "output directory doesn't exist" in exc.value.message + assert "missing" in exc.value.message + + +def test_validate_out_rejects_an_extensionless_output(media, tmp_path): + # ffmpeg picks the container from the extension, so this would fail only + # after the whole billed pipeline (e.g. an extension-less input's default out). + with pytest.raises(UsageError) as exc: + mediafile.validate_out(tmp_path / "noext", media) + assert "has no extension" in exc.value.message + assert ".mp4" in (exc.value.suggestion or "") + + def test_run_dub_requires_ffmpeg(sandbox, media, monkeypatch): monkeypatch.setattr("shutil.which", lambda name: None) opts = dataclasses.replace(DEFAULTS, media=str(media)) with pytest.raises(CLIError) as exc: dub_exec.run_dub(opts, AppState(), json_mode=False) assert exc.value.error_type == "missing_dependency" - assert "ffmpeg" in exc.value.message + # The purpose string pins the shared helper's parameterization. + assert "ffmpeg is required to write the dubbed file" in exc.value.message diff --git a/tests/test_dub_pipeline.py b/tests/test_dub_pipeline.py index a7216328..f4592100 100644 --- a/tests/test_dub_pipeline.py +++ b/tests/test_dub_pipeline.py @@ -2,7 +2,7 @@ the transcribe → translate → synthesize → ffmpeg mux orchestration, voice assignment, and the failure modes of each boundary. The LLM Gateway, streaming TTS, and ffmpeg are faked at the modules dub_exec calls into (`llm.complete`, -`session.synthesize`, `client.transcribe`) and at `dub_exec._run_ffmpeg`; the +`session.synthesize`, `client.transcribe`) and at `mediafile.run_ffmpeg`; the pure helpers and validation order live in test_dub_exec.py.""" from __future__ import annotations @@ -16,11 +16,12 @@ import pytest -from aai_cli import client, dub_exec, llm, youtube +from aai_cli import client, dub_exec, llm, mediafile from aai_cli.context import AppState -from aai_cli.errors import APIError, CLIError, UsageError +from aai_cli.errors import APIError, CLIError from aai_cli.tts import session from aai_cli.tts.session import SpeakResult +from tests._clip_helpers import plain from tests._dub_helpers import ( DEFAULTS, SAMPLE_RATE, @@ -28,7 +29,6 @@ enable_sandbox, fake_transcript, patch_api_key, - plain, record_ffmpeg, record_synthesize, record_transcribe, @@ -79,9 +79,12 @@ def test_run_dub_pipeline_end_to_end( opts = dataclasses.replace(DEFAULTS, media=str(media)) _run(opts, json_mode=True) - # Transcription: the local file, diarized so speakers keep distinct voices. + # Transcription: the local file, diarized so speakers keep distinct voices, + # source language auto-detected (dub input is typically not English). assert fake_transcribe["audio"] == str(media) assert fake_transcribe["config"].speaker_labels is True + assert fake_transcribe["config"].language_detection is True + assert fake_transcribe["config"].language_code is None # Translation: one gateway call per utterance, in order, with the dubbing # system prompt naming the resolved language ("de" -> "German"). @@ -162,6 +165,50 @@ def test_run_dub_human_summary( assert "A=juergen, B=juergen" in out +def test_human_summary_escapes_user_controlled_markup( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, capsys +): + # An unescaped "[/]" in --lang/--voice would raise rich.errors.MarkupError — + # after the whole billed pipeline succeeded and the file was written. + opts = dataclasses.replace( + DEFAULTS, media=str(media), language="Ger[/]man", voice=["[/]bad"], out=Path("dub.x.mp4") + ) + _run(opts, json_mode=False) + out = plain(capsys.readouterr().out) + assert "dubbed to Ger[/]man" in out + assert "A=[/]bad" in out + + +def test_dash_prefixed_out_is_disambiguated_for_ffmpeg( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, monkeypatch, tmp_path +): + # A bare "-dub.de.mp4" argv token would be parsed by ffmpeg as an option. + monkeypatch.chdir(tmp_path) + opts = dataclasses.replace(DEFAULTS, media=str(media), out=Path("-dub.de.mp4")) + _run(opts, json_mode=True) + assert fake_ffmpeg["args"][-1] == "./-dub.de.mp4" + + +def test_run_dub_status_messages( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, monkeypatch +): + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(dub_exec.output, "status", fake_status) + _run(dataclasses.replace(DEFAULTS, media=str(media)), json_mode=False) + assert messages == [ + "Transcribing for dubbing…", + f"Translating 2 utterance(s) to German with {llm.DEFAULT_MODEL}…", + "Synthesizing 2 segment(s)…", + "Writing the dubbed file…", + ] + + def test_bare_voice_dubs_every_speaker( media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg ): @@ -171,12 +218,35 @@ def test_bare_voice_dubs_every_speaker( def test_voice_overrides_pin_speakers_without_consuming_rotation( - media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, capsys ): opts = dataclasses.replace(DEFAULTS, media=str(media), voice=["A=mary"]) _run(opts, json_mode=True) # A is pinned; B still takes German's native voice from the rotation. assert [cfg.voice for cfg in fake_synthesize] == ["mary", "juergen"] + # Every mapping applied -> no "Ignoring" warning fires. + assert "Ignoring" not in capsys.readouterr().err + + +def test_voice_pin_for_absent_speaker_warns_instead_of_silently_dropping( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg, capsys +): + # Mirrors `assembly speak`: a requested --voice mapping is never dropped silently. + opts = dataclasses.replace(DEFAULTS, media=str(media), voice=["Z=paul"]) + _run(opts, json_mode=False) + err = plain(capsys.readouterr().err) + assert "Ignoring --voice mapping(s) for speaker(s) not in the transcript: z." in err + # Human mode warns as prose, not as a {"warning": …} JSON object. + assert not err.lstrip().startswith("{") + + +def test_source_lang_pins_the_transcription_language( + media, fake_transcribe, fake_translate, fake_synthesize, fake_ffmpeg +): + opts = dataclasses.replace(DEFAULTS, media=str(media), source_language="fr") + _run(opts, json_mode=True) + assert fake_transcribe["config"].language_code == "fr" + assert fake_transcribe["config"].language_detection is None def test_english_dub_keeps_the_multi_voice_rotation( @@ -236,6 +306,60 @@ def get_transcript(api_key, transcript_id): assert payload["audio_duration_seconds"] == 3.333 +@pytest.mark.parametrize("status", ["queued", "processing"]) +def test_transcript_id_still_in_flight_is_a_clear_error(media, fake_ffmpeg, monkeypatch, status): + # Without the status check this would surface as a misleading "no utterances + # to dub … pass one created with --speaker-labels". + monkeypatch.setattr( + client, + "get_transcript", + lambda *a: SimpleNamespace(id="tr_q", status=status, utterances=None), + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="tr_q") + with pytest.raises(CLIError) as exc: + _run(opts, json_mode=False) + assert exc.value.error_type == "transcript_not_ready" + assert exc.value.exit_code == 2 + assert f"Transcript tr_q is still {status}" in exc.value.message + assert "assembly transcripts get tr_q" in (exc.value.suggestion or "") + + +@pytest.mark.parametrize( + ("stored_error", "expected"), + [("Audio file unreadable", "Audio file unreadable"), (None, "Transcript failed.")], + ids=["with-reason", "without-reason"], +) +def test_transcript_id_with_error_status_surfaces_the_real_error( + media, fake_ffmpeg, monkeypatch, stored_error, expected +): + monkeypatch.setattr( + client, + "get_transcript", + lambda *a: SimpleNamespace(id="tr_e", status="error", error=stored_error, utterances=None), + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="tr_e") + with pytest.raises(APIError) as exc: + _run(opts, json_mode=False) + assert expected in exc.value.message + + +@pytest.mark.parametrize("finish", ["length", "max_tokens"], ids=["openai", "anthropic"]) +def test_truncated_translation_is_an_api_error( + media, fake_transcribe, fake_synthesize, fake_ffmpeg, monkeypatch, finish +): + # A reply clipped by max_tokens is non-empty but incomplete; dubbing it would + # produce speech that stops mid-sentence with exit 0. + monkeypatch.setattr( + llm, "complete", lambda *a, **k: completion("Hallo, aber abgeschn", finish_reason=finish) + ) + opts = dataclasses.replace(DEFAULTS, media=str(media)) + with pytest.raises(APIError) as exc: + _run(opts, json_mode=False) + assert "utterance 1" in exc.value.message + assert f"cut off at --max-tokens ({llm.DEFAULT_MAX_TOKENS})" in exc.value.message + assert "higher --max-tokens" in (exc.value.suggestion or "") + + def test_empty_translation_is_an_api_error(media, fake_synthesize, fake_ffmpeg, monkeypatch): long_text = "a" * 50 + "TAIL!" transcript = fake_transcript([utterance(0, "A", "Hello."), utterance(1000, "B", long_text)]) @@ -272,8 +396,8 @@ def test_ffmpeg_failure_reports_last_stderr_line( ): monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") monkeypatch.setattr( - dub_exec, - "_run_ffmpeg", + mediafile, + "run_ffmpeg", lambda args: subprocess.CompletedProcess( args=args, returncode=1, stdout="", stderr="noise\nInvalid data found\n" ), @@ -294,177 +418,11 @@ def test_ffmpeg_silent_failure_reports_exit_code( ): monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") monkeypatch.setattr( - dub_exec, - "_run_ffmpeg", + mediafile, + "run_ffmpeg", lambda args: subprocess.CompletedProcess(args=args, returncode=3, stdout="", stderr=""), ) opts = dataclasses.replace(DEFAULTS, media=str(media)) with pytest.raises(CLIError) as exc: _run(opts, json_mode=False) assert "ffmpeg exited with code 3" in exc.value.message - - -# --- YouTube / media-page sources ---------------------------------------------- - -YT_URL = "https://www.youtube.com/watch?v=abc123" - - -@pytest.fixture -def fake_download(monkeypatch: pytest.MonkeyPatch): - """Stand in for yt-dlp: 'download' a fixed media file into the temp dir.""" - seen: dict[str, object] = {} - - def download(url, dest_dir, *, video=False, download_sections=None): - seen["url"] = url - seen["video"] = video - seen["download_sections"] = download_sections - path = dest_dir / ("vid123.mp4" if video else "vid123.m4a") - path.write_bytes(b"\x00media") - seen["path"] = path - return path - - monkeypatch.setattr(youtube, "download_media", download) - return seen - - -def test_run_dub_youtube_downloads_and_dubs_into_cwd( - tmp_path, - fake_download, - fake_transcribe, - fake_translate, - fake_synthesize, - fake_ffmpeg, - capsys, - monkeypatch, -): - monkeypatch.chdir(tmp_path) - opts = dataclasses.replace(DEFAULTS, media=YT_URL) - _run(opts, json_mode=True) - # Audio-only download by default — the whole source, no section slicing — - # and the downloaded temp file feeds the pipeline. - assert fake_download["url"] == YT_URL - assert fake_download["video"] is False - assert fake_download["download_sections"] == [] - assert fake_transcribe["audio"] == str(fake_download["path"]) - # ffmpeg muxes over the downloaded file; the default output lands in the cwd, - # named after the download (the temp dir is gone after the run). - args = fake_ffmpeg["args"] - assert args[6] == str(fake_download["path"]) - out = tmp_path / "vid123.dub.german.m4a" - assert args[-1] == str(out) - payload = json.loads(capsys.readouterr().out) - assert payload["source"] == YT_URL - assert payload["out"] == str(out) - - -def test_run_dub_youtube_video_keeps_the_picture( - tmp_path, - fake_download, - fake_transcribe, - fake_translate, - fake_synthesize, - fake_ffmpeg, - capsys, - monkeypatch, -): - monkeypatch.chdir(tmp_path) - messages: list[str] = [] - - @contextlib.contextmanager - def fake_status(message, *, json_mode, quiet): - messages.append(message) - yield - - monkeypatch.setattr(dub_exec.output, "status", fake_status) - opts = dataclasses.replace(DEFAULTS, media=YT_URL, video=True) - _run(opts, json_mode=True) - # --video fetches the full video; the dubbed default output keeps its extension. - assert fake_download["video"] is True - assert messages[0] == "Downloading video…" - payload = json.loads(capsys.readouterr().out) - assert payload["out"] == str(tmp_path / "vid123.dub.german.mp4") - - -def test_run_dub_youtube_audio_download_status_message( - tmp_path, - fake_download, - fake_transcribe, - fake_translate, - fake_synthesize, - fake_ffmpeg, - capsys, - monkeypatch, -): - monkeypatch.chdir(tmp_path) - messages: list[str] = [] - - @contextlib.contextmanager - def fake_status(message, *, json_mode, quiet): - messages.append(message) - yield - - monkeypatch.setattr(dub_exec.output, "status", fake_status) - _run(dataclasses.replace(DEFAULTS, media=YT_URL), json_mode=True) - assert messages[0] == "Downloading audio…" - - -def test_run_dub_youtube_honors_explicit_out( - tmp_path, - fake_download, - fake_transcribe, - fake_translate, - fake_synthesize, - fake_ffmpeg, - capsys, -): - out = tmp_path / "dubbed.mp4" - opts = dataclasses.replace(DEFAULTS, media=YT_URL, out=out) - _run(opts, json_mode=True) - assert fake_ffmpeg["args"][-1] == str(out) - - -def test_run_dub_youtube_download_sections_slice_the_download( - tmp_path, - fake_download, - fake_transcribe, - fake_translate, - fake_synthesize, - fake_ffmpeg, - capsys, - monkeypatch, -): - monkeypatch.chdir(tmp_path) - opts = dataclasses.replace(DEFAULTS, media=YT_URL, download_sections=["*0:00-15:00"]) - _run(opts, json_mode=True) - # The specs reach yt-dlp verbatim, so only that slice is fetched (and dubbed). - assert fake_download["download_sections"] == ["*0:00-15:00"] - - -def test_run_dub_download_sections_require_a_url_source(media, monkeypatch): - # A local file is never downloaded, so the slice specs would be a silent - # no-op — they are rejected instead, with the local-file alternative named. - monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") - opts = dataclasses.replace(DEFAULTS, media=str(media), download_sections=["*0:00-15:00"]) - with pytest.raises(UsageError) as exc: - _run(opts, json_mode=False) - assert "--download-sections only applies to a downloadable URL source" in exc.value.message - assert "assembly clip" in (exc.value.suggestion or "") - - -def test_run_dub_video_requires_a_url_source(media, monkeypatch): - # A local file's video stream is already copied into the dub, so --video - # would be a silent no-op — it is rejected instead. - monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") - opts = dataclasses.replace(DEFAULTS, media=str(media), video=True) - with pytest.raises(UsageError) as exc: - _run(opts, json_mode=False) - assert "--video only applies to a downloadable URL source" in exc.value.message - - -def test_run_dub_rejects_non_downloadable_url(monkeypatch): - monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") - opts = dataclasses.replace(DEFAULTS, media="https://example.com/episode.mp3") - with pytest.raises(UsageError) as exc: - _run(opts, json_mode=False) - assert "assembly dub can't fetch this URL" in exc.value.message - assert "Download the media first" in (exc.value.suggestion or "") diff --git a/tests/test_dub_sources.py b/tests/test_dub_sources.py new file mode 100644 index 00000000..1dac61f0 --- /dev/null +++ b/tests/test_dub_sources.py @@ -0,0 +1,225 @@ +"""Tests for `assembly dub`'s YouTube/media-page URL sources: the audio/video +download (`youtube.download_media`, faked), the --video flag rules, and the +cwd-relative default output naming. The local-file pipeline runs live in +test_dub_pipeline.py.""" + +from __future__ import annotations + +import contextlib +import dataclasses +import json +from pathlib import Path + +import pytest + +from aai_cli import dub_exec, youtube +from aai_cli.context import AppState +from aai_cli.errors import UsageError +from tests._dub_helpers import ( + DEFAULTS, + enable_sandbox, + patch_api_key, + record_ffmpeg, + record_synthesize, + record_transcribe, + record_translate, + write_media, +) + +YT_URL = "https://www.youtube.com/watch?v=abc123" + + +@pytest.fixture +def media(tmp_path: Path) -> Path: + return write_media(tmp_path) + + +@pytest.fixture(autouse=True) +def _sandbox_and_key(monkeypatch: pytest.MonkeyPatch): + enable_sandbox(monkeypatch) + patch_api_key(monkeypatch) + + +@pytest.fixture +def fake_transcribe(monkeypatch: pytest.MonkeyPatch): + return record_transcribe(monkeypatch) + + +@pytest.fixture +def fake_translate(monkeypatch: pytest.MonkeyPatch): + return record_translate(monkeypatch) + + +@pytest.fixture +def fake_synthesize(monkeypatch: pytest.MonkeyPatch): + return record_synthesize(monkeypatch) + + +@pytest.fixture +def fake_ffmpeg(monkeypatch: pytest.MonkeyPatch): + return record_ffmpeg(monkeypatch) + + +@pytest.fixture +def fake_download(monkeypatch: pytest.MonkeyPatch): + """Stand in for yt-dlp: 'download' a fixed media file into the temp dir.""" + seen: dict[str, object] = {} + + def download(url, dest_dir, *, video=False, download_sections=None): + seen["url"] = url + seen["video"] = video + seen["download_sections"] = download_sections + path = dest_dir / ("vid123.mp4" if video else "vid123.m4a") + path.write_bytes(b"\x00media") + seen["path"] = path + return path + + monkeypatch.setattr(youtube, "download_media", download) + return seen + + +def _run(opts, *, json_mode): + dub_exec.run_dub(opts, AppState(), json_mode=json_mode) + + +def test_run_dub_youtube_downloads_and_dubs_into_cwd( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, + monkeypatch, +): + monkeypatch.chdir(tmp_path) + opts = dataclasses.replace(DEFAULTS, media=YT_URL) + _run(opts, json_mode=True) + # Audio-only download by default — the whole source, no section slicing — + # and the downloaded temp file feeds the pipeline. + assert fake_download["url"] == YT_URL + assert fake_download["video"] is False + assert fake_download["download_sections"] == [] + assert fake_transcribe["audio"] == str(fake_download["path"]) + # ffmpeg muxes over the downloaded file; the default output lands in the cwd, + # named after the download (the temp dir is gone after the run). + args = fake_ffmpeg["args"] + assert args[6] == str(fake_download["path"]) + out = tmp_path / "vid123.dub.german.m4a" + assert args[-1] == str(out) + payload = json.loads(capsys.readouterr().out) + assert payload["source"] == YT_URL + assert payload["out"] == str(out) + + +def test_run_dub_youtube_video_keeps_the_picture( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, + monkeypatch, +): + monkeypatch.chdir(tmp_path) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(dub_exec.output, "status", fake_status) + opts = dataclasses.replace(DEFAULTS, media=YT_URL, video=True) + _run(opts, json_mode=True) + # --video fetches the full video; the dubbed default output keeps its extension. + assert fake_download["video"] is True + assert messages[0] == "Downloading video…" + payload = json.loads(capsys.readouterr().out) + assert payload["out"] == str(tmp_path / "vid123.dub.german.mp4") + + +def test_run_dub_youtube_audio_download_status_message( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, + monkeypatch, +): + monkeypatch.chdir(tmp_path) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(dub_exec.output, "status", fake_status) + _run(dataclasses.replace(DEFAULTS, media=YT_URL), json_mode=True) + assert messages[0] == "Downloading audio…" + + +def test_run_dub_youtube_honors_explicit_out( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, +): + out = tmp_path / "dubbed.mp4" + opts = dataclasses.replace(DEFAULTS, media=YT_URL, out=out) + _run(opts, json_mode=True) + assert fake_ffmpeg["args"][-1] == str(out) + + +def test_run_dub_youtube_download_sections_slice_the_download( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, + monkeypatch, +): + monkeypatch.chdir(tmp_path) + opts = dataclasses.replace(DEFAULTS, media=YT_URL, download_sections=["*0:00-15:00"]) + _run(opts, json_mode=True) + # The specs reach yt-dlp verbatim, so only that slice is fetched (and dubbed). + assert fake_download["download_sections"] == ["*0:00-15:00"] + + +def test_run_dub_download_sections_require_a_url_source(media, monkeypatch): + # A local file is never downloaded, so the slice specs would be a silent + # no-op — they are rejected instead, with the local-file alternative named. + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + opts = dataclasses.replace(DEFAULTS, media=str(media), download_sections=["*0:00-15:00"]) + with pytest.raises(UsageError) as exc: + _run(opts, json_mode=False) + assert "--download-sections only applies to a downloadable URL source" in exc.value.message + assert "assembly clip" in (exc.value.suggestion or "") + + +def test_run_dub_video_requires_a_url_source(media, monkeypatch): + # A local file's video stream is already copied into the dub, so --video + # would be a silent no-op — it is rejected instead. + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + opts = dataclasses.replace(DEFAULTS, media=str(media), video=True) + with pytest.raises(UsageError) as exc: + _run(opts, json_mode=False) + assert "--video only applies to a downloadable URL source" in exc.value.message + + +def test_run_dub_rejects_non_downloadable_url(monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + opts = dataclasses.replace(DEFAULTS, media="https://example.com/episode.mp3") + with pytest.raises(UsageError) as exc: + _run(opts, json_mode=False) + assert "assembly dub can't fetch this URL" in exc.value.message + assert "Download the media first" in (exc.value.suggestion or "")