diff --git a/.gitignore b/.gitignore index ca46faeb..2f5d37ca 100644 --- a/.gitignore +++ b/.gitignore @@ -171,4 +171,10 @@ config.backup.yaml # runtime runtime/ dev/ -installer_files/ \ No newline at end of file +installer_files/ + +# config +config.yaml +# bilibili upload +cookies.json +qrcode.png diff --git a/batch/tasks_setting.xlsx b/batch/tasks_setting.xlsx index 2ebd2df9..477cc99c 100644 Binary files a/batch/tasks_setting.xlsx and b/batch/tasks_setting.xlsx differ diff --git a/batch/utils/batch_processor_get_title_introduction.py b/batch/utils/batch_processor_get_title_introduction.py new file mode 100644 index 00000000..e554b8a8 --- /dev/null +++ b/batch/utils/batch_processor_get_title_introduction.py @@ -0,0 +1,145 @@ +import os, sys +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))#补充当前文件的父目录到系统路径 +from core.ask_gpt import ask_gpt +import re +from pathlib import Path +from core.prompts_storage import get_title_introduction_prompt +import pandas as pd +from rich import print as rprint +import json +import shutil + +def clean_srt_date(srt_date): + pass + +#### 为了提高主页的CTR,需要手动优化标题前缀 +prefix_base_dir = '[加州大学伯克利分校-电子工程与计算机科学]-' +### + +def read_all_trans_srt(): + ''' + need move to root path + ''' + rprint(f"[yellow]read the srt from the output folder....[/yellow]") + all_trans_str=[] + print(os.path.dirname(__file__)) + base_path = Path(os.path.dirname(__file__)).parent / 'output' + # 获取所有Lecture文件夹并按数字排序 + lecture_folders = [f for f in base_path.iterdir() if f.is_dir() ] + lecture_folders.sort(key=lambda x: int(re.search(r'Lecture (\d+)', x.name).group(1))) + + # 按顺序读取每个trans.srt文件 + for folder in lecture_folders: + trans_file = folder / "trans.srt" + if trans_file.exists(): + with open(trans_file, 'r', encoding='utf-8') as f: + content = str(trans_file.parent) + '||' + str(trans_file.parent.name) + '||' + f.read() + print(content) + all_trans_str.append(content) + rprint(f"[green]🎉 read the all srt {len(all_trans_str)} from the output folder completed![/green]") + rprint(f"[green]=================================================[/green]") + rprint(f"[green]{all_trans_str}[/green]") + rprint(f"[green]=================================================[/green]") + return all_trans_str + + +def copy_and_rename_videos(responses, result_dir_path): + """ + 根据生成的标题,复制并重命名视频文件 + + Args: + responses: 包含file_path和title的字典列表 + result_dir_path: 目标目录路径 (字符串或Path对象) + + Returns: + tuple: (成功数量, 总数量) + """ + rprint(f"[yellow]start copy and rename videos....[/yellow]") + # 创建result目录 + result_dir = Path(result_dir_path) + result_dir.mkdir(exist_ok=True) + + total_count = len(responses) + rprint(f"[yellow]start copy and rename videos....[/yellow]") + for i, response in enumerate(responses, 1): + file_path = response.get('file_path', '') + title = response.get('title', f'unknown_{i}') + + rprint(f"[yellow]🌐 processing {i+1}/{total_count}[/yellow]") + # 构建源文件路径 + source_video = Path(file_path) / 'output_sub.mp4' + + if source_video.exists(): + # 清理标题作为新文件名 + new_filename = f"{prefix_base_dir} + {title}.mp4" + target_path = result_dir / new_filename + + try: + # 复制文件 + shutil.copy2(source_video, target_path) + except Exception as e: + rprint(f"[red]❌ 复制失败: {e}[/red]") + else: + rprint(f"[red]⚠️ 源文件不存在: {source_video}[/red]") + + +# 根据当前output的目录中的简介调用大模型 +# 批量生成视频的标题和简介 +def get_tasks_setting_info(): + base_path = Path(os.path.dirname(__file__)).parent / 'tasks_setting.xlsx' + df = pd.read_excel(base_path) + return df + +def f(response_data): + try: + json.loads(response_data) + return response_data + except (json.JSONDecodeError, ValueError): + return "{'title':'error', 'introduction':'error'}" + +def get_title_introduction_batch(): + responses = [] + trans_srtall_trans_srt =read_all_trans_srt() + trans_srt_len = len(all_trans_srt) + for i in range(trans_srt_len): + trans_srt = all_trans_srt[i] + rprint(f"[yellow]🌐 processing {i}/{trans_srt_len}[/yellow]") + prompt = get_title_introduction_prompt(trans_srt); + try: + response = ask_gpt(prompt, response_json=True, log_title='subtitle_trim') + responses.append(response) + rprint(f"[yellow]{responses[-1]}[/yellow]") + except Exception as e: + print(f"Error: {e}") + flat_responses = [] + # 去除responses数组中的空字符串元素 + for item in responses: + if isinstance(item, list): + # 如果是列表,展开里面的字典 + flat_responses.extend(item) + elif isinstance(item, dict): + # 如果是字典,直接添加 + flat_responses.append(item) + + responses = flat_responses + rprint(f"[green]=================================================[/green]") + rprint(f"[green]🎉 responses:[/green]") + rprint(responses) + rprint("[green]🎉 All processing completed![/green]") + + base_path = Path(os.path.dirname(__file__)).parent + + tasks_setting_info = get_tasks_setting_info() + + copy_and_rename_videos(responses, base_path / 'result') + # 将 responses JSON 数组转换成 DataFrame + responses_df = pd.DataFrame(responses) + result_df = pd.concat([tasks_setting_info, responses_df], axis=1) + result_df.to_excel(base_path / 'result.xlsx' , index=False, engine='openpyxl') + + + + + +if __name__ == "__main__": + get_title_introduction_batch() \ No newline at end of file diff --git a/batch/utils/upload_video_2_bilibili/upload_video_2_bilibili.py b/batch/utils/upload_video_2_bilibili/upload_video_2_bilibili.py new file mode 100644 index 00000000..ed5d33d5 --- /dev/null +++ b/batch/utils/upload_video_2_bilibili/upload_video_2_bilibili.py @@ -0,0 +1,274 @@ +import os +import time +import subprocess +from pathlib import Path +from typing import Collection +import pandas as pd +from rich.console import Console +from rich.panel import Panel +import json +import re +from core.prompts_storage import get_title_introduction_prompt +from core.ask_gpt import ask_gpt +from core.config_utils import load_key +import pexpect +import sys +import datetime + + +console = Console() + +##############参数控制################## + +TID=36 # 野生技术协会 +COLLECTION="[Web3]智能合约开发教程" +################################ + +EXCEL_DEFAULT_PATH = os.path.join("batch", "output", "bilibili_upload_tasks.xlsx") + +def method1_upload(video_path, title, tags, introduction, schedule_time, partition, collection=None, cookies_path="cookies.json"): + # 如果当前的 biliup 不存在 就进行安装 + from shutil import which + if which("biliup") is None: + os.system('pip install biliup') + # biliup login 首先进行bilibili登陆操作 + os.system('biliup login') + # biliup 进行视频上传操作 + if not video_path or not os.path.exists(video_path): + raise ValueError(f"视频路径不存在: {video_path}") + args = [video_path, "--title", "\"" + (title or Path(video_path).parent.name) + "\""] + + + if introduction: + args += ["--desc", "\""+ introduction + "\""] + if tags: + args += ["--tag", "\"" + tags + "\""] + if partition and str(partition).strip().isdigit(): + args += ["--tid", "\"" + str(int(partition)) + "\""] + if schedule_time and str(schedule_time).strip().isdigit(): + args += ["--dtime", "\"" + str(int(schedule_time)) + "\""] + # 合集 + if collection: + args += ["--collection", "\"" + str(collection) + "\"" ] + + # 需要先运行这个命令,阻塞当前的进程 + cmd = ["biliup"] + if cookies_path and os.path.exists(cookies_path): + cmd += ["-u", cookies_path] + cmd += ["upload"] + args + print("cmd: " + ' '.join(cmd)) + exit_code = os.system(' '.join(cmd)) + + # 在 Unix 系统中,0 表示成功 + if exit_code == 0: + print("✅ biliup login 执行成功") + return True + else: + print(f"❌ biliup login 执行失败,退出码: {exit_code}") + return False + + +def method2_generate_excel(output_root="batch/output", excel_path=EXCEL_DEFAULT_PATH): + base = Path(output_root) + rows = [] + + # 获取当前时间 + now = datetime.datetime.now() + # 获取明天的日期,时间设为18:00:00 + tomorrow_6pm = now.replace(hour=18, minute=0, second=0, microsecond=0) + datetime.timedelta(days=1) + # 转换为时间戳 + base_timestamp = int(tomorrow_6pm.timestamp()) + # Debug + # print(base_timestamp) + # print(tomorrow_6pm) + # print(base) + if base.exists(): + for child in base.iterdir(): + if child.is_dir(): + preferred = child / "output_sub.mp4" + if preferred.exists(): + video_path = str(preferred) + else: + mp4s = list(child.glob("*.mp4")) + video_path = str(mp4s[0]) if mp4s else "" + desc_path = child / "log" / "sentence_splitbynlp.txt" + desc = "" + + try: + if desc_path.exists(): + desc = desc_path.read_text(encoding="utf-8").strip() + except Exception: + desc = "" + + prompt = get_title_introduction_prompt(desc); + # 通过调用当前的 gpt的方法来进行 标题和简介的生成 + try: + desc = ask_gpt(prompt, response_json=True, log_title='subtitle_trim') + except Exception as e: + print(f"Error: {e}") + # DEBUG + # print("测试 : ") + # print(desc) + # DEBUG + title = desc['title'] + introduction = desc['introduction'] + tags = desc['tags'] + rows.append({ + "视频路径": video_path, + "标题": title, + "标签": tags, + "描述简介": introduction, + "版权声明": 1, + "定时发布时间戳": base_timestamp, + "分区": TID, + "加入合集": COLLECTION + }) + base_timestamp += 86400 + df = pd.DataFrame(rows) + os.makedirs(os.path.dirname(excel_path), exist_ok=True) + df.to_excel(excel_path, index=False, engine="openpyxl") + console.print(Panel(f"Excel 生成完成: {excel_path}", title="[bold green]方法2[/bold green]")) + return excel_path + +def method3_upload_from_excel(excel_path=EXCEL_DEFAULT_PATH, cookies=None): + df = pd.read_excel(excel_path) + status_col = "Status" + if status_col not in df.columns: + df[status_col] = "" + try: + df[status_col] = df[status_col].astype(str) + except Exception: + pass + for idx, row in df.iterrows(): + if str(df.at[idx, status_col]).strip().lower() == "done": + continue + try: + video_path = str(row.get("视频路径", "")).strip() + title = str(row.get("标题", "")) + tags = str(row.get("标签", "")) + introduction = str(row.get("描述简介", "")) + description = str(row.get("版权声明", "")) + schedule_time = str(row.get("定时发布时间戳", "")) + partition = str(row.get("分区", "")) + collection = str(row.get("加入合集", "")) + # + cookies_use = cookies if (cookies and os.path.exists(str(cookies))) else None + console.print(Panel( + f"视频路径: {video_path}\n" + f"标题: {title}\n" + f"标签: {tags}\n" + f"描述简介: {introduction}\n" + f"版权声明/描述: {description}\n" + f"定时发布时间戳: {schedule_time}\n" + f"分区:{partition}\n" + f"加入合集: {collection}", + title="[bold blue]上传参数[/bold blue]" + )) + # + method1_upload( + video_path=video_path, title=title, tags=tags, introduction=introduction, schedule_time=schedule_time, partition=partition, collection=collection, cookies_path="cookies.json" + ) + + df.at[idx, status_col] = "Done" + console.print(Panel(f"上传完成: {row.get('视频路径', '')}", title="[bold green]方法3[/bold green]")) + except Exception as e: + msg = re.sub(r"[\x00-\x08\x0b-\x0c\x0e-\x1f]", "", str(e)).replace("\n", " ").strip() + df.at[idx, status_col] = f"Error: {msg}" + console.print(Panel(str(e), title="[bold red]上传失败[/bold red]")) + finally: + df.to_excel(excel_path, index=False, engine="openpyxl") + return True + +# 生产环境 +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser() + sub = parser.add_subparsers(dest="cmd") + p1 = sub.add_parser("upload-video") + p1.add_argument("--video", required=True) + p1.add_argument("--cover", default="") + p1.add_argument("--partition_tid", default="") + p1.add_argument("--tags", default="") + p1.add_argument("--description", default="") + p1.add_argument("--schedule_time", default="") + p1.add_argument("--collection", default="") + p1.add_argument("--cookies", default="cookies.json") + p1.add_argument("--proxy", default=None) + p1.add_argument("--title", default=None) + p2 = sub.add_parser("generate-excel") + p2.add_argument("--output-root", default="batch/output") + p2.add_argument("--excel", default=EXCEL_DEFAULT_PATH) + p3 = sub.add_parser("upload-excel") + p3.add_argument("--excel", default=EXCEL_DEFAULT_PATH) + p3.add_argument("--cookies", default="cookies.json") + p3.add_argument("--proxy", default=None) + args = parser.parse_args() + if args.cmd == "upload-video": + method1_upload( + video_path=args.video, + cover=args.cover, + partition_tid=args.partition_tid, + tags=args.tags, + description=args.description, + schedule_time=args.schedule_time, + collection=args.collection, + cookies_path=args.cookies, + proxy=args.proxy, + title=args.title + ) + elif args.cmd == "generate-excel": + method2_generate_excel(output_root=args.output_root, excel_path=args.excel) + elif args.cmd == "upload-excel": + method3_upload_from_excel(excel_path=args.excel, cookies=args.cookies, proxy=args.proxy) + else: + parser.print_help() +## 测试环境 +# if __name__ == '__main__': + # method3_upload_from_excel() + # method2_generate_excel() +# method1_upload( +# video_path="batch/output/segment_02/output_sub.mp4", +# cover="", +# partition_tid="", +# tags="第1章:[智能合约] 无需信任-透明协议-价值互联", +# description="""🌐 区块链的信任危机与解决方案: + +# 你是否曾因不信任中介机构而感到焦虑?麦当劳彩票舞弊、银行倒闭事件、Robinhood限制交易……历史一次次证明,承诺往往不堪一击。区块链智能合约应运而生,它能否终结“不信任”的怪圈? + +# 🔑 智能合约:信任的基石 + +# 智能合约是一种部署在去中心化区块链上的协议,一旦部署,便不可篡改。它像一个自动执行的数字协议,公开透明,无需人为干预。通过密码学和代码,智能合约确保了协议的公平执行,让信任不再依赖于人品,而是依赖于数学。 + +# 💡 智能合约如何解决现实问题? + +# * 麦当劳彩票舞弊:将彩票代码部署到区块链上,每次黑客尝试篡改,所有人都会收到通知,且无法更改。 +# * Robinhood限制交易:使用去中心化交易所,无需中心化机构,避免单方面限制交易。 +# * 银行倒闭:通过透明的偿付能力检查,构建类似银行的智能合约,防止资不抵债。 + +# 🌟 智能合约的优势 + +# * 去中心化:无需信任中介机构,协议由去中心化网络执行。 +# * 透明性:所有交易和代码公开可查,杜绝暗箱操作。 +# * 高效性:交易瞬间完成,无需漫长的清算和结算。 +# * 安全性:难以篡改,保护资产安全。 + +# 🌱 智能合约的应用 + +# * DeFi (去中心化金融):提供无需信任的金融服务。 +# * DAO (去中心化自治组织):通过智能合约实现社区自治。 +# * NFT (非同质化代币):赋予数字资产独一无二的价值。 + +# 🚀 加入智能合约的未来 + +# 智能合约正在重塑各行各业,从金融到艺术,再到供应链管理。现在就加入这场革命,探索智能合约的无限可能! + +# #智能合约 #区块链 #去中心化 #DeFi #信任危机 #技术未来""", +# schedule_time="", +# collection="", +# cookies_path="cookies.json", +# proxy=None, +# title=None +# ) + + +# 测试命令: biliup upload /Users/luogaiyu/code/VideoLingo/batch/output/segment_02/output_sub.mp4 --title "测试视频" --tag "测试,视频" --desc "这是一个测试视频" --copyright 1 --dtime 1767862800 --tid 36 diff --git a/config.yaml b/config.yaml index 3622e7c8..bd1213b7 100644 --- a/config.yaml +++ b/config.yaml @@ -6,12 +6,12 @@ display_language: "zh-CN" # API settings api: - key: 'your_api_key' + key: '' base_url: 'https://api.302.ai' model: 'gemini-2.0-flash' # Language settings, written into the prompt, can be described in natural language -target_language: '简体中文' +target_language: 'zh' # Whether to use Demucs for vocal separation before transcription demucs: true @@ -23,9 +23,9 @@ whisper: language: 'en' detected_language: 'en' # Whisper running mode ["local", "cloud", "elevenlabs"]. Specifies where to run, cloud uses 302.ai API - runtime: 'local' + runtime: 'cloud' # 302.ai API key - whisperX_302_api_key: 'your_302_api_key' + whisperX_302_api_key: 'sk-SZZ4FDEHYZN7vSbw45VkhOnfkkz6NXeNDEwemvc0H2jQF1SC' # ElevenLabs API key elevenlabs_api_key: 'your_elevenlabs_api_key' @@ -67,7 +67,7 @@ tts_method: 'f5tts' # SiliconFlow FishTTS sf_fish_tts: # SiliconFlow API key - api_key: 'YOUR_API_KEY' + api_key: '' # only for mode "preset" voice: 'anna' # *only for mode "custom", dont set manually @@ -78,17 +78,17 @@ sf_fish_tts: # OpenAI TTS-1 API configuration, 302.ai API only openai_tts: - api_key: 'YOUR_302_API_KEY' + api_key: '' voice: 'alloy' # Azure configuration, 302.ai API only azure_tts: - api_key: 'YOUR_302_API_KEY' + api_key: '' voice: 'zh-CN-YunfengNeural' # FishTTS configuration, 302.ai API only fish_tts: - api_key: 'YOUR_302_API_KEY' + api_key: '' character: 'AD学姐' character_id_dict: 'AD学姐': '7f92f8afb8ec43bf81429cc1c9199cb1' @@ -108,7 +108,7 @@ gpt_sovits: refer_mode: 3 f5tts: - 302_api: 'YOUR_302_API_KEY' + 302_api: '' # *Audio speed range speed_factor: diff --git a/core/all_whisper_methods/audio_preprocess.py b/core/all_whisper_methods/audio_preprocess.py index b722df1f..58cd903a 100644 --- a/core/all_whisper_methods/audio_preprocess.py +++ b/core/all_whisper_methods/audio_preprocess.py @@ -147,6 +147,7 @@ def save_results(df: pd.DataFrame): # Remove rows where 'text' is empty initial_rows = len(df) + df = df[df['text'].str.len() > 0] removed_rows = initial_rows - len(df) if removed_rows > 0: diff --git a/core/all_whisper_methods/demucs_vl.py b/core/all_whisper_methods/demucs_vl.py index de975fa9..58d9dbe5 100644 --- a/core/all_whisper_methods/demucs_vl.py +++ b/core/all_whisper_methods/demucs_vl.py @@ -10,6 +10,9 @@ from demucs.api import Separator from demucs.apply import BagOfModels import gc +import logging + +log = logging.getLogger(__name__) AUDIO_DIR = "output/audio" RAW_AUDIO_FILE = os.path.join(AUDIO_DIR, "raw.mp3") @@ -43,6 +46,7 @@ def demucs_main(): "clip": "rescale", "as_float": False, "bits_per_sample": 16} console.print("🎤 Saving vocals track...") + log.info(f"vocals shape: {outputs['vocals'].shape}") save_audio(outputs['vocals'].cpu(), VOCAL_AUDIO_FILE, **kwargs) console.print("🎹 Saving background music...") diff --git a/core/all_whisper_methods/whisperX_302.py b/core/all_whisper_methods/whisperX_302.py index 621bed94..da623cfa 100644 --- a/core/all_whisper_methods/whisperX_302.py +++ b/core/all_whisper_methods/whisperX_302.py @@ -1,4 +1,4 @@ -import requests +import subprocess import sys, os sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from core.config_utils import load_key @@ -7,10 +7,11 @@ import json import librosa import soundfile as sf -import io +import tempfile from core.all_whisper_methods.audio_preprocess import save_language OUTPUT_LOG_DIR = "output/log" + def transcribe_audio_302(raw_audio_path: str, vocal_audio_path: str, start: float = None, end: float = None): os.makedirs(OUTPUT_LOG_DIR, exist_ok=True) LOG_FILE = f"{OUTPUT_LOG_DIR}/whisperx302_{start}_{end}.json" @@ -19,50 +20,136 @@ def transcribe_audio_302(raw_audio_path: str, vocal_audio_path: str, start: floa return json.load(f) WHISPER_LANGUAGE = load_key("whisper.language") - save_language(WHISPER_LANGUAGE) # since 302ai doesn't return language - url = "https://api.302.ai/302/whisperx" + save_language(WHISPER_LANGUAGE) # 加载音频并处理start和end参数 y, sr = librosa.load(vocal_audio_path, sr=16000) - audio_duration = len(y) / sr if start is None or end is None: start = 0 end = audio_duration + # 如果文件是属于 只传一次的话 + if os.path.exists(LOG_FILE): + with open(LOG_FILE, "r", encoding="utf-8") as f: + return json.load(f) - start_sample = int(start * sr) - end_sample = int(end * sr) - y_slice = y[start_sample:end_sample] - - # 将音频数据直接写入内存缓冲区 - audio_buffer = io.BytesIO() - sf.write(audio_buffer, y_slice, sr, format='WAV', subtype='PCM_16') - audio_buffer.seek(0) - - files = [ - ('audio_input', ( - 'audio_slice.wav', # 虚拟文件名 - audio_buffer, - 'application/octet-stream' - )) - ] - - payload = { - "processing_type": "align", - "language": WHISPER_LANGUAGE, - "output": "raw" - } - - start_time = time.time() - rprint(f"[cyan]🎤 Transcribing audio with language: <{WHISPER_LANGUAGE}> ...[/cyan]") - headers = {'Authorization': f'Bearer {load_key("whisper.whisperX_302_api_key")}'} - response = requests.request("POST", url, headers=headers, data=payload, files=files) - - response_json = response.json() + # ✅ 新代码 - 使用FFmpeg切分: + if start is not None and end is not None and (start >= 0 or end <= audio_duration): + # 使用FFmpeg直接切分,保持原始格式 + with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file: + ffmpeg_command = [ + 'ffmpeg', + '-i', vocal_audio_path, + '-ss', str(start), + '-t', str(end - start), + '-c', 'copy', # 复制编码,不重新编码 + '-y', # 覆盖输出文件 + temp_file.name + ] + + rprint(f"[cyan]🔪 使用FFmpeg切分音频: {start}s - {end}s[/cyan]") + ffmpeg_result = subprocess.run(ffmpeg_command, capture_output=True, text=True) + if ffmpeg_result.returncode != 0: + rprint(f"[red]❌ FFmpeg切分失败: {ffmpeg_result.stderr}[/red]") + return None + + audio_file = temp_file.name + + # 检查切分后的文件 + file_size = os.path.getsize(audio_file) + rprint(f"[green]✓ 切分完成,文件大小: {file_size / 1024 / 1024:.1f}MB[/green]") + else: + # 直接使用原始文件 + audio_file = vocal_audio_path + rprint(f"[cyan]📁 使用完整音频文件[/cyan]") + # # 创建临时音频文件 + # audio_file = "output/audio/raw.mp3" + try: + + # 构建curl命令 - 完全模拟你成功的命令 + api_key = load_key("whisper.whisperX_302_api_key") + + curl_command = [ + 'curl', + '--proxy', 'http://127.0.0.1:7897', + '-X', 'POST', + '-H', f'Authorization: Bearer {api_key}', + '-F', f'audio_input=@{audio_file}', + '-F', f'processing_type=align', + '-F', f'output=raw', + '-F', f'language={WHISPER_LANGUAGE}', + 'https://api.302.ai/302/whisperx' + ] + + start_time = time.time() + rprint(f"[cyan]🎤 使用curl转录音频,语言: <{WHISPER_LANGUAGE}> ...[/cyan]") + + # 打印实际执行的命令(正确格式化) + cmd_parts = [] + for arg in curl_command: + if ' ' in arg or arg.startswith('Authorization:') or arg.startswith('Content-Type:'): + cmd_parts.append(f'"{arg}"') + else: + cmd_parts.append(arg) + cmd_str = ' '.join(cmd_parts) + rprint(f"[yellow]执行命令: {cmd_str}[/yellow]") + + # 执行curl命令 + result = subprocess.run( + curl_command, + capture_output=True, + text=True, + timeout=180 + ) + print(result) + if result.returncode != 0: + rprint(f"[red]❌ curl命令失败 (返回码: {result.returncode})[/red]") + rprint(f"[red]错误信息: {result.stderr}[/red]") + if result.stdout: + rprint(f"[yellow]输出信息: {result.stdout}[/yellow]") + return None + + # 解析JSON响应 + try: + response_json = json.loads(result.stdout) + rprint(f"[green]✓ 成功获取响应[/green]") + + # 检查响应格式并转换为标准格式 + if 'segments' not in response_json and 'text' in response_json: + # 如果是简单的whisper格式,转换为segments格式 + response_json = { + 'segments': [{ + 'start': 0, + 'end': audio_duration, + 'text': response_json['text'] + }], + 'language': WHISPER_LANGUAGE + } + + rprint(f"[green]✓ 成功获取 {len(response_json.get('segments', []))} 个片段[/green]") + + except json.JSONDecodeError as e: + rprint(f"[red]❌ JSON解析失败: {e}[/red]") + rprint(f"[yellow]原始响应: {result.stdout[:500]}...[/yellow]") + return None + + except subprocess.TimeoutExpired: + rprint(f"[red]❌ 请求超时[/red]") + return None + except Exception as e: + rprint(f"[red]❌ 执行失败: {e}[/red]") + return None + finally: + # ✅ 修复:只删除临时文件 + if audio_file != vocal_audio_path: + try: + os.remove(audio_file) + except: + pass # 调整时间戳 - if start is not None: - for segment in response_json['segments']: + if start is not None and start > 0: + for segment in response_json.get('segments', []): segment['start'] += start segment['end'] += start for word in segment.get('words', []): @@ -76,10 +163,16 @@ def transcribe_audio_302(raw_audio_path: str, vocal_audio_path: str, start: floa json.dump(response_json, f, indent=4, ensure_ascii=False) elapsed_time = time.time() - start_time - rprint(f"[green]✓ Transcription completed in {elapsed_time:.2f} seconds[/green]") + rprint(f"[green]✓ 转录完成,耗时 {elapsed_time:.2f} 秒[/green]") return response_json if __name__ == "__main__": # 使用示例: - result = transcribe_audio_302("output/audio/raw.mp3", "output/audio/raw.mp3") - rprint(result) + result = transcribe_audio_302("output/audio/vocal.mp3", "output/audio/vocal.mp3") + # if result: + # rprint(f"[green]成功!获得 {len(result.get('segments', []))} 个片段[/green]") + # # 打印第一个片段的内容 + # if result.get('segments'): + # rprint(f"[cyan]第一个片段: {result['segments'][0].get('text', 'N/A')}[/cyan]") + # else: + # rprint("[red]失败![/red]") \ No newline at end of file diff --git a/core/prompts_storage.py b/core/prompts_storage.py index ea10e140..90072934 100644 --- a/core/prompts_storage.py +++ b/core/prompts_storage.py @@ -18,6 +18,27 @@ def get_split_prompt(sentence, num_parts = 2, word_limit = 20): 3. Split at natural points like punctuation marks or conjunctions 4. If provided text is repeated words, simply split at the middle of the repeated words. +## Example +Input: 'This is a long sentence that needs splitting for subtitles' +Expected Output JSON: +{{ + "analysis": "Long sentence about subtitle splitting, can be split after 'sentence'", + "split": "This is a long sentence[br]that needs splitting for subtitles" +}} + +Input: 'Machine learning algorithms are becoming increasingly sophisticated and powerful in modern applications' +Expected Output JSON: +{{ + "analysis": "Technical sentence about ML, natural split after 'sophisticated'", + "split": "Machine learning algorithms are becoming increasingly sophisticated[br]and powerful in modern applications" +}} + +## Output Requirements +- MUST use [br] tags to mark split positions +- MUST return valid JSON format +- The "split" field MUST contain the complete sentence with [br] inserted at split points +- Do NOT split the sentence into separate parts, keep it as one string with [br] markers + ## Output in only JSON format {{ "analysis": "Brief analysis of the text structure", @@ -155,6 +176,13 @@ def get_prompt_faithfulness(lines, shared_prompt): 3. Understand the context: Fully comprehend and reflect the background and contextual relationships of the text. +### Response Format Requirements +- IMPORTANT: The response MUST be a single JSON object/dictionary, NOT a list +- Each line should be a key-value pair in the dictionary +- Keys should be the line numbers as strings +- DO NOT wrap the response in an array/list +- DO NOT include any explanation text outside the JSON structure + ## INPUT {lines} @@ -163,7 +191,10 @@ def get_prompt_faithfulness(lines, shared_prompt): ## Output in only JSON format {json.dumps(json_format, ensure_ascii=False, indent=4)} -Note: << >> represents placeholders that should not appear in your answer +Note: +1. << >> represents placeholders that should not appear in your answer +2. The output must be a SINGLE dictionary/object +3. The format should exactly match the example above ''' return prompt_faithfulness.strip() @@ -334,3 +365,79 @@ def get_correct_text_prompt(text): "text": "cleaned text here" }} '''.strip() + + +## ================================================================ +## @ batch_processor_get_title_introduction.py +def get_title_introduction_prompt(text): + return f''' +## Role +You are a professional video title and introduction generator for Bilibili platform. + +## Task +1. Extract the file path from the input (before first "||") +2. Extract the original title (between first and second "||") and get the lecture number +3. Analyze the SRT subtitle content (after second "||") to understand the video topic +4. Generate appropriate title and introduction based on the subtitle content + +## Requirements +1. Title must be concise and attractive for Chinese audience +2. Introduction should be engaging but not too verbose +3. Format should follow Bilibili style +4. Title must include the chapter number as prefix + +## Format Requirements +- Title format: 第X章:[核心主题] 关键词1-关键词2-关键词3 (总长度不超过35字) +- Introduction format: 至少400字的简洁介绍,要求有吸引力,并且段落清晰 +- Tags: 输出当前的tags + +## Examples +Good title: 第20章:[Raft算法] 日志复制-选举机制-一致性保证 +Good introduction: +🎲 精彩内容: + MIT教授用生动的赌博游戏演示概率论 + 深入解析著名的蒙提霍尔问题 + 揭示统计数据背后的真相与陷阱 + 探讨条件概率在现实生活中的应用 +🎯 核心知识点: + 条件概率的基本概念与计算 + 概率树方法的应用 + 容斥原理在概率中的运用 + 统计数据的正确解读方法 +💡 精彩案例: + 蒙提霍尔游戏的完整分析 + 伯克利性别歧视案例研究 + 航空公司准点率的统计陷阱 + 赌场骰子游戏的概率分析 +⏰ 重要时间点: + 00:00 课程介绍 + 05:23 蒙提霍尔问题详解 + 32:15 条件概率基础 + 45:30 统计陷阱案例分析 + 58:20 实际应用讨论 +🎓 适合人群: + 数学专业学生 + 概率论初学者 + 数据分析从业者 + 对统计学感兴趣的观众 +#数学教育 #概率论 #MIT公开课 #统计学 #数据分析 +Good tags: 数学教育,概率论,MIT公开课,统计学,数据分析 + +## INPUT Format +The input contains: file_path||original_title||srt_content +Where: +- file_path: The complete path to the subtitle file +- original_title: The original lecture title (e.g., "Lecture 20: Blockstack") +- srt_content: The subtitle content with timestamps and text + +## INPUT +{text} + +## Output in only JSON format +{{ + "file_path": "提取的完整文件路径", + "title": "第X章:[核心主题] 关键词1-关键词2-关键词3", + "introduction": "至少400字的简洁介绍,要求有吸引力,并且段落清晰,参考上面的例子", + "tags":"要有三个不同的标签,要求高度概括当前的视频的内容,根据字幕中的内容" +}} +''' \ No newline at end of file diff --git a/core/split_video_utils/split_video.py b/core/split_video_utils/split_video.py new file mode 100644 index 00000000..979f4cab --- /dev/null +++ b/core/split_video_utils/split_video.py @@ -0,0 +1,886 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import sys +import subprocess +import argparse +import json +from pathlib import Path +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich import print as rprint +from rich.prompt import Confirm + +# 创建控制台对象 +console = Console() + +def format_time(seconds): + """格式化时间显示""" + if seconds < 0: + return "0:00.000" + + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = seconds % 60 + + if hours > 0: + return f"{hours}:{minutes:02d}:{secs:06.3f}" + else: + return f"{minutes}:{secs:06.3f}" + +def get_video_duration(video_path): + """获取视频时长""" + try: + cmd = [ + 'ffprobe', + '-v', 'quiet', + '-print_format', 'json', + '-show_format', + video_path + ] + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + + if result.returncode == 0: + info = json.loads(result.stdout) + duration = float(info['format']['duration']) + return duration + else: + rprint(f"[red]❌ 获取视频时长失败[/red]") + return None + + except Exception as e: + rprint(f"[red]❌ 获取视频时长错误: {e}[/red]") + return None + +def check_demucs_installation(): + """检查Demucs是否安装""" + try: + result = subprocess.run(['python', '-c', 'import demucs'], + capture_output=True, text=True, timeout=10) + return result.returncode == 0 + except: + return False + +def extract_video_segment(input_path, start_time, duration, output_path): + """提取视频片段""" + cmd = [ + 'ffmpeg', + '-i', input_path, + '-ss', str(start_time), + '-t', str(duration), + '-c', 'copy', + output_path, + '-y' + ] + + try: + subprocess.run(cmd, check=True, capture_output=True, timeout=60) + return True + except Exception as e: + rprint(f"[red]❌ 视频片段提取失败: {e}[/red]") + return False + +def extract_audio_from_video(video_path, output_audio_path): + """从视频中提取音频""" + cmd = [ + 'ffmpeg', + '-i', video_path, + '-vn', + '-acodec', 'libmp3lame', + '-ab', '192k', + '-ar', '44100', + output_audio_path, + '-y' + ] + + try: + subprocess.run(cmd, check=True, capture_output=True, timeout=30) + return True + except Exception as e: + rprint(f"[red]❌ 音频提取失败: {e}[/red]") + return False +def separate_vocals_with_demucs(audio_path, output_dir): + """使用Demucs分离人声""" + try: + # 检查输入文件 + if not os.path.exists(audio_path): + rprint(f"[red]❌ 音频文件不存在: {audio_path}[/red]") + return None + + file_size = os.path.getsize(audio_path) + rprint(f"[cyan] 📁 音频文件: {os.path.basename(audio_path)} ({file_size/1024:.1f}KB)[/cyan]") + + # 创建临时目录 + temp_dir = os.path.join(output_dir, "demucs_temp") + os.makedirs(temp_dir, exist_ok=True) + rprint(f"[cyan] 📂 临时目录: {temp_dir}[/cyan]") + + # 运行Demucs + cmd = [ + 'python', '-m', 'demucs.separate', + '--two-stems=vocals', + '-o', temp_dir, + audio_path + ] + + rprint(f"[cyan] 🎤 开始分离人声...[/cyan]") + rprint(f"[dim] 命令: {' '.join(cmd)}[/dim]") + + with console.status("[yellow]🎤 分离人声中...", spinner="dots"): + result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) + + rprint(f"[cyan] 📊 Demucs返回码: {result.returncode}[/cyan]") + + if result.returncode != 0: + rprint(f"[red]❌ Demucs执行失败[/red]") + rprint(f"[red]stderr: {result.stderr}[/red]") + rprint(f"[red]stdout: {result.stdout}[/red]") + return None + + # 查找输出文件 + audio_name = os.path.splitext(os.path.basename(audio_path))[0] + rprint(f"[cyan] 🔍 查找输出文件,音频名: {audio_name}[/cyan]") + + vocals_path = None + all_files = [] + + # 搜索输出文件 + for root, dirs, files in os.walk(temp_dir): + rprint(f"[dim] 搜索目录: {root}[/dim]") + for file in files: + full_path = os.path.join(root, file) + all_files.append(full_path) + rprint(f"[dim] 文件: {file}[/dim]") + + if 'vocals' in file.lower() and audio_name in file: + vocals_path = full_path + rprint(f"[green] ✓ 找到人声文件: {file}[/green]") + break + + if not vocals_path: + rprint(f"[yellow]⚠️ 未找到匹配的人声文件[/yellow]") + rprint(f"[yellow]期望包含: 'vocals' 和 '{audio_name}'[/yellow]") + rprint(f"[yellow]所有文件:[/yellow]") + for f in all_files: + rprint(f"[dim] - {f}[/dim]") + + # 尝试查找任何包含vocals的文件 + for f in all_files: + if 'vocals' in os.path.basename(f).lower(): + vocals_path = f + rprint(f"[yellow] 🔄 使用备选文件: {os.path.basename(f)}[/yellow]") + break + + if not vocals_path: + rprint(f"[red]❌ 完全找不到人声文件[/red]") + return None + + # 检查找到的文件 + if not os.path.exists(vocals_path): + rprint(f"[red]❌ 人声文件不存在: {vocals_path}[/red]") + return None + + vocals_size = os.path.getsize(vocals_path) + rprint(f"[green] ✓ 人声文件大小: {vocals_size/1024:.1f}KB[/green]") + + if vocals_size < 1024: # 小于1KB可能是空文件 + rprint(f"[yellow]⚠️ 人声文件太小,可能分离失败[/yellow]") + + # 移动到输出目录 + final_vocals_path = os.path.join(output_dir, f"{audio_name}_vocals.mp3") + rprint(f"[cyan] 📁 目标路径: {final_vocals_path}[/cyan]") + + # 转换为mp3格式 + if vocals_path.endswith('.wav'): + rprint(f"[cyan] 🔄 转换WAV到MP3[/cyan]") + convert_cmd = [ + 'ffmpeg', '-i', vocals_path, + '-acodec', 'libmp3lame', + '-ab', '192k', + final_vocals_path, '-y' + ] + convert_result = subprocess.run(convert_cmd, capture_output=True, text=True, timeout=60) + + if convert_result.returncode != 0: + rprint(f"[red]❌ 格式转换失败[/red]") + rprint(f"[red]stderr: {convert_result.stderr}[/red]") + return None + else: + rprint(f"[cyan] 📋 复制文件[/cyan]") + import shutil + shutil.copy2(vocals_path, final_vocals_path) + + # 验证最终文件 + if os.path.exists(final_vocals_path): + final_size = os.path.getsize(final_vocals_path) + rprint(f"[green] ✅ 人声分离完成: {os.path.basename(final_vocals_path)} ({final_size/1024:.1f}KB)[/green]") + + # 清理临时目录 + try: + import shutil + shutil.rmtree(temp_dir) + rprint(f"[dim] 🧹 清理临时目录[/dim]") + except: + pass + + return final_vocals_path + else: + rprint(f"[red]❌ 最终文件创建失败[/red]") + return None + + except subprocess.TimeoutExpired: + rprint(f"[red]❌ Demucs执行超时 (>300秒)[/red]") + return None + except Exception as e: + rprint(f"[red]❌ 人声分离错误: {e}[/red]") + import traceback + rprint(f"[red]详细错误: {traceback.format_exc()}[/red]") + return None + +def detect_speech_pauses_in_segment(vocals_path): + """检测音频片段中的人声停顿""" + speech_configs = [ + (-15, 0.05, "词间停顿(-15dB, 50ms)", "词间"), + (-18, 0.05, "短句停顿(-18dB, 50ms)", "短句"), + (-20, 0.05, "句间停顿(-20dB, 50ms)", "句间"), + (-25, 0.05, "段落停顿(-25dB, 50ms)", "段落"), + (-15, 0.1, "词间停顿(-15dB, 100ms)", "词间"), + (-18, 0.1, "短句停顿(-18dB, 100ms)", "短句"), + (-20, 0.1, "句间停顿(-20dB, 100ms)", "句间"), + (-25, 0.1, "段落停顿(-25dB, 100ms)", "段落"), + (-15, 0.15, "长词间(-15dB, 150ms)", "长词间"), + (-18, 0.15, "长句间(-18dB, 150ms)", "长句间"), + (-20, 0.15, "自然停顿(-20dB, 150ms)", "自然"), + ] + + all_results = [] + + for noise_db, min_duration, desc, pause_type in speech_configs: + cmd = [ + 'ffmpeg', + '-i', vocals_path, + '-af', f'silencedetect=noise={noise_db}dB:duration={min_duration}', + '-f', 'null', + '-', + '-v', 'info' + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=15) + + silence_periods = [] + current_silence_start = None + + for line in result.stderr.split('\n'): + line = line.strip() + + if 'silence_start:' in line: + try: + start_part = line.split('silence_start:')[1].strip() + silence_start = float(start_part.split()[0]) + current_silence_start = silence_start + except: + continue + + elif 'silence_end:' in line and current_silence_start is not None: + try: + parts = line.split('silence_end:')[1] + + if '|' in parts: + end_part = parts.split('|')[0].strip() + duration_part = parts.split('silence_duration:')[1].strip() + silence_end = float(end_part) + silence_duration = float(duration_part) + else: + silence_end = float(parts.strip()) + silence_duration = silence_end - current_silence_start + + if silence_duration >= min_duration: + silence_periods.append({ + 'start': current_silence_start, + 'end': silence_end, + 'duration': silence_duration, + 'center': (current_silence_start + silence_end) / 2, + 'type': pause_type + }) + current_silence_start = None + except: + continue + + # 按停顿时长分类 + micro_pauses = [s for s in silence_periods if 0.05 <= s['duration'] < 0.1] + short_pauses = [s for s in silence_periods if 0.1 <= s['duration'] < 0.2] + medium_pauses = [s for s in silence_periods if 0.2 <= s['duration'] < 0.5] + long_pauses = [s for s in silence_periods if s['duration'] >= 0.5] + + result_info = { + 'config': (noise_db, min_duration, desc, pause_type), + 'silences': silence_periods, + 'count': len(silence_periods), + 'micro': len(micro_pauses), + 'short': len(short_pauses), + 'medium': len(medium_pauses), + 'long': len(long_pauses) + } + all_results.append(result_info) + + except Exception as e: + continue + + return all_results + +# ==================== 主要功能函数 ==================== +def generate_cut_plan(input_video_path, output_dir, target_interval=30): + """ + 函数1: 生成切分计划 + 输入长视频,每隔30分钟进行切分检测,输出执行计划 + """ + rprint(Panel.fit("[bold magenta]🎯 生成智能切分计划[/bold magenta]", border_style="magenta")) + + # 检查文件和环境 + if not os.path.exists(input_video_path): + rprint(f"[bold red]❌ 文件不存在: {input_video_path}[/bold red]") + return None + + if not check_demucs_installation(): + rprint("[red]❌ Demucs未安装,请运行: pip install demucs[/red]") + return None + + # 获取视频信息 + total_duration = get_video_duration(input_video_path) + if total_duration is None: + return None + + rprint(f"[green]✓ 视频文件[/green]: [cyan]{os.path.basename(input_video_path)}[/cyan]") + rprint(f"[green]✓ 视频时长[/green]: [yellow]{format_time(total_duration)}[/yellow]") + + # 创建输出目录 + os.makedirs(output_dir, exist_ok=True) + + # 💾 定义保存文件路径 + progress_file = os.path.join(output_dir, "cut_progress.json") + plan_file = os.path.join(output_dir, "cut_plan.json") + + # 计算检测点 + interval_seconds = target_interval * 60 + detection_points = [] + + current_time = interval_seconds + while current_time < total_duration: + detection_points.append(current_time) + current_time += interval_seconds + + if not detection_points: + rprint(f"[yellow]⚠️ 视频时长不足{target_interval}分钟,无需切分[/yellow]") + # 返回单段计划 + plan = { + 'input_video': input_video_path, + 'total_duration': total_duration, + 'target_interval': target_interval, + 'cut_points': [], + 'segments': [{ + 'index': 1, + 'start': 0, + 'end': total_duration, + 'duration': total_duration, + 'cut_type': 'whole' + }] + } + return plan + + # 🔄 检查是否有已保存的进度 + cut_points = [] + start_index = 0 + + if os.path.exists(progress_file): + try: + with open(progress_file, 'r', encoding='utf-8') as f: + progress_data = json.load(f) + + # 验证进度文件是否匹配当前任务 + if (progress_data.get('input_video') == input_video_path and + abs(progress_data.get('total_duration', 0) - total_duration) < 1): + + cut_points = progress_data.get('completed_cut_points', []) + start_index = len(cut_points) + + if start_index > 0: + rprint(f"[green]🔄 发现已有进度: 已完成 {start_index}/{len(detection_points)} 个切分点[/green]") + for point in cut_points: + rprint(f"[dim] ✓ {format_time(point['target'])} -> {format_time(point['actual'])}[/dim]") + except: + rprint(f"[yellow]⚠️ 无法加载进度文件,重新开始[/yellow]") + + rprint(f"[cyan]📍 计划检测 {len(detection_points)} 个切分点[/cyan]") + + # 对每个检测点进行分析 + try: + for i, target_time in enumerate(detection_points): + # 跳过已完成的点 + if i < start_index: + continue + + rprint(f"\n[yellow]🎯 分析切分点 {i+1}/{len(detection_points)} (目标: {format_time(target_time)})[/yellow]") + + cut_point = detect_optimal_cut_point( + input_video_path, + target_time, + total_duration, + output_dir, + i+1 + ) + + if cut_point: + cut_points.append(cut_point) + rprint(f"[green]✅ 找到切分点: {format_time(cut_point['actual'])} (偏差: {cut_point['deviation']:+.1f}s)[/green]") + else: + # 使用备选点 + fallback_point = { + 'target': target_time, + 'actual': target_time, + 'deviation': 0, + 'silence_duration': 0, + 'silence_type': 'fallback', + 'confidence': 'low' + } + cut_points.append(fallback_point) + rprint(f"[yellow]⚠️ 使用备选点: {format_time(target_time)}[/yellow]") + + # 💾 每完成一个点就保存进度 + try: + progress_data = { + 'input_video': input_video_path, + 'total_duration': total_duration, + 'target_interval': target_interval, + 'completed_cut_points': cut_points, + 'progress': f"{len(cut_points)}/{len(detection_points)}" + } + with open(progress_file, 'w', encoding='utf-8') as f: + json.dump(progress_data, f, ensure_ascii=False, indent=2) + rprint(f"[dim]💾 进度已保存 ({len(cut_points)}/{len(detection_points)})[/dim]") + except: + pass + + except KeyboardInterrupt: + rprint(f"\n[yellow]⚠️ 用户中断,进度已保存,可重新运行继续[/yellow]") + return None + + # 生成段落信息 + segments = [] + + # 第一段:从开始到第一个切分点 + if cut_points: + segments.append({ + 'index': 1, + 'start': 0, + 'end': cut_points[0]['actual'], + 'duration': cut_points[0]['actual'], + 'cut_type': 'start' + }) + + # 中间段落 + for i in range(len(cut_points) - 1): + segments.append({ + 'index': i + 2, + 'start': cut_points[i]['actual'], + 'end': cut_points[i + 1]['actual'], + 'duration': cut_points[i + 1]['actual'] - cut_points[i]['actual'], + 'cut_type': 'middle' + }) + + # 最后一段:从最后一个切分点到结束 + segments.append({ + 'index': len(cut_points) + 1, + 'start': cut_points[-1]['actual'], + 'end': total_duration, + 'duration': total_duration - cut_points[-1]['actual'], + 'cut_type': 'end' + }) + + # 创建切分计划 + plan = { + 'input_video': input_video_path, + 'total_duration': total_duration, + 'target_interval': target_interval, + 'cut_points': cut_points, + 'segments': segments + } + + # 保存计划到文件 + with open(plan_file, 'w', encoding='utf-8') as f: + json.dump(plan, f, ensure_ascii=False, indent=2) + + rprint(f"[green]✓ 切分计划已保存: {plan_file}[/green]") + + # 🧹 完成后清理进度文件 + try: + if os.path.exists(progress_file): + os.remove(progress_file) + except: + pass + + return plan + +def detect_optimal_cut_point(input_video_path, target_time, total_duration, output_dir, point_index): + """ + 函数2: 切分检测函数 (简化版) + 在指定时间点附近检测最佳切分位置 + - 使用30秒分析窗口 + - 只检测-25dB以下的静音 + - 选择窗口内最后一个静音点作为切分点 + """ + # 定义分析窗口:目标时间前后各30秒 + window_size = 30 # 30秒 + start_time = max(0, target_time - window_size) + end_time = min(total_duration, target_time + window_size) + analysis_duration = end_time - start_time + + rprint(f"[cyan] 📊 分析窗口: {format_time(start_time)} - {format_time(end_time)} (±{window_size}s)[/cyan]") + + # 提取分析片段 + segment_path = os.path.join(output_dir, f"temp_segment_{point_index}.mp4") + if not extract_video_segment(input_video_path, start_time, analysis_duration, segment_path): + rprint(f"[yellow] ⚠️ 提取片段失败,使用目标时间[/yellow]") + return { + 'target': target_time, + 'actual': target_time, + 'deviation': 0, + 'silence_duration': 0, + 'silence_type': 'fallback', + 'confidence': 'low', + 'reason': 'extract_failed' + } + + # 提取音频 + audio_path = os.path.join(output_dir, f"temp_audio_{point_index}.mp3") + if not extract_audio_from_video(segment_path, audio_path): + rprint(f"[yellow] ⚠️ 提取音频失败,使用目标时间[/yellow]") + if os.path.exists(segment_path): + os.remove(segment_path) + return { + 'target': target_time, + 'actual': target_time, + 'deviation': 0, + 'silence_duration': 0, + 'silence_type': 'fallback', + 'confidence': 'low', + 'reason': 'audio_failed' + } + + # 分离人声 + vocals_path = separate_vocals_with_demucs(audio_path, output_dir) + if not vocals_path: + rprint(f"[yellow] ⚠️ 人声分离失败,使用目标时间[/yellow]") + for temp_file in [segment_path, audio_path]: + if os.path.exists(temp_file): + os.remove(temp_file) + return { + 'target': target_time, + 'actual': target_time, + 'deviation': 0, + 'silence_duration': 0, + 'silence_type': 'fallback', + 'confidence': 'low', + 'reason': 'vocals_failed' + } + + # 检测30秒窗口内的所有静音段:-25dB,最小时长50ms + rprint(f"[cyan] 🔍 检测30秒窗口内的静音段 (-25dB, ≥50ms)[/cyan]") + + cmd = [ + 'ffmpeg', + '-i', vocals_path, + '-af', 'silencedetect=noise=-25dB:duration=0.05', + '-f', 'null', + '-', + '-v', 'info' + ] + + silences = [] + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + + current_silence_start = None + + for line in result.stderr.split('\n'): + line = line.strip() + + # 解析 silence_start + if 'silence_start:' in line: + try: + start_part = line.split('silence_start:')[1].strip() + silence_start = float(start_part.split()[0]) + current_silence_start = silence_start + except Exception: + continue + + # 解析 silence_end + elif 'silence_end:' in line and current_silence_start is not None: + try: + parts = line.split('silence_end:')[1] + + if '|' in parts: + end_part = parts.split('|')[0].strip() + duration_part = parts.split('silence_duration:')[1].strip() + silence_end = float(end_part) + silence_duration = float(duration_part) + else: + silence_end = float(parts.strip()) + silence_duration = silence_end - current_silence_start + + if silence_duration >= 0.05: # 至少50ms + silences.append({ + 'start': current_silence_start, + 'end': silence_end, + 'duration': silence_duration, + 'center': (current_silence_start + silence_end) / 2, + 'absolute_center': start_time + (current_silence_start + silence_end) / 2, + 'type': 'detected' + }) + + current_silence_start = None + + except Exception: + continue + + except Exception as e: + rprint(f"[red] ❌ 静音检测失败: {e}[/red]") + silences = [] + + if not silences: + rprint(f"[yellow] ⚠️ 未检测到静音段,使用目标时间[/yellow]") + # 清理临时文件 + for temp_file in [segment_path, audio_path, vocals_path]: + if os.path.exists(temp_file): + os.remove(temp_file) + return { + 'target': target_time, + 'actual': target_time, + 'deviation': 0, + 'silence_duration': 0, + 'silence_type': 'fallback', + 'confidence': 'low', + 'reason': 'no_silences' + } + + rprint(f"[green] ✓ 检测到 {len(silences)} 个静音段[/green]") + + # 显示所有静音段的信息 + for i, silence in enumerate(silences): + rprint(f" {i+1}. {format_time(silence['absolute_center'])} (时长: {silence['duration']*1000:.0f}ms)") + + # 选择最后一个静音段作为切分点 + last_silence = silences[-1] + absolute_time = last_silence['absolute_center'] + + best_candidate = { + 'target': target_time, + 'actual': absolute_time, + 'deviation': absolute_time - target_time, + 'silence_duration': last_silence['duration'], + 'silence_type': last_silence['type'], + 'confidence': 'high', + 'strategy': 'last_silence', + 'total_silences': len(silences) + } + + # 清理临时文件 + for temp_file in [segment_path, audio_path, vocals_path]: + if os.path.exists(temp_file): + os.remove(temp_file) + + # 输出结果 + rprint(f"[green] ✅ 选择最后一个静音段: {format_time(absolute_time)} | " + f"偏差: {best_candidate['deviation']:+.1f}s | " + f"静音: {best_candidate['silence_duration']*1000:.0f}ms | " + f"总静音段: {len(silences)}个[/green]") + + return best_candidate + +def execute_cut_plan(plan, output_dir): + """ + 函数3: 执行切分计划 + 根据切分计划实际切分视频 + """ + rprint(Panel.fit("[bold green]🚀 执行视频切分[/bold green]", border_style="green")) + + input_video = plan['input_video'] + segments = plan['segments'] + + if not os.path.exists(input_video): + rprint(f"[red]❌ 源视频文件不存在: {input_video}[/red]") + return False + + # 创建输出目录 + segments_dir = os.path.join(output_dir, "segments") + os.makedirs(segments_dir, exist_ok=True) + + rprint(f"[cyan]📁 输出目录: {segments_dir}[/cyan]") + rprint(f"[cyan]🎬 开始切分 {len(segments)} 个片段...[/cyan]") + + success_count = 0 + + for segment in segments: + segment_name = f"segment_{segment['index']:02d}.mp4" + output_path = os.path.join(segments_dir, segment_name) + + rprint(f"\n[yellow]✂️ 切分片段 {segment['index']}: {format_time(segment['start'])} - {format_time(segment['end'])}[/yellow]") + + cmd = [ + 'ffmpeg', + '-i', input_video, + '-ss', str(segment['start']), + '-t', str(segment['duration']), + '-c', 'copy', + output_path, + '-y' + ] + + try: + with console.status(f"[yellow]处理片段 {segment['index']}...", spinner="dots"): + result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) + + if result.returncode == 0: + file_size = os.path.getsize(output_path) / 1024 / 1024 # MB + rprint(f"[green]✅ 片段 {segment['index']} 完成: {segment_name} ({file_size:.1f}MB)[/green]") + success_count += 1 + else: + rprint(f"[red]❌ 片段 {segment['index']} 失败: {result.stderr}[/red]") + + except subprocess.TimeoutExpired: + rprint(f"[red]❌ 片段 {segment['index']} 超时[/red]") + except Exception as e: + rprint(f"[red]❌ 片段 {segment['index']} 错误: {e}[/red]") + + # 生成切分报告 + report_file = os.path.join(output_dir, "cut_report.txt") + with open(report_file, 'w', encoding='utf-8') as f: + f.write("视频切分报告\n") + f.write("=" * 50 + "\n\n") + f.write(f"源视频: {os.path.basename(input_video)}\n") + f.write(f"总时长: {format_time(plan['total_duration'])}\n") + f.write(f"目标间隔: {plan['target_interval']} 分钟\n") + f.write(f"切分点数: {len(plan['cut_points'])}\n") + f.write(f"生成片段: {len(segments)}\n") + f.write(f"成功片段: {success_count}\n") + f.write(f"成功率: {success_count/len(segments)*100:.1f}%\n\n") + + f.write("片段详情:\n") + f.write("-" * 30 + "\n") + for segment in segments: + f.write(f"片段 {segment['index']:2d}: {format_time(segment['start'])} - {format_time(segment['end'])} ({format_time(segment['duration'])})\n") + + rprint(f"\n[green]🎉 切分完成! 成功: {success_count}/{len(segments)}[/green]") + rprint(f"[cyan]📋 报告已保存: {report_file}[/cyan]") + + return success_count == len(segments) + +def display_cut_plan(plan): + """显示切分计划的详细信息""" + rprint(Panel.fit("[bold blue]📋 切分计划预览[/bold blue]", border_style="blue")) + + # 基本信息 + rprint(f"[green]📁 源视频[/green]: {os.path.basename(plan['input_video'])}") + rprint(f"[green]⏱️ 总时长[/green]: {format_time(plan['total_duration'])}") + rprint(f"[green]🎯 目标间隔[/green]: {plan['target_interval']} 分钟") + rprint(f"[green]✂️ 切分点[/green]: {len(plan['cut_points'])} 个") + rprint(f"[green]📹 生成片段[/green]: {len(plan['segments'])} 个") + + # 切分点详情 + if plan['cut_points']: + rprint(f"\n[cyan]🎯 切分点详情:[/cyan]") + for i, cp in enumerate(plan['cut_points']): + confidence_color = "green" if cp.get('confidence') == 'high' else "yellow" if cp.get('confidence') == 'medium' else "red" + rprint(f" {i+1}. {format_time(cp['actual'])} (偏差: {cp['deviation']:+.1f}s, 类型: {cp['silence_type']}, 置信度: [{confidence_color}]{cp.get('confidence', 'unknown')}[/{confidence_color}])") + + # 段落预览表格 + rprint(f"\n[cyan]📹 段落预览:[/cyan]") + table = Table(show_header=True, header_style="bold magenta") + table.add_column("片段", style="dim", width=6) + table.add_column("开始时间", style="cyan") + table.add_column("结束时间", style="cyan") + table.add_column("时长", style="yellow") + table.add_column("类型", style="green") + + for segment in plan['segments']: + table.add_row( + f"{segment['index']:02d}", + format_time(segment['start']), + format_time(segment['end']), + format_time(segment['duration']), + segment['cut_type'] + ) + + console.print(table) + +def main(): + """主函数:组装调用逻辑""" + parser = argparse.ArgumentParser(description="智能视频切分工具") + parser.add_argument("--input", "-i", required=True, help="输入视频文件") + parser.add_argument("--output", "-o", required=True, help="输出目录") + parser.add_argument("--interval", "-t", type=int, default=30, help="目标切分间隔(分钟)") + parser.add_argument("--auto", "-a", action="store_true", help="自动执行,不询问确认") + + args = parser.parse_args() + + # 步骤1: 生成切分计划 + rprint("[bold cyan]步骤 1/3: 生成切分计划[/bold cyan]") + plan = generate_cut_plan(args.input, args.output, args.interval) + + if not plan: + rprint("[red]❌ 生成切分计划失败[/red]") + return + + # 步骤2: 显示计划并确认 + rprint(f"\n[bold cyan]步骤 2/3: 预览切分计划[/bold cyan]") + display_cut_plan(plan) + + # 询问用户确认 + if not args.auto: + if not Confirm.ask("\n[bold yellow]是否确认执行切分计划?[/bold yellow]"): + rprint("[yellow]❌ 用户取消操作[/yellow]") + return + + # 步骤3: 执行切分 + rprint(f"\n[bold cyan]步骤 3/3: 执行视频切分[/bold cyan]") + success = execute_cut_plan(plan, args.output) + + if success: + rprint(Panel( + "[bold green]🎉 视频切分完成![/bold green]\n\n" + f"• 源视频: {os.path.basename(plan['input_video'])}\n" + f"• 生成片段: {len(plan['segments'])} 个\n" + f"• 输出目录: {args.output}/segments\n" + f"• 切分报告: {args.output}/cut_report.txt", + title="✨ 完成", + border_style="green" + )) + else: + rprint("[red]❌ 视频切分过程中出现错误[/red]") + +if __name__ == "__main__": + if len(sys.argv) > 1: + main() + else: + # 直接调用示例 + input_video = "/Users/luogaiyu/code/VideoLingo/videos/Learn Solidity Smart Contract Development | Full 2024 Cyfrin Updraft Course.webm" + output_directory = "/Users/luogaiyu/code/VideoLingo/output/smart_cut_test" + + # 步骤1: 生成切分计划 + rprint("[bold cyan]步骤 1/3: 生成切分计划[/bold cyan]") + plan = generate_cut_plan(input_video, output_directory, target_interval=30) + + if plan: + # 步骤2: 显示计划 + rprint(f"\n[bold cyan]步骤 2/3: 预览切分计划[/bold cyan]") + display_cut_plan(plan) + + # 步骤3: 询问确认并执行 + if Confirm.ask("\n[bold yellow]是否确认执行切分计划?[/bold yellow]"): + rprint(f"\n[bold cyan]步骤 3/3: 执行视频切分[/bold cyan]") + execute_cut_plan(plan, output_directory) + else: + rprint("[yellow]用户取消操作[/yellow]") \ No newline at end of file diff --git a/core/split_video_utils/video_analyse.py b/core/split_video_utils/video_analyse.py new file mode 100644 index 00000000..284c1534 --- /dev/null +++ b/core/split_video_utils/video_analyse.py @@ -0,0 +1,272 @@ +import numpy as np +import librosa +import warnings + +warnings.filterwarnings('ignore') + +def create_terminal_30s_timeline(audio_path): + """在终端内显示30秒音频时间线""" + + print("🎵 Loading 30 seconds of audio...") + + # 加载音频 + y, sr = librosa.load(audio_path, sr=22050, duration=30.0) + hop_length = int(0.01 * sr) + frame_length = hop_length * 4 + + rms_energy = librosa.feature.rms(y=y, frame_length=frame_length, hop_length=hop_length)[0] + rms_db = librosa.amplitude_to_db(rms_energy, ref=np.max) + time_frames = librosa.frames_to_time(np.arange(len(rms_energy)), sr=sr, hop_length=hop_length) + + print(f"✓ Loaded: {len(y)/sr:.2f}s, Generated {len(rms_db)} data points") + + # 创建终端ASCII图表 + print("\n" + "="*100) + print(" 30-SECOND AUDIO dB TIMELINE") + print("="*100) + + # 图表参数 + width = 90 # 90个字符宽度,每个字符代表约0.33秒 + height = 25 # 25行高度 + + min_db = np.min(rms_db) + max_db = np.max(rms_db) + + # 绘制主图表 + for row in range(height): + line = "" + db_level = max_db - (row / height) * (max_db - min_db) + + # 添加dB标签 + db_label = f"{db_level:6.1f}dB |" + + for col in range(width): + time_idx = int((col / width) * len(rms_db)) + if time_idx < len(rms_db): + current_db = rms_db[time_idx] + + if current_db >= db_level: + line += "█" # 实心块 + elif current_db >= db_level - 1: + line += "▓" # 深灰 + elif current_db >= db_level - 2: + line += "▒" # 中灰 + elif current_db >= db_level - 3: + line += "░" # 浅灰 + else: + line += " " # 空白 + else: + line += " " + + # 添加阈值标记 + threshold_mark = "" + if abs(db_level - (-20)) < 1: + threshold_mark = " ← -20dB (Strict)" + elif abs(db_level - (-25)) < 1: + threshold_mark = " ← -25dB (Normal)" + elif abs(db_level - (-30)) < 1: + threshold_mark = " ← -30dB (Sensitive)" + elif abs(db_level - (-35)) < 1: + threshold_mark = " ← -35dB (Ultra)" + + print(db_label + line + "|" + threshold_mark) + + # 时间轴 + time_axis = " |" + for i in range(0, width, 15): # 每15个字符一个时间标记 + time_val = (i / width) * 30 + time_axis += f"{time_val:4.0f}s" + " " * 11 + print(time_axis) + + # 底部标尺 + scale_line = " |" + for i in range(0, width, 5): + if i % 15 == 0: + scale_line += "|" + else: + scale_line += "." + print(scale_line) + + print("="*100) + + # 静音检测可视化 + print("\n" + "="*100) + print(" SILENCE DETECTION TIMELINE") + print("="*100) + + # 创建静音检测图 + silence_levels = [ + (-20, "🔴", "STRICT"), + (-25, "🟠", "NORMAL"), + (-30, "🟢", "SENSITIVE"), + (-35, "🟣", "ULTRA") + ] + + for threshold, emoji, name in silence_levels: + line = f"{name:>10} {threshold:3d}dB |" + + for col in range(width): + time_idx = int((col / width) * len(rms_db)) + if time_idx < len(rms_db): + if rms_db[time_idx] < threshold: + line += "█" # 静音 + else: + line += "░" # 活跃 + else: + line += " " + + # 计算静音百分比 + silent_frames = np.sum(rms_db < threshold) + silent_percent = (silent_frames / len(rms_db)) * 100 + + line += f"| {silent_percent:5.1f}% silent" + print(line) + + # 时间轴(重复) + time_axis = " |" + for i in range(0, width, 15): + time_val = (i / width) * 30 + time_axis += f"{time_val:4.0f}s" + " " * 11 + print(time_axis) + + print("="*100) + + return time_frames, rms_db + +def show_30s_detailed_analysis(time_frames, rms_db): + """显示详细的30秒分析""" + + print("\n" + "🔍 DETAILED 30-SECOND ANALYSIS") + print("="*80) + + # 基本统计 + max_db = np.max(rms_db) + min_db = np.min(rms_db) + mean_db = np.mean(rms_db) + std_db = np.std(rms_db) + + print(f"📊 BASIC STATISTICS:") + print(f" Max dB: {max_db:7.2f} dB") + print(f" Min dB: {min_db:7.2f} dB") + print(f" Mean dB: {mean_db:7.2f} dB") + print(f" Std Dev: {std_db:7.2f} dB") + print(f" Range: {max_db - min_db:7.2f} dB") + + # 每秒分析 + print(f"\n⏱️ SECOND-BY-SECOND ANALYSIS:") + print("-" * 60) + print(f"{'Second':<8} {'Avg dB':<8} {'Min dB':<8} {'Max dB':<8} {'Status':<12}") + print("-" * 60) + + for sec in range(30): + start_idx = np.argmin(np.abs(time_frames - sec)) + end_idx = np.argmin(np.abs(time_frames - (sec + 1))) + + if end_idx > start_idx: + sec_data = rms_db[start_idx:end_idx] + avg_db = np.mean(sec_data) + min_sec_db = np.min(sec_data) + max_sec_db = np.max(sec_data) + + # 状态判断 + if avg_db < -35: + status = "VERY QUIET" + elif avg_db < -30: + status = "QUIET" + elif avg_db < -25: + status = "MEDIUM" + elif avg_db < -15: + status = "LOUD" + else: + status = "VERY LOUD" + + print(f"{sec:2d}s {avg_db:6.1f} {min_sec_db:6.1f} {max_sec_db:6.1f} {status}") + + # 活跃时段检测 + print(f"\n🎵 ACTIVE PERIODS (> -30dB):") + print("-" * 40) + + active_mask = rms_db > -30 + in_active = False + active_start = 0 + active_periods = [] + + for i, is_active in enumerate(active_mask): + if is_active and not in_active: + active_start = i + in_active = True + elif not is_active and in_active: + active_end = i - 1 + duration = time_frames[active_end] - time_frames[active_start] + if duration > 0.1: # 只显示超过0.1秒的活跃段 + active_periods.append({ + 'start': time_frames[active_start], + 'end': time_frames[active_end], + 'duration': duration, + 'peak_db': np.max(rms_db[active_start:active_end]) + }) + in_active = False + + # 处理最后一段 + if in_active: + duration = time_frames[-1] - time_frames[active_start] + if duration > 0.1: + active_periods.append({ + 'start': time_frames[active_start], + 'end': time_frames[-1], + 'duration': duration, + 'peak_db': np.max(rms_db[active_start:]) + }) + + if active_periods: + for i, period in enumerate(active_periods[:10]): # 显示前10个 + print(f"{i+1:2d}. {period['start']:6.2f}s - {period['end']:6.2f}s " + f"({period['duration']:5.2f}s) Peak: {period['peak_db']:6.1f}dB") + else: + print(" No significant active periods found") + + print("="*80) + +def terminal_30s_complete_analysis(audio_path): + """完整的终端30秒分析""" + + print("🎵 COMPLETE 30-SECOND AUDIO ANALYSIS") + print(f"📁 File: {audio_path}") + print("="*100) + + # 1. 创建终端时间线 + time_frames, rms_db = create_terminal_30s_timeline(audio_path) + + # 2. 详细分析 + show_30s_detailed_analysis(time_frames, rms_db) + + # 3. 推荐设置 + mean_db = np.mean(rms_db) + print(f"\n💡 RECOMMENDATIONS:") + print("-" * 30) + + if mean_db < -40: + print(" 🔇 Audio is very quiet - use -35dB threshold") + print(" 📝 Consider audio enhancement") + elif mean_db < -30: + print(" 🔉 Audio is quiet - use -30dB threshold") + elif mean_db < -20: + print(" 🔊 Audio is normal - use -25dB threshold (recommended)") + else: + print(" 📢 Audio is loud - use -20dB threshold") + + print("="*100) + + return time_frames, rms_db + +# 使用方法 +if __name__ == "__main__": + audio_file = "/home/darkchunk/code/VideoLingo/output/test_segments/segment_start_audio_vocals.mp3" + terminal_30s_complete_analysis(audio_file) + + +# 切分点 1: 2.75s (在1.5s-4.0s静音段的中点) +# 切分点 2: 7.25s (在6.5s-8.0s静音段的中点) +# 切分点 3: 13.25s (在12.5s-14.0s静音段的中点) +# 切分点 4: 18.25s (在17.5s-19.0s静音段的中点) +# 切分点 5: 23.0s (在22.0s-24.0s静音段的中点) \ No newline at end of file diff --git a/core/step1_ytdlp.py b/core/step1_ytdlp.py index f79d88e5..4f583ca5 100644 --- a/core/step1_ytdlp.py +++ b/core/step1_ytdlp.py @@ -6,23 +6,21 @@ from core.config_utils import load_key def sanitize_filename(filename): - # Remove or replace illegal characters filename = re.sub(r'[<>:"/\\|?*]', '', filename) - # Ensure filename doesn't start or end with a dot or space filename = filename.strip('. ') - # Use default name if filename is empty return filename if filename else 'video' def download_video_ytdlp(url, save_path='output', resolution='1080', cutoff_time=None): allowed_resolutions = ['360', '1080', 'best'] - if resolution not in allowed_resolutions: - resolution = '360' - os.makedirs(save_path, exist_ok=True) + # 配置 yt-dlp 选项 ydl_opts = { + # 选择最佳视频和音频格式 'format': 'bestvideo+bestaudio/best' if resolution == 'best' else f'bestvideo[height<={resolution}]+bestaudio/best[height<={resolution}]', + # 输出模板 'outtmpl': f'{save_path}/%(title)s.%(ext)s', 'noplaylist': True, + # 写入缩略图 'writethumbnail': True, 'postprocessors': [{ 'key': 'FFmpegThumbnailsConvertor', @@ -55,7 +53,7 @@ def download_video_ytdlp(url, save_path='output', resolution='1080', cutoff_time if new_filename != filename: os.rename(os.path.join(save_path, file), os.path.join(save_path, new_filename + ext)) - # cut the video to make demo + # 提供裁剪功能 但不是所有的都是用 if cutoff_time: print(f"Cutoff time: {cutoff_time}, Now checking video duration...") video_file = find_video_files(save_path) diff --git a/core/step2_whisperX.py b/core/step2_whisperX.py index 1fc5c8a1..df420599 100644 --- a/core/step2_whisperX.py +++ b/core/step2_whisperX.py @@ -8,6 +8,8 @@ from core.all_whisper_methods.audio_preprocess import process_transcription, convert_video_to_audio, split_audio, save_results, CLEANED_CHUNKS_EXCEL_PATH, normalize_audio_volume from core.step1_ytdlp import find_video_files +import json + def transcribe(): if os.path.exists(CLEANED_CHUNKS_EXCEL_PATH): rprint("[yellow]⚠️ Transcription results already exist, skipping transcription step.[/yellow]") @@ -24,9 +26,13 @@ def transcribe(): else: vocal_audio = RAW_AUDIO_FILE - # step2 Extract audio + # # step2 Extract audio segments = split_audio(RAW_AUDIO_FILE) + # 输出数组到JSON文件 + # with open('log/segments.json', 'w', encoding='utf-8') as f: + # json.dump(segments, f, indent=4, ensure_ascii=False, default=str) + # step3 Transcribe audio all_results = [] runtime = load_key("whisper.runtime") @@ -41,17 +47,29 @@ def transcribe(): rprint("[cyan]🎤 Transcribing audio with ElevenLabs API...[/cyan]") for start, end in segments: - result = ts(RAW_AUDIO_FILE, vocal_audio, start, end) + result = ts(RAW_AUDIO_FILE, vocal_audio,start, end) all_results.append(result) + + # # 输出数组到JSON文件 + # with open('log/all_results.json', 'w', encoding='utf-8') as f: + # json.dump(all_results, f, indent=4, ensure_ascii=False, default=str) # step4 Combine results combined_result = {'segments': []} for result in all_results: combined_result['segments'].extend(result['segments']) + # with open('log/combined_result.json', 'w', encoding='utf-8') as f: + # json.dump(combined_result, f, indent=4, ensure_ascii=False, default=str) + # step5 Process df df = process_transcription(combined_result) + + # print(len(df['start'].unique())) + # print(len(df['start'])) + # print(len(df['start'].unique()) == len(df['start']) ) + # df.to_excel(CLEANED_CHUNKS_EXCEL_PATH, index=False) save_results(df) if __name__ == "__main__": - transcribe() \ No newline at end of file + transcribe() diff --git a/core/step7_merge_sub_to_vid.py b/core/step7_merge_sub_to_vid.py index 2fa218f8..3c04d63b 100644 --- a/core/step7_merge_sub_to_vid.py +++ b/core/step7_merge_sub_to_vid.py @@ -36,15 +36,32 @@ TRANS_SRT = f"{OUTPUT_DIR}/trans.srt" def check_gpu_available(): - try: - result = subprocess.run(['ffmpeg', '-encoders'], capture_output=True, text=True) - return 'h264_nvenc' in result.stdout - except: + # 当前gpu + # try: + # result = subprocess.run(['ffmpeg', '-encoders'], capture_output=True, text=True) + # return 'h264_nvenc' in result.stdout + # except: return False -def merge_subtitles_to_video(): +def merge_subtitles_to_video(test_mode=False, test_duration=30): + """ + 合并字幕到视频 + + Args: + test_mode (bool): 是否为测试模式,默认False + test_duration (int): 测试模式下的时长(秒),默认30秒 + """ video_file = find_video_files() - os.makedirs(os.path.dirname(OUTPUT_VIDEO), exist_ok=True) + + # 🔥 根据模式决定输出文件 + if test_mode: + output_video = f"{OUTPUT_DIR}/output_sub_test_{test_duration}s.mp4" + rprint(f"[bold yellow]🧪 测试模式:只处理前{test_duration}秒[/bold yellow]") + else: + output_video = OUTPUT_VIDEO + rprint("[bold blue]📹 正式模式:处理完整视频[/bold blue]") + + os.makedirs(os.path.dirname(output_video), exist_ok=True) # Check resolution if not load_key("burn_subtitles"): @@ -53,7 +70,7 @@ def merge_subtitles_to_video(): # Create a black frame frame = np.zeros((1080, 1920, 3), dtype=np.uint8) fourcc = cv2.VideoWriter_fourcc(*'mp4v') - out = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, 1, (1920, 1080)) + out = cv2.VideoWriter(output_video, fourcc, 1, (1920, 1080)) out.write(frame) out.release() @@ -69,8 +86,23 @@ def merge_subtitles_to_video(): TARGET_HEIGHT = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)) video.release() rprint(f"[bold green]Video resolution: {TARGET_WIDTH}x{TARGET_HEIGHT}[/bold green]") + + # 🔥 修复AV1问题和文件兼容性的FFmpeg命令 ffmpeg_cmd = [ - 'ffmpeg', '-i', video_file, + 'ffmpeg', + '-y', # 🔥 强制覆盖输出文件 + '-hwaccel', 'none', # 禁用硬件加速,解决AV1问题 + '-fflags', '+genpts', # 生成时间戳 + '-avoid_negative_ts', 'make_zero', # 避免时间戳问题 + '-i', video_file, + ] + + # 🔥 如果是测试模式,添加时长限制 + if test_mode: + ffmpeg_cmd.extend(['-t', str(test_duration)]) + + # 添加视频滤镜 + ffmpeg_cmd.extend([ '-vf', ( f"scale={TARGET_WIDTH}:{TARGET_HEIGHT}:force_original_aspect_ratio=decrease," f"pad={TARGET_WIDTH}:{TARGET_HEIGHT}:(ow-iw)/2:(oh-ih)/2," @@ -80,32 +112,90 @@ def merge_subtitles_to_video(): f"subtitles={TRANS_SRT}:force_style='FontSize={TRANS_FONT_SIZE},FontName={TRANS_FONT_NAME}," f"PrimaryColour={TRANS_FONT_COLOR},OutlineColour={TRANS_OUTLINE_COLOR},OutlineWidth={TRANS_OUTLINE_WIDTH}," f"BackColour={TRANS_BACK_COLOR},Alignment=2,MarginV=27,BorderStyle=4'" - ).encode('utf-8'), - ] + ), + ]) + # GPU检测和编码设置 gpu_available = check_gpu_available() - if gpu_available: + if gpu_available and not test_mode: # 测试模式使用CPU更稳定 rprint("[bold green]NVIDIA GPU encoder detected, will use GPU acceleration.[/bold green]") ffmpeg_cmd.extend(['-c:v', 'h264_nvenc']) else: rprint("[bold yellow]No NVIDIA GPU encoder detected, will use CPU instead.[/bold yellow]") + ffmpeg_cmd.extend(['-c:v', 'libx264']) + if test_mode: + ffmpeg_cmd.extend(['-preset', 'fast']) # 测试模式使用快速编码 + else: + ffmpeg_cmd.extend(['-preset', 'medium']) # 正式模式使用平衡编码 - ffmpeg_cmd.extend(['-y', OUTPUT_VIDEO]) + # 🔥 修复文件兼容性问题 + ffmpeg_cmd.extend([ + '-pix_fmt', 'yuv420p', # 🔥 确保像素格式兼容性 + '-c:a', 'aac', # 🔥 重新编码音频为AAC确保兼容性 + '-b:a', '128k', # 音频比特率 + '-movflags', '+faststart', # 🔥 优化MP4文件结构,便于播放 + output_video + ]) - print("🎬 Start merging subtitles to video...") + mode_text = f"前{test_duration}秒测试" if test_mode else "完整视频" + print(f"🎬 开始处理{mode_text}...") start_time = time.time() + + # 🔥 改进错误处理,过滤AV1警告 process = subprocess.Popen(ffmpeg_cmd) try: - process.wait() + stdout, stderr = process.communicate() if process.returncode == 0: - print(f"\n✅ Done! Time taken: {time.time() - start_time:.2f} seconds") + print(f"\n✅ 完成!处理时间: {time.time() - start_time:.2f} 秒") + print(f"📁 输出文件: {output_video}") + + # 🔥 验证输出文件 + if os.path.exists(output_video): + file_size = os.path.getsize(output_video) / (1024 * 1024) # MB + print(f"📊 文件大小: {file_size:.2f} MB") + + # 简单验证文件是否可读 + try: + test_video = cv2.VideoCapture(output_video) + frame_count = int(test_video.get(cv2.CAP_PROP_FRAME_COUNT)) + test_video.release() + print(f"✅ 文件验证通过,总帧数: {frame_count}") + except: + print("⚠️ 文件可能有问题,请检查") + else: - print("\n❌ FFmpeg execution error") + print(f"\n❌ FFmpeg执行错误:") + # 🔥 过滤掉AV1相关的重复警告 + filtered_errors = [] + for line in stderr.split('\n'): + if not any(keyword in line for keyword in [ + 'Missing Sequence Header', + 'hardware accelerated AV1', + 'Failed to get pixel format', + 'Your platform doesn\'t suppport' + ]): + if line.strip(): # 只保留非空行 + filtered_errors.append(line) + + # 只显示最后几行有用的错误信息 + if filtered_errors: + print('\n'.join(filtered_errors[-5:])) + else: + print("处理完成,但有一些AV1兼容性警告(已过滤)") + except Exception as e: - print(f"\n❌ Error occurred: {e}") + print(f"\n❌ 发生错误: {e}") if process.poll() is None: process.kill() +# 🔥 使用示例 if __name__ == "__main__": - merge_subtitles_to_video() \ No newline at end of file + # 测试模式:只处理前30秒 + # merge_subtitles_to_video(test_mode=True, test_duration=30) + + # 正式模式:处理完整视频 + # merge_subtitles_to_video(test_mode=False) + + # 或者简写 + merge_subtitles_to_video() # 默认正式模式 \ No newline at end of file diff --git a/total.sh b/total.sh new file mode 100644 index 00000000..9cb3ada0 --- /dev/null +++ b/total.sh @@ -0,0 +1,18 @@ +conda activate videolingo +rm -rf output/ +python -m core.step1_ytdlp +# 语音识别 +python -m core.step2_whisperX + +# # # 文本分割 +python -m core.step3_1_spacy_split +python -m core.step3_2_splitbymeaning + +# # 文本处理和翻译 +python -m core.step4_1_summarize +python -m core.step4_2_translate_all + +# 字幕处理 +python -m core.step5_splitforsub +python -m core.step6_generate_final_timeline +python -m core.step7_merge_sub_to_vid