Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .importlinter
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ source_modules =
aai_cli.init_exec
aai_cli.llm
aai_cli.llm_exec
aai_cli.mediafile
aai_cli.microphone
aai_cli.options
aai_cli.output
Expand Down
98 changes: 26 additions & 72 deletions aai_cli/caption_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@

from __future__ import annotations

import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path

import assemblyai as aai
from rich.markup import escape

from aai_cli import client, output, youtube
from aai_cli import client, mediafile, output, youtube
from aai_cli.context import AppState
from aai_cli.errors import CLIError, UsageError

Expand Down Expand Up @@ -62,52 +60,6 @@ def subtitles_filter(srt: Path, font_size: int | None) -> str:
return spec


def _validate_media(media: Path) -> None:
"""Reject a missing local source before credential resolution, so a typo'd
path reads as "file not found", never as a login prompt or an ffmpeg error."""
if not media.exists():
raise CLIError(
f"File not found: {media}",
error_type="file_not_found",
exit_code=2,
suggestion="Check the path. assembly caption needs a local video file.",
)
if not media.is_file():
raise CLIError(
f"Not a file: {media}",
error_type="not_a_file",
exit_code=2,
suggestion="Pass a video file, not a directory.",
)


def _validate_out(out: Path, media: Path) -> None:
"""The captioned file must never overwrite its own input: ffmpeg would read
and write the same file concurrently, corrupting it."""
if out.resolve() == media.resolve():
raise UsageError(
"--out would overwrite the input file.",
suggestion="Pick a different output path.",
)


def _require_ffmpeg() -> str:
"""The ffmpeg executable; checked before any (billed) transcription work."""
path = shutil.which("ffmpeg")
if path is None:
raise CLIError(
"ffmpeg is required to burn captions into video, but it isn't on PATH.",
error_type="missing_dependency",
suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.",
)
return path


def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]:
"""Boundary seam for tests: one ffmpeg invocation, output captured."""
return subprocess.run(args, capture_output=True, text=True, check=False)


def _burn(ffmpeg: str, media: Path, srt: Path, out: Path, font_size: int | None) -> None:
"""Burn the ``srt`` captions into ``media``'s video stream, writing ``out``.

Expand All @@ -118,7 +70,7 @@ def _burn(ffmpeg: str, media: Path, srt: Path, out: Path, font_size: int | None)
re-run overwrite its own earlier output instead of stalling on ffmpeg's
prompt.
"""
result = _run_ffmpeg(
result = mediafile.run_ffmpeg(
[
ffmpeg,
"-hide_banner",
Expand All @@ -135,32 +87,20 @@ def _burn(ffmpeg: str, media: Path, srt: Path, out: Path, font_size: int | None)
"0:a?",
"-c:a",
"copy",
str(out),
mediafile.path_arg(out),
]
)
if result.returncode != 0:
detail = result.stderr.strip().splitlines()
reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}"
raise CLIError(
f"Could not write {out.name}: {reason}",
raise mediafile.ffmpeg_failure(
result,
"write",
out,
error_type="caption_failed",
suggestion="Check that the input is a readable video file — captions "
"can't be burned into audio-only media.",
)


def _resolve_transcript(
opts: CaptionOptions, media: Path, state: AppState, *, json_mode: bool
) -> object:
"""The transcript whose captions are burned in: fetched by id, or made fresh
from the (already local) media file."""
if opts.transcript_id is not None:
return client.get_transcript(state.resolve_api_key(), opts.transcript_id)
api_key = state.resolve_api_key()
with output.status("Transcribing for captions…", json_mode=json_mode, quiet=state.quiet):
return client.transcribe(api_key, str(media), config=aai.TranscriptionConfig())


def _fetch_srt(transcript: object, opts: CaptionOptions, *, json_mode: bool, quiet: bool) -> str:
"""The transcript's SRT captions from the export endpoint; empty is an error."""
with output.status("Fetching captions…", json_mode=json_mode, quiet=quiet):
Expand All @@ -181,7 +121,7 @@ def _fetch_srt(transcript: object, opts: CaptionOptions, *, json_mode: bool, qui

def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> None:
"""Execute one `assembly caption` invocation from already-parsed flags."""
ffmpeg = _require_ffmpeg()
ffmpeg = mediafile.require_ffmpeg("burn captions into video")
if youtube.is_downloadable_url(opts.media):
# A media-page URL (YouTube, …) is downloaded once — always the full
# video, since the captions are burned into it. The download dir is
Expand All @@ -190,7 +130,7 @@ def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> No
with output.status("Downloading video…", json_mode=json_mode, quiet=state.quiet):
local = youtube.download_media(opts.media, Path(td), video=True)
out = opts.out if opts.out is not None else Path.cwd() / default_out_path(local).name
_validate_out(out, local)
mediafile.validate_out(out, local)
_caption_and_emit(opts, local, out, ffmpeg, state, json_mode=json_mode)
return
if opts.media.startswith(("http://", "https://")):
Expand All @@ -199,10 +139,16 @@ def run_caption(opts: CaptionOptions, state: AppState, *, json_mode: bool) -> No
"media-page URL yt-dlp can download (YouTube, …).",
suggestion="Download the video first, then caption the local copy.",
)
if "://" in opts.media:
# Path() would collapse the "//" and report a corrupted echo of the URL.
raise UsageError(
f"assembly caption needs a local file, not a URL: {opts.media}",
suggestion="Download the video first, then caption the local copy.",
)
media = Path(opts.media)
_validate_media(media)
mediafile.validate_local_media(media, "caption", kind="video")
out = opts.out if opts.out is not None else default_out_path(media)
_validate_out(out, media)
mediafile.validate_out(out, media)
_caption_and_emit(opts, media, out, ffmpeg, state, json_mode=json_mode)


Expand All @@ -216,7 +162,15 @@ def _caption_and_emit(
json_mode: bool,
) -> None:
"""Caption an already-local video file into ``out`` and report the result."""
transcript = _resolve_transcript(opts, media, state, json_mode=json_mode)
transcript = mediafile.resolve_transcript(
state.resolve_api_key(),
opts.transcript_id,
media,
status_message="Transcribing for captions…",
json_mode=json_mode,
quiet=state.quiet,
config=aai.TranscriptionConfig(),
)
transcript_id = str(getattr(transcript, "id", ""))
srt = _fetch_srt(transcript, opts, json_mode=json_mode, quiet=state.quiet)
captions = srt.count("-->") # one arrow per SRT cue timing line
Expand Down
74 changes: 15 additions & 59 deletions aai_cli/clip_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@
from __future__ import annotations

import json
import shutil
import subprocess
import tempfile
from dataclasses import dataclass
from pathlib import Path
from types import SimpleNamespace

import assemblyai as aai
from rich.markup import escape

from aai_cli import client, clip_select, jsonshape, llm, output, stdio, youtube
from aai_cli import clip_select, jsonshape, llm, mediafile, output, stdio, youtube
from aai_cli.clip_select import Segment
from aai_cli.context import AppState
from aai_cli.errors import CLIError, UsageError
Expand Down Expand Up @@ -141,12 +138,14 @@ def _resolve_transcript(
if text.startswith("{"):
return _piped_transcript(text)
transcript_id = text # a bare id (e.g. from `assembly transcribe … -o id`)
if transcript_id is not None:
return client.get_transcript(state.resolve_api_key(), transcript_id)
config = aai.TranscriptionConfig(speaker_labels=True)
api_key = state.resolve_api_key()
with output.status("Transcribing for clip selection…", json_mode=json_mode, quiet=state.quiet):
return client.transcribe(api_key, str(media), config=config)
return mediafile.resolve_diarized_transcript(
state.resolve_api_key(),
transcript_id,
media,
status_message="Transcribing for clip selection…",
json_mode=json_mode,
quiet=state.quiet,
)


def _transcript_segments(
Expand Down Expand Up @@ -191,26 +190,6 @@ def _transcript_segments(
return [clip_select.segment_of(utterance) for utterance in matched], transcript_id


def _validate_media(media: Path) -> None:
"""Reject a missing local source before credential resolution, so a typo'd
path reads as "file not found", never as a login prompt or an opaque
ffmpeg error."""
if not media.exists():
raise CLIError(
f"File not found: {media}",
error_type="file_not_found",
exit_code=2,
suggestion="Check the path. assembly clip needs a local audio/video file.",
)
if not media.is_file():
raise CLIError(
f"Not a file: {media}",
error_type="not_a_file",
exit_code=2,
suggestion="Pass a media file, not a directory.",
)


def _validate_out_dir(out_dir: Path | None) -> None:
if out_dir is not None and not out_dir.is_dir():
raise UsageError(
Expand All @@ -236,23 +215,6 @@ def _validate_selection(opts: ClipOptions) -> None:
)


def _require_ffmpeg() -> str:
"""The ffmpeg executable; checked before any (billed) transcription work."""
path = shutil.which("ffmpeg")
if path is None:
raise CLIError(
"ffmpeg is required to cut media, but it isn't on PATH.",
error_type="missing_dependency",
suggestion="Install it (brew install ffmpeg / apt install ffmpeg) and re-run.",
)
return path


def _run_ffmpeg(args: list[str]) -> subprocess.CompletedProcess[str]:
"""Boundary seam for tests: one ffmpeg invocation, output captured."""
return subprocess.run(args, capture_output=True, text=True, check=False)


# -30dB for at least 0.2s reads as a pause in normal speech recordings.
_SILENCE_FILTER = "silencedetect=noise=-30dB:d=0.2"

Expand All @@ -265,7 +227,7 @@ def _detect_silences(ffmpeg: str, media: Path) -> list[Segment]:
silencedetect logs at info level on stderr, so the usual ``-loglevel
error`` would silence the very lines this parses.
"""
result = _run_ffmpeg(
result = mediafile.run_ffmpeg(
[
ffmpeg,
"-hide_banner",
Expand All @@ -291,7 +253,7 @@ def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None:
would snap to the nearest keyframe; ``-y`` makes a re-run overwrite its own
earlier output instead of stalling on ffmpeg's prompt.
"""
result = _run_ffmpeg(
result = mediafile.run_ffmpeg(
[
ffmpeg,
"-hide_banner",
Expand All @@ -304,17 +266,11 @@ def _cut_clip(ffmpeg: str, media: Path, segment: Segment, dest: Path) -> None:
f"{segment.start:.3f}",
"-to",
f"{segment.end:.3f}",
str(dest),
mediafile.path_arg(dest),
]
)
if result.returncode != 0:
detail = result.stderr.strip().splitlines()
reason = detail[-1] if detail else f"ffmpeg exited with code {result.returncode}"
raise CLIError(
f"Could not cut {dest.name}: {reason}",
error_type="clip_failed",
suggestion="Check that the input is a readable audio/video file.",
)
raise mediafile.ffmpeg_failure(result, "cut", dest, error_type="clip_failed")


def _clip_dest(media: Path, out_dir: Path | None, index: int) -> Path:
Expand Down Expand Up @@ -350,7 +306,7 @@ def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None:
_validate_selection(opts)
youtube.validate_video_flag(opts.media, video=opts.video)
explicit = [clip_select.parse_range(value) for value in opts.ranges]
ffmpeg = _require_ffmpeg()
ffmpeg = mediafile.require_ffmpeg("cut media")
if youtube.is_downloadable_url(opts.media):
# A media-page URL (YouTube, podcast page, …) is downloaded once — the
# audio track by default, the full video with --video so the clips carry
Expand All @@ -371,7 +327,7 @@ def run_clip(opts: ClipOptions, state: AppState, *, json_mode: bool) -> None:
suggestion="Download the media first, then clip the local copy.",
)
media = Path(opts.media)
_validate_media(media)
mediafile.validate_local_media(media, "clip")
_cut_and_emit(opts, media, opts.out_dir, explicit, ffmpeg, state, json_mode=json_mode)


Expand Down
6 changes: 6 additions & 0 deletions aai_cli/commands/dub.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def dub(
"-l",
help="Target language: an ISO code (de, fr, es, …) or a language name (German).",
),
source_lang: str | None = typer.Option(
None,
"--source-lang",
help="ISO code of the source audio (e.g. de). Default: auto-detect the language.",
),
transcript_id: str | None = typer.Option(
None,
"--transcript-id",
Expand Down Expand Up @@ -119,6 +124,7 @@ def dub(
opts = dub_exec.DubOptions(
media=media,
language=lang,
source_language=source_lang,
transcript_id=transcript_id,
voice=voice,
model=model,
Expand Down
Loading
Loading