From e2d0011672909a871c91902c8f748d88ce08e599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maple=EF=BC=81?= Date: Sat, 18 Apr 2026 09:50:11 +0800 Subject: [PATCH 1/5] feat: rename source_url and support any yt-dlp-supported site - Rename youtube_url -> source_url across backend schemas, Job, routes, pipeline and all tests, plus frontend types, component and i18n copy. Reflects that the tool now supports any platform yt-dlp can handle (X, Twitter, TikTok, etc.), not just YouTube. - Replace regex-based _is_youtube_url() with _is_supported_url(), which delegates to yt-dlp extractors (excluding the generic one) so URL validation always matches yt-dlp's actual capabilities. - Download filenames now use the video title plus a language-intent suffix: " (original).srt" for source-only outputs and "<title> (en_to_zh-TW).srt" for translated ones, with illegal chars stripped and length capped at 120 chars. - Frontend copy and URL validation relaxed to accept any https URL. --- frontend/src/components/UrlInput.tsx | 6 +- frontend/src/i18n/en.json | 10 +-- frontend/src/i18n/zh-TW.json | 10 +-- frontend/src/types.ts | 2 +- src/bilingualsub/api/jobs.py | 8 +- src/bilingualsub/api/pipeline.py | 2 +- src/bilingualsub/api/routes.py | 62 ++++++++++----- src/bilingualsub/api/schemas.py | 2 +- src/bilingualsub/core/downloader.py | 18 ++--- tests/conftest.py | 4 +- tests/unit/api/test_jobs.py | 6 +- tests/unit/api/test_pipeline.py | 4 +- tests/unit/api/test_routes.py | 112 ++++++++++++++++++++++++--- tests/unit/api/test_schemas.py | 30 ++++--- tests/unit/core/test_downloader.py | 43 +++++++++- 15 files changed, 233 insertions(+), 86 deletions(-) diff --git a/frontend/src/components/UrlInput.tsx b/frontend/src/components/UrlInput.tsx index e3060d7..6ac15f6 100644 --- a/frontend/src/components/UrlInput.tsx +++ b/frontend/src/components/UrlInput.tsx @@ -72,14 +72,14 @@ export function UrlInput({ onSubmit, disabled }: UrlInputProps) { return; } - const youtubePattern = /^https?:\/\/(www\.)?(youtube\.com\/watch\?v=|youtu\.be\/)/; - if (!youtubePattern.test(url)) { + const urlPattern = /^https?:\/\/.+/; + if (!urlPattern.test(url)) { setError(t('error.invalid_url')); return; } const request: JobCreateRequest = { - youtube_url: url, + source_url: url, }; if (startSeconds !== undefined) request.start_time = startSeconds; if (endSeconds !== undefined) request.end_time = endSeconds; diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index 883a810..b83654f 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -1,7 +1,7 @@ { "app": { "title": "BilingualSub", - "subtitle": "YouTube Bilingual Subtitle Generator", + "subtitle": "Bilingual Subtitle Generator for YouTube, X, TikTok & More", "processing_title": "Processing Video", "processing_desc": "Please wait while we transcribe and translate.", "error_title": "Something went wrong", @@ -17,8 +17,8 @@ "download_audio": "Download Audio" }, "form": { - "url_placeholder": "Paste YouTube video URL", - "paste_placeholder": "Paste YouTube URL...", + "url_placeholder": "Paste video URL (YouTube / X / Twitter / TikTok…)", + "paste_placeholder": "Paste video URL (YouTube / X / Twitter / TikTok…)", "source_lang": "Source Language", "target_lang": "Target Language", "label_translate": "Translate", @@ -33,7 +33,7 @@ "enable_range": "Set Range", "disable_range": "Disable Range", "clear_range": "Clear", - "inputModeUrl": "YouTube URL", + "inputModeUrl": "Video URL", "inputModeFile": "Upload File", "filePlaceholder": "Choose a video or audio file", "fileSelected": "Selected: {{filename}}" @@ -66,7 +66,7 @@ "unsupported": "Your browser does not support video playback" }, "error": { - "invalid_url": "Please enter a valid YouTube URL", + "invalid_url": "Please enter a valid video URL", "job_not_found": "Job not found", "download_failed": "Video download failed", "transcription_failed": "Transcription failed", diff --git a/frontend/src/i18n/zh-TW.json b/frontend/src/i18n/zh-TW.json index 217fef7..fb0b364 100644 --- a/frontend/src/i18n/zh-TW.json +++ b/frontend/src/i18n/zh-TW.json @@ -1,7 +1,7 @@ { "app": { "title": "BilingualSub", - "subtitle": "YouTube 雙語字幕自動生成工具", + "subtitle": "多平台雙語字幕自動生成工具", "processing_title": "影片處理中", "processing_desc": "請稍候,我們正在進行語音辨識與翻譯。", "error_title": "發生錯誤", @@ -17,8 +17,8 @@ "download_audio": "下載音訊" }, "form": { - "url_placeholder": "貼上 YouTube 影片網址", - "paste_placeholder": "貼上 YouTube 網址...", + "url_placeholder": "貼上影片網址(YouTube / X / Twitter / TikTok…)", + "paste_placeholder": "貼上影片網址(YouTube / X / Twitter / TikTok…)", "source_lang": "原始語言", "target_lang": "翻譯語言", "label_translate": "翻譯語言", @@ -33,7 +33,7 @@ "enable_range": "設定範圍", "disable_range": "取消範圍", "clear_range": "清除", - "inputModeUrl": "YouTube 網址", + "inputModeUrl": "影片網址", "inputModeFile": "上傳檔案", "filePlaceholder": "選擇影片或音訊檔案", "fileSelected": "已選擇:{{filename}}" @@ -66,7 +66,7 @@ "unsupported": "您的瀏覽器不支援影片播放" }, "error": { - "invalid_url": "請輸入有效的 YouTube 網址", + "invalid_url": "請輸入有效的影片網址", "job_not_found": "找不到此任務", "download_failed": "影片下載失敗", "transcription_failed": "語音辨識失敗", diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 6cd9d68..c8f1c1b 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -1,7 +1,7 @@ import type { FileType, JobStatus } from './constants'; export interface JobCreateRequest { - youtube_url: string; + source_url: string; source_lang?: string; target_lang?: string; start_time?: number; // seconds diff --git a/src/bilingualsub/api/jobs.py b/src/bilingualsub/api/jobs.py index 2930d49..041e290 100644 --- a/src/bilingualsub/api/jobs.py +++ b/src/bilingualsub/api/jobs.py @@ -29,7 +29,7 @@ class Job: """Represents a subtitle generation job.""" id: str - youtube_url: str = "" + source_url: str = "" source_lang: str = "" target_lang: str = "" local_video_path: Path | None = None @@ -59,7 +59,7 @@ def __init__(self) -> None: def create_job( self, - youtube_url: str = "", + source_url: str = "", source_lang: str = "", target_lang: str = "", start_time: float | None = None, @@ -70,7 +70,7 @@ def create_job( job_id = uuid.uuid4().hex[:12] job = Job( id=job_id, - youtube_url=youtube_url, + source_url=source_url, source_lang=source_lang, target_lang=target_lang, local_video_path=local_video_path, @@ -78,7 +78,7 @@ def create_job( end_time=end_time, ) self._jobs[job_id] = job - logger.info("job_created", job_id=job_id, youtube_url=youtube_url) + logger.info("job_created", job_id=job_id, source_url=source_url) return job def get_job(self, job_id: str) -> Job | None: diff --git a/src/bilingualsub/api/pipeline.py b/src/bilingualsub/api/pipeline.py index 1b6b8b8..77f4311 100644 --- a/src/bilingualsub/api/pipeline.py +++ b/src/bilingualsub/api/pipeline.py @@ -237,7 +237,7 @@ def _put_event() -> None: metadata = await asyncio.to_thread( download_youtube_video, - job.youtube_url, + job.source_url, video_path, on_progress=_on_download_progress, start_time=job.start_time, diff --git a/src/bilingualsub/api/routes.py b/src/bilingualsub/api/routes.py index 6f0eb57..7efa775 100644 --- a/src/bilingualsub/api/routes.py +++ b/src/bilingualsub/api/routes.py @@ -4,6 +4,7 @@ import asyncio import json +import re import tempfile from pathlib import Path from typing import TYPE_CHECKING, Any @@ -53,6 +54,46 @@ ".webm", } +_EXTENSIONS: dict[FileType, str] = { + FileType.SRT: "srt", + FileType.ASS: "ass", + FileType.VIDEO: "mp4", + FileType.AUDIO: "mp3", + FileType.SOURCE_VIDEO: "mp4", +} + +_MEDIA_TYPES: dict[FileType, str] = { + FileType.SRT: "text/plain", + FileType.ASS: "text/plain", + FileType.VIDEO: "video/mp4", + FileType.AUDIO: "audio/mpeg", + FileType.SOURCE_VIDEO: "video/mp4", +} + +_FILENAME_BAD_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]') + + +def _sanitize_filename(name: str) -> str: + """Remove filesystem-unsafe characters and truncate to 120 chars.""" + cleaned = _FILENAME_BAD_CHARS.sub("", name).strip(" .") + return (cleaned or "video")[:120] + + +def _build_download_filename(job: Job, file_type: FileType) -> str: + """Build a human-readable download filename for the given job and file type.""" + base = _sanitize_filename(job.video_title or "video") + original_only = file_type in (FileType.SOURCE_VIDEO, FileType.AUDIO) + same_lang = ( + not job.source_lang or not job.target_lang or job.source_lang == job.target_lang + ) + suffix = ( + "(original)" + if original_only or same_lang + else f"({job.source_lang}_to_{job.target_lang})" + ) + ext = _EXTENSIONS[file_type] + return f"{base} {suffix}.{ext}" + def _get_job_manager(request: Request) -> JobManager: """Get the JobManager from app state.""" @@ -83,7 +124,7 @@ async def create_job(body: JobCreateRequest, request: Request) -> JobCreateRespo """Create a new subtitle generation job.""" manager = _get_job_manager(request) job = manager.create_job( - youtube_url=str(body.youtube_url), + source_url=str(body.source_url), source_lang=body.source_lang, target_lang=body.target_lang, start_time=body.start_time, @@ -212,25 +253,10 @@ async def download_file(job_id: str, file_type: str, request: Request) -> FileRe detail="Job may not have completed this step", ) - # Set appropriate media type and filename - media_types = { - FileType.SRT: "text/plain", - FileType.ASS: "text/plain", - FileType.VIDEO: "video/mp4", - FileType.AUDIO: "audio/mpeg", - FileType.SOURCE_VIDEO: "video/mp4", - } - extensions = { - FileType.SRT: "srt", - FileType.ASS: "ass", - FileType.VIDEO: "mp4", - FileType.AUDIO: "mp3", - FileType.SOURCE_VIDEO: "mp4", - } return FileResponse( path=path, - media_type=media_types[ft], - filename=f"bilingualsub.{extensions[ft]}", + media_type=_MEDIA_TYPES[ft], + filename=_build_download_filename(job, ft), ) diff --git a/src/bilingualsub/api/schemas.py b/src/bilingualsub/api/schemas.py index 41604dc..1a4d0ff 100644 --- a/src/bilingualsub/api/schemas.py +++ b/src/bilingualsub/api/schemas.py @@ -10,7 +10,7 @@ class JobCreateRequest(BaseModel): """Request body for creating a new subtitle generation job.""" - youtube_url: HttpUrl + source_url: HttpUrl source_lang: str = "en" target_lang: str = "zh-TW" start_time: float | None = None diff --git a/src/bilingualsub/core/downloader.py b/src/bilingualsub/core/downloader.py index 08728cf..e12089a 100644 --- a/src/bilingualsub/core/downloader.py +++ b/src/bilingualsub/core/downloader.py @@ -71,8 +71,8 @@ def download_youtube_video( if not url.strip(): raise ValueError("URL cannot be empty") - if not _is_youtube_url(url): - raise ValueError(f"Not a valid YouTube URL: {url}") + if not _is_supported_url(url): + raise ValueError(f"此網址目前的下載器(yt-dlp)無法處理: {url}") if not output_path.parent.exists(): raise ValueError(f"Output directory does not exist: {output_path.parent}") @@ -127,15 +127,11 @@ def _sanitize_description(raw: Any) -> str: return raw.strip() -def _is_youtube_url(url: str) -> bool: - """Check if URL is a valid YouTube URL.""" - youtube_domains = [ - "youtube.com", - "www.youtube.com", - "m.youtube.com", - "youtu.be", - ] - return any(domain in url for domain in youtube_domains) +def _is_supported_url(url: str) -> bool: + """Check if yt-dlp has a dedicated extractor for this URL.""" + from yt_dlp.extractor import gen_extractors # noqa: PLC0415 + + return any(ie.suitable(url) for ie in gen_extractors() if ie.IE_NAME != "generic") def _download_video( diff --git a/tests/conftest.py b/tests/conftest.py index 7e2657a..b266004 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,6 +36,6 @@ def sample_srt_content() -> str: @pytest.fixture -def sample_youtube_url() -> str: - """Return a sample YouTube URL for testing.""" +def sample_source_url() -> str: + """Return a sample video URL for testing.""" return "https://www.youtube.com/watch?v=dQw4w9WgXcQ" diff --git a/tests/unit/api/test_jobs.py b/tests/unit/api/test_jobs.py index dabbff0..da69d45 100644 --- a/tests/unit/api/test_jobs.py +++ b/tests/unit/api/test_jobs.py @@ -13,7 +13,7 @@ class TestJob: def test_default_values(self) -> None: job = Job( id="test1", - youtube_url="https://youtube.com/watch?v=test", + source_url="https://youtube.com/watch?v=test", source_lang="en", target_lang="zh-TW", ) @@ -29,13 +29,13 @@ class TestJobManager: def test_create_job(self) -> None: manager = JobManager() job = manager.create_job( - youtube_url="https://youtube.com/watch?v=test", + source_url="https://youtube.com/watch?v=test", source_lang="en", target_lang="zh-TW", ) assert job.id is not None assert len(job.id) == 12 - assert job.youtube_url == "https://youtube.com/watch?v=test" + assert job.source_url == "https://youtube.com/watch?v=test" assert job.source_lang == "en" assert job.target_lang == "zh-TW" diff --git a/tests/unit/api/test_pipeline.py b/tests/unit/api/test_pipeline.py index a8dcd19..4bd4f40 100644 --- a/tests/unit/api/test_pipeline.py +++ b/tests/unit/api/test_pipeline.py @@ -16,7 +16,7 @@ def _make_job_with_time_range() -> Job: return Job( id="test456", - youtube_url="https://youtube.com/watch?v=test", + source_url="https://youtube.com/watch?v=test", source_lang="en", target_lang="zh-TW", start_time=10.0, @@ -27,7 +27,7 @@ def _make_job_with_time_range() -> Job: def _make_job() -> Job: return Job( id="test123", - youtube_url="https://youtube.com/watch?v=test", + source_url="https://youtube.com/watch?v=test", source_lang="en", target_lang="zh-TW", ) diff --git a/tests/unit/api/test_routes.py b/tests/unit/api/test_routes.py index 83fb1f6..9bfb4a3 100644 --- a/tests/unit/api/test_routes.py +++ b/tests/unit/api/test_routes.py @@ -8,7 +8,8 @@ from bilingualsub.api.app import create_app from bilingualsub.api.constants import FileType, JobStatus -from bilingualsub.api.jobs import JobManager +from bilingualsub.api.jobs import Job, JobManager +from bilingualsub.api.routes import _build_download_filename, _sanitize_filename @pytest.fixture @@ -48,7 +49,7 @@ class TestCreateJob: async def test_create_job_valid(self, client: AsyncClient) -> None: response = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, + json={"source_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, ) assert response.status_code == 200 data = response.json() @@ -58,7 +59,7 @@ async def test_create_job_valid(self, client: AsyncClient) -> None: async def test_create_job_invalid_url(self, client: AsyncClient) -> None: response = await client.post( "/api/jobs", - json={"youtube_url": "not-a-url"}, + json={"source_url": "not-a-url"}, ) assert response.status_code == 422 @@ -70,7 +71,7 @@ async def test_get_existing_job(self, client: AsyncClient) -> None: # First create a job create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -95,7 +96,7 @@ async def test_download_invalid_file_type(self, client: AsyncClient) -> None: # Create a job first create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -127,7 +128,7 @@ async def test_start_subtitle_wrong_status(self, client: AsyncClient, app) -> No """Should return 422 when job is not in download_complete state.""" create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -138,7 +139,7 @@ async def test_start_subtitle_success(self, client: AsyncClient, app) -> None: """Should start subtitle when job is download_complete.""" create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -156,7 +157,7 @@ async def test_start_subtitle_with_language_override( """Should update source/target language when triggering subtitle.""" create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -181,7 +182,7 @@ class TestPartialRetranslate: async def test_partial_retranslate_success(self, client: AsyncClient, app) -> None: create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -213,7 +214,7 @@ async def test_partial_retranslate_requires_pipeline_complete( ) -> None: create_resp = await client.post( "/api/jobs", - json={"youtube_url": "https://www.youtube.com/watch?v=test123"}, + json={"source_url": "https://www.youtube.com/watch?v=test123"}, ) job_id = create_resp.json()["job_id"] @@ -227,3 +228,94 @@ async def test_partial_retranslate_requires_pipeline_complete( }, ) assert response.status_code == 422 + + +def _make_job( + *, + title: str = "", + source_lang: str = "en", + target_lang: str = "zh-TW", +) -> Job: + job = Job(id="testjob") + job.video_title = title + job.source_lang = source_lang + job.target_lang = target_lang + return job + + +@pytest.mark.unit +class TestSanitizeFilename: + def test_empty_string_returns_video(self) -> None: + assert _sanitize_filename("") == "video" + + def test_whitespace_only_returns_video(self) -> None: + assert _sanitize_filename(" ") == "video" + + def test_strips_illegal_chars(self) -> None: + result = _sanitize_filename('My: <video> / test"') + assert "<" not in result + assert ">" not in result + assert ":" not in result + assert '"' not in result + assert "/" not in result + assert len(result) > 0 + + def test_truncates_to_120_chars(self) -> None: + long_name = "a" * 200 + assert len(_sanitize_filename(long_name)) == 120 + + def test_strips_leading_trailing_dots_and_spaces(self) -> None: + assert _sanitize_filename(" .hello. ") == "hello" + + +@pytest.mark.unit +class TestBuildDownloadFilename: + def test_empty_title_falls_back_to_video(self) -> None: + job = _make_job(title="", source_lang="en", target_lang="en") + result = _build_download_filename(job, FileType.SRT) + assert result == "video (original).srt" + + def test_same_lang_produces_original_suffix(self) -> None: + job = _make_job(title="My Video", source_lang="en", target_lang="en") + result = _build_download_filename(job, FileType.SRT) + assert result == "My Video (original).srt" + + def test_translation_produces_lang_suffix(self) -> None: + job = _make_job(title="My Video", source_lang="en", target_lang="zh-TW") + result = _build_download_filename(job, FileType.SRT) + assert result == "My Video (en_to_zh-TW).srt" + + def test_title_with_illegal_chars_stripped(self) -> None: + job = _make_job( + title="My: <video> / test", source_lang="en", target_lang="zh-TW" + ) + result = _build_download_filename(job, FileType.SRT) + assert ":" not in result + assert "<" not in result + assert ">" not in result + assert result.endswith("(en_to_zh-TW).srt") + + def test_source_video_always_original(self) -> None: + job = _make_job(title="My Video", source_lang="en", target_lang="zh-TW") + result = _build_download_filename(job, FileType.SOURCE_VIDEO) + assert result == "My Video (original).mp4" + + def test_audio_always_original(self) -> None: + job = _make_job(title="My Video", source_lang="en", target_lang="zh-TW") + result = _build_download_filename(job, FileType.AUDIO) + assert result == "My Video (original).mp3" + + def test_ass_with_translation(self) -> None: + job = _make_job(title="My Video", source_lang="en", target_lang="zh-TW") + result = _build_download_filename(job, FileType.ASS) + assert result == "My Video (en_to_zh-TW).ass" + + def test_burned_video_with_translation(self) -> None: + job = _make_job(title="My Video", source_lang="en", target_lang="zh-TW") + result = _build_download_filename(job, FileType.VIDEO) + assert result == "My Video (en_to_zh-TW).mp4" + + def test_empty_source_lang_produces_original(self) -> None: + job = _make_job(title="My Video", source_lang="", target_lang="zh-TW") + result = _build_download_filename(job, FileType.SRT) + assert result == "My Video (original).srt" diff --git a/tests/unit/api/test_schemas.py b/tests/unit/api/test_schemas.py index 3df4c8c..9228e57 100644 --- a/tests/unit/api/test_schemas.py +++ b/tests/unit/api/test_schemas.py @@ -17,20 +17,18 @@ @pytest.mark.unit class TestJobCreateRequest: - def test_valid_youtube_url(self) -> None: - req = JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=dQw4w9WgXcQ" - ) - assert str(req.youtube_url) == "https://www.youtube.com/watch?v=dQw4w9WgXcQ" + def test_valid_source_url(self) -> None: + req = JobCreateRequest(source_url="https://www.youtube.com/watch?v=dQw4w9WgXcQ") + assert str(req.source_url) == "https://www.youtube.com/watch?v=dQw4w9WgXcQ" def test_default_languages(self) -> None: - req = JobCreateRequest(youtube_url="https://www.youtube.com/watch?v=test123") + req = JobCreateRequest(source_url="https://www.youtube.com/watch?v=test123") assert req.source_lang == "en" assert req.target_lang == "zh-TW" def test_custom_languages(self) -> None: req = JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=test", + source_url="https://www.youtube.com/watch?v=test", source_lang="ja", target_lang="en", ) @@ -39,16 +37,16 @@ def test_custom_languages(self) -> None: def test_invalid_url_rejected(self) -> None: with pytest.raises(ValidationError): - JobCreateRequest(youtube_url="not-a-url") + JobCreateRequest(source_url="not-a-url") def test_default_time_range_is_none(self) -> None: - req = JobCreateRequest(youtube_url="https://www.youtube.com/watch?v=test123") + req = JobCreateRequest(source_url="https://www.youtube.com/watch?v=test123") assert req.start_time is None assert req.end_time is None def test_valid_time_range(self) -> None: req = JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=test123", + source_url="https://www.youtube.com/watch?v=test123", start_time=10.0, end_time=30.0, ) @@ -57,7 +55,7 @@ def test_valid_time_range(self) -> None: def test_start_time_only(self) -> None: req = JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=test123", + source_url="https://www.youtube.com/watch?v=test123", start_time=10.0, ) assert req.start_time == 10.0 @@ -65,7 +63,7 @@ def test_start_time_only(self) -> None: def test_end_time_only(self) -> None: req = JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=test123", + source_url="https://www.youtube.com/watch?v=test123", end_time=30.0, ) assert req.start_time is None @@ -74,14 +72,14 @@ def test_end_time_only(self) -> None: def test_negative_start_time_rejected(self) -> None: with pytest.raises(ValidationError, match="start_time must be non-negative"): JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=test123", + source_url="https://www.youtube.com/watch?v=test123", start_time=-1.0, ) def test_negative_end_time_rejected(self) -> None: with pytest.raises(ValidationError, match="end_time must be non-negative"): JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=test123", + source_url="https://www.youtube.com/watch?v=test123", end_time=-5.0, ) @@ -90,7 +88,7 @@ def test_start_time_greater_than_end_time_rejected(self) -> None: ValidationError, match="start_time must be less than end_time" ): JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=test123", + source_url="https://www.youtube.com/watch?v=test123", start_time=30.0, end_time=10.0, ) @@ -100,7 +98,7 @@ def test_start_time_equal_to_end_time_rejected(self) -> None: ValidationError, match="start_time must be less than end_time" ): JobCreateRequest( - youtube_url="https://www.youtube.com/watch?v=test123", + source_url="https://www.youtube.com/watch?v=test123", start_time=10.0, end_time=10.0, ) diff --git a/tests/unit/core/test_downloader.py b/tests/unit/core/test_downloader.py index d374c50..974cab1 100644 --- a/tests/unit/core/test_downloader.py +++ b/tests/unit/core/test_downloader.py @@ -10,6 +10,7 @@ from bilingualsub.core.downloader import ( DownloadError, VideoMetadata, + _is_supported_url, download_youtube_video, ) @@ -247,10 +248,12 @@ def test_empty_url_raises_error(self, tmp_path): with pytest.raises(ValueError, match="URL cannot be empty"): download_youtube_video(" ", tmp_path / "video.mp4") - def test_non_youtube_url_raises_error(self, tmp_path): - """Test that non-YouTube URL raises error.""" - with pytest.raises(ValueError, match="Not a valid YouTube URL"): - download_youtube_video("https://vimeo.com/123456", tmp_path / "video.mp4") + def test_unsupported_url_raises_error(self, tmp_path): + """Test that a URL with no dedicated yt-dlp extractor raises error.""" + with pytest.raises(ValueError, match="yt-dlp"): + download_youtube_video( + "https://example.com/random.html", tmp_path / "video.mp4" + ) def test_output_directory_not_exists_raises_error(self): """Test that non-existent output directory raises error.""" @@ -837,3 +840,35 @@ def extract_info_side_effect(url, download=True): # Duration should be calculated as (end - start) assert metadata.duration == 60.0 + + +@pytest.mark.unit +class TestIsSupportedUrl: + """Test cases for _is_supported_url using real yt-dlp extractor registry.""" + + def test_youtube_watch_url_supported(self) -> None: + assert _is_supported_url("https://www.youtube.com/watch?v=dQw4w9WgXcQ") is True + + def test_youtu_be_short_url_supported(self) -> None: + assert _is_supported_url("https://youtu.be/dQw4w9WgXcQ") is True + + def test_x_com_url_supported(self) -> None: + assert _is_supported_url("https://x.com/user/status/123456789012345678") is True + + def test_twitter_com_url_supported(self) -> None: + assert ( + _is_supported_url("https://twitter.com/user/status/123456789012345678") + is True + ) + + def test_tiktok_url_supported(self) -> None: + assert ( + _is_supported_url("https://www.tiktok.com/@user/video/1234567890123456789") + is True + ) + + def test_generic_webpage_not_supported(self) -> None: + assert _is_supported_url("https://example.com/random.html") is False + + def test_non_url_string_not_supported(self) -> None: + assert _is_supported_url("not-a-url") is False From edf32435528c2b2051258c88edd28a2320c1cc67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maple=EF=BC=81?= <mapleee723@gmail.com> Date: Sat, 18 Apr 2026 12:10:27 +0800 Subject: [PATCH 2/5] refactor: address review findings from /review + /test-review + /qa Correctness / Cleanliness: - Rename download_youtube_video -> download_video (leaked abstraction); update all call sites in pipeline, core/__init__ and tests. - Swap Chinese error string for English ("URL not supported by downloader (yt-dlp): ...") to match the rest of the file and avoid leaking a UI concern into core. - Strip trailing " ." after 120-char truncation in _sanitize_filename to avoid Windows-invalid names ending in space. - Update module docstring in downloader.py to drop "YouTube". Efficiency (Blocker): - Cache yt-dlp extractor classes at module load; _is_supported_url now iterates the cached list and calls suitable() on classes (~11ms/call -> ~0ms). Removes the deferred import + PLC0415 noqa. Simplicity / Convention: - Merge _EXTENSIONS and _MEDIA_TYPES into a single _FILE_META dict keyed by FileType (NamedTuple of ext + media_type). - Extract magic strings: _DEFAULT_FILENAME, _SUFFIX_ORIGINAL, _LANG_SEPARATOR. Frontend: - Tighten URL pattern from /^https?:\/\/.+/ to /^https?:\/\/\S+$/ so whitespace-only suffixes are rejected client-side. API defensiveness (QA F2): - JobCreateRequest now uses extra="forbid"; clients that send youtube_url (old field) alongside source_url get 422 instead of a silent success. Tests: - Strengthen two assertion-roulette cases to assert exact strings / legal content preservation, so a mutation that returns the fallback "video" would now fail the tests. --- src/bilingualsub/api/pipeline.py | 4 +- src/bilingualsub/api/routes.py | 43 +++++++------ src/bilingualsub/api/schemas.py | 4 +- src/bilingualsub/core/__init__.py | 4 +- src/bilingualsub/core/downloader.py | 22 ++++--- tests/e2e/test_workflow_skeleton.py | 20 +++--- .../test_full_subtitle_pipeline.py | 8 +-- tests/unit/api/test_pipeline.py | 16 ++--- tests/unit/api/test_routes.py | 9 +-- tests/unit/core/test_downloader.py | 62 ++++++++----------- 10 files changed, 91 insertions(+), 101 deletions(-) diff --git a/src/bilingualsub/api/pipeline.py b/src/bilingualsub/api/pipeline.py index 77f4311..d2cce4f 100644 --- a/src/bilingualsub/api/pipeline.py +++ b/src/bilingualsub/api/pipeline.py @@ -23,7 +23,7 @@ TranscriptionError, TranslationError, VideoMetadata, - download_youtube_video, + download_video, merge_subtitles, transcribe_audio, translate_subtitle, @@ -236,7 +236,7 @@ def _put_event() -> None: loop.call_soon_threadsafe(_put_event) metadata = await asyncio.to_thread( - download_youtube_video, + download_video, job.source_url, video_path, on_progress=_on_download_progress, diff --git a/src/bilingualsub/api/routes.py b/src/bilingualsub/api/routes.py index 7efa775..7fc7285 100644 --- a/src/bilingualsub/api/routes.py +++ b/src/bilingualsub/api/routes.py @@ -7,7 +7,7 @@ import re import tempfile from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, NamedTuple import structlog from fastapi import APIRouter, Form, Request, UploadFile @@ -54,20 +54,22 @@ ".webm", } -_EXTENSIONS: dict[FileType, str] = { - FileType.SRT: "srt", - FileType.ASS: "ass", - FileType.VIDEO: "mp4", - FileType.AUDIO: "mp3", - FileType.SOURCE_VIDEO: "mp4", -} +_DEFAULT_FILENAME = "video" +_SUFFIX_ORIGINAL = "(original)" +_LANG_SEPARATOR = "_to_" + + +class _FileMeta(NamedTuple): + ext: str + media_type: str + -_MEDIA_TYPES: dict[FileType, str] = { - FileType.SRT: "text/plain", - FileType.ASS: "text/plain", - FileType.VIDEO: "video/mp4", - FileType.AUDIO: "audio/mpeg", - FileType.SOURCE_VIDEO: "video/mp4", +_FILE_META: dict[FileType, _FileMeta] = { + FileType.SRT: _FileMeta("srt", "text/plain"), + FileType.ASS: _FileMeta("ass", "text/plain"), + FileType.VIDEO: _FileMeta("mp4", "video/mp4"), + FileType.AUDIO: _FileMeta("mp3", "audio/mpeg"), + FileType.SOURCE_VIDEO: _FileMeta("mp4", "video/mp4"), } _FILENAME_BAD_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]') @@ -76,22 +78,23 @@ def _sanitize_filename(name: str) -> str: """Remove filesystem-unsafe characters and truncate to 120 chars.""" cleaned = _FILENAME_BAD_CHARS.sub("", name).strip(" .") - return (cleaned or "video")[:120] + truncated = (cleaned or _DEFAULT_FILENAME)[:120] + return truncated.rstrip(" .") or _DEFAULT_FILENAME def _build_download_filename(job: Job, file_type: FileType) -> str: """Build a human-readable download filename for the given job and file type.""" - base = _sanitize_filename(job.video_title or "video") + base = _sanitize_filename(job.video_title or _DEFAULT_FILENAME) original_only = file_type in (FileType.SOURCE_VIDEO, FileType.AUDIO) same_lang = ( not job.source_lang or not job.target_lang or job.source_lang == job.target_lang ) suffix = ( - "(original)" + _SUFFIX_ORIGINAL if original_only or same_lang - else f"({job.source_lang}_to_{job.target_lang})" + else f"({job.source_lang}{_LANG_SEPARATOR}{job.target_lang})" ) - ext = _EXTENSIONS[file_type] + ext = _FILE_META[file_type].ext return f"{base} {suffix}.{ext}" @@ -255,7 +258,7 @@ async def download_file(job_id: str, file_type: str, request: Request) -> FileRe return FileResponse( path=path, - media_type=_MEDIA_TYPES[ft], + media_type=_FILE_META[ft].media_type, filename=_build_download_filename(job, ft), ) diff --git a/src/bilingualsub/api/schemas.py b/src/bilingualsub/api/schemas.py index 1a4d0ff..40d7059 100644 --- a/src/bilingualsub/api/schemas.py +++ b/src/bilingualsub/api/schemas.py @@ -2,7 +2,7 @@ from typing import Self -from pydantic import BaseModel, HttpUrl, model_validator +from pydantic import BaseModel, ConfigDict, HttpUrl, model_validator from bilingualsub.api.constants import FileType, JobStatus @@ -10,6 +10,8 @@ class JobCreateRequest(BaseModel): """Request body for creating a new subtitle generation job.""" + model_config = ConfigDict(extra="forbid") + source_url: HttpUrl source_lang: str = "en" target_lang: str = "zh-TW" diff --git a/src/bilingualsub/core/__init__.py b/src/bilingualsub/core/__init__.py index 06aa1eb..8b1843f 100644 --- a/src/bilingualsub/core/__init__.py +++ b/src/bilingualsub/core/__init__.py @@ -3,7 +3,7 @@ from bilingualsub.core.downloader import ( DownloadError, VideoMetadata, - download_youtube_video, + download_video, ) from bilingualsub.core.merger import merge_subtitles from bilingualsub.core.subtitle import Subtitle, SubtitleEntry @@ -23,7 +23,7 @@ "TranscriptionError", "TranslationError", "VideoMetadata", - "download_youtube_video", + "download_video", "merge_subtitles", "retranslate_entries", "transcribe_audio", diff --git a/src/bilingualsub/core/downloader.py b/src/bilingualsub/core/downloader.py index e12089a..b1b06af 100644 --- a/src/bilingualsub/core/downloader.py +++ b/src/bilingualsub/core/downloader.py @@ -1,4 +1,4 @@ -"""YouTube video downloader with metadata extraction.""" +"""Video downloader (via yt-dlp) with metadata extraction.""" import json import os @@ -10,6 +10,7 @@ from typing import Any import yt_dlp +from yt_dlp.extractor import gen_extractor_classes from yt_dlp.utils import download_range_func @@ -43,7 +44,7 @@ def __post_init__(self) -> None: self.description = _sanitize_description(self.description) -def download_youtube_video( +def download_video( url: str, output_path: Path, on_progress: Callable[[float, float], None] | None = None, @@ -51,10 +52,10 @@ def download_youtube_video( end_time: float | None = None, ) -> VideoMetadata: """ - Download YouTube video and extract metadata. + Download video and extract metadata. Args: - url: YouTube video URL + url: Video URL supported by yt-dlp output_path: Path where video will be saved (including extension) on_progress: Optional callback for download progress (downloaded_bytes, total_bytes) @@ -72,7 +73,7 @@ def download_youtube_video( raise ValueError("URL cannot be empty") if not _is_supported_url(url): - raise ValueError(f"此網址目前的下載器(yt-dlp)無法處理: {url}") + raise ValueError(f"URL not supported by downloader (yt-dlp): {url}") if not output_path.parent.exists(): raise ValueError(f"Output directory does not exist: {output_path.parent}") @@ -80,7 +81,6 @@ def download_youtube_video( if output_path.exists(): raise ValueError(f"Output file already exists: {output_path}") - # Download video and get info_dict try: info_dict = _download_video( url, @@ -92,7 +92,6 @@ def download_youtube_video( except Exception as e: raise DownloadError(f"Failed to download video: {e}") from e - # Extract metadata - try FFprobe first, fallback to info_dict try: metadata = _extract_metadata_with_ffprobe(output_path) except (FileNotFoundError, OSError, subprocess.CalledProcessError): @@ -127,11 +126,14 @@ def _sanitize_description(raw: Any) -> str: return raw.strip() +_SUPPORTED_EXTRACTOR_CLASSES: list[Any] = [ + cls for cls in gen_extractor_classes() if cls.IE_NAME != "generic" +] + + def _is_supported_url(url: str) -> bool: """Check if yt-dlp has a dedicated extractor for this URL.""" - from yt_dlp.extractor import gen_extractors # noqa: PLC0415 - - return any(ie.suitable(url) for ie in gen_extractors() if ie.IE_NAME != "generic") + return any(cls.suitable(url) for cls in _SUPPORTED_EXTRACTOR_CLASSES) def _download_video( diff --git a/tests/e2e/test_workflow_skeleton.py b/tests/e2e/test_workflow_skeleton.py index f439b1e..83d127c 100644 --- a/tests/e2e/test_workflow_skeleton.py +++ b/tests/e2e/test_workflow_skeleton.py @@ -21,7 +21,7 @@ import pytest -from bilingualsub.core.downloader import DownloadError, download_youtube_video +from bilingualsub.core.downloader import DownloadError, download_video from bilingualsub.core.merger import merge_subtitles from bilingualsub.core.subtitle import Subtitle, SubtitleEntry from bilingualsub.core.transcriber import transcribe_audio @@ -82,7 +82,7 @@ def e2e_output_dir(self, tmp_path: Path) -> Path: return output_dir @pytest.mark.subtitle_only - def test_download_youtube_video(self, e2e_output_dir: Path) -> None: + def test_download_video(self, e2e_output_dir: Path) -> None: """Test downloading a YouTube video. Given a valid YouTube URL, @@ -93,7 +93,7 @@ def test_download_youtube_video(self, e2e_output_dir: Path) -> None: """ output_path = e2e_output_dir / "test_video.mp4" - metadata = download_youtube_video(TEST_YOUTUBE_URL, output_path) + metadata = download_video(TEST_YOUTUBE_URL, output_path) assert output_path.exists() assert output_path.stat().st_size > 0 @@ -114,7 +114,7 @@ def test_transcribe_audio(self, e2e_output_dir: Path) -> None: """ # Download video first video_path = e2e_output_dir / "test_video.mp4" - download_youtube_video(TEST_YOUTUBE_URL, video_path) + download_video(TEST_YOUTUBE_URL, video_path) # Transcribe subtitle = transcribe_audio(video_path, language="en") @@ -134,7 +134,7 @@ def test_translate_subtitles(self, e2e_output_dir: Path) -> None: """ # Download and transcribe video_path = e2e_output_dir / "test_video.mp4" - download_youtube_video(TEST_YOUTUBE_URL, video_path) + download_video(TEST_YOUTUBE_URL, video_path) original = transcribe_audio(video_path, language="en") # Translate @@ -158,7 +158,7 @@ def test_merge_bilingual_subtitles(self, e2e_output_dir: Path) -> None: """ # Full pipeline up to merge video_path = e2e_output_dir / "test_video.mp4" - download_youtube_video(TEST_YOUTUBE_URL, video_path) + download_video(TEST_YOUTUBE_URL, video_path) original = transcribe_audio(video_path, language="en") translated = translate_subtitle(original, source_lang="en", target_lang="zh-TW") @@ -182,7 +182,7 @@ def test_full_workflow_youtube_to_bilingual_srt(self, e2e_output_dir: Path) -> N srt_path = e2e_output_dir / "bilingual.srt" # Step 1: Download - download_youtube_video(TEST_YOUTUBE_URL, video_path) + download_video(TEST_YOUTUBE_URL, video_path) assert video_path.exists() # Step 2: Transcribe @@ -220,7 +220,7 @@ def test_full_workflow_youtube_to_bilingual_ass(self, e2e_output_dir: Path) -> N ass_path = e2e_output_dir / "bilingual.ass" # Full pipeline - metadata = download_youtube_video(TEST_YOUTUBE_URL, video_path) + metadata = download_video(TEST_YOUTUBE_URL, video_path) original = transcribe_audio(video_path, language="en") translated = translate_subtitle(original, source_lang="en", target_lang="zh-TW") @@ -260,7 +260,7 @@ def test_full_workflow_with_video_burn_in(self, e2e_output_dir: Path) -> None: output_video_path = e2e_output_dir / "output_with_subs.mp4" # Full pipeline - metadata = download_youtube_video(TEST_YOUTUBE_URL, video_path) + metadata = download_video(TEST_YOUTUBE_URL, video_path) original = transcribe_audio(video_path, language="en") translated = translate_subtitle(original, source_lang="en", target_lang="zh-TW") @@ -295,7 +295,7 @@ def test_invalid_youtube_url_raises_error(self, tmp_path: Path) -> None: output_path = tmp_path / "video.mp4" with pytest.raises((ValueError, DownloadError)): - download_youtube_video("https://invalid-url.com/video", output_path) + download_video("https://invalid-url.com/video", output_path) def test_missing_api_key_raises_error(self, tmp_path: Path, monkeypatch) -> None: """Test that missing API key raises appropriate error.""" diff --git a/tests/integration/test_full_subtitle_pipeline.py b/tests/integration/test_full_subtitle_pipeline.py index 2b0b2ee..7920fbf 100644 --- a/tests/integration/test_full_subtitle_pipeline.py +++ b/tests/integration/test_full_subtitle_pipeline.py @@ -7,7 +7,7 @@ import pytest -from bilingualsub.core.downloader import VideoMetadata, download_youtube_video +from bilingualsub.core.downloader import VideoMetadata, download_video from bilingualsub.core.merger import merge_subtitles from bilingualsub.core.subtitle import Subtitle from bilingualsub.core.transcriber import transcribe_audio @@ -123,7 +123,7 @@ def test_full_pipeline_to_bilingual_srt( dl_patches = _patch_downloader(video_path) with dl_patches[0], dl_patches[1], dl_patches[2]: - metadata = download_youtube_video(YOUTUBE_URL, video_path) + metadata = download_video(YOUTUBE_URL, video_path) assert isinstance(metadata, VideoMetadata) @@ -176,7 +176,7 @@ def test_full_pipeline_to_bilingual_ass( dl_patches = _patch_downloader(video_path) with dl_patches[0], dl_patches[1], dl_patches[2]: - metadata = download_youtube_video(YOUTUBE_URL, video_path) + metadata = download_video(YOUTUBE_URL, video_path) audio_path = tmp_path / "audio.mp3" audio_path.write_bytes(b"fake audio content") @@ -235,7 +235,7 @@ def test_pipeline_metadata_flows_from_downloader_to_ass_serializer( dl_patches = _patch_downloader(video_path, width=width, height=height) with dl_patches[0], dl_patches[1], dl_patches[2]: - metadata = download_youtube_video(YOUTUBE_URL, video_path) + metadata = download_video(YOUTUBE_URL, video_path) assert metadata.width == width, f"{label}: width mismatch" assert metadata.height == height, f"{label}: height mismatch" diff --git a/tests/unit/api/test_pipeline.py b/tests/unit/api/test_pipeline.py index 4bd4f40..584fb39 100644 --- a/tests/unit/api/test_pipeline.py +++ b/tests/unit/api/test_pipeline.py @@ -66,7 +66,7 @@ class TestRunPipeline: @patch("bilingualsub.api.pipeline.translate_subtitle") @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_successful_pipeline( self, mock_download, @@ -108,7 +108,7 @@ async def test_successful_pipeline( assert "on_progress" in translate_call_kwargs assert callable(translate_call_kwargs["on_progress"]) - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_download_error(self, mock_download) -> None: mock_download.side_effect = DownloadError("Network error") @@ -131,7 +131,7 @@ async def test_download_error(self, mock_download) -> None: @patch("bilingualsub.api.pipeline.translate_subtitle") @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_transcribe_receives_audio_path( self, mock_download, @@ -173,7 +173,7 @@ async def test_transcribe_receives_audio_path( @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") @patch("bilingualsub.api.pipeline.trim_video") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_pipeline_with_time_range_calls_trim( self, mock_download, @@ -186,7 +186,7 @@ async def test_pipeline_with_time_range_calls_trim( mock_serialize_ass, mock_burn, ) -> None: - """Given job with time range, download_youtube_video should receive time parameters.""" + """Given job with time range, download_video should receive time parameters.""" sub = _make_subtitle() mock_download.return_value = _make_metadata() mock_transcribe.return_value = sub @@ -217,7 +217,7 @@ async def test_pipeline_with_time_range_calls_trim( @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") @patch("bilingualsub.api.pipeline.trim_video") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_pipeline_without_time_range_skips_trim( self, mock_download, @@ -250,7 +250,7 @@ async def test_pipeline_without_time_range_skips_trim( @pytest.mark.asyncio class TestRunDownload: @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_run_download_sends_download_complete( self, mock_download, mock_extract_audio ) -> None: @@ -270,7 +270,7 @@ async def test_run_download_sends_download_complete( assert job.status == JobStatus.DOWNLOAD_COMPLETE @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_youtube_video") + @patch("bilingualsub.api.pipeline.download_video") async def test_run_download_saves_metadata( self, mock_download, mock_extract_audio ) -> None: diff --git a/tests/unit/api/test_routes.py b/tests/unit/api/test_routes.py index 9bfb4a3..c6acd49 100644 --- a/tests/unit/api/test_routes.py +++ b/tests/unit/api/test_routes.py @@ -252,13 +252,7 @@ def test_whitespace_only_returns_video(self) -> None: assert _sanitize_filename(" ") == "video" def test_strips_illegal_chars(self) -> None: - result = _sanitize_filename('My: <video> / test"') - assert "<" not in result - assert ">" not in result - assert ":" not in result - assert '"' not in result - assert "/" not in result - assert len(result) > 0 + assert _sanitize_filename('My: <video> / test"') == "My video test" def test_truncates_to_120_chars(self) -> None: long_name = "a" * 200 @@ -293,6 +287,7 @@ def test_title_with_illegal_chars_stripped(self) -> None: assert ":" not in result assert "<" not in result assert ">" not in result + assert result.startswith("My") assert result.endswith("(en_to_zh-TW).srt") def test_source_video_always_original(self) -> None: diff --git a/tests/unit/core/test_downloader.py b/tests/unit/core/test_downloader.py index 974cab1..bb40893 100644 --- a/tests/unit/core/test_downloader.py +++ b/tests/unit/core/test_downloader.py @@ -1,4 +1,4 @@ -"""Unit tests for YouTube video downloader.""" +"""Unit tests for video downloader.""" import json import subprocess @@ -11,7 +11,7 @@ DownloadError, VideoMetadata, _is_supported_url, - download_youtube_video, + download_video, ) @@ -112,8 +112,8 @@ def test_whitespace_title_raises_error(self): ) -class TestDownloadYoutubeVideo: - """Test cases for download_youtube_video function.""" +class TestDownloadVideo: + """Test cases for download_video function.""" @pytest.fixture def mock_yt_dlp(self): @@ -182,7 +182,7 @@ def extract_info_side_effect(url, download=True): mock_subprocess.run.return_value = mock_result # Download video - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path ) @@ -235,7 +235,7 @@ def extract_info_side_effect(url, download=True): mock_result.stdout = valid_ffprobe_output mock_subprocess.run.return_value = mock_result - metadata = download_youtube_video("https://youtu.be/dQw4w9WgXcQ", output_path) + metadata = download_video("https://youtu.be/dQw4w9WgXcQ", output_path) assert metadata.title == "Test Video" mock_ydl_instance.extract_info.assert_called_once() @@ -243,22 +243,20 @@ def extract_info_side_effect(url, download=True): def test_empty_url_raises_error(self, tmp_path): """Test that empty URL raises error.""" with pytest.raises(ValueError, match="URL cannot be empty"): - download_youtube_video("", tmp_path / "video.mp4") + download_video("", tmp_path / "video.mp4") with pytest.raises(ValueError, match="URL cannot be empty"): - download_youtube_video(" ", tmp_path / "video.mp4") + download_video(" ", tmp_path / "video.mp4") def test_unsupported_url_raises_error(self, tmp_path): """Test that a URL with no dedicated yt-dlp extractor raises error.""" with pytest.raises(ValueError, match="yt-dlp"): - download_youtube_video( - "https://example.com/random.html", tmp_path / "video.mp4" - ) + download_video("https://example.com/random.html", tmp_path / "video.mp4") def test_output_directory_not_exists_raises_error(self): """Test that non-existent output directory raises error.""" with pytest.raises(ValueError, match="Output directory does not exist"): - download_youtube_video( + download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", Path("/nonexistent/path/video.mp4"), ) @@ -269,9 +267,7 @@ def test_output_file_exists_raises_error(self, tmp_path): output_path.touch() with pytest.raises(ValueError, match="Output file already exists"): - download_youtube_video( - "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path - ) + download_video("https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path) def test_yt_dlp_download_error_raises_download_error( self, tmp_path, mock_yt_dlp, mock_subprocess @@ -284,9 +280,7 @@ def test_yt_dlp_download_error_raises_download_error( mock_ydl_instance.extract_info.side_effect = Exception("Network error") with pytest.raises(DownloadError, match="Failed to download video"): - download_youtube_video( - "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path - ) + download_video("https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path) def test_ffprobe_error_falls_back_to_info_dict( self, tmp_path, mock_yt_dlp, mock_subprocess @@ -314,7 +308,7 @@ def extract_info_side_effect(url, download=True): mock_subprocess.run.side_effect = subprocess.CalledProcessError(1, "ffprobe") # Should succeed using info_dict fallback - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path ) @@ -361,9 +355,7 @@ def extract_info_side_effect(url, download=True): # DownloadError is caught by line 85's except block and re-raised with pytest.raises(DownloadError, match="Failed to extract metadata"): - download_youtube_video( - "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path - ) + download_video("https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path) # Verify file was cleaned up assert not output_path.exists() @@ -395,9 +387,7 @@ def extract_info_side_effect(url, download=True): mock_subprocess.run.return_value = mock_result with pytest.raises(DownloadError, match="Failed to extract metadata"): - download_youtube_video( - "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path - ) + download_video("https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path) # Verify file was cleaned up assert not output_path.exists() @@ -443,7 +433,7 @@ def extract_info_side_effect(url, download=True): ) mock_subprocess.run.return_value = mock_result - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path ) @@ -488,7 +478,7 @@ def extract_info_side_effect(url, download=True): ) mock_subprocess.run.return_value = mock_result - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path ) @@ -520,7 +510,7 @@ def extract_info_side_effect(url, download=True): mock_result.stdout = valid_ffprobe_output mock_subprocess.run.return_value = mock_result - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path ) @@ -557,9 +547,7 @@ def extract_info_side_effect(url, download=True): mock_result.stdout = valid_ffprobe_output mock_subprocess.run.return_value = mock_result - download_youtube_video( - "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path - ) + download_video("https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path) ydl_opts = mock_yt_dlp.YoutubeDL.call_args[0][0] assert ydl_opts["cookiefile"] == str(cookie_file) @@ -591,7 +579,7 @@ def test_all_youtube_url_formats_accepted(self, tmp_path): ) output_path = tmp_path / f"video_{valid_urls.index(url)}.mp4" # Should not raise ValueError - download_youtube_video(url, output_path) + download_video(url, output_path) class TestDownloadWithTimeRange: @@ -658,7 +646,7 @@ def extract_info_side_effect(url, download=True): mock_subprocess.run.return_value = mock_result # Download with time range - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path, start_time=30.0, @@ -701,7 +689,7 @@ def extract_info_side_effect(url, download=True): mock_subprocess.run.side_effect = subprocess.CalledProcessError(1, "ffprobe") # Download with only start_time - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path, start_time=30.0, @@ -742,7 +730,7 @@ def extract_info_side_effect(url, download=True): mock_subprocess.run.side_effect = subprocess.CalledProcessError(1, "ffprobe") # Download with only end_time - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path, end_time=90.0, @@ -784,7 +772,7 @@ def extract_info_side_effect(url, download=True): mock_subprocess.run.return_value = mock_result # Download without time range - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path ) @@ -823,7 +811,7 @@ def extract_info_side_effect(url, download=True): mock_subprocess.run.side_effect = subprocess.CalledProcessError(1, "ffprobe") # Download with time range (fallback format) - metadata = download_youtube_video( + metadata = download_video( "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path, start_time=30.0, From ae62e34b2e6aa741b2d13b6ffd81d7605daacd24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maple=EF=BC=81?= <mapleee723@gmail.com> Date: Sat, 18 Apr 2026 12:25:20 +0800 Subject: [PATCH 3/5] fix(downloader): pull video title from yt-dlp info_dict, not mp4 tags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit download_video was populating VideoMetadata.title from ffprobe's MP4 container tags, which for a freshly-downloaded video is empty — ffprobe falls back to the filename stem `video`. The yt-dlp info_dict carries the upstream platform title (YouTube / X / TikTok…), matching the pre-existing treatment of `description`. Previously unobservable because the download filename was hardcoded to `bilingualsub.{ext}`. After the rename to use `video_title` in the Content-Disposition filename, every downloaded file showed up as `video (original).{ext}`. E2E run with "Me at the zoo" (YouTube's first video) now produces `Me at the zoo (original).mp4` as expected. --- src/bilingualsub/core/downloader.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/bilingualsub/core/downloader.py b/src/bilingualsub/core/downloader.py index b1b06af..763c6df 100644 --- a/src/bilingualsub/core/downloader.py +++ b/src/bilingualsub/core/downloader.py @@ -112,8 +112,11 @@ def download_video( output_path.unlink() raise DownloadError(f"Failed to extract metadata: {e}") from e - # yt-dlp's description is not reliably available in ffprobe output. - # Always prefer the description extracted from info_dict. + # yt-dlp's title and description are not reliably present in MP4 container + # tags (ffprobe reads those). Prefer the upstream metadata from info_dict. + info_title = info_dict.get("title") + if isinstance(info_title, str) and info_title.strip(): + metadata.title = info_title.strip() metadata.description = _sanitize_description(info_dict.get("description", "")) return metadata From 5d67db36503f4e2e14d8ab2e0b95b905ff170b6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maple=EF=BC=81?= <mapleee723@gmail.com> Date: Sat, 18 Apr 2026 12:49:06 +0800 Subject: [PATCH 4/5] test: add API contract IT + live-download E2E for filename + URL changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit tests/integration/test_api_contract.py (19 tests, @integration): - JobCreateRequest schema: source_url accepted; youtube_url (old field), both fields simultaneously, non-URL strings, and empty bodies all get 422 via HttpUrl + extra="forbid". - HTTP-layer acceptance of any valid HttpUrl (YouTube, youtu.be, X, Twitter, TikTok, Vimeo) — the test is intentionally scoped to schema layer only; extractor-level coverage is in the unit tests and in the async failure test below. - Async unsupported-URL path: example.com creates a job then fails via the pipeline with error.code=invalid_input and a yt-dlp-specific detail. 10-second polling window to survive CI thread contention. - Content-Disposition filename format: parametrized over title, language pair, and file type, covering translated outputs, same-lang original suffix, empty-title fallback, and illegal-char sanitisation. - Both fixtures use `with TestClient(app) as c: yield c` so Starlette actually runs the lifespan and populates app.state.job_manager. tests/e2e/test_filename_e2e.py (2 tests, @e2e, gated by ENABLE_LIVE_DOWNLOAD=1): - Real yt-dlp download of "Me at the zoo" through FastAPI, then asserts Content-Disposition filename == "Me at the zoo (original).{mp4|mp3}". - Specifically exists to catch regression of the bug where video_title was read from ffprobe MP4 container tags (empty) instead of yt-dlp info_dict.title. If the live download fails we pytest.fail — silent skip would hide the regression the test is built to detect. tests/unit/core/test_downloader.py: - Rename test_ffprobe_missing_title_uses_filename -> test_info_dict_ title_overrides_ffprobe. The old expectation (fall back to filename stem) tested behaviour that was itself the bug; the new assertion matches the documented contract that info_dict.title always wins. --- tests/e2e/test_filename_e2e.py | 225 ++++++++++++++++ tests/integration/test_api_contract.py | 347 +++++++++++++++++++++++++ tests/unit/core/test_downloader.py | 18 +- 3 files changed, 584 insertions(+), 6 deletions(-) create mode 100644 tests/e2e/test_filename_e2e.py create mode 100644 tests/integration/test_api_contract.py diff --git a/tests/e2e/test_filename_e2e.py b/tests/e2e/test_filename_e2e.py new file mode 100644 index 0000000..d53998a --- /dev/null +++ b/tests/e2e/test_filename_e2e.py @@ -0,0 +1,225 @@ +"""E2E test: real yt-dlp download validates info_dict title flows into Content-Disposition. + +This test hits the real YouTube network but mocks Groq (no API key needed). +Goal: verify the bug fix where video_title was previously sourced from ffprobe +MP4 container tags (which are empty) rather than yt-dlp info_dict.title. + +Gate: set ENABLE_LIVE_DOWNLOAD=1 to run. Skipped by default in CI. +""" + +from __future__ import annotations + +import os +import time +import urllib.parse +from typing import TYPE_CHECKING, Any + +import pytest +from fastapi.testclient import TestClient + +from bilingualsub.api.app import create_app + +if TYPE_CHECKING: + from collections.abc import Generator + +# --------------------------------------------------------------------------- +# Environment gate +# --------------------------------------------------------------------------- + +ENABLE_LIVE_DOWNLOAD = os.getenv("ENABLE_LIVE_DOWNLOAD") == "1" + +pytestmark = pytest.mark.skipif( + not ENABLE_LIVE_DOWNLOAD, + reason="ENABLE_LIVE_DOWNLOAD not set; skipping live yt-dlp download E2E", +) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# YouTube's first public video — 19 seconds, stable, public. +_TEST_URL = "https://www.youtube.com/watch?v=jNQXAC9IVRw" +_EXPECTED_TITLE = "Me at the zoo" + +# How long (seconds) to wait for the download phase to complete. +_DOWNLOAD_TIMEOUT_SECONDS = 90 +_POLL_INTERVAL_SECONDS = 1.0 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _filename_from_disposition(header: str) -> str: + """Extract filename from Content-Disposition header. + + Handles RFC 5987 ``filename*=utf-8''<pct-encoded>`` and + the plain ``filename="..."`` fallback. + """ + marker_rfc5987 = "filename*=utf-8''" + if marker_rfc5987 in header: + return urllib.parse.unquote(header.split(marker_rfc5987, 1)[1]) + marker_plain = 'filename="' + if marker_plain in header: + return header.split(marker_plain, 1)[1].rstrip('"') + raise AssertionError(f"Unrecognized Content-Disposition header: {header}") + + +def _poll_until_download_complete( + client: TestClient, + job_id: str, + timeout: float = _DOWNLOAD_TIMEOUT_SECONDS, +) -> dict[str, Any]: + """Poll GET /api/jobs/{job_id} until status is download_complete or failed. + + Returns the final status response dict. + Raises pytest.fail if timeout is exceeded or network error is detected. + """ + deadline = time.monotonic() + timeout + last_status: dict[str, Any] = {} + + while time.monotonic() < deadline: + resp = client.get(f"/api/jobs/{job_id}") + if resp.status_code != 200: + pytest.fail( + f"GET /api/jobs/{job_id} returned {resp.status_code}: {resp.text}" + ) + + last_status = resp.json() + status = last_status.get("status") + + if status in ("download_complete", "failed"): + return last_status + + time.sleep(_POLL_INTERVAL_SECONDS) + + pytest.fail( + f"Timed out after {timeout}s waiting for download_complete. " + f"Last status: {last_status.get('status')!r}. " + "Check network connectivity or increase _DOWNLOAD_TIMEOUT_SECONDS." + ) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def live_client() -> Generator[TestClient, None, None]: + """Create a TestClient for live download tests. + + Must enter the context manager so Starlette starts the lifespan and + populates ``app.state.job_manager``; without this every request fails + with ``AttributeError`` on ``app.state``. + """ + + app = create_app() + with TestClient(app, raise_server_exceptions=False) as client: + yield client + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.e2e +def test_source_video_content_disposition_uses_info_dict_title( + live_client: TestClient, +) -> None: + """Real yt-dlp download: source_video Content-Disposition uses info_dict title. + + Arrange: submit "Me at the zoo" URL (19-second public YouTube video). + Act: + 1. POST /api/jobs with source_url. + 2. Poll until status == download_complete (stops before Whisper/Groq). + 3. GET /api/jobs/{id}/download/source_video. + Assert: + - Download phase completes without error. + - Content-Disposition filename == "Me at the zoo (original).mp4". + + This test covers the regression where video_title was sourced from the + ffprobe MP4 container tag (empty string) instead of yt-dlp info_dict.title, + which caused filenames to fall back to "video (original).mp4". + """ + # 1. Create job. + create_resp = live_client.post("/api/jobs", json={"source_url": _TEST_URL}) + assert create_resp.status_code == 200, create_resp.text + job_id = create_resp.json()["job_id"] + + # 2. Poll until download_complete. + status_data = _poll_until_download_complete(live_client, job_id) + if status_data["status"] == "failed": + error = status_data.get("error", {}) + pytest.fail( + f"Live download failed; cannot verify info_dict.title regression. " + f"{error.get('code')} — {error.get('detail')}. " + "If the CI runner cannot reach YouTube, disable this test via the " + "ENABLE_LIVE_DOWNLOAD gate instead of silently skipping." + ) + + assert status_data["status"] == "download_complete" + + # 3. Download source_video and check Content-Disposition. + dl_resp = live_client.get(f"/api/jobs/{job_id}/download/source_video") + assert dl_resp.status_code == 200, dl_resp.text + + disposition = dl_resp.headers.get("content-disposition", "") + assert disposition, "Expected Content-Disposition header to be present" + + actual_filename = _filename_from_disposition(disposition) + assert actual_filename == f"{_EXPECTED_TITLE} (original).mp4", ( + f"Expected '{_EXPECTED_TITLE} (original).mp4', got {actual_filename!r}. " + "This likely means video_title is sourced from an empty ffprobe container " + "tag instead of yt-dlp info_dict.title." + ) + + +@pytest.mark.e2e +def test_audio_content_disposition_uses_info_dict_title( + live_client: TestClient, +) -> None: + """Real yt-dlp download: audio Content-Disposition uses info_dict title. + + Reuses the same video; creates a separate job to keep tests independent. + + Arrange: submit "Me at the zoo" URL. + Act: + 1. POST /api/jobs. + 2. Poll until download_complete. + 3. GET /api/jobs/{id}/download/audio. + Assert: filename == "Me at the zoo (original).mp3". + """ + # 1. Create job. + create_resp = live_client.post("/api/jobs", json={"source_url": _TEST_URL}) + assert create_resp.status_code == 200, create_resp.text + job_id = create_resp.json()["job_id"] + + # 2. Poll until download_complete. + status_data = _poll_until_download_complete(live_client, job_id) + if status_data["status"] == "failed": + error = status_data.get("error", {}) + pytest.fail( + f"Live download failed; cannot verify info_dict.title regression. " + f"{error.get('code')} — {error.get('detail')}. " + "If the CI runner cannot reach YouTube, disable this test via the " + "ENABLE_LIVE_DOWNLOAD gate instead of silently skipping." + ) + + assert status_data["status"] == "download_complete" + + # 3. Download audio and check Content-Disposition. + dl_resp = live_client.get(f"/api/jobs/{job_id}/download/audio") + assert dl_resp.status_code == 200, dl_resp.text + + disposition = dl_resp.headers.get("content-disposition", "") + assert disposition, "Expected Content-Disposition header to be present" + + actual_filename = _filename_from_disposition(disposition) + assert actual_filename == f"{_EXPECTED_TITLE} (original).mp3", ( + f"Expected '{_EXPECTED_TITLE} (original).mp3', got {actual_filename!r}. " + "This likely means video_title is sourced from an empty ffprobe container " + "tag instead of yt-dlp info_dict.title." + ) diff --git a/tests/integration/test_api_contract.py b/tests/integration/test_api_contract.py new file mode 100644 index 0000000..d025ada --- /dev/null +++ b/tests/integration/test_api_contract.py @@ -0,0 +1,347 @@ +"""Integration tests for the API contract introduced in feat/readable-filename-multiplatform. + +Covers: +- JobCreateRequest schema: source_url field, extra="forbid" enforcement +- URL extractor acceptance (yt-dlp supported platforms) +- Async error path for unsupported URLs (invalid_input pipeline error) +- Download filename Content-Disposition header format +""" + +from __future__ import annotations + +import time +import urllib.parse +from typing import TYPE_CHECKING, Any + +import pytest +from fastapi.testclient import TestClient + +if TYPE_CHECKING: + from collections.abc import Generator + from pathlib import Path + +from bilingualsub.api.app import create_app +from bilingualsub.api.constants import FileType, JobStatus + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _filename_from_disposition(header: str) -> str: + """Extract filename from Content-Disposition header. + + Handles both RFC 5987 ``filename*=utf-8''<pct-encoded>`` and + the plain ``filename="..."`` fallback. + """ + marker_rfc5987 = "filename*=utf-8''" + if marker_rfc5987 in header: + return urllib.parse.unquote(header.split(marker_rfc5987, 1)[1]) + marker_plain = 'filename="' + if marker_plain in header: + return header.split(marker_plain, 1)[1].rstrip('"') + raise AssertionError(f"Unrecognized Content-Disposition header: {header}") + + +# --------------------------------------------------------------------------- +# Shared fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def client() -> Generator[TestClient, None, None]: + """Create a TestClient that runs the full app lifespan (sets app.state). + + Must be used as a context manager internally — yielding ensures lifespan + teardown runs after the test. + """ + app = create_app() + with TestClient(app, raise_server_exceptions=False) as c: + yield c + + +@pytest.fixture() +def completed_job_factory(tmp_path: Path) -> Generator[Any, None, None]: + """Factory that injects a completed Job directly into JobManager. + + Yields a callable ``make(title, source_lang, target_lang) -> (client, job_id)``. + The TestClient is kept alive (via context manager) for the fixture's lifetime + so ``app.state.job_manager`` is accessible and the injected job persists. + """ + _app = create_app() + + with TestClient(_app, raise_server_exceptions=False) as _client: + + def make( + title: str, + source_lang: str, + target_lang: str, + ) -> tuple[TestClient, str]: + manager = _app.state.job_manager + job = manager.create_job( + source_url="https://www.youtube.com/watch?v=x", + source_lang=source_lang, + target_lang=target_lang, + ) + job.video_title = title + job.status = JobStatus.COMPLETED + + # Create a dummy file for every FileType so all download routes work. + # Use unique subdirs per call to avoid filename collisions across + # parametrize iterations. + job_tmp = tmp_path / job.id + job_tmp.mkdir(parents=True, exist_ok=True) + for ft in ( + FileType.SRT, + FileType.ASS, + FileType.VIDEO, + FileType.AUDIO, + FileType.SOURCE_VIDEO, + ): + p = job_tmp / f"{ft.value}.bin" + p.write_bytes(b"x") + job.output_files[ft] = p + + return _client, job.id + + yield make + + +# =========================================================================== +# 1. JobCreateRequest schema contract +# =========================================================================== + + +@pytest.mark.integration +class TestJobCreateRequestSchema: + """Verify that the POST /api/jobs endpoint enforces the new schema contract.""" + + def test_valid_source_url_returns_job_id(self, client: TestClient) -> None: + """POST with source_url returns 200 and a non-empty job_id string. + + Arrange: valid JobCreateRequest body using the new source_url field. + Act: POST /api/jobs. + Assert: status 200, response contains string job_id. + """ + response = client.post( + "/api/jobs", + json={"source_url": "https://www.youtube.com/watch?v=abc"}, + ) + + assert response.status_code == 200 + data = response.json() + assert isinstance(data["job_id"], str) + assert data["job_id"] + + def test_old_field_youtube_url_rejected(self, client: TestClient) -> None: + """POST with the old youtube_url field is rejected with 422 (extra='forbid'). + + Arrange: body uses youtube_url (the previous field name). + Act: POST /api/jobs. + Assert: status 422 — Pydantic extra='forbid' disallows unknown fields. + """ + response = client.post( + "/api/jobs", + json={"youtube_url": "https://www.youtube.com/watch?v=abc"}, + ) + + assert response.status_code == 422 + + def test_both_fields_rejected(self, client: TestClient) -> None: + """POST with both source_url and youtube_url fields is rejected with 422. + + Arrange: body contains source_url (valid) AND youtube_url (extra field). + Act: POST /api/jobs. + Assert: status 422 — extra='forbid' triggers even when source_url is present. + """ + response = client.post( + "/api/jobs", + json={ + "source_url": "https://www.youtube.com/watch?v=abc", + "youtube_url": "https://www.youtube.com/watch?v=abc", + }, + ) + + assert response.status_code == 422 + + def test_non_url_source_url_rejected(self, client: TestClient) -> None: + """POST with a non-URL string in source_url is rejected with 422. + + Arrange: source_url is plain text, not a valid HTTP URL. + Act: POST /api/jobs. + Assert: status 422 — Pydantic HttpUrl rejects the value. + """ + response = client.post( + "/api/jobs", + json={"source_url": "not-a-url"}, + ) + + assert response.status_code == 422 + + def test_empty_body_rejected(self, client: TestClient) -> None: + """POST with an empty body is rejected with 422 (missing required field). + + Arrange: empty JSON object. + Act: POST /api/jobs. + Assert: status 422 — source_url is required. + """ + response = client.post("/api/jobs", json={}) + + assert response.status_code == 422 + + +# =========================================================================== +# 2. URL extractor acceptance +# =========================================================================== + + +@pytest.mark.integration +@pytest.mark.parametrize( + "url", + [ + "https://www.youtube.com/watch?v=dQw4w9WgXcQ", + "https://youtu.be/dQw4w9WgXcQ", + "https://x.com/elonmusk/status/1234567890123456789", + "https://twitter.com/jack/status/20", + "https://www.tiktok.com/@user/video/1234567890", + "https://vimeo.com/12345678", + ], +) +def test_valid_http_urls_return_job_id(client: TestClient, url: str) -> None: + """POST with any syntactically valid HttpUrl returns 200 at the HTTP layer. + + HttpUrl is the only synchronous URL check; yt-dlp extractor matching runs + inside the background ``run_download`` task. So this test only proves the + schema accepts these URL shapes — it does NOT prove yt-dlp has an + extractor for any of them. Extractor-level coverage lives in + tests/unit/core/test_downloader.py::TestIsSupportedUrl and in + test_unsupported_url_fails_with_invalid_input below. + """ + response = client.post("/api/jobs", json={"source_url": url}) + + assert response.status_code == 200 + assert "job_id" in response.json() + + +# =========================================================================== +# 3. Async error path: unsupported URL +# =========================================================================== + + +@pytest.mark.integration +def test_unsupported_url_fails_with_invalid_input(client: TestClient) -> None: + """An unsupported URL creates a job then fails asynchronously with invalid_input. + + Arrange: URL that passes HttpUrl validation but has no yt-dlp extractor. + Act: + 1. POST /api/jobs → 200, job_id returned. + 2. Poll GET /api/jobs/{id} until status=failed (max 10s — the check + itself is in-memory but runs inside asyncio.to_thread, which can + queue behind other threads on a loaded CI runner). + Assert: + - job status is "failed". + - error.code == "invalid_input". + - error.detail contains "yt-dlp". + """ + response = client.post( + "/api/jobs", + json={"source_url": "https://example.com/some-random-page.html"}, + ) + assert response.status_code == 200 + job_id = response.json()["job_id"] + + # Poll until failed or timeout. + deadline = time.monotonic() + 10.0 + status_data: dict[str, Any] = {} + while time.monotonic() < deadline: + status_resp = client.get(f"/api/jobs/{job_id}") + assert status_resp.status_code == 200 + status_data = status_resp.json() + if status_data["status"] == "failed": + break + time.sleep(0.2) + + assert status_data.get("status") == "failed", ( + f"Expected status=failed, got {status_data.get('status')!r}" + ) + error = status_data.get("error") + assert error is not None, "Expected error detail in response" + assert error["code"] == "invalid_input" + assert "yt-dlp" in (error.get("detail") or "") + + +# =========================================================================== +# 4. Download filename Content-Disposition format +# =========================================================================== + + +@pytest.mark.integration +@pytest.mark.parametrize( + "title,source_lang,target_lang,file_type,expected_filename", + [ + # Translated subtitle files use (src_to_tgt) suffix. + ( + "Me at the zoo", + "en", + "zh-TW", + FileType.SRT, + "Me at the zoo (en_to_zh-TW).srt", + ), + ( + "Me at the zoo", + "en", + "zh-TW", + FileType.ASS, + "Me at the zoo (en_to_zh-TW).ass", + ), + # Source video and audio always use (original). + ( + "Me at the zoo", + "en", + "zh-TW", + FileType.SOURCE_VIDEO, + "Me at the zoo (original).mp4", + ), + ( + "Me at the zoo", + "en", + "zh-TW", + FileType.AUDIO, + "Me at the zoo (original).mp3", + ), + # Same src/tgt lang → (original) even for SRT. + ("My Video", "en", "en", FileType.SRT, "My Video (original).srt"), + # Empty title → fallback to "video". + ("", "en", "zh-TW", FileType.SRT, "video (en_to_zh-TW).srt"), + # Illegal filesystem chars are stripped. + ('Bad: <chars>"/', "en", "zh-TW", FileType.SRT, "Bad chars (en_to_zh-TW).srt"), + ], +) +def test_content_disposition_filename( + completed_job_factory: Any, + title: str, + source_lang: str, + target_lang: str, + file_type: FileType, + expected_filename: str, +) -> None: + """Content-Disposition header carries the correctly formatted filename. + + Arrange: inject a completed job with the given title/langs, pre-create + dummy output files. + Act: GET /api/jobs/{id}/download/{file_type}. + Assert: filename extracted from Content-Disposition matches expected_filename. + """ + _client, job_id = completed_job_factory(title, source_lang, target_lang) + + response = _client.get(f"/api/jobs/{job_id}/download/{file_type.value}") + + assert response.status_code == 200, response.text + disposition = response.headers.get("content-disposition", "") + assert disposition, "Expected Content-Disposition header to be present" + + actual_filename = _filename_from_disposition(disposition) + assert actual_filename == expected_filename, ( + f"Expected {expected_filename!r}, got {actual_filename!r} " + f"(full header: {disposition!r})" + ) diff --git a/tests/unit/core/test_downloader.py b/tests/unit/core/test_downloader.py index bb40893..6e0f32c 100644 --- a/tests/unit/core/test_downloader.py +++ b/tests/unit/core/test_downloader.py @@ -392,10 +392,15 @@ def extract_info_side_effect(url, download=True): # Verify file was cleaned up assert not output_path.exists() - def test_ffprobe_missing_title_uses_filename( + def test_info_dict_title_overrides_ffprobe( self, tmp_path, mock_yt_dlp, mock_subprocess ): - """Test that missing title in ffprobe uses filename as fallback.""" + """info_dict.title is authoritative; ffprobe container tags are ignored. + + MP4 container tags are usually empty for freshly-downloaded files, so + ffprobe would fall back to the filename stem. yt-dlp's info_dict carries + the real platform title (YouTube / X / TikTok), and that must win. + """ output_path = tmp_path / "my_video.mp4" mock_ydl_instance = MagicMock() @@ -404,7 +409,7 @@ def test_ffprobe_missing_title_uses_filename( def extract_info_side_effect(url, download=True): output_path.touch() return { - "title": "Test Video", + "title": "Real Platform Title", "duration": 120.0, "width": 1920, "height": 1080, @@ -413,7 +418,8 @@ def extract_info_side_effect(url, download=True): mock_ydl_instance.extract_info.side_effect = extract_info_side_effect - # Mock ffprobe without title + # ffprobe returns no title (typical for fresh downloads) and would + # fall back to the filename stem "my_video" — but info_dict should win. mock_result = Mock() mock_result.stdout = json.dumps( { @@ -427,7 +433,7 @@ def extract_info_side_effect(url, download=True): ], "format": { "duration": "120.5", - "tags": {}, # No title + "tags": {}, }, } ) @@ -437,7 +443,7 @@ def extract_info_side_effect(url, download=True): "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path ) - assert metadata.title == "my_video" + assert metadata.title == "Real Platform Title" def test_ffprobe_fractional_fps(self, tmp_path, mock_yt_dlp, mock_subprocess): """Test parsing fractional FPS like 30000/1001.""" From 60707875858490c85114d9f557e197c99953cd0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maple=EF=BC=81?= <mapleee723@gmail.com> Date: Mon, 20 Apr 2026 11:05:41 +0800 Subject: [PATCH 5/5] refactor: address code review + test review findings Production code: - Delete run_pipeline dead code (103 lines) - Replace list[Any] with list[type] for extractor classes - Add @lru_cache to _is_supported_url (1862 extractors) - Replace bare except Exception with specific exceptions - Add _FILENAME_BAD_CHARS comment, extract _MAX_UPLOAD_BYTES constant - Remove WHAT comments, remove dead url_placeholder i18n key Test code: - Delete 3 Implementation-Detail/Mockery tests in test_pipeline - Migrate all tests from run_pipeline to run_download/run_subtitle - Remove mock internal assertions in test_downloader - Use exact value assertion in test_routes - Rename misleading IT to clarify PlayRes fixed design - Add coverage: extract_audio failure + sanitize edge cases Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --- frontend/src/i18n/en.json | 1 - frontend/src/i18n/zh-TW.json | 1 - src/bilingualsub/api/pipeline.py | 106 ------------- src/bilingualsub/api/routes.py | 8 +- src/bilingualsub/core/downloader.py | 8 +- .../test_full_subtitle_pipeline.py | 4 +- tests/unit/api/test_pipeline.py | 143 +++++------------- tests/unit/api/test_routes.py | 16 +- tests/unit/core/test_downloader.py | 24 +-- 9 files changed, 63 insertions(+), 248 deletions(-) diff --git a/frontend/src/i18n/en.json b/frontend/src/i18n/en.json index b83654f..b882099 100644 --- a/frontend/src/i18n/en.json +++ b/frontend/src/i18n/en.json @@ -17,7 +17,6 @@ "download_audio": "Download Audio" }, "form": { - "url_placeholder": "Paste video URL (YouTube / X / Twitter / TikTok…)", "paste_placeholder": "Paste video URL (YouTube / X / Twitter / TikTok…)", "source_lang": "Source Language", "target_lang": "Target Language", diff --git a/frontend/src/i18n/zh-TW.json b/frontend/src/i18n/zh-TW.json index fb0b364..e3a1123 100644 --- a/frontend/src/i18n/zh-TW.json +++ b/frontend/src/i18n/zh-TW.json @@ -17,7 +17,6 @@ "download_audio": "下載音訊" }, "form": { - "url_placeholder": "貼上影片網址(YouTube / X / Twitter / TikTok…)", "paste_placeholder": "貼上影片網址(YouTube / X / Twitter / TikTok…)", "source_lang": "原始語言", "target_lang": "翻譯語言", diff --git a/src/bilingualsub/api/pipeline.py b/src/bilingualsub/api/pipeline.py index d2cce4f..138afea 100644 --- a/src/bilingualsub/api/pipeline.py +++ b/src/bilingualsub/api/pipeline.py @@ -385,112 +385,6 @@ async def run_subtitle(job: Job) -> None: ) -async def run_pipeline(job: Job) -> None: - """Execute the full subtitle generation pipeline for a job. - - Steps: download -> transcribe -> translate -> merge/serialize -> burn. - All blocking core calls are wrapped in asyncio.to_thread(). - Progress events are sent to job.event_queue at each step. - """ - log = logger.bind(job_id=job.id) - work_dir = Path(tempfile.mkdtemp(prefix=f"bilingualsub_{job.id}_")) - - try: - # --- Step 1: Download or use local file --- - video_path, metadata = await _acquire_video(job, work_dir, log) - # Trimming is now handled during download via start_time/end_time - - # --- Step 1.5: Extract audio --- - audio_path = await _extract_audio_step(job, video_path, work_dir, log) - - # --- Step 2: Transcribe --- - _send_progress( - job, JobStatus.TRANSCRIBING, 20.0, "transcribe", "Transcribing audio" - ) - t0 = time.monotonic() - original_sub = await asyncio.to_thread( - transcribe_audio, audio_path, language=job.source_lang - ) - log.info( - "step_done", - step="transcribe", - duration_ms=int((time.monotonic() - t0) * 1000), - ) - - # --- Step 3: Translate --- - _send_progress( - job, JobStatus.TRANSLATING, 50.0, "translate", "Translating subtitles" - ) - t0 = time.monotonic() - - _on_translate_progress = _make_translate_progress_cb(job) - _on_rate_limit = _make_rate_limit_cb(job) - - translated_sub = await asyncio.to_thread( - translate_subtitle, - original_sub, - source_lang=job.source_lang, - target_lang=job.target_lang, - video_title=metadata.title, - video_description=metadata.description, - on_progress=_on_translate_progress, - on_rate_limit=_on_rate_limit, - ) - log.info( - "step_done", - step="translate", - duration_ms=int((time.monotonic() - t0) * 1000), - ) - - # --- Step 4: Merge & Serialize --- - _send_progress( - job, JobStatus.MERGING, 70.0, "merge", "Merging bilingual subtitles" - ) - t0 = time.monotonic() - - merged_entries = await asyncio.to_thread( - merge_subtitles, original_sub.entries, translated_sub.entries - ) - merged_sub = Subtitle(entries=merged_entries) - - srt_content = serialize_srt(merged_sub) - srt_path = work_dir / "subtitle.srt" - srt_path.write_text(srt_content, encoding="utf-8") - job.output_files[FileType.SRT] = srt_path - - ass_content = serialize_bilingual_ass( - original_sub, - translated_sub, - video_width=metadata.width, - video_height=metadata.height, - ) - ass_path = work_dir / "subtitle.ass" - ass_path.write_text(ass_content, encoding="utf-8") - job.output_files[FileType.ASS] = ass_path - - log.info( - "step_done", step="merge", duration_ms=int((time.monotonic() - t0) * 1000) - ) - - # --- Step 5: Save source video & complete --- - job.output_files[FileType.SOURCE_VIDEO] = video_path - _send_complete(job) - log.info("pipeline_complete", job_id=job.id) - - except PipelineError: - raise - except Exception as exc: - pipeline_err = _to_pipeline_error(exc) - _send_error( - job, pipeline_err.code, pipeline_err.message, pipeline_err.detail or "" - ) - log.error( - "pipeline_failed", - error_code=pipeline_err.code, - error=str(exc), - ) - - async def run_burn(job: Job, srt_content: str) -> None: """Burn user-edited SRT into the source video.""" log = logger.bind(job_id=job.id) diff --git a/src/bilingualsub/api/routes.py b/src/bilingualsub/api/routes.py index 7fc7285..348b5bb 100644 --- a/src/bilingualsub/api/routes.py +++ b/src/bilingualsub/api/routes.py @@ -72,8 +72,11 @@ class _FileMeta(NamedTuple): FileType.SOURCE_VIDEO: _FileMeta("mp4", "video/mp4"), } +# Windows-reserved + POSIX control characters (NTFS/FAT32/ext4 safe) _FILENAME_BAD_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]') +_MAX_UPLOAD_BYTES = 500 * 1024 * 1024 # 500 MB + def _sanitize_filename(name: str) -> str: """Remove filesystem-unsafe characters and truncate to 120 chars.""" @@ -148,7 +151,6 @@ async def create_job_from_upload( request: Request, ) -> JobCreateResponse: """Create a subtitle generation job from an uploaded file.""" - # Validate file extension filename = file.filename or "" suffix = Path(filename).suffix.lower() if suffix not in _ALLOWED_UPLOAD_EXTENSIONS: @@ -157,11 +159,9 @@ async def create_job_from_upload( detail=f"Allowed: {', '.join(sorted(_ALLOWED_UPLOAD_EXTENSIONS))}", ) - # Sanitize filename to prevent path traversal safe_name = Path(filename).name or f"upload{suffix}" - # Save uploaded file to temp directory with size limit - max_size = 500 * 1024 * 1024 # 500 MB + max_size = _MAX_UPLOAD_BYTES tmp_dir = Path(tempfile.mkdtemp(prefix="bilingualsub_upload_")) saved_path = tmp_dir / safe_name bytes_written = 0 diff --git a/src/bilingualsub/core/downloader.py b/src/bilingualsub/core/downloader.py index 763c6df..28ea255 100644 --- a/src/bilingualsub/core/downloader.py +++ b/src/bilingualsub/core/downloader.py @@ -6,6 +6,7 @@ import subprocess # nosec B404 from collections.abc import Callable from dataclasses import dataclass +from functools import lru_cache from pathlib import Path from typing import Any @@ -100,7 +101,7 @@ def download_video( metadata = _extract_metadata_from_info_dict( info_dict, output_path, start_time, end_time ) - except Exception as e: + except (DownloadError, ValueError, KeyError, TypeError) as e: # Clean up downloaded file on metadata extraction failure if output_path.exists(): output_path.unlink() @@ -129,14 +130,15 @@ def _sanitize_description(raw: Any) -> str: return raw.strip() -_SUPPORTED_EXTRACTOR_CLASSES: list[Any] = [ +_SUPPORTED_EXTRACTOR_CLASSES: list[type] = [ cls for cls in gen_extractor_classes() if cls.IE_NAME != "generic" ] +@lru_cache(maxsize=256) def _is_supported_url(url: str) -> bool: """Check if yt-dlp has a dedicated extractor for this URL.""" - return any(cls.suitable(url) for cls in _SUPPORTED_EXTRACTOR_CLASSES) + return any(cls.suitable(url) for cls in _SUPPORTED_EXTRACTOR_CLASSES) # type: ignore[attr-defined] def _download_video( diff --git a/tests/integration/test_full_subtitle_pipeline.py b/tests/integration/test_full_subtitle_pipeline.py index 7920fbf..f8289d1 100644 --- a/tests/integration/test_full_subtitle_pipeline.py +++ b/tests/integration/test_full_subtitle_pipeline.py @@ -216,13 +216,13 @@ def test_full_pipeline_to_bilingual_ass( # Verify dialogue line format assert "Dialogue: 0," in written - def test_pipeline_metadata_flows_from_downloader_to_ass_serializer( + def test_ass_output_uses_fixed_playres_regardless_of_video_resolution( self, tmp_path: Path, set_fake_api_key: None, sample_whisper_api_response, ) -> None: - """VideoMetadata width/height flows correctly to ASS PlayResX/PlayResY across resolutions.""" + """PlayRes is fixed at 1920x1080 for consistent rendering, regardless of input resolution.""" resolutions = [ (1920, 1080, "1080p"), (1280, 720, "720p"), diff --git a/tests/unit/api/test_pipeline.py b/tests/unit/api/test_pipeline.py index 584fb39..f87f099 100644 --- a/tests/unit/api/test_pipeline.py +++ b/tests/unit/api/test_pipeline.py @@ -8,9 +8,10 @@ from bilingualsub.api.constants import FileType, JobStatus, SSEEvent from bilingualsub.api.jobs import Job -from bilingualsub.api.pipeline import run_download, run_pipeline, run_subtitle +from bilingualsub.api.pipeline import run_download, run_subtitle from bilingualsub.core.downloader import DownloadError, VideoMetadata from bilingualsub.core.subtitle import Subtitle, SubtitleEntry +from bilingualsub.utils.ffmpeg import FFmpegError def _make_job_with_time_range() -> Job: @@ -59,7 +60,6 @@ def _make_metadata() -> VideoMetadata: @pytest.mark.unit @pytest.mark.asyncio class TestRunPipeline: - @patch("bilingualsub.api.pipeline.burn_subtitles") @patch("bilingualsub.api.pipeline.serialize_bilingual_ass") @patch("bilingualsub.api.pipeline.serialize_srt") @patch("bilingualsub.api.pipeline.merge_subtitles") @@ -76,7 +76,6 @@ async def test_successful_pipeline( mock_merge, mock_serialize_srt, mock_serialize_ass, - mock_burn, tmp_path: Path, ) -> None: sub = _make_subtitle() @@ -86,10 +85,10 @@ async def test_successful_pipeline( mock_merge.return_value = sub.entries mock_serialize_srt.return_value = "1\n00:00:00,000 --> 00:00:04,000\nLine 1" mock_serialize_ass.return_value = "[Script Info]\n..." - mock_burn.return_value = tmp_path / "output.mp4" job = _make_job() - await run_pipeline(job) + await run_download(job) + await run_subtitle(job) # Collect all events events = [] @@ -113,7 +112,7 @@ async def test_download_error(self, mock_download) -> None: mock_download.side_effect = DownloadError("Network error") job = _make_job() - await run_pipeline(job) + await run_download(job) events = [] while not job.event_queue.empty(): @@ -124,126 +123,38 @@ async def test_download_error(self, mock_download) -> None: assert error_events[0]["data"]["code"] == "download_failed" assert job.status == JobStatus.FAILED - @patch("bilingualsub.api.pipeline.burn_subtitles") - @patch("bilingualsub.api.pipeline.serialize_bilingual_ass") - @patch("bilingualsub.api.pipeline.serialize_srt") - @patch("bilingualsub.api.pipeline.merge_subtitles") - @patch("bilingualsub.api.pipeline.translate_subtitle") - @patch("bilingualsub.api.pipeline.transcribe_audio") - @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.download_video") - async def test_transcribe_receives_audio_path( - self, - mock_download, - mock_extract_audio, - mock_transcribe, - mock_translate, - mock_merge, - mock_serialize_srt, - mock_serialize_ass, - mock_burn, - ) -> None: - """Transcribe should receive extracted audio path, not video path.""" - sub = _make_subtitle() - mock_download.return_value = _make_metadata() - mock_transcribe.return_value = sub - mock_translate.return_value = sub - mock_merge.return_value = sub.entries - mock_serialize_srt.return_value = "1\n00:00:00,000 --> 00:00:04,000\nLine 1" - mock_serialize_ass.return_value = "[Script Info]\n..." - - job = _make_job() - await run_pipeline(job) - - # Verify extract_audio was called with video path - extract_call_args = mock_extract_audio.call_args[0] - assert extract_call_args[0].name == "video.mp4" - assert extract_call_args[1].name == "audio.mp3" - - # Verify transcribe_audio received audio path, NOT video path - transcribe_call_args = mock_transcribe.call_args - audio_arg = transcribe_call_args[0][0] - assert audio_arg.name == "audio.mp3" - - @patch("bilingualsub.api.pipeline.burn_subtitles") - @patch("bilingualsub.api.pipeline.serialize_bilingual_ass") - @patch("bilingualsub.api.pipeline.serialize_srt") - @patch("bilingualsub.api.pipeline.merge_subtitles") - @patch("bilingualsub.api.pipeline.translate_subtitle") - @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.trim_video") @patch("bilingualsub.api.pipeline.download_video") - async def test_pipeline_with_time_range_calls_trim( + async def test_download_with_time_range_completes( self, mock_download, - mock_trim, mock_extract_audio, - mock_transcribe, - mock_translate, - mock_merge, - mock_serialize_srt, - mock_serialize_ass, - mock_burn, ) -> None: - """Given job with time range, download_video should receive time parameters.""" - sub = _make_subtitle() + """Given job with time range, run_download produces expected output files.""" mock_download.return_value = _make_metadata() - mock_transcribe.return_value = sub - mock_translate.return_value = sub - mock_merge.return_value = sub.entries - mock_serialize_srt.return_value = "1\n00:00:00,000 --> 00:00:04,000\nLine 1" - mock_serialize_ass.return_value = "[Script Info]\n..." job = _make_job_with_time_range() - await run_pipeline(job) - - # Verify download was called with time parameters - mock_download.assert_called_once() - call_kwargs = mock_download.call_args[1] - assert call_kwargs["start_time"] == 10.0 - assert call_kwargs["end_time"] == 30.0 - - # Verify trim_video was NOT called (trimming happens during download now) - mock_trim.assert_not_called() + await run_download(job) - assert job.status == JobStatus.COMPLETED + assert job.status == JobStatus.DOWNLOAD_COMPLETE + assert FileType.SOURCE_VIDEO in job.output_files + assert FileType.AUDIO in job.output_files - @patch("bilingualsub.api.pipeline.burn_subtitles") - @patch("bilingualsub.api.pipeline.serialize_bilingual_ass") - @patch("bilingualsub.api.pipeline.serialize_srt") - @patch("bilingualsub.api.pipeline.merge_subtitles") - @patch("bilingualsub.api.pipeline.translate_subtitle") - @patch("bilingualsub.api.pipeline.transcribe_audio") @patch("bilingualsub.api.pipeline.extract_audio") - @patch("bilingualsub.api.pipeline.trim_video") @patch("bilingualsub.api.pipeline.download_video") - async def test_pipeline_without_time_range_skips_trim( + async def test_download_without_time_range_completes( self, mock_download, - mock_trim, mock_extract_audio, - mock_transcribe, - mock_translate, - mock_merge, - mock_serialize_srt, - mock_serialize_ass, - mock_burn, ) -> None: - """Given job without time range, pipeline should NOT call trim_video.""" - sub = _make_subtitle() + """Given job without time range, run_download produces expected output files.""" mock_download.return_value = _make_metadata() - mock_transcribe.return_value = sub - mock_translate.return_value = sub - mock_merge.return_value = sub.entries - mock_serialize_srt.return_value = "1\n00:00:00,000 --> 00:00:04,000\nLine 1" - mock_serialize_ass.return_value = "[Script Info]\n..." job = _make_job() - await run_pipeline(job) + await run_download(job) - mock_trim.assert_not_called() - assert job.status == JobStatus.COMPLETED + assert job.status == JobStatus.DOWNLOAD_COMPLETE + assert FileType.SOURCE_VIDEO in job.output_files @pytest.mark.unit @@ -283,6 +194,28 @@ async def test_run_download_saves_metadata( assert job.video_width == 1920 assert job.video_height == 1080 + @patch("bilingualsub.api.pipeline.extract_audio") + @patch("bilingualsub.api.pipeline.download_video") + async def test_run_download_extract_audio_failure_sends_error( + self, mock_download, mock_extract_audio + ) -> None: + """When extract_audio raises FFmpegError, job transitions to FAILED.""" + mock_download.return_value = _make_metadata() + mock_extract_audio.side_effect = FFmpegError("ffmpeg segfault") + + job = _make_job() + await run_download(job) + + events = [] + while not job.event_queue.empty(): + events.append(job.event_queue.get_nowait()) + + error_events = [e for e in events if e["event"] == SSEEvent.ERROR] + assert len(error_events) == 1 + assert error_events[0]["data"]["code"] == "burn_failed" + assert "ffmpeg segfault" in error_events[0]["data"]["detail"] + assert job.status == JobStatus.FAILED + @pytest.mark.unit @pytest.mark.asyncio diff --git a/tests/unit/api/test_routes.py b/tests/unit/api/test_routes.py index c6acd49..d70042d 100644 --- a/tests/unit/api/test_routes.py +++ b/tests/unit/api/test_routes.py @@ -261,6 +261,14 @@ def test_truncates_to_120_chars(self) -> None: def test_strips_leading_trailing_dots_and_spaces(self) -> None: assert _sanitize_filename(" .hello. ") == "hello" + def test_sanitize_all_bad_chars_returns_default(self) -> None: + result = _sanitize_filename(":::") + assert result == "video" + + def test_sanitize_dots_only_returns_default(self) -> None: + result = _sanitize_filename("...") + assert result == "video" + @pytest.mark.unit class TestBuildDownloadFilename: @@ -281,14 +289,10 @@ def test_translation_produces_lang_suffix(self) -> None: def test_title_with_illegal_chars_stripped(self) -> None: job = _make_job( - title="My: <video> / test", source_lang="en", target_lang="zh-TW" + title='My: <video> / test"', source_lang="en", target_lang="zh-TW" ) result = _build_download_filename(job, FileType.SRT) - assert ":" not in result - assert "<" not in result - assert ">" not in result - assert result.startswith("My") - assert result.endswith("(en_to_zh-TW).srt") + assert result == "My video test (en_to_zh-TW).srt" def test_source_video_always_original(self) -> None: job = _make_job(title="My Video", source_lang="en", target_lang="zh-TW") diff --git a/tests/unit/core/test_downloader.py b/tests/unit/core/test_downloader.py index 6e0f32c..77f6705 100644 --- a/tests/unit/core/test_downloader.py +++ b/tests/unit/core/test_downloader.py @@ -186,23 +186,6 @@ def extract_info_side_effect(url, download=True): "https://www.youtube.com/watch?v=dQw4w9WgXcQ", output_path ) - # Verify yt-dlp was called correctly - mock_yt_dlp.YoutubeDL.assert_called_once() - ydl_opts = mock_yt_dlp.YoutubeDL.call_args[0][0] - assert "format" in ydl_opts - assert "outtmpl" in ydl_opts - assert ydl_opts["merge_output_format"] == "mp4" - - mock_ydl_instance.extract_info.assert_called_once_with( - "https://www.youtube.com/watch?v=dQw4w9WgXcQ", download=True - ) - - # Verify ffprobe was called - mock_subprocess.run.assert_called_once() - ffprobe_call = mock_subprocess.run.call_args - assert "ffprobe" in ffprobe_call[0][0] - assert str(output_path) in ffprobe_call[0][0] - # Verify metadata assert metadata.title == "Test Video" assert metadata.duration == 120.5 @@ -238,7 +221,6 @@ def extract_info_side_effect(url, download=True): metadata = download_video("https://youtu.be/dQw4w9WgXcQ", output_path) assert metadata.title == "Test Video" - mock_ydl_instance.extract_info.assert_called_once() def test_empty_url_raises_error(self, tmp_path): """Test that empty URL raises error.""" @@ -584,8 +566,10 @@ def test_all_youtube_url_formats_accepted(self, tmp_path): fps=30.0, ) output_path = tmp_path / f"video_{valid_urls.index(url)}.mp4" - # Should not raise ValueError - download_video(url, output_path) + # Should not raise ValueError and should return valid metadata + result = download_video(url, output_path) + assert result.title == "Test" + assert result.duration == 120.0 class TestDownloadWithTimeRange: