diff --git a/.importlinter b/.importlinter index 9a666fdc..a87b4470 100644 --- a/.importlinter +++ b/.importlinter @@ -10,6 +10,7 @@ source_modules = aai_cli.agent_exec aai_cli.argscan aai_cli.auth + aai_cli.caption_exec aai_cli.client aai_cli.clip_exec aai_cli.clip_select @@ -61,6 +62,7 @@ modules = aai_cli.commands.account aai_cli.commands.agent aai_cli.commands.audit + aai_cli.commands.caption aai_cli.commands.clip aai_cli.commands.deploy aai_cli.commands.dev diff --git a/aai_cli/caption_exec.py b/aai_cli/caption_exec.py new file mode 100644 index 00000000..e5986650 --- /dev/null +++ b/aai_cli/caption_exec.py @@ -0,0 +1,238 @@ +"""Run logic for `assembly caption`: transcribe → SRT export → ffmpeg burn-in. + +The command module (aai_cli/commands/caption.py) only parses argv — it builds a +``CaptionOptions`` and hands it to ``run_caption`` via ``context.run_command`` +(the options/run split, see AGENTS.md), so tests drive the whole pipeline by +constructing options directly. + +The pipeline: the video is transcribed (or an existing transcript is reused via +``--transcript-id``), the transcript's SRT captions are fetched from the export +endpoint, and ffmpeg's ``subtitles`` filter burns them into the picture (open +captions, always visible) while the audio stream is copied untouched. A +YouTube/media-page URL is downloaded first — always the full video, since the +captions are burned into it. +""" + +from __future__ import annotations + +import shutil +import subprocess +import tempfile +from dataclasses import dataclass +from pathlib import Path + +import assemblyai as aai +from rich.markup import escape + +from aai_cli import client, output, youtube +from aai_cli.context import AppState +from aai_cli.errors import CLIError, UsageError + + +@dataclass(frozen=True) +class CaptionOptions: + """Every `assembly caption` flag as plain data (``--json`` excluded: + run_command resolves it into the ``json_mode`` argument).""" + + # The raw source as typed: a local path, or a downloadable media-page URL + # (a pathlib.Path would collapse the "//" in "https://"). + media: str + transcript_id: str | None + chars_per_caption: int | None + font_size: int | None + out: Path | None + + +def default_out_path(media: Path) -> Path: + """The default output file: ``.captioned`` next to the input.""" + return media.parent / f"{media.stem}.captioned{media.suffix}" + + +# ffmpeg's filtergraph syntax gives these characters meaning (option/filter/chain +# separators, stream labels, quoting), so a path embedded in `-vf subtitles=…` +# must escape them or a TMPDIR containing one would corrupt the filter spec. +_FILTER_ESCAPES = str.maketrans({ch: f"\\{ch}" for ch in "\\':,;[]"}) + + +def subtitles_filter(srt: Path, font_size: int | None) -> str: + """The ``-vf`` filtergraph burning ``srt`` into the video.""" + spec = f"subtitles={str(srt).translate(_FILTER_ESCAPES)}" + if font_size is not None: + spec += f":force_style=FontSize={font_size}" + return spec + + +def _validate_media(media: Path) -> None: + """Reject a missing local source before credential resolution, so a typo'd + path reads as "file not found", never as a login prompt or an ffmpeg error.""" + if not media.exists(): + raise CLIError( + f"File not found: {media}", + error_type="file_not_found", + exit_code=2, + suggestion="Check the path. assembly caption needs a local video file.", + ) + if not media.is_file(): + raise CLIError( + f"Not a file: {media}", + error_type="not_a_file", + exit_code=2, + suggestion="Pass a video file, not a directory.", + ) + + +def _validate_out(out: Path, media: Path) -> None: + """The captioned file must never overwrite its own input: ffmpeg would read + and write the same file concurrently, corrupting it.""" + if out.resolve() == media.resolve(): + raise UsageError( + "--out would overwrite the input file.", + suggestion="Pick a different output path.", + ) + + +def _require_ffmpeg() -> str: + """The ffmpeg executable; checked before any (billed) transcription work.""" + path = shutil.which("ffmpeg") + if path is None: + raise CLIError( + "ffmpeg is required to burn captions into video, but it isn't on PATH.", + error_type="missing_dependency", + suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.", + ) + return path + + +def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]: + """Boundary seam for tests: one ffmpeg invocation, output captured.""" + return subprocess.run(args, capture_output=True, text=True, check=False) + + +def _burn(ffmpeg: str, media: Path, srt: Path, out: Path, font_size: int | None) -> None: + """Burn the ``srt`` captions into ``media``'s video stream, writing ``out``. + + The video is necessarily re-encoded (the captions become pixels); ``-c:a + copy`` carries the audio over untouched. The explicit ``-map 0:v`` makes + audio-only input an ffmpeg error ("matches no streams") instead of a silent + uncaptioned copy; ``-map 0:a?`` keeps a silent video legal. ``-y`` makes a + re-run overwrite its own earlier output instead of stalling on ffmpeg's + prompt. + """ + result = _run_ffmpeg( + [ + ffmpeg, + "-hide_banner", + "-loglevel", + "error", + "-y", + "-i", + str(media), + "-vf", + subtitles_filter(srt, font_size), + "-map", + "0:v", + "-map", + "0:a?", + "-c:a", + "copy", + str(out), + ] + ) + if result.returncode != 0: + detail = result.stderr.strip().splitlines() + reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}" + raise CLIError( + f"Could not write {out.name}: {reason}", + error_type="caption_failed", + suggestion="Check that the input is a readable video file — captions " + "can't be burned into audio-only media.", + ) + + +def _resolve_transcript( + opts: CaptionOptions, media: Path, state: AppState, *, json_mode: bool +) -> object: + """The transcript whose captions are burned in: fetched by id, or made fresh + from the (already local) media file.""" + if opts.transcript_id is not None: + return client.get_transcript(state.resolve_api_key(), opts.transcript_id) + api_key = state.resolve_api_key() + with output.status("Transcribing for captions…", json_mode=json_mode, quiet=state.quiet): + return client.transcribe(api_key, str(media), config=aai.TranscriptionConfig()) + + +def _fetch_srt(transcript: object, opts: CaptionOptions, *, json_mode: bool, quiet: bool) -> str: + """The transcript's SRT captions from the export endpoint; empty is an error.""" + with output.status("Fetching captions…", json_mode=json_mode, quiet=quiet): + srt = client.select_transcript_field( + transcript, "srt", chars_per_caption=opts.chars_per_caption + ) + if not srt.strip(): + transcript_id = str(getattr(transcript, "id", "")) + raise CLIError( + f"Transcript {transcript_id} has no captions to burn in.", + error_type="no_captions", + exit_code=2, + suggestion="The media may contain no speech; check it with " + "'assembly transcribe '.", + ) + return srt + + +def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> None: + """Execute one `assembly caption` invocation from already-parsed flags.""" + ffmpeg = _require_ffmpeg() + if youtube.is_downloadable_url(opts.media): + # A media-page URL (YouTube, …) is downloaded once — always the full + # video, since the captions are burned into it. The download dir is + # temporary, so the default output lands in the current directory. + with tempfile.TemporaryDirectory(prefix="aai-caption-src-") as td: + with output.status("Downloading video…", json_mode=json_mode, quiet=state.quiet): + local = youtube.download_media(opts.media, Path(td), video=True) + out = opts.out if opts.out is not None else Path.cwd() / default_out_path(local).name + _validate_out(out, local) + _caption_and_emit(opts, local, out, ffmpeg, state, json_mode=json_mode) + return + if opts.media.startswith(("http://", "https://")): + raise UsageError( + "assembly caption can't fetch this URL; it captions a local file or a " + "media-page URL yt-dlp can download (YouTube, …).", + suggestion="Download the video first, then caption the local copy.", + ) + media = Path(opts.media) + _validate_media(media) + out = opts.out if opts.out is not None else default_out_path(media) + _validate_out(out, media) + _caption_and_emit(opts, media, out, ffmpeg, state, json_mode=json_mode) + + +def _caption_and_emit( + opts: CaptionOptions, + media: Path, + out: Path, + ffmpeg: str, + state: AppState, + *, + json_mode: bool, +) -> None: + """Caption an already-local video file into ``out`` and report the result.""" + transcript = _resolve_transcript(opts, media, state, json_mode=json_mode) + transcript_id = str(getattr(transcript, "id", "")) + srt = _fetch_srt(transcript, opts, json_mode=json_mode, quiet=state.quiet) + captions = srt.count("-->") # one arrow per SRT cue timing line + with tempfile.TemporaryDirectory(prefix="aai-caption-") as tmp: + srt_path = Path(tmp) / "captions.srt" + srt_path.write_text(srt, encoding="utf-8") + with output.status("Burning captions…", json_mode=json_mode, quiet=state.quiet): + _burn(ffmpeg, media, srt_path, out, opts.font_size) + payload: dict[str, object] = { + "source": opts.media, + "out": str(out), + "transcript_id": transcript_id, + "captions": captions, + } + output.emit( + payload, + lambda _: output.success(f"{escape(str(out))} {captions} caption(s) burned in"), + json_mode=json_mode, + ) diff --git a/aai_cli/clip_exec.py b/aai_cli/clip_exec.py index aae96f48..0e46f954 100644 --- a/aai_cli/clip_exec.py +++ b/aai_cli/clip_exec.py @@ -54,6 +54,7 @@ class ClipOptions: padding: float snap: bool out_dir: Path | None + video: bool def _llm_segments( @@ -347,15 +348,19 @@ def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly clip` invocation from already-parsed flags.""" _validate_out_dir(opts.out_dir) _validate_selection(opts) + youtube.validate_video_flag(opts.media, video=opts.video) explicit = [clip_select.parse_range(value) for value in opts.ranges] ffmpeg = _require_ffmpeg() if youtube.is_downloadable_url(opts.media): - # A media-page URL (YouTube, podcast page, …) is downloaded once and - # clipped locally. The download dir is temporary, so the clips land in - # --out-dir or the current directory — never next to the temp file. + # A media-page URL (YouTube, podcast page, …) is downloaded once — the + # audio track by default, the full video with --video so the clips carry + # video too — and clipped locally. The download dir is temporary, so the + # clips land in --out-dir or the current directory — never next to the + # temp file. + downloading = "Downloading video…" if opts.video else "Downloading audio…" with tempfile.TemporaryDirectory(prefix="aai-clip-") as td: - with output.status("Downloading audio…", json_mode=json_mode, quiet=state.quiet): - local = youtube.download_audio(opts.media, Path(td)) + with output.status(downloading, json_mode=json_mode, quiet=state.quiet): + local = youtube.download_media(opts.media, Path(td), video=opts.video) out_dir = opts.out_dir if opts.out_dir is not None else Path.cwd() _cut_and_emit(opts, local, out_dir, explicit, ffmpeg, state, json_mode=json_mode) return diff --git a/aai_cli/commands/caption.py b/aai_cli/commands/caption.py new file mode 100644 index 00000000..1287e9a3 --- /dev/null +++ b/aai_cli/commands/caption.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from pathlib import Path + +import typer + +from aai_cli import caption_exec, help_panels, options +from aai_cli.context import run_command +from aai_cli.help_text import examples_epilog + +app = typer.Typer() + + +@app.command( + rich_help_panel=help_panels.TRANSCRIPTION, + epilog=examples_epilog( + [ + ("Burn captions into a video", "assembly caption talk.mp4"), + ( + "Caption a YouTube video (downloaded via yt-dlp)", + 'assembly caption "https://youtube.com/watch?v=ID"', + ), + ( + "Reuse a finished transcript instead of re-transcribing", + "assembly caption talk.mp4 -t TRANSCRIPT_ID", + ), + ( + "Shorter caption lines in a bigger font", + "assembly caption talk.mp4 --chars-per-caption 32 --font-size 28", + ), + ("Choose the output file", "assembly caption talk.mp4 --out talk-captioned.mp4"), + ] + ), +) +def caption( + ctx: typer.Context, + media: str = typer.Argument( + ..., + help="Video to caption: a local file, or a YouTube/media-page URL " + "(the full video is downloaded via yt-dlp).", + ), + transcript_id: str | None = typer.Option( + None, + "--transcript-id", + "-t", + help="Reuse an existing transcript of this media instead of transcribing it again.", + ), + chars_per_caption: int | None = typer.Option( + None, + "--chars-per-caption", + min=1, + help="Max characters per caption line.", + ), + font_size: int | None = typer.Option( + None, + "--font-size", + min=1, + help="Font size of the burned-in captions (ffmpeg's default styling when omitted).", + ), + out: Path | None = typer.Option( + None, "--out", help="Output file (default: .captioned next to the input)." + ), + json_out: bool = options.json_option("Emit JSON describing the captioned file."), +) -> None: + """Burn always-visible captions into a video. + + The video is transcribed (or an existing transcript is reused with + --transcript-id), the transcript's SRT captions are fetched, and ffmpeg + (which must be installed) burns them into the picture as open captions — + the audio stream is copied untouched. A YouTube/media-page URL is + downloaded first (always the full video); its output lands in --out or + the current directory. + """ + opts = caption_exec.CaptionOptions( + media=media, + transcript_id=transcript_id, + chars_per_caption=chars_per_caption, + font_size=font_size, + out=out, + ) + run_command( + ctx, + lambda state, json_mode: caption_exec.run_caption(opts, state, json_mode=json_mode), + json=json_out, + ) diff --git a/aai_cli/commands/clip.py b/aai_cli/commands/clip.py index 8f9ba866..6c6aa487 100644 --- a/aai_cli/commands/clip.py +++ b/aai_cli/commands/clip.py @@ -29,6 +29,10 @@ "Clip a YouTube video's audio with an LLM", 'assembly clip "https://youtube.com/watch?v=ID" --llm "the best quote"', ), + ( + "Cut video clips from the full YouTube video", + 'assembly clip "https://youtube.com/watch?v=ID" --video --llm "the best quote"', + ), ( "Reuse a finished transcript instead of re-transcribing", "assembly clip meeting.mp4 -t TRANSCRIPT_ID --speaker B", @@ -104,6 +108,12 @@ def clip( out_dir: Path | None = typer.Option( None, "--out-dir", help="Directory for the clip files (default: next to the input)." ), + video: bool = typer.Option( + False, + "--video", + help="Download the full video (not just the audio track) for a URL source, " + "so the clips are cut from the video. Local files keep their video already.", + ), json_out: bool = options.json_option("Emit JSON describing the clips written."), ) -> None: """Cut clips out of a media file by speaker, text match, LLM pick, or time range. @@ -114,7 +124,8 @@ def clip( boundaries snap into nearby silence so cuts don't land mid-word (--no-snap disables), and each surviving segment is written as .clipNN using ffmpeg (which must be installed). A YouTube/media-page source is - downloaded first; its clips land in --out-dir or the current directory. + downloaded first (audio only, or the full video with --video); its clips + land in --out-dir or the current directory. """ opts = clip_exec.ClipOptions( media=media, @@ -128,6 +139,7 @@ def clip( padding=padding, snap=snap, out_dir=out_dir, + video=video, ) run_command( ctx, diff --git a/aai_cli/commands/dub.py b/aai_cli/commands/dub.py index a1f72125..d7ba6418 100644 --- a/aai_cli/commands/dub.py +++ b/aai_cli/commands/dub.py @@ -18,6 +18,10 @@ [ ("Dub a talk into German (sandbox only)", "assembly --sandbox dub talk.mp4 --lang de"), ("Use a language name instead of a code", "assembly --sandbox dub talk.mp4 -l Spanish"), + ( + "Dub the full video from YouTube", + 'assembly --sandbox dub "https://youtube.com/watch?v=ID" -l de --video', + ), ( "Dub every speaker with one voice", "assembly --sandbox dub talk.mp4 -l fr --voice paul", @@ -41,7 +45,8 @@ def dub( ctx: typer.Context, media: str = typer.Argument( ..., - help="Local audio/video file to dub (the video stream is copied untouched).", + help="Audio/video to dub: a local file (the video stream is copied untouched), " + "or a YouTube/media-page URL (downloaded via yt-dlp).", ), lang: str = typer.Option( ..., @@ -79,6 +84,12 @@ def dub( out: Path | None = typer.Option( None, "--out", help="Output file (default: .dub. next to the input)." ), + video: bool = typer.Option( + False, + "--video", + help="Download the full video (not just the audio track) for a URL source, " + "so the dub keeps the picture. Local files keep their video already.", + ), json_out: bool = options.json_option("Emit JSON describing the dubbed file."), ) -> None: """Dub a video or audio file into another language (sandbox only). @@ -87,9 +98,10 @@ def dub( utterance timestamps, each utterance is translated by an LLM Gateway model, the translations are synthesized with streaming TTS (one voice per speaker), and ffmpeg lays the new audio over the original — video copied - untouched. Streaming TTS only exists in the sandbox today — run it as - 'assembly --sandbox dub' (--sandbox goes before the subcommand). Requires - ffmpeg. + untouched. A YouTube/media-page URL is downloaded first (audio only, or + the full video with --video). Streaming TTS only exists in the sandbox + today — run it as 'assembly --sandbox dub' (--sandbox goes before the + subcommand). Requires ffmpeg. """ opts = dub_exec.DubOptions( media=media, @@ -99,6 +111,7 @@ def dub( model=model, max_tokens=max_tokens, out=out, + video=video, ) run_command( ctx, diff --git a/aai_cli/dub_exec.py b/aai_cli/dub_exec.py index af479640..d59349ee 100644 --- a/aai_cli/dub_exec.py +++ b/aai_cli/dub_exec.py @@ -10,9 +10,10 @@ the target language by an LLM Gateway model, each translation is synthesized with streaming TTS (one voice per speaker), the segments are laid out on a silence timeline at their original start times, and ffmpeg swaps the new track -over the original media (video stream copied untouched). Streaming TTS only -exists in the sandbox today, so — like `assembly speak` — the command is -sandbox-only. +over the original media (video stream copied untouched). A YouTube/media-page +URL is downloaded first — audio only, or the full video with ``--video`` so the +dub keeps the picture. Streaming TTS only exists in the sandbox today, so — +like `assembly speak` — the command is sandbox-only. """ from __future__ import annotations @@ -27,7 +28,7 @@ import assemblyai as aai from rich.markup import escape -from aai_cli import client, environments, jsonshape, output +from aai_cli import client, environments, jsonshape, output, youtube from aai_cli import llm as gateway from aai_cli.context import AppState from aai_cli.errors import APIError, CLIError, UsageError @@ -80,6 +81,7 @@ class DubOptions: model: str max_tokens: int out: Path | None + video: bool def resolve_language(value: str) -> str: @@ -374,12 +376,52 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly dub` invocation from already-parsed flags.""" language = resolve_language(opts.language) _require_sandbox() + youtube.validate_video_flag(opts.media, video=opts.video) + if youtube.is_downloadable_url(opts.media): + # A media-page URL (YouTube, podcast page, …) is downloaded once — the + # audio track by default, the full video with --video so the dub keeps + # the picture — and dubbed locally. ffmpeg is checked before the + # download so a missing dependency fails before any fetch. + ffmpeg = _require_ffmpeg() + downloading = "Downloading video…" if opts.video else "Downloading audio…" + with tempfile.TemporaryDirectory(prefix="aai-dub-src-") as td: + with output.status(downloading, json_mode=json_mode, quiet=state.quiet): + local = youtube.download_media(opts.media, Path(td), video=opts.video) + # The download dir is temporary, so the default output lands in the + # current directory — never next to the temp file. + out = ( + opts.out + if opts.out is not None + else Path.cwd() / default_out_path(local, language).name + ) + _validate_out(out, local) + _dub_and_emit(opts, local, out, language, ffmpeg, state, json_mode=json_mode) + return + if opts.media.startswith(("http://", "https://")): + raise UsageError( + "assembly dub can't fetch this URL; it dubs a local file or a " + "media-page URL yt-dlp can download (YouTube, podcasts, …).", + suggestion="Download the media first, then dub the local copy.", + ) media = Path(opts.media) _validate_media(media) out = opts.out if opts.out is not None else default_out_path(media, language) _validate_out(out, media) ffmpeg = _require_ffmpeg() + _dub_and_emit(opts, media, out, language, ffmpeg, state, json_mode=json_mode) + +def _dub_and_emit( + opts: DubOptions, + media: Path, + out: Path, + language: str, + ffmpeg: str, + state: AppState, + *, + json_mode: bool, +) -> None: + """Dub an already-local media file into ``out`` and report the result.""" transcript = _resolve_transcript(opts, media, state, json_mode=json_mode) transcript_id = str(getattr(transcript, "id", "")) utterances = _utterances_of(transcript) diff --git a/aai_cli/main.py b/aai_cli/main.py index 001c2434..b3a233c8 100644 --- a/aai_cli/main.py +++ b/aai_cli/main.py @@ -25,6 +25,7 @@ account, agent, audit, + caption, clip, deploy, dev, @@ -74,6 +75,7 @@ "llm", "clip", "dub", + "caption", "eval", "webhooks", # Setup & Tools — get set up & maintain @@ -415,6 +417,7 @@ def main( app.add_typer(llm.app) app.add_typer(clip.app) app.add_typer(dub.app) +app.add_typer(caption.app) app.add_typer(evaluate.app) # eval app.add_typer(account.app) # balance, usage, limits app.add_typer(login.app) # login, logout, whoami diff --git a/aai_cli/skills/aai-cli/references/transcription.md b/aai_cli/skills/aai-cli/references/transcription.md index 6575d6ef..f0c2ec56 100644 --- a/aai_cli/skills/aai-cli/references/transcription.md +++ b/aai_cli/skills/aai-cli/references/transcription.md @@ -118,8 +118,9 @@ assembly llm --list-models ## `assembly clip MEDIA` — cut a media file by transcript content Cuts clips out of an audio/video file with ffmpeg (must be installed). `MEDIA` -is a local file or a YouTube/media-page URL (audio downloaded via yt-dlp; the -clips then land in `--out-dir` or the current directory). `--speaker`/`--search` +is a local file or a YouTube/media-page URL (audio downloaded via yt-dlp — or +the full video with `--video`, so the clips carry video; the clips then land +in `--out-dir` or the current directory). `--speaker`/`--search` select diarized utterances — the file is transcribed with speaker labels on the fly, or pass `-t/--transcript-id` (an id, or `-` to read an id or `transcribe --json` output from stdin). `--llm "instruction"` sends the @@ -136,7 +137,8 @@ High-value flags: (repeatable). - LLM: `--model` (default `claude-haiku-4-5-20251001`), `--max-tokens N`. - Shaping: `--padding 0.5` (seconds around each clip), `--no-snap` (cut at the - exact selected times instead of snapping into silence), `--out-dir clips/`. + exact selected times instead of snapping into silence), `--out-dir clips/`, + `--video` (URL sources: download the full video, not just the audio track). - Output: `--json` (paths + start/end/duration of each clip written). Examples: @@ -145,6 +147,30 @@ Examples: assembly clip meeting.mp4 --speaker A assembly clip call.mp3 --search "pricing" --padding 0.5 assembly clip talk.mp4 --range 1:30-2:45 --range 10:00-10:30 -assembly clip "https://youtube.com/watch?v=ID" --llm "the strongest quote" +assembly clip "https://youtube.com/watch?v=ID" --video --llm "the strongest quote" assembly transcribe meeting.mp4 --speaker-labels --json | assembly clip meeting.mp4 -t - --llm "the funniest exchange" ``` + +## `assembly caption MEDIA` — burn always-visible captions into a video + +Transcribes a video (or reuses a transcript with `-t/--transcript-id`), fetches +the transcript's SRT captions, and burns them into the picture with ffmpeg +(must be installed) as open captions — the audio stream is copied untouched. +`MEDIA` is a local file or a YouTube/media-page URL (always downloaded as the +full video via yt-dlp; the output then lands in `--out` or the current +directory). The default output is `.captioned` next to the input. + +High-value flags: + +- Shaping: `--chars-per-caption 32` (max characters per caption line), + `--font-size 28` (ffmpeg's default styling when omitted). +- Output: `--out captioned.mp4`, `--json` (source, output path, transcript id, + caption count). + +Examples: + +```bash +assembly caption talk.mp4 +assembly caption "https://youtube.com/watch?v=ID" +assembly caption talk.mp4 -t TRANSCRIPT_ID --chars-per-caption 32 --font-size 28 +``` diff --git a/aai_cli/stream_exec.py b/aai_cli/stream_exec.py index f2bdf72e..aa3e03e2 100644 --- a/aai_cli/stream_exec.py +++ b/aai_cli/stream_exec.py @@ -186,7 +186,7 @@ def _dispatch(session: StreamSession, opts: SourceOptions) -> None: elif opts.source and youtube.is_downloadable_url(opts.source): # Fetch the audio first, then stream the local file in real time. with tempfile.TemporaryDirectory(prefix="aai-yt-") as td: - local = youtube.download_audio(opts.source, Path(td)) + local = youtube.download_media(opts.source, Path(td)) session.run(FileSource(str(local)), TARGET_RATE) elif opts.from_file: file_audio = FileSource(client.resolve_audio_source(opts.source, sample=opts.sample)) diff --git a/aai_cli/transcribe_exec.py b/aai_cli/transcribe_exec.py index f0467ff2..673ab1a7 100644 --- a/aai_cli/transcribe_exec.py +++ b/aai_cli/transcribe_exec.py @@ -187,7 +187,7 @@ def run_transcription( if youtube.is_downloadable_url(audio): # Fetch first; AssemblyAI can't read a YouTube/podcast page URL itself. with tempfile.TemporaryDirectory(prefix="aai-yt-") as td: - local = youtube.download_audio(audio, Path(td), download_sections=download_sections) + local = youtube.download_media(audio, Path(td), download_sections=download_sections) return client.transcribe(api_key, str(local), config=transcription_config) return client.transcribe(api_key, audio, config=transcription_config) diff --git a/aai_cli/youtube.py b/aai_cli/youtube.py index d91ecf89..bfc9afeb 100644 --- a/aai_cli/youtube.py +++ b/aai_cli/youtube.py @@ -1,7 +1,9 @@ -"""Downloading audio from media-page URLs (YouTube, podcast pages, …) via yt-dlp. +"""Downloading media from media-page URLs (YouTube, podcast pages, …) via yt-dlp. The AssemblyAI API fetches direct audio URLs itself; this module handles the URLs it -can't — HTML pages whose audio yt-dlp knows how to extract. +can't — HTML pages whose media yt-dlp knows how to extract. Downloads fetch the +audio track by default (all the API needs); commands whose output is the media +itself (clip/dub/caption) can fetch the full video instead. """ from __future__ import annotations @@ -59,10 +61,10 @@ def is_youtube_url(source: str | None) -> bool: def is_downloadable_url(source: str | None) -> bool: - """True if `source` is a media-page URL whose audio must be downloaded first. + """True if `source` is a media-page URL whose media must be downloaded first. YouTube is matched by shape alone — no yt-dlp import needed, so a missing yt-dlp - still routes to ``download_audio``'s install hint. Other http(s) URLs match when + still routes to ``download_media``'s install hint. Other http(s) URLs match when a dedicated yt-dlp extractor claims them (Apple Podcasts, Spreaker, SoundCloud, …). Direct audio URLs and unknown pages match only yt-dlp's catch-all ``Generic`` extractor, which is excluded: those pass through untouched for the API to fetch. @@ -142,32 +144,36 @@ def parse_download_sections( return chapters, ranges, from_url -def download_audio(url: str, dest_dir: Path, *, download_sections: list[str] | None = None) -> Path: - """Download the best audio track of `url` into `dest_dir` and return its path. +def validate_video_flag(source: str, *, video: bool) -> None: + """Reject ``--video`` for a source that isn't a downloadable URL. - Uses yt-dlp; the resulting container (m4a/webm/…) is decodable by ffmpeg - (streaming) and uploadable for transcription. `download_sections` accepts yt-dlp's - ``--download-sections`` specs (e.g. ``*0:00-5:00`` for the first five minutes), each - fetching only that part of the source. + The flag selects the full-video download for a media-page URL; a local file's + video stream is already operated on directly, so the flag would be a silent + no-op there — and a requested flag is never dropped silently. """ - try: - import yt_dlp - except ImportError as exc: - raise CLIError( - "YouTube support needs yt-dlp.", - error_type="ytdlp_missing", - exit_code=2, - suggestion="Install it: pip install yt-dlp", - ) from exc + if video and not is_downloadable_url(source): + raise UsageError( + "--video only applies to a downloadable URL source (YouTube, media pages, …).", + suggestion="A local file's video is used directly already; drop --video.", + ) + +def _ytdlp_options( + dest_dir: Path, *, video: bool, download_sections: list[str] | None +) -> dict[str, object]: + """The yt-dlp options for one download (caller has already imported yt_dlp).""" options: dict[str, object] = { - "format": "bestaudio/best", + "format": "bestvideo*+bestaudio/best" if video else "bestaudio/best", "outtmpl": str(dest_dir / "%(id)s.%(ext)s"), "quiet": True, "no_warnings": True, "noprogress": True, "logger": _YTDLP_LOGGER, } + if video: + # Separate video+audio streams (YouTube's norm) must merge into one + # container; mp4 keeps the result playable (and clippable) everywhere. + options["merge_output_format"] = "mp4" if download_sections: from yt_dlp.utils import download_range_func @@ -181,6 +187,37 @@ def download_audio(url: str, dest_dir: Path, *, download_sections: list[str] | N ) # Cut at exact timestamps rather than the nearest keyframe (yt-dlp's default). options["force_keyframes_at_cuts"] = True + return options + + +def download_media( + url: str, + dest_dir: Path, + *, + video: bool = False, + download_sections: list[str] | None = None, +) -> Path: + """Download the media of `url` into `dest_dir` and return its path. + + Uses yt-dlp. The default fetches only the best audio track — all transcription + needs; the resulting container (m4a/webm/…) is decodable by ffmpeg (streaming) + and uploadable. ``video=True`` fetches the full video instead (best video+audio, + merged to mp4) for commands whose output is the media itself. `download_sections` + accepts yt-dlp's ``--download-sections`` specs (e.g. ``*0:00-5:00`` for the first + five minutes), each fetching only that part of the source. + """ + noun = "video" if video else "audio" + try: + import yt_dlp + except ImportError as exc: + raise CLIError( + "YouTube support needs yt-dlp.", + error_type="ytdlp_missing", + exit_code=2, + suggestion="Install it: pip install yt-dlp", + ) from exc + + options = _ytdlp_options(dest_dir, video=video, download_sections=download_sections) try: # yt-dlp types `params` as a private `_Params` TypedDict, but a plain options # dict is the documented public API; pyright can't reconcile the two. @@ -189,7 +226,7 @@ def download_audio(url: str, dest_dir: Path, *, download_sections: list[str] | N path = Path(ydl.prepare_filename(info)) except Exception as exc: # yt-dlp raises many types; surface one clean CLI error raise CLIError( - f"Could not download audio from {url}: {_ytdlp_error_message(exc)}", + f"Could not download {noun} from {url}: {_ytdlp_error_message(exc)}", error_type="youtube_error", exit_code=1, ) from exc @@ -198,11 +235,11 @@ def download_audio(url: str, dest_dir: Path, *, download_sections: list[str] | N # Post-processing can change the extension; fall back to whatever landed. # yt-dlp may also drop sidecars (thumbnail, .info.json, a leftover .part) # in dest_dir, and iterdir() order is arbitrary, so pick the largest file: - # the decoded audio track dwarfs any metadata sidecar. + # the decoded media track dwarfs any metadata sidecar. files = [p for p in dest_dir.iterdir() if p.is_file()] if not files: raise CLIError( - f"yt-dlp produced no audio file for {url}.", + f"yt-dlp produced no {noun} file for {url}.", error_type="youtube_error", exit_code=1, ) diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index cd06d620..29b6a7ed 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -65,6 +65,62 @@ + ''' +# --- +# name: test_command_help_matches_snapshot[caption] + ''' + + Usage: assembly caption [OPTIONS] MEDIA + + Burn always-visible captions into a video. + + The video is transcribed (or an existing transcript is reused with + --transcript-id), the transcript's SRT captions are fetched, and ffmpeg + (which must be installed) burns them into the picture as open captions — + the audio stream is copied untouched. A YouTube/media-page URL is + downloaded first (always the full video); its output lands in --out or + the current directory. + + ╭─ Arguments ──────────────────────────────────────────────────────────────────╮ + │ * media TEXT Video to caption: a local file, or a │ + │ YouTube/media-page URL (the full video is downloaded │ + │ via yt-dlp). │ + │ [required] │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ Options ────────────────────────────────────────────────────────────────────╮ + │ --transcript-id -t TEXT Reuse an existing │ + │ transcript of this media │ + │ instead of transcribing │ + │ it again. │ + │ --chars-per-caption INTEGER RANGE [x>=1] Max characters per │ + │ caption line. │ + │ --font-size INTEGER RANGE [x>=1] Font size of the │ + │ burned-in captions │ + │ (ffmpeg's default styling │ + │ when omitted). │ + │ --out PATH Output file (default: │ + │ .captioned │ + │ next to the input). │ + │ --json -j Emit JSON describing the │ + │ captioned file. │ + │ --help Show this message and │ + │ exit. │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + + Examples + Burn captions into a video + $ assembly caption talk.mp4 + Caption a YouTube video (downloaded via yt-dlp) + $ assembly caption "https://youtube.com/watch?v=ID" + Reuse a finished transcript instead of re-transcribing + $ assembly caption talk.mp4 -t TRANSCRIPT_ID + Shorter caption lines in a bigger font + $ assembly caption talk.mp4 --chars-per-caption 32 --font-size 28 + Choose the output file + $ assembly caption talk.mp4 --out talk-captioned.mp4 + + + ''' # --- # name: test_command_help_matches_snapshot[clip] @@ -80,7 +136,8 @@ boundaries snap into nearby silence so cuts don't land mid-word (--no-snap disables), and each surviving segment is written as .clipNN using ffmpeg (which must be installed). A YouTube/media-page source is - downloaded first; its clips land in --out-dir or the current directory. + downloaded first (audio only, or the full video with --video); its clips + land in --out-dir or the current directory. ╭─ Arguments ──────────────────────────────────────────────────────────────────╮ │ * media TEXT Audio/video to cut clips from: a local file, or a │ @@ -125,6 +182,14 @@ │ --out-dir PATH Directory for the │ │ clip files (default: │ │ next to the input). │ + │ --video Download the full │ + │ video (not just the │ + │ audio track) for a │ + │ URL source, so the │ + │ clips are cut from │ + │ the video. Local │ + │ files keep their │ + │ video already. │ │ --json -j Emit JSON describing │ │ the clips written. │ │ --help Show this message │ @@ -152,6 +217,9 @@ $ assembly clip meeting.mp4 --llm "the three strongest customer objections" Clip a YouTube video's audio with an LLM $ assembly clip "https://youtube.com/watch?v=ID" --llm "the best quote" + Cut video clips from the full YouTube video + $ assembly clip "https://youtube.com/watch?v=ID" --video --llm "the best + quote" Reuse a finished transcript instead of re-transcribing $ assembly clip meeting.mp4 -t TRANSCRIPT_ID --speaker B Pipe transcribe straight into clip @@ -225,13 +293,15 @@ utterance timestamps, each utterance is translated by an LLM Gateway model, the translations are synthesized with streaming TTS (one voice per speaker), and ffmpeg lays the new audio over the original — video copied - untouched. Streaming TTS only exists in the sandbox today — run it as - 'assembly --sandbox dub' (--sandbox goes before the subcommand). Requires - ffmpeg. + untouched. A YouTube/media-page URL is downloaded first (audio only, or + the full video with --video). Streaming TTS only exists in the sandbox + today — run it as 'assembly --sandbox dub' (--sandbox goes before the + subcommand). Requires ffmpeg. ╭─ Arguments ──────────────────────────────────────────────────────────────────╮ - │ * media TEXT Local audio/video file to dub (the video stream is │ - │ copied untouched). │ + │ * media TEXT Audio/video to dub: a local file (the video stream is │ + │ copied untouched), or a YouTube/media-page URL │ + │ (downloaded via yt-dlp). │ │ [required] │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Options ────────────────────────────────────────────────────────────────────╮ @@ -248,6 +318,10 @@ │ native voice(s). │ │ --out PATH Output file (default: │ │ .dub. next to the input). │ + │ --video Download the full video (not just the │ + │ audio track) for a URL source, so the dub │ + │ keeps the picture. Local files keep their │ + │ video already. │ │ --json -j Emit JSON describing the dubbed file. │ │ --help Show this message and exit. │ ╰──────────────────────────────────────────────────────────────────────────────╯ @@ -264,6 +338,8 @@ $ assembly --sandbox dub talk.mp4 --lang de Use a language name instead of a code $ assembly --sandbox dub talk.mp4 -l Spanish + Dub the full video from YouTube + $ assembly --sandbox dub "https://youtube.com/watch?v=ID" -l de --video Dub every speaker with one voice $ assembly --sandbox dub talk.mp4 -l fr --voice paul Pin a voice per diarized speaker diff --git a/tests/_clip_helpers.py b/tests/_clip_helpers.py index 86e969fe..9b5ee48d 100644 --- a/tests/_clip_helpers.py +++ b/tests/_clip_helpers.py @@ -32,6 +32,7 @@ padding=0.0, snap=True, out_dir=None, + video=False, ) diff --git a/tests/_dub_helpers.py b/tests/_dub_helpers.py index d1670d9e..4ee57eee 100644 --- a/tests/_dub_helpers.py +++ b/tests/_dub_helpers.py @@ -30,6 +30,7 @@ model=llm.DEFAULT_MODEL, max_tokens=llm.DEFAULT_MAX_TOKENS, out=None, + video=False, ) SAMPLE_RATE = 100 # tiny rate keeps the timeline byte math exact and readable diff --git a/tests/_snapshot_surface.py b/tests/_snapshot_surface.py index 447a63d1..1b7f94f5 100644 --- a/tests/_snapshot_surface.py +++ b/tests/_snapshot_surface.py @@ -33,6 +33,7 @@ "llm", "clip", "dub", + "caption", "eval", "webhooks", } diff --git a/tests/test_caption_command.py b/tests/test_caption_command.py new file mode 100644 index 00000000..4797cab6 --- /dev/null +++ b/tests/test_caption_command.py @@ -0,0 +1,132 @@ +"""CLI-level tests for `assembly caption`: argv → CaptionOptions parsing, error +rendering, and the command's placement in the root help. The pipeline itself is +covered in test_caption_exec.py.""" + +from __future__ import annotations + +import json +import re +from pathlib import Path + +from typer.testing import CliRunner + +from aai_cli import caption_exec +from aai_cli.caption_exec import CaptionOptions +from aai_cli.main import app + +runner = CliRunner() + +_ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") + + +def _plain(text: str) -> str: + return _ANSI_SGR.sub("", text) + + +def _capture_run_caption(monkeypatch): + captured = {} + + def fake_run_caption(opts, state, *, json_mode): + captured["opts"] = opts + captured["json_mode"] = json_mode + + monkeypatch.setattr(caption_exec, "run_caption", fake_run_caption) + return captured + + +def test_caption_parses_every_flag_into_options(monkeypatch): + captured = _capture_run_caption(monkeypatch) + result = runner.invoke( + app, + [ + "caption", + "talk.mp4", + "-t", + "tr_abc", + "--chars-per-caption", + "32", + "--font-size", + "28", + "--out", + "captioned.mp4", + "--json", + ], + ) + assert result.exit_code == 0, result.output + assert captured["opts"] == CaptionOptions( + media="talk.mp4", + transcript_id="tr_abc", + chars_per_caption=32, + font_size=28, + out=Path("captioned.mp4"), + ) + assert captured["json_mode"] is True + + +def test_caption_defaults_when_only_media_is_given(monkeypatch): + captured = _capture_run_caption(monkeypatch) + result = runner.invoke(app, ["caption", "talk.mp4"]) + assert result.exit_code == 0, result.output + assert captured["opts"] == CaptionOptions( + media="talk.mp4", + transcript_id=None, + chars_per_caption=None, + font_size=None, + out=None, + ) + assert captured["json_mode"] is False + + +def test_caption_accepts_the_minimum_flag_values(monkeypatch): + # Both numeric flags declare min=1; the boundary value must parse. + captured = _capture_run_caption(monkeypatch) + result = runner.invoke( + app, ["caption", "talk.mp4", "--chars-per-caption", "1", "--font-size", "1"] + ) + assert result.exit_code == 0, result.output + assert captured["opts"].chars_per_caption == 1 + assert captured["opts"].font_size == 1 + + +def test_caption_rejects_zero_flag_values(): + result = runner.invoke(app, ["caption", "talk.mp4", "--font-size", "0"]) + assert result.exit_code == 2 + result = runner.invoke(app, ["caption", "talk.mp4", "--chars-per-caption", "0"]) + assert result.exit_code == 2 + + +def test_caption_requires_the_media_argument(): + result = runner.invoke(app, ["caption"]) + assert result.exit_code == 2 + + +def test_caption_missing_file_renders_clean_error(tmp_path, monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + result = runner.invoke(app, ["caption", str(tmp_path / "nope.mp4")]) + assert result.exit_code == 2 + plain = _plain(result.output) + assert "File not found" in plain + assert "Traceback" not in plain + + +def test_caption_json_error_shape(tmp_path, monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + result = runner.invoke(app, ["caption", str(tmp_path / "nope.mp4"), "--json"]) + assert result.exit_code == 2 + err = json.loads(_plain(result.output).strip()) + assert err["error"]["type"] == "file_not_found" + + +def test_caption_is_listed_between_dub_and_eval_in_root_help(): + # Pins caption's slot in _COMMAND_ORDER: it renders in the "Run AssemblyAI" + # panel after dub, not alphabetically at the end of the help. + result = runner.invoke(app, ["--help"]) + assert result.exit_code == 0 + plain = _plain(result.output) + + def row(name: str) -> int: + match = re.search(rf"^[│|\s]*{name}\s", plain, flags=re.MULTILINE) + assert match is not None, f"{name} not in root help" + return match.start() + + assert row("dub") < row("caption") < row("eval") diff --git a/tests/test_caption_exec.py b/tests/test_caption_exec.py new file mode 100644 index 00000000..372b411e --- /dev/null +++ b/tests/test_caption_exec.py @@ -0,0 +1,406 @@ +"""Direct tests of the `assembly caption` options/run seam (aai_cli/caption_exec.py): +the pure helpers (output naming, filtergraph escaping), validation order, and the +faked transcribe → SRT export → ffmpeg burn-in runs. The boundaries are faked at +the modules caption_exec calls into (`client.transcribe`, `client.get_transcript`, +`youtube.download_media`) and at `caption_exec._run_ffmpeg`; argv parsing lives in +test_caption_command.py.""" + +from __future__ import annotations + +import contextlib +import dataclasses +import json +import re +import subprocess +import sys +from pathlib import Path +from types import SimpleNamespace + +import pytest + +from aai_cli import caption_exec, client, config, youtube +from aai_cli.caption_exec import CaptionOptions +from aai_cli.context import AppState +from aai_cli.errors import CLIError, UsageError + +# The CLI's flag defaults, as data. Tests override per-case with dataclasses.replace. +DEFAULTS = CaptionOptions( + media="talk.mp4", + transcript_id=None, + chars_per_caption=None, + font_size=None, + out=None, +) + +SRT = "1\n00:00:00,500 --> 00:00:01,500\nHello.\n\n2\n00:00:02,000 --> 00:00:03,000\nWorld.\n" + +_ANSI_SGR = re.compile(r"\x1b\[[0-9;]*m") + + +def plain(text: str) -> str: + """Strip SGR color codes (CI forces color on) for substring assertions.""" + return _ANSI_SGR.sub("", text) + + +def fake_transcript(srt: str = SRT, transcript_id: str = "tr_cap"): + """A transcript double whose SRT export records the chars_per_caption it got.""" + calls: list[object] = [] + + def export(chars_per_caption=None): + calls.append(chars_per_caption) + return srt + + return SimpleNamespace(id=transcript_id, export_subtitles_srt=export, export_calls=calls) + + +def record_ffmpeg(monkeypatch, *, returncode: int = 0, stderr: str = ""): + """Resolve ffmpeg and record the invocation plus the SRT it was handed. + + The temp SRT is deleted right after the burn, so its contents are captured + here, while the file still exists (args[8] is the -vf filtergraph). + """ + monkeypatch.setattr("shutil.which", lambda name: f"/usr/bin/{name}") + recorded: dict[str, object] = {} + + def run(args: list[str]) -> subprocess.CompletedProcess[str]: + recorded["args"] = args + srt_path = args[8].removeprefix("subtitles=").split(":force_style")[0] + recorded["srt"] = Path(srt_path).read_text(encoding="utf-8") + return subprocess.CompletedProcess( + args=args, returncode=returncode, stdout="", stderr=stderr + ) + + monkeypatch.setattr(caption_exec, "_run_ffmpeg", run) + return recorded + + +@pytest.fixture +def media(tmp_path: Path) -> Path: + path = tmp_path / "talk.mp4" + path.write_bytes(b"\x00fake-media") + return path + + +@pytest.fixture(autouse=True) +def _fake_key(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr(config, "resolve_api_key", lambda **_: "test-key") + + +@pytest.fixture +def fake_transcribe(monkeypatch: pytest.MonkeyPatch): + """Record the transcription request and return the canned transcript.""" + calls: dict[str, object] = {} + transcript = fake_transcript() + + def _fake(api_key, audio, *, config): + calls["api_key"] = api_key + calls["audio"] = audio + calls["config"] = config + return transcript + + monkeypatch.setattr(client, "transcribe", _fake) + calls["transcript"] = transcript + return calls + + +@pytest.fixture +def fake_ffmpeg(monkeypatch: pytest.MonkeyPatch): + return record_ffmpeg(monkeypatch) + + +def _run(opts, *, json_mode): + caption_exec.run_caption(opts, AppState(), json_mode=json_mode) + + +# --- records and pure helpers -------------------------------------------------- + + +def test_options_are_immutable(): + field_name = dataclasses.fields(DEFAULTS)[0].name + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(DEFAULTS, field_name, None) + + +def test_default_out_path(): + assert caption_exec.default_out_path(Path("/x/talk.mp4")) == Path("/x/talk.captioned.mp4") + + +def test_subtitles_filter_plain_path(): + assert caption_exec.subtitles_filter(Path("/tmp/c.srt"), None) == "subtitles=/tmp/c.srt" + + +def test_subtitles_filter_appends_font_size(): + spec = caption_exec.subtitles_filter(Path("/tmp/c.srt"), 28) + assert spec == "subtitles=/tmp/c.srt:force_style=FontSize=28" + + +def test_subtitles_filter_escapes_filtergraph_metacharacters(): + # ffmpeg's filtergraph syntax gives these characters meaning; an unescaped + # one in a TMPDIR-derived path would corrupt the filter spec. + spec = caption_exec.subtitles_filter(Path("/tmp/a'b:c,d;e[f]g.srt"), None) + assert spec == "subtitles=/tmp/a\\'b\\:c\\,d\\;e\\[f\\]g.srt" + + +def test_run_ffmpeg_captures_output_and_does_not_raise(): + # The real boundary (not the fake): output is captured as text and a non-zero + # exit must not raise — _burn turns the exit code into a CLIError itself. + result = caption_exec._run_ffmpeg( + [ + sys.executable, + "-c", + "import sys; print('out'); print('err', file=sys.stderr); sys.exit(3)", + ] + ) + assert result.returncode == 3 + assert result.stdout == "out\n" + assert result.stderr == "err\n" + + +# --- validation order (cheap local checks before any credential or network) ---- + + +def test_run_caption_requires_ffmpeg(monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: None) + with pytest.raises(CLIError) as exc: + _run(DEFAULTS, json_mode=False) + assert exc.value.error_type == "missing_dependency" + assert "ffmpeg" in exc.value.message + + +def test_run_caption_rejects_missing_file(fake_ffmpeg, tmp_path): + opts = dataclasses.replace(DEFAULTS, media=str(tmp_path / "nope.mp4")) + with pytest.raises(CLIError) as exc: + _run(opts, json_mode=False) + assert exc.value.error_type == "file_not_found" + assert exc.value.exit_code == 2 + assert "local video file" in (exc.value.suggestion or "") + + +def test_run_caption_rejects_directory(fake_ffmpeg, tmp_path): + opts = dataclasses.replace(DEFAULTS, media=str(tmp_path)) + with pytest.raises(CLIError) as exc: + _run(opts, json_mode=False) + assert exc.value.error_type == "not_a_file" + assert exc.value.exit_code == 2 + assert "not a directory" in (exc.value.suggestion or "") + + +def test_run_caption_refuses_to_overwrite_the_input(fake_ffmpeg, media): + opts = dataclasses.replace(DEFAULTS, media=str(media), out=media) + with pytest.raises(UsageError) as exc: + _run(opts, json_mode=False) + assert "overwrite the input file" in exc.value.message + + +def test_run_caption_rejects_non_downloadable_url(fake_ffmpeg): + opts = dataclasses.replace(DEFAULTS, media="https://example.com/episode.mp3") + with pytest.raises(UsageError) as exc: + _run(opts, json_mode=False) + assert "assembly caption can't fetch this URL" in exc.value.message + assert "Download the video first" in (exc.value.suggestion or "") + + +# --- the faked pipeline --------------------------------------------------------- + + +def test_run_caption_end_to_end(media, fake_transcribe, fake_ffmpeg, capsys): + opts = dataclasses.replace(DEFAULTS, media=str(media)) + _run(opts, json_mode=True) + + # Transcription: the local file, with the resolved key. + assert fake_transcribe["api_key"] == "test-key" + assert fake_transcribe["audio"] == str(media) + # No --chars-per-caption: the export endpoint gets None (its own default). + assert fake_transcribe["transcript"].export_calls == [None] + + # The burn: re-encoded video with the SRT filter, audio copied, default out. + out = media.parent / "talk.captioned.mp4" + args = fake_ffmpeg["args"] + filtergraph = args[8] + assert args == [ + "/usr/bin/ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-y", + "-i", + str(media), + "-vf", + filtergraph, + "-map", + "0:v", + "-map", + "0:a?", + "-c:a", + "copy", + str(out), + ] + assert filtergraph.startswith("subtitles=") + assert "aai-caption-" in filtergraph + assert filtergraph.endswith("captions.srt") + assert fake_ffmpeg["srt"] == SRT + + payload = json.loads(capsys.readouterr().out) + assert payload == { + "source": str(media), + "out": str(out), + "transcript_id": "tr_cap", + "captions": 2, + } + + +def test_run_caption_human_summary(media, fake_transcribe, fake_ffmpeg, capsys): + opts = dataclasses.replace(DEFAULTS, media=str(media), out=Path("captioned.mp4")) + _run(opts, json_mode=False) + out = plain(capsys.readouterr().out) + assert "captioned.mp4" in out + assert "2 caption(s) burned in" in out + + +def test_run_caption_status_messages(media, fake_transcribe, fake_ffmpeg, monkeypatch): + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(caption_exec.output, "status", fake_status) + _run(dataclasses.replace(DEFAULTS, media=str(media)), json_mode=False) + assert messages == ["Transcribing for captions…", "Fetching captions…", "Burning captions…"] + + +def test_run_caption_forwards_chars_per_caption(media, fake_transcribe, fake_ffmpeg): + opts = dataclasses.replace(DEFAULTS, media=str(media), chars_per_caption=32) + _run(opts, json_mode=True) + assert fake_transcribe["transcript"].export_calls == [32] + + +def test_run_caption_font_size_reaches_the_filtergraph(media, fake_transcribe, fake_ffmpeg): + opts = dataclasses.replace(DEFAULTS, media=str(media), font_size=28) + _run(opts, json_mode=True) + assert fake_ffmpeg["args"][8].endswith(":force_style=FontSize=28") + + +def test_transcript_id_reuses_existing_transcript(media, fake_ffmpeg, monkeypatch, capsys): + fetched: dict[str, object] = {} + transcript = fake_transcript(transcript_id="tr_99") + + def get_transcript(api_key, transcript_id): + fetched["args"] = (api_key, transcript_id) + return transcript + + monkeypatch.setattr(client, "get_transcript", get_transcript) + monkeypatch.setattr( + client, + "transcribe", + lambda *a, **k: pytest.fail("must not re-transcribe with --transcript-id"), + ) + opts = dataclasses.replace(DEFAULTS, media=str(media), transcript_id="tr_99") + _run(opts, json_mode=True) + assert fetched["args"] == ("test-key", "tr_99") + payload = json.loads(capsys.readouterr().out) + assert payload["transcript_id"] == "tr_99" + + +def test_empty_srt_is_a_no_captions_error(media, fake_ffmpeg, monkeypatch): + monkeypatch.setattr(client, "transcribe", lambda *a, **k: fake_transcript(srt=" \n")) + opts = dataclasses.replace(DEFAULTS, media=str(media)) + with pytest.raises(CLIError) as exc: + _run(opts, json_mode=False) + assert exc.value.error_type == "no_captions" + assert exc.value.exit_code == 2 + assert "Transcript tr_cap has no captions to burn in" in exc.value.message + + +def test_ffmpeg_failure_reports_last_stderr_line(media, fake_transcribe, monkeypatch): + record_ffmpeg(monkeypatch, returncode=1, stderr="noise\nInvalid data found\n") + opts = dataclasses.replace(DEFAULTS, media=str(media)) + with pytest.raises(CLIError) as exc: + _run(opts, json_mode=False) + assert exc.value.error_type == "caption_failed" + assert "Could not write talk.captioned.mp4" in exc.value.message + # The last stderr line is the reason ffmpeg gives; earlier noise is dropped. + assert "Invalid data found" in exc.value.message + assert "noise" not in exc.value.message + assert "audio-only media" in (exc.value.suggestion or "") + + +def test_ffmpeg_silent_failure_reports_exit_code(media, fake_transcribe, monkeypatch): + record_ffmpeg(monkeypatch, returncode=3) + opts = dataclasses.replace(DEFAULTS, media=str(media)) + with pytest.raises(CLIError) as exc: + _run(opts, json_mode=False) + assert "ffmpeg exited with code 3" in exc.value.message + + +# --- YouTube / media-page sources ---------------------------------------------- + +YT_URL = "https://www.youtube.com/watch?v=abc123" + + +@pytest.fixture +def fake_download(monkeypatch: pytest.MonkeyPatch): + """Stand in for yt-dlp: 'download' a fixed video file into the temp dir.""" + seen: dict[str, object] = {} + + def download(url, dest_dir, *, video=False): + seen["url"] = url + seen["video"] = video + path = dest_dir / "vid123.mp4" + path.write_bytes(b"\x00video") + seen["path"] = path + return path + + monkeypatch.setattr(youtube, "download_media", download) + return seen + + +def test_run_caption_youtube_downloads_the_full_video( + tmp_path, fake_download, fake_transcribe, fake_ffmpeg, capsys, monkeypatch +): + monkeypatch.chdir(tmp_path) + opts = dataclasses.replace(DEFAULTS, media=YT_URL) + _run(opts, json_mode=True) + # Captions are burned into the picture, so the download is always the video. + assert fake_download["url"] == YT_URL + assert fake_download["video"] is True + assert fake_transcribe["audio"] == str(fake_download["path"]) + # ffmpeg reads the downloaded temp file; the default output lands in the cwd, + # named after the download (the temp dir is gone after the run). + out = tmp_path / "vid123.captioned.mp4" + assert fake_ffmpeg["args"][6] == str(fake_download["path"]) + assert fake_ffmpeg["args"][-1] == str(out) + payload = json.loads(capsys.readouterr().out) + assert payload["source"] == YT_URL + assert payload["out"] == str(out) + + +def test_run_caption_youtube_status_messages( + tmp_path, fake_download, fake_transcribe, fake_ffmpeg, monkeypatch +): + monkeypatch.chdir(tmp_path) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(caption_exec.output, "status", fake_status) + _run(dataclasses.replace(DEFAULTS, media=YT_URL), json_mode=False) + assert messages == [ + "Downloading video…", + "Transcribing for captions…", + "Fetching captions…", + "Burning captions…", + ] + + +def test_run_caption_youtube_honors_explicit_out( + tmp_path, fake_download, fake_transcribe, fake_ffmpeg +): + out = tmp_path / "with-captions.mp4" + opts = dataclasses.replace(DEFAULTS, media=YT_URL, out=out) + _run(opts, json_mode=True) + assert fake_ffmpeg["args"][-1] == str(out) diff --git a/tests/test_clip_command.py b/tests/test_clip_command.py index 9cfafe0c..bb51ef3f 100644 --- a/tests/test_clip_command.py +++ b/tests/test_clip_command.py @@ -63,6 +63,7 @@ def test_clip_parses_every_flag_into_options(monkeypatch, tmp_path): "--no-snap", "--out-dir", str(tmp_path), + "--video", "--json", ], ) @@ -79,6 +80,7 @@ def test_clip_parses_every_flag_into_options(monkeypatch, tmp_path): padding=0.5, snap=False, out_dir=tmp_path, + video=True, ) assert captured["json_mode"] is True @@ -99,6 +101,7 @@ def test_clip_defaults_when_only_media_is_given(monkeypatch): padding=0.0, snap=True, out_dir=None, + video=False, ) assert captured["json_mode"] is False diff --git a/tests/test_clip_sources.py b/tests/test_clip_sources.py index 9db83d01..a08b3174 100644 --- a/tests/test_clip_sources.py +++ b/tests/test_clip_sources.py @@ -34,17 +34,18 @@ def fake_ffmpeg(monkeypatch): @pytest.fixture def fake_download(monkeypatch): - """Stand in for yt-dlp: 'download' a fixed audio file into the temp dir.""" + """Stand in for yt-dlp: 'download' a fixed media file into the temp dir.""" seen: dict[str, object] = {} - def download(url, dest_dir): + def download(url, dest_dir, *, video=False): seen["url"] = url - path = dest_dir / "vid123.m4a" - path.write_bytes(b"\x00audio") + seen["video"] = video + path = dest_dir / ("vid123.mp4" if video else "vid123.m4a") + path.write_bytes(b"\x00media") seen["path"] = path return path - monkeypatch.setattr(clip_exec.youtube, "download_audio", download) + monkeypatch.setattr(clip_exec.youtube, "download_media", download) return seen @@ -112,9 +113,42 @@ def fake_status(message, *, json_mode, quiet): monkeypatch.setattr(clip_exec.output, "status", fake_status) opts = dataclasses.replace(DEFAULTS, media=YT_URL, ranges=["1-2"]) clip_exec.run_clip(opts, AppState(), json_mode=False) + # Without --video only the audio track is fetched. + assert fake_download["video"] is False assert messages == ["Downloading audio…", "Detecting silence…", "Cutting 1 clip(s)…"] +def test_run_clip_video_downloads_the_full_video( + tmp_path, fake_ffmpeg, fake_download, capsys, monkeypatch +): + monkeypatch.chdir(tmp_path) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(clip_exec.output, "status", fake_status) + opts = dataclasses.replace(DEFAULTS, media=YT_URL, ranges=["1-2"], video=True) + clip_exec.run_clip(opts, AppState(), json_mode=True) + # --video fetches the full video, and the clips carry its container/extension. + assert fake_download["video"] is True + assert messages[0] == "Downloading video…" + assert fake_ffmpeg[1][-1] == str(tmp_path / "vid123.clip01.mp4") + payload = json.loads(capsys.readouterr().out) + assert payload["clips"][0]["path"] == str(tmp_path / "vid123.clip01.mp4") + + +def test_run_clip_video_requires_a_url_source(media, fake_ffmpeg): + # A local file already carries its video into every clip, so --video would be + # a silent no-op — it is rejected instead. + opts = dataclasses.replace(DEFAULTS, media=str(media), ranges=["1-2"], video=True) + with pytest.raises(UsageError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert "--video only applies to a downloadable URL source" in exc.value.message + + # --- transcript piped on stdin (-t -) ------------------------------------------- diff --git a/tests/test_dub_command.py b/tests/test_dub_command.py index b30cb784..c0f418b1 100644 --- a/tests/test_dub_command.py +++ b/tests/test_dub_command.py @@ -58,6 +58,7 @@ def test_defaults_map_to_options(captured_run): model=llm.DEFAULT_MODEL, max_tokens=llm.DEFAULT_MAX_TOKENS, out=None, + video=False, ) @@ -81,6 +82,7 @@ def test_every_flag_maps_to_options(captured_run): "7", "--out", "dubbed.mp4", + "--video", "--json", ], ) @@ -94,4 +96,5 @@ def test_every_flag_maps_to_options(captured_run): model="gpt-5", max_tokens=7, out=Path("dubbed.mp4"), + video=True, ) diff --git a/tests/test_dub_pipeline.py b/tests/test_dub_pipeline.py index 200e2e1a..66a0b244 100644 --- a/tests/test_dub_pipeline.py +++ b/tests/test_dub_pipeline.py @@ -7,6 +7,7 @@ from __future__ import annotations +import contextlib import dataclasses import json import subprocess @@ -15,9 +16,9 @@ import pytest -from aai_cli import client, dub_exec, llm +from aai_cli import client, dub_exec, llm, youtube from aai_cli.context import AppState -from aai_cli.errors import APIError, CLIError +from aai_cli.errors import APIError, CLIError, UsageError from aai_cli.tts import session from aai_cli.tts.session import SpeakResult from tests._dub_helpers import ( @@ -301,3 +302,138 @@ def test_ffmpeg_silent_failure_reports_exit_code( with pytest.raises(CLIError) as exc: _run(opts, json_mode=False) assert "ffmpeg exited with code 3" in exc.value.message + + +# --- YouTube / media-page sources ---------------------------------------------- + +YT_URL = "https://www.youtube.com/watch?v=abc123" + + +@pytest.fixture +def fake_download(monkeypatch: pytest.MonkeyPatch): + """Stand in for yt-dlp: 'download' a fixed media file into the temp dir.""" + seen: dict[str, object] = {} + + def download(url, dest_dir, *, video=False): + seen["url"] = url + seen["video"] = video + path = dest_dir / ("vid123.mp4" if video else "vid123.m4a") + path.write_bytes(b"\x00media") + seen["path"] = path + return path + + monkeypatch.setattr(youtube, "download_media", download) + return seen + + +def test_run_dub_youtube_downloads_and_dubs_into_cwd( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, + monkeypatch, +): + monkeypatch.chdir(tmp_path) + opts = dataclasses.replace(DEFAULTS, media=YT_URL) + _run(opts, json_mode=True) + # Audio-only download by default; the downloaded temp file feeds the pipeline. + assert fake_download["url"] == YT_URL + assert fake_download["video"] is False + assert fake_transcribe["audio"] == str(fake_download["path"]) + # ffmpeg muxes over the downloaded file; the default output lands in the cwd, + # named after the download (the temp dir is gone after the run). + args = fake_ffmpeg["args"] + assert args[6] == str(fake_download["path"]) + out = tmp_path / "vid123.dub.german.m4a" + assert args[-1] == str(out) + payload = json.loads(capsys.readouterr().out) + assert payload["source"] == YT_URL + assert payload["out"] == str(out) + + +def test_run_dub_youtube_video_keeps_the_picture( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, + monkeypatch, +): + monkeypatch.chdir(tmp_path) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(dub_exec.output, "status", fake_status) + opts = dataclasses.replace(DEFAULTS, media=YT_URL, video=True) + _run(opts, json_mode=True) + # --video fetches the full video; the dubbed default output keeps its extension. + assert fake_download["video"] is True + assert messages[0] == "Downloading video…" + payload = json.loads(capsys.readouterr().out) + assert payload["out"] == str(tmp_path / "vid123.dub.german.mp4") + + +def test_run_dub_youtube_audio_download_status_message( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, + monkeypatch, +): + monkeypatch.chdir(tmp_path) + messages: list[str] = [] + + @contextlib.contextmanager + def fake_status(message, *, json_mode, quiet): + messages.append(message) + yield + + monkeypatch.setattr(dub_exec.output, "status", fake_status) + _run(dataclasses.replace(DEFAULTS, media=YT_URL), json_mode=True) + assert messages[0] == "Downloading audio…" + + +def test_run_dub_youtube_honors_explicit_out( + tmp_path, + fake_download, + fake_transcribe, + fake_translate, + fake_synthesize, + fake_ffmpeg, + capsys, +): + out = tmp_path / "dubbed.mp4" + opts = dataclasses.replace(DEFAULTS, media=YT_URL, out=out) + _run(opts, json_mode=True) + assert fake_ffmpeg["args"][-1] == str(out) + + +def test_run_dub_video_requires_a_url_source(media, monkeypatch): + # A local file's video stream is already copied into the dub, so --video + # would be a silent no-op — it is rejected instead. + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + opts = dataclasses.replace(DEFAULTS, media=str(media), video=True) + with pytest.raises(UsageError) as exc: + _run(opts, json_mode=False) + assert "--video only applies to a downloadable URL source" in exc.value.message + + +def test_run_dub_rejects_non_downloadable_url(monkeypatch): + monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/ffmpeg") + opts = dataclasses.replace(DEFAULTS, media="https://example.com/episode.mp3") + with pytest.raises(UsageError) as exc: + _run(opts, json_mode=False) + assert "assembly dub can't fetch this URL" in exc.value.message + assert "Download the media first" in (exc.value.suggestion or "") diff --git a/tests/test_smoke.py b/tests/test_smoke.py index bc16fee1..20b6fbb3 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -159,6 +159,7 @@ def test_help_lists_commands_in_workflow_order(): "llm", "clip", "dub", + "caption", "eval", "webhooks", # Setup & Tools diff --git a/tests/test_stream_command.py b/tests/test_stream_command.py index 676fd6ff..2d079865 100644 --- a/tests/test_stream_command.py +++ b/tests/test_stream_command.py @@ -271,7 +271,7 @@ def test_stream_youtube_url_downloads_then_streams(monkeypatch, tmp_path): w.setsampwidth(2) w.setframerate(16000) w.writeframes(b"\x00\x01" * 100) - monkeypatch.setattr("aai_cli.stream_exec.youtube.download_audio", lambda url, d: fake) + monkeypatch.setattr("aai_cli.stream_exec.youtube.download_media", lambda url, d: fake) seen = {} def fake_stream(api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None): @@ -295,7 +295,7 @@ def test_stream_podcast_page_url_downloads_then_streams(monkeypatch, tmp_path): w.setsampwidth(2) w.setframerate(16000) w.writeframes(b"\x00\x01" * 100) - monkeypatch.setattr("aai_cli.stream_exec.youtube.download_audio", lambda url, d: fake) + monkeypatch.setattr("aai_cli.stream_exec.youtube.download_media", lambda url, d: fake) seen = {} def fake_stream(api_key, source, *, params, on_begin=None, on_turn=None, on_termination=None): @@ -317,7 +317,7 @@ def test_stream_downloadable_url_resolves_credentials_before_downloading(monkeyp monkeypatch.setattr("aai_cli.context._interactive_session", lambda: False) downloads = [] monkeypatch.setattr( - "aai_cli.stream_exec.youtube.download_audio", + "aai_cli.stream_exec.youtube.download_media", lambda url, dest: downloads.append(url), ) monkeypatch.setattr( diff --git a/tests/test_transcribe.py b/tests/test_transcribe.py index 28991567..59d5a209 100644 --- a/tests/test_transcribe.py +++ b/tests/test_transcribe.py @@ -334,7 +334,7 @@ def test_transcribe_youtube_url_downloads_then_transcribes(monkeypatch, mocker, fake = tmp_path / "vid.m4a" fake.write_bytes(b"x") monkeypatch.setattr( - "aai_cli.transcribe_exec.youtube.download_audio", + "aai_cli.transcribe_exec.youtube.download_media", lambda url, d, *, download_sections=None: fake, ) tx = mocker.patch( @@ -348,7 +348,7 @@ def test_transcribe_youtube_url_downloads_then_transcribes(monkeypatch, mocker, def test_transcribe_youtube_url_forwards_download_sections(monkeypatch, mocker, tmp_path): - # --download-sections must reach youtube.download_audio so only that slice is fetched. + # --download-sections must reach youtube.download_media so only that slice is fetched. _auth() fake = tmp_path / "vid.m4a" fake.write_bytes(b"x") @@ -358,7 +358,7 @@ def _capture(url, d, *, download_sections=None): seen["sections"] = download_sections return fake - monkeypatch.setattr("aai_cli.transcribe_exec.youtube.download_audio", _capture) + monkeypatch.setattr("aai_cli.transcribe_exec.youtube.download_media", _capture) mocker.patch( "aai_cli.transcribe_exec.client.transcribe", autospec=True, @@ -379,7 +379,7 @@ def test_transcribe_podcast_page_url_downloads_then_transcribes(monkeypatch, moc fake = tmp_path / "episode.m4a" fake.write_bytes(b"x") monkeypatch.setattr( - "aai_cli.transcribe_exec.youtube.download_audio", + "aai_cli.transcribe_exec.youtube.download_media", lambda url, d, *, download_sections=None: fake, ) tx = mocker.patch( @@ -432,7 +432,7 @@ def test_transcribe_direct_audio_url_passes_through_without_download(monkeypatch def _no_download(url, d, *, download_sections=None): raise AssertionError("direct audio URLs must not be downloaded") - monkeypatch.setattr("aai_cli.transcribe_exec.youtube.download_audio", _no_download) + monkeypatch.setattr("aai_cli.transcribe_exec.youtube.download_media", _no_download) tx = mocker.patch( "aai_cli.transcribe_exec.client.transcribe", autospec=True, diff --git a/tests/test_youtube.py b/tests/test_youtube.py index 7cd71b90..de05e0fa 100644 --- a/tests/test_youtube.py +++ b/tests/test_youtube.py @@ -44,7 +44,7 @@ def test_is_downloadable_url_passes_direct_and_local_sources_through(): def test_is_downloadable_url_without_ytdlp_still_matches_youtube(monkeypatch): - # With yt-dlp unimportable, YouTube still matches by URL shape (so download_audio + # With yt-dlp unimportable, YouTube still matches by URL shape (so download_media # can raise its install hint); extractor-matched hosts degrade to API pass-through. monkeypatch.setitem(sys.modules, "yt_dlp", None) # force ImportError monkeypatch.setitem(sys.modules, "yt_dlp.extractor", None) @@ -62,7 +62,7 @@ def _fake_ytdlp(monkeypatch, ydl_cls): monkeypatch.setitem(sys.modules, "yt_dlp", types.SimpleNamespace(YoutubeDL=ydl_cls)) -def test_download_audio_returns_prepared_path(tmp_path, monkeypatch): +def test_download_media_returns_prepared_path(tmp_path, monkeypatch): created = tmp_path / "vid123.m4a" captured = {} @@ -85,7 +85,7 @@ def prepare_filename(self, info): return str(created) _fake_ytdlp(monkeypatch, FakeYDL) - out = youtube.download_audio("https://youtu.be/vid123", tmp_path) + out = youtube.download_media("https://youtu.be/vid123", tmp_path) assert out == created assert out.is_file() # yt-dlp is driven quietly (no console noise) and actually downloads the media. @@ -93,9 +93,92 @@ def prepare_filename(self, info): assert captured["opts"]["no_warnings"] is True assert captured["opts"]["noprogress"] is True assert captured["download"] is True + # The default fetches only the audio track — no video download, no merging. + assert captured["opts"]["format"] == "bestaudio/best" + assert "merge_output_format" not in captured["opts"] -def test_download_audio_routes_ytdlp_output_to_silent_logger(tmp_path, monkeypatch, capsys): +def test_download_media_video_fetches_merged_video(tmp_path, monkeypatch): + # video=True must request the full video (best video+audio) merged into one + # mp4 container, so the result is playable/clippable everywhere. + captured = {} + + class FakeYDL: + def __init__(self, opts): + captured["opts"] = opts + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def extract_info(self, url, download): + (tmp_path / "x.mp4").write_bytes(b"video") + return {"id": "x", "ext": "mp4"} + + def prepare_filename(self, info): + return str(tmp_path / "x.mp4") + + _fake_ytdlp(monkeypatch, FakeYDL) + out = youtube.download_media("https://youtu.be/x", tmp_path, video=True) + assert out == tmp_path / "x.mp4" + assert captured["opts"]["format"] == "bestvideo*+bestaudio/best" + assert captured["opts"]["merge_output_format"] == "mp4" + + +def test_download_media_video_errors_name_the_video(tmp_path, monkeypatch): + # With video=True the failure messages say "video", not "audio". + _fake_ytdlp(monkeypatch, _raising_ydl("network down")) + with pytest.raises(CLIError) as exc: + youtube.download_media("https://youtu.be/x", tmp_path, video=True) + assert exc.value.message == "Could not download video from https://youtu.be/x: network down" + + +def test_download_media_video_no_file_produced_names_the_video(tmp_path, monkeypatch): + class FakeYDL: + def __init__(self, opts): + pass + + def __enter__(self): + return self + + def __exit__(self, *exc): + return False + + def extract_info(self, url, download): + return {"id": "x"} # writes no file + + def prepare_filename(self, info): + return str(tmp_path / "guessed.mp4") # doesn't exist + + _fake_ytdlp(monkeypatch, FakeYDL) + with pytest.raises(CLIError) as exc: + youtube.download_media("https://youtu.be/x", tmp_path, video=True) + assert "no video file" in exc.value.message + + +def test_validate_video_flag_accepts_downloadable_urls(): + youtube.validate_video_flag("https://youtu.be/abc123", video=True) # no exception + + +@pytest.mark.parametrize("source", ["talk.mp4", "https://example.com/episode.mp3"]) +def test_validate_video_flag_rejects_non_downloadable_sources(source): + # --video only changes what a media-page download fetches; a local file (or a + # direct URL the API fetches itself) already carries its video, so the flag + # would be silently dropped — and a requested flag is never dropped silently. + with pytest.raises(UsageError) as exc: + youtube.validate_video_flag(source, video=True) + assert "--video only applies to a downloadable URL source" in exc.value.message + assert "drop --video" in (exc.value.suggestion or "") + + +@pytest.mark.parametrize("source", ["talk.mp4", "https://youtu.be/abc123"]) +def test_validate_video_flag_without_video_is_a_no_op(source): + youtube.validate_video_flag(source, video=False) # no exception + + +def test_download_media_routes_ytdlp_output_to_silent_logger(tmp_path, monkeypatch, capsys): # yt-dlp's default logger writes its own "ERROR: …" line to stderr before the CLI's # clean error, duplicating the message; the passed logger must swallow everything. import logging @@ -120,7 +203,7 @@ def prepare_filename(self, info): return str(tmp_path / "x.m4a") _fake_ytdlp(monkeypatch, FakeYDL) - youtube.download_audio("https://youtu.be/x", tmp_path) + youtube.download_media("https://youtu.be/x", tmp_path) logger = captured["opts"]["logger"] # Structurally quiet: no propagation to root, only swallow-everything handlers. assert logger.name == "aai_cli.youtube.yt_dlp" @@ -136,7 +219,7 @@ def prepare_filename(self, info): assert out.out == "" -def test_download_audio_falls_back_to_landed_file(tmp_path, monkeypatch): +def test_download_media_falls_back_to_landed_file(tmp_path, monkeypatch): landed = tmp_path / "actual.webm" class FakeYDL: @@ -157,10 +240,10 @@ def prepare_filename(self, info): return str(tmp_path / "guessed.m4a") # wrong extension; file doesn't exist _fake_ytdlp(monkeypatch, FakeYDL) - assert youtube.download_audio("https://youtu.be/x", tmp_path) == landed + assert youtube.download_media("https://youtu.be/x", tmp_path) == landed -def test_download_audio_falls_back_to_largest_file(tmp_path, monkeypatch): +def test_download_media_falls_back_to_largest_file(tmp_path, monkeypatch): # yt-dlp can leave sidecars (thumbnail, .info.json) next to the audio track; # the fallback must pick the audio (largest), not an arbitrary iterdir() entry. audio = tmp_path / "actual.webm" @@ -185,10 +268,10 @@ def prepare_filename(self, info): return str(tmp_path / "guessed.m4a") # wrong extension; file doesn't exist _fake_ytdlp(monkeypatch, FakeYDL) - assert youtube.download_audio("https://youtu.be/x", tmp_path) == audio + assert youtube.download_media("https://youtu.be/x", tmp_path) == audio -def test_download_audio_no_file_produced_raises(tmp_path, monkeypatch): +def test_download_media_no_file_produced_raises(tmp_path, monkeypatch): # prepare_filename points at a missing file and nothing landed in dest_dir. class FakeYDL: def __init__(self, opts): @@ -208,7 +291,7 @@ def prepare_filename(self, info): _fake_ytdlp(monkeypatch, FakeYDL) with pytest.raises(CLIError) as exc: - youtube.download_audio("https://youtu.be/x", tmp_path) + youtube.download_media("https://youtu.be/x", tmp_path) assert exc.value.error_type == "youtube_error" assert exc.value.exit_code == 1 assert "no audio file" in exc.value.message @@ -234,10 +317,10 @@ def prepare_filename(self, info): return FakeYDL -def test_download_audio_error_raises_cli_error(tmp_path, monkeypatch): +def test_download_media_error_raises_cli_error(tmp_path, monkeypatch): _fake_ytdlp(monkeypatch, _raising_ydl("network down")) with pytest.raises(CLIError) as exc: - youtube.download_audio("https://youtu.be/x", tmp_path) + youtube.download_media("https://youtu.be/x", tmp_path) assert exc.value.error_type == "youtube_error" assert exc.value.exit_code == 1 # A message without boilerplate passes through untouched. @@ -250,13 +333,13 @@ def test_download_audio_error_raises_cli_error(tmp_path, monkeypatch): ) -def test_download_audio_trims_ytdlp_bug_report_boilerplate(tmp_path, monkeypatch): +def test_download_media_trims_ytdlp_bug_report_boilerplate(tmp_path, monkeypatch): # yt-dlp appends report-a-bug boilerplate to extractor errors; only the # meaningful part should reach the user, without the "ERROR: " prefix. message = f"ERROR: [youtube] abc: Video unavailable; {_YTDLP_BOILERPLATE}" _fake_ytdlp(monkeypatch, _raising_ydl(message)) with pytest.raises(CLIError) as exc: - youtube.download_audio("https://youtu.be/x", tmp_path) + youtube.download_media("https://youtu.be/x", tmp_path) assert exc.value.message == ( "Could not download audio from https://youtu.be/x: [youtube] abc: Video unavailable" ) @@ -264,19 +347,19 @@ def test_download_audio_trims_ytdlp_bug_report_boilerplate(tmp_path, monkeypatch assert "latest version" not in exc.value.message -def test_download_audio_all_boilerplate_message_falls_back_to_raw_text(tmp_path, monkeypatch): +def test_download_media_all_boilerplate_message_falls_back_to_raw_text(tmp_path, monkeypatch): # When trimming would leave nothing, keep the original message over an empty error. message = _YTDLP_BOILERPLATE[0].upper() + _YTDLP_BOILERPLATE[1:] _fake_ytdlp(monkeypatch, _raising_ydl(message)) with pytest.raises(CLIError) as exc: - youtube.download_audio("https://youtu.be/x", tmp_path) + youtube.download_media("https://youtu.be/x", tmp_path) assert message in exc.value.message -def test_download_audio_missing_ytdlp_raises(tmp_path, monkeypatch): +def test_download_media_missing_ytdlp_raises(tmp_path, monkeypatch): monkeypatch.setitem(sys.modules, "yt_dlp", None) # force ImportError on `import yt_dlp` with pytest.raises(CLIError) as exc: - youtube.download_audio("https://youtu.be/x", tmp_path) + youtube.download_media("https://youtu.be/x", tmp_path) assert exc.value.error_type == "ytdlp_missing" assert exc.value.exit_code == 2 @@ -284,7 +367,7 @@ def test_download_audio_missing_ytdlp_raises(tmp_path, monkeypatch): def test_missing_ytdlp_suggests_install(tmp_path, monkeypatch): monkeypatch.setitem(sys.modules, "yt_dlp", None) # force ImportError on `import yt_dlp` with pytest.raises(CLIError) as exc: - youtube.download_audio("https://youtu.be/x", tmp_path) + youtube.download_media("https://youtu.be/x", tmp_path) assert "yt-dlp" in exc.value.message assert "pip install yt-dlp" in (exc.value.suggestion or "") @@ -337,7 +420,7 @@ def test_parse_download_sections_rejects_malformed(spec, needle): assert exc.value.exit_code == 2 -def test_download_audio_with_sections_sets_download_ranges(tmp_path, monkeypatch): +def test_download_media_with_sections_sets_download_ranges(tmp_path, monkeypatch): # --download-sections must reach yt-dlp as download_ranges + force_keyframes_at_cuts # (exact cuts, not the nearest keyframe). captured = {} @@ -360,7 +443,7 @@ def prepare_filename(self, info): return str(tmp_path / "x.m4a") _fake_ytdlp(monkeypatch, FakeYDL) - youtube.download_audio( + youtube.download_media( "https://youtu.be/x", tmp_path, download_sections=["*0:00-5:00", "intro"] ) download_ranges = captured["opts"]["download_ranges"] @@ -370,7 +453,7 @@ def prepare_filename(self, info): assert captured["opts"]["force_keyframes_at_cuts"] is True -def test_download_audio_without_sections_omits_download_ranges(tmp_path, monkeypatch): +def test_download_media_without_sections_omits_download_ranges(tmp_path, monkeypatch): # The default path must not set download_ranges (downloads the whole track). captured = {} @@ -392,6 +475,6 @@ def prepare_filename(self, info): return str(tmp_path / "x.m4a") _fake_ytdlp(monkeypatch, FakeYDL) - youtube.download_audio("https://youtu.be/x", tmp_path) + youtube.download_media("https://youtu.be/x", tmp_path) assert "download_ranges" not in captured["opts"] assert "force_keyframes_at_cuts" not in captured["opts"]