From 45dd4cc0fac1aa62083acb0ecb51a086844153e2 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 13 Jun 2026 15:33:05 +0000 Subject: [PATCH] Refactor near-500-line files along natural seams Three core files were approaching the 500-line gate. Each is split along an existing concern boundary, leaving the originals well under the cap and the public behavior unchanged: - transcribe_batch.py (464): batch-mode source selection (directory/glob/stdin expansion, bucket-URL discovery, the single-source-flag rejection) moves to a new transcribe_sources.py. transcribe_exec calls it directly; transcribe_batch keeps only the sidecar/resume/run machinery and imports the shared constants. _remote_sources splits into _remote_glob_sources/_remote_folder_sources to mirror the local _glob_sources/_directory_sources pair (and keep the focused module at complexity rank A). - transcribe_exec.py (474): the pure argument validators and the unrecognized-extension warning move to transcribe_validate.py. - commands/dub/_exec.py (460): the audio pipeline (translate, synthesize, timeline assembly, voice assignment, ffmpeg mux, utterance extraction) moves to commands/dub/_pipeline.py, exposing public names per the clip/_select convention; _exec keeps the orchestration. config.py (480) is left intact: it's a single cohesive config/keyring layer with no clean seam, so splitting it would fight the convention rather than follow one. 
--- .importlinter | 2 + aai_cli/commands/dub/_exec.py | 266 ++----------------------- aai_cli/commands/dub/_pipeline.py | 258 ++++++++++++++++++++++++ aai_cli/transcribe_batch.py | 168 ++-------------- aai_cli/transcribe_exec.py | 117 ++--------- aai_cli/transcribe_sources.py | 173 ++++++++++++++++ aai_cli/transcribe_validate.py | 106 ++++++++++ tests/test_dub_exec.py | 21 +- tests/test_transcribe_batch_sources.py | 12 +- tests/test_transcribe_out.py | 4 +- 10 files changed, 601 insertions(+), 526 deletions(-) create mode 100644 aai_cli/commands/dub/_pipeline.py create mode 100644 aai_cli/transcribe_sources.py create mode 100644 aai_cli/transcribe_validate.py diff --git a/.importlinter b/.importlinter index 075cae94..7cc2a3f0 100644 --- a/.importlinter +++ b/.importlinter @@ -53,6 +53,8 @@ source_modules = aai_cli.transcribe_batch aai_cli.transcribe_exec aai_cli.transcribe_render + aai_cli.transcribe_sources + aai_cli.transcribe_validate aai_cli.tts aai_cli.typer_patches aai_cli.update_check diff --git a/aai_cli/commands/dub/_exec.py b/aai_cli/commands/dub/_exec.py index 617c031b..eb3f09a8 100644 --- a/aai_cli/commands/dub/_exec.py +++ b/aai_cli/commands/dub/_exec.py @@ -25,12 +25,11 @@ from rich.markup import escape -from aai_cli import jsonshape, mediafile, output, youtube -from aai_cli import llm as gateway +from aai_cli import mediafile, output, youtube +from aai_cli.commands.dub import _pipeline as pipeline from aai_cli.context import AppState -from aai_cli.errors import APIError, CLIError, UsageError -from aai_cli.tts import audio, dialogue, session, voices -from aai_cli.tts.session import SpeakConfig +from aai_cli.errors import UsageError +from aai_cli.tts import audio, dialogue, session # ISO-639-1 codes accepted by --lang, mapped to the language *name* both the # translation prompt and the streaming-TTS `language` param expect. A value not @@ -55,16 +54,6 @@ "zh": "Chinese", } -# System prompt for the per-utterance translation calls. 
Length matters: the dub -# replaces speech that occupied a fixed window, so the model is told to keep the -# spoken length close to the original. -TRANSLATION_SYSTEM_TEMPLATE = ( - "You translate dialogue for dubbing. Translate the user's text to {language}. " - "Keep the meaning and register, and stay close to the original spoken length so " - "the dub fits the original timing. Reply with only the translated text — no " - "quotes, notes, or extra commentary." -) - @dataclass(frozen=True) class DubOptions: @@ -108,236 +97,12 @@ def default_out_path(media: Path, language: str) -> Path: return media.parent / f"{media.stem}.dub.{slug}{media.suffix}" -def assemble_timeline( - placed: list[tuple[int, bytes]], - sample_rate: int, - total_seconds: float | None, -) -> bytearray: - """Lay each ``(start_ms, pcm)`` segment onto a silence timeline. - - Gaps before a segment's start are filled with silence; a segment whose - predecessor overran its start time is appended immediately (the dub drifts - rather than dropping speech). The tail is padded out to ``total_seconds`` - (the source duration) so the dubbed track never ends early. - """ - pcm = bytearray() - for start_ms, segment in placed: - gap = start_ms / 1000 - _pcm_seconds(pcm, sample_rate) - if gap > 0: - pcm.extend(audio.silence(sample_rate, gap)) - pcm.extend(segment) - if total_seconds is not None: - tail = total_seconds - _pcm_seconds(pcm, sample_rate) - if tail > 0: - pcm.extend(audio.silence(sample_rate, tail)) - return pcm - - -def _pcm_seconds(pcm: bytes | bytearray, sample_rate: int) -> float: - """Seconds of audio in 16-bit mono PCM: two bytes per sample.""" - return len(pcm) / 2 / sample_rate - - -def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: - """Swap ``track`` in as the audio of ``media``, writing ``out``. 
- - ``-map 0:v?`` carries the video stream over untouched (``-c:v copy``) when - there is one, and maps nothing for audio-only input, so the same invocation - dubs both a video and a plain audio file. ``-y`` makes a re-run overwrite - its own earlier output instead of stalling on ffmpeg's prompt. - """ - result = mediafile.run_ffmpeg( - [ - ffmpeg, - "-hide_banner", - "-loglevel", - "error", - "-y", - "-i", - str(media), - "-i", - str(track), - "-map", - "0:v?", - "-map", - "1:a", - "-c:v", - "copy", - mediafile.path_arg(out), - ] - ) - if result.returncode != 0: - raise mediafile.ffmpeg_failure(result, "write", out, error_type="dub_failed") - - -@dataclass(frozen=True) -class _Utterance: - """One diarized utterance reduced to the fields the dub pipeline needs.""" - - start_ms: int - speaker: str - text: str - - -def _utterances_of(transcript: object, transcript_id: str) -> list[_Utterance]: - """The transcript's spoken utterances, with empty-text ones dropped.""" - utterances = [ - _Utterance( - start_ms=jsonshape.as_int(getattr(item, "start", 0)), - speaker=str(getattr(item, "speaker", None) or "A"), - text=str(getattr(item, "text", "") or "").strip(), - ) - for item in jsonshape.object_list(getattr(transcript, "utterances", None)) - ] - spoken = [utterance for utterance in utterances if utterance.text] - if not spoken: - raise CLIError( - f"Transcript {transcript_id} has no utterances to dub.", - error_type="no_utterances", - exit_code=2, - suggestion=( - "Dubbing needs a diarized transcript. Pass a --transcript-id created " - "with --speaker-labels, or drop -t to let dub transcribe the file." 
- ), - ) - return spoken - - -def _total_seconds(transcript: object) -> float | None: - """The source duration in seconds (used to pad the dubbed track's tail).""" - duration = getattr(transcript, "audio_duration", None) - if isinstance(duration, int | float) and not isinstance(duration, bool): - return float(duration) - return None - - -def _translate( - api_key: str, - utterances: list[_Utterance], - language: str, - opts: DubOptions, - *, - json_mode: bool, - quiet: bool, -) -> list[str]: - """Translate each utterance to ``language`` with the LLM Gateway, in order. - - One call per utterance keeps the translation↔timestamp alignment exact — - no reply-parsing step that could shift a line against its window. - """ - system = TRANSLATION_SYSTEM_TEMPLATE.format(language=language) - translating = f"Translating {len(utterances)} utterance(s) to {language} with {opts.model}…" - translations: list[str] = [] - with output.status(translating, json_mode=json_mode, quiet=quiet): - for index, utterance in enumerate(utterances, 1): - messages = gateway.build_messages(utterance.text, system=system) - response = gateway.complete( - api_key, model=opts.model, messages=messages, max_tokens=opts.max_tokens - ) - translated = gateway.content_of(response).strip() - # "length" is OpenAI's truncation marker; the gateway's Anthropic-flavored - # responses use "max_tokens". A clipped translation must never be dubbed. - if getattr(response.choices[0], "finish_reason", None) in {"length", "max_tokens"}: - raise APIError( - f"The translation of utterance {index} was cut off at --max-tokens " - f"({opts.max_tokens}).", - suggestion="Re-run with a higher --max-tokens.", - ) - if not translated: - raise APIError( - f"The model returned an empty translation for utterance {index} " - f"({utterance.text[:50]!r})." 
- ) - translations.append(translated) - return translations - - -def _synthesize( - api_key: str, - segments: list[tuple[str, str]], - language: str, - *, - json_mode: bool, - quiet: bool, -) -> tuple[list[bytes], int]: - """Synthesize each ``(voice, text)`` segment; returns the PCM list + sample rate. - - Every segment must come back at one rate — the timeline math places segments - by sample position, so a mid-run rate change would silently shift timing. - """ - synthesizing = f"Synthesizing {len(segments)} segment(s)…" - with output.status(synthesizing, json_mode=json_mode, quiet=quiet): - results = [ - session.synthesize( - api_key, - SpeakConfig(text=text, voice=voice, language=language), - on_warning=lambda m: output.emit_warning(m, json_mode=json_mode), - ) - for voice, text in segments - ] - rates = {result.sample_rate for result in results} - if len(rates) > 1: - raise APIError(f"TTS service returned mixed sample rates ({sorted(rates)}).") - # `segments` is never empty (_utterances_of raised otherwise), so results[0] exists. - return [result.pcm for result in results], results[0].sample_rate - - -def _warn_ignored_voice_pins( - overrides: dict[str, str], speakers: dict[str, str], *, json_mode: bool -) -> None: - """Mirror `assembly speak`: a requested --voice mapping is never dropped - silently, so a pin for a speaker the diarization didn't produce is called out.""" - present = {speaker.casefold() for speaker in speakers} - ignored = [speaker for speaker in overrides if speaker not in present] - if ignored: - output.emit_warning( - "Ignoring --voice mapping(s) for speaker(s) not in the transcript: " - f"{', '.join(ignored)}.", - json_mode=json_mode, - ) - - -@dataclass(frozen=True) -class _VoicePlan: - """The parsed --voice flags: the bare voice (if any) plus SPEAKER=VOICE pins. 
- - Parsed in run_dub — before the billed pipeline, so a malformed mapping - fails fast — and carried as one value through _dub_and_emit.""" - - bare: str | None - overrides: dict[str, str] - - -def _assign_voices( - utterances: list[_Utterance], - translations: list[str], - plan: _VoicePlan, - language: str, -) -> tuple[list[tuple[str, str]], dict[str, str]]: - """Resolve each translated utterance to ``(voice, text)`` plus the speaker→voice map. - - A bare ``--voice`` dubs every speaker with that one voice; ``SPEAKER=VOICE`` - mappings pin individual speakers; everyone else takes the target language's - rotation in first-appearance order (the same rules as `assembly speak`) — - each voice speaks one language, so a non-English dub switches to that - language's native voice(s). - """ - rotation = (plan.bare,) if plan.bare is not None else voices.rotation_for(language) - segments = [ - dialogue.Segment(utterance.speaker, translated) - # strict=True is an invariant guard only: _translate returns exactly one - # translation per utterance, so the lengths can never differ. - for utterance, translated in zip(utterances, translations, strict=True) # pragma: no mutate - ] - return dialogue.assign_voices(segments, rotation, plan.overrides) - - def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly dub` invocation from already-parsed flags.""" language = resolve_language(opts.language) session.require_available("dub") # Parse --voice now: a malformed mapping must fail before the billed pipeline. 
- voice_plan = _VoicePlan(*dialogue.parse_voice_overrides(opts.voice)) + voice_plan = pipeline.VoicePlan(*dialogue.parse_voice_overrides(opts.voice)) youtube.validate_video_flag(opts.media, video=opts.video) youtube.validate_sections_flag(opts.media, opts.download_sections) if youtube.is_downloadable_url(opts.media): @@ -394,7 +159,7 @@ def _dub_and_emit( out: Path, language: str, ffmpeg: str, - voice_plan: _VoicePlan, + voice_plan: pipeline.VoicePlan, state: AppState, *, json_mode: bool, @@ -414,30 +179,29 @@ def _dub_and_emit( detect_language=opts.source_language is None, ) transcript_id = str(getattr(transcript, "id", "")) - utterances = _utterances_of(transcript, transcript_id) - translations = _translate( + utterances = pipeline.utterances_of(transcript, transcript_id) + translations = pipeline.translate( api_key, utterances, language, opts, json_mode=json_mode, quiet=state.quiet ) - resolved, speakers = _assign_voices(utterances, translations, voice_plan, language) - _warn_ignored_voice_pins(voice_plan.overrides, speakers, json_mode=json_mode) - pcm_segments, sample_rate = _synthesize( + resolved, speakers = pipeline.assign_voices(utterances, translations, voice_plan, language) + pipeline.warn_ignored_voice_pins(voice_plan.overrides, speakers, json_mode=json_mode) + pcm_segments, sample_rate = pipeline.synthesize( api_key, resolved, language, json_mode=json_mode, quiet=state.quiet ) - # strict=True is an invariant guard only: _synthesize returns one PCM per segment. + # strict=True is an invariant guard only: synthesize returns one PCM per segment. 
 placed = [ (utterance.start_ms, pcm) for utterance, pcm in zip(utterances, pcm_segments, strict=True) # pragma: no mutate ] - track = assemble_timeline(placed, sample_rate, _total_seconds(transcript)) + track = pipeline.assemble_timeline(placed, sample_rate, pipeline.total_seconds(transcript)) with tempfile.TemporaryDirectory(prefix="aai-dub-") as tmp: wav = Path(tmp) / "dub.wav" audio.write_wav(wav, track, sample_rate) with output.status("Writing the dubbed file…", json_mode=json_mode, quiet=state.quiet): - _mux(ffmpeg, media, wav, out) + pipeline.mux(ffmpeg, media, wav, out) - duration = round(_pcm_seconds(track, sample_rate), 3) - # Not named `voices`: that would shadow the tts.voices module imported above. + duration = round(pipeline.pcm_seconds(track, sample_rate), 3) voices_text = ", ".join(f"{speaker}={voice}" for speaker, voice in speakers.items()) payload: dict[str, object] = { "source": opts.media, diff --git a/aai_cli/commands/dub/_pipeline.py b/aai_cli/commands/dub/_pipeline.py new file mode 100644 index 00000000..2fa10ba8 --- /dev/null +++ b/aai_cli/commands/dub/_pipeline.py @@ -0,0 +1,258 @@ +"""The `assembly dub` audio pipeline: translate → synthesize → timeline → mux. + +The orchestration (argv resolution, source download, result reporting) lives in +``_exec``; the per-utterance transforms that turn a diarized transcript into a +dubbed audio track are gathered here so each stage stays unit-testable on its own +(see tests/test_dub_exec.py for the pure helpers, tests/test_dub_pipeline.py for +the faked end-to-end runs). ``_exec`` imports this module as ``pipeline`` and the +names below (``pcm_seconds`` included) are its public surface; none are private.
+""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING + +from aai_cli import jsonshape, mediafile, output +from aai_cli import llm as gateway +from aai_cli.errors import APIError, CLIError +from aai_cli.tts import audio, dialogue, session, voices +from aai_cli.tts.session import SpeakConfig + +if TYPE_CHECKING: + from aai_cli.commands.dub._exec import DubOptions + +# System prompt for the per-utterance translation calls. Length matters: the dub +# replaces speech that occupied a fixed window, so the model is told to keep the +# spoken length close to the original. +TRANSLATION_SYSTEM_TEMPLATE = ( + "You translate dialogue for dubbing. Translate the user's text to {language}. " + "Keep the meaning and register, and stay close to the original spoken length so " + "the dub fits the original timing. Reply with only the translated text — no " + "quotes, notes, or extra commentary." +) + + +def assemble_timeline( + placed: list[tuple[int, bytes]], + sample_rate: int, + total_seconds: float | None, +) -> bytearray: + """Lay each ``(start_ms, pcm)`` segment onto a silence timeline. + + Gaps before a segment's start are filled with silence; a segment whose + predecessor overran its start time is appended immediately (the dub drifts + rather than dropping speech). The tail is padded out to ``total_seconds`` + (the source duration) so the dubbed track never ends early. 
+ """ + pcm = bytearray() + for start_ms, segment in placed: + gap = start_ms / 1000 - pcm_seconds(pcm, sample_rate) + if gap > 0: + pcm.extend(audio.silence(sample_rate, gap)) + pcm.extend(segment) + if total_seconds is not None: + tail = total_seconds - pcm_seconds(pcm, sample_rate) + if tail > 0: + pcm.extend(audio.silence(sample_rate, tail)) + return pcm + + +def pcm_seconds(pcm: bytes | bytearray, sample_rate: int) -> float: + """Seconds of audio in 16-bit mono PCM: two bytes per sample.""" + return len(pcm) / 2 / sample_rate + + +def mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None: + """Swap ``track`` in as the audio of ``media``, writing ``out``. + + ``-map 0:v?`` carries the video stream over untouched (``-c:v copy``) when + there is one, and maps nothing for audio-only input, so the same invocation + dubs both a video and a plain audio file. ``-y`` makes a re-run overwrite + its own earlier output instead of stalling on ffmpeg's prompt. + """ + result = mediafile.run_ffmpeg( + [ + ffmpeg, + "-hide_banner", + "-loglevel", + "error", + "-y", + "-i", + str(media), + "-i", + str(track), + "-map", + "0:v?", + "-map", + "1:a", + "-c:v", + "copy", + mediafile.path_arg(out), + ] + ) + if result.returncode != 0: + raise mediafile.ffmpeg_failure(result, "write", out, error_type="dub_failed") + + +@dataclass(frozen=True) +class Utterance: + """One diarized utterance reduced to the fields the dub pipeline needs.""" + + start_ms: int + speaker: str + text: str + + +def utterances_of(transcript: object, transcript_id: str) -> list[Utterance]: + """The transcript's spoken utterances, with empty-text ones dropped.""" + utterances = [ + Utterance( + start_ms=jsonshape.as_int(getattr(item, "start", 0)), + speaker=str(getattr(item, "speaker", None) or "A"), + text=str(getattr(item, "text", "") or "").strip(), + ) + for item in jsonshape.object_list(getattr(transcript, "utterances", None)) + ] + spoken = [utterance for utterance in utterances if utterance.text] 
+ if not spoken: + raise CLIError( + f"Transcript {transcript_id} has no utterances to dub.", + error_type="no_utterances", + exit_code=2, + suggestion=( + "Dubbing needs a diarized transcript. Pass a --transcript-id created " + "with --speaker-labels, or drop -t to let dub transcribe the file." + ), + ) + return spoken + + +def total_seconds(transcript: object) -> float | None: + """The source duration in seconds (used to pad the dubbed track's tail).""" + duration = getattr(transcript, "audio_duration", None) + if isinstance(duration, int | float) and not isinstance(duration, bool): + return float(duration) + return None + + +def translate( + api_key: str, + utterances: list[Utterance], + language: str, + opts: DubOptions, + *, + json_mode: bool, + quiet: bool, +) -> list[str]: + """Translate each utterance to ``language`` with the LLM Gateway, in order. + + One call per utterance keeps the translation↔timestamp alignment exact — + no reply-parsing step that could shift a line against its window. + """ + system = TRANSLATION_SYSTEM_TEMPLATE.format(language=language) + translating = f"Translating {len(utterances)} utterance(s) to {language} with {opts.model}…" + translations: list[str] = [] + with output.status(translating, json_mode=json_mode, quiet=quiet): + for index, utterance in enumerate(utterances, 1): + messages = gateway.build_messages(utterance.text, system=system) + response = gateway.complete( + api_key, model=opts.model, messages=messages, max_tokens=opts.max_tokens + ) + translated = gateway.content_of(response).strip() + # "length" is OpenAI's truncation marker; the gateway's Anthropic-flavored + # responses use "max_tokens". A clipped translation must never be dubbed. 
+ if getattr(response.choices[0], "finish_reason", None) in {"length", "max_tokens"}: + raise APIError( + f"The translation of utterance {index} was cut off at --max-tokens " + f"({opts.max_tokens}).", + suggestion="Re-run with a higher --max-tokens.", + ) + if not translated: + raise APIError( + f"The model returned an empty translation for utterance {index} " + f"({utterance.text[:50]!r})." + ) + translations.append(translated) + return translations + + +def synthesize( + api_key: str, + segments: list[tuple[str, str]], + language: str, + *, + json_mode: bool, + quiet: bool, +) -> tuple[list[bytes], int]: + """Synthesize each ``(voice, text)`` segment; returns the PCM list + sample rate. + + Every segment must come back at one rate — the timeline math places segments + by sample position, so a mid-run rate change would silently shift timing. + """ + synthesizing = f"Synthesizing {len(segments)} segment(s)…" + with output.status(synthesizing, json_mode=json_mode, quiet=quiet): + results = [ + session.synthesize( + api_key, + SpeakConfig(text=text, voice=voice, language=language), + on_warning=lambda m: output.emit_warning(m, json_mode=json_mode), + ) + for voice, text in segments + ] + rates = {result.sample_rate for result in results} + if len(rates) > 1: + raise APIError(f"TTS service returned mixed sample rates ({sorted(rates)}).") + # `segments` is never empty (utterances_of raised otherwise), so results[0] exists. 
+ return [result.pcm for result in results], results[0].sample_rate + + +def warn_ignored_voice_pins( + overrides: dict[str, str], speakers: dict[str, str], *, json_mode: bool +) -> None: + """Mirror `assembly speak`: a requested --voice mapping is never dropped + silently, so a pin for a speaker the diarization didn't produce is called out.""" + present = {speaker.casefold() for speaker in speakers} + ignored = [speaker for speaker in overrides if speaker not in present] + if ignored: + output.emit_warning( + "Ignoring --voice mapping(s) for speaker(s) not in the transcript: " + f"{', '.join(ignored)}.", + json_mode=json_mode, + ) + + +@dataclass(frozen=True) +class VoicePlan: + """The parsed --voice flags: the bare voice (if any) plus SPEAKER=VOICE pins. + + Parsed in run_dub — before the billed pipeline, so a malformed mapping + fails fast — and carried as one value through _dub_and_emit.""" + + bare: str | None + overrides: dict[str, str] + + +def assign_voices( + utterances: list[Utterance], + translations: list[str], + plan: VoicePlan, + language: str, +) -> tuple[list[tuple[str, str]], dict[str, str]]: + """Resolve each translated utterance to ``(voice, text)`` plus the speaker→voice map. + + A bare ``--voice`` dubs every speaker with that one voice; ``SPEAKER=VOICE`` + mappings pin individual speakers; everyone else takes the target language's + rotation in first-appearance order (the same rules as `assembly speak`) — + each voice speaks one language, so a non-English dub switches to that + language's native voice(s). + """ + rotation = (plan.bare,) if plan.bare is not None else voices.rotation_for(language) + segments = [ + dialogue.Segment(utterance.speaker, translated) + # strict=True is an invariant guard only: translate returns exactly one + # translation per utterance, so the lengths can never differ. 
+ for utterance, translated in zip(utterances, translations, strict=True) # pragma: no mutate + ] + return dialogue.assign_voices(segments, rotation, plan.overrides) diff --git a/aai_cli/transcribe_batch.py b/aai_cli/transcribe_batch.py index 849eb642..1a00fa23 100644 --- a/aai_cli/transcribe_batch.py +++ b/aai_cli/transcribe_batch.py @@ -3,12 +3,13 @@ ``assembly transcribe`` switches to batch mode when the source is a directory or a glob pattern — local, or on fsspec-addressable remote storage (an ``s3://…/*.mp3`` glob, or a trailing-slash folder like ``s3://bucket/calls/``) — or when -``--from-stdin`` supplies one path/URL per line. Sources run -concurrently behind a live progress table; each finished source gets a -``.aai.json`` sidecar holding the full transcript. The sidecar doubles as -the resume marker — a re-run skips any source whose sidecar records a completed -transcription of the same bytes — so retrying a partly-failed batch only pays for -what's missing (``--force`` re-transcribes everything). +``--from-stdin`` supplies one path/URL per line (the source-list expansion itself +lives in ``transcribe_sources``). Sources run concurrently behind a live progress +table; each finished source gets a ``.aai.json`` sidecar holding the full +transcript. The sidecar doubles as the resume marker — a re-run skips any source +whose sidecar records a completed transcription of the same bytes — so retrying a +partly-failed batch only pays for what's missing (``--force`` re-transcribes +everything). ``--llm`` prompts run per source once its transcription is recorded, landing under the sidecar's ``transform`` key. 
The chain is resumable on its own: a re-run with @@ -31,164 +32,19 @@ from rich.live import Live from rich.markup import escape -from aai_cli import client, jsonshape, llm, output, remotefs, stdio, theme, transcribe_exec -from aai_cli.errors import CLIError, NotAuthenticated, UsageError, mutually_exclusive +from aai_cli import client, jsonshape, llm, output, remotefs, theme, transcribe_exec +from aai_cli.errors import CLIError, NotAuthenticated +from aai_cli.transcribe_sources import SIDECAR_SUFFIX, URL_PREFIXES if TYPE_CHECKING: import assemblyai as aai from rich.table import Table -SIDECAR_SUFFIX = ".aai.json" - -# What a directory scan picks up (an explicit glob or stdin list is taken as-is). -AUDIO_EXTENSIONS = frozenset( - { - ".3gp", - ".aac", - ".aif", - ".aiff", - ".amr", - ".flac", - ".m4a", - ".m4b", - ".mka", - ".mkv", - ".mov", - ".mp2", - ".mp3", - ".mp4", - ".mpga", - ".oga", - ".ogg", - ".opus", - ".wav", - ".webm", - ".wma", - } -) - -_URL_PREFIXES = ("http://", "https://") -_GLOB_CHARS = frozenset("*?[") - - -def expand_sources(source: str | None, *, from_stdin: bool, sample: bool) -> list[str] | None: - """The batch source list, or ``None`` when this is a single-source invocation. - - Batch mode triggers on ``--from-stdin``, a directory (scanned recursively for - audio files), a glob pattern that names no existing file, or a bucket URL - that is a glob or trailing-slash folder. A plain file, URL, ``-`` (audio - piped on stdin), or ``--sample`` stays on the single-source path. - """ - if from_stdin: - return _stdin_sources(source, sample=sample) - # `not source` (rather than `is None`) also catches the empty string — e.g. an - # unset shell variable in `assembly transcribe "$FILE"`. `Path("")` is `Path(".")`, - # so it would otherwise fall into the directory branch and batch-transcribe the - # whole working directory; instead it stays single-source and fails validation. 
- if not source or sample or source == "-" or source.startswith(_URL_PREFIXES): - return None - if remotefs.is_remote_url(source): - return _remote_sources(source) - path = Path(source) - if path.is_dir(): - return _directory_sources(path) - if not path.exists() and _GLOB_CHARS.intersection(source): - return _glob_sources(source) - return None - - -def _stdin_sources(source: str | None, *, sample: bool) -> list[str]: - if source is not None or sample: - raise UsageError( - "--from-stdin reads sources from stdin; don't also pass a source or --sample." - ) - lines = list(dict.fromkeys(stdio.iter_piped_stdin_lines())) # dedupe, keep order - if not lines: - raise UsageError( - "No sources received on stdin.", - suggestion="Pipe one path or URL per line, e.g. " - "find . -name '*.mp3' | assembly transcribe --from-stdin.", - ) - return lines - - -def _directory_sources(path: Path) -> list[str]: - files = sorted( - str(p) for p in path.rglob("*") if p.is_file() and p.suffix.lower() in AUDIO_EXTENSIONS - ) - if not files: - raise UsageError( - f"No audio files found under {path}.", - suggestion="Recognized extensions: " + ", ".join(sorted(AUDIO_EXTENSIONS)) + ".", - ) - return files - - -def _remote_sources(url: str) -> list[str] | None: - """Batch sources for a bucket/remote URL, or ``None`` when it's a single file. - - Mirrors the local rules: a glob expands to its file matches (sidecars - excluded), a trailing-slash folder to its audio files (recursive, filtered by - ``AUDIO_EXTENSIONS``); anything else is downloaded as one file. 
- """ - if _GLOB_CHARS.intersection(url): - matches = [u for u in remotefs.glob_files(url) if not u.endswith(SIDECAR_SUFFIX)] - if not matches: - raise UsageError(f"No files match {url}.") - return matches - if url.endswith("/"): - files = [u for u in remotefs.list_files(url) if Path(u).suffix.lower() in AUDIO_EXTENSIONS] - if not files: - raise UsageError( - f"No audio files found under {url}.", - suggestion="Recognized extensions: " + ", ".join(sorted(AUDIO_EXTENSIONS)) + ".", - ) - return files - return None - - -def _glob_sources(pattern: str) -> list[str]: - # pathlib globs are always relative, so peel an absolute pattern's anchor off - # and glob from there ("" anchors at the working directory; Path("") is "."). - anchor = Path(pattern).anchor - matches = sorted( - str(p) - for p in Path(anchor).glob(pattern.removeprefix(anchor)) - if p.is_file() and not str(p).endswith(SIDECAR_SUFFIX) - ) - if not matches: - raise UsageError(f"No files match {pattern}.") - return matches - - -def reject_single_source_flags( - *, - out: Path | None, - output_field: object | None, - show_code: bool, -) -> None: - """Batch mode writes one sidecar per source; the single-result flags don't apply. - - ``--llm`` is deliberately not here: in batch mode the chain runs per source and - its steps land in each sidecar. 
- """ - mutually_exclusive( - ("--show-code", show_code), - ("multiple sources", True), - suggestion="Pass one file or URL with --show-code.", - ) - mutually_exclusive( - ("--out", out), - ("-o/--output", output_field), - ("multiple sources", True), - suggestion=f"Each source gets a '{SIDECAR_SUFFIX}' sidecar with the full result.", - ) - def sidecar_path(source: str) -> Path: """Where ``source``'s sidecar lives: ``.aai.json`` next to a local file, or a slug + URL-hash name in the working directory for a URL (web or bucket).""" - if source.startswith(_URL_PREFIXES) or remotefs.is_remote_url(source): + if source.startswith(URL_PREFIXES) or remotefs.is_remote_url(source): digest = hashlib.sha256(source.encode()).hexdigest()[:8] slug = re.sub(r"[^A-Za-z0-9._-]+", "-", source.partition("://")[2]).strip("-.")[:64] return Path(f"{slug}-{digest}{SIDECAR_SUFFIX}") @@ -197,7 +53,7 @@ def sidecar_path(source: str) -> Path: def _source_digest(source: str) -> str | None: """SHA-256 of a local file's bytes; ``None`` for URLs (and paths that aren't files).""" - if source.startswith(_URL_PREFIXES) or not Path(source).is_file(): + if source.startswith(URL_PREFIXES) or not Path(source).is_file(): return None with Path(source).open("rb") as f: return hashlib.file_digest(f, "sha256").hexdigest() diff --git a/aai_cli/transcribe_exec.py b/aai_cli/transcribe_exec.py index 673ab1a7..cb5ed704 100644 --- a/aai_cli/transcribe_exec.py +++ b/aai_cli/transcribe_exec.py @@ -8,7 +8,6 @@ from __future__ import annotations import json -import os import tempfile from dataclasses import dataclass from pathlib import Path @@ -27,101 +26,13 @@ remotefs, stdio, transcribe_render, + transcribe_sources, + transcribe_validate, youtube, ) from aai_cli.code_gen.transcribe import render as render_transcribe_code from aai_cli.context import AppState -from aai_cli.errors import UsageError, mutually_exclusive - -# The PII policy strings the SDK accepts, validated client-side so a typo'd -# --redact-pii-policy 
fails before any upload — mirroring how an unknown --config -# key is rejected with the valid field list. -PII_POLICY_VALUES = frozenset(policy.value for policy in aai.PIIRedactionPolicy) - - -def validate_pii_policies(policies: list[str] | None) -> None: - unknown = [p for p in policies or [] if p not in PII_POLICY_VALUES] - if unknown: - valid = ", ".join(sorted(PII_POLICY_VALUES)) - raise UsageError(f"Unknown PII policy(s) {unknown}. Valid policies: {valid}.") - - -def validate_language_flags(language_code: str | None, *, language_detection: bool | None) -> None: - mutually_exclusive( - ("--language-code", language_code), - ("--language-detection", language_detection), - suggestion="Force a language or auto-detect it, not both.", - ) - - -def validate_speakers_expected(merged: dict[str, object]) -> None: - # Checked on the merged dict so `--config speaker_labels=true` also counts. - if merged.get("speakers_expected") and not merged.get("speaker_labels"): - raise UsageError( - "--speakers-expected only applies when diarization is enabled.", - suggestion="Add --speaker-labels.", - ) - - -def validate_out_with_llm(out: Path | None, llm_prompts: list[str] | None) -> None: - # --out captures the transcript itself; an LLM transform is a separate step. - mutually_exclusive( - ("--out", out), - ("--llm", llm_prompts), - suggestion='Pipe the transform instead, e.g. -o text | assembly llm -f "…".', - ) - - -def validate_out_path(out: Path | None) -> None: - """Reject an unusable ``--out`` up front, before the (billed, possibly long) - transcription runs — not after it finishes.""" - if out is None: - return - if ".." 
in out.parts: # reject path-traversal segments in --out - raise UsageError(f"--out path can't contain '..': {out}") - parent = out.parent - if not parent.is_dir(): - raise UsageError( - f"--out directory doesn't exist: {parent}", - suggestion="Create it first, or point --out at an existing directory.", - ) - if not os.access(parent, os.W_OK): - raise UsageError(f"--out directory isn't writable: {parent}") - - -def validate_json_with_output( - output_field: choices.TranscriptOutput | None, *, json_mode: bool -) -> None: - """``--json`` promises the full JSON payload (same as ``-o json``); any other - ``-o`` field contradicts it rather than silently winning.""" - if output_field is None or output_field is choices.TranscriptOutput.json: - return - mutually_exclusive( - ("--json", json_mode), - (f"-o {output_field.value}", output_field), - suggestion="Drop --json, or use -o json for the full JSON payload.", - ) - - -def warn_unrecognized_extension(source: str | None, *, json_mode: bool, quiet: bool) -> None: - """Warn when a single local source doesn't carry a known audio extension. - - Directory batch mode filters by ``AUDIO_EXTENSIONS``; single-file mode uploads - anything, so a likely-non-audio file (e.g. ``.txt``) gets a stderr heads-up — - never an error, since the server is the truth about what it can transcribe. 
- """ - from aai_cli.transcribe_batch import AUDIO_EXTENSIONS # avoid a module-load cycle - - if quiet or not source or source.startswith(("http://", "https://")): - return - suffix = Path(source).suffix.lower() - if not suffix or suffix in AUDIO_EXTENSIONS: - return - output.emit_warning( - f"'{source}' has extension '{suffix}', which doesn't look like audio; " - "the API decides what it can transcribe.", - json_mode=json_mode, - ) +from aai_cli.errors import UsageError def render_transform_steps(d: dict[str, Any]) -> str: @@ -402,26 +313,28 @@ def run_transcribe(opts: TranscribeOptions, state: AppState, *, json_mode: bool) # Module-load order: transcribe_batch imports this module, so import it lazily. from aai_cli import transcribe_batch - validate_language_flags(opts.language_code, language_detection=opts.language_detection) + transcribe_validate.validate_language_flags( + opts.language_code, language_detection=opts.language_detection + ) pii_policies = config_builder.split_csv(opts.redact_pii_policy) - validate_pii_policies(pii_policies) + transcribe_validate.validate_pii_policies(pii_policies) flags = opts.flags(pii_policies) - validate_out_with_llm(opts.out, opts.llm_prompt) - validate_out_path(opts.out) - validate_json_with_output(opts.output_field, json_mode=json_mode) + transcribe_validate.validate_out_with_llm(opts.out, opts.llm_prompt) + transcribe_validate.validate_out_path(opts.out) + transcribe_validate.validate_json_with_output(opts.output_field, json_mode=json_mode) client.validate_chars_per_caption(opts.chars_per_caption, opts.output_field) merged = config_builder.merge_transcribe_config( flags=flags, overrides=opts.config_kv, config_file=opts.config_file ) - validate_speakers_expected(merged) + transcribe_validate.validate_speakers_expected(merged) - sources = transcribe_batch.expand_sources( + sources = transcribe_sources.expand_sources( opts.source, from_stdin=opts.from_stdin, sample=opts.sample ) if sources is not None: - 
transcribe_batch.reject_single_source_flags( + transcribe_sources.reject_single_source_flags( out=opts.out, output_field=opts.output_field, show_code=opts.show_code, @@ -448,7 +361,9 @@ def run_transcribe(opts: TranscribeOptions, state: AppState, *, json_mode: bool) # A typo'd path must read as "file not found", not trigger a login. check_source_exists(opts.source, sample=opts.sample) - warn_unrecognized_extension(opts.source, json_mode=json_mode, quiet=state.quiet) + transcribe_validate.warn_unrecognized_extension( + opts.source, json_mode=json_mode, quiet=state.quiet + ) api_key = state.resolve_api_key() with output.status("Transcribing…", json_mode=json_mode, quiet=state.quiet): diff --git a/aai_cli/transcribe_sources.py b/aai_cli/transcribe_sources.py new file mode 100644 index 00000000..d8d582f4 --- /dev/null +++ b/aai_cli/transcribe_sources.py @@ -0,0 +1,173 @@ +"""Batch-mode source selection for ``assembly transcribe``. + +Splitting a transcribe invocation into its source list — a directory scan, a +glob, a ``--from-stdin`` list, or a bucket URL that is itself a glob/folder — is +a self-contained concern with no dependency on the batch *run* (sidecar resume, +concurrency, output), so it lives here. ``transcribe_batch`` imports the +constants and ``expand_sources``/``reject_single_source_flags`` it needs; the run +machinery stays there. ``transcribe_exec`` calls these directly to decide between +the single-source and batch paths. +""" + +from __future__ import annotations + +from pathlib import Path + +from aai_cli import remotefs, stdio +from aai_cli.errors import UsageError, mutually_exclusive + +SIDECAR_SUFFIX = ".aai.json" + +# What a directory scan picks up (an explicit glob or stdin list is taken as-is). 
+AUDIO_EXTENSIONS = frozenset( + { + ".3gp", + ".aac", + ".aif", + ".aiff", + ".amr", + ".flac", + ".m4a", + ".m4b", + ".mka", + ".mkv", + ".mov", + ".mp2", + ".mp3", + ".mp4", + ".mpga", + ".oga", + ".ogg", + ".opus", + ".wav", + ".webm", + ".wma", + } +) + +URL_PREFIXES = ("http://", "https://") +_GLOB_CHARS = frozenset("*?[") + + +def expand_sources(source: str | None, *, from_stdin: bool, sample: bool) -> list[str] | None: + """The batch source list, or ``None`` when this is a single-source invocation. + + Batch mode triggers on ``--from-stdin``, a directory (scanned recursively for + audio files), a glob pattern that names no existing file, or a bucket URL + that is a glob or trailing-slash folder. A plain file, URL, ``-`` (audio + piped on stdin), or ``--sample`` stays on the single-source path. + """ + if from_stdin: + return _stdin_sources(source, sample=sample) + # `not source` (rather than `is None`) also catches the empty string — e.g. an + # unset shell variable in `assembly transcribe "$FILE"`. `Path("")` is `Path(".")`, + # so it would otherwise fall into the directory branch and batch-transcribe the + # whole working directory; instead it stays single-source and fails validation. + if not source or sample or source == "-" or source.startswith(URL_PREFIXES): + return None + if remotefs.is_remote_url(source): + return _remote_sources(source) + path = Path(source) + if path.is_dir(): + return _directory_sources(path) + if not path.exists() and _GLOB_CHARS.intersection(source): + return _glob_sources(source) + return None + + +def _stdin_sources(source: str | None, *, sample: bool) -> list[str]: + if source is not None or sample: + raise UsageError( + "--from-stdin reads sources from stdin; don't also pass a source or --sample." + ) + lines = list(dict.fromkeys(stdio.iter_piped_stdin_lines())) # dedupe, keep order + if not lines: + raise UsageError( + "No sources received on stdin.", + suggestion="Pipe one path or URL per line, e.g. " + "find . 
-name '*.mp3' | assembly transcribe --from-stdin.", + ) + return lines + + +def _directory_sources(path: Path) -> list[str]: + files = sorted( + str(p) for p in path.rglob("*") if p.is_file() and p.suffix.lower() in AUDIO_EXTENSIONS + ) + if not files: + raise UsageError( + f"No audio files found under {path}.", + suggestion="Recognized extensions: " + ", ".join(sorted(AUDIO_EXTENSIONS)) + ".", + ) + return files + + +def _remote_sources(url: str) -> list[str] | None: + """Batch sources for a bucket/remote URL, or ``None`` when it's a single file. + + Mirrors the local rules (``_glob_sources``/``_directory_sources``): a glob + expands to its file matches, a trailing-slash folder to its audio files; + anything else is downloaded as one file. + """ + if _GLOB_CHARS.intersection(url): + return _remote_glob_sources(url) + if url.endswith("/"): + return _remote_folder_sources(url) + return None + + +def _remote_glob_sources(url: str) -> list[str]: + """The remote files matching a bucket glob, with sidecars excluded.""" + matches = [u for u in remotefs.glob_files(url) if not u.endswith(SIDECAR_SUFFIX)] + if not matches: + raise UsageError(f"No files match {url}.") + return matches + + +def _remote_folder_sources(url: str) -> list[str]: + """The audio files under a trailing-slash bucket folder (recursive).""" + files = [u for u in remotefs.list_files(url) if Path(u).suffix.lower() in AUDIO_EXTENSIONS] + if not files: + raise UsageError( + f"No audio files found under {url}.", + suggestion="Recognized extensions: " + ", ".join(sorted(AUDIO_EXTENSIONS)) + ".", + ) + return files + + +def _glob_sources(pattern: str) -> list[str]: + # pathlib globs are always relative, so peel an absolute pattern's anchor off + # and glob from there ("" anchors at the working directory; Path("") is "."). 
+ anchor = Path(pattern).anchor + matches = sorted( + str(p) + for p in Path(anchor).glob(pattern.removeprefix(anchor)) + if p.is_file() and not str(p).endswith(SIDECAR_SUFFIX) + ) + if not matches: + raise UsageError(f"No files match {pattern}.") + return matches + + +def reject_single_source_flags( + *, + out: Path | None, + output_field: object | None, + show_code: bool, +) -> None: + """Batch mode writes one sidecar per source; the single-result flags don't apply. + + ``--llm`` is deliberately not here: in batch mode the chain runs per source and + its steps land in each sidecar. + """ + mutually_exclusive( + ("--show-code", show_code), + ("multiple sources", True), + suggestion="Pass one file or URL with --show-code.", + ) + mutually_exclusive( + ("--out", out), + ("-o/--output", output_field), + ("multiple sources", True), + suggestion=f"Each source gets a '{SIDECAR_SUFFIX}' sidecar with the full result.", + ) diff --git a/aai_cli/transcribe_validate.py b/aai_cli/transcribe_validate.py new file mode 100644 index 00000000..7c558392 --- /dev/null +++ b/aai_cli/transcribe_validate.py @@ -0,0 +1,106 @@ +"""Argument validation and warnings for ``assembly transcribe``. + +These checks run before any billed work — a typo'd ``--redact-pii-policy``, a +contradictory flag pair, or an unwritable ``--out`` directory should fail (or +warn) up front rather than after a long upload. They're pure functions of the +parsed flags with no transcription state, so they live apart from the execution +body in ``transcribe_exec`` and the onboarding wizard can reuse the same surface. 
+""" + +from __future__ import annotations + +import os +from pathlib import Path + +import assemblyai as aai + +from aai_cli import choices, output, transcribe_sources +from aai_cli.errors import UsageError, mutually_exclusive + +# The PII policy strings the SDK accepts, validated client-side so a typo'd +# --redact-pii-policy fails before any upload — mirroring how an unknown --config +# key is rejected with the valid field list. +PII_POLICY_VALUES = frozenset(policy.value for policy in aai.PIIRedactionPolicy) + + +def validate_pii_policies(policies: list[str] | None) -> None: + unknown = [p for p in policies or [] if p not in PII_POLICY_VALUES] + if unknown: + valid = ", ".join(sorted(PII_POLICY_VALUES)) + raise UsageError(f"Unknown PII policy(s) {unknown}. Valid policies: {valid}.") + + +def validate_language_flags(language_code: str | None, *, language_detection: bool | None) -> None: + mutually_exclusive( + ("--language-code", language_code), + ("--language-detection", language_detection), + suggestion="Force a language or auto-detect it, not both.", + ) + + +def validate_speakers_expected(merged: dict[str, object]) -> None: + # Checked on the merged dict so `--config speaker_labels=true` also counts. + if merged.get("speakers_expected") and not merged.get("speaker_labels"): + raise UsageError( + "--speakers-expected only applies when diarization is enabled.", + suggestion="Add --speaker-labels.", + ) + + +def validate_out_with_llm(out: Path | None, llm_prompts: list[str] | None) -> None: + # --out captures the transcript itself; an LLM transform is a separate step. + mutually_exclusive( + ("--out", out), + ("--llm", llm_prompts), + suggestion='Pipe the transform instead, e.g. -o text | assembly llm -f "…".', + ) + + +def validate_out_path(out: Path | None) -> None: + """Reject an unusable ``--out`` up front, before the (billed, possibly long) + transcription runs — not after it finishes.""" + if out is None: + return + if ".." 
in out.parts: # reject path-traversal segments in --out + raise UsageError(f"--out path can't contain '..': {out}") + parent = out.parent + if not parent.is_dir(): + raise UsageError( + f"--out directory doesn't exist: {parent}", + suggestion="Create it first, or point --out at an existing directory.", + ) + if not os.access(parent, os.W_OK): + raise UsageError(f"--out directory isn't writable: {parent}") + + +def validate_json_with_output( + output_field: choices.TranscriptOutput | None, *, json_mode: bool +) -> None: + """``--json`` promises the full JSON payload (same as ``-o json``); any other + ``-o`` field contradicts it rather than silently winning.""" + if output_field is None or output_field is choices.TranscriptOutput.json: + return + mutually_exclusive( + ("--json", json_mode), + (f"-o {output_field.value}", output_field), + suggestion="Drop --json, or use -o json for the full JSON payload.", + ) + + +def warn_unrecognized_extension(source: str | None, *, json_mode: bool, quiet: bool) -> None: + """Warn when a single local source doesn't carry a known audio extension. + + Directory batch mode filters by ``AUDIO_EXTENSIONS``; single-file mode uploads + anything, so a likely-non-audio file (e.g. ``.txt``) gets a stderr heads-up — + never an error, since the server is the truth about what it can transcribe. 
+ """ + if quiet or not source or source.startswith(("http://", "https://")): + return + suffix = Path(source).suffix.lower() + if not suffix or suffix in transcribe_sources.AUDIO_EXTENSIONS: + return + output.emit_warning( + f"'{source}' has extension '{suffix}', which doesn't look like audio; " + "the API decides what it can transcribe.", + json_mode=json_mode, + ) diff --git a/tests/test_dub_exec.py b/tests/test_dub_exec.py index dff55a1a..abe7b2df 100644 --- a/tests/test_dub_exec.py +++ b/tests/test_dub_exec.py @@ -16,6 +16,7 @@ from aai_cli import mediafile from aai_cli.commands.dub import _exec as dub_exec +from aai_cli.commands.dub import _pipeline as dub_pipeline from aai_cli.context import AppState from aai_cli.errors import CLIError, UsageError from tests._dub_helpers import ( @@ -50,8 +51,8 @@ def _fake_key(monkeypatch: pytest.MonkeyPatch): "instance", [ DEFAULTS, - dub_exec._Utterance(start_ms=0, speaker="A", text="hi"), - dub_exec._VoicePlan(bare=None, overrides={}), + dub_pipeline.Utterance(start_ms=0, speaker="A", text="hi"), + dub_pipeline.VoicePlan(bare=None, overrides={}), ], ids=["options", "utterance", "voice_plan"], ) @@ -127,7 +128,7 @@ def test_default_out_path_rejects_unsluggable_language(): def test_assemble_timeline_fills_gaps_and_pads_tail(): # rate 1000: one second of 16-bit mono PCM is 2000 bytes. - track = dub_exec.assemble_timeline([(500, b"\x01\x02")], 1000, total_seconds=1.0) + track = dub_pipeline.assemble_timeline([(500, b"\x01\x02")], 1000, total_seconds=1.0) # 0.5 s leading silence, the segment, then a 0.499 s tail pad to 1.0 s. assert track == b"\x00" * 1000 + b"\x01\x02" + b"\x00" * 998 @@ -136,12 +137,12 @@ def test_assemble_timeline_overlap_appends_without_silence(): # The first segment runs to 0.1 s; the second "starts" at 0.05 s, so it is # appended immediately (the dub drifts) rather than overlapping or crashing. 
placed = [(0, b"\x01" * 200), (50, b"\x02\x02")] - track = dub_exec.assemble_timeline(placed, 1000, total_seconds=None) + track = dub_pipeline.assemble_timeline(placed, 1000, total_seconds=None) assert track == b"\x01" * 200 + b"\x02\x02" def test_assemble_timeline_skips_tail_when_track_is_long_enough(): - track = dub_exec.assemble_timeline([(0, b"\x01" * 200)], 1000, total_seconds=0.05) + track = dub_pipeline.assemble_timeline([(0, b"\x01" * 200)], 1000, total_seconds=0.05) assert track == b"\x01" * 200 @@ -154,9 +155,9 @@ def test_utterances_of_defaults_and_filtering(): utterance(4000, "C", " Bye "), ] ) - assert dub_exec._utterances_of(transcript, "tr_dub") == [ - dub_exec._Utterance(start_ms=0, speaker="A", text="Hi"), - dub_exec._Utterance(start_ms=4000, speaker="C", text="Bye"), + assert dub_pipeline.utterances_of(transcript, "tr_dub") == [ + dub_pipeline.Utterance(start_ms=0, speaker="A", text="Hi"), + dub_pipeline.Utterance(start_ms=4000, speaker="C", text="Bye"), ] @@ -167,7 +168,7 @@ def test_utterances_of_defaults_and_filtering(): ) def test_utterances_of_requires_spoken_utterances(utterances): with pytest.raises(CLIError) as exc: - dub_exec._utterances_of(SimpleNamespace(utterances=utterances), "tr_x") + dub_pipeline.utterances_of(SimpleNamespace(utterances=utterances), "tr_x") assert exc.value.error_type == "no_utterances" assert exc.value.exit_code == 2 assert "Transcript tr_x has no utterances to dub" in exc.value.message @@ -181,7 +182,7 @@ def test_utterances_of_requires_spoken_utterances(utterances): ) def test_total_seconds(duration, expected): transcript = SimpleNamespace(audio_duration=duration) - assert dub_exec._total_seconds(transcript) == expected + assert dub_pipeline.total_seconds(transcript) == expected # --- validation order (cheap local checks before any credential or network) ---- diff --git a/tests/test_transcribe_batch_sources.py b/tests/test_transcribe_batch_sources.py index 2cddaa2d..a1668dd8 100644 --- 
a/tests/test_transcribe_batch_sources.py +++ b/tests/test_transcribe_batch_sources.py @@ -14,7 +14,7 @@ import pytest from typer.testing import CliRunner -from aai_cli import config, transcribe_batch +from aai_cli import config, transcribe_batch, transcribe_sources from aai_cli.errors import UsageError from aai_cli.main import app @@ -129,7 +129,7 @@ def test_stdin_source_list_dedupes_preserving_order(monkeypatch): import io monkeypatch.setattr("sys.stdin", io.StringIO("b.mp3\na.mp3\nb.mp3\n")) - assert transcribe_batch.expand_sources(None, from_stdin=True, sample=False) == [ + assert transcribe_sources.expand_sources(None, from_stdin=True, sample=False) == [ "b.mp3", "a.mp3", ] @@ -158,7 +158,7 @@ def test_from_stdin_rejects_sample(): @pytest.mark.parametrize("source", ["-", "https://example.com/a.mp3", None, ""]) def test_non_batch_sources_return_none(source): - assert transcribe_batch.expand_sources(source, from_stdin=False, sample=False) is None + assert transcribe_sources.expand_sources(source, from_stdin=False, sample=False) is None def test_empty_source_is_rejected_not_treated_as_cwd(tmp_path, mocker, monkeypatch): @@ -175,13 +175,13 @@ def test_empty_source_is_rejected_not_treated_as_cwd(tmp_path, mocker, monkeypat def test_sample_returns_none_even_without_source(): - assert transcribe_batch.expand_sources(None, from_stdin=False, sample=True) is None + assert transcribe_sources.expand_sources(None, from_stdin=False, sample=True) is None def test_expand_sources_directory_error_message_names_the_path(tmp_path): (tmp_path / "calls").mkdir() with pytest.raises(UsageError, match="No audio files found under calls"): - transcribe_batch.expand_sources("calls", from_stdin=False, sample=False) + transcribe_sources.expand_sources("calls", from_stdin=False, sample=False) @pytest.mark.parametrize( @@ -295,7 +295,7 @@ def test_remote_glob_without_matches_exits_2(memory_fs): def test_plain_remote_file_url_stays_single_source(memory_fs): # No glob and no trailing slash: a 
bucket URL is one file, like a local path. for url in ("memory://calls/a.mp3", "memory://calls"): - assert transcribe_batch.expand_sources(url, from_stdin=False, sample=False) is None + assert transcribe_sources.expand_sources(url, from_stdin=False, sample=False) is None def test_sidecar_path_for_remote_url_is_slug_plus_hash(): diff --git a/tests/test_transcribe_out.py b/tests/test_transcribe_out.py index 69abb69a..faaa753f 100644 --- a/tests/test_transcribe_out.py +++ b/tests/test_transcribe_out.py @@ -118,7 +118,7 @@ def test_transcribe_out_missing_parent_dir_fails_before_transcribing(tmp_path): def test_transcribe_out_unwritable_parent_dir_fails_before_transcribing(tmp_path, monkeypatch): import os - from aai_cli import transcribe_exec + from aai_cli import transcribe_validate _auth() out = tmp_path / "x.txt" @@ -131,7 +131,7 @@ def fake_access(path, mode, **kwargs): return False return real_access(path, mode, **kwargs) - monkeypatch.setattr(transcribe_exec.os, "access", fake_access) + monkeypatch.setattr(transcribe_validate.os, "access", fake_access) with patch(_TRANSCRIBE) as tx: result = runner.invoke(app, ["transcribe", "audio.mp3", "--out", str(out)]) assert result.exit_code == 2