diff --git a/frontend/src/components/UrlInput.tsx b/frontend/src/components/UrlInput.tsx index e3060d7..6ac15f6 100644 --- a/frontend/src/components/UrlInput.tsx +++ b/frontend/src/components/UrlInput.tsx @@ -72,14 +72,14 @@ export function UrlInput({ onSubmit, disabled }: UrlInputProps) { return; } - const youtubePattern = /^https?:\/\/(www\.)?(youtube\.com\/watch\?v=|youtu\.be\/)/; - if (!youtubePattern.test(url)) { + const urlPattern = /^https?:\/\/.+/; + if (!urlPattern.test(url)) { setError(t('error.invalid_url')); return; } const request: JobCreateRequest = { - youtube_url: url, + source_url: url, }; if (startSeconds !== undefined) request.start_time = startSeconds; if (endSeconds !== undefined) request.end_time = endSeconds; diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index 883a810..b882099 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -1,7 +1,7 @@ { "app": { "title": "BilingualSub", - "subtitle": "YouTube Bilingual Subtitle Generator", + "subtitle": "Bilingual Subtitle Generator for YouTube, X, TikTok & More", "processing_title": "Processing Video", "processing_desc": "Please wait while we transcribe and translate.", "error_title": "Something went wrong", @@ -17,8 +17,7 @@ "download_audio": "Download Audio" }, "form": { - "url_placeholder": "Paste YouTube video URL", - "paste_placeholder": "Paste YouTube URL...", + "paste_placeholder": "Paste video URL (YouTube / X / Twitter / TikTok…)", "source_lang": "Source Language", "target_lang": "Target Language", "label_translate": "Translate", @@ -33,7 +32,7 @@ "enable_range": "Set Range", "disable_range": "Disable Range", "clear_range": "Clear", - "inputModeUrl": "YouTube URL", + "inputModeUrl": "Video URL", "inputModeFile": "Upload File", "filePlaceholder": "Choose a video or audio file", "fileSelected": "Selected: {{filename}}" @@ -66,7 +65,7 @@ "unsupported": "Your browser does not support video playback" }, "error": { - "invalid_url": "Please enter a valid YouTube URL", + "invalid_url": "Please enter a valid video URL", "job_not_found": "Job not found", "download_failed": "Video download failed", "transcription_failed": "Transcription failed", diff --git a/frontend/src/i18n/zh-TW.json b/frontend/src/i18n/zh-TW.json index 217fef7..e3a1123 100644 --- a/frontend/src/i18n/zh-TW.json +++ b/frontend/src/i18n/zh-TW.json @@ -1,7 +1,7 @@ { "app": { "title": "BilingualSub", - "subtitle": "YouTube 雙語字幕自動生成工具", + "subtitle": "多平台雙語字幕自動生成工具", "processing_title": "影片處理中", "processing_desc": "請稍候,我們正在進行語音辨識與翻譯。", "error_title": "發生錯誤", @@ -17,8 +17,7 @@ "download_audio": "下載音訊" }, "form": { - "url_placeholder": "貼上 YouTube 影片網址", - "paste_placeholder": "貼上 YouTube 網址...", + "paste_placeholder": "貼上影片網址(YouTube / X / Twitter / TikTok…)", "source_lang": "原始語言", "target_lang": "翻譯語言", "label_translate": "翻譯語言", @@ -33,7 +32,7 @@ "enable_range": "設定範圍", "disable_range": "取消範圍", "clear_range": "清除", - "inputModeUrl": "YouTube 網址", + "inputModeUrl": "影片網址", "inputModeFile": "上傳檔案", "filePlaceholder": "選擇影片或音訊檔案", "fileSelected": "已選擇:{{filename}}" @@ -66,7 +65,7 @@ "unsupported": "您的瀏覽器不支援影片播放" }, "error": { - "invalid_url": "請輸入有效的 YouTube 網址", + "invalid_url": "請輸入有效的影片網址", "job_not_found": "找不到此任務", "download_failed": "影片下載失敗", "transcription_failed": "語音辨識失敗", diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 6cd9d68..c8f1c1b 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -1,7 +1,7 @@ import type { FileType, JobStatus } from './constants'; export interface JobCreateRequest { - youtube_url: string; + source_url: string; source_lang?: string; target_lang?: string; start_time?: number; // seconds diff --git a/src/bilingualsub/api/jobs.py b/src/bilingualsub/api/jobs.py index 2930d49..041e290 100644 --- a/src/bilingualsub/api/jobs.py +++ b/src/bilingualsub/api/jobs.py @@ -29,7 +29,7 @@ class Job: """Represents a subtitle generation job.""" id: str - youtube_url: str = "" + source_url: str = "" source_lang: str = "" target_lang: str = "" local_video_path: Path | None = None @@ -59,7 +59,7 @@ def __init__(self) -> None: def create_job( self, - youtube_url: str = "", + source_url: str = "", source_lang: str = "", target_lang: str = "", start_time: float | None = None, @@ -70,7 +70,7 @@ def create_job( job_id = uuid.uuid4().hex[:12] job = Job( id=job_id, - youtube_url=youtube_url, + source_url=source_url, source_lang=source_lang, target_lang=target_lang, local_video_path=local_video_path, @@ -78,7 +78,7 @@ def create_job( end_time=end_time, ) self._jobs[job_id] = job - logger.info("job_created", job_id=job_id, youtube_url=youtube_url) + logger.info("job_created", job_id=job_id, source_url=source_url) return job def get_job(self, job_id: str) -> Job | None: diff --git a/src/bilingualsub/api/pipeline.py b/src/bilingualsub/api/pipeline.py index 1b6b8b8..138afea 100644 --- a/src/bilingualsub/api/pipeline.py +++ b/src/bilingualsub/api/pipeline.py @@ -23,7 +23,7 @@ TranscriptionError, TranslationError, VideoMetadata, - download_youtube_video, + download_video, merge_subtitles, transcribe_audio, translate_subtitle, @@ -236,8 +236,8 @@ def _put_event() -> None: loop.call_soon_threadsafe(_put_event) metadata = await asyncio.to_thread( - download_youtube_video, - job.youtube_url, + download_video, + job.source_url, video_path, on_progress=_on_download_progress, start_time=job.start_time, @@ -385,112 +385,6 @@ async def run_subtitle(job: Job) -> None: ) -async def run_pipeline(job: Job) -> None: - """Execute the full subtitle generation pipeline for a job. - - Steps: download -> transcribe -> translate -> merge/serialize -> burn. - All blocking core calls are wrapped in asyncio.to_thread(). - Progress events are sent to job.event_queue at each step. - """ - log = logger.bind(job_id=job.id) - work_dir = Path(tempfile.mkdtemp(prefix=f"bilingualsub_{job.id}_")) - - try: - # --- Step 1: Download or use local file --- - video_path, metadata = await _acquire_video(job, work_dir, log) - # Trimming is now handled during download via start_time/end_time - - # --- Step 1.5: Extract audio --- - audio_path = await _extract_audio_step(job, video_path, work_dir, log) - - # --- Step 2: Transcribe --- - _send_progress( - job, JobStatus.TRANSCRIBING, 20.0, "transcribe", "Transcribing audio" - ) - t0 = time.monotonic() - original_sub = await asyncio.to_thread( - transcribe_audio, audio_path, language=job.source_lang - ) - log.info( - "step_done", - step="transcribe", - duration_ms=int((time.monotonic() - t0) * 1000), - ) - - # --- Step 3: Translate --- - _send_progress( - job, JobStatus.TRANSLATING, 50.0, "translate", "Translating subtitles" - ) - t0 = time.monotonic() - - _on_translate_progress = _make_translate_progress_cb(job) - _on_rate_limit = _make_rate_limit_cb(job) - - translated_sub = await asyncio.to_thread( - translate_subtitle, - original_sub, - source_lang=job.source_lang, - target_lang=job.target_lang, - video_title=metadata.title, - video_description=metadata.description, - on_progress=_on_translate_progress, - on_rate_limit=_on_rate_limit, - ) - log.info( - "step_done", - step="translate", - duration_ms=int((time.monotonic() - t0) * 1000), - ) - - # --- Step 4: Merge & Serialize --- - _send_progress( - job, JobStatus.MERGING, 70.0, "merge", "Merging bilingual subtitles" - ) - t0 = time.monotonic() - - merged_entries = await asyncio.to_thread( - merge_subtitles, original_sub.entries, translated_sub.entries - ) - merged_sub = Subtitle(entries=merged_entries) - - srt_content = serialize_srt(merged_sub) - srt_path = work_dir / "subtitle.srt" - srt_path.write_text(srt_content, encoding="utf-8") - job.output_files[FileType.SRT] = srt_path - - ass_content = serialize_bilingual_ass( - original_sub, - translated_sub, - video_width=metadata.width, - video_height=metadata.height, - ) - ass_path = work_dir / "subtitle.ass" - ass_path.write_text(ass_content, encoding="utf-8") - job.output_files[FileType.ASS] = ass_path - - log.info( - "step_done", step="merge", duration_ms=int((time.monotonic() - t0) * 1000) - ) - - # --- Step 5: Save source video & complete --- - job.output_files[FileType.SOURCE_VIDEO] = video_path - _send_complete(job) - log.info("pipeline_complete", job_id=job.id) - - except PipelineError: - raise - except Exception as exc: - pipeline_err = _to_pipeline_error(exc) - _send_error( - job, pipeline_err.code, pipeline_err.message, pipeline_err.detail or "" - ) - log.error( - "pipeline_failed", - error_code=pipeline_err.code, - error=str(exc), - ) - - async def run_burn(job: Job, srt_content: str) -> None: """Burn user-edited SRT into the source video.""" log = logger.bind(job_id=job.id) diff --git a/src/bilingualsub/api/routes.py b/src/bilingualsub/api/routes.py index 6f0eb57..348b5bb 100644 --- a/src/bilingualsub/api/routes.py +++ b/src/bilingualsub/api/routes.py @@ -4,9 +4,10 @@ import asyncio import json +import re import tempfile from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, NamedTuple import structlog from fastapi import APIRouter, Form, Request, UploadFile @@ -53,6 +54,52 @@ ".webm", } +_DEFAULT_FILENAME = "video" +_SUFFIX_ORIGINAL = "(original)" +_LANG_SEPARATOR = "_to_" + + +class _FileMeta(NamedTuple): + ext: str + media_type: str + + +_FILE_META: dict[FileType, _FileMeta] = { + FileType.SRT: _FileMeta("srt", "text/plain"), + FileType.ASS: _FileMeta("ass", "text/plain"), + FileType.VIDEO: _FileMeta("mp4", "video/mp4"), + FileType.AUDIO: _FileMeta("mp3", "audio/mpeg"), + FileType.SOURCE_VIDEO: _FileMeta("mp4", "video/mp4"), +} + +# Windows-reserved + POSIX control characters (NTFS/FAT32/ext4 safe) +_FILENAME_BAD_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]') + +_MAX_UPLOAD_BYTES = 500 * 1024 * 1024 # 500 MB + + +def _sanitize_filename(name: str) -> str: + """Remove filesystem-unsafe characters and truncate to 120 chars.""" + cleaned = _FILENAME_BAD_CHARS.sub("", name).strip(" .") + truncated = (cleaned or _DEFAULT_FILENAME)[:120] + return truncated.rstrip(" .") or _DEFAULT_FILENAME + + +def _build_download_filename(job: Job, file_type: FileType) -> str: + """Build a human-readable download filename for the given job and file type.""" + base = _sanitize_filename(job.video_title or _DEFAULT_FILENAME) + original_only = file_type in (FileType.SOURCE_VIDEO, FileType.AUDIO) + same_lang = ( + not job.source_lang or not job.target_lang or job.source_lang == job.target_lang + ) + suffix = ( + _SUFFIX_ORIGINAL + if original_only or same_lang + else f"({job.source_lang}{_LANG_SEPARATOR}{job.target_lang})" + ) + ext = _FILE_META[file_type].ext + return f"{base} {suffix}.{ext}" + def _get_job_manager(request: Request) -> JobManager: """Get the JobManager from app state.""" @@ -83,7 +130,7 @@ async def create_job(body: JobCreateRequest, request: Request) -> JobCreateRespo """Create a new subtitle generation job.""" manager = _get_job_manager(request) job = manager.create_job( - youtube_url=str(body.youtube_url), + source_url=str(body.source_url), source_lang=body.source_lang, target_lang=body.target_lang, start_time=body.start_time, @@ -104,7 +151,6 @@ async def create_job_from_upload( request: Request, ) -> JobCreateResponse: """Create a subtitle generation job from an uploaded file.""" - # Validate file extension filename = file.filename or "" suffix = Path(filename).suffix.lower() if suffix not in _ALLOWED_UPLOAD_EXTENSIONS: @@ -113,11 +159,9 @@ async def create_job_from_upload( detail=f"Allowed: {', '.join(sorted(_ALLOWED_UPLOAD_EXTENSIONS))}", ) - # Sanitize filename to prevent path traversal safe_name = Path(filename).name or f"upload{suffix}" - # Save uploaded file to temp directory with size limit - max_size = 500 * 1024 * 1024 # 500 MB + max_size = _MAX_UPLOAD_BYTES tmp_dir = Path(tempfile.mkdtemp(prefix="bilingualsub_upload_")) saved_path = tmp_dir / safe_name bytes_written = 0 @@ -212,25 +256,10 @@ async def download_file(job_id: str, file_type: str, request: Request) -> FileRe detail="Job may not have completed this step", ) - # Set appropriate media type and filename - media_types = { - FileType.SRT: "text/plain", - FileType.ASS: "text/plain", - FileType.VIDEO: "video/mp4", - FileType.AUDIO: "audio/mpeg", - FileType.SOURCE_VIDEO: "video/mp4", - } - extensions = { - FileType.SRT: "srt", - FileType.ASS: "ass", - FileType.VIDEO: "mp4", - FileType.AUDIO: "mp3", - FileType.SOURCE_VIDEO: "mp4", - } return FileResponse( path=path, - media_type=media_types[ft], - filename=f"bilingualsub.{extensions[ft]}", + media_type=_FILE_META[ft].media_type, + filename=_build_download_filename(job, ft), ) diff --git a/src/bilingualsub/api/schemas.py b/src/bilingualsub/api/schemas.py index 41604dc..40d7059 100644 --- a/src/bilingualsub/api/schemas.py +++ b/src/bilingualsub/api/schemas.py @@ -2,7 +2,7 @@ from typing import Self -from pydantic import BaseModel, HttpUrl, model_validator +from pydantic import BaseModel, ConfigDict, HttpUrl, model_validator from bilingualsub.api.constants import FileType, JobStatus @@ -10,7 +10,9 @@ class JobCreateRequest(BaseModel): """Request body for creating a new subtitle generation job.""" - youtube_url: HttpUrl + model_config = ConfigDict(extra="forbid") + + source_url: HttpUrl source_lang: str = "en" target_lang: str = "zh-TW" start_time: float | None = None diff --git a/src/bilingualsub/core/__init__.py b/src/bilingualsub/core/__init__.py index 06aa1eb..8b1843f 100644 --- a/src/bilingualsub/core/__init__.py +++ b/src/bilingualsub/core/__init__.py @@ -3,7 +3,7 @@ from bilingualsub.core.downloader import ( DownloadError, VideoMetadata, - download_youtube_video, + download_video, ) from bilingualsub.core.merger import merge_subtitles from bilingualsub.core.subtitle import Subtitle, SubtitleEntry @@ -23,7 +23,7 @@ "TranscriptionError", "TranslationError", "VideoMetadata", - "download_youtube_video", + "download_video", "merge_subtitles", "retranslate_entries", "transcribe_audio", diff --git a/src/bilingualsub/core/downloader.py b/src/bilingualsub/core/downloader.py index 08728cf..28ea255 100644 --- a/src/bilingualsub/core/downloader.py +++ b/src/bilingualsub/core/downloader.py @@ -1,4 +1,4 @@ -"""YouTube video downloader with metadata extraction.""" +"""Video downloader (via yt-dlp) with metadata extraction.""" import json import os @@ -6,10 +6,12 @@ import subprocess # nosec B404 from collections.abc import Callable from dataclasses import dataclass +from functools import lru_cache from pathlib import Path from typing import Any import yt_dlp +from yt_dlp.extractor import gen_extractor_classes from yt_dlp.utils import download_range_func @@ -43,7 +45,7 @@ def __post_init__(self) -> None: self.description = _sanitize_description(self.description) -def download_youtube_video( +def download_video( url: str, output_path: Path, on_progress: Callable[[float, float], None] | None = None, @@ -51,10 +53,10 @@ def download_youtube_video( end_time: float | None = None, ) -> VideoMetadata: """ - Download YouTube video and extract metadata. + Download video and extract metadata. Args: - url: YouTube video URL + url: Video URL supported by yt-dlp output_path: Path where video will be saved (including extension) on_progress: Optional callback for download progress (downloaded_bytes, total_bytes) @@ -71,8 +73,8 @@ def download_youtube_video( if not url.strip(): raise ValueError("URL cannot be empty") - if not _is_youtube_url(url): - raise ValueError(f"Not a valid YouTube URL: {url}") + if not _is_supported_url(url): + raise ValueError(f"URL not supported by downloader (yt-dlp): {url}") if not output_path.parent.exists(): raise ValueError(f"Output directory does not exist: {output_path.parent}") @@ -80,7 +82,6 @@ def download_youtube_video( if output_path.exists(): raise ValueError(f"Output file already exists: {output_path}") - # Download video and get info_dict try: info_dict = _download_video( url, @@ -92,7 +93,6 @@ def download_youtube_video( except Exception as e: raise DownloadError(f"Failed to download video: {e}") from e - # Extract metadata - try FFprobe first, fallback to info_dict try: metadata = _extract_metadata_with_ffprobe(output_path) except (FileNotFoundError, OSError, subprocess.CalledProcessError): @@ -101,7 +101,7 @@ def download_youtube_video( metadata = _extract_metadata_from_info_dict( info_dict, output_path, start_time, end_time ) - except Exception as e: + except (DownloadError, ValueError, KeyError, TypeError) as e: # Clean up downloaded file on metadata extraction failure if output_path.exists(): output_path.unlink() @@ -113,8 +113,11 @@ def download_youtube_video( output_path.unlink() raise DownloadError(f"Failed to extract metadata: {e}") from e - # yt-dlp's description is not reliably available in ffprobe output. - # Always prefer the description extracted from info_dict. + # yt-dlp's title and description are not reliably present in MP4 container + # tags (ffprobe reads those). Prefer the upstream metadata from info_dict. + info_title = info_dict.get("title") + if isinstance(info_title, str) and info_title.strip(): + metadata.title = info_title.strip() metadata.description = _sanitize_description(info_dict.get("description", "")) return metadata @@ -127,15 +130,15 @@ def _sanitize_description(raw: Any) -> str: return raw.strip() -def _is_youtube_url(url: str) -> bool: - """Check if URL is a valid YouTube URL.""" - youtube_domains = [ - "youtube.com", - "www.youtube.com", - "m.youtube.com", - "youtu.be", - ] - return any(domain in url for domain in youtube_domains) +_SUPPORTED_EXTRACTOR_CLASSES: list[type] = [ + cls for cls in gen_extractor_classes() if cls.IE_NAME != "generic" +] + + +@lru_cache(maxsize=256) +def _is_supported_url(url: str) -> bool: + """Check if yt-dlp has a dedicated extractor for this URL.""" + return any(cls.suitable(url) for cls in _SUPPORTED_EXTRACTOR_CLASSES) # type: ignore[attr-defined] def _download_video( diff --git a/tests/conftest.py b/tests/conftest.py index 7e2657a..b266004 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,6 +36,6 @@ def sample_srt_content() -> str: @pytest.fixture -def sample_youtube_url() -> str: - """Return a sample YouTube URL for testing.""" +def sample_source_url() -> str: + """Return a sample video URL for testing.""" return "https://www.youtube.com/watch?v=dQw4w9WgXcQ" diff --git a/tests/e2e/test_filename_e2e.py b/tests/e2e/test_filename_e2e.py new file mode 100644 index 0000000..d53998a --- /dev/null +++ b/tests/e2e/test_filename_e2e.py @@ -0,0 +1,225 @@ +"""E2E test: real yt-dlp download validates info_dict title flows into Content-Disposition. + +This test hits the real YouTube network but mocks Groq (no API key needed). +Goal: verify the bug fix where video_title was previously sourced from ffprobe +MP4 container tags (which are empty) rather than yt-dlp info_dict.title. + +Gate: set ENABLE_LIVE_DOWNLOAD=1 to run. Skipped by default in CI. +""" + +from __future__ import annotations + +import os +import time +import urllib.parse +from typing import TYPE_CHECKING, Any + +import pytest +from fastapi.testclient import TestClient + +from bilingualsub.api.app import create_app + +if TYPE_CHECKING: + from collections.abc import Generator + +# --------------------------------------------------------------------------- +# Environment gate +# --------------------------------------------------------------------------- + +ENABLE_LIVE_DOWNLOAD = os.getenv("ENABLE_LIVE_DOWNLOAD") == "1" + +pytestmark = pytest.mark.skipif( + not ENABLE_LIVE_DOWNLOAD, + reason="ENABLE_LIVE_DOWNLOAD not set; skipping live yt-dlp download E2E", +) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# YouTube's first public video — 19 seconds, stable, public. +_TEST_URL = "https://www.youtube.com/watch?v=jNQXAC9IVRw" +_EXPECTED_TITLE = "Me at the zoo" + +# How long (seconds) to wait for the download phase to complete. +_DOWNLOAD_TIMEOUT_SECONDS = 90 +_POLL_INTERVAL_SECONDS = 1.0 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _filename_from_disposition(header: str) -> str: + """Extract filename from Content-Disposition header. + + Handles RFC 5987 ``filename*=utf-8''`` and + the plain ``filename="..."`` fallback. + """ + marker_rfc5987 = "filename*=utf-8''" + if marker_rfc5987 in header: + return urllib.parse.unquote(header.split(marker_rfc5987, 1)[1]) + marker_plain = 'filename="' + if marker_plain in header: + return header.split(marker_plain, 1)[1].rstrip('"') + raise AssertionError(f"Unrecognized Content-Disposition header: {header}") + + +def _poll_until_download_complete( + client: TestClient, + job_id: str, + timeout: float = _DOWNLOAD_TIMEOUT_SECONDS, +) -> dict[str, Any]: + """Poll GET /api/jobs/{job_id} until status is download_complete or failed. + + Returns the final status response dict. + Raises pytest.fail if timeout is exceeded or network error is detected. + """ + deadline = time.monotonic() + timeout + last_status: dict[str, Any] = {} + + while time.monotonic() < deadline: + resp = client.get(f"/api/jobs/{job_id}") + if resp.status_code != 200: + pytest.fail( + f"GET /api/jobs/{job_id} returned {resp.status_code}: {resp.text}" + ) + + last_status = resp.json() + status = last_status.get("status") + + if status in ("download_complete", "failed"): + return last_status + + time.sleep(_POLL_INTERVAL_SECONDS) + + pytest.fail( + f"Timed out after {timeout}s waiting for download_complete. " + f"Last status: {last_status.get('status')!r}. " + "Check network connectivity or increase _DOWNLOAD_TIMEOUT_SECONDS." + ) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def live_client() -> Generator[TestClient, None, None]: + """Create a TestClient for live download tests. + + Must enter the context manager so Starlette starts the lifespan and + populates ``app.state.job_manager``; without this every request fails + with ``AttributeError`` on ``app.state``. + """ + + app = create_app() + with TestClient(app, raise_server_exceptions=False) as client: + yield client + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.e2e +def test_source_video_content_disposition_uses_info_dict_title( + live_client: TestClient, +) -> None: + """Real yt-dlp download: source_video Content-Disposition uses info_dict title. + + Arrange: submit "Me at the zoo" URL (19-second public YouTube video). + Act: + 1. POST /api/jobs with source_url. + 2. Poll until status == download_complete (stops before Whisper/Groq). + 3. GET /api/jobs/{id}/download/source_video. + Assert: + - Download phase completes without error. + - Content-Disposition filename == "Me at the zoo (original).mp4". + + This test covers the regression where video_title was sourced from the + ffprobe MP4 container tag (empty string) instead of yt-dlp info_dict.title, + which caused filenames to fall back to "video (original).mp4". + """ + # 1. Create job. + create_resp = live_client.post("/api/jobs", json={"source_url": _TEST_URL}) + assert create_resp.status_code == 200, create_resp.text + job_id = create_resp.json()["job_id"] + + # 2. Poll until download_complete. + status_data = _poll_until_download_complete(live_client, job_id) + if status_data["status"] == "failed": + error = status_data.get("error", {}) + pytest.fail( + f"Live download failed; cannot verify info_dict.title regression. " + f"{error.get('code')} — {error.get('detail')}. " + "If the CI runner cannot reach YouTube, disable this test via the " + "ENABLE_LIVE_DOWNLOAD gate instead of silently skipping." + ) + + assert status_data["status"] == "download_complete" + + # 3. Download source_video and check Content-Disposition. + dl_resp = live_client.get(f"/api/jobs/{job_id}/download/source_video") + assert dl_resp.status_code == 200, dl_resp.text + + disposition = dl_resp.headers.get("content-disposition", "") + assert disposition, "Expected Content-Disposition header to be present" + + actual_filename = _filename_from_disposition(disposition) + assert actual_filename == f"{_EXPECTED_TITLE} (original).mp4", ( + f"Expected '{_EXPECTED_TITLE} (original).mp4', got {actual_filename!r}. " + "This likely means video_title is sourced from an empty ffprobe container " + "tag instead of yt-dlp info_dict.title." + ) + + +@pytest.mark.e2e +def test_audio_content_disposition_uses_info_dict_title( + live_client: TestClient, +) -> None: + """Real yt-dlp download: audio Content-Disposition uses info_dict title. + + Reuses the same video; creates a separate job to keep tests independent. + + Arrange: submit "Me at the zoo" URL. + Act: + 1. POST /api/jobs. + 2. Poll until download_complete. + 3. GET /api/jobs/{id}/download/audio. + Assert: filename == "Me at the zoo (original).mp3". + """ + # 1. Create job. + create_resp = live_client.post("/api/jobs", json={"source_url": _TEST_URL}) + assert create_resp.status_code == 200, create_resp.text + job_id = create_resp.json()["job_id"] + + # 2. Poll until download_complete. + status_data = _poll_until_download_complete(live_client, job_id) + if status_data["status"] == "failed": + error = status_data.get("error", {}) + pytest.fail( + f"Live download failed; cannot verify info_dict.title regression. " + f"{error.get('code')} — {error.get('detail')}. " + "If the CI runner cannot reach YouTube, disable this test via the " + "ENABLE_LIVE_DOWNLOAD gate instead of silently skipping." + ) + + assert status_data["status"] == "download_complete" + + # 3. Download audio and check Content-Disposition. + dl_resp = live_client.get(f"/api/jobs/{job_id}/download/audio") + assert dl_resp.status_code == 200, dl_resp.text + + disposition = dl_resp.headers.get("content-disposition", "") + assert disposition, "Expected Content-Disposition header to be present" + + actual_filename = _filename_from_disposition(disposition) + assert actual_filename == f"{_EXPECTED_TITLE} (original).mp3", ( + f"Expected '{_EXPECTED_TITLE} (original).mp3', got {actual_filename!r}. " + "This likely means video_title is sourced from an empty ffprobe container " + "tag instead of yt-dlp info_dict.title." + ) diff --git a/tests/e2e/test_workflow_skeleton.py b/tests/e2e/test_workflow_skeleton.py index f439b1e..83d127c 100644 --- a/tests/e2e/test_workflow_skeleton.py +++ b/tests/e2e/test_workflow_skeleton.py @@ -21,7 +21,7 @@ import pytest -from bilingualsub.core.downloader import DownloadError, download_youtube_video +from bilingualsub.core.downloader import DownloadError, download_video from bilingualsub.core.merger import merge_subtitles from bilingualsub.core.subtitle import Subtitle, SubtitleEntry from bilingualsub.core.transcriber import transcribe_audio @@ -82,7 +82,7 @@ def e2e_output_dir(self, tmp_path: Path) -> Path: return output_dir @pytest.mark.subtitle_only - def test_download_youtube_video(self, e2e_output_dir: Path) -> None: + def test_download_video(self, e2e_output_dir: Path) -> None: """Test downloading a YouTube video. Given a valid YouTube URL, @@ -93,7 +93,7 @@ def test_download_youtube_video(self, e2e_output_dir: Path) -> None: """ output_path = e2e_output_dir / "test_video.mp4" - metadata = download_youtube_video(TEST_YOUTUBE_URL, output_path) + metadata = download_video(TEST_YOUTUBE_URL, output_path) assert output_path.exists() assert output_path.stat().st_size > 0 @@ -114,7 +114,7 @@ def test_transcribe_audio(self, e2e_output_dir: Path) -> None: """ # Download video first video_path = e2e_output_dir / "test_video.mp4" - download_youtube_video(TEST_YOUTUBE_URL, video_path) + download_video(TEST_YOUTUBE_URL, video_path) # Transcribe subtitle = transcribe_audio(video_path, language="en") @@ -134,7 +134,7 @@ def test_translate_subtitles(self, e2e_output_dir: Path) -> None: """ # Download and transcribe video_path = e2e_output_dir / "test_video.mp4" - download_youtube_video(TEST_YOUTUBE_URL, video_path) + download_video(TEST_YOUTUBE_URL, video_path) original = transcribe_audio(video_path, language="en") # Translate @@ -158,7 +158,7 @@ def test_merge_bilingual_subtitles(self, e2e_output_dir: Path) -> None: """ # Full pipeline up to merge video_path = e2e_output_dir / "test_video.mp4" - download_youtube_video(TEST_YOUTUBE_URL, video_path) + download_video(TEST_YOUTUBE_URL, video_path) original = transcribe_audio(video_path, language="en") translated = translate_subtitle(original, source_lang="en", target_lang="zh-TW") @@ -182,7 +182,7 @@ def test_full_workflow_youtube_to_bilingual_srt(self, e2e_output_dir: Path) -> N srt_path = e2e_output_dir / "bilingual.srt" # Step 1: Download - download_youtube_video(TEST_YOUTUBE_URL, video_path) + download_video(TEST_YOUTUBE_URL, video_path) assert video_path.exists() # Step 2: Transcribe @@ -220,7 +220,7 @@ def test_full_workflow_youtube_to_bilingual_ass(self, e2e_output_dir: Path) -> N ass_path = e2e_output_dir / "bilingual.ass" # Full pipeline - metadata = download_youtube_video(TEST_YOUTUBE_URL, video_path) + metadata = download_video(TEST_YOUTUBE_URL, video_path) original = transcribe_audio(video_path, language="en") translated = translate_subtitle(original, source_lang="en", target_lang="zh-TW") @@ -260,7 +260,7 @@ def test_full_workflow_with_video_burn_in(self, e2e_output_dir: Path) -> None: output_video_path = e2e_output_dir / "output_with_subs.mp4" # Full pipeline - metadata = download_youtube_video(TEST_YOUTUBE_URL, video_path) + metadata = download_video(TEST_YOUTUBE_URL, video_path) original = transcribe_audio(video_path, language="en") translated = translate_subtitle(original, source_lang="en", target_lang="zh-TW") @@ -295,7 +295,7 @@ def test_invalid_youtube_url_raises_error(self, tmp_path: Path) -> None: output_path = tmp_path / "video.mp4" with pytest.raises((ValueError, DownloadError)): - download_youtube_video("https://invalid-url.com/video", output_path) + download_video("https://invalid-url.com/video", output_path) def test_missing_api_key_raises_error(self, tmp_path: Path, monkeypatch) -> None: """Test that missing API key raises appropriate error.""" diff --git a/tests/integration/test_api_contract.py b/tests/integration/test_api_contract.py new file mode 100644 index 0000000..d025ada --- /dev/null +++ b/tests/integration/test_api_contract.py @@ -0,0 +1,347 @@ +"""Integration tests for the API contract introduced in feat/readable-filename-multiplatform. + +Covers: +- JobCreateRequest schema: source_url field, extra="forbid" enforcement +- URL extractor acceptance (yt-dlp supported platforms) +- Async error path for unsupported URLs (invalid_input pipeline error) +- Download filename Content-Disposition header format +""" + +from __future__ import annotations + +import time +import urllib.parse +from typing import TYPE_CHECKING, Any + +import pytest +from fastapi.testclient import TestClient + +if TYPE_CHECKING: + from collections.abc import Generator + from pathlib import Path + +from bilingualsub.api.app import create_app +from bilingualsub.api.constants import FileType, JobStatus + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _filename_from_disposition(header: str) -> str: + """Extract filename from Content-Disposition header. + + Handles both RFC 5987 ``filename*=utf-8''`` and + the plain ``filename="..."`` fallback. + """ + marker_rfc5987 = "filename*=utf-8''" + if marker_rfc5987 in header: + return urllib.parse.unquote(header.split(marker_rfc5987, 1)[1]) + marker_plain = 'filename="' + if marker_plain in header: + return header.split(marker_plain, 1)[1].rstrip('"') + raise AssertionError(f"Unrecognized Content-Disposition header: {header}") + + +# --------------------------------------------------------------------------- +# Shared fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def client() -> Generator[TestClient, None, None]: + """Create a TestClient that runs the full app lifespan (sets app.state). + + Must be used as a context manager internally — yielding ensures lifespan + teardown runs after the test. + """ + app = create_app() + with TestClient(app, raise_server_exceptions=False) as c: + yield c + + +@pytest.fixture() +def completed_job_factory(tmp_path: Path) -> Generator[Any, None, None]: + """Factory that injects a completed Job directly into JobManager. + + Yields a callable ``make(title, source_lang, target_lang) -> (client, job_id)``. + The TestClient is kept alive (via context manager) for the fixture's lifetime + so ``app.state.job_manager`` is accessible and the injected job persists. + """ + _app = create_app() + + with TestClient(_app, raise_server_exceptions=False) as _client: + + def make( + title: str, + source_lang: str, + target_lang: str, + ) -> tuple[TestClient, str]: + manager = _app.state.job_manager + job = manager.create_job( + source_url="https://www.youtube.com/watch?v=x", + source_lang=source_lang, + target_lang=target_lang, + ) + job.video_title = title + job.status = JobStatus.COMPLETED + + # Create a dummy file for every FileType so all download routes work. + # Use unique subdirs per call to avoid filename collisions across + # parametrize iterations. + job_tmp = tmp_path / job.id + job_tmp.mkdir(parents=True, exist_ok=True) + for ft in ( + FileType.SRT, + FileType.ASS, + FileType.VIDEO, + FileType.AUDIO, + FileType.SOURCE_VIDEO, + ): + p = job_tmp / f"{ft.value}.bin" + p.write_bytes(b"x") + job.output_files[ft] = p + + return _client, job.id + + yield make + + +# =========================================================================== +# 1. JobCreateRequest schema contract +# =========================================================================== + + +@pytest.mark.integration +class TestJobCreateRequestSchema: + """Verify that the POST /api/jobs endpoint enforces the new schema contract.""" + + def test_valid_source_url_returns_job_id(self, client: TestClient) -> None: + """POST with source_url returns 200 and a non-empty job_id string. + + Arrange: valid JobCreateRequest body using the new source_url field. + Act: POST /api/jobs. + Assert: status 200, response contains string job_id. + """ + response = client.post( + "/api/jobs", + json={"source_url": "https://www.youtube.com/watch?v=abc"}, + ) + + assert response.status_code == 200 + data = response.json() + assert isinstance(data["job_id"], str) + assert data["job_id"] + + def test_old_field_youtube_url_rejected(self, client: TestClient) -> None: + """POST with the old youtube_url field is rejected with 422 (extra='forbid'). + + Arrange: body uses youtube_url (the previous field name). + Act: POST /api/jobs. + Assert: status 422 — Pydantic extra='forbid' disallows unknown fields. + """ + response = client.post( + "/api/jobs", + json={"youtube_url": "https://www.youtube.com/watch?v=abc"}, + ) + + assert response.status_code == 422 + + def test_both_fields_rejected(self, client: TestClient) -> None: + """POST with both source_url and youtube_url fields is rejected with 422. + + Arrange: body contains source_url (valid) AND youtube_url (extra field). + Act: POST /api/jobs. + Assert: status 422 — extra='forbid' triggers even when source_url is present. + """ + response = client.post( + "/api/jobs", + json={ + "source_url": "https://www.youtube.com/watch?v=abc", + "youtube_url": "https://www.youtube.com/watch?v=abc", + }, + ) + + assert response.status_code == 422 + + def test_non_url_source_url_rejected(self, client: TestClient) -> None: + """POST with a non-URL string in source_url is rejected with 422. + + Arrange: source_url is plain text, not a valid HTTP URL. + Act: POST /api/jobs. + Assert: status 422 — Pydantic HttpUrl rejects the value. + """ + response = client.post( + "/api/jobs", + json={"source_url": "not-a-url"}, + ) + + assert response.status_code == 422 + + def test_empty_body_rejected(self, client: TestClient) -> None: + """POST with an empty body is rejected with 422 (missing required field). + + Arrange: empty JSON object. + Act: POST /api/jobs. + Assert: status 422 — source_url is required. + """ + response = client.post("/api/jobs", json={}) + + assert response.status_code == 422 + + +# =========================================================================== +# 2. URL extractor acceptance +# =========================================================================== + + +@pytest.mark.integration +@pytest.mark.parametrize( + "url", + [ + "https://www.youtube.com/watch?v=dQw4w9WgXcQ", + "https://youtu.be/dQw4w9WgXcQ", + "https://x.com/elonmusk/status/1234567890123456789", + "https://twitter.com/jack/status/20", + "https://www.tiktok.com/@user/video/1234567890", + "https://vimeo.com/12345678", + ], +) +def test_valid_http_urls_return_job_id(client: TestClient, url: str) -> None: + """POST with any syntactically valid HttpUrl returns 200 at the HTTP layer. + + HttpUrl is the only synchronous URL check; yt-dlp extractor matching runs + inside the background ``run_download`` task. So this test only proves the + schema accepts these URL shapes — it does NOT prove yt-dlp has an + extractor for any of them. Extractor-level coverage lives in + tests/unit/core/test_downloader.py::TestIsSupportedUrl and in + test_unsupported_url_fails_with_invalid_input below. + """ + response = client.post("/api/jobs", json={"source_url": url}) + + assert response.status_code == 200 + assert "job_id" in response.json() + + +# =========================================================================== +# 3. Async error path: unsupported URL +# =========================================================================== + + +@pytest.mark.integration +def test_unsupported_url_fails_with_invalid_input(client: TestClient) -> None: + """An unsupported URL creates a job then fails asynchronously with invalid_input. + + Arrange: URL that passes HttpUrl validation but has no yt-dlp extractor. + Act: + 1. POST /api/jobs → 200, job_id returned. + 2. Poll GET /api/jobs/{id} until status=failed (max 10s — the check + itself is in-memory but runs inside asyncio.to_thread, which can + queue behind other threads on a loaded CI runner). + Assert: + - job status is "failed". + - error.code == "invalid_input". + - error.detail contains "yt-dlp". + """ + response = client.post( + "/api/jobs", + json={"source_url": "https://example.com/some-random-page.html"}, + ) + assert response.status_code == 200 + job_id = response.json()["job_id"] + + # Poll until failed or timeout. + deadline = time.monotonic() + 10.0 + status_data: dict[str, Any] = {} + while time.monotonic() < deadline: + status_resp = client.get(f"/api/jobs/{job_id}") + assert status_resp.status_code == 200 + status_data = status_resp.json() + if status_data["status"] == "failed": + break + time.sleep(0.2) + + assert status_data.get("status") == "failed", ( + f"Expected status=failed, got {status_data.get('status')!r}" + ) + error = status_data.get("error") + assert error is not None, "Expected error detail in response" + assert error["code"] == "invalid_input" + assert "yt-dlp" in (error.get("detail") or "") + + +# =========================================================================== +# 4. Download filename Content-Disposition format +# =========================================================================== + + +@pytest.mark.integration +@pytest.mark.parametrize( + "title,source_lang,target_lang,file_type,expected_filename", + [ + # Translated subtitle files use (src_to_tgt) suffix. + ( + "Me at the zoo", + "en", + "zh-TW", + FileType.SRT, + "Me at the zoo (en_to_zh-TW).srt", + ), + ( + "Me at the zoo", + "en", + "zh-TW", + FileType.ASS, + "Me at the zoo (en_to_zh-TW).ass", + ), + # Source video and audio always use (original). + ( + "Me at the zoo", + "en", + "zh-TW", + FileType.SOURCE_VIDEO, + "Me at the zoo (original).mp4", + ), + ( + "Me at the zoo", + "en", + "zh-TW", + FileType.AUDIO, + "Me at the zoo (original).mp3", + ), + # Same src/tgt lang → (original) even for SRT. + ("My Video", "en", "en", FileType.SRT, "My Video (original).srt"), + # Empty title → fallback to "video". + ("", "en", "zh-TW", FileType.SRT, "video (en_to_zh-TW).srt"), + # Illegal filesystem chars are stripped. + ('Bad: "/', "en", "zh-TW", FileType.SRT, "Bad chars (en_to_zh-TW).srt"), + ], +) +def test_content_disposition_filename( + completed_job_factory: Any, + title: str, + source_lang: str, + target_lang: str, + file_type: FileType, + expected_filename: str, +) -> None: + """Content-Disposition header carries the correctly formatted filename. + + Arrange: inject a completed job with the given title/langs, pre-create + dummy output files. + Act: GET /api/jobs/{id}/download/{file_type}. + Assert: filename extracted from Content-Disposition matches expected_filename. + """ + _client, job_id = completed_job_factory(title, source_lang, target_lang) + + response = _client.get(f"/api/jobs/{job_id}/download/{file_type.value}") + + assert response.status_code == 200, response.text + disposition = response.headers.get("content-disposition", "") + assert disposition, "Expected Content-Disposition header to be present" + + actual_filename = _filename_from_disposition(disposition) + assert actual_filename == expected_filename, ( + f"Expected {expected_filename!r}, got {actual_filename!r} " + f"(full header: {disposition!r})" + ) diff --git a/tests/integration/test_full_subtitle_pipeline.py b/tests/integration/test_full_subtitle_pipeline.py index 2b0b2ee..f8289d1 100644 --- a/tests/integration/test_full_subtitle_pipeline.py +++ b/tests/integration/test_full_subtitle_pipeline.py @@ -7,7 +7,7 @@ import pytest -from bilingualsub.core.downloader import VideoMetadata, download_youtube_video +from bilingualsub.core.downloader import VideoMetadata, download_video from bilingualsub.core.merger import merge_subtitles from bilingualsub.core.subtitle import Subtitle from bilingualsub.core.transcriber import transcribe_audio @@ -123,7 +123,7 @@ def test_full_pipeline_to_bilingual_srt( dl_patches = _patch_downloader(video_path) with dl_patches[0], dl_patches[1], dl_patches[2]: - metadata = download_youtube_video(YOUTUBE_URL, video_path) + metadata = download_video(YOUTUBE_URL, video_path) assert isinstance(metadata, VideoMetadata) @@ -176,7 +176,7 @@ def test_full_pipeline_to_bilingual_ass( dl_patches = _patch_downloader(video_path) with dl_patches[0], dl_patches[1], dl_patches[2]: - metadata = download_youtube_video(YOUTUBE_URL, video_path) + metadata = download_video(YOUTUBE_URL, video_path) audio_path = tmp_path / "audio.mp3" audio_path.write_bytes(b"fake audio content") @@ -216,13 +216,13 @@ def test_full_pipeline_to_bilingual_ass( # Verify dialogue line format assert "Dialogue: 0," in written - def test_pipeline_metadata_flows_from_downloader_to_ass_serializer( + def test_ass_output_uses_fixed_playres_regardless_of_video_resolution( self, tmp_path: Path, set_fake_api_key: None, sample_whisper_api_response, ) -> None: - """VideoMetadata width/height flows correctly to ASS PlayResX/PlayResY across resolutions.""" + """PlayRes is fixed at 1920x1080 for consistent rendering, regardless of input resolution.""" resolutions = [ (1920, 1080, "1080p"), (1280, 720, "720p"), @@ -235,7 +235,7 @@ def test_pipeline_metadata_flows_from_downloader_to_ass_serializer( dl_patches = _patch_downloader(video_path, width=width, height=height) with dl_patches[0], dl_patches[1], dl_patches[2]: - metadata = download_youtube_video(YOUTUBE_URL, video_path) + metadata = download_video(YOUTUBE_URL, video_path) assert metadata.width == width, f"{label}: width mismatch" assert metadata.height == height, f"{label}: height mismatch" diff --git a/tests/unit/api/test_jobs.py b/tests/unit/api/test_jobs.py index dabbff0..da69d45 100644 --- a/tests/unit/api/test_jobs.py +++ b/tests/unit/api/test_jobs.py @@ -13,7 +13,7 @@ class TestJob: def test_default_values(self) -> None: job = Job( id="test1", - youtube_url="https://youtube.com/watch?v=test", + source_url="https://youtube.com/watch?v=test", source_lang="en", target_lang="zh-TW", ) @@ -29,13 +29,13 @@ class TestJobManager: def test_create_job(self) -> None: manager = JobManager() job = manager.create_job( - youtube_url="https://youtube.com/watch?v=test", + source_url="https://youtube.com/watch?v=test", source_lang="en", target_lang="zh-TW", ) assert job.id is not None assert len(job.id) == 12 - assert job.youtube_url == "https://youtube.com/watch?v=test" + assert job.source_url == "https://youtube.com/watch?v=test" assert job.source_lang == "en" assert job.target_lang == "zh-TW" diff --git a/tests/unit/api/test_pipeline.py b/tests/unit/api/test_pipeline.py index a8dcd19..f87f099 100644 --- a/tests/unit/api/test_pipeline.py +++ b/tests/unit/api/test_pipeline.py @@ -8,15 +8,16 @@ from bilingualsub.api.constants import FileType, JobStatus, SSEEvent from bilingualsub.api.jobs import Job -from bilingualsub.api.pipeline import run_download, run_pipeline, run_subtitle +from bilingualsub.api.pipeline import run_download, run_subtitle from bilingualsub.core.downloader import DownloadError, VideoMetadata from bilingualsub.core.subtitle import Subtitle, SubtitleEntry +from bilingualsub.utils.ffmpeg import FFmpegError def _make_job_with_time_range() -> Job: return Job( id="test456", - youtube_url="https://youtube.com/watch?v=test", + source_url="https://youtube.com/watch?v=test", source_lang="en", target_lang="zh-TW", start_time=10.0, @@ -27,7 +28,7 @@ def _make_job_with_time_range() -> Job: def _make_job() -> Job: return Job( id="test123", - youtube_url="https://youtube.com/watch?v=test", + source_url="https://youtube.com/watch?v=test", source_lang="en", target_lang="zh-TW", ) @@ -59,14 +60,13 @@ def _make_metadata() -> VideoMetadata: @pytest.mark.unit @pytest.mark.asyncio class TestRunPipeline: - @patch("bilingualsub.api.pipeline.burn_subtitles") @patch("bilingualsub.api.pipeline.serialize_bilingual_ass") @patch("bilingualsub.api.pipeline.serialize_srt") @patch("bilingualsub.api.pipeline.merge_subtitles") @patch("bilingualsub.api.pipeline.translate_subtitle") @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_successful_pipeline( self, mock_download, @@ -76,7 +76,6 @@ async def test_successful_pipeline( mock_merge, mock_serialize_srt, mock_serialize_ass, - mock_burn, tmp_path: Path, ) -> None: sub = _make_subtitle() @@ -86,10 +85,10 @@ async def test_successful_pipeline( mock_merge.return_value = sub.entries mock_serialize_srt.return_value = "1\n00:00:00,000 --> 00:00:04,000\nLine 1" mock_serialize_ass.return_value = "[Script Info]\n..." - mock_burn.return_value = tmp_path / "output.mp4" job = _make_job() - await run_pipeline(job) + await run_download(job) + await run_subtitle(job) # Collect all events events = [] @@ -108,12 +107,12 @@ async def test_successful_pipeline( assert "on_progress" in translate_call_kwargs assert callable(translate_call_kwargs["on_progress"]) - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_download_error(self, mock_download) -> None: mock_download.side_effect = DownloadError("Network error") job = _make_job() - await run_pipeline(job) + await run_download(job) events = [] while not job.event_queue.empty(): @@ -124,133 +123,45 @@ async def test_download_error(self, mock_download) -> None: assert error_events[0]["data"]["code"] == "download_failed" assert job.status == JobStatus.FAILED - @patch("bilingualsub.api.pipeline.burn_subtitles") - @patch("bilingualsub.api.pipeline.serialize_bilingual_ass") - @patch("bilingualsub.api.pipeline.serialize_srt") - @patch("bilingualsub.api.pipeline.merge_subtitles") - @patch("bilingualsub.api.pipeline.translate_subtitle") - @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_youtube_video") - async def test_transcribe_receives_audio_path( + @patch("bilingualsub.api.pipeline.download_video") + async def test_download_with_time_range_completes( self, mock_download, mock_extract_audio, - mock_transcribe, - mock_translate, - mock_merge, - mock_serialize_srt, - mock_serialize_ass, - mock_burn, ) -> None: - """Transcribe should receive extracted audio path, not video path.""" - sub = _make_subtitle() + """Given job with time range, run_download produces expected output files.""" mock_download.return_value = _make_metadata() - mock_transcribe.return_value = sub - mock_translate.return_value = sub - mock_merge.return_value = sub.entries - mock_serialize_srt.return_value = "1\n00:00:00,000 --> 00:00:04,000\nLine 1" - mock_serialize_ass.return_value = "[Script Info]\n..." - - job = _make_job() - await run_pipeline(job) - - # Verify extract_audio was called with video path - extract_call_args = mock_extract_audio.call_args[0] - assert extract_call_args[0].name == "video.mp4" - assert extract_call_args[1].name == "audio.mp3" - - # Verify transcribe_audio received audio path, NOT video path - transcribe_call_args = mock_transcribe.call_args - audio_arg = transcribe_call_args[0][0] - assert audio_arg.name == "audio.mp3" - - @patch("bilingualsub.api.pipeline.burn_subtitles") - @patch("bilingualsub.api.pipeline.serialize_bilingual_ass") - @patch("bilingualsub.api.pipeline.serialize_srt") - @patch("bilingualsub.api.pipeline.merge_subtitles") - @patch("bilingualsub.api.pipeline.translate_subtitle") - @patch("bilingualsub.api.pipeline.transcribe_audio") - @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.trim_video") - @patch("bilingualsub.api.pipeline.download_youtube_video") - async def test_pipeline_with_time_range_calls_trim( - self, - mock_download, - mock_trim, - mock_extract_audio, - mock_transcribe, - mock_translate, - mock_merge, - mock_serialize_srt, - mock_serialize_ass, - mock_burn, - ) -> None: - """Given job with time range, download_youtube_video should receive time parameters.""" - sub = _make_subtitle() - mock_download.return_value = _make_metadata() - mock_transcribe.return_value = sub - mock_translate.return_value = sub - mock_merge.return_value = sub.entries - mock_serialize_srt.return_value = "1\n00:00:00,000 --> 00:00:04,000\nLine 1" - mock_serialize_ass.return_value = "[Script Info]\n..." job = _make_job_with_time_range() - await run_pipeline(job) - - # Verify download was called with time parameters - mock_download.assert_called_once() - call_kwargs = mock_download.call_args[1] - assert call_kwargs["start_time"] == 10.0 - assert call_kwargs["end_time"] == 30.0 - - # Verify trim_video was NOT called (trimming happens during download now) - mock_trim.assert_not_called() + await run_download(job) - assert job.status == JobStatus.COMPLETED + assert job.status == JobStatus.DOWNLOAD_COMPLETE + assert FileType.SOURCE_VIDEO in job.output_files + assert FileType.AUDIO in job.output_files - @patch("bilingualsub.api.pipeline.burn_subtitles") - @patch("bilingualsub.api.pipeline.serialize_bilingual_ass") - @patch("bilingualsub.api.pipeline.serialize_srt") - @patch("bilingualsub.api.pipeline.merge_subtitles") - @patch("bilingualsub.api.pipeline.translate_subtitle") - @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.trim_video") - @patch("bilingualsub.api.pipeline.download_youtube_video") - async def test_pipeline_without_time_range_skips_trim( + @patch("bilingualsub.api.pipeline.download_video") + async def test_download_without_time_range_completes( self, mock_download, - mock_trim, mock_extract_audio, - mock_transcribe, - mock_translate, - mock_merge, - mock_serialize_srt, - mock_serialize_ass, - mock_burn, ) -> None: - """Given job without time range, pipeline should NOT call trim_video.""" - sub = _make_subtitle() + """Given job without time range, run_download produces expected output files.""" mock_download.return_value = _make_metadata() - mock_transcribe.return_value = sub - mock_translate.return_value = sub - mock_merge.return_value = sub.entries - mock_serialize_srt.return_value = "1\n00:00:00,000 --> 00:00:04,000\nLine 1" - mock_serialize_ass.return_value = "[Script Info]\n..." job = _make_job() - await run_pipeline(job) + await run_download(job) - mock_trim.assert_not_called() - assert job.status == JobStatus.COMPLETED + assert job.status == JobStatus.DOWNLOAD_COMPLETE + assert FileType.SOURCE_VIDEO in job.output_files @pytest.mark.unit @pytest.mark.asyncio class TestRunDownload: @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_run_download_sends_download_complete( self, mock_download, mock_extract_audio ) -> None: @@ -270,7 +181,7 @@ async def test_run_download_sends_download_complete( assert job.status == JobStatus.DOWNLOAD_COMPLETE @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_run_download_saves_metadata( self, mock_download, mock_extract_audio ) -> None: @@ -283,6 +194,28 @@ async def test_run_download_saves_metadata( assert job.video_width == 1920 assert job.video_height == 1080 + @patch("bilingualsub.api.pipeline.extract_audio") + @patch("bilingualsub.api.pipeline.download_video") + async def test_run_download_extract_audio_failure_sends_error( + self, mock_download, mock_extract_audio + ) -> None: + """When extract_audio raises FFmpegError, job transitions to FAILED.""" + mock_download.return_value = _make_metadata() + mock_extract_audio.side_effect = FFmpegError("ffmpeg segfault") + + job = _make_job() + await run_download(job) + + events = [] + while not job.event_queue.empty(): + events.append(job.event_queue.get_nowait()) + + error_events = [e for e in events if e["event"] == SSEEvent.ERROR] + assert len(error_events) == 1 + assert error_events[0]["data"]["code"] == "burn_failed" + assert "ffmpeg segfault" in error_events[0]["data"]["detail"] + assert job.status == JobStatus.FAILED + @pytest.mark.unit @pytest.mark.asyncio diff --git a/tests/unit/api/test_routes.py b/tests/unit/api/test_routes.py index 83fb1f6..d70042d 100644 --- a/tests/unit/api/test_routes.py +++ b/tests/unit/api/test_routes.py @@ -8,7 +8,8 @@ from bilingualsub.api.app import create_app from bilingualsub.api.constants import FileType, JobStatus -from bilingualsub.api.jobs import JobManager +from bilingualsub.api.jobs import Job, JobManager +from bilingualsub.api.routes import _build_download_filename, _sanitize_filename @pytest.fixture @@ -48,7 +49,7 @@ class TestCreateJob: async def test_create_job_valid(self, client: AsyncClient) -> None: response = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, + json={"source_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, ) assert response.status_code == 200 data = response.json() @@ -58,7 +59,7 @@ async def test_create_job_valid(self, client: AsyncClient) -> None: async def test_create_job_invalid_url(self, client: AsyncClient) -> None: response = await client.post( "/api/jobs", - json={"youtube_url": "not-a-url"}, + json={"source_url": "not-a-url"}, ) assert response.status_code == 422 @@ -70,7 +71,7 @@ async def test_get_existing_job(self, client: AsyncClient) -> None: # First create a job create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -95,7 +96,7 @@ async def test_download_invalid_file_type(self, client: AsyncClient) -> None: # Create a job first create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -127,7 +128,7 @@ async def test_start_subtitle_wrong_status(self, client: AsyncClient, app) -> No """Should return 422 when job is not in download_complete state.""" create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -138,7 +139,7 @@ async def test_start_subtitle_success(self, client: AsyncClient, app) -> None: """Should start subtitle when job is download_complete.""" create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -156,7 +157,7 @@ async def test_start_subtitle_with_language_override( """Should update source/target language when triggering subtitle.""" create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -181,7 +182,7 @@ class TestPartialRetranslate: async def test_partial_retranslate_success(self, client: AsyncClient, app) -> None: create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -213,7 +214,7 @@ async def test_partial_retranslate_requires_pipeline_complete( ) -> None: create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -227,3 +228,93 @@ async def test_partial_retranslate_requires_pipeline_complete( }, ) assert response.status_code == 422 + + +def _make_job( + *, + title: str = "", + source_lang: str = "en", + target_lang: str = "zh-TW", +) -> Job: + job = Job(id="testjob") + job.video_title = title + job.source_lang = source_lang + job.target_lang = target_lang + return job + + +@pytest.mark.unit +class TestSanitizeFilename: + def test_empty_string_returns_video(self) -> None: + assert _sanitize_filename("") == "video" + + def test_whitespace_only_returns_video(self) -> None: + assert _sanitize_filename(" ") == "video" + + def test_strips_illegal_chars(self) -> None: + assert _sanitize_filename('My: