Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .importlinter
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ source_modules =
aai_cli.options
aai_cli.output
aai_cli.procs
aai_cli.remotefs
aai_cli.render
aai_cli.speak_exec
aai_cli.stdio
Expand Down Expand Up @@ -93,6 +94,7 @@ source_modules =
aai_cli.eval_data
aai_cli.hotkey
aai_cli.llm
aai_cli.remotefs
aai_cli.sync_stt
aai_cli.telemetry
aai_cli.wer
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ assembly init # scaffold a starter app
## 📋 Key features

- **Transcription**: `assembly transcribe` handles files, URLs, and YouTube/podcast pages, with flags for speaker labels, PII redaction, summarization, sentiment, chapters, and more.
- **Batch transcription**: point `assembly transcribe` at a directory or glob (or pipe paths with `--from-stdin`) to transcribe everything concurrently, with sidecar files that make re-runs resumable. Add `--llm "prompt"` to run an LLM prompt over each finished transcript, saved into the sidecars.
- **Batch transcription**: point `assembly transcribe` at a directory or glob — local, or in bucket storage (`"s3://bucket/calls/*.mp3"`, `gs://`, `az://`, …, with the matching fsspec backend such as `s3fs` installed) — or pipe paths with `--from-stdin` to transcribe everything concurrently, with sidecar files that make re-runs resumable. Add `--llm "prompt"` to run an LLM prompt over each finished transcript, saved into the sidecars.
- **Real-time streaming**: `assembly stream` transcribes the microphone, a file, or a URL live — on macOS it can capture system audio too.
- **Dictation**: `assembly dictate` is push-to-talk for your terminal — press Enter to record, Enter again to get the utterance back instantly from the Sync API (up to 120 s per utterance).
- **Voice agent**: `assembly agent` runs a full-duplex spoken conversation in your terminal.
Expand All @@ -203,6 +203,7 @@ Transcribe in batches — a directory, a glob, or a piped list, resumable on re-

```sh
assembly transcribe ./recordings
assembly transcribe "s3://bucket/calls/*.mp3" # needs: pip install s3fs
find . -name "*.wav" | assembly transcribe --from-stdin
```

Expand Down
11 changes: 8 additions & 3 deletions aai_cli/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
StreamingParameters,
)

from aai_cli import environments, jsonshape, stdio
from aai_cli import environments, jsonshape, remotefs, stdio
from aai_cli.errors import APIError, CLIError, UsageError, auth_failure, is_auth_failure
from aai_cli.streaming.diagnostics import classify_error, silence_streaming_logging

Expand Down Expand Up @@ -45,14 +45,17 @@ def _make_streaming_client(api_key: str) -> _StreamingClientLike:
return client


def resolve_audio_source(source: str | None, *, sample: bool, check_local: bool = True) -> str:
def resolve_audio_source(
source: str | None, *, sample: bool, check_local: bool = True, allow_remote: bool = False
) -> str:
"""The audio reference to use: the hosted --sample clip, else the given path/URL.

Shared by `transcribe` and `stream` so both accept a file or URL and `--sample`.
A local path that doesn't exist fails here — before any credential resolution or
request — so a typo'd filename reads as "file not found", not as an API failure.
``--show-code`` paths pass ``check_local=False``: generating code for a file you
don't have yet is legitimate.
don't have yet is legitimate. Callers that can fetch fsspec bucket URLs (s3://,
gs://, …) pass ``allow_remote=True``; elsewhere those fail as missing files.
"""
if sample:
if source:
Expand All @@ -68,6 +71,8 @@ def resolve_audio_source(source: str | None, *, sample: bool, check_local: bool
suggestion="Or pass --sample to use the hosted demo file.",
)
if check_local and not source.startswith(("http://", "https://")):
if allow_remote and remotefs.is_remote_url(source):
return source
path = Path(source)
if not path.exists():
raise CLIError(
Expand Down
9 changes: 8 additions & 1 deletion aai_cli/commands/transcribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
("Transcribe a local file", "assembly transcribe call.mp3"),
("Batch-transcribe a folder", "assembly transcribe ./recordings"),
("Batch-transcribe a glob", 'assembly transcribe "calls/*.mp3"'),
("Batch-transcribe an S3 prefix", 'assembly transcribe "s3://bucket/calls/*.mp3"'),
("Try it with the hosted sample", "assembly transcribe --sample"),
("Transcribe a YouTube video", "assembly transcribe https://youtu.be/dtp6b76pMak"),
("Transcribe a podcast page", 'assembly transcribe "https://podcasts.apple.com/…"'),
Expand All @@ -33,7 +34,9 @@
def transcribe(
ctx: typer.Context,
source: str | None = typer.Argument(
None, help="Audio file, URL, YouTube/podcast URL, or a directory/glob (batch mode)."
None,
help="Audio file, URL, YouTube/podcast URL, bucket URL (s3://, gs://, …), or a "
"directory/glob (batch mode).",
),
sample: bool = typer.Option(False, "--sample", help="Use the hosted wildfires.mp3 sample."),
# batch mode
Expand Down Expand Up @@ -348,6 +351,10 @@ def transcribe(
sources already transcribed — with changed --llm prompts it replays just
the LLM step, never a second transcription.

Bucket URLs (s3://, gs://, az://, sftp://, …) work for single files and for
batches (a glob, or a folder ending in /); install the matching fsspec
backend first (e.g. pip install s3fs) and use its usual credentials.

Curated flags cover common features; --config KEY=VALUE and --config-file reach every other field. Analysis (summary, chapters, ...) renders in human mode.
"""
opts = transcribe_exec.TranscribeOptions(
Expand Down
134 changes: 134 additions & 0 deletions aai_cli/remotefs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""Fetching audio from fsspec-addressable storage (s3://, gs://, az://, sftp://, …).

The AssemblyAI API fetches plain http(s) URLs itself, and `youtube.py` handles
media-page URLs; this module covers bucket/remote-storage URLs neither can read.
fsspec core ships with the CLI, but each protocol's backend (s3fs, gcsfs, adlfs,
…) is an optional install — a missing one surfaces as a clean install hint, not a
traceback. Credentials are the backend's business (e.g. the standard AWS
environment/config for s3://), never AssemblyAI's API key.
"""

from __future__ import annotations

from abc import abstractmethod
from contextlib import contextmanager
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Protocol

from aai_cli.errors import CLIError

if TYPE_CHECKING:
from collections.abc import Generator

# Schemes that never route through fsspec even though it knows them: the API
# fetches web URLs itself, and file:// paths take the ordinary local checks.
_NON_REMOTE_PROTOCOLS = frozenset({"http", "https", "file", "local"})


class _RemoteFS(Protocol):
"""The slice of ``fsspec.AbstractFileSystem`` this module touches."""

@abstractmethod
def glob(self, path: str, **kwargs: bool) -> dict[str, dict[str, object]]:
"""Expand a glob; with ``detail=True``, a path -> info mapping."""

@abstractmethod
def find(self, path: str) -> list[str]:
"""Every file under ``path``, recursively."""

@abstractmethod
def unstrip_protocol(self, name: str) -> str:
"""Re-attach the protocol prefix to a bare path."""

@abstractmethod
def get_file(self, rpath: str, lpath: str) -> None:
"""Copy one remote file to a local path."""


def is_remote_url(source: str | None) -> bool:
"""True if `source` is a URL whose scheme names an fsspec filesystem (s3://, gs://, …).

http(s) is excluded (the API fetches web URLs itself), as is file:// (local
files go through the ordinary path checks); an unknown scheme is not remote.
"""
if not source or "://" not in source:
return False
protocol = source.partition("://")[0]
if protocol in _NON_REMOTE_PROTOCOLS:
return False
from fsspec.registry import known_implementations

return protocol in known_implementations


def _filesystem(url: str) -> tuple[_RemoteFS, str]:
    """Resolve `url` into its fsspec filesystem plus the protocol-less path.

    When fsspec raises ImportError while resolving the URL (the protocol's
    backend is not installed), that is re-raised as a CLIError with exit code 2.
    The ImportError's own text is passed along as the suggestion, since fsspec
    already names the package to install there (e.g. "Install s3fs to access S3").
    """
    import fsspec

    try:
        # fsspec ships no py.typed; the declared annotation pins the pair's type.
        resolved: tuple[_RemoteFS, str] = fsspec.url_to_fs(url)  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
    except ImportError as missing_backend:
        scheme, _, _ = url.partition("://")
        raise CLIError(
            f"Reading {scheme}:// URLs needs an extra package.",
            error_type="remote_backend_missing",
            exit_code=2,
            suggestion=f"{missing_backend}.",
        ) from missing_backend
    else:
        return resolved


@contextmanager
def _remote_errors(url: str) -> Generator[None]:
"""Normalize backend exceptions for one remote operation into clean CLIErrors."""
try:
yield
except FileNotFoundError as exc:
raise CLIError(
f"Remote file not found: {url}",
error_type="file_not_found",
exit_code=2,
suggestion="Check the path. A folder needs a trailing '/'; globs (*.mp3) also work.",
) from exc
except CLIError:
raise
except Exception as exc: # backends raise many types; surface one clean CLI error
reason = " ".join(str(exc).split())
raise CLIError(
f"Could not access {url}: {reason}",
error_type="remote_error",
exit_code=1,
) from exc


def download(url: str, dest_dir: Path) -> Path:
    """Fetch the remote file at `url` into `dest_dir`; the result is the local copy's path.

    The local file keeps the final component of the remote path as its name.
    Backend failures during the copy surface as CLIErrors via ``_remote_errors``.
    """
    remote_fs, remote_path = _filesystem(url)
    # Remote paths are '/'-separated whatever the local OS, hence PurePosixPath.
    target = dest_dir.joinpath(PurePosixPath(remote_path).name)
    with _remote_errors(url):
        remote_fs.get_file(remote_path, str(target))
    return target


def glob_files(url: str) -> list[str]:
    """Sorted full URLs of the files that the remote glob `url` matches.

    Only matches whose info reports ``type == "file"`` are kept; each bare path
    gets its protocol prefix restored before sorting.
    """
    remote_fs, pattern = _filesystem(url)
    with _remote_errors(url):
        matches = remote_fs.glob(pattern, detail=True)
    file_urls = [
        remote_fs.unstrip_protocol(name)
        for name, meta in matches.items()
        if meta.get("type") == "file"
    ]
    file_urls.sort()
    return file_urls


def list_files(url: str) -> list[str]:
    """Sorted full URLs of every file below the remote folder/prefix `url`, recursively."""
    remote_fs, prefix = _filesystem(url)
    with _remote_errors(url):
        names = remote_fs.find(prefix)
    # Restore the protocol prefix on each bare path, then order the URLs.
    urls = [remote_fs.unstrip_protocol(name) for name in names]
    urls.sort()
    return urls
40 changes: 34 additions & 6 deletions aai_cli/transcribe_batch.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Batch transcription: directories, globs, and stdin lists with sidecar resume.

``assembly transcribe`` switches to batch mode when the source is a directory or a
glob pattern, or when ``--from-stdin`` supplies one path/URL per line. Sources run
glob pattern — local, or on fsspec-addressable remote storage (an ``s3://…/*.mp3``
glob, or a trailing-slash folder like ``s3://bucket/calls/``) — or when
``--from-stdin`` supplies one path/URL per line. Sources run
concurrently behind a live progress table; each finished source gets a
``<source>.aai.json`` sidecar holding the full transcript. The sidecar doubles as
the resume marker — a re-run skips any source whose sidecar records a completed
Expand Down Expand Up @@ -29,7 +31,7 @@
from rich.live import Live
from rich.markup import escape

from aai_cli import client, jsonshape, llm, output, stdio, theme, transcribe_exec
from aai_cli import client, jsonshape, llm, output, remotefs, stdio, theme, transcribe_exec
from aai_cli.errors import CLIError, NotAuthenticated, UsageError, mutually_exclusive

if TYPE_CHECKING:
Expand Down Expand Up @@ -73,8 +75,9 @@ def expand_sources(source: str | None, *, from_stdin: bool, sample: bool) -> lis
"""The batch source list, or ``None`` when this is a single-source invocation.

Batch mode triggers on ``--from-stdin``, a directory (scanned recursively for
audio files), or a glob pattern that names no existing file. A plain file, URL,
``-`` (audio piped on stdin), or ``--sample`` stays on the single-source path.
audio files), a glob pattern that names no existing file, or a bucket URL
that is a glob or trailing-slash folder. A plain file, URL, ``-`` (audio
piped on stdin), or ``--sample`` stays on the single-source path.
"""
if from_stdin:
return _stdin_sources(source, sample=sample)
Expand All @@ -84,6 +87,8 @@ def expand_sources(source: str | None, *, from_stdin: bool, sample: bool) -> lis
# whole working directory; instead it stays single-source and fails validation.
if not source or sample or source == "-" or source.startswith(_URL_PREFIXES):
return None
if remotefs.is_remote_url(source):
return _remote_sources(source)
path = Path(source)
if path.is_dir():
return _directory_sources(path)
Expand Down Expand Up @@ -119,6 +124,29 @@ def _directory_sources(path: Path) -> list[str]:
return files


def _remote_sources(url: str) -> list[str] | None:
    """Expand a bucket/remote URL into batch sources; ``None`` means a single file.

    Same rules as for local sources:

    * a URL containing glob characters expands to its matching files, minus
      sidecars;
    * a URL ending in ``/`` expands to the files under it (recursive) whose
      extension is in ``AUDIO_EXTENSIONS``;
    * anything else is left for the single-file download path.

    Raises UsageError when a glob or folder yields no sources.
    """
    is_glob = bool(_GLOB_CHARS.intersection(url))
    if not is_glob and not url.endswith("/"):
        return None

    if is_glob:
        # The glob check wins even when the pattern also ends in "/".
        candidates = remotefs.glob_files(url)
        matched = [item for item in candidates if not item.endswith(SIDECAR_SUFFIX)]
        if matched:
            return matched
        raise UsageError(f"No files match {url}.")

    audio = [
        item for item in remotefs.list_files(url) if Path(item).suffix.lower() in AUDIO_EXTENSIONS
    ]
    if audio:
        return audio
    raise UsageError(
        f"No audio files found under {url}.",
        suggestion="Recognized extensions: " + ", ".join(sorted(AUDIO_EXTENSIONS)) + ".",
    )


def _glob_sources(pattern: str) -> list[str]:
# pathlib globs are always relative, so peel an absolute pattern's anchor off
# and glob from there ("" anchors at the working directory; Path("") is ".").
Expand Down Expand Up @@ -159,8 +187,8 @@ def reject_single_source_flags(

def sidecar_path(source: str) -> Path:
"""Where ``source``'s sidecar lives: ``<file>.aai.json`` next to a local file, or
a slug + URL-hash name in the working directory for a URL."""
if source.startswith(_URL_PREFIXES):
a slug + URL-hash name in the working directory for a URL (web or bucket)."""
if source.startswith(_URL_PREFIXES) or remotefs.is_remote_url(source):
digest = hashlib.sha256(source.encode()).hexdigest()[:8]
slug = re.sub(r"[^A-Za-z0-9._-]+", "-", source.partition("://")[2]).strip("-.")[:64]
return Path(f"{slug}-{digest}{SIDECAR_SUFFIX}")
Expand Down
15 changes: 13 additions & 2 deletions aai_cli/transcribe_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
config_builder,
llm,
output,
remotefs,
stdio,
transcribe_render,
youtube,
Expand Down Expand Up @@ -155,7 +156,7 @@ def check_source_exists(source: str | None, *, sample: bool) -> None:
Stdin (``-``) is exempt: its bytes are buffered at transcription time.
"""
if source != "-":
client.resolve_audio_source(source, sample=sample)
client.resolve_audio_source(source, sample=sample, allow_remote=True)


def run_transcription(
Expand All @@ -177,7 +178,12 @@ def run_transcription(
local.write_bytes(data)
return client.transcribe(api_key, str(local), config=transcription_config)

audio = client.resolve_audio_source(source, sample=sample)
audio = client.resolve_audio_source(source, sample=sample, allow_remote=True)
if remotefs.is_remote_url(audio):
# Fetch from bucket/remote storage first; the API can't read s3://-style URLs.
with tempfile.TemporaryDirectory(prefix="aai-remote-") as td:
local = remotefs.download(audio, Path(td))
return client.transcribe(api_key, str(local), config=transcription_config)
if youtube.is_downloadable_url(audio):
# Fetch first; AssemblyAI can't read a YouTube/podcast page URL itself.
with tempfile.TemporaryDirectory(prefix="aai-yt-") as td:
Expand Down Expand Up @@ -368,6 +374,11 @@ def _print_show_code(opts: TranscribeOptions, merged: dict[str, object]) -> None
Raw stdout, so `--show-code > script.py` runs. No source/--sample needed — fall
back to a placeholder path for a pure snippet.
"""
if opts.source and remotefs.is_remote_url(opts.source):
raise UsageError(
"--show-code does not support bucket URLs (s3://, gs://, …) yet.",
suggestion="Download the audio first and pass the local file.",
)
audio = (
client.resolve_audio_source(opts.source, sample=opts.sample, check_local=False)
if opts.source or opts.sample
Expand Down
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ dependencies = [
# `assembly eval` WER scoring uses jiwer (the de-facto standard WER
# implementation); wer.py imports it lazily to keep CLI startup fast.
"jiwer>=4.0",
# Bucket-URL audio sources (s3://, gs://, …) for `assembly transcribe` (remotefs.py,
# imported lazily). fsspec core only — each protocol's backend (s3fs, gcsfs, adlfs,
# …) stays a user-installed extra surfaced via a clean install hint.
"fsspec>=2026.4.0",
]

[project.urls]
Expand Down Expand Up @@ -271,8 +275,8 @@ paths = ["aai_cli", "tests"]
exclude = ["aai_cli/_version.py"]
min_confidence = 90
ignore_decorators = ["@app.command", "@app.callback"]
ignore_names = ["app", "capture_output", "download", "healthy", "ist", "memory_keyring", "org",
"preserve_logging_state", "refresh"]
ignore_names = ["app", "capture_output", "download", "healthy", "ist", "lpath", "memory_keyring",
"org", "preserve_logging_state", "refresh", "rpath"]

[tool.deptry]
exclude = ["docs", "dist", ".venv", "aai_cli/init/templates"]
Expand Down
Loading
Loading