From 02584b1eb8efc59b1660dee3b0c09b986841ed3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maple=EF=BC=81?= Date: Wed, 13 May 2026 14:09:35 +0800 Subject: [PATCH] feat: video intro and watermark for source attribution Add a cinematic intro (5s) and persistent top-right watermark to burned videos, attributing the original creator. The intro displays channel name, video title, URLs, and a bilingual copyright notice with staged fade-in animation. Skipped automatically for local file uploads. - Extend VideoMetadata/Job with channel, channel_url, video_fps fields - Add generate_intro() using FFmpeg lavfi color + multi-layer drawtext - Add concat_videos() using FFmpeg concat demuxer (stream copy) - Add watermark drawtext to burn_subtitles via watermark_text param - Graceful degradation: intro/concat failures fall back to subtitle-only - Extract _run_ffmpeg_with_progress() to eliminate subprocess duplication - 190 unit tests, 0 regressions Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/arch/intro-watermark.md | 355 +++++++++++++++++++++ docs/design/intro-watermark.md | 180 +++++++++++ src/bilingualsub/api/constants.py | 1 + src/bilingualsub/api/jobs.py | 3 + src/bilingualsub/api/pipeline.py | 74 ++++- src/bilingualsub/core/downloader.py | 16 + src/bilingualsub/utils/__init__.py | 4 + src/bilingualsub/utils/ffmpeg.py | 439 +++++++++++++++++++++++--- tests/unit/api/test_pipeline.py | 290 ++++++++++++++++- tests/unit/utils/test_ffmpeg_intro.py | 394 +++++++++++++++++++++++ 10 files changed, 1714 insertions(+), 42 deletions(-) create mode 100644 docs/arch/intro-watermark.md create mode 100644 docs/design/intro-watermark.md create mode 100644 tests/unit/utils/test_ffmpeg_intro.py diff --git a/docs/arch/intro-watermark.md b/docs/arch/intro-watermark.md new file mode 100644 index 0000000..d52d0bd --- /dev/null +++ b/docs/arch/intro-watermark.md @@ -0,0 +1,355 @@ +# Architecture: 影片片頭與浮水印 + +## 概述 + +在現有三段管線(download → subtitle → burn)的 `run_burn` 完成後,加入兩個後置步驟:watermark burn(正片加浮水印)和 intro + concat(生成片頭,接合)。是否執行這兩個後置步驟由 `job.video_channel` 是否為空字串決定——本地上傳沒有頻道資訊故自動跳過,維持現有行為。 + +兩個新 FFmpeg 函數 `generate_intro()` 和 `concat_videos()` 加入 `src/bilingualsub/utils/ffmpeg.py`,均使用與 `burn_subtitles` 相同的 `subprocess.Popen + SpooledTemporaryFile + progress pipe` 模式。浮水印則是在現有 `burn_subtitles` 的 `-vf` filter chain 末端用逗號串接 `drawtext`,透過新增 `watermark_text: str | None = None` 可選參數實現,現有呼叫方不受影響。 + +`downloader.py` 的 `VideoMetadata` 加入 `channel: str = ""` 欄位,`download_video()` 從 info_dict 以 `channel` → `uploader` → `""` 的優先序擷取並寫入。`Job` dataclass 加入 `video_channel: str = ""` 和 `video_channel_url: str = ""`,`run_download` 在現有 metadata 寫入區段補存這兩個欄位。 + +字體策略:Inter 和 Noto Serif TC 不在所有系統預裝。`drawtext` 使用 `font=` 名稱而非 `fontfile=` 路徑,讓 FFmpeg 依系統 fontconfig 解析。設計規格的 Inter(無衬線)對應 `font='Arial'`,Noto Serif TC(衬線)對應 `font='serif'`。系統若缺字體 FFmpeg 自動回退到內建 monospace,視覺略差但不會失敗,符合降級精神。片頭生成強制使用 libx264(即使在 macOS 上),因為 VideoToolbox 不支援以 `lavfi color` 作為輸入源的硬體加速路徑,且 5 秒片頭的軟體編碼成本可接受。 + +## Files to Create / Modify + +### 新建 + +| 路徑 | 說明 | +| ---------------------------------------------------- | ------------------------------------------------------------------------ | +| `tests/unit/utils/test_ffmpeg_intro.py` | UT:`generate_intro`、`concat_videos`、`burn_subtitles` watermark branch | +| `tests/integration/test_intro_watermark_pipeline.py` | IT:Journey 索引的因果鏈測試 | + +### 修改 + +| 路徑 | 修改內容 | +| ------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------- | +| `src/bilingualsub/core/downloader.py` | `VideoMetadata` 加 `channel: str = ""`;`download_video()` 補加 channel 擷取;`_extract_metadata_from_info_dict()` 加 channel 欄位 | +| `src/bilingualsub/api/jobs.py` | `Job` 加 `video_channel: str = ""` 和 `video_channel_url: str = ""` | +| `src/bilingualsub/api/constants.py` | `FileType` 加 `INTRO_VIDEO = "intro_video"` | +| `src/bilingualsub/utils/ffmpeg.py` | `burn_subtitles()` 加 `watermark_text` 參數;新增 `generate_intro()`;新增 `concat_videos()` | +| `src/bilingualsub/api/pipeline.py` | `run_download()` 補存 channel;`run_burn()` 加片頭流程和降級邏輯;burn 進度重映射 | + +## Responsibility Map + +| 元件 | 層級 | 負責 | 不碰 | +| --------------------------------------------------------- | --------- | ------------------------------------------------------------------------------------------------------------- | ------------------------------------- | +| `core/downloader.py` — `VideoMetadata.channel` | Core Data | 持有頻道名稱;fallback 順序:`channel` → `uploader` → `""` | pipeline 邏輯、FFmpeg | +| `utils/ffmpeg.py` — `burn_subtitles(watermark_text=)` | Utils | 將 drawtext 串接到既有 vf_filter 末端;`None` 時行為與現在完全相同 | channel 來源判斷 | +| `utils/ffmpeg.py` — `generate_intro()` | Utils | 用 FFmpeg `color + drawtext + fade` 生成黑底片頭;回傳 Path;FFmpegError 向上拋 | pipeline 進度、job 狀態、channel 決策 | +| `utils/ffmpeg.py` — `concat_videos()` | Utils | 用 FFmpeg concat demuxer 接合兩段影片;回傳 Path;FFmpegError 向上拋 | 片頭內容決策 | +| `api/pipeline.py` — `run_burn()` 後置邏輯 | Pipeline | 判斷 `job.video_channel` 是否有值;依序呼叫 burn(含 watermark)→ generate_intro → concat;降級邏輯;進度管理 | FFmpeg filter 細節 | +| `api/jobs.py` — `Job.video_channel` / `video_channel_url` | State | 跨 phase 攜帶頻道名稱和 URL | 決策邏輯 | + +## Interface Design + +### `VideoMetadata` 新增欄位 + +```python +@dataclass +class VideoMetadata: + title: str + duration: float + width: int + height: int + fps: float + description: str = "" + channel: str = "" # 新增;空字串合法(本地上傳) +``` + +`__post_init__` 不驗證 `channel`,空字串不拋錯。 + +### channel 擷取邏輯(在 `download_video()` info_dict 覆寫區段末尾加入) + +```python +channel_raw = info_dict.get("channel") or info_dict.get("uploader") or "" +metadata.channel = channel_raw.strip() if isinstance(channel_raw, str) else "" +``` + +### `Job` 新增欄位 + +```python +video_channel: str = "" # 頻道名稱,空字串 → 跳過片頭流程 +video_channel_url: str = "" # 頻道 URL,空字串 → 片頭中不顯示頻道 URL 行 +``` + +### `run_download()` 中的 channel_url 判斷 + +```python +raw_channel_url = info_dict.get("channel_url", "") or "" +is_youtube_source = "youtube.com" in (job.source_url or "") +job.video_channel_url = raw_channel_url if (is_youtube_source and raw_channel_url) else "" +``` + +### `burn_subtitles` 新增參數 + +```python +def burn_subtitles( + video_path: Path, + subtitle_path: Path, + output_path: Path, + *, + on_progress: Callable[[float], None] | None = None, + watermark_text: str | None = None, # 新增;None → 不加浮水印 +) -> Path: +``` + +watermark drawtext 串接邏輯(在既有 `vf_filter` 建立後加入): + +```python +if watermark_text is not None: + safe_text = watermark_text.replace("'", "\\'").replace(":", "\\:") + watermark_drawtext = ( + f"drawtext=text='{safe_text}'" + ":font='Arial'" + ":fontsize=12" + ":fontcolor=white@0.45" + ":shadowcolor=black@0.8" + ":shadowx=1:shadowy=1" + ":x=w-tw-20" + ":y=18" + ) + vf_filter = f"{vf_filter},{watermark_drawtext}" +``` + +### `generate_intro` 函數簽名 + +```python +def generate_intro( + output_path: Path, + *, + width: int, + height: int, + fps: float, + channel: str, + video_title: str, + video_url: str, + channel_url: str = "", # 空字串 → 不顯示頻道 URL 行 + duration: float = 5.0, + on_progress: Callable[[float], None] | None = None, +) -> Path: +``` + +FFmpeg 命令結構: + +``` +ffmpeg + -f lavfi + -i color=c=black:s={width}x{height}:r={fps}:d={duration} + -vf "{drawtext_chain},fade=t=out:st={duration - 0.5}:d=0.5" + -c:v libx264 -crf 23 -preset fast + -an + -progress pipe:1 + -y {output_path} +``` + +`drawtext_chain` 為多個 drawtext filter 以逗號串接,各 block 使用 `enable='between(t,{start},{end})'` 和 `alpha=` 表達式實現分段淡入效果。`channel_url` 為空時跳過對應的 drawtext filter。文字溢出透過 `drawtext` 的座標和 `fix_bounds=1` 參數限制顯示寬度。 + +### `concat_videos` 函數簽名 + +```python +def concat_videos( + first_path: Path, + second_path: Path, + output_path: Path, + *, + on_progress: Callable[[float], None] | None = None, +) -> Path: +``` + +FFmpeg 命令結構: + +``` +# 暫存 concat list 文件(output_path.parent / "concat_list.txt"): +file '/absolute/path/to/intro.mp4' +file '/absolute/path/to/main.mp4' + +ffmpeg + -f concat -safe 0 + -i {concat_list_path} + -c copy + -progress pipe:1 + -y {output_path} +``` + +`-c copy` 不重新編碼。片頭固定 libx264,正片在 macOS 用 h264_videotoolbox,Linux 用 libx264,兩者均為 H.264,concat 後容器相容。暫存 concat list 文件在 `try/finally` 中刪除。 + +## Data Flow + +### Journey 1:YouTube URL 燒錄流程(有頻道資訊) + +``` +POST /api/jobs { source_url: "https://youtube.com/..." } + │ + run_download(job) + ├── download_video() → VideoMetadata { channel: "3Blue1Brown", ... } + ├── job.video_channel = "3Blue1Brown" + ├── job.video_channel_url = "youtube.com/@3Blue1Brown" ← is_youtube=True + ├── job.video_width = 1920, job.video_height = 1080 + └── job.output_files[SOURCE_VIDEO] = work_dir/video.mp4 + │ + run_subtitle(job) → (unchanged) + │ +POST /api/jobs/:id/burn { srt_content } + │ + run_burn(job, srt_content) + │ + ├── [0%→80%] burn_subtitles( + │ source_video, srt_path, work_dir/output.mp4, + │ watermark_text="Source: 3Blue1Brown", + │ on_progress=lambda p: _send_progress(..., p * 0.8), + │ ) → work_dir/output.mp4 + │ + ├── job.video_channel 不為空 → 進入片頭流程 + │ + ├── [80%→90%] generate_intro( + │ work_dir/intro.mp4, + │ width=1920, height=1080, fps=30.0, + │ channel="3Blue1Brown", + │ video_title=job.video_title, + │ video_url=job.source_url, + │ channel_url="youtube.com/@3Blue1Brown", + │ ) → work_dir/intro.mp4 + │ + ├── [90%→99%] concat_videos( + │ work_dir/intro.mp4, + │ work_dir/output.mp4, + │ work_dir/final.mp4, + │ ) → work_dir/final.mp4 + │ + ├── job.output_files[VIDEO] = work_dir/final.mp4 + └── _send_complete(job) ← progress = 100% +``` + +### Journey 2:本地上傳(無頻道資訊) + +``` + run_download(job) + ├── _acquire_video() → extract_video_metadata()(無 channel) + ├── job.video_channel = "" + └── 其餘不變 + │ + run_burn(job, srt_content) + ├── [0%→100%] burn_subtitles(..., watermark_text=None) ← 行為與現在一致 + ├── job.video_channel == "" → 跳過片頭流程 + ├── job.output_files[VIDEO] = work_dir/output.mp4 + └── _send_complete(job) +``` + +### Journey 3:非 YouTube URL(有頻道名稱,無頻道 URL) + +``` + run_download(job) + ├── download_video() → VideoMetadata { channel: "Bilibili主播", ... } + ├── job.video_channel = "Bilibili主播" + └── job.video_channel_url = "" ← is_youtube=False → 強制空字串 + │ + run_burn(job, srt_content) + ├── generate_intro(..., channel_url="") ← 不顯示頻道 URL 行 + └── 其餘同 Journey 1 +``` + +### 降級路徑 + +``` + generate_intro() 拋 FFmpegError + ├── log.warning("intro_generation_failed", error=str(exc)) + ├── 跳過 concat_videos + ├── job.output_files[VIDEO] = output_video ← 僅字幕正片 + └── _send_complete(job) + + concat_videos() 拋 FFmpegError(intro 已生成) + ├── log.warning("concat_failed", error=str(exc)) + ├── job.output_files[VIDEO] = output_video ← 降級為僅字幕正片 + └── _send_complete(job) +``` + +## Build Sequence + +### Phase 1:Core Data 擴充(additive) + +- `src/bilingualsub/core/downloader.py`:`VideoMetadata` 加 `channel: str = ""`;`download_video()` 補加 channel 擷取 +- `src/bilingualsub/api/jobs.py`:`Job` 加 `video_channel: str = ""` 和 `video_channel_url: str = ""` +- `src/bilingualsub/api/constants.py`:`FileType` 加 `INTRO_VIDEO = "intro_video"` + +### Phase 2:FFmpeg 工具函數(additive,向後相容) + +- `src/bilingualsub/utils/ffmpeg.py`: + - `burn_subtitles()` 加 `watermark_text: str | None = None` 參數 + - 新增 `generate_intro()` + - 新增 `concat_videos()` + +### Phase 3:Pipeline 整合(breaking — `run_burn` 行為改變) + +- `src/bilingualsub/api/pipeline.py`: + - `run_download()`:補存 `job.video_channel` 和 `job.video_channel_url` + - `run_burn()`:加 watermark 參數、片頭流程、降級邏輯、進度重映射 + +### Phase 4:測試(additive) + +- `tests/unit/utils/test_ffmpeg_intro.py` +- `tests/integration/test_intro_watermark_pipeline.py` +- 既有 pipeline tests 補充 channel 相關 cases + +## Infra Reuse + +| 現有元件 | 本功能如何複用 | +| ------------------------------------------------- | -------------------------------------------------------------------------------- | +| `subprocess.Popen + SpooledTemporaryFile` pattern | `generate_intro` 和 `concat_videos` 複製 `burn_subtitles` 的 subprocess 管理模式 | +| `_send_progress(job, JobStatus.BURNING, ...)` | intro 和 concat 子步驟繼續用 `BURNING` status,前端不需改 | +| `asyncio.to_thread()` | `generate_intro` 和 `concat_videos` 都是阻塞操作,用相同包裝模式 | +| `extract_video_metadata()` | 解析度和 fps 由 job 欄位傳入,不需額外呼叫 | + +## Test Strategy + +### Unit Test 邊界 + +**`tests/unit/utils/test_ffmpeg_intro.py`** + +| 目標 | 測試行為 | +| ---------------------------------------------- | ------------------------------------------------- | +| `burn_subtitles(watermark_text="Source: X")` | `-vf` 包含逗號分隔的 drawtext | +| `burn_subtitles(watermark_text=None)` | `-vf` 不含 drawtext(regression) | +| `burn_subtitles` watermark text 含 `:` | drawtext 中 `:` 被 escape | +| `burn_subtitles` watermark text 含 `'` | drawtext 中 `'` 被 escape | +| `generate_intro(..., channel_url="yt.com/@x")` | cmd 含 `color=` source;drawtext chain 含頻道 URL | +| `generate_intro(..., channel_url="")` | drawtext chain 不含頻道 URL | +| `generate_intro` FFmpeg 失敗 | 拋 `FFmpegError` | +| `generate_intro` 固定用 libx264 | cmd 中 `-c:v libx264`,不受 platform 影響 | +| `concat_videos(first, second, output)` | cmd 含 `-f concat`、`-c copy`;concat list 正確 | +| `concat_videos` 輸入不存在 | 拋 `FFmpegError` | + +**`tests/unit/api/test_pipeline.py` 補充** + +| 目標 | 測試行為 | +| -------------------------------- | --------------------------------------------------- | +| `run_burn` channel 不為空 | watermark_text 傳入;generate_intro + concat 被呼叫 | +| `run_burn` channel 為空 | watermark_text=None;generate_intro 未被呼叫 | +| `run_burn` generate_intro 失敗 | log warning;concat 未被呼叫;仍 COMPLETE | +| `run_burn` concat 失敗 | log warning;仍 COMPLETE;VIDEO = output.mp4 | +| `run_download` YouTube + channel | job.video_channel 正確 | +| `run_download` fallback uploader | job.video_channel = uploader | +| `run_download` 本地上傳 | job.video_channel = "" | + +### Integration Test 邊界 + +**`tests/integration/test_intro_watermark_pipeline.py`** + +| Journey | Test Chain | +| ------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------- | +| J1:YouTube → 完整片頭+浮水印 | POST /jobs → run_download (mock) → run_burn → assert burn with watermark → assert generate_intro called → assert concat → VIDEO = final.mp4 | +| J2:本地上傳 → 無片頭無浮水印 | POST /jobs (local) → run_burn → assert burn without watermark → generate_intro never called | +| J3:非 YouTube → 片頭無頻道 URL | run_burn → assert generate_intro(channel_url="") | +| 降級:intro 失敗 | generate_intro raises → still COMPLETED → VIDEO = output.mp4 | +| 降級:concat 失敗 | concat raises → still COMPLETED → VIDEO = output.mp4 | + +### Mock 決策 + +| 對象 | Mock / Real | 原因 | +| ----------------------------------------------------- | ---------------------------- | ------------------------------ | +| `subprocess.Popen` | Mock | 避免真實 FFmpeg;驗證 cmd 結構 | +| `burn_subtitles` / `generate_intro` / `concat_videos` | Mock + spy(pipeline tests) | 隔離 run_burn 邏輯 | +| `download_video` | Mock(沿用現有) | 避免網路請求 | + +### Coverage 要求 + +核心邏輯 ≥ 90%,整體 ≥ 80% diff --git a/docs/design/intro-watermark.md b/docs/design/intro-watermark.md new file mode 100644 index 0000000..492ec69 --- /dev/null +++ b/docs/design/intro-watermark.md @@ -0,0 +1,180 @@ +# 影片片頭與浮水印 — 來源宣告 + +## 背景與問題 + +使用者用 BilingualSub 產生翻譯字幕後,經常會在 Facebook、YouTube、X 等社群平台分享燒錄過的影片。目前產出的影片沒有任何來源標示,觀看者無法得知原始影片出處,也無法找到原始創作者。 + +缺乏來源宣告會造成兩個問題: + +1. **對原始創作者不尊重** — 看起來像是搬運而非善意分享,原作者若看到觀感不佳 +2. **對分享者有法律風險** — 翻譯屬於著作權法的「改作」,明確標註來源、提供移除管道可展現善意 + +## 使用者角色 + +**分享者**:使用 BilingualSub 翻譯影片字幕,並在社群平台分享給中文受眾的人。動機是讓更多人看到好內容,非營利目的。 + +## 需求情境 + +- 分享者:When 我在 Facebook 分享翻譯過的影片, I want to 影片自動包含來源宣告, so I can 讓觀看者知道原始出處、展現對創作者的尊重、降低著作權爭議風險。 + +## 設計意圖 + +- **永遠啟用,不提供開關** → 來源宣告是對原作者的基本尊重,不應該是可選的 +- **片頭用電影 disclaimer 風格** → 觀眾習慣電影片頭有來源/版權宣告,自然不突兀 +- **著作權聲明用中英雙語** → 原作者多為英語系,需要看得懂;分享對象多為中文使用者,也需要中文 +- **措辭用「如需移除,請聯繫上傳者」** → 中性語氣,不預設對方態度,提供合理的移除管道 +- **浮水印全程顯示** → 即使觀眾跳過片頭,正片中仍能看到原始來源 +- **本地檔案跳過** → 本地檔案沒有頻道和 URL 資訊,強加無意義的片頭反而奇怪 +- **非 YouTube 平台不顯示頻道 URL** → 各平台頻道 URL 格式不統一,只顯示頻道名即可 +- **片頭解析度配合來源影片** → FFmpeg concat 要求兩段影片解析度一致,自動配合可避免黑邊或縮放問題 + +## User Journey + +### Journey 1:分享者 — 翻譯 YouTube 影片並分享 + +前置條件:使用者提交一個 YouTube URL 進行翻譯 + +1. 使用者提交 YouTube URL → 系統下載影片並擷取 metadata(含頻道名稱) +2. 系統完成轉錄、翻譯、字幕合併 +3. 使用者編輯字幕後點擊燒錄 → 系統開始燒錄流程 +4. 系統生成片頭影片(黑底,來源宣告,約 5 秒)→ 解析度配合來源影片 +5. 系統燒錄字幕至正片,同時加上右上角浮水印 → 浮水印顯示 `Source: {頻道名稱}` +6. 系統將片頭與正片接合 → 產出最終影片 +7. 使用者下載最終影片 → 影片開頭有來源宣告,正片有浮水印 + +### Journey 2:分享者 — 使用本地影片檔案 + +前置條件:使用者上傳本地影片檔案(非 URL) + +1. 使用者上傳本地影片 → 系統擷取 metadata(無頻道資訊) +2. 系統完成轉錄、翻譯、字幕合併 +3. 使用者編輯字幕後點擊燒錄 → 系統開始燒錄流程 +4. 系統燒錄字幕至影片,**不加片頭、不加浮水印** +5. 使用者下載最終影片 → 僅有燒錄字幕,與現有行為一致 + +### Journey 3:分享者 — 使用非 YouTube 平台(Bilibili、Twitter 等) + +前置條件:使用者提交非 YouTube 的 URL + +1. 使用者提交 URL → 系統透過 yt-dlp 下載並擷取 metadata +2. 系統完成轉錄、翻譯、字幕合併 +3. 使用者編輯字幕後點擊燒錄 → 系統開始燒錄流程 +4. 系統生成片頭影片 → 頻道名稱正常顯示,**頻道 URL 省略**,影片 URL 正常顯示 +5. 系統燒錄字幕至正片,加上右上角浮水印 +6. 系統將片頭與正片接合 → 產出最終影片 + +## 片頭視覺規格 + +### 設計風格 + +Variant 3 / 英文 eyebrow 左對齊文件風,黑底電影 disclaimer 風格。 + +### 內容結構(從上到下,左對齊) + +| 層級 | 內容 | 字體 | 大小 | 顏色(白字透明度) | +| -------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------ | ---- | ------------------ | +| Eyebrow | `ORIGINAL VIDEO FROM` | Inter, 500 weight | 10px | 0.3 | +| 中文標籤 | `原始影片來自` | Noto Serif TC, 300 weight | 13px | 0.6 | +| 頻道名稱 | `{channel}` | Inter, 500 weight | 32px | 1.0 | +| 頻道 URL | `youtube.com/@{channel}`(僅 YouTube) | Inter, 400 weight | 11px | 0.35 | +| 影片標題 | `{title}` | Noto Serif TC, 300 weight | 16px | 0.7 | +| 影片 URL | `{url}` | Inter, 400 weight | 12px | 0.5 | +| 中文聲明 | 翻譯字幕由開源專案 BilingualSub 產生 / 所有內容及著作權屬於原始創作者所有 / 如需移除,請聯繫上傳者 | Noto Serif TC, 300 weight | 12px | 0.45 | +| 英文聲明 | Subtitles generated by BilingualSub (open source) / All content and copyrights belong to the original creator / For removal requests, please contact the uploader | Inter, 400 weight | 11px | 0.35 | +| 品牌 | `BilingualSub`(右下角) | Inter, 400 weight, uppercase, letter-spacing 3px | 10px | 0.25 | + +### 動畫 + +- 各區塊依序淡入(間隔約 0.3s),帶微小上滑位移 +- 全部顯示後停留約 2 秒 +- 整體淡出 +- 總時長約 5 秒 + +### 解析度 + +配合來源影片的 width x height 生成,確保 concat 無縫接合。 + +### 實現方式 + +FFmpeg `color` filter(黑底)+ 多層 `drawtext` filter + `fade` filter。不需外部圖片素材。 + +## 浮水印視覺規格 + +| 項目 | 規格 | +| -------- | -------------------------------- | +| 位置 | 右上角(top: 18px, right: 20px) | +| 內容 | `Source: {channel}` | +| 字體 | Inter, 400 weight | +| 大小 | 12px | +| 顏色 | 白字,透明度 0.45 | +| 陰影 | 黑色陰影(確保在亮色背景上可讀) | +| 顯示範圍 | 正片全程 | + +### 實現方式 + +在 burn 步驟的 FFmpeg filter chain 中加入 `drawtext` filter。 + +## 替代流程 + +- **yt-dlp 未回傳頻道名稱**:使用 `uploader` 欄位作為 fallback;若兩者皆無,使用影片標題代替頻道名稱,浮水印顯示 `Source: {title}` +- **影片標題過長**:片頭中截斷顯示(FFmpeg drawtext 限定最大寬度為影片寬度的 70%) + +## 錯誤情境 + +### 系統錯誤 + +- **FFmpeg 片頭生成失敗**:跳過片頭,僅產出燒錄字幕的正片(降級處理),記錄警告 log +- **FFmpeg concat 失敗**:同上,降級為僅正片輸出 + +### 使用者誤操作 + +- 無特殊情境(功能為自動觸發,無使用者操作) + +## Out of Scope + +- 前端片頭/浮水印開關 +- 使用者自訂片頭內容或樣式 +- 使用者自訂浮水印位置或內容 +- 片尾(outro) +- 動態浮水印(移動、閃爍等) + +## 整合點 + +- **yt-dlp**:需要從 `info_dict` 擷取 `channel`、`uploader`、`channel_url` 欄位(目前未擷取) +- **FFmpeg**:片頭生成(drawtext + color + fade)、浮水印(drawtext)、接合(concat) +- **Pipeline**:在現有 burn 步驟之後新增片頭生成和接合步驟 +- **Job dataclass**:新增 `video_channel` 欄位 + +## Acceptance Criteria + +- Given 使用者提交 YouTube URL 並完成燒錄 + When 下載最終影片 + Then 影片開頭有約 5 秒的來源宣告片頭,包含頻道名稱、影片標題、YouTube URL、中英雙語著作權聲明 + +- Given 使用者提交 YouTube URL 並完成燒錄 + When 播放最終影片的正片部分 + Then 右上角全程顯示半透明浮水印 `Source: {頻道名稱}` + +- Given 使用者上傳本地影片檔案並完成燒錄 + When 下載最終影片 + Then 影片不包含片頭,不包含浮水印,行為與現有一致 + +- Given 使用者提交非 YouTube 平台的 URL(如 Bilibili)並完成燒錄 + When 下載最終影片 + Then 片頭顯示頻道名稱但不顯示頻道 URL,影片 URL 正常顯示 + +- Given 來源影片解析度為 720p + When 系統生成片頭 + Then 片頭解析度為 1280x720,與正片 concat 後無黑邊或縮放 + +- Given FFmpeg 片頭生成過程中發生錯誤 + When 系統偵測到錯誤 + Then 跳過片頭,僅輸出燒錄字幕的正片,並記錄警告 log + +## 開放問題 + +(無) + +## 視覺參考 + +HTML 預覽檔:`preview-intro-watermark.html`(Variant 3 / 英文 eyebrow 左對齊文件風) diff --git a/src/bilingualsub/api/constants.py b/src/bilingualsub/api/constants.py index f85ec2b..3f1a84b 100644 --- a/src/bilingualsub/api/constants.py +++ b/src/bilingualsub/api/constants.py @@ -25,6 +25,7 @@ class FileType(StrEnum): VIDEO = "video" AUDIO = "audio" SOURCE_VIDEO = "source_video" + INTRO_VIDEO = "intro_video" class SSEEvent(StrEnum): diff --git a/src/bilingualsub/api/jobs.py b/src/bilingualsub/api/jobs.py index 37ad00f..bc010fe 100644 --- a/src/bilingualsub/api/jobs.py +++ b/src/bilingualsub/api/jobs.py @@ -46,10 +46,13 @@ class Job: video_height: int = 0 video_title: str = "" video_description: str = "" + video_channel: str = "" # channel name; empty → skip intro flow + video_channel_url: str = "" # channel URL; empty → omit channel URL line in intro glossary_text: str = "" subtitle_source: str = "" processing_mode: ProcessingMode = ProcessingMode.SUBTITLE video_duration: float = 0.0 + video_fps: float = 0.0 output_files: dict[FileType, Path] = field(default_factory=dict) event_queue: asyncio.Queue[dict[str, object]] = field(default_factory=asyncio.Queue) created_at: float = field(default_factory=time.monotonic) diff --git a/src/bilingualsub/api/pipeline.py b/src/bilingualsub/api/pipeline.py index e833c37..42fab1f 100644 --- a/src/bilingualsub/api/pipeline.py +++ b/src/bilingualsub/api/pipeline.py @@ -41,8 +41,10 @@ from bilingualsub.utils import ( FFmpegError, burn_subtitles, + concat_videos, extract_audio, extract_video_metadata, + generate_intro, trim_video, ) @@ -293,9 +295,20 @@ async def run_download(job: Job) -> None: # Save metadata for subtitle phase job.video_width = metadata.width job.video_height = metadata.height + job.video_fps = metadata.fps job.video_title = metadata.title job.video_description = metadata.description job.video_duration = metadata.duration + job.video_channel = metadata.channel + + # Only show channel URL for YouTube sources + source_url = job.source_url or "" + is_youtube_source = "youtube.com" in source_url or "youtu.be" in source_url + raw_channel_url = metadata.channel_url + job.video_channel_url = ( + raw_channel_url if (is_youtube_source and raw_channel_url) else "" + ) + job.output_files[FileType.SOURCE_VIDEO] = video_path _send_download_complete(job) @@ -538,6 +551,11 @@ async def run_subtitle(job: Job) -> None: ) +_BURN_PROGRESS_END = 80.0 +_INTRO_PROGRESS_END = 90.0 +_CONCAT_PROGRESS_END = 99.0 + + async def run_burn(job: Job, srt_content: str) -> None: """Burn user-edited SRT into the source video.""" log = logger.bind(job_id=job.id) @@ -550,11 +568,16 @@ async def run_burn(job: Job, srt_content: str) -> None: _send_progress(job, JobStatus.BURNING, 0.0, "burn", "Burning subtitles") output_video = work_dir / "output.mp4" + has_channel = bool(job.video_channel) + watermark_text = f"Source: {job.video_channel}" if has_channel else None + + burn_scale = _BURN_PROGRESS_END / 100.0 if has_channel else 1.0 + def on_burn_progress(percent: float) -> None: _send_progress( job, JobStatus.BURNING, - percent, + percent * burn_scale, "burn", f"Burning subtitles ({percent:.0f}%)", ) @@ -565,7 +588,56 @@ def on_burn_progress(percent: float) -> None: srt_path, output_video, on_progress=on_burn_progress, + watermark_text=watermark_text, ) + + if has_channel: + intro_path = work_dir / "intro.mp4" + try: + await asyncio.to_thread( + generate_intro, + intro_path, + width=job.video_width, + height=job.video_height, + fps=job.video_fps if job.video_fps > 0 else 30.0, + channel=job.video_channel, + video_title=job.video_title, + video_url=job.source_url, + channel_url=job.video_channel_url, + on_progress=lambda p: _send_progress( + job, + JobStatus.BURNING, + _BURN_PROGRESS_END + + p * (_INTRO_PROGRESS_END - _BURN_PROGRESS_END) / 100.0, + "intro", + f"Generating intro ({p:.0f}%)", + ), + ) + job.output_files[FileType.INTRO_VIDEO] = intro_path + except FFmpegError as exc: + log.warning("intro_generation_failed", error=str(exc)) + # Degrade gracefully: skip intro, use subtitle-only main video + else: + final_path = work_dir / "final.mp4" + try: + await asyncio.to_thread( + concat_videos, + intro_path, + output_video, + final_path, + on_progress=lambda p: _send_progress( + job, + JobStatus.BURNING, + _INTRO_PROGRESS_END + + p * (_CONCAT_PROGRESS_END - _INTRO_PROGRESS_END) / 100.0, + "concat", + f"Combining intro and video ({p:.0f}%)", + ), + ) + output_video = final_path + except FFmpegError as exc: + log.warning("concat_failed", error=str(exc)) + job.output_files[FileType.VIDEO] = output_video _send_complete(job) log.info("burn_complete", job_id=job.id) diff --git a/src/bilingualsub/core/downloader.py b/src/bilingualsub/core/downloader.py index 28ea255..2a29c9a 100644 --- a/src/bilingualsub/core/downloader.py +++ b/src/bilingualsub/core/downloader.py @@ -29,6 +29,8 @@ class VideoMetadata: height: int fps: float description: str = "" + channel: str = "" # channel name; empty for local uploads + channel_url: str = "" # raw channel URL from yt-dlp; empty for local uploads def __post_init__(self) -> None: """Validate metadata constraints.""" @@ -119,6 +121,7 @@ def download_video( if isinstance(info_title, str) and info_title.strip(): metadata.title = info_title.strip() metadata.description = _sanitize_description(info_dict.get("description", "")) + metadata.channel, metadata.channel_url = _extract_channel_from_info(info_dict) return metadata @@ -130,6 +133,15 @@ def _sanitize_description(raw: Any) -> str: return raw.strip() +def _extract_channel_from_info(info_dict: dict[str, Any]) -> tuple[str, str]: + """Extract channel name and URL from a yt-dlp info_dict.""" + channel_raw = info_dict.get("channel") or info_dict.get("uploader") or "" + channel = channel_raw.strip() if isinstance(channel_raw, str) else "" + raw_url = info_dict.get("channel_url") or "" + channel_url = raw_url.strip() if isinstance(raw_url, str) else "" + return channel, channel_url + + _SUPPORTED_EXTRACTOR_CLASSES: list[type] = [ cls for cls in gen_extractor_classes() if cls.IE_NAME != "generic" ] @@ -279,6 +291,8 @@ def _extract_metadata_from_info_dict( if fps is None or fps <= 0: fps = 30.0 + channel, channel_url = _extract_channel_from_info(info_dict) + return VideoMetadata( title=title, duration=float(duration), @@ -286,6 +300,8 @@ def _extract_metadata_from_info_dict( height=int(height), fps=float(fps), description=_sanitize_description(info_dict.get("description", "")), + channel=channel, + channel_url=channel_url, ) diff --git a/src/bilingualsub/utils/__init__.py b/src/bilingualsub/utils/__init__.py index 5a08997..f695d32 100644 --- a/src/bilingualsub/utils/__init__.py +++ b/src/bilingualsub/utils/__init__.py @@ -4,8 +4,10 @@ from bilingualsub.utils.ffmpeg import ( FFmpegError, burn_subtitles, + concat_videos, extract_audio, extract_video_metadata, + generate_intro, get_audio_duration, split_audio, trim_video, @@ -15,8 +17,10 @@ "FFmpegError", "Settings", "burn_subtitles", + "concat_videos", "extract_audio", "extract_video_metadata", + "generate_intro", "get_audio_duration", "get_groq_api_key", "get_settings", diff --git a/src/bilingualsub/utils/ffmpeg.py b/src/bilingualsub/utils/ffmpeg.py index 71e1e0b..2ed0777 100644 --- a/src/bilingualsub/utils/ffmpeg.py +++ b/src/bilingualsub/utils/ffmpeg.py @@ -4,6 +4,7 @@ import subprocess # nosec B404 import sys import tempfile +import uuid from collections.abc import Callable, Iterable from pathlib import Path @@ -34,12 +35,85 @@ def _parse_and_report_progress( continue +def _escape_drawtext(text: str) -> str: + """Escape special characters for FFmpeg drawtext filter.""" + return ( + text.replace("\\", "\\\\") + .replace("%", "%%") + .replace("'", "\\'") + .replace(":", "\\:") + ) + + +def _run_ffmpeg_with_progress( + cmd: list[str], + *, + total_duration: float, + on_progress: Callable[[float], None] | None, + error_prefix: str, +) -> None: + """Run an FFmpeg command, streaming progress and raising FFmpegError on failure.""" + stdout_target: int = ( + subprocess.PIPE + if on_progress is not None and total_duration > 0 + else subprocess.DEVNULL + ) + + with tempfile.SpooledTemporaryFile(max_size=1024 * 1024) as stderr_file: + try: + process = subprocess.Popen( # nosec B603 + cmd, + stdout=stdout_target, + stderr=stderr_file, + ) + + if process.stdout: + try: + if on_progress is not None: + _parse_and_report_progress( + process.stdout, + total_duration=total_duration, + on_progress=on_progress, + ) + finally: + close_stdout = getattr(process.stdout, "close", None) + if callable(close_stdout): + close_stdout() + + returncode = process.wait() + if returncode != 0: + stderr_file.seek(0) + stderr_output = stderr_file.read().decode("utf-8", errors="replace") + raise FFmpegError(f"{error_prefix}: {stderr_output}") + except FFmpegError: + raise + except Exception as e: + raise FFmpegError(f"{error_prefix}: {e}") from e + + +def _append_watermark_drawtext(vf_filter: str, watermark_text: str) -> str: + """Append a corner watermark drawtext filter to an existing vf filter chain.""" + safe_text = _escape_drawtext(watermark_text) + watermark_drawtext = ( + f"drawtext=text='{safe_text}'" + ":font='Arial'" + ":fontsize=16" + ":fontcolor=white@0.6" + ":shadowcolor=black@0.8" + ":shadowx=1:shadowy=1" + ":x=w-tw-20" + ":y=18" + ) + return f"{vf_filter},{watermark_drawtext}" + + def burn_subtitles( video_path: Path, subtitle_path: Path, output_path: Path, *, on_progress: Callable[[float], None] | None = None, + watermark_text: str | None = None, ) -> Path: """Burn subtitles into video. @@ -48,6 +122,7 @@ def burn_subtitles( subtitle_path: Subtitle file (.srt or .ass) output_path: Output video file on_progress: Optional callback for progress updates (0-100) + watermark_text: Optional watermark text to overlay in the top-right corner Returns: Path to output video file @@ -91,6 +166,9 @@ def burn_subtitles( ) vf_filter = f"subtitles={subtitle_path}:force_style='{force_style}'" + if watermark_text is not None: + vf_filter = _append_watermark_drawtext(vf_filter, watermark_text) + # Get video duration for progress calculation metadata = extract_video_metadata(video_path) total_duration = float(metadata["duration"]) @@ -119,46 +197,12 @@ def burn_subtitles( str(output_path), ] - # Only open stdout pipe when progress callback is enabled. - # Otherwise route stdout to DEVNULL to avoid filling PIPE buffers. - stdout_target: int = ( - subprocess.PIPE - if on_progress is not None and total_duration > 0 - else subprocess.DEVNULL + _run_ffmpeg_with_progress( + cmd, + total_duration=total_duration, + on_progress=on_progress, + error_prefix="Failed to burn subtitles", ) - - # Use a temporary file for stderr to prevent pipe buffer deadlock - with tempfile.SpooledTemporaryFile(max_size=1024 * 1024) as stderr_file: - try: - process = subprocess.Popen( # nosec B603 - cmd, - stdout=stdout_target, - stderr=stderr_file, - ) - - if process.stdout: - try: - if on_progress is not None: - _parse_and_report_progress( - process.stdout, - total_duration=total_duration, - on_progress=on_progress, - ) - finally: - close_stdout = getattr(process.stdout, "close", None) - if callable(close_stdout): - close_stdout() - - returncode = process.wait() - if returncode != 0: - stderr_file.seek(0) - stderr_output = stderr_file.read().decode("utf-8", errors="replace") - raise FFmpegError(f"Failed to burn subtitles: {stderr_output}") - except FFmpegError: - raise - except Exception as e: - raise FFmpegError(f"Failed to burn subtitles: {e}") from e - return output_path @@ -417,3 +461,320 @@ def split_audio( chunk_idx += 1 return chunks + + +def generate_intro( # noqa: PLR0915 + output_path: Path, + *, + width: int, + height: int, + fps: float, + channel: str, + video_title: str, + video_url: str, + channel_url: str = "", + duration: float = 5.0, + on_progress: Callable[[float], None] | None = None, +) -> Path: + """Generate a black-background intro card video with source attribution text. + + Forces libx264 (not h264_videotoolbox) because lavfi color source does not + support hardware-accelerated encoding paths on macOS. + """ + left_margin = int(width * 0.10) + + # Each text layer fades in 0.3 s after the previous; earliest at t=0.3 + fade_step = 0.3 + blocks: list[str] = [] + + def _block_enable(start: float) -> str: + return f"between(t,{start:.1f},{duration:.1f})" + + def _dt( + text: str, + font: str, + fontsize: int, + fontcolor: str, + x: str, + y: str, + enable_expr: str, + fade_start: float, + ) -> str: + safe = _escape_drawtext(text) + alpha_expr = f"if(lt(t,{fade_start:.1f}),0,min((t-{fade_start:.1f})/0.3,1))" + return ( + f"drawtext=text='{safe}'" + f":font='{font}'" + f":fontsize={fontsize}" + f":fontcolor={fontcolor}" + f":x={x}" + f":y={y}" + f":alpha='{alpha_expr}'" + f":enable='{enable_expr}'" + ":fix_bounds=1" + ) + + # Y positions scaled to video height + y_eyebrow = int(height * 0.14) + y_chinese_label = y_eyebrow + int(height / 25) + y_channel = y_chinese_label + int(height / 22) + y_channel_url = y_channel + int(height / 20) + y_title = (y_channel_url if channel_url else y_channel) + int(height / 22) + y_video_url = y_title + int(height / 28) + y_decl_zh_1 = y_video_url + int(height / 20) + y_decl_zh_2 = y_decl_zh_1 + int(height / 40) + y_decl_zh_3 = y_decl_zh_2 + int(height / 40) + y_decl_en_1 = y_decl_zh_3 + int(height / 30) + y_decl_en_2 = y_decl_en_1 + int(height / 44) + y_decl_en_3 = y_decl_en_2 + int(height / 44) + + x_left = str(left_margin) + x_brand = f"w-tw-{int(width * 0.04)}" + y_brand = f"h-th-{int(height * 0.05)}" + + slot = 0 + + def _next_start() -> float: + nonlocal slot + start = fade_step + slot * fade_step + slot += 1 + return start + + # Eyebrow + _start = _next_start() + blocks.append( + _dt( + "ORIGINAL VIDEO FROM", + "Arial", + max(1, int(height / 54)), + "white@0.3", + x_left, + str(y_eyebrow), + _block_enable(_start), + _start, + ) + ) + + # Chinese label + _start = _next_start() + blocks.append( + _dt( + "原始影片來自", + "serif", + max(1, int(height / 42)), + "white@0.6", + x_left, + str(y_chinese_label), + _block_enable(_start), + _start, + ) + ) + + # Channel name + _start = _next_start() + blocks.append( + _dt( + channel, + "Arial", + max(1, int(height / 17)), + "white@1.0", + x_left, + str(y_channel), + _block_enable(_start), + _start, + ) + ) + + # Channel URL (optional) + if channel_url: + _start = _next_start() + blocks.append( + _dt( + channel_url, + "Arial", + max(1, int(height / 49)), + "white@0.35", + x_left, + str(y_channel_url), + _block_enable(_start), + _start, + ) + ) + + # Video title + _start = _next_start() + blocks.append( + _dt( + video_title, + "serif", + max(1, int(height / 34)), + "white@0.7", + x_left, + str(y_title), + _block_enable(_start), + _start, + ) + ) + + # Video URL + _start = _next_start() + blocks.append( + _dt( + video_url, + "Arial", + max(1, int(height / 45)), + "white@0.5", + x_left, + str(y_video_url), + _block_enable(_start), + _start, + ) + ) + + # Chinese declaration (3 lines) + decl_zh = [ + "翻譯字幕由開源專案 BilingualSub 產生", + "所有內容及著作權屬於原始創作者所有", + "如需移除,請聯繫上傳者", # noqa: RUF001 + ] + decl_zh_y = [y_decl_zh_1, y_decl_zh_2, y_decl_zh_3] + decl_zh_start = _next_start() + for line, y_pos in zip(decl_zh, decl_zh_y, strict=True): + blocks.append( + _dt( + line, + "serif", + max(1, int(height / 45)), + "white@0.45", + x_left, + str(y_pos), + _block_enable(decl_zh_start), + decl_zh_start, + ) + ) + + # English declaration (3 lines) + decl_en = [ + "Subtitles generated by BilingualSub (open source)", + "All content and copyrights belong to the original creator", + "For removal requests, please contact the uploader", + ] + decl_en_y = [y_decl_en_1, y_decl_en_2, y_decl_en_3] + decl_en_start = _next_start() + for line, y_pos in zip(decl_en, decl_en_y, strict=True): + blocks.append( + _dt( + line, + "Arial", + max(1, int(height / 49)), + "white@0.35", + x_left, + str(y_pos), + _block_enable(decl_en_start), + decl_en_start, + ) + ) + + # BilingualSub branding at bottom-right corner + blocks.append( + _dt( + "BilingualSub", + "Arial", + max(1, int(height / 54)), + "white@0.25", + x_brand, + y_brand, + _block_enable(fade_step), + fade_step, + ) + ) + + fade_out_start = duration - 0.5 + drawtext_chain = ",".join(blocks) + vf = f"{drawtext_chain},fade=t=out:st={fade_out_start:.2f}:d=0.5" + + cmd = [ + "ffmpeg", + "-f", + "lavfi", + "-i", + f"color=c=black:s={width}x{height}:r={fps}:d={duration}", + "-vf", + vf, + "-c:v", + "libx264", + "-crf", + "23", + "-preset", + "fast", + "-an", + "-progress", + "pipe:1", + "-y", + str(output_path), + ] + + _run_ffmpeg_with_progress( + cmd, + total_duration=duration, + on_progress=on_progress, + error_prefix="Failed to generate intro", + ) + return output_path + + +def concat_videos( + first_path: Path, + second_path: Path, + output_path: Path, + *, + on_progress: Callable[[float], None] | None = None, +) -> Path: + """Concatenate two videos using FFmpeg concat demuxer (no re-encode).""" + if not first_path.exists() or not first_path.is_file(): + raise FFmpegError(f"First video does not exist: {first_path}") + if not second_path.exists() or not second_path.is_file(): + raise FFmpegError(f"Second video does not exist: {second_path}") + + concat_list_path = output_path.parent / f"concat_list_{uuid.uuid4().hex[:8]}.txt" + try: + concat_list_path.write_text( + f"file '{first_path.resolve()}'\nfile '{second_path.resolve()}'\n", + encoding="utf-8", + ) + + # Estimate total duration for progress (sum of both clips) + try: + meta1 = extract_video_metadata(first_path) + meta2 = extract_video_metadata(second_path) + total_duration = float(meta1["duration"]) + float(meta2["duration"]) + except FFmpegError: + total_duration = 0.0 + + cmd = [ + "ffmpeg", + "-f", + "concat", + "-safe", + "0", + "-i", + str(concat_list_path), + "-c", + "copy", + "-progress", + "pipe:1", + "-y", + str(output_path), + ] + + _run_ffmpeg_with_progress( + cmd, + total_duration=total_duration, + on_progress=on_progress, + error_prefix="Failed to concat videos", + ) + finally: + if concat_list_path.exists(): + concat_list_path.unlink() + + return output_path diff --git a/tests/unit/api/test_pipeline.py b/tests/unit/api/test_pipeline.py index f87f099..6cc3ea2 100644 --- a/tests/unit/api/test_pipeline.py +++ b/tests/unit/api/test_pipeline.py @@ -2,13 +2,13 @@ from datetime import timedelta from pathlib import Path -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from bilingualsub.api.constants import FileType, JobStatus, SSEEvent from bilingualsub.api.jobs import Job -from bilingualsub.api.pipeline import run_download, run_subtitle +from bilingualsub.api.pipeline import run_burn, run_download, run_subtitle from bilingualsub.core.downloader import DownloadError, VideoMetadata from bilingualsub.core.subtitle import Subtitle, SubtitleEntry from bilingualsub.utils.ffmpeg import FFmpegError @@ -262,3 +262,289 @@ async def test_run_subtitle_sends_complete( event_types = [e["event"] for e in events] assert SSEEvent.COMPLETE in event_types assert job.status == JobStatus.COMPLETED + + +# --------------------------------------------------------------------------- +# run_burn — watermark + intro + concat + degradation +# --------------------------------------------------------------------------- + + +def _make_burn_job(tmp_path: Path, *, channel: str = "TestChannel") -> Job: + """Return a job pre-populated with the output files run_burn expects.""" + job = Job( + id="burn001", + source_url="https://youtube.com/watch?v=abc", + source_lang="en", + target_lang="zh-TW", + video_width=1920, + video_height=1080, + video_fps=30.0, + video_title="Test Video", + video_channel=channel, + video_channel_url="https://youtube.com/@TestChannel" if channel else "", + ) + # run_burn reads SOURCE_VIDEO to determine work_dir + video_path = tmp_path / "video.mp4" + video_path.write_bytes(b"fake") + job.output_files[FileType.SOURCE_VIDEO] = video_path + return job + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestRunBurn: + """Tests for run_burn watermark, intro, concat, and degradation logic.""" + + @patch("bilingualsub.api.pipeline.concat_videos") + @patch("bilingualsub.api.pipeline.generate_intro") + @patch("bilingualsub.api.pipeline.burn_subtitles") + async def test_when_channel_set_then_burn_called_with_watermark_and_intro_called( + self, + mock_burn: object, + mock_intro: object, + mock_concat: object, + tmp_path: Path, + ) -> None: + """When video_channel is set, burn_subtitles receives watermark_text and generate_intro is called.""" + mock_burn.return_value = tmp_path / "output.mp4" + mock_intro.return_value = tmp_path / "intro.mp4" + mock_concat.return_value = tmp_path / "final.mp4" + + job = _make_burn_job(tmp_path, channel="3Blue1Brown") + srt = "1\n00:00:00,000 --> 00:00:04,000\nHello\n" + + await run_burn(job, srt) + + # burn_subtitles must have received watermark_text with the channel name + mock_burn.assert_called_once() + call_kwargs = mock_burn.call_args.kwargs + assert call_kwargs["watermark_text"] == "Source: 3Blue1Brown" + + # generate_intro must have been called with expected channel args + mock_intro.assert_called_once() + intro_kwargs = mock_intro.call_args.kwargs + assert intro_kwargs["channel"] == "3Blue1Brown" + assert intro_kwargs["channel_url"] == "https://youtube.com/@TestChannel" + + # concat_videos must have been called to combine intro + main + mock_concat.assert_called_once() + + # Final VIDEO output should be the concatenated file + assert job.output_files[FileType.VIDEO] == tmp_path / "final.mp4" + assert job.status == JobStatus.COMPLETED + + @patch("bilingualsub.api.pipeline.concat_videos") + @patch("bilingualsub.api.pipeline.generate_intro") + @patch("bilingualsub.api.pipeline.burn_subtitles") + async def test_when_channel_empty_then_burn_called_without_watermark_and_intro_skipped( + self, + mock_burn: object, + mock_intro: object, + mock_concat: object, + tmp_path: Path, + ) -> None: + """When video_channel is empty, watermark_text=None and generate_intro is never called.""" + mock_burn.return_value = tmp_path / "output.mp4" + + job = _make_burn_job(tmp_path, channel="") + srt = "1\n00:00:00,000 --> 00:00:04,000\nHello\n" + + await run_burn(job, srt) + + mock_burn.assert_called_once() + call_kwargs = mock_burn.call_args.kwargs + assert call_kwargs["watermark_text"] is None + + mock_intro.assert_not_called() + mock_concat.assert_not_called() + + assert job.output_files[FileType.VIDEO] == tmp_path / "output.mp4" + assert job.status == JobStatus.COMPLETED + + @patch("bilingualsub.api.pipeline.logger") + @patch("bilingualsub.api.pipeline.concat_videos") + @patch("bilingualsub.api.pipeline.generate_intro") + @patch("bilingualsub.api.pipeline.burn_subtitles") + async def test_when_generate_intro_fails_then_concat_skipped_and_job_completes( + self, + mock_burn: object, + mock_intro: object, + mock_concat: object, + mock_logger: object, + tmp_path: Path, + ) -> None: + """When generate_intro raises FFmpegError, concat is skipped but job still COMPLETED.""" + mock_log = MagicMock() + mock_logger.bind.return_value = mock_log # type: ignore[union-attr] + + mock_burn.return_value = tmp_path / "output.mp4" + mock_intro.side_effect = FFmpegError("lavfi failed") + + job = _make_burn_job(tmp_path, channel="BadChannel") + srt = "1\n00:00:00,000 --> 00:00:04,000\nHello\n" + + await run_burn(job, srt) + + mock_concat.assert_not_called() + # Job still completes - no error event + assert job.status == JobStatus.COMPLETED + # VIDEO should fall back to output.mp4, not final.mp4 + assert job.output_files[FileType.VIDEO] == tmp_path / "output.mp4" + # Degradation must be logged as a warning + mock_log.warning.assert_called_once_with( + "intro_generation_failed", error="lavfi failed" + ) + + @patch("bilingualsub.api.pipeline.logger") + @patch("bilingualsub.api.pipeline.concat_videos") + @patch("bilingualsub.api.pipeline.generate_intro") + @patch("bilingualsub.api.pipeline.burn_subtitles") + async def test_when_concat_fails_then_job_completes_with_output_video( + self, + mock_burn: object, + mock_intro: object, + mock_concat: object, + mock_logger: object, + tmp_path: Path, + ) -> None: + """When concat_videos raises FFmpegError, job still COMPLETED and VIDEO = output.mp4.""" + mock_log = MagicMock() + mock_logger.bind.return_value = mock_log # type: ignore[union-attr] + + mock_burn.return_value = tmp_path / "output.mp4" + mock_intro.return_value = tmp_path / "intro.mp4" + mock_concat.side_effect = FFmpegError("concat failed") + + job = _make_burn_job(tmp_path, channel="SomeChannel") + srt = "1\n00:00:00,000 --> 00:00:04,000\nHello\n" + + await run_burn(job, srt) + + assert job.status == JobStatus.COMPLETED + assert job.output_files[FileType.VIDEO] == tmp_path / "output.mp4" + # Degradation must be logged as a warning + mock_log.warning.assert_called_once_with("concat_failed", error="concat failed") + + +# --------------------------------------------------------------------------- +# run_download - channel metadata propagation +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestRunDownloadChannel: + """Tests for channel metadata saved by run_download.""" + + @patch("bilingualsub.api.pipeline.extract_audio") + @patch("bilingualsub.api.pipeline.download_video") + async def test_when_youtube_url_with_channel_then_job_stores_channel( + self, + mock_download: object, + mock_extract_audio: object, + ) -> None: + """run_download must save video_channel from metadata.channel.""" + metadata = VideoMetadata( + title="Test", + duration=60.0, + width=1920, + height=1080, + fps=30.0, + channel="3Blue1Brown", + channel_url="https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw", + ) + mock_download.return_value = metadata + + job = _make_job() + await run_download(job) + + assert job.video_channel == "3Blue1Brown" + + @patch("bilingualsub.api.pipeline.extract_audio") + @patch("bilingualsub.api.pipeline.download_video") + async def test_when_youtube_url_with_channel_then_channel_url_stored( + self, + mock_download: object, + mock_extract_audio: object, + ) -> None: + """run_download must save video_channel_url when source is YouTube.""" + channel_url = "https://www.youtube.com/channel/UCYO_jab_esuFRV4b17AJtAw" + metadata = VideoMetadata( + title="Test", + duration=60.0, + width=1920, + height=1080, + fps=30.0, + channel="3Blue1Brown", + channel_url=channel_url, + ) + mock_download.return_value = metadata + + job = _make_job() # source_url contains youtube.com + await run_download(job) + + assert job.video_channel_url == channel_url + + @patch("bilingualsub.api.pipeline.extract_audio") + @patch("bilingualsub.api.pipeline.download_video") + async def test_when_local_upload_then_video_channel_is_empty( + self, + mock_download: object, + mock_extract_audio: object, + tmp_path: Path, + ) -> None: + """Local upload jobs must have video_channel = '' after run_download.""" + local_video = tmp_path / "local.mp4" + local_video.write_bytes(b"fake") + + job = Job( + id="local01", + source_url="", + source_lang="en", + target_lang="zh-TW", + local_video_path=local_video, + ) + + with patch( + "bilingualsub.api.pipeline.extract_video_metadata", + return_value={ + "title": "Local", + "duration": 30.0, + "width": 1280, + "height": 720, + "fps": 24.0, + }, + ): + await run_download(job) + + assert job.video_channel == "" + + @patch("bilingualsub.api.pipeline.extract_audio") + @patch("bilingualsub.api.pipeline.download_video") + async def test_when_non_youtube_url_then_channel_url_cleared( + self, + mock_download: object, + mock_extract_audio: object, + ) -> None: + """Non-YouTube source must clear video_channel_url even if metadata has channel_url.""" + metadata = VideoMetadata( + title="Test", + duration=60.0, + width=1920, + height=1080, + fps=30.0, + channel="BilibiliUser", + channel_url="https://space.bilibili.com/12345", + ) + mock_download.return_value = metadata + + job = Job( + id="bilibili01", + source_url="https://www.bilibili.com/video/BV1234", + source_lang="zh", + target_lang="en", + ) + await run_download(job) + + assert job.video_channel == "BilibiliUser" + assert job.video_channel_url == "" # Non-YouTube → cleared diff --git a/tests/unit/utils/test_ffmpeg_intro.py b/tests/unit/utils/test_ffmpeg_intro.py new file mode 100644 index 0000000..4ccb131 --- /dev/null +++ b/tests/unit/utils/test_ffmpeg_intro.py @@ -0,0 +1,394 @@ +"""Unit tests for generate_intro, concat_videos, and burn_subtitles watermark branch.""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from bilingualsub.utils.ffmpeg import ( + FFmpegError, + _escape_drawtext, + burn_subtitles, + concat_videos, + generate_intro, +) + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +_MOCK_METADATA = { + "duration": 10.0, + "width": 1920, + "height": 1080, + "fps": 30.0, + "title": "test video", +} + + +def _make_popen_mock(returncode: int = 0) -> MagicMock: + """Return a pre-configured Popen mock that reports success by default.""" + process = MagicMock() + process.stdout = [] # no progress lines → no on_progress calls + process.wait.return_value = returncode + return process + + +def _get_popen_cmd(mock_popen: MagicMock) -> list[str]: + """Extract the command list from the first Popen call.""" + return mock_popen.call_args[0][0] + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mock_burn_ffmpeg(): + """Mock subprocess.Popen, extract_video_metadata, and SpooledTemporaryFile + for burn_subtitles tests.""" + with ( + patch("bilingualsub.utils.ffmpeg.subprocess.Popen") as mock_popen, + patch("bilingualsub.utils.ffmpeg.extract_video_metadata") as mock_metadata, + patch("bilingualsub.utils.ffmpeg.tempfile.SpooledTemporaryFile") as mock_stderr, + ): + mock_popen.return_value = _make_popen_mock(returncode=0) + + mock_file = MagicMock() + mock_file.read.return_value = b"" + mock_stderr.return_value.__enter__.return_value = mock_file + + mock_metadata.return_value = _MOCK_METADATA + + yield { + "popen": mock_popen, + "metadata": mock_metadata, + "stderr_file": mock_stderr, + } + + +@pytest.fixture +def mock_intro_ffmpeg(): + """Mock subprocess.Popen and SpooledTemporaryFile for generate_intro tests.""" + with ( + patch("bilingualsub.utils.ffmpeg.subprocess.Popen") as mock_popen, + patch("bilingualsub.utils.ffmpeg.tempfile.SpooledTemporaryFile") as mock_stderr, + ): + mock_popen.return_value = _make_popen_mock(returncode=0) + + mock_file = MagicMock() + mock_file.read.return_value = b"" + mock_stderr.return_value.__enter__.return_value = mock_file + + yield { + "popen": mock_popen, + "stderr_file": mock_stderr, + } + + +# --------------------------------------------------------------------------- +# burn_subtitles — watermark branch +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestBurnSubtitlesWatermark: + """Tests for the watermark_text parameter of burn_subtitles.""" + + def test_when_watermark_text_given_then_vf_contains_drawtext( + self, tmp_path: Path, mock_burn_ffmpeg: dict + ) -> None: + """Given watermark_text, the -vf filter must include a drawtext segment.""" + video_path = tmp_path / "video.mp4" + video_path.write_bytes(b"fake video") + subtitle_path = tmp_path / "subtitle.srt" + subtitle_path.write_bytes(b"fake subtitle") + output_path = tmp_path / "output.mp4" + + burn_subtitles( + video_path, subtitle_path, output_path, watermark_text="Source: TestChannel" + ) + + cmd = _get_popen_cmd(mock_burn_ffmpeg["popen"]) + vf_idx = cmd.index("-vf") + vf_filter = cmd[vf_idx + 1] + # The filter chain must contain a drawtext element after a comma + assert "drawtext=" in vf_filter + assert "Source\\: TestChannel" in vf_filter + + def test_when_watermark_text_is_none_then_vf_has_no_drawtext( + self, tmp_path: Path, mock_burn_ffmpeg: dict + ) -> None: + """Given watermark_text=None, the -vf filter must NOT include drawtext (regression guard).""" + video_path = tmp_path / "video.mp4" + video_path.write_bytes(b"fake video") + subtitle_path = tmp_path / "subtitle.srt" + subtitle_path.write_bytes(b"fake subtitle") + output_path = tmp_path / "output.mp4" + + burn_subtitles(video_path, subtitle_path, output_path, watermark_text=None) + + cmd = _get_popen_cmd(mock_burn_ffmpeg["popen"]) + vf_idx = cmd.index("-vf") + vf_filter = cmd[vf_idx + 1] + assert "drawtext=" not in vf_filter + + def test_when_watermark_text_contains_colon_then_escaped_in_drawtext( + self, tmp_path: Path, mock_burn_ffmpeg: dict + ) -> None: + """Given watermark_text with ':', the colon must be escaped as \\: in the filter.""" + video_path = tmp_path / "video.mp4" + video_path.write_bytes(b"fake video") + subtitle_path = tmp_path / "subtitle.srt" + subtitle_path.write_bytes(b"fake subtitle") + output_path = tmp_path / "output.mp4" + + burn_subtitles(video_path, subtitle_path, output_path, watermark_text="Ch:Name") + + cmd = _get_popen_cmd(mock_burn_ffmpeg["popen"]) + vf_idx = cmd.index("-vf") + vf_filter = cmd[vf_idx + 1] + # Raw ':' must not appear as a bare character inside the drawtext value + assert "Ch\\:Name" in vf_filter + + def test_when_watermark_text_contains_single_quote_then_escaped_in_drawtext( + self, tmp_path: Path, mock_burn_ffmpeg: dict + ) -> None: + """Given watermark_text with \"'\", it must be escaped as \\' in the filter.""" + video_path = tmp_path / "video.mp4" + video_path.write_bytes(b"fake video") + subtitle_path = tmp_path / "subtitle.srt" + subtitle_path.write_bytes(b"fake subtitle") + output_path = tmp_path / "output.mp4" + + burn_subtitles( + video_path, subtitle_path, output_path, watermark_text="Bob's Channel" + ) + + cmd = _get_popen_cmd(mock_burn_ffmpeg["popen"]) + vf_idx = cmd.index("-vf") + vf_filter = cmd[vf_idx + 1] + assert "Bob\\'s Channel" in vf_filter + + +# --------------------------------------------------------------------------- +# generate_intro +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestGenerateIntro: + """Tests for generate_intro.""" + + def test_when_channel_url_given_then_cmd_contains_color_source_and_channel_url( + self, tmp_path: Path, mock_intro_ffmpeg: dict + ) -> None: + """generate_intro with channel_url must use lavfi color source and embed the URL.""" + output_path = tmp_path / "intro.mp4" + + generate_intro( + output_path, + width=1280, + height=720, + fps=30.0, + channel="TestChannel", + video_title="My Video", + video_url="https://youtube.com/watch?v=abc", + channel_url="https://youtube.com/@TestChannel", + ) + + cmd = _get_popen_cmd(mock_intro_ffmpeg["popen"]) + # Must use lavfi color input (not a file path) + assert "-f" in cmd + lavfi_idx = cmd.index("-f") + assert cmd[lavfi_idx + 1] == "lavfi" + assert any("color=c=black" in arg for arg in cmd) + # Channel URL must appear in the -vf filter + vf_idx = cmd.index("-vf") + vf_value = cmd[vf_idx + 1] + assert "TestChannel" in vf_value + # _escape_drawtext converts ":" to "\:", so the colon in https: is escaped + assert "https\\://youtube.com/@TestChannel" in vf_value + # 13 drawtext blocks when channel_url is present (12 without) + assert vf_value.count("drawtext=") == 13 + + def test_when_channel_url_empty_then_vf_does_not_contain_channel_url_value( + self, tmp_path: Path, mock_intro_ffmpeg: dict + ) -> None: + """generate_intro with channel_url='' must NOT embed the URL in the filter.""" + output_path = tmp_path / "intro.mp4" + channel_url = "https://youtube.com/@ShouldNotAppear" + + generate_intro( + output_path, + width=1280, + height=720, + fps=30.0, + channel="TestChannel", + video_title="My Video", + video_url="https://youtube.com/watch?v=abc", + channel_url="", # empty → skip channel URL drawtext block + ) + + cmd = _get_popen_cmd(mock_intro_ffmpeg["popen"]) + vf_idx = cmd.index("-vf") + vf_value = cmd[vf_idx + 1] + assert channel_url not in vf_value + # Exactly 12 drawtext blocks when channel_url is omitted (13 when present) + assert vf_value.count("drawtext=") == 12 + + def test_when_ffmpeg_fails_then_raises_ffmpeg_error( + self, tmp_path: Path, mock_intro_ffmpeg: dict + ) -> None: + """generate_intro must raise FFmpegError when the subprocess exits non-zero.""" + output_path = tmp_path / "intro.mp4" + + mock_process = mock_intro_ffmpeg["popen"].return_value + mock_process.wait.return_value = 1 # non-zero → failure + + mock_file = mock_intro_ffmpeg["stderr_file"].return_value.__enter__.return_value + mock_file.read.return_value = b"lavfi color error" + + with pytest.raises(FFmpegError, match="Failed to generate intro"): + generate_intro( + output_path, + width=1280, + height=720, + fps=30.0, + channel="Ch", + video_title="T", + video_url="https://example.com", + ) + + def test_generate_intro_always_uses_libx264_regardless_of_platform( + self, tmp_path: Path, mock_intro_ffmpeg: dict + ) -> None: + """generate_intro must always pass -c:v libx264 even when platform is darwin.""" + output_path = tmp_path / "intro.mp4" + + with patch("bilingualsub.utils.ffmpeg.sys.platform", "darwin"): + generate_intro( + output_path, + width=1920, + height=1080, + fps=30.0, + channel="Ch", + video_title="T", + video_url="https://example.com", + ) + + cmd = _get_popen_cmd(mock_intro_ffmpeg["popen"]) + assert "-c:v" in cmd + cv_idx = cmd.index("-c:v") + assert cmd[cv_idx + 1] == "libx264" + # VideoToolbox must NOT be used for intro + assert "h264_videotoolbox" not in cmd + + +# --------------------------------------------------------------------------- +# concat_videos +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestConcatVideos: + """Tests for concat_videos.""" + + @pytest.fixture + def mock_concat_ffmpeg(self): + """Mock subprocess.Popen, SpooledTemporaryFile, and extract_video_metadata + for concat_videos tests.""" + with ( + patch("bilingualsub.utils.ffmpeg.subprocess.Popen") as mock_popen, + patch( + "bilingualsub.utils.ffmpeg.tempfile.SpooledTemporaryFile" + ) as mock_stderr, + patch("bilingualsub.utils.ffmpeg.extract_video_metadata") as mock_metadata, + ): + mock_popen.return_value = _make_popen_mock(returncode=0) + + mock_file = MagicMock() + mock_file.read.return_value = b"" + mock_stderr.return_value.__enter__.return_value = mock_file + + mock_metadata.return_value = _MOCK_METADATA + + yield { + "popen": mock_popen, + "stderr_file": mock_stderr, + "metadata": mock_metadata, + } + + def test_when_both_inputs_exist_then_cmd_uses_concat_demuxer_and_copy( + self, tmp_path: Path, mock_concat_ffmpeg: dict + ) -> None: + """concat_videos must build a command with -f concat and -c copy.""" + first = tmp_path / "intro.mp4" + first.write_bytes(b"fake intro") + second = tmp_path / "main.mp4" + second.write_bytes(b"fake main") + output = tmp_path / "final.mp4" + + concat_videos(first, second, output) + + cmd = _get_popen_cmd(mock_concat_ffmpeg["popen"]) + + # Must specify concat demuxer + assert "-f" in cmd + f_idx = cmd.index("-f") + assert cmd[f_idx + 1] == "concat" + + # Must copy streams without re-encoding + assert "-c" in cmd + c_idx = cmd.index("-c") + assert cmd[c_idx + 1] == "copy" + + def test_when_first_input_does_not_exist_then_raises_ffmpeg_error( + self, tmp_path: Path + ) -> None: + """concat_videos must raise FFmpegError when the first input file is missing.""" + first = tmp_path / "nonexistent.mp4" # does not exist + second = tmp_path / "main.mp4" + second.write_bytes(b"fake main") + output = tmp_path / "final.mp4" + + with pytest.raises(FFmpegError, match="First video does not exist"): + concat_videos(first, second, output) + + def test_when_second_input_does_not_exist_then_raises_ffmpeg_error( + self, tmp_path: Path + ) -> None: + """concat_videos must raise FFmpegError when the second input file is missing.""" + first = tmp_path / "intro.mp4" + first.write_bytes(b"fake intro") + second = tmp_path / "nonexistent.mp4" # does not exist + output = tmp_path / "final.mp4" + + with pytest.raises(FFmpegError, match="Second video does not exist"): + concat_videos(first, second, output) + + +# --------------------------------------------------------------------------- +# _escape_drawtext (pure-function unit tests) +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestEscapeDrawtext: + """Tests for the _escape_drawtext helper.""" + + def test_colon_is_escaped(self) -> None: + assert _escape_drawtext("a:b") == "a\\:b" + + def test_single_quote_is_escaped(self) -> None: + assert _escape_drawtext("it's") == "it\\'s" + + def test_backslash_is_doubled(self) -> None: + assert _escape_drawtext("a\\b") == "a\\\\b" + + def test_plain_text_unchanged(self) -> None: + assert _escape_drawtext("hello world") == "hello world" + + def test_percent_is_doubled(self) -> None: + assert _escape_drawtext("100%") == "100%%"