diff --git a/docs/TRANSCRIPTION-PROVIDER-MATRIX-2026-05-18.md b/docs/TRANSCRIPTION-PROVIDER-MATRIX-2026-05-18.md index d05ea51..0983022 100644 --- a/docs/TRANSCRIPTION-PROVIDER-MATRIX-2026-05-18.md +++ b/docs/TRANSCRIPTION-PROVIDER-MATRIX-2026-05-18.md @@ -46,6 +46,39 @@ adding more providers. the file-based OpenAI diarization path, because both have first-class streaming APIs and richer vocabulary/diarization controls. +## Implemented Router Behavior + +`whisperforge_core.audio.build_transcription_plan()` now exposes the provider +router contract as fixture-friendly structured data. This is a planning layer, +not a runtime behavior change: `transcribe_audio()` still uses the existing +default OpenAI path, size chunker, and sequential chunk transcription unless +the caller explicitly selects another backend or chunker. + +The implemented plan fields connect this matrix to code: + +| Plan field | Implemented behavior | +| --- | --- | +| `capabilities` | Reports backend limits and feature flags for `openai`, `mlx`, `whisper_cpp`, and `whisperx`. | +| `media` | Summarizes ffprobe-style media fixtures, or stays unprobed when no fixture/inspection is requested. | +| `normalization` | Emits a planned-only FFmpeg command for video extraction or large probed audio that needs mono 16 kHz PCM normalization. | +| `output_contract` | Marks text-only backends versus WhisperX segment timestamps and diarization capability. | +| `privacy` | States whether audio leaves the device, which cloud provider receives it, and which local temp artifacts are expected. | +| `cost` | States whether provider API billing applies, estimated billable minutes when duration is known, and whether local/FFmpeg compute is expected. | + +Current router fixture coverage pins the main lanes: + +- `openai`: cloud, billable audio-minute receipt, chunked for files over + `CHUNK_THRESHOLD_BYTES`, and no default FFmpeg probe. +- `mlx`: local/private receipt, no provider API billing, and no normalization + for ordinary small audio. +- `whisperx`: local timestamp-capable plan, whole-file default for large inputs + unless `CHUNKER=vad`, and explicit diarization-capable output metadata. +- Video sources and large probed audio: planned FFmpeg extraction/resampling + before transcription, without requiring FFmpeg in the default unit suite. + +Do not enable this normalization path as a runtime default until a product +decision accepts the `privacy` and `cost` receipts for the selected backend. + ## Smallest Next Integration Add an OpenAI diarized transcription mode without changing the default. diff --git a/tests/test_audio.py b/tests/test_audio.py index 1c5325b..2ae01fd 100644 --- a/tests/test_audio.py +++ b/tests/test_audio.py @@ -6,6 +6,7 @@ client boundary — no network traffic. """ +import json from pathlib import Path from unittest.mock import MagicMock @@ -42,6 +43,35 @@ def mock_openai(monkeypatch): return client +def media_probe_fixture( + *, + duration="120.0", + audio_codec="aac", + sample_rate="48000", + channels=2, + video=False, + container="wav", +): + streams = [] + if video: + streams.append({ + "codec_type": "video", + "codec_name": "h264", + "duration": duration, + }) + streams.append({ + "codec_type": "audio", + "codec_name": audio_codec, + "sample_rate": sample_rate, + "channels": channels, + "duration": duration, + }) + return { + "streams": streams, + "format": {"duration": duration, "format_name": container}, + } + + class TestChunkAudio: def test_small_file_yields_single_chunk(self, silent_wav): chunks, tmp_dir = audio.chunk_audio(silent_wav, target_size_mb=25) @@ -96,6 +126,72 @@ def boom(*a, **k): class TestTranscriptionRouterPlan: + def test_probe_media_uses_ffprobe_json(self, silent_wav, monkeypatch): + probe = media_probe_fixture( + duration="42.5", + audio_codec="pcm_s16le", + sample_rate="16000", + channels=1, + ) + result = MagicMock() + result.stdout = json.dumps(probe) + calls = {} + + def fake_run(argv, check, capture_output, text): + calls["argv"] = argv + calls["check"] = check + calls["capture_output"] = capture_output + calls["text"] = text + return result + + monkeypatch.setattr(audio.subprocess, "run", fake_run) + + assert audio.probe_media(silent_wav) == probe + assert calls["argv"][0] == "ffprobe" + assert str(silent_wav) in calls["argv"] + assert calls["check"] is True + assert calls["capture_output"] is True + assert calls["text"] is True + + def test_plan_does_not_probe_media_by_default(self, tmp_path, monkeypatch): + path = tmp_path / "small.wav" + path.write_bytes(b"audio") + + def fail_probe(_path): + raise AssertionError("ffprobe should not run by default") + + monkeypatch.setattr(audio, "probe_media", fail_probe) + + plan = audio.build_transcription_plan(path, backend="openai") + + assert plan["strategy"] == "single_pass" + assert plan["media"]["probe_available"] is False + assert plan["normalization"]["commands"] == [] + + def test_plan_inspects_media_when_requested(self, tmp_path, monkeypatch): + path = tmp_path / "clip.mp4" + path.write_bytes(b"video") + probe = media_probe_fixture(video=True, container="mov,mp4,m4a,3gp,3g2,mj2") + calls = [] + + def fake_probe(source_path): + calls.append(source_path) + return probe + + monkeypatch.setattr(audio, "probe_media", fake_probe) + + plan = audio.build_transcription_plan( + path, + backend="openai", + inspect_media=True, + ) + + assert calls == [path] + assert plan["media"]["probe_available"] is True + assert plan["media"]["has_video"] is True + assert plan["normalization"]["required"] is True + assert "ffprobe" in plan["privacy"]["local_processing_steps"] + def test_transcription_capabilities_reports_whisperx_supports_segments(self): caps = audio.transcription_capabilities("whisperx") assert caps["backend"] == "whisperx" @@ -103,6 +199,49 @@ def test_transcription_capabilities_reports_whisperx_supports_segments(self): assert caps["supports_diarization"] is True assert caps["privacy_mode"] == "local" + def test_plan_cloud_backend_receipt_shows_upload_and_billable_minutes(self, tmp_path): + path = tmp_path / "meeting.wav" + path.write_bytes(b"audio") + probe = media_probe_fixture( + duration="90.0", + audio_codec="pcm_s16le", + sample_rate="16000", + channels=1, + ) + + plan = audio.build_transcription_plan( + path, + backend="openai", + media_probe=probe, + ) + + assert plan["privacy"]["mode"] == "cloud" + assert plan["privacy"]["audio_leaves_device"] is True + assert plan["privacy"]["cloud_provider"] == "openai" + assert plan["cost"]["provider_api_billable"] is True + assert plan["cost"]["estimated_billable_minutes"] == 1.5 + assert plan["cost"]["pricing_review_required"] is True + + def test_plan_local_private_backend_receipt_stays_offline(self, tmp_path): + path = tmp_path / "private.m4a" + path.write_bytes(b"audio") + probe = media_probe_fixture( + duration="60.0", + audio_codec="aac", + sample_rate="44100", + channels=2, + container="mov,mp4,m4a,3gp,3g2,mj2", + ) + + plan = audio.build_transcription_plan(path, backend="mlx", media_probe=probe) + + assert plan["strategy"] == "single_pass" + assert plan["privacy"]["mode"] == "local" + assert plan["privacy"]["audio_leaves_device"] is False + assert plan["privacy"]["cloud_provider"] is None + assert plan["cost"]["provider_api_billable"] is False + assert plan["normalization"]["required"] is False + def test_plan_large_openai_uses_size_chunking(self, tmp_path): path = tmp_path / "large.wav" path.write_bytes(b"0" * (audio.CHUNK_THRESHOLD_BYTES + 1024)) @@ -113,6 +252,40 @@ def test_plan_large_openai_uses_size_chunking(self, tmp_path): assert "exceeds_chunk_threshold" in plan["reasons"] assert plan["requires_ffmpeg"] is False + def test_plan_large_audio_fixture_adds_ffmpeg_normalization(self, tmp_path): + path = tmp_path / "large.wav" + path.write_bytes(b"0" * (audio.CHUNK_THRESHOLD_BYTES + 1024)) + normalized = tmp_path / "normalized.wav" + probe = media_probe_fixture( + duration="125.0", + audio_codec="aac", + sample_rate="48000", + channels=2, + ) + + plan = audio.build_transcription_plan( + path, + backend="openai", + chunker="size", + media_probe=probe, + normalized_audio_path=normalized, + ) + + assert plan["strategy"] == "chunked_size" + assert plan["requires_ffmpeg"] is True + assert plan["media"]["duration_seconds"] == 125.0 + assert plan["normalization"]["required"] is True + assert plan["normalization"]["target"]["sample_rate_hz"] == 16000 + assert plan["normalization"]["target"]["channels"] == 1 + assert plan["normalization"]["output_path"] == str(normalized) + assert plan["normalization"]["commands"][0]["argv"][0] == "ffmpeg" + assert str(normalized) in plan["normalization"]["commands"][0]["argv"] + assert "ffprobe" in plan["privacy"]["local_processing_steps"] + assert "ffmpeg_normalization" in plan["privacy"]["local_processing_steps"] + assert "chunking" in plan["privacy"]["local_processing_steps"] + assert plan["privacy"]["temp_artifacts"] == ["normalized_audio", "chunks"] + assert plan["cost"]["ffmpeg_compute_required"] is True + def test_plan_large_whisperx_prefers_whole_file_without_vad(self, tmp_path): path = tmp_path / "large.wav" path.write_bytes(b"0" * (audio.CHUNK_THRESHOLD_BYTES + 1024)) @@ -121,6 +294,7 @@ def test_plan_large_whisperx_prefers_whole_file_without_vad(self, tmp_path): assert plan["strategy"] == "whole_file" assert plan["capabilities"]["supports_segments"] is True + assert plan["output_contract"]["timestamps"] == "segments" def test_plan_large_whisperx_uses_vad_when_requested(self, tmp_path): path = tmp_path / "large.wav" @@ -130,6 +304,33 @@ def test_plan_large_whisperx_uses_vad_when_requested(self, tmp_path): assert plan["strategy"] == "chunked_vad" + def test_plan_whisperx_fixture_is_timestamped_and_diarization_capable(self, tmp_path): + path = tmp_path / "interview.wav" + path.write_bytes(b"0" * (audio.CHUNK_THRESHOLD_BYTES + 1024)) + probe = media_probe_fixture( + duration="600.0", + audio_codec="pcm_s16le", + sample_rate="16000", + channels=1, + ) + + plan = audio.build_transcription_plan( + path, + backend="whisperx", + chunker="size", + media_probe=probe, + ) + + assert plan["strategy"] == "whole_file" + assert plan["output_contract"]["segments"] is True + assert plan["output_contract"]["timestamps"] == "segments" + assert plan["output_contract"]["diarization"]["capable"] is True + assert plan["output_contract"]["diarization"]["requires_hf_token"] is True + assert plan["privacy"]["mode"] == "local" + assert plan["privacy"]["audio_leaves_device"] is False + assert plan["cost"]["provider_api_billable"] is False + assert plan["normalization"]["required"] is False + def test_plan_video_source_flags_ffmpeg_requirement(self, tmp_path): path = tmp_path / "clip.mp4" path.write_bytes(b"video-bytes") @@ -138,3 +339,5 @@ def test_plan_video_source_flags_ffmpeg_requirement(self, tmp_path): assert plan["requires_ffmpeg"] is True assert "video_source_requires_extraction" in plan["reasons"] + assert plan["normalization"]["required"] is True + assert plan["normalization"]["commands"][0]["argv"][0] == "ffmpeg" diff --git a/whisperforge_core/audio.py b/whisperforge_core/audio.py index 77f7581..2d3348a 100644 --- a/whisperforge_core/audio.py +++ b/whisperforge_core/audio.py @@ -9,9 +9,11 @@ """ import hashlib +import json import math import os import shutil +import subprocess import tempfile from dataclasses import dataclass, field from pathlib import Path @@ -60,6 +62,10 @@ class TranscriptionDetails: MIN_CHUNK_LENGTH_MS = 5_000 MAX_CHUNKS = 20 VIDEO_SOURCE_EXTENSIONS = {".mp4", ".mov", ".mkv", ".webm", ".avi", ".m4v"} +NORMALIZED_AUDIO_SAMPLE_RATE_HZ = 16_000 +NORMALIZED_AUDIO_CHANNELS = 1 +NORMALIZED_AUDIO_CODEC = "pcm_s16le" +NORMALIZED_AUDIO_SUFFIX = ".wav" @dataclass(frozen=True) @@ -144,11 +150,238 @@ def transcription_capabilities(backend: Optional[str] = None) -> dict[str, Any]: } +def probe_media(source_path: str | Path) -> dict[str, Any]: + result = subprocess.run( + [ + "ffprobe", + "-v", "error", + "-print_format", "json", + "-show_format", + "-show_streams", + str(source_path), + ], + check=True, + capture_output=True, + text=True, + ) + return json.loads(result.stdout or "{}") + + +def _first_float(*values: Any) -> Optional[float]: + for value in values: + if value in (None, ""): + continue + try: + return float(value) + except (TypeError, ValueError): + continue + return None + + +def _first_int(*values: Any) -> Optional[int]: + for value in values: + if value in (None, ""): + continue + try: + return int(value) + except (TypeError, ValueError): + continue + return None + + +def _media_summary( + media_probe: Optional[dict[str, Any]], + *, + suffix: str, +) -> dict[str, Any]: + if not media_probe: + return { + "probe_available": False, + "source_suffix": suffix, + "has_audio": None, + "has_video": suffix in VIDEO_SOURCE_EXTENSIONS, + "duration_seconds": None, + "audio_codec": None, + "video_codec": None, + "sample_rate_hz": None, + "channels": None, + "container": None, + } + + streams = media_probe.get("streams") or [] + audio_stream = next( + (stream for stream in streams if stream.get("codec_type") == "audio"), + {}, + ) + video_stream = next( + (stream for stream in streams if stream.get("codec_type") == "video"), + {}, + ) + media_format = media_probe.get("format") or {} + + return { + "probe_available": True, + "source_suffix": suffix, + "has_audio": bool(audio_stream), + "has_video": bool(video_stream), + "duration_seconds": _first_float( + media_format.get("duration"), + audio_stream.get("duration"), + video_stream.get("duration"), + ), + "audio_codec": audio_stream.get("codec_name"), + "video_codec": video_stream.get("codec_name"), + "sample_rate_hz": _first_int(audio_stream.get("sample_rate")), + "channels": _first_int(audio_stream.get("channels")), + "container": media_format.get("format_name"), + } + + +def _normalization_reasons( + suffix: str, + media: dict[str, Any], + *, + large: bool, +) -> list[str]: + if not large and suffix not in VIDEO_SOURCE_EXTENSIONS and not media.get("has_video"): + return [] + + reasons: list[str] = [] + if suffix in VIDEO_SOURCE_EXTENSIONS or media.get("has_video"): + reasons.append("extract_audio_from_video") + if media.get("sample_rate_hz") not in (None, NORMALIZED_AUDIO_SAMPLE_RATE_HZ): + reasons.append("resample_audio") + if media.get("channels") not in (None, NORMALIZED_AUDIO_CHANNELS): + reasons.append("downmix_audio") + if media.get("audio_codec") not in (None, NORMALIZED_AUDIO_CODEC): + reasons.append("transcode_audio_codec") + return reasons + + +def _normalization_plan( + source_path: Path, + *, + required: bool, + reasons: list[str], + output_path: Optional[str | Path] = None, +) -> dict[str, Any]: + normalized_output = ( + str(output_path) if output_path else f"/normalized{NORMALIZED_AUDIO_SUFFIX}" + ) + command = [ + "ffmpeg", + "-hide_banner", + "-nostdin", + "-y", + "-i", str(source_path), + "-map", "0:a:0", + "-vn", + "-ac", str(NORMALIZED_AUDIO_CHANNELS), + "-ar", str(NORMALIZED_AUDIO_SAMPLE_RATE_HZ), + "-c:a", NORMALIZED_AUDIO_CODEC, + normalized_output, + ] + return { + "required": required, + "tool": "ffmpeg", + "execution": "planned_only", + "target": { + "suffix": NORMALIZED_AUDIO_SUFFIX, + "codec": NORMALIZED_AUDIO_CODEC, + "sample_rate_hz": NORMALIZED_AUDIO_SAMPLE_RATE_HZ, + "channels": NORMALIZED_AUDIO_CHANNELS, + }, + "reasons": reasons, + "output_path": normalized_output if required else None, + "commands": [{"argv": command, "purpose": "extract_normalized_audio"}] + if required else [], + } + + +def _output_contract( + backend: str, + caps: dict[str, Any], +) -> dict[str, Any]: + diarization_enabled = ( + backend == "whisperx" and WHISPERX_DIARIZATION and bool(WHISPERX_HF_TOKEN) + ) + return { + "text": True, + "segments": caps["supports_segments"], + "timestamps": "segments" if caps["supports_segments"] else "unavailable", + "diarization": { + "capable": caps["supports_diarization"], + "enabled": diarization_enabled, + "requires_hf_token": backend == "whisperx", + }, + } + + +def _privacy_receipt( + backend: str, + caps: dict[str, Any], + *, + media_inspected: bool, + normalization_required: bool, + chunked: bool, +) -> dict[str, Any]: + local_steps: list[str] = [] + if media_inspected: + local_steps.append("ffprobe") + if normalization_required: + local_steps.append("ffmpeg_normalization") + if chunked: + local_steps.append("chunking") + + temp_artifacts: list[str] = [] + if normalization_required: + temp_artifacts.append("normalized_audio") + if chunked: + temp_artifacts.append("chunks") + + return { + "mode": caps["privacy_mode"], + "audio_leaves_device": caps["privacy_mode"] == "cloud", + "cloud_provider": "openai" if backend == "openai" else None, + "local_processing_steps": local_steps, + "temp_artifacts": temp_artifacts, + "requires_receipt_before_default_change": True, + } + + +def _cost_receipt( + backend: str, + caps: dict[str, Any], + media: dict[str, Any], + *, + normalization_required: bool, +) -> dict[str, Any]: + duration_seconds = media.get("duration_seconds") + estimated_minutes = ( + round(duration_seconds / 60.0, 3) + if isinstance(duration_seconds, (int, float)) else None + ) + provider_billable = caps["privacy_mode"] == "cloud" + return { + "provider_api_billable": provider_billable, + "billable_provider": "openai" if backend == "openai" else None, + "billable_unit": "audio_minutes" if provider_billable else None, + "estimated_billable_minutes": estimated_minutes if provider_billable else None, + "local_compute_required": caps["privacy_mode"] == "local" + or normalization_required, + "ffmpeg_compute_required": normalization_required, + "pricing_review_required": provider_billable, + } + + def build_transcription_plan( source_path: str | Path, *, backend: Optional[str] = None, chunker: Optional[str] = None, + inspect_media: bool = False, + media_probe: Optional[dict[str, Any]] = None, + normalized_audio_path: Optional[str | Path] = None, ) -> dict[str, Any]: path = Path(source_path) selected_backend = resolve_transcription_backend(backend) @@ -163,6 +396,21 @@ def build_transcription_plan( if suffix in VIDEO_SOURCE_EXTENSIONS: reasons.append("video_source_requires_extraction") + media_inspected = media_probe is not None + if inspect_media and media_probe is None: + media_probe = probe_media(path) + media_inspected = True + media = _media_summary(media_probe, suffix=suffix) + normalization_reasons = _normalization_reasons(suffix, media, large=large) + normalization_required = bool(normalization_reasons) + reasons.extend(reason for reason in normalization_reasons if reason not in reasons) + normalization = _normalization_plan( + path, + required=normalization_required, + reasons=normalization_reasons, + output_path=normalized_audio_path, + ) + if not large: strategy = "single_pass" elif selected_backend == "whisperx" and selected_chunker != "vad": @@ -172,6 +420,7 @@ def build_transcription_plan( else: strategy = "chunked_size" + chunked = strategy in {"chunked_size", "chunked_vad"} return { "backend": selected_backend, "chunker": selected_chunker, @@ -179,8 +428,28 @@ def build_transcription_plan( "file_size_bytes": file_size, "chunk_threshold_bytes": CHUNK_THRESHOLD_BYTES, "source_suffix": suffix, - "requires_ffmpeg": bool(suffix in VIDEO_SOURCE_EXTENSIONS or caps["needs_ffmpeg"]), + "requires_ffmpeg": bool( + suffix in VIDEO_SOURCE_EXTENSIONS + or caps["needs_ffmpeg"] + or normalization_required + ), "capabilities": caps, + "media": media, + "normalization": normalization, + "output_contract": _output_contract(selected_backend, caps), + "privacy": _privacy_receipt( + selected_backend, + caps, + media_inspected=media_inspected, + normalization_required=normalization_required, + chunked=chunked, + ), + "cost": _cost_receipt( + selected_backend, + caps, + media, + normalization_required=normalization_required, + ), "reasons": reasons, }