From 938a23699e5a5aa824bb75246a70900c29a84fef Mon Sep 17 00:00:00 2001
From: Claude
Date: Wed, 17 Jun 2026 04:15:53 +0000
Subject: [PATCH] feat(clip,dub,caption): add --from-stdin batch mode
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Abstract transcribe's batch primitive (--from-stdin/--concurrency/--force)
into a shared app/batch.py runner and wire it into clip, dub, and caption,
so each processes a piped list of sources concurrently — making the media
commands as pipeline-composable as transcribe already was
(find … | assembly caption --from-stdin).

- app/batch.py: a generic concurrent runner parameterized by a per-source
  worker — stdin source expansion, a live status table, one NDJSON record
  per source under --json, and a fail-summary that exits non-zero. Lighter
  than transcribe's sidecar/--llm/--llm-reduce path on purpose.
- clip/dub/caption: --from-stdin reads one path/URL per line and runs each
  source's existing single-source pipeline concurrently. A re-run skips
  sources whose output already exists (--force redoes them); --out and
  -t/--transcript-id are rejected in batch mode (they can't span many
  sources).
- options.py: the batch option factories now take per-command help text so
  the flags read naturally on each command (video vs audio, caption vs dub
  vs clip).

Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_015PJdgpwb2zQAJSyyMQvijh --- README.md | 6 + aai_cli/app/batch.py | 189 ++++++++++++++++ aai_cli/commands/caption/__init__.py | 28 ++- aai_cli/commands/caption/_exec.py | 103 +++++++-- aai_cli/commands/clip/__init__.py | 28 ++- aai_cli/commands/clip/_exec.py | 123 +++++++++-- aai_cli/commands/dub/__init__.py | 28 ++- aai_cli/commands/dub/_exec.py | 128 +++++++++-- aai_cli/options.py | 30 ++- .../test_snapshots_help_run.ambr | 71 +++++-- tests/_clip_helpers.py | 3 + tests/_dub_helpers.py | 3 + tests/test_batch.py | 201 ++++++++++++++++++ tests/test_caption_batch.py | 144 +++++++++++++ tests/test_caption_command.py | 6 + tests/test_caption_exec.py | 3 + tests/test_clip_batch.py | 131 ++++++++++++ tests/test_clip_command.py | 6 + tests/test_dub_batch.py | 128 +++++++++++ tests/test_dub_command.py | 6 + 20 files changed, 1282 insertions(+), 83 deletions(-) create mode 100644 aai_cli/app/batch.py create mode 100644 tests/test_batch.py create mode 100644 tests/test_caption_batch.py create mode 100644 tests/test_clip_batch.py create mode 100644 tests/test_dub_batch.py diff --git a/README.md b/README.md index 2bd2bd52..c913546d 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,12 @@ assembly clip "https://www.youtube.com/watch?v=UF8uR6Z6KLc" --video \ assembly caption video.mp4 --chars-per-caption 24 --font-size 28 ``` +`clip`, `dub`, and `caption` each batch a piped list too: `--from-stdin` reads one path/URL per line and processes them concurrently (`--concurrency`), skipping sources whose output already exists so a re-run only does what's left (`--force` redoes them): + +```sh +find talks -name "*.mp4" | assembly caption --from-stdin --concurrency 3 +``` + **Keep a live to-do list from your mic** — `llm -f` re-runs the prompt over the growing transcript, updating in place: ```sh diff --git a/aai_cli/app/batch.py b/aai_cli/app/batch.py new file mode 100644 index 
00000000..737a922e --- /dev/null +++ b/aai_cli/app/batch.py @@ -0,0 +1,189 @@ +"""Concurrent many-source batch mode for the media commands (clip, dub, caption). + +``assembly transcribe`` owns a richer batch path — per-source ``.aai.json`` +sidecars, resumable ``--llm`` chains, ``--llm-reduce`` (see +``app/transcribe/batch.py``). This is the lighter shared machinery for the +one-source-in / one-output-out media commands: a command reads its source list +with :func:`stdin_sources` and, when batch mode is active, hands each source to +:func:`run_batch` with a worker that runs its existing single-source path. + +The worker returns a :class:`SourceResult` (or raises a ``CLIError`` for a +per-source failure). The runner renders one live status table, emits one NDJSON +record per source under ``--json``, and raises so the process exits non-zero if +any source failed — a re-run resumes (each command skips a source whose output +already exists unless ``--force``) and retries only the failures. + +Source expansion stays deliberately small: stdin only (one path/URL per line). +Directory/glob/feed expansion is transcribe's concern, not these commands'. +""" + +from __future__ import annotations + +import dataclasses +from collections.abc import Callable, Generator +from contextlib import contextmanager + +from rich.live import Live +from rich.markup import escape +from rich.table import Table + +from aai_cli.core import stdio +from aai_cli.core.errors import CLIError, NotAuthenticated, UsageError +from aai_cli.ui import output, theme + +# A worker turns one source into its result, or raises a CLIError to fail it. +Worker = Callable[[str], "SourceResult"] + + +@dataclasses.dataclass(frozen=True) +class SourceResult: + """One source's outcome: the JSON payload the command would emit single-source, + plus a one-line human ``summary`` for the progress table. 
``status`` is + ``"completed"`` for processed sources or ``"skipped"`` for ones a re-run left + alone (their output already existed and ``--force`` wasn't passed).""" + + payload: dict[str, object] + summary: str + status: str = "completed" + + +def stdin_sources(media: str, *, from_stdin: bool) -> list[str] | None: + """The batch source list read from stdin, or ``None`` for a single-source run. + + ``None`` means "not batch mode" (no ``--from-stdin``); the caller then handles + its lone ``media`` argument as before. With ``--from-stdin`` a positional source + is rejected (the list comes from stdin, not argv), and an empty stdin is a usage + error rather than a silent no-op. + """ + if not from_stdin: + return None + if media: + raise UsageError( + "--from-stdin reads sources from stdin; don't also pass a source.", + suggestion="Drop the positional source, or drop --from-stdin to process just it.", + ) + lines = list(dict.fromkeys(stdio.iter_piped_stdin_lines())) # dedupe, keep order + if not lines: + raise UsageError( + "No sources received on stdin.", + suggestion="Pipe one path or URL per line, e.g. " + "find . -name '*.mp4' | assembly caption --from-stdin.", + ) + return lines + + +@dataclasses.dataclass +class _Item: + source: str + status: str = "queued" # queued → processing → completed | skipped | failed + summary: str = "" # the result one-liner, or the error message when failed + payload: dict[str, object] | None = None + + def record(self) -> dict[str, object]: + """The NDJSON record emitted for this source under ``--json``.""" + # "type" discriminates NDJSON lines CLI-wide (see REFERENCE.md "JSON output"). 
+ rec: dict[str, object] = {"type": "result", "source": self.source, "status": self.status} + if self.status == "failed": + rec["error"] = self.summary + elif self.payload is not None: + rec.update(self.payload) + return rec + + +def _process_one(item: _Item, worker: Worker) -> None: + """Worker body: run one source, recording its result or per-source failure. + + A NotAuthenticated re-raises so :func:`_drain` aborts the whole batch — one + rejected key fails every source identically — while any other CLIError is + recorded on the item and the batch carries on. + """ + try: + item.status = "processing" + result = worker(item.source) + item.status, item.summary, item.payload = result.status, result.summary, result.payload + except CLIError as err: + item.status, item.summary = "failed", err.message + if isinstance(err, NotAuthenticated): + raise + + +def _render_table(items: list[_Item]) -> Table: + table = output.data_table("Source", "Status", "Result") + for item in items: + table.add_row(escape(item.source), theme.status_text(item.status), escape(item.summary)) + return table + + +@contextmanager +def _progress_table(items: list[_Item], *, json_mode: bool) -> Generator[None]: + """Render the batch as a live-updating table (human mode); a no-op under ``--json``. + + Rich renders nothing while running on a non-interactive console and prints the + final frame once on stop, so piped/agent runs still get the result table. JSON + mode skips Rich entirely — NDJSON per source is the output. + """ + if json_mode: + yield + return + with Live( + get_renderable=lambda: _render_table(items), + console=output.console, + refresh_per_second=4, # pragma: no mutate (cosmetic refresh cadence) + ): + yield + + +def _drain(items: list[_Item], *, worker: Worker, concurrency: int, json_mode: bool) -> None: + """Run the workers, emitting one NDJSON record per finished source under ``--json``. 
+ + The first exception that escapes a worker (NotAuthenticated, or a bug) drops the + not-yet-started sources and re-raises once the in-flight ones drain. + """ + from concurrent.futures import ThreadPoolExecutor, as_completed + + with ThreadPoolExecutor(max_workers=concurrency) as pool: + futures = {pool.submit(_process_one, item, worker): item for item in items} + for future in as_completed(futures): + if (exc := future.exception()) is not None: + pool.shutdown(cancel_futures=True) # pragma: no mutate (best-effort cleanup) + raise exc + if json_mode: + output.emit_ndjson(futures[future].record()) + + +def _summarize(items: list[_Item], *, summary_verb: str, json_mode: bool, quiet: bool) -> None: + """Report the batch tally, raising so a partly-failed batch exits non-zero.""" + failed = sum(1 for item in items if item.status == "failed") + if failed: + raise CLIError( + f"{failed} of {len(items)} sources failed.", + error_type="batch_failed", + suggestion="Re-run the same command to retry only the failures — " + "sources whose output already exists are skipped.", + ) + completed = sum(1 for item in items if item.status == "completed") + skipped = len(items) - completed + if not json_mode and not quiet: + output.error_console.print( + output.success(f"{summary_verb} {completed}, skipped {skipped}.") + ) + + +def run_batch( + sources: list[str], + *, + worker: Worker, + concurrency: int, + summary_verb: str, + json_mode: bool, + quiet: bool, +) -> None: + """Process ``sources`` concurrently through ``worker``, one output per source. + + Raises CLIError (exit 1) when any source failed so scripts can trust the exit + code; a re-run resumes (finished outputs are skipped) and retries the failures. 
+ """ + items = [_Item(source) for source in sources] + with _progress_table(items, json_mode=json_mode): + _drain(items, worker=worker, concurrency=concurrency, json_mode=json_mode) + _summarize(items, summary_verb=summary_verb, json_mode=json_mode, quiet=quiet) diff --git a/aai_cli/commands/caption/__init__.py b/aai_cli/commands/caption/__init__.py index 286baf6d..7865ee27 100644 --- a/aai_cli/commands/caption/__init__.py +++ b/aai_cli/commands/caption/__init__.py @@ -36,15 +36,28 @@ "assembly caption talk.mp4 --chars-per-caption 32 --font-size 28", ), ("Choose the output file", "assembly caption talk.mp4 --out talk-captioned.mp4"), + ( + "Caption a whole folder of videos", + 'find clips -name "*.mp4" | assembly caption --from-stdin', + ), ] ), ) def caption( ctx: typer.Context, - media: str = typer.Argument( - ..., + media: str | None = typer.Argument( + None, help="Video to caption: a local file, or a YouTube/media-page URL " - "(the full video is downloaded via yt-dlp)", + "(the full video is downloaded via yt-dlp). Omit with --from-stdin", + ), + from_stdin: bool = options.batch_from_stdin_option( + "Batch mode: read video paths/URLs from stdin, one per line (composes with find/ls output)" + ), + concurrency: int = options.batch_concurrency_option( + "How many sources to caption at once in batch mode" + ), + force: bool = options.batch_force_option( + "Batch mode: re-caption sources whose output file already exists" ), transcript_id: str | None = typer.Option( None, @@ -77,12 +90,19 @@ def caption( the audio stream is copied untouched. A YouTube/media-page URL is downloaded first (always the full video); its output lands in --out or the current directory. + + Batch mode: pipe one path/URL per line with --from-stdin to caption many + sources concurrently (--concurrency). Each writes its own .captioned; + a re-run skips sources whose output already exists (--force re-does them). 
""" opts = caption_exec.CaptionOptions( - media=media, + media=media or "", transcript_id=transcript_id, chars_per_caption=chars_per_caption, font_size=font_size, out=out, + from_stdin=from_stdin, + concurrency=concurrency, + force=force, ) run_with_options(ctx, caption_exec.run_caption, opts, json=json_out) diff --git a/aai_cli/commands/caption/_exec.py b/aai_cli/commands/caption/_exec.py index 06a58235..606e3d9a 100644 --- a/aai_cli/commands/caption/_exec.py +++ b/aai_cli/commands/caption/_exec.py @@ -15,6 +15,7 @@ from __future__ import annotations +import dataclasses import os import tempfile from dataclasses import dataclass @@ -23,10 +24,10 @@ import assemblyai as aai from rich.markup import escape -from aai_cli.app import mediafile +from aai_cli.app import batch, mediafile from aai_cli.app.context import AppState from aai_cli.core import client -from aai_cli.core.errors import CLIError +from aai_cli.core.errors import CLIError, UsageError, mutually_exclusive from aai_cli.ui import output @@ -36,12 +37,16 @@ class CaptionOptions: run_command resolves it into the ``json_mode`` argument).""" # The raw source as typed: a local path, or a downloadable media-page URL - # (a pathlib.Path would collapse the "//" in "https://"). + # (a pathlib.Path would collapse the "//" in "https://"). Empty in batch mode, + # where the sources arrive on stdin (--from-stdin) instead of as the argument. 
media: str transcript_id: str | None chars_per_caption: int | None font_size: int | None out: Path | None + from_stdin: bool + concurrency: int + force: bool def default_out_path(media: Path) -> Path: @@ -128,10 +133,82 @@ def _fetch_srt(transcript: object, opts: CaptionOptions, *, json_mode: bool, qui def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> None: - """Execute one `assembly caption` invocation from already-parsed flags.""" + """Execute `assembly caption`: one source, or a stdin batch (`--from-stdin`).""" + sources = batch.stdin_sources(opts.media, from_stdin=opts.from_stdin) + if sources is not None: + _reject_batch_conflicts(opts) + batch.run_batch( + sources, + worker=_caption_worker(opts, state, force=opts.force, json_mode=json_mode), + concurrency=opts.concurrency, + summary_verb="Captioned", + json_mode=json_mode, + quiet=state.quiet, + ) + return + if not opts.media: + raise UsageError( + "Pass a video file or URL to caption, or --from-stdin to read a list from stdin.", + suggestion="e.g. 
assembly caption talk.mp4", + ) + result = _caption_one(opts, state, json_mode=json_mode) + output.emit( + result.payload, lambda _: output.success(escape(result.summary)), json_mode=json_mode + ) + + +def _reject_batch_conflicts(opts: CaptionOptions) -> None: + """Single-result flags that can't span a many-source batch.""" + mutually_exclusive( + ("--out", opts.out), + ("--from-stdin", True), + suggestion="In batch mode each source gets its own .captioned; drop --out.", + ) + mutually_exclusive( + ("--transcript-id/-t", opts.transcript_id), + ("--from-stdin", True), + suggestion="A transcript id can't apply to many sources; drop -t in batch mode.", + ) + + +def _caption_worker( + opts: CaptionOptions, state: AppState, *, force: bool, json_mode: bool +) -> batch.Worker: + """A per-source worker for the batch runner: skip a source whose default output + already exists (unless ``--force``), else caption it with spinners silenced.""" + quiet_state = dataclasses.replace(state, quiet=True) + + def worker(source: str) -> batch.SourceResult: + if not force and (existing := _existing_output(source)) is not None: + return batch.SourceResult( + payload={"source": source, "out": str(existing)}, + summary=f"{existing} exists", + status="skipped", + ) + return _caption_one( + dataclasses.replace(opts, media=source), quiet_state, json_mode=json_mode + ) + + return worker + + +def _existing_output(source: str) -> Path | None: + """The default output for a local ``source`` when it already exists (so batch mode + skips it), else ``None`` — a URL (output name isn't known until download) or a + source with no prior output, both of which are processed.""" + if "://" in source: + return None + out = default_out_path(Path(source)) + return out if out.exists() else None + + +def _caption_one(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> batch.SourceResult: + """Resolve ``opts.media`` to a local video, caption it, and return the result. 
+ + A media-page URL is downloaded once — always the full video, since the + captions are burned into it. + """ ffmpeg = mediafile.require_ffmpeg("burn captions into video") - # A media-page URL is downloaded once — always the full video, since the - # captions are burned into it. with mediafile.resolve_media_source( opts.media, "caption", @@ -148,10 +225,10 @@ def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> No opts.out, media, downloaded=downloaded, namer=default_out_path ) mediafile.validate_out(out, media) - _caption_and_emit(opts, media, out, ffmpeg, state, json_mode=json_mode) + return _caption_build(opts, media, out, ffmpeg, state, json_mode=json_mode) -def _caption_and_emit( +def _caption_build( opts: CaptionOptions, media: Path, out: Path, @@ -159,8 +236,8 @@ def _caption_and_emit( state: AppState, *, json_mode: bool, -) -> None: - """Caption an already-local video file into ``out`` and report the result.""" +) -> batch.SourceResult: + """Caption an already-local video file into ``out``; the result as plain data.""" transcript = mediafile.resolve_transcript( state.resolve_api_key(), opts.transcript_id, @@ -184,8 +261,4 @@ def _caption_and_emit( "transcript_id": transcript_id, "captions": captions, } - output.emit( - payload, - lambda _: output.success(f"{escape(str(out))} {captions} caption(s) burned in"), - json_mode=json_mode, - ) + return batch.SourceResult(payload=payload, summary=f"{out} {captions} caption(s) burned in") diff --git a/aai_cli/commands/clip/__init__.py b/aai_cli/commands/clip/__init__.py index 3a74299b..d0567b42 100644 --- a/aai_cli/commands/clip/__init__.py +++ b/aai_cli/commands/clip/__init__.py @@ -54,15 +54,28 @@ "Pad each clip and collect them in a directory", "assembly clip meeting.mp4 --speaker A --padding 0.5 --out-dir clips", ), + ( + "Clip a whole folder by speaker", + 'find calls -name "*.mp4" | assembly clip --from-stdin --speaker A', + ), ] ), ) def clip( ctx: typer.Context, - media: str = 
typer.Argument( - ..., + media: str | None = typer.Argument( + None, help="Audio/video to cut clips from: a local file, or a YouTube/media-page " - "URL (audio downloaded via yt-dlp)", + "URL (audio downloaded via yt-dlp). Omit with --from-stdin", + ), + from_stdin: bool = options.batch_from_stdin_option( + "Batch mode: read media paths/URLs from stdin, one per line (composes with find/ls output)" + ), + concurrency: int = options.batch_concurrency_option( + "How many sources to clip at once in batch mode" + ), + force: bool = options.batch_force_option( + "Batch mode: re-clip sources that already have clip files" ), transcript_id: str | None = typer.Option( None, @@ -134,9 +147,13 @@ def clip( using ffmpeg (which must be installed). A YouTube/media-page source is downloaded first (audio only, or the full video with --video); its clips land in --out-dir or the current directory. + + Batch mode: pipe one path/URL per line with --from-stdin to clip many sources + concurrently (--concurrency), reusing the same selectors for each. A re-run + skips sources that already have clip files (--force re-cuts them). 
""" opts = clip_exec.ClipOptions( - media=media, + media=media or "", transcript_id=transcript_id, speakers=speaker, search=search, @@ -148,5 +165,8 @@ def clip( snap=snap, out_dir=out_dir, video=video, + from_stdin=from_stdin, + concurrency=concurrency, + force=force, ) run_with_options(ctx, clip_exec.run_clip, opts, json=json_out) diff --git a/aai_cli/commands/clip/_exec.py b/aai_cli/commands/clip/_exec.py index 6ca0cc86..bbb03871 100644 --- a/aai_cli/commands/clip/_exec.py +++ b/aai_cli/commands/clip/_exec.py @@ -19,6 +19,7 @@ from __future__ import annotations +import dataclasses import json from dataclasses import dataclass from pathlib import Path @@ -26,12 +27,12 @@ from rich.markup import escape -from aai_cli.app import mediafile +from aai_cli.app import batch, mediafile from aai_cli.app.context import AppState from aai_cli.commands.clip import _select as clip_select from aai_cli.commands.clip._select import Segment from aai_cli.core import jsonshape, llm, stdio, youtube -from aai_cli.core.errors import CLIError, UsageError +from aai_cli.core.errors import CLIError, UsageError, mutually_exclusive from aai_cli.ui import output @@ -41,7 +42,8 @@ class ClipOptions: resolves it into the ``json_mode`` argument).""" # The raw source as typed: a local path, or a downloadable media-page URL - # (a pathlib.Path would collapse the "//" in "https://"). + # (a pathlib.Path would collapse the "//" in "https://"). Empty in batch mode, + # where the sources arrive on stdin (--from-stdin) instead of as the argument. 
media: str transcript_id: str | None speakers: list[str] @@ -54,6 +56,9 @@ class ClipOptions: snap: bool out_dir: Path | None video: bool + from_stdin: bool + concurrency: int + force: bool def _llm_segments( @@ -311,14 +316,104 @@ def human_line(self) -> str: def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: - """Execute one `assembly clip` invocation from already-parsed flags.""" + """Execute `assembly clip`: one source, or a stdin batch (`--from-stdin`).""" _validate_out_dir(opts.out_dir) _validate_selection(opts) - youtube.validate_video_flag(opts.media, video=opts.video) explicit = [clip_select.parse_range(value) for value in opts.ranges] ffmpeg = mediafile.require_ffmpeg("cut media") - # A media-page URL is downloaded once — the audio track by default, the full - # video with --video so the clips carry video too — and clipped locally. + sources = batch.stdin_sources(opts.media, from_stdin=opts.from_stdin) + if sources is not None: + _reject_batch_conflicts(opts) + batch.run_batch( + sources, + worker=_clip_worker( + opts, explicit, ffmpeg, state, force=opts.force, json_mode=json_mode + ), + concurrency=opts.concurrency, + summary_verb="Clipped", + json_mode=json_mode, + quiet=state.quiet, + ) + return + if not opts.media: + raise UsageError( + "Pass a media file or URL to clip, or --from-stdin to read a list from stdin.", + suggestion="e.g. 
assembly clip meeting.mp4 --speaker A", + ) + payload, written = _clip_one(opts, explicit, ffmpeg, state, json_mode=json_mode) + output.emit( + payload, + lambda _: "\n".join(clip.human_line() for clip in written), + json_mode=json_mode, + ) + + +def _reject_batch_conflicts(opts: ClipOptions) -> None: + """Flags that can't span a many-source batch (a single transcript id can't).""" + mutually_exclusive( + ("--transcript-id/-t", opts.transcript_id), + ("--from-stdin", True), + suggestion="A transcript id can't apply to many sources; drop -t in batch mode.", + ) + + +def _clip_worker( + opts: ClipOptions, + explicit: list[Segment], + ffmpeg: str, + state: AppState, + *, + force: bool, + json_mode: bool, +) -> batch.Worker: + """A per-source worker for the batch runner: skip a source whose clips already + exist (unless ``--force``), else cut it with spinners silenced.""" + quiet_state = dataclasses.replace(state, quiet=True) + + def worker(source: str) -> batch.SourceResult: + if not force and _clips_exist(source, opts.out_dir): + return batch.SourceResult( + payload={"source": source}, summary="clips exist", status="skipped" + ) + payload, written = _clip_one( + dataclasses.replace(opts, media=source), + explicit, + ffmpeg, + quiet_state, + json_mode=json_mode, + ) + return batch.SourceResult(payload=payload, summary=f"{len(written)} clip(s)") + + return worker + + +def _clips_exist(source: str, out_dir: Path | None) -> bool: + """Whether a local ``source`` already has clip files (so batch mode skips it). + + URLs never match (the clipped stem isn't known until download), so they're + always processed. 
+ """ + if "://" in source: + return False + src = Path(source) + directory = out_dir if out_dir is not None else src.parent + return any(directory.glob(f"{src.stem}.clip*{src.suffix}")) + + +def _clip_one( + opts: ClipOptions, + explicit: list[Segment], + ffmpeg: str, + state: AppState, + *, + json_mode: bool, +) -> tuple[dict[str, object], list[WrittenClip]]: + """Resolve ``opts.media`` to a local file and cut its clips; the payload + clips. + + A media-page URL is downloaded once — the audio track by default, the full + video with --video so the clips carry video too — and clipped locally. + """ + youtube.validate_video_flag(opts.media, video=opts.video) with mediafile.resolve_media_source( opts.media, "clip", @@ -336,10 +431,10 @@ def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None: out_dir: Path | None = opts.out_dir if downloaded and out_dir is None: out_dir = Path.cwd() - _cut_and_emit(opts, media, out_dir, explicit, ffmpeg, state, json_mode=json_mode) + return _cut(opts, media, out_dir, explicit, ffmpeg, state, json_mode=json_mode) -def _cut_and_emit( +def _cut( opts: ClipOptions, media: Path, out_dir: Path | None, @@ -348,8 +443,8 @@ def _cut_and_emit( state: AppState, *, json_mode: bool, -) -> None: - """Select, cut, and report the clips for an already-local media file.""" +) -> tuple[dict[str, object], list[WrittenClip]]: + """Select and cut the clips for an already-local media file; the payload + clips.""" matched, transcript_id = _transcript_segments(opts, media, state, json_mode=json_mode) segments = clip_select.merge_segments([*matched, *explicit], opts.padding) if opts.snap: @@ -368,8 +463,4 @@ def _cut_and_emit( "transcript_id": transcript_id, "clips": [clip.payload() for clip in written], } - output.emit( - payload, - lambda _: "\n".join(clip.human_line() for clip in written), - json_mode=json_mode, - ) + return payload, written diff --git a/aai_cli/commands/dub/__init__.py b/aai_cli/commands/dub/__init__.py index 
6939316a..0e62fc80 100644 --- a/aai_cli/commands/dub/__init__.py +++ b/aai_cli/commands/dub/__init__.py @@ -51,15 +51,28 @@ "Choose the output file", "assembly --sandbox dub talk.mp4 -l de --out talk-german.mp4", ), + ( + "Dub a whole folder into German", + 'find talks -name "*.mp4" | assembly --sandbox dub --from-stdin -l de', + ), ] ), ) def dub( ctx: typer.Context, - media: str = typer.Argument( - ..., + media: str | None = typer.Argument( + None, help="Audio/video to dub: a local file (the video stream is copied untouched), " - "or a YouTube/media-page URL (downloaded via yt-dlp)", + "or a YouTube/media-page URL (downloaded via yt-dlp). Omit with --from-stdin", + ), + from_stdin: bool = options.batch_from_stdin_option( + "Batch mode: read media paths/URLs from stdin, one per line (composes with find/ls output)" + ), + concurrency: int = options.batch_concurrency_option( + "How many sources to dub at once in batch mode" + ), + force: bool = options.batch_force_option( + "Batch mode: re-dub sources whose output file already exists" ), lang: str = typer.Option( ..., @@ -127,9 +140,13 @@ def dub( time slice of it). Streaming TTS only exists in the sandbox today — run it as 'assembly --sandbox dub' (--sandbox goes before the subcommand). Requires ffmpeg. + + Batch mode: pipe one path/URL per line with --from-stdin to dub many sources + concurrently (--concurrency). Each writes its own .dub.; a + re-run skips sources whose output already exists (--force re-does them). 
""" opts = dub_exec.DubOptions( - media=media, + media=media or "", language=lang, source_language=source_lang, transcript_id=transcript_id, @@ -139,5 +156,8 @@ def dub( out=out, video=video, download_sections=download_sections, + from_stdin=from_stdin, + concurrency=concurrency, + force=force, ) run_with_options(ctx, dub_exec.run_dub, opts, json=json_out) diff --git a/aai_cli/commands/dub/_exec.py b/aai_cli/commands/dub/_exec.py index 85905197..606153dd 100644 --- a/aai_cli/commands/dub/_exec.py +++ b/aai_cli/commands/dub/_exec.py @@ -18,6 +18,7 @@ from __future__ import annotations +import dataclasses import re import tempfile from dataclasses import dataclass @@ -25,11 +26,11 @@ from rich.markup import escape -from aai_cli.app import mediafile +from aai_cli.app import batch, mediafile from aai_cli.app.context import AppState from aai_cli.commands.dub import _pipeline as pipeline from aai_cli.core import youtube -from aai_cli.core.errors import UsageError +from aai_cli.core.errors import UsageError, mutually_exclusive from aai_cli.tts import audio, dialogue, session from aai_cli.ui import output @@ -62,6 +63,7 @@ class DubOptions: """Every `assembly dub` flag as plain data (``--json`` excluded: run_command resolves it into the ``json_mode`` argument).""" + # Empty in batch mode, where the sources arrive on stdin (--from-stdin). 
media: str language: str source_language: str | None @@ -72,6 +74,9 @@ class DubOptions: out: Path | None video: bool download_sections: list[str] + from_stdin: bool + concurrency: int + force: bool def resolve_language(value: str) -> str: @@ -100,19 +105,109 @@ def default_out_path(media: Path, language: str) -> Path: def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: - """Execute one `assembly dub` invocation from already-parsed flags.""" + """Execute `assembly dub`: one source, or a stdin batch (`--from-stdin`).""" language = resolve_language(opts.language) session.require_available("dub") # Parse --voice now: a malformed mapping must fail before the billed pipeline. voice_plan = pipeline.VoicePlan(*dialogue.parse_voice_overrides(opts.voice)) + sources = batch.stdin_sources(opts.media, from_stdin=opts.from_stdin) + if sources is not None: + _reject_batch_conflicts(opts) + batch.run_batch( + sources, + worker=_dub_worker( + opts, state, language, voice_plan, force=opts.force, json_mode=json_mode + ), + concurrency=opts.concurrency, + summary_verb="Dubbed", + json_mode=json_mode, + quiet=state.quiet, + ) + return + if not opts.media: + raise UsageError( + "Pass a video/audio file or URL to dub, or --from-stdin to read a list from stdin.", + suggestion="e.g. 
assembly --sandbox dub talk.mp4 -l de", + ) + result = _dub_one(opts, state, language, voice_plan, json_mode=json_mode) + output.emit( + result.payload, lambda _: output.success(escape(result.summary)), json_mode=json_mode + ) + + +def _reject_batch_conflicts(opts: DubOptions) -> None: + """Single-result flags that can't span a many-source batch.""" + mutually_exclusive( + ("--out", opts.out), + ("--from-stdin", True), + suggestion="In batch mode each source gets its own .dub.; drop --out.", + ) + mutually_exclusive( + ("--transcript-id/-t", opts.transcript_id), + ("--from-stdin", True), + suggestion="A transcript id can't apply to many sources; drop -t in batch mode.", + ) + + +def _dub_worker( + opts: DubOptions, + state: AppState, + language: str, + voice_plan: pipeline.VoicePlan, + *, + force: bool, + json_mode: bool, +) -> batch.Worker: + """A per-source worker for the batch runner: skip a source whose default output + already exists (unless ``--force``), else dub it with spinners silenced.""" + quiet_state = dataclasses.replace(state, quiet=True) + + def worker(source: str) -> batch.SourceResult: + if not force and (existing := _existing_output(source, language)) is not None: + return batch.SourceResult( + payload={"source": source, "out": str(existing)}, + summary=f"{existing} exists", + status="skipped", + ) + return _dub_one( + dataclasses.replace(opts, media=source), + quiet_state, + language, + voice_plan, + json_mode=json_mode, + ) + + return worker + + +def _existing_output(source: str, language: str) -> Path | None: + """The default output for a local ``source`` when it already exists (so batch mode + skips it), else ``None`` — a URL or a source with no prior output, both processed.""" + if "://" in source: + return None + out = default_out_path(Path(source), language) + return out if out.exists() else None + + +def _dub_one( + opts: DubOptions, + state: AppState, + language: str, + voice_plan: pipeline.VoicePlan, + *, + json_mode: bool, +) -> 
batch.SourceResult: + """Resolve ``opts.media`` to a local file, dub it, and return the result. + + A media-page URL is downloaded once — the audio track by default, the full + video with --video so the dub keeps the picture, only the --download-sections + slices when given — and dubbed locally. + """ youtube.validate_video_flag(opts.media, video=opts.video) youtube.validate_sections_flag(opts.media, opts.download_sections) # ffmpeg is checked before any (billed) download/transcription so a missing # dependency fails before any fetch. ffmpeg = mediafile.require_ffmpeg("write the dubbed file") - # A media-page URL is downloaded once — the audio track by default, the full - # video with --video so the dub keeps the picture, only the - # --download-sections slices when given — and dubbed locally. with mediafile.resolve_media_source( opts.media, "dub", @@ -129,10 +224,12 @@ def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: opts.out, media, downloaded=downloaded, namer=lambda m: default_out_path(m, language) ) mediafile.validate_out(out, media) - _dub_and_emit(opts, media, out, language, ffmpeg, voice_plan, state, json_mode=json_mode) + return _dub_build( + opts, media, out, language, ffmpeg, voice_plan, state, json_mode=json_mode + ) -def _dub_and_emit( +def _dub_build( opts: DubOptions, media: Path, out: Path, @@ -142,8 +239,8 @@ def _dub_and_emit( state: AppState, *, json_mode: bool, -) -> None: - """Dub an already-local media file into ``out`` and report the result.""" +) -> batch.SourceResult: + """Dub an already-local media file into ``out``; the result as plain data.""" api_key = state.resolve_api_key() transcript = mediafile.resolve_diarized_transcript( api_key, @@ -192,12 +289,7 @@ def _dub_and_emit( "sample_rate": sample_rate, "audio_duration_seconds": duration, } - output.emit( - payload, - # language and voices carry user-typed text, so they need escaping too. 
- lambda _: output.success( - f"{escape(str(out))} dubbed to {escape(language)} " - f"({len(utterances)} utterances, {escape(voices_text)})" - ), - json_mode=json_mode, + return batch.SourceResult( + payload=payload, + summary=f"{out} dubbed to {language} ({len(utterances)} utterances, {voices_text})", ) diff --git a/aai_cli/options.py b/aai_cli/options.py index 03824a07..493e3822 100644 --- a/aai_cli/options.py +++ b/aai_cli/options.py @@ -52,36 +52,46 @@ def chars_per_caption_option() -> int | None: # this module owns the FBT003 carve-out for Typer's boolean positional defaults. -def batch_from_stdin_option() -> bool: - """The ``--from-stdin`` flag: batch mode fed one path/URL per stdin line.""" +def batch_from_stdin_option( + help_text: str = "Batch mode: read audio paths/URLs from stdin, one per line " + "(composes with find/ls/yt-dlp output)", +) -> bool: + """The ``--from-stdin`` flag: batch mode fed one path/URL per stdin line. + + ``help_text`` lets the media commands (clip/dub/caption) reword "audio" for + their own source kind; the default carries ``transcribe``'s wording. 
+ """ flag: bool = typer.Option( False, "--from-stdin", - help="Batch mode: read audio paths/URLs from stdin, one per line " - "(composes with find/ls/yt-dlp output)", + help=help_text, rich_help_panel=help_panels.OPT_BATCH, ) return flag -def batch_concurrency_option() -> int: - """The ``--concurrency`` option: how many sources transcribe at once in batch mode.""" +def batch_concurrency_option( + help_text: str = "How many sources to transcribe at once in batch mode", +) -> int: + """The ``--concurrency`` option: how many sources run at once in batch mode.""" value: int = typer.Option( DEFAULT_BATCH_CONCURRENCY, "--concurrency", min=1, - help="How many sources to transcribe at once in batch mode", + help=help_text, rich_help_panel=help_panels.OPT_BATCH, ) return value -def batch_force_option() -> bool: - """The ``--force`` flag: re-transcribe even when a completed sidecar exists.""" +def batch_force_option( + help_text: str = "Batch mode: re-transcribe sources whose sidecar already records a completed run", +) -> bool: + """The ``--force`` flag: reprocess a source even when its output already exists.""" flag: bool = typer.Option( False, "--force", - help="Batch mode: re-transcribe sources whose sidecar already records a completed run", + help=help_text, rich_help_panel=help_panels.OPT_BATCH, ) return flag diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index 002dde60..e0c2b43b 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -181,7 +181,7 @@ # name: test_command_help_matches_snapshot[caption] ''' - Usage: assembly caption [OPTIONS] MEDIA + Usage: assembly caption [OPTIONS] [MEDIA] Burn always-visible captions into a video @@ -192,11 +192,15 @@ downloaded first (always the full video); its output lands in --out or the current directory. 
+ Batch mode: pipe one path/URL per line with --from-stdin to caption many + sources concurrently (--concurrency). Each writes its own + .captioned; + a re-run skips sources whose output already exists (--force re-does them). + ╭─ Arguments ──────────────────────────────────────────────────────────────────╮ - │ * media TEXT Video to caption: a local file, or a │ + │ media [MEDIA] Video to caption: a local file, or a │ │ YouTube/media-page URL (the full video is downloaded │ - │ via yt-dlp) │ - │ [required] │ + │ via yt-dlp). Omit with --from-stdin │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Options ────────────────────────────────────────────────────────────────────╮ │ --transcript-id -t TEXT Reuse an existing │ @@ -216,6 +220,16 @@ │ captioned file │ │ --help Show this message and │ │ exit. │ + ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ Batch ──────────────────────────────────────────────────────────────────────╮ + │ --from-stdin Batch mode: read video paths/URLs │ + │ from stdin, one per line │ + │ (composes with find/ls output) │ + │ --concurrency INTEGER RANGE [x>=1] How many sources to caption at │ + │ once in batch mode │ + │ [default: 4] │ + │ --force Batch mode: re-caption sources │ + │ whose output file already exists │ ╰──────────────────────────────────────────────────────────────────────────────╯ Examples @@ -229,6 +243,8 @@ $ assembly caption talk.mp4 --chars-per-caption 32 --font-size 28 Choose the output file $ assembly caption talk.mp4 --out talk-captioned.mp4 + Caption a whole folder of videos + $ find clips -name "*.mp4" | assembly caption --from-stdin @@ -237,7 +253,7 @@ # name: test_command_help_matches_snapshot[clip] ''' - Usage: assembly clip [OPTIONS] MEDIA + Usage: assembly clip [OPTIONS] [MEDIA] Cut clips from media by speaker, text match, LLM pick, or time range @@ -250,10 +266,14 @@ downloaded first (audio only, or the full video with --video); its clips 
land in --out-dir or the current directory. + Batch mode: pipe one path/URL per line with --from-stdin to clip many sources + concurrently (--concurrency), reusing the same selectors for each. A re-run + skips sources that already have clip files (--force re-cuts them). + ╭─ Arguments ──────────────────────────────────────────────────────────────────╮ - │ * media TEXT Audio/video to cut clips from: a local file, or a │ - │ YouTube/media-page URL (audio downloaded via yt-dlp) │ - │ [required] │ + │ media [MEDIA] Audio/video to cut clips from: a local file, or a │ + │ YouTube/media-page URL (audio downloaded via yt-dlp). │ + │ Omit with --from-stdin │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Options ────────────────────────────────────────────────────────────────────╮ │ --transcript-id -t TEXT Reuse an existing │ @@ -306,6 +326,16 @@ │ --help Show this message │ │ and exit. │ ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ Batch ──────────────────────────────────────────────────────────────────────╮ + │ --from-stdin Batch mode: read media paths/URLs │ + │ from stdin, one per line │ + │ (composes with find/ls output) │ + │ --concurrency INTEGER RANGE [x>=1] How many sources to clip at once │ + │ in batch mode │ + │ [default: 4] │ + │ --force Batch mode: re-clip sources that │ + │ already have clip files │ + ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ LLM Transform ──────────────────────────────────────────────────────────────╮ │ --llm TEXT Let an LLM Gateway model pick the windows to │ │ clip from the timestamped transcript (e.g. 
│ @@ -338,6 +368,8 @@ meeting.mp4 -t - --llm "the funniest exchange" Pad each clip and collect them in a directory $ assembly clip meeting.mp4 --speaker A --padding 0.5 --out-dir clips + Clip a whole folder by speaker + $ find calls -name "*.mp4" | assembly clip --from-stdin --speaker A @@ -401,7 +433,7 @@ # name: test_command_help_matches_snapshot[dub] ''' - Usage: assembly dub [OPTIONS] MEDIA + Usage: assembly dub [OPTIONS] [MEDIA] [sandbox] Dub a video or audio file into another language @@ -415,11 +447,14 @@ today — run it as 'assembly --sandbox dub' (--sandbox goes before the subcommand). Requires ffmpeg. + Batch mode: pipe one path/URL per line with --from-stdin to dub many sources + concurrently (--concurrency). Each writes its own .dub.; a + re-run skips sources whose output already exists (--force re-does them). + ╭─ Arguments ──────────────────────────────────────────────────────────────────╮ - │ * media TEXT Audio/video to dub: a local file (the video stream is │ + │ media [MEDIA] Audio/video to dub: a local file (the video stream is │ │ copied untouched), or a YouTube/media-page URL │ - │ (downloaded via yt-dlp) │ - │ [required] │ + │ (downloaded via yt-dlp). Omit with --from-stdin │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Options ────────────────────────────────────────────────────────────────────╮ │ * --lang -l TEXT Target language: an ISO code (de, fr, │ @@ -451,6 +486,16 @@ │ --json -j Emit JSON describing the dubbed file │ │ --help Show this message and exit. 
│ ╰──────────────────────────────────────────────────────────────────────────────╯ + ╭─ Batch ──────────────────────────────────────────────────────────────────────╮ + │ --from-stdin Batch mode: read media paths/URLs │ + │ from stdin, one per line │ + │ (composes with find/ls output) │ + │ --concurrency INTEGER RANGE [x>=1] How many sources to dub at once │ + │ in batch mode │ + │ [default: 4] │ + │ --force Batch mode: re-dub sources whose │ + │ output file already exists │ + ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ LLM Transform ──────────────────────────────────────────────────────────────╮ │ --model TEXT LLM Gateway model that translates the │ │ utterances │ @@ -477,6 +522,8 @@ $ assembly --sandbox dub talk.mp4 -l de -t TRANSCRIPT_ID Choose the output file $ assembly --sandbox dub talk.mp4 -l de --out talk-german.mp4 + Dub a whole folder into German + $ find talks -name "*.mp4" | assembly --sandbox dub --from-stdin -l de diff --git a/tests/_clip_helpers.py b/tests/_clip_helpers.py index ce50c2e9..d186c94c 100644 --- a/tests/_clip_helpers.py +++ b/tests/_clip_helpers.py @@ -34,6 +34,9 @@ snap=True, out_dir=None, video=False, + from_stdin=False, + concurrency=4, + force=False, ) diff --git a/tests/_dub_helpers.py b/tests/_dub_helpers.py index cfe87491..4338cc16 100644 --- a/tests/_dub_helpers.py +++ b/tests/_dub_helpers.py @@ -33,6 +33,9 @@ out=None, video=False, download_sections=[], + from_stdin=False, + concurrency=4, + force=False, ) SAMPLE_RATE = 100 # tiny rate keeps the timeline byte math exact and readable diff --git a/tests/test_batch.py b/tests/test_batch.py new file mode 100644 index 00000000..632020df --- /dev/null +++ b/tests/test_batch.py @@ -0,0 +1,201 @@ +"""Tests for the shared media-batch runner (aai_cli/app/batch.py). 
+ +Drives :func:`batch.run_batch` with fake workers (no command pipeline) so the +concurrency, NDJSON, skip, failure, and summary behavior is exercised directly, +and :func:`batch.stdin_sources` with a stubbed stdin reader. +""" + +from __future__ import annotations + +import dataclasses +import json + +import pytest + +from aai_cli.app import batch +from aai_cli.core import stdio +from aai_cli.core.errors import CLIError, NotAuthenticated, UsageError + + +def _completed(source: str) -> batch.SourceResult: + return batch.SourceResult( + payload={"source": source, "out": f"{source}.out"}, summary=f"{source} done" + ) + + +def test_source_result_is_immutable(): + result = batch.SourceResult(payload={}, summary="x") + # A computed field name (not a string literal) so ruff B010 doesn't rewrite the + # setattr back into a static assignment pyright would then reject. + field_name = dataclasses.fields(result)[0].name + with pytest.raises(dataclasses.FrozenInstanceError): + setattr(result, field_name, None) + + +# --- stdin_sources ------------------------------------------------------------- + + +def test_stdin_sources_returns_none_without_the_flag(): + assert batch.stdin_sources("talk.mp4", from_stdin=False) is None + + +def test_stdin_sources_reads_and_dedupes_stdin_lines(monkeypatch): + monkeypatch.setattr(stdio, "iter_piped_stdin_lines", lambda: iter(["a.mp4", "b.mp4", "a.mp4"])) + assert batch.stdin_sources("", from_stdin=True) == ["a.mp4", "b.mp4"] + + +def test_stdin_sources_rejects_a_positional_source_with_the_flag(): + with pytest.raises(UsageError) as exc: + batch.stdin_sources("talk.mp4", from_stdin=True) + assert "don't also pass a source" in exc.value.message + + +def test_stdin_sources_rejects_empty_stdin(monkeypatch): + monkeypatch.setattr(stdio, "iter_piped_stdin_lines", lambda: iter([])) + with pytest.raises(UsageError) as exc: + batch.stdin_sources("", from_stdin=True) + assert "No sources received on stdin" in exc.value.message + + +# --- run_batch: 
human mode ----------------------------------------------------- + + +def test_run_batch_human_renders_a_table_and_a_summary(capsys): + batch.run_batch( + ["a.mp4", "b.mp4"], + worker=_completed, + concurrency=2, + summary_verb="Captioned", + json_mode=False, + quiet=False, + ) + captured = capsys.readouterr() + # The table (stdout) lists every source and its result line. + assert "a.mp4" in captured.out + assert "b.mp4" in captured.out + assert "done" in captured.out + # The tally lands on stderr so stdout stays the result surface. + assert "Captioned 2, skipped 0." in captured.err + # Human mode emits no JSON. + assert "{" not in captured.out + + +def test_run_batch_summary_counts_skips(capsys): + def worker(source: str) -> batch.SourceResult: + if source == "a.mp4": + return batch.SourceResult( + payload={"source": source}, summary="exists", status="skipped" + ) + return _completed(source) + + batch.run_batch( + ["a.mp4", "b.mp4"], + worker=worker, + concurrency=1, + summary_verb="Dubbed", + json_mode=False, + quiet=False, + ) + assert "Dubbed 1, skipped 1." in capsys.readouterr().err + + +def test_run_batch_quiet_suppresses_the_summary(capsys): + batch.run_batch( + ["a.mp4"], + worker=_completed, + concurrency=1, + summary_verb="Clipped", + json_mode=False, + quiet=True, + ) + assert "Clipped" not in capsys.readouterr().err + + +# --- run_batch: JSON mode ------------------------------------------------------ + + +def test_run_batch_json_emits_one_ndjson_record_per_source(capsys): + batch.run_batch( + ["a.mp4", "b.mp4"], + worker=_completed, + concurrency=1, + summary_verb="Captioned", + json_mode=True, + quiet=False, + ) + captured = capsys.readouterr() + records = [json.loads(line) for line in captured.out.splitlines()] + assert len(records) == 2 + assert {r["source"] for r in records} == {"a.mp4", "b.mp4"} + assert all(r["type"] == "result" and r["status"] == "completed" for r in records) + # The completed payload is merged in (the command's own JSON shape). 
+ assert {r["out"] for r in records} == {"a.mp4.out", "b.mp4.out"} + # JSON mode prints no human summary on stderr. + assert "Captioned" not in captured.err + + +def test_run_batch_json_records_a_skip_with_its_payload(capsys): + def worker(source: str) -> batch.SourceResult: + return batch.SourceResult( + payload={"source": source, "out": "x"}, summary="exists", status="skipped" + ) + + batch.run_batch( + ["a.mp4"], worker=worker, concurrency=1, summary_verb="Dubbed", json_mode=True, quiet=False + ) + record = json.loads(capsys.readouterr().out) + assert record == {"type": "result", "source": "a.mp4", "status": "skipped", "out": "x"} + + +def test_run_batch_json_records_a_failure_as_an_error(capsys): + def worker(source: str) -> batch.SourceResult: + if source == "bad.mp4": + raise CLIError("boom", error_type="x") + return _completed(source) + + with pytest.raises(CLIError) as exc: + batch.run_batch( + ["bad.mp4", "ok.mp4"], + worker=worker, + concurrency=1, + summary_verb="Captioned", + json_mode=True, + quiet=False, + ) + # The batch still exits non-zero, with a resume hint. + assert "1 of 2 sources failed" in exc.value.message + assert exc.value.error_type == "batch_failed" + records = [json.loads(line) for line in capsys.readouterr().out.splitlines()] + failed = next(r for r in records if r["status"] == "failed") + assert failed == {"type": "result", "source": "bad.mp4", "status": "failed", "error": "boom"} + + +# --- run_batch: failure handling ----------------------------------------------- + + +def test_run_batch_raises_when_any_source_fails(capsys): + def worker(source: str) -> batch.SourceResult: + raise CLIError("nope", error_type="x") + + with pytest.raises(CLIError) as exc: + batch.run_batch( + ["a.mp4"], worker=worker, concurrency=1, summary_verb="X", json_mode=False, quiet=False + ) + assert exc.value.error_type == "batch_failed" + # A failed batch never prints the success tally. 
+ assert "skipped" not in capsys.readouterr().err + + +def test_run_batch_aborts_the_whole_batch_on_not_authenticated(): + def worker(source: str) -> batch.SourceResult: + raise NotAuthenticated("sign in") + + with pytest.raises(NotAuthenticated): + batch.run_batch( + ["a.mp4", "b.mp4"], + worker=worker, + concurrency=1, + summary_verb="X", + json_mode=False, + quiet=False, + ) diff --git a/tests/test_caption_batch.py b/tests/test_caption_batch.py new file mode 100644 index 00000000..47edb874 --- /dev/null +++ b/tests/test_caption_batch.py @@ -0,0 +1,144 @@ +"""Batch-mode tests for `assembly caption` (--from-stdin / --concurrency / --force). + +The single-source pipeline (`_caption_one` → transcribe → SRT → ffmpeg) is covered +in test_caption_exec.py; here `_caption_one` is faked so the tests pin only the +batch wiring: source dispatch, the skip-on-existing-output worker, and the +single-result flags that can't span a batch. +""" + +from __future__ import annotations + +import dataclasses +import json +from pathlib import Path + +import pytest + +from aai_cli.app import batch +from aai_cli.app.context import AppState +from aai_cli.commands.caption import _exec as caption_exec +from aai_cli.core import stdio +from aai_cli.core.errors import UsageError +from tests.test_caption_exec import DEFAULTS + + +def _pipe(monkeypatch: pytest.MonkeyPatch, *sources: str) -> None: + monkeypatch.setattr(stdio, "iter_piped_stdin_lines", lambda: iter(sources)) + + +def _fake_caption_one(seen: list[str]): + def one(opts, state, *, json_mode): + # The batch worker silences spinners by handing each source a quiet state + # (concurrent Rich spinners would clobber the live table). 
+ assert state.quiet is True + seen.append(opts.media) + return batch.SourceResult( + payload={"source": opts.media, "out": f"{opts.media}.captioned"}, + summary=f"{opts.media} captioned", + ) + + return one + + +def _batch_opts(**overrides): + return dataclasses.replace(DEFAULTS, media="", from_stdin=True, **overrides) + + +# --- source dispatch ----------------------------------------------------------- + + +def test_batch_captions_each_piped_source(monkeypatch, capsys): + _pipe(monkeypatch, "a.mp4", "b.mp4") + seen: list[str] = [] + monkeypatch.setattr(caption_exec, "_caption_one", _fake_caption_one(seen)) + + caption_exec.run_caption(_batch_opts(concurrency=2), AppState(), json_mode=False) + + assert sorted(seen) == ["a.mp4", "b.mp4"] + assert "Captioned 2, skipped 0." in capsys.readouterr().err + + +def test_batch_emits_ndjson_per_source(monkeypatch, capsys): + _pipe(monkeypatch, "a.mp4") + monkeypatch.setattr(caption_exec, "_caption_one", _fake_caption_one([])) + + caption_exec.run_caption(_batch_opts(), AppState(), json_mode=True) + + record = json.loads(capsys.readouterr().out) + assert record["source"] == "a.mp4" + assert record["out"] == "a.mp4.captioned" + assert record["status"] == "completed" + + +def test_single_source_still_requires_a_media_argument(): + opts = dataclasses.replace(DEFAULTS, media="") + with pytest.raises(UsageError) as exc: + caption_exec.run_caption(opts, AppState(), json_mode=False) + assert "--from-stdin" in (exc.value.suggestion or "") or "--from-stdin" in exc.value.message + + +# --- skip-on-existing-output (resume) ------------------------------------------ + + +def test_batch_skips_a_source_whose_output_exists(tmp_path, monkeypatch, capsys): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + (tmp_path / "a.captioned.mp4").write_bytes(b"old") # prior output + _pipe(monkeypatch, str(src)) + seen: list[str] = [] + monkeypatch.setattr(caption_exec, "_caption_one", _fake_caption_one(seen)) + + 
caption_exec.run_caption(_batch_opts(), AppState(), json_mode=False) + + assert seen == [] # never re-captioned + assert "Captioned 0, skipped 1." in capsys.readouterr().err + + +def test_force_recaptions_a_source_whose_output_exists(tmp_path, monkeypatch, capsys): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + (tmp_path / "a.captioned.mp4").write_bytes(b"old") + _pipe(monkeypatch, str(src)) + seen: list[str] = [] + monkeypatch.setattr(caption_exec, "_caption_one", _fake_caption_one(seen)) + + caption_exec.run_caption(_batch_opts(force=True), AppState(), json_mode=False) + + assert seen == [str(src)] # processed despite the existing output + assert "Captioned 1, skipped 0." in capsys.readouterr().err + + +# --- flags that can't span a batch --------------------------------------------- + + +def test_batch_rejects_out(monkeypatch): + _pipe(monkeypatch, "a.mp4") + with pytest.raises(UsageError) as exc: + caption_exec.run_caption(_batch_opts(out=Path("x.mp4")), AppState(), json_mode=False) + assert "drop --out" in (exc.value.suggestion or "") + + +def test_batch_rejects_transcript_id(monkeypatch): + _pipe(monkeypatch, "a.mp4") + with pytest.raises(UsageError) as exc: + caption_exec.run_caption(_batch_opts(transcript_id="tr_1"), AppState(), json_mode=False) + assert "can't apply to many sources" in (exc.value.suggestion or "") + + +# --- _existing_output -------------------------------------------------------- + + +def test_existing_output_is_none_for_a_url(): + assert caption_exec._existing_output("https://youtu.be/x") is None + + +def test_existing_output_is_none_when_missing(tmp_path): + assert caption_exec._existing_output(str(tmp_path / "a.mp4")) is None + + +def test_existing_output_returns_the_path_when_present(tmp_path): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + out = tmp_path / "a.captioned.mp4" + out.write_bytes(b"old") + assert caption_exec._existing_output(str(src)) == out diff --git a/tests/test_caption_command.py 
b/tests/test_caption_command.py index 4d394f37..0d4a7ba3 100644 --- a/tests/test_caption_command.py +++ b/tests/test_caption_command.py @@ -59,6 +59,9 @@ def test_caption_parses_every_flag_into_options(monkeypatch): chars_per_caption=32, font_size=28, out=Path("captioned.mp4"), + from_stdin=False, + concurrency=4, + force=False, ) assert captured["json_mode"] is True @@ -73,6 +76,9 @@ def test_caption_defaults_when_only_media_is_given(monkeypatch): chars_per_caption=None, font_size=None, out=None, + from_stdin=False, + concurrency=4, + force=False, ) assert captured["json_mode"] is False diff --git a/tests/test_caption_exec.py b/tests/test_caption_exec.py index 96052e24..f7be79a7 100644 --- a/tests/test_caption_exec.py +++ b/tests/test_caption_exec.py @@ -32,6 +32,9 @@ chars_per_caption=None, font_size=None, out=None, + from_stdin=False, + concurrency=4, + force=False, ) SRT = "1\n00:00:00,500 --> 00:00:01,500\nHello.\n\n2\n00:00:02,000 --> 00:00:03,000\nWorld.\n" diff --git a/tests/test_clip_batch.py b/tests/test_clip_batch.py new file mode 100644 index 00000000..0b9aa83d --- /dev/null +++ b/tests/test_clip_batch.py @@ -0,0 +1,131 @@ +"""Batch-mode tests for `assembly clip` (--from-stdin / --concurrency / --force). + +The single-source pipeline (`_clip_one` → transcript selection → ffmpeg cuts) is +covered in test_clip_exec.py; here it is faked so the tests pin only the batch +wiring: source dispatch, the skip-when-clips-already-exist worker, and the +transcript-id flag that can't span a batch. clip resolves ffmpeg and validates +the (shared) selection before dispatch, so those run for real. 
+""" + +from __future__ import annotations + +import dataclasses + +import pytest + +from aai_cli.app.context import AppState +from aai_cli.commands.clip import _exec as clip_exec +from aai_cli.core import stdio +from aai_cli.core.errors import UsageError +from tests._clip_helpers import DEFAULTS + + +@pytest.fixture(autouse=True) +def _ffmpeg_on_path(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr("shutil.which", lambda name: f"/usr/bin/{name}") + + +def _pipe(monkeypatch: pytest.MonkeyPatch, *sources: str) -> None: + monkeypatch.setattr(stdio, "iter_piped_stdin_lines", lambda: iter(sources)) + + +def _fake_clip_one(seen: list[str], *, clips: int = 2): + def one(opts, explicit, ffmpeg, state, *, json_mode): + # The batch worker silences spinners by handing each source a quiet state. + assert state.quiet is True + seen.append(opts.media) + payload = {"source": opts.media, "transcript_id": "tr", "clips": [{}] * clips} + return payload, [object()] * clips + + return one + + +def _batch_opts(**overrides): + # A selector is required (validated before dispatch); --speaker applies per source. + return dataclasses.replace(DEFAULTS, media="", from_stdin=True, speakers=["A"], **overrides) + + +def test_batch_clips_each_piped_source(monkeypatch, capsys): + _pipe(monkeypatch, "a.mp4", "b.mp4") + seen: list[str] = [] + monkeypatch.setattr(clip_exec, "_clip_one", _fake_clip_one(seen)) + + clip_exec.run_clip(_batch_opts(concurrency=2), AppState(), json_mode=False) + + assert sorted(seen) == ["a.mp4", "b.mp4"] + captured = capsys.readouterr() + assert "Clipped 2, skipped 0." 
in captured.err + assert "2 clip(s)" in captured.out # the per-source summary in the table + + +def test_single_source_still_requires_a_media_argument(): + opts = dataclasses.replace(DEFAULTS, media="", speakers=["A"]) + with pytest.raises(UsageError) as exc: + clip_exec.run_clip(opts, AppState(), json_mode=False) + assert "--from-stdin" in (exc.value.suggestion or "") or "--from-stdin" in exc.value.message + + +def test_batch_skips_a_source_that_already_has_clips(tmp_path, monkeypatch, capsys): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + (tmp_path / "a.clip01.mp4").write_bytes(b"old") # a prior clip + _pipe(monkeypatch, str(src)) + seen: list[str] = [] + monkeypatch.setattr(clip_exec, "_clip_one", _fake_clip_one(seen)) + + clip_exec.run_clip(_batch_opts(), AppState(), json_mode=False) + + assert seen == [] + assert "Clipped 0, skipped 1." in capsys.readouterr().err + + +def test_force_reclips_a_source_that_already_has_clips(tmp_path, monkeypatch, capsys): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + (tmp_path / "a.clip01.mp4").write_bytes(b"old") + _pipe(monkeypatch, str(src)) + seen: list[str] = [] + monkeypatch.setattr(clip_exec, "_clip_one", _fake_clip_one(seen)) + + clip_exec.run_clip(_batch_opts(force=True), AppState(), json_mode=False) + + assert seen == [str(src)] + assert "Clipped 1, skipped 0." 
in capsys.readouterr().err + + +def test_batch_rejects_transcript_id(monkeypatch): + _pipe(monkeypatch, "a.mp4") + with pytest.raises(UsageError) as exc: + clip_exec.run_clip(_batch_opts(transcript_id="tr_1"), AppState(), json_mode=False) + assert "can't apply to many sources" in (exc.value.suggestion or "") + + +# --- _clips_exist ------------------------------------------------------------ + + +def test_clips_exist_is_false_for_a_url(): + assert clip_exec._clips_exist("https://youtu.be/x", None) is False + + +def test_clips_exist_is_false_when_no_clip_files(tmp_path): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + assert clip_exec._clips_exist(str(src), None) is False + + +def test_clips_exist_is_true_when_a_clip_file_is_present(tmp_path): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + (tmp_path / "a.clip03.mp4").write_bytes(b"old") + assert clip_exec._clips_exist(str(src), None) is True + + +def test_clips_exist_checks_the_out_dir_when_given(tmp_path): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + out_dir = tmp_path / "clips" + out_dir.mkdir() + # The clip would land in out_dir, not next to the source. 
+ assert clip_exec._clips_exist(str(src), out_dir) is False + (out_dir / "a.clip01.mp4").write_bytes(b"old") + assert clip_exec._clips_exist(str(src), out_dir) is True diff --git a/tests/test_clip_command.py b/tests/test_clip_command.py index 4ed1872d..305f6c32 100644 --- a/tests/test_clip_command.py +++ b/tests/test_clip_command.py @@ -83,6 +83,9 @@ def test_clip_parses_every_flag_into_options(monkeypatch, tmp_path): snap=False, out_dir=tmp_path, video=True, + from_stdin=False, + concurrency=4, + force=False, ) assert captured["json_mode"] is True @@ -104,6 +107,9 @@ def test_clip_defaults_when_only_media_is_given(monkeypatch): snap=True, out_dir=None, video=False, + from_stdin=False, + concurrency=4, + force=False, ) assert captured["json_mode"] is False diff --git a/tests/test_dub_batch.py b/tests/test_dub_batch.py new file mode 100644 index 00000000..5b15c294 --- /dev/null +++ b/tests/test_dub_batch.py @@ -0,0 +1,128 @@ +"""Batch-mode tests for `assembly dub` (--from-stdin / --concurrency / --force). + +The single-source pipeline (`_dub_one`) is covered in test_dub_exec.py; here it +is faked so the tests pin only the batch wiring: source dispatch, the +skip-on-existing-output worker, and the single-result flags that can't span a +batch. The invocation-global setup dub runs before dispatch (sandbox/session, +--voice parsing) still executes, so the sandbox fixture is required. 
+""" + +from __future__ import annotations + +import dataclasses +from pathlib import Path + +import pytest + +from aai_cli.app import batch +from aai_cli.app.context import AppState +from aai_cli.commands.dub import _exec as dub_exec +from aai_cli.core import stdio +from aai_cli.core.errors import UsageError +from tests._dub_helpers import DEFAULTS, enable_sandbox, patch_api_key + + +@pytest.fixture(autouse=True) +def _sandbox_and_key(monkeypatch: pytest.MonkeyPatch): + enable_sandbox(monkeypatch) + patch_api_key(monkeypatch) + + +def _pipe(monkeypatch: pytest.MonkeyPatch, *sources: str) -> None: + monkeypatch.setattr(stdio, "iter_piped_stdin_lines", lambda: iter(sources)) + + +def _fake_dub_one(seen: list[str]): + def one(opts, state, language, voice_plan, *, json_mode): + # The batch worker silences spinners by handing each source a quiet state. + assert state.quiet is True + seen.append(opts.media) + return batch.SourceResult( + payload={"source": opts.media, "out": f"{opts.media}.dub.de"}, + summary=f"{opts.media} dubbed", + ) + + return one + + +def _batch_opts(**overrides): + return dataclasses.replace(DEFAULTS, media="", from_stdin=True, **overrides) + + +def test_batch_dubs_each_piped_source(monkeypatch, capsys): + _pipe(monkeypatch, "a.mp4", "b.mp4") + seen: list[str] = [] + monkeypatch.setattr(dub_exec, "_dub_one", _fake_dub_one(seen)) + + dub_exec.run_dub(_batch_opts(concurrency=2), AppState(), json_mode=False) + + assert sorted(seen) == ["a.mp4", "b.mp4"] + assert "Dubbed 2, skipped 0." 
in capsys.readouterr().err + + +def test_single_source_still_requires_a_media_argument(monkeypatch): + opts = dataclasses.replace(DEFAULTS, media="") + with pytest.raises(UsageError) as exc: + dub_exec.run_dub(opts, AppState(), json_mode=False) + assert "--from-stdin" in (exc.value.suggestion or "") or "--from-stdin" in exc.value.message + + +def test_batch_skips_a_source_whose_output_exists(tmp_path, monkeypatch, capsys): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + (tmp_path / "a.dub.german.mp4").write_bytes(b"old") # prior output for --lang de + _pipe(monkeypatch, str(src)) + seen: list[str] = [] + monkeypatch.setattr(dub_exec, "_dub_one", _fake_dub_one(seen)) + + dub_exec.run_dub(_batch_opts(), AppState(), json_mode=False) + + assert seen == [] + assert "Dubbed 0, skipped 1." in capsys.readouterr().err + + +def test_force_redubs_a_source_whose_output_exists(tmp_path, monkeypatch, capsys): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + (tmp_path / "a.dub.german.mp4").write_bytes(b"old") + _pipe(monkeypatch, str(src)) + seen: list[str] = [] + monkeypatch.setattr(dub_exec, "_dub_one", _fake_dub_one(seen)) + + dub_exec.run_dub(_batch_opts(force=True), AppState(), json_mode=False) + + assert seen == [str(src)] + assert "Dubbed 1, skipped 0." 
in capsys.readouterr().err + + +def test_batch_rejects_out(monkeypatch): + _pipe(monkeypatch, "a.mp4") + with pytest.raises(UsageError) as exc: + dub_exec.run_dub(_batch_opts(out=Path("x.mp4")), AppState(), json_mode=False) + assert "drop --out" in (exc.value.suggestion or "") + + +def test_batch_rejects_transcript_id(monkeypatch): + _pipe(monkeypatch, "a.mp4") + with pytest.raises(UsageError) as exc: + dub_exec.run_dub(_batch_opts(transcript_id="tr_1"), AppState(), json_mode=False) + assert "can't apply to many sources" in (exc.value.suggestion or "") + + +# --- _existing_output -------------------------------------------------------- + + +def test_existing_output_is_none_for_a_url(): + assert dub_exec._existing_output("https://youtu.be/x", "German") is None + + +def test_existing_output_is_none_when_missing(tmp_path): + assert dub_exec._existing_output(str(tmp_path / "a.mp4"), "German") is None + + +def test_existing_output_returns_the_path_when_present(tmp_path): + src = tmp_path / "a.mp4" + src.write_bytes(b"x") + out = tmp_path / "a.dub.german.mp4" + out.write_bytes(b"old") + assert dub_exec._existing_output(str(src), "German") == out diff --git a/tests/test_dub_command.py b/tests/test_dub_command.py index ad8f3c87..ea79a145 100644 --- a/tests/test_dub_command.py +++ b/tests/test_dub_command.py @@ -62,6 +62,9 @@ def test_defaults_map_to_options(captured_run): out=None, video=False, download_sections=[], + from_stdin=False, + concurrency=4, + force=False, ) @@ -108,4 +111,7 @@ def test_every_flag_maps_to_options(captured_run): out=Path("dubbed.mp4"), video=True, download_sections=["*0:00-15:00", "intro"], + from_stdin=False, + concurrency=4, + force=False, )