diff --git a/.env.example b/.env.example deleted file mode 100644 index 9d3b64d..0000000 --- a/.env.example +++ /dev/null @@ -1,40 +0,0 @@ -# OpenRouter API 配置(必需) -OPENROUTER_API_KEY=your-api-key-here -OPENROUTER_API_URL=https://openrouter.ai/api/v1 -OPENROUTER_APP_NAME=video_note_generator -OPENROUTER_HTTP_REFERER=https://github.com - -# Unsplash API 配置(必需) -UNSPLASH_ACCESS_KEY=your-unsplash-access-key-here -UNSPLASH_SECRET_KEY=your-unsplash-secret-key-here - -# 输出目录配置 -OUTPUT_DIR=generated_notes - -# Whisper 配置 -WHISPER_MODEL=medium # 可选: tiny, base, small, medium, large-v2 -WHISPER_LANGUAGE=zh # 默认语言,可选:zh, en, ja 等 - -# FFmpeg 配置 -# Windows 用户需要设置 FFmpeg 路径,Mac/Linux 用户通常不需要 -# FFMPEG_PATH=C:\\path\\to\\ffmpeg.exe - -# 代理配置(可选,如果你在中国大陆使用,建议配置) -# HTTP_PROXY=http://127.0.0.1:7890 -# HTTPS_PROXY=http://127.0.0.1:7890 - -# 内容生成配置 -MAX_TOKENS=2000 # 生成小红书内容的最大长度 -CONTENT_CHUNK_SIZE=2000 # 长文本分块大小(字符数) -TEMPERATURE=0.7 # AI 创造性程度 (0.0-1.0) -TOP_P=0.9 # 采样阈值 (0.0-1.0) - -# 笔记样式配置 -USE_EMOJI=true # 是否在内容中使用表情符号 -TAG_COUNT=5 # 生成的标签数量 -MIN_PARAGRAPHS=3 # 最少段落数 -MAX_PARAGRAPHS=6 # 最多段落数 - -# 调试配置 -DEBUG=false # 是否启用调试模式 -LOG_LEVEL=info # 日志级别:debug, info, warning, error diff --git a/.gitignore b/.gitignore index 02256fc..ca766f3 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ urls.txt # OS specific .DS_Store Thumbs.db +temp_*/ +logs/ diff --git a/README.md b/README.md index 0c36ff3..3f09e10 100644 --- a/README.md +++ b/README.md @@ -45,13 +45,17 @@ ![Version](https://img.shields.io/badge/version-1.0.0-blue.svg) ![Python](https://img.shields.io/badge/python-3.8+-green.svg) -## 👤 作者信息 +## Huggingface space: [rednote-gen](https://huggingface.co/spaces/windane/rednote-gen) +PS: 最好还是本地运行,Space 直接下载视频失败率高,可以使用MD文件解析。 + +## 👤 原作者信息 - **作者**:玄清 - **博客**:[天天悦读](https://blog.example.com) - **Email**:grow8org@gmail.com - **GitHub**:[whotto/Video_note_generator](https://github.com/whotto/Video_note_generator) + ## 🎯 应用场景 - **内容创作者**:快速将视频/直播内容转换为文章 @@ -96,24 +100,12 @@ graph TD ## 🚀 使用方式 -支持三种使用方式: - -1. **处理单个视频**: -```bash -python video_note_generator.py https://example.com/video -``` - -2. **批量处理 URL 文件**: -```bash -# urls.txt 文件,每行一个视频链接 -python video_note_generator.py urls.txt +启动 WebUI: +```shell +$ python web.py ``` -3. **处理 Markdown 文件**: -```bash -# 支持 Markdown 链接和直接 URL -python video_note_generator.py notes.md -``` +打开:http://127.0.0.1:7860 ## 🛠️ 使用工具 @@ -134,40 +126,13 @@ python video_note_generator.py notes.md # 安装 Python 依赖 pip install -r requirements.txt - -# 配置环境变量 -cp .env.example .env -``` - -### 2. 配置 API 密钥 - -编辑 `.env` 文件,填入必要的 API 密钥: -```ini -# OpenRouter API(必需) -OPENROUTER_API_KEY=your-api-key-here - -# Unsplash API(必需) -UNSPLASH_ACCESS_KEY=your-unsplash-access-key-here -UNSPLASH_SECRET_KEY=your-unsplash-secret-key-here -``` - -### 3. 开始使用 - -1. 创建 `urls.txt` 文件,每行一个视频链接 -2. 运行环境检查: -```bash -python check_environment.py -``` -3. 运行生成器: -```bash -python video_note_generator.py test.md ``` ## 📄 输出文件 每个视频会生成三个文件: -1. **原始笔记** (`YYYYMMDD_HHMMSS.md`): +1. **原始笔记** : - 完整的视频转录文本 - 保留所有细节内容 @@ -185,14 +150,18 @@ python video_note_generator.py test.md ## ⚙️ 配置说明 -在 `.env` 文件中可以调整以下参数: +在设置页面填写必要的 API Key: +- OpenRouter API(必需) +- Unsplash API (建议) + +还有其他设置: -```ini -# 内容生成配置 +``` MAX_TOKENS=2000 # 生成小红书内容的最大长度 CONTENT_CHUNK_SIZE=2000 # 长文本分块大小(字符数) TEMPERATURE=0.7 # AI 创造性程度 (0.0-1.0) + # 代理设置(可选) # HTTP_PROXY=http://127.0.0.1:7890 # HTTPS_PROXY=http://127.0.0.1:7890 diff --git a/requirements.txt b/requirements.txt index 879604f..8e04181 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,17 +1,12 @@ -openai>=1.0.0 -httpx>=0.24.1 -yt-dlp>=2023.11.16 -openai-whisper>=2023.11.17 -python-dotenv>=1.0.0 -requests>=2.31.0 -beautifulsoup4>=4.12.2 -python-unsplash>=1.1.0 -Pillow>=10.1.0 -urllib3>=2.1.0 -certifi>=2023.11.17 -ffmpeg-python>=0.2.0 - -# Optional dependencies for better performance -torch>=2.1.0 -pytube>=15.0.0 -you-get>=0.4.1650 +beautifulsoup4==4.12.3 +gradio==5.9.1 +gradio_modal==0.0.4 +httpx==0.28.1 +openai==1.58.1 +openai_whisper==20240930 +python-dotenv==1.0.1 +pytube==15.0.0 +setuptools==75.1.0 +yt_dlp==2024.12.23 +python-unsplash==1.2.5 +moviepy~=2.1.1 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapter/__init__.py b/src/adapter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapter/ffmpeg.py b/src/adapter/ffmpeg.py new file mode 100644 index 0000000..c49d71b --- /dev/null +++ b/src/adapter/ffmpeg.py @@ -0,0 +1,24 @@ +import subprocess + +from src.logger import app_logger + + +class FfmpegAdapter: + def __init__(self): + ffmpeg_path = None + try: + subprocess.run(["/opt/homebrew/bin/ffmpeg", "-version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + app_logger.info("✅ ffmpeg is available at /opt/homebrew/bin/ffmpeg") + ffmpeg_path = "/opt/homebrew/bin/ffmpeg" + except Exception: + try: + subprocess.run(["ffmpeg", "-version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + app_logger.info("✅ ffmpeg is available (from PATH)") + ffmpeg_path = "ffmpeg" + except Exception as e: + app_logger.warning(f"⚠️ ffmpeg not found: {str(e)}") + self.ffmpeg_path = ffmpeg_path diff --git a/src/adapter/openrouter.py b/src/adapter/openrouter.py new file mode 100644 index 0000000..a4a454c --- /dev/null +++ b/src/adapter/openrouter.py @@ -0,0 +1,78 @@ +import asyncio +from typing import Optional + +import openai + +from src.environment.env import Environment +from src.logger import app_logger +from src.video.prompt import share_prompt + + +class OpenRouterAdapter: + def __init__(self, env: Environment): + self.client = openai.OpenAI( + api_key=env.openrouter_api_key, + base_url=env.openrouter_api_url or 'https://openrouter.ai/api/v1', + default_headers={ + "HTTP-Referer": env.openrouter_http_referer, + "X-Title": env.openrouter_app_name + } + ) + self.ai_model = env.openrouter_ai_model + self.api_available = False + # Initialize the wait queue and flow limiter + self.wait_queue = asyncio.Queue() + self.flow_limiter = asyncio.Semaphore(5) # Allow 5 requests per minute + + def connect(self): + if self.client.api_key: + try: + app_logger.info(f"正在测试 OpenRouter API 连接...") + response = self.client.models.list() + app_logger.info("✅ OpenRouter API 连接测试成功") + self.api_available = True + except Exception as e: + app_logger.error(f"❌ OpenRouter API 连接测试失败: {e}") + app_logger.error("将继续尝试使用API,但可能会遇到问题") + self.api_available = False + + async def generate(self, system_prompt_type, user_prompt_type, content, + temperature=0.7, max_tokens=4000) -> Optional[str]: + if not self.api_available: + app_logger.error("OpenRouter API 不可用,无法生成") + return None + + # Wait for the flow limiter to allow the request + await self.flow_limiter.acquire() + + try: + system_prompt = share_prompt(prompt_type=system_prompt_type, content=content) + user_prompt = share_prompt(prompt_type=user_prompt_type, content=content) or content + app_logger.info('OpenRouter API 开始请求') + + # Add the request to the wait queue + await self.wait_queue.put(None) + + # Wait for the previous requests to complete + await asyncio.sleep(60 / 5) # 5 requests per minute + + response = self.client.chat.completions.create( + model=self.ai_model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ], + temperature=temperature, + max_tokens=max_tokens + ) + if not response.choices: + app_logger.error("OpenRouter API 返回结果为空") + return None + return response.choices[0].message.content + except Exception as e: + app_logger.error(f"OpenRouter API 请求失败: {e}") + return None + finally: + # Release the flow limiter + self.flow_limiter.release() + await self.wait_queue.get() \ No newline at end of file diff --git a/src/adapter/unsplash.py b/src/adapter/unsplash.py new file mode 100644 index 0000000..60ec752 --- /dev/null +++ b/src/adapter/unsplash.py @@ -0,0 +1,99 @@ +from typing import List + +import httpx + +from src.environment.env import Environment +from unsplash.api import Api as UnsplashApi +from unsplash.auth import Auth as UnsplashAuth + +from src.logger import app_logger + + +class UnsplashAdapter: + def __init__(self, env: Environment): + self.unsplash_client = None + self.unsplash_available = False + self.unsplash_access_key = env.unsplash_access_key + + if env.unsplash_access_key: + auth = UnsplashAuth( + client_id=env.unsplash_access_key, + client_secret=None, + redirect_uri=None + ) + try: + unsplash_client = UnsplashApi(auth) + self.unsplash_client = unsplash_client + self.unsplash_available = True + app_logger.info("✅ Unsplash API 配置成功") + except Exception as e: + app_logger.error(f"❌ Failed to initialize Unsplash client: {str(e)}") + self.unsplash_available = False + else: + app_logger.warning("⚠️ 未设置 Unsplash API 密钥") + self.unsplash_available = False + + def get_images(self, query: str, count: int = 3) -> List[str]: + if not self.unsplash_available: + app_logger.warning("⚠️ Unsplash API 不可用") + return [] + + try: + headers = { + 'Authorization': f'Client-ID {self.unsplash_access_key}' + } + # Query each keyword + all_photos = [] + for keyword in query.split(','): + response = httpx.get( + 'https://api.unsplash.com/search/photos', + params={ + 'query': keyword.strip(), + 'per_page': count, + 'orientation': 'portrait', # 小红书偏好竖版图片 + 'content_filter': 'high' # 只返回高质量图片 + }, + headers=headers, + verify=False # 禁用SSL验证 + ) + + if response.status_code == 200: + data = response.json() + if data['results']: + # 获取图片URL,优先使用regular尺寸 + photos = [photo['urls'].get('regular', photo['urls']['small']) + for photo in data['results']] + all_photos.extend(photos) + + # 如果收集到的图片不够,用最后一个关键词继续搜索 + while len(all_photos) < count and query: + response = httpx.get( + 'https://api.unsplash.com/search/photos', + params={ + 'query': query.split(',')[-1].strip(), + 'per_page': count - len(all_photos), + 'orientation': 'portrait', + 'content_filter': 'high', + 'page': 2 # 获取下一页的结果 + }, + headers=headers, + verify=False + ) + + if response.status_code == 200: + data = response.json() + if data['results']: + photos = [photo['urls'].get('regular', photo['urls']['small']) + for photo in data['results']] + all_photos.extend(photos) + else: + break + else: + break + + # 返回指定数量的图片 + return all_photos[:count] + + except Exception as e: + app_logger.error(f"⚠️ 获取图片失败: {str(e)}") + return [] \ No newline at end of file diff --git a/src/downloader/__init__.py b/src/downloader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/downloader/downloader.py b/src/downloader/downloader.py new file mode 100644 index 0000000..cfbe97b --- /dev/null +++ b/src/downloader/downloader.py @@ -0,0 +1,210 @@ +import os +import subprocess +import time +from typing import Optional, Tuple, Dict + +import httpx +import yt_dlp + +from src.downloader.error import DownloadError +from src.logger import app_logger + + +def download_video(platform_type: str, url: str, temp_dir: str) -> Tuple[Optional[str], Optional[Dict[str, str]]]: + """Download video and get file path & info""" + try: + if not platform_type: + raise DownloadError("不支持的视频平台", "unknown", "platform_error") + # Basic download options + options = { + 'format': 'bestaudio/best', + 'outtmpl': os.path.join(temp_dir, '%(title)s.%(ext)s'), + 'postprocessors': [{ + 'key': 'FFmpegExtractAudio', + 'preferredcodec': 'mp3', + }], + 'quiet': True, + 'no_warnings': True, + } + + # Download Video + for attempt in range(3): # 最多重试3次 + try: + with yt_dlp.YoutubeDL(options) as ydl: + app_logger.info(f"正在尝试下载(第{attempt + 1}次)...") + info = ydl.extract_info(url, download=True) + if not info: + raise DownloadError("无法获取视频信息", platform_type, "info_error") + + # 找到下载的音频文件 + downloaded_files = [f for f in os.listdir(temp_dir) if f.endswith('.mp3')] + if not downloaded_files: + raise DownloadError("未找到下载的音频文件", platform_type, "file_error") + + audio_path = os.path.join(temp_dir, downloaded_files[0]) + if not os.path.exists(audio_path): + raise DownloadError("音频文件不存在", platform_type, "file_error") + + video_info = { + 'title': info.get('title', '未知标题'), + 'uploader': info.get('uploader', '未知作者'), + 'description': info.get('description', ''), + 'duration': info.get('duration', 0), + 'platform': platform_type + } + + app_logger.info(f"✅ {platform_type}视频下载成功") + return audio_path, video_info + + except Exception as e: + app_logger.warning(f"⚠️ 下载失败(第{attempt + 1}次): {str(e)}") + if attempt < 2: # 如果不是最后一次尝试 + app_logger.warning("等待5秒后重试...") + time.sleep(5) + else: + raise # 最后一次失败,抛出异常 + except Exception as e: + error_msg = _handle_download_error(e, platform_type, url) + app_logger.error(f"⚠️ {error_msg}") + return None, None + + +def _get_alternative_download_method(platform: str, url: str) -> Optional[str]: + """Get alternative download type""" + if platform == 'youtube': + return 'pytube' + elif platform == 'douyin': + return 'requests' + elif platform == 'bilibili': + return 'you-get' + return None + +def download_with_alternative_method(url: str, temp_dir: str, method: str) -> Optional[str]: + """Use alternative method download""" + try: + if method == 'you-get': + cmd = ['you-get', '--no-proxy', '--no-check-certificate', '-o', temp_dir, url] + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode == 0: + # 查找下载的文件 + files = [f for f in os.listdir(temp_dir) if f.endswith(('.mp4', '.flv', '.webm'))] + if files: + return os.path.join(temp_dir, files[0]) + raise Exception(result.stderr) + + elif method == 'requests': + # 使用requests直接下载 + headers = { + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'en-US,en;q=0.5', + 'Connection': 'keep-alive', + 'Upgrade-Insecure-Requests': '1' + } + + # 首先获取页面内容 + response = httpx.get(url, headers=headers, verify=False) + + if response.status_code == 200: + # 尝试从页面中提取视频URL + from bs4 import BeautifulSoup + soup = BeautifulSoup(response.text, 'html.parser') + + video_url = None + # 查找video标签 + video_tags = soup.find_all('video') + for video in video_tags: + src = video.get('src') or video.get('data-src') + if src: + video_url = src + break + + if not video_url: + # 尝试查找其他可能包含视频URL的元素 + import re + video_patterns = [ + r'https?://[^"\'\s]+\.(?:mp4|m3u8)[^"\'\s]*', + r'playAddr":"([^"]+)"', + r'play_url":"([^"]+)"' + ] + for pattern in video_patterns: + matches = re.findall(pattern, response.text) + if matches: + video_url = matches[0] + break + + if video_url: + if not video_url.startswith('http'): + video_url = 'https:' + video_url if video_url.startswith('//') else video_url + + # 下载视频 + video_response = httpx.get(video_url, headers=headers, stream=True, verify=False) + if video_response.status_code == 200: + file_path = os.path.join(temp_dir, 'video.mp4') + with open(file_path, 'wb') as f: + for chunk in video_response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + return file_path + + raise Exception(f"无法下载视频: HTTP {video_response.status_code}") + raise Exception(f"无法访问页面: HTTP {response.status_code}") + + elif method == 'pytube': + # 禁用SSL验证 + import ssl + ssl._create_default_https_context = ssl._create_unverified_context + + from pytube import YouTube + yt = YouTube(url) + # 获取最高质量的MP4格式视频 + video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first() + if video: + return video.download(output_path=temp_dir) + raise Exception("未找到合适的视频流") + + except Exception as e: + print(f"备用下载方法 {method} 失败: {str(e)}") + return None + +def download_image(url: str, output_file_path: str) -> str: + """ + 下载图片 + """ + try: + response = httpx.get(url, verify=False) + if response.status_code == 200: + with open(output_file_path, 'wb') as f: + f.write(response.content) + return output_file_path + except Exception as e: + print(f"下载图片失败: {str(e)}") + return None + + +def _handle_download_error(error: Exception, platform: str, url: str) -> str: + """ + 处理下载错误并返回用户友好的错误消息 + + Args: + error: 异常对象 + platform: 平台名称 + url: 视频URL + + Returns: + str: 用户友好的错误消息 + """ + error_msg = str(error) + + if "SSL" in error_msg: + return "⚠️ SSL证书验证失败,请检查网络连接" + elif "cookies" in error_msg.lower(): + return f"⚠️ {platform}访问被拒绝,可能需要更新cookie或更换IP地址" + elif "404" in error_msg: + return "⚠️ 视频不存在或已被删除" + elif "403" in error_msg: + return "⚠️ 访问被拒绝,可能需要登录或更换IP地址" + elif "unavailable" in error_msg.lower(): + return "⚠️ 视频当前不可用,可能是地区限制或版权问题" + else: + return f"⚠️ 下载失败: {error_msg}" \ No newline at end of file diff --git a/src/downloader/error.py b/src/downloader/error.py new file mode 100644 index 0000000..c5417d2 --- /dev/null +++ b/src/downloader/error.py @@ -0,0 +1,11 @@ +class DownloadError(Exception): + """Customize download error class""" + + def __init__(self, message: str, platform: str, error_type: str, details: str = None): + self.message = message + self.platform = platform + self.error_type = error_type + self.details = details + super().__init__(self.message) + + \ No newline at end of file diff --git a/src/environment/__init__.py b/src/environment/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/environment/env.py b/src/environment/env.py new file mode 100644 index 0000000..7024be6 --- /dev/null +++ b/src/environment/env.py @@ -0,0 +1,33 @@ +import ssl +from dotenv import load_dotenv +import os + +from src.setting.setting import global_setting + + +class Environment(object): + def __init__(self, model_name): + self.openrouter_ai_model = model_name + # OpenRouter configuration + self.openrouter_api_key = global_setting['openrouter_api_key'] + self.openrouter_api_url = global_setting['openrouter_api_url'] + self.openrouter_app_name = global_setting['openrouter_app_name'] + self.openrouter_http_referer = global_setting['openrouter_http_referer'] + self.openrouter_available = False + + # Unsplash configuration + self.unsplash_access_key = global_setting['unsplash_access_key'] + self.unsplash_secret_key = global_setting['unsplash_secret_key'] + self.unsplash_redirect_uri = global_setting['unsplash_redirect_uri'] + self.unsplash_available = False + + @classmethod + def config_proxy(cls): + if global_setting['http_proxy']: + os.environ['HTTP_PROXY'] = global_setting['http_proxy'] + if global_setting['https_proxy']: + os.environ['HTTPS_PROXY'] = global_setting['https_proxy'] + + @classmethod + def disabled_ssl_verify(cls): + ssl._create_default_https_context = ssl._create_unverified_context \ No newline at end of file diff --git a/src/logger/__init__.py b/src/logger/__init__.py new file mode 100644 index 0000000..a33ac9b --- /dev/null +++ b/src/logger/__init__.py @@ -0,0 +1,3 @@ +from .logger import Logger + +app_logger = Logger('video-note-generator') \ No newline at end of file diff --git a/src/logger/logger.py b/src/logger/logger.py new file mode 100644 index 0000000..82ea269 --- /dev/null +++ b/src/logger/logger.py @@ -0,0 +1,50 @@ +import datetime +import logging +import os +from logging.handlers import TimedRotatingFileHandler + + +class Logger: + def __init__(self, name, log_level=logging.INFO): + self.logger = logging.getLogger(name) + self.logger.setLevel(log_level) + + # 创建日志目录 + log_dir = os.getenv('LOG_DIR', 'logs') + if not os.path.exists(log_dir): + os.makedirs(log_dir) + self.log_dir = log_dir + + # 文件处理器 + now_date = datetime.datetime.now().strftime('%Y-%m-%d') + file_handler = TimedRotatingFileHandler( + filename=os.path.join(log_dir, f'{name}-{now_date}.log'), + when='midnight', + backupCount=7 + ) + file_handler.setFormatter(logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + )) + self.logger.addHandler(file_handler) + + # 控制台处理器 + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + )) + self.logger.addHandler(console_handler) + + def debug(self, message): + self.logger.debug(message) + + def info(self, message): + self.logger.info(message) + + def warning(self, message): + self.logger.warning(message) + + def error(self, message): + self.logger.error(message) + + def critical(self, message): + self.logger.critical(message) diff --git a/src/platform/__init__.py b/src/platform/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/platform/platfrom.py b/src/platform/platfrom.py new file mode 100644 index 0000000..7f379b0 --- /dev/null +++ b/src/platform/platfrom.py @@ -0,0 +1,41 @@ +import os +from typing import Optional + + +class Platform: + def __init__(self): + self._init_cookie() + self.type = None + + def detect(self, url: str): + """ + 确定视频平台 + + Args: + url: 视频URL + + Returns: + str: 平台名称 ('youtube', 'douyin', 'bilibili') 或 None + """ + if 'youtube.com' in url or 'youtu.be' in url: + self.type = 'youtube' + elif 'douyin.com' in url: + self.type = 'douyin' + elif 'bilibili.com' in url: + self.type = 'bilibili' + + def _init_cookie(self): + """Init cookie parameter""" + self.cookie_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cookies') + self.platform_cookies = { + 'douyin': os.path.join(self.cookie_dir, 'douyin_cookies.txt'), + 'bilibili': os.path.join(self.cookie_dir, 'bilibili_cookies.txt'), + 'youtube': os.path.join(self.cookie_dir, 'youtube_cookies.txt') + } + + def _validate_cookies(self, platform: str) -> bool: + if platform not in self.platform_cookies: + return False + cookie_file = self.platform_cookies[platform] + return os.path.exists(cookie_file) + diff --git a/src/setting/__init__.py b/src/setting/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/setting/setting.py b/src/setting/setting.py new file mode 100644 index 0000000..5612585 --- /dev/null +++ b/src/setting/setting.py @@ -0,0 +1,75 @@ +# src/settings/setting.py +import json +import gradio as gr + +from src.logger import app_logger + +global_setting = { + "openrouter_api_key": '', + "openrouter_api_url": 'https://openrouter.ai/api/v1', + "openrouter_app_name": 'rednote-generator', + "openrouter_http_referer": 'https://github.com', + "unsplash_access_key": '', + "unsplash_secret_key": '', + "unsplash_redirect_uri": 'https://github.com', + "whisper_model": 'medium', + "whisper_language": 'zh', + "ffmpeg_path": '', + "http_proxy": '', + "https_proxy": '', + "output_dir": 'generated_notes', + "max_tokens": 5000, + "content_chunk_size": 2000, + "temperature": 0.7, + "top_p": 0.9, + "use_emoji": True, + "tag_count": 5, + "min_paragraphs": 3, + "max_paragraphs": 6, + "debug": False, + "log_level": 'info' +} + + +def update_and_save_settings(*args): + keys = list(global_setting.keys()) + updated_settings = {key: value for key, value in zip(keys, args)} + global_setting.update(updated_settings) + + required_keys_filled = check_required_keys() + status_message = "设置已更新" if required_keys_filled else "⚠️ 必选参数配置缺失" + + # convert global_setting to json str + saved_settings = json.dumps(global_setting) + + return [status_message, gr.update(visible=False), saved_settings] + + +# 修改 load_settings 函数 +def load_settings(saved_settings: str): + try: + # convert json str to dict + if saved_settings: + saved_settings = json.loads(saved_settings) + global_setting.update(saved_settings) + except Exception as e: + gr.Error(f"Error loading settings: {e}") + + required_keys_filled = check_required_keys() + return_dict = { + 'warning_icon': gr.update(visible=not required_keys_filled), + } + + # append global_setting values to return_dict + for key in global_setting.keys(): + return_dict[key] = global_setting[key] + + return [return_dict[key] for key in return_dict.keys()] + + +def check_required_keys(setting_str: str = None): + if setting_str: + setting_dict = json.loads(setting_str) + global_setting.update(setting_dict) + required_keys = ["openrouter_api_key"] + return all(global_setting[key] for key in required_keys) \ No newline at end of file diff --git a/src/util/__init__.py b/src/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/util/extend.py b/src/util/extend.py new file mode 100644 index 0000000..bd355f1 --- /dev/null +++ b/src/util/extend.py @@ -0,0 +1,5 @@ +def extend(cls): + def decorator(func): + setattr(cls, func.__name__, func) + return func + return decorator \ No newline at end of file diff --git a/src/util/url.py b/src/util/url.py new file mode 100644 index 0000000..b6e6c14 --- /dev/null +++ b/src/util/url.py @@ -0,0 +1,81 @@ +import re +from typing import List + +from src.logger import app_logger + + +def extract_urls_from_text(text: str) -> list: + """ + 从文本中提取所有有效的URL + 支持的URL格式: + - 视频平台URL (YouTube, Bilibili, 抖音等) + - 包含http://或https://的标准URL + - 短链接URL (如t.co等) + + Args: + text: 输入文本 + + Returns: + list: 提取到的有效URL列表 + """ + # URL正则模式 + url_patterns = [ + # 标准URL + r'https?://[^\s<>\[\]"\']+[^\s<>\[\]"\'.,]', + # 短链接 + r'https?://[a-zA-Z0-9]+\.[a-zA-Z]{2,3}/[^\s<>\[\]"\']+', + # Bilibili + r'BV[a-zA-Z0-9]{10}', + # 抖音分享链接 + r'v\.douyin\.com/[a-zA-Z0-9]+', + ] + + urls = [] + for pattern in url_patterns: + matches = re.finditer(pattern, text, re.IGNORECASE) + for match in matches: + url = match.group() + # 对于不完整的BV号,添加完整的bilibili前缀 + if url.startswith('BV'): + url = f'https://www.bilibili.com/video/{url}' + urls.append(url) + + # 去重并保持顺序 + seen = set() + return [url for url in urls if not (url in seen or seen.add(url))] + +def process_markdown_file(self, input_file: str) -> List[str]: + """处理markdown文件,生成优化后的笔记 + + Args: + input_file (str): 输入的markdown文件路径 + """ + try: + # 读取markdown文件 + with open(input_file, 'r', encoding='utf-8') as f: + content = f.read() + + # 提取视频链接 + video_links = re.findall( + r'https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/|bilibili\.com/video/|douyin\.com/video/)[^\s\)]+', + content) + + if not video_links: + app_logger.error("未在markdown文件中找到视频链接") + return [] + + app_logger.info(f"找到 {len(video_links)} 个视频链接,开始处理...\n") + + # 处理每个视频链接 + urls = [] + for i, url in enumerate(video_links, 1): + print(f"处理第 {i}/{len(video_links)} 个视频: {url}\n") + # append to urls + urls.append(url) + + seen = set() + return [url for url in urls if not (url in seen or seen.add(url))] + + except Exception as e: + app_logger.error(f"处理markdown文件时出错: {str(e)}") + return [] \ No newline at end of file diff --git a/src/video/__init__.py b/src/video/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/video/audio.py b/src/video/audio.py new file mode 100644 index 0000000..55b3130 --- /dev/null +++ b/src/video/audio.py @@ -0,0 +1,30 @@ +import os +from moviepy.editor import VideoFileClip + +from src.logger import app_logger + +video_extensions = ['.mp4', '.avi', '.mov', '.mkv'] +audio_extensions = ['.mp3', '.wav', '.aac'] + +# Check file type +def _check_file_type(file_path): + _, ext = os.path.splitext(file_path) + if ext.lower() in video_extensions: + return 'video' + elif ext.lower() in audio_extensions: + return 'audio' + else: + return 'unknown' + +# Extract audio from video +def extract_audio_to_mp3(file_path, output_path) -> str: + file_type = _check_file_type(file_path) + if file_type == 'audio': + return file_path + app_logger.info(f"🎙 提取视频 {file_path} 到 \n语音 {output_path}") + # 使用 moviepy 提取音频 + video_clip = VideoFileClip(file_path) + audio_clip = video_clip.audio + audio_clip.write_audiofile(output_path, codec='libmp3lame') + audio_clip.close() + video_clip.close() \ No newline at end of file diff --git a/src/video/convert.py b/src/video/convert.py new file mode 100644 index 0000000..8cd7d70 --- /dev/null +++ b/src/video/convert.py @@ -0,0 +1,328 @@ +import datetime +import os +import re +import shutil +import urllib +from tkinter import Listbox +from typing import List, Tuple, Any + +import openai +import whisper + +from src.adapter.openrouter import OpenRouterAdapter +from src.adapter.unsplash import UnsplashAdapter +from src.downloader.downloader import download_video, download_image +from src.logger import app_logger +from src.platform.platfrom import Platform +from src.video.audio import extract_audio_to_mp3 +from src.video.prompt import share_prompt + + +class VideoNoteGenerator: + def __init__(self, output_dir: str = "temp_notes", + openrouter_adapter: OpenRouterAdapter = None, + unsplash_adapter: UnsplashAdapter = None, + ffmpeg_path: str = None): + self.output_dir = output_dir + + self.openrouter_adapter = openrouter_adapter + self.unsplash_adapter = unsplash_adapter + self.ffmpeg_path = ffmpeg_path + self.platform = Platform() + self.whisper_model = None + + async def process_video_full(self, url: str) -> Tuple[str, str, str, str, List[str]]: + app_logger.info('📹 [完整流程]开始处理视频...') + # Create temporary folder + temp_dir = os.path.join(self.output_dir, 'temp') + os.makedirs(temp_dir, exist_ok=True) + + # Determine platform + platform = Platform() + platform.detect(url) + + try: + app_logger.info('⬇️ 正在下载视频...') + result = download_video(platform_type=platform.type, url=url, temp_dir=temp_dir) + if not result: + app_logger.warning(f"⚠️ 视频下载失败,返回为空: {url}") + return '⚠️ 视频下载失败', '', '', '', [] + + audio_path, video_info = result + if not audio_path or not video_info: + app_logger.warning(f"⚠️ 视频下载失败,音轨或视频信息返回为空: {url}") + return '⚠️ 视频下载失败,音轨或视频信息返回为空', '', '', '', [] + + app_logger.info(f"✅ 视频下载成功: {video_info['title']}") + + return await self.process_video_path(audio_path, temp_dir) + finally: + # Clear temporary files + app_logger.info('🗑️ 正在清理临时文件...') + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + async def process_video_organized(self, content: bytes) -> Tuple[str, str, str, str, List[str]]: + try: + content_str = content.decode('utf-8') + if not content_str: + content_str = content.decode('gbk') + rednote_content, images = await self.gen_rednote_version(content_str) + return '✅ 处理成功', '-', content_str, rednote_content, images + except Exception as e: + app_logger.error(f"⚠️ 视频处理失败: {str(e)}") + return '⚠️ 视频处理失败', '', '', '', [] + + async def process_video_path(self, file_path: str, temp_dir: str) -> Tuple[str, str, str, str, List[str]]: + try: + # create temp audio file use file_path last path without extension + audio_path = os.path.join(temp_dir, os.path.basename(file_path).split('.')[0] + '.mp3') + extract_audio_to_mp3(file_path, audio_path) + + # Transcribe audio + app_logger.info('🎙️ 正在转录音频...') + app_logger.info('⚠️ 注意:转录音频可能需要几分钟时间,请耐心等待...') + transcript = self._transcribe_audio(audio_path) + if not transcript: + app_logger.warning(f"⚠️ 音频转录失败,返回为空") + return '⚠️ 音频转录失败', '', '', '', [] + + # Organize long content + app_logger.info('📝 正在整理长文版本...') + organized_content = await self._organize_long_content(transcript) + + return await self.process_video_organized(organized_content.encode('utf-8')) + except Exception as e: + app_logger.error(f"⚠️ 视频处理失败: {str(e)}") + return '⚠️ 视频处理失败', '', '', '', [] + + def load_whisper_model(self): + """加载Whisper模型""" + app_logger.info("Initializing Whisper model...") + self.whisper_model = None + try: + self.whisper_model = whisper.load_model("medium", download_root=".") + app_logger.info("✅ Whisper model loaded successfully") + except Exception as e: + app_logger.warning(f"⚠️ Whisper model loading failed: {str(e)}") + app_logger.info("Will retry loading later...") + + async def gen_rednote_version(self, organized_content: str) -> Tuple[str, List[str]]: + """Generate rednote version""" + app_logger.info('📝 正在整理小红书风格笔记...') + try: + rednote_content, titles, tags, images = await self._convert_to_xiaohongshu(organized_content) + # 全文 + full_content = f"# {titles[0]}\n\n{rednote_content}\n\n---\n" + full_content = full_content + "\n".join([f"#{tag}" for tag in tags]) + return full_content, images + except Exception as e: + app_logger.error(f"❌ 生成小红书风格笔记失败: {e}") + import traceback + print(f"错误详情:\n{traceback.format_exc()}") + return '', [] + + def _transcribe_audio(self, audio_path: str, language: str = 'zh', prompt: str = '以下是一段视频的转录内容。请用流畅的中文输出。') -> str: + """Transcribe audio use Whisper""" + try: + self._ensure_whisper_model() + if not self.whisper_model: + raise Exception("Whisper model not available") + app_logger.info('正在转录音频(这可能需要几分钟)...') + result = self.whisper_model.transcribe( + audio_path, + language=language, + task='transcribe', + best_of=5, + initial_prompt=prompt, + ) + return result['text'].strip() + except Exception as e: + app_logger.error(f"Transcribe audio error: {e}") + return '' + + + def _ensure_whisper_model(self) -> None: + """确保Whisper模型已加载""" + if self.whisper_model is None: + try: + app_logger.info("正在加载Whisper模型...") + self.whisper_model = whisper.load_model("medium") + app_logger.info("✅ Whisper模型加载成功") + except Exception as e: + app_logger.warning(f"⚠️ Whisper模型加载失败: {str(e)}") + + + async def _organize_long_content(self, content: str, duration: int = 0) -> str: + """Use LLM to organize long content""" + if not content.strip(): + return "" + if not self.openrouter_adapter.api_available: + app_logger.error("OpenRouter API not available, can't organize long content") + return content + + content_chunks = self._split_content(content) + organized_chunks = [] + app_logger.info(f"🤖 正在组织长内容(共{len(content_chunks)}个chunk)...") + + for i, chunk in enumerate(content_chunks, 1): + app_logger.info(f"正在处理第 {i}/{len(content_chunks)} 部分...") + organized_chunk = await self.openrouter_adapter.generate( + system_prompt_type='organize_system_prompt', + user_prompt_type='organize_user_prompt', + content=chunk, + ) + organized_chunks.append(organized_chunk) + + return "\n\n".join(organized_chunks) + + def _split_content(self, text: str, max_chars: int = 2000) -> List[str]: + """按段落分割文本,保持上下文的连贯性 + + 特点: + 1. 保持段落完整性:不会在段落中间断开 + 2. 保持句子完整性:确保句子不会被截断 + 3. 添加重叠内容:每个chunk都包含上一个chunk的最后一段 + 4. 智能分割:对于超长段落,按句子分割并保持完整性 + """ + if not text: + return [] + + paragraphs = text.split('\n\n') + chunks = [] + current_chunk = [] + current_length = 0 + last_paragraph = None # 用于存储上一个chunk的最后一段 + + for para in paragraphs: + para = para.strip() + if not para: # 跳过空段落 + continue + + para_length = len(para) + + # 如果这是新chunk的开始,且有上一个chunk的最后一段,添加它作为上下文 + if not current_chunk and last_paragraph: + current_chunk.append(f"上文概要:\n{last_paragraph}\n") + current_length += len(last_paragraph) + 20 # 加上标题的长度 + + # 如果单个段落就超过了最大长度,需要按句子分割 + if para_length > max_chars: + # 如果当前块不为空,先保存 + if current_chunk: + last_paragraph = current_chunk[-1] + chunks.append('\n\n'.join(current_chunk)) + current_chunk = [] + current_length = 0 + if last_paragraph: + current_chunk.append(f"上文概要:\n{last_paragraph}\n") + current_length += len(last_paragraph) + 20 + + # 按句子分割长段落 + sentences = re.split(r'([。!?])', para) + current_sentence = [] + current_sentence_length = 0 + + for i in range(0, len(sentences), 2): + sentence = sentences[i] + # 如果有标点符号,加上标点 + if i + 1 < len(sentences): + sentence += sentences[i + 1] + + # 如果加上这个句子会超过最大长度,保存当前块并开始新块 + if current_sentence_length + len(sentence) > max_chars and current_sentence: + chunks.append(''.join(current_sentence)) + current_sentence = [sentence] + current_sentence_length = len(sentence) + else: + current_sentence.append(sentence) + current_sentence_length += len(sentence) + + # 保存最后一个句子块 + if current_sentence: + chunks.append(''.join(current_sentence)) + else: + # 如果加上这个段落会超过最大长度,保存当前块并开始新块 + if current_length + para_length > max_chars and current_chunk: + last_paragraph = current_chunk[-1] + chunks.append('\n\n'.join(current_chunk)) + current_chunk = [] + current_length = 0 + if last_paragraph: + current_chunk.append(f"上文概要:\n{last_paragraph}\n") + current_length += len(last_paragraph) + 20 + current_chunk.append(para) + current_length += para_length + + # 保存最后一个块 + if current_chunk: + chunks.append('\n\n'.join(current_chunk)) + + return chunks + + + + async def _convert_to_xiaohongshu(self, content: str) -> Tuple[str, List[str], List[str], List[str]]: + """Convert the content into a structured format for xiaohongshu. """ + try: + xiaohongshu_content = await self.openrouter_adapter.generate( + system_prompt_type='rednote_system_prompt', + user_prompt_type='rednote_user_prompt', + content=content + ) + app_logger.info(f"✅ 小红书内容转换成功: {xiaohongshu_content[:50]}...") + # Get title, first line... + content_lines = xiaohongshu_content.split('\n') + titles = [] + for line in content_lines: + line = line.strip() + if line and not line.startswith('#') and ':' not in line and '。' not in line: + titles = [line] + break + if not titles: + app_logger.info("⚠️ 未找到标题,尝试其他方式提取...") + # 尝试其他方式提取标题 + title_match = re.search(r'^[^#\n]+', xiaohongshu_content) + if title_match: + titles = [title_match.group(0).strip()] + if titles: + app_logger.info(f"✅ 提取到标题: {titles[0]}") + else: + app_logger.warning("⚠️ 未能提取到标题") + + # Get Tags, find all tag start with sharp + tags = [] + tag_matches = re.findall(r'#([^\s#]+)', xiaohongshu_content) + if tag_matches: + tags = tag_matches + app_logger.info(f"✅ 提取到{len(tags)}个标签") + else: + app_logger.info("⚠️ 未找到标签") + + # Get Images + images = [] + if not self.unsplash_adapter.unsplash_available: + app_logger.error("Unsplash is not available, cannot get images.") + return xiaohongshu_content, titles, tags, images + search_terms = titles + tags[:2] if tags else titles + search_query = ' '.join(search_terms) + # convert tags to english + app_logger.info(f"🌐 正在翻译: {search_query}...") + search_query = await self.openrouter_adapter.generate( + system_prompt_type='translate_system_prompt', + user_prompt_type=search_query, + content=search_query, + ) + if not search_query: + search_query = ' '.join(search_terms) + app_logger.info(f"🌐 正在搜索图片: {search_query}...") + images = self.unsplash_adapter.get_images(query=search_query) + if images: + app_logger.info(f"✅ 提取到{len(images)}张图片") + else: + app_logger.warning("⚠️ 未找到图片") + return xiaohongshu_content, titles, tags, images + except Exception as e: + app_logger.error(f"❌ 小红书内容转换失败: {str(e)}") + return content, [], [], [] + diff --git a/src/video/prompt.py b/src/video/prompt.py new file mode 100644 index 0000000..0deb824 --- /dev/null +++ b/src/video/prompt.py @@ -0,0 +1,122 @@ +from typing import Optional + + +def share_prompt(prompt_type: str, content: str) -> Optional[str]: + if prompt_type == "organize_system_prompt": + return """你是一位著名的科普作家和博客作者,著作等身,屡获殊荣,尤其在内容创作领域有深厚的造诣。 + +请使用 4C 模型(建立联系 Connection、展示冲突 Conflict、强调改变 Change、即时收获 Catch)为转录的文字内容创建结构。 + +写作要求: +- 从用户的问题出发,引导读者理解核心概念及其背景 +- 使用第二人称与读者对话,语气亲切平实 +- 确保所有观点和内容基于用户提供的转录文本 +- 如无具体实例,则不编造 +- 涉及复杂逻辑时,使用直观类比 +- 避免内容重复冗余 +- 逻辑递进清晰,从问题开始,逐步深入 + +Markdown格式要求: +- 大标题突出主题,吸引眼球,最好使用疑问句 +- 小标题简洁有力,结构清晰,尽量使用单词或短语 +- 直入主题,在第一部分清晰阐述问题和需求 +- 正文使用自然段,避免使用列表形式 +- 内容翔实,避免过度简略,特别注意保留原文中的数据和示例信息 +- 如有来源URL,使用文内链接形式 +- 保留原文中的Markdown格式图片链接""" + + elif prompt_type == "organize_user_prompt": + return f"""请根据以下转录文字内容,创作一篇结构清晰、易于理解的博客文章。 + +转录文字内容: + +{content}""" + + elif prompt_type == "rednote_system_prompt": + return """你是一位专业的小红书爆款文案写作大师,擅长将普通内容转换为刷屏级爆款笔记。 +请将输入的内容转换为小红书风格的笔记,需要满足以下要求: + +1. 标题创作(重要!!): +- 二极管标题法: + * 追求快乐:产品/方法 + 只需N秒 + 逆天效果 + * 逃避痛苦:不采取行动 + 巨大损失 + 紧迫感 +- 爆款关键词(必选1-2个): + * 高转化词:好用到哭、宝藏、神器、压箱底、隐藏干货、高级感 + * 情感词:绝绝子、破防了、治愈、万万没想到、爆款、永远可以相信 + * 身份词:小白必看、手残党必备、打工人、普通女生 + * 程度词:疯狂点赞、超有料、无敌、一百分、良心推荐 +- 标题规则: + * 字数:15个汉字以内 + * emoji:2-4个相关表情 + * 标点:感叹号、省略号增强表达 + * 风格:口语化、制造悬念 + +2. 正文创作: +**不要长篇大论,全文在500个汉字以内!!!** +- 开篇设置(抓住痛点): + * 共情开场:描述读者痛点 + * 悬念引导:埋下解决方案的伏笔 + * 场景还原:具体描述场景 + * **100个汉字以内** +- 内容结构: + * 每段开头用emoji引导 + * 重点内容加粗突出 + * 适当空行增加可读性 + * 步骤说明要清晰 + * **每个段落控制在50个汉字以内,20-30个为适中。** +- 写作风格: + * 热情亲切的语气 + * 大量使用口语化表达 + * 插入互动性问句 + * 加入个人经验分享 +- 高级技巧: + * 使用平台热梗 + * 加入流行口头禅 + * 设置悬念和爆点 + * 情感共鸣描写 + * 不要使用“首先、其次、接下来”等列举结构 + +3. 标签优化: +- 提取4类标签(每类1-2个): + * 核心关键词:主题相关 + * 关联关键词:长尾词 + * 高转化词:购买意向强 + * 热搜词:行业热点 + +4. 整体要求: +- 内容体量:根据内容自动调整 +- 结构清晰:善用分点和空行 +- 情感真实:避免过度营销 +- 互动引导:设置互动机会 +- AI友好:避免机器味 + +注意:创作时要始终记住,标题决定打开率,内容决定完播率,互动决定涨粉率! +""" + + elif prompt_type == "rednote_user_prompt": + return f"""请将以下内容转换为爆款小红书笔记。 + +内容如下: +{content} + +请按照以下格式返回: +1. 第一行:爆款标题(遵循二极管标题法,必须有emoji) +2. 空一行 +3. 正文内容(注意结构、风格、技巧的运用,控制在600-800字之间) +4. 空一行 +5. 标签列表(每类标签都要有,用#号开头) + +创作要求: +1. 标题要让人忍不住点进来看 +2. 内容要有干货,但表达要轻松 +3. 每段都要用emoji装饰 +4. 标签要覆盖核心词、关联词、转化词、热搜词 +5. 设置2-3处互动引导 +6. 通篇要有感情和温度 +7. 正文控制在600-800字之间 +""" + + elif prompt_type == "translate_system_prompt": + return "你是一个翻译助手。请将输入的中文关键词翻译成最相关的1-3个英文关键词,用逗号分隔。直接返回翻译结果,不要加任何解释。例如:\n输入:'保险理财知识'\n输出:insurance,finance,investment" + else: + return None diff --git a/video_note_generator.py b/video_note_generator.py index 106f3fb..d98a068 100644 --- a/video_note_generator.py +++ b/video_note_generator.py @@ -1,1090 +1,144 @@ +# main import os -import sys -import json -import time import shutil -import re -import subprocess -from typing import Dict, List, Optional, Tuple -import datetime -from pathlib import Path -import random -from itertools import zip_longest +from typing import Tuple -import yt_dlp -import httpx -from unsplash.api import Api as UnsplashApi -from unsplash.auth import Auth as UnsplashAuth -from dotenv import load_dotenv -from bs4 import BeautifulSoup -import whisper -import openai -import argparse +from src.adapter.ffmpeg import FfmpegAdapter +from src.adapter.openrouter import OpenRouterAdapter +from src.adapter.unsplash import UnsplashAdapter +from src.environment.env import Environment +from src.logger import app_logger +from src.setting.setting import global_setting +from src.video.convert import VideoNoteGenerator -# 加载环境变量 -load_dotenv() -# 检查必要的环境变量 -required_env_vars = { - 'OPENROUTER_API_KEY': '用于OpenRouter API', - 'OPENROUTER_API_URL': '用于OpenRouter API', - 'OPENROUTER_APP_NAME': '用于OpenRouter API', - 'OPENROUTER_HTTP_REFERER': '用于OpenRouter API', - 'UNSPLASH_ACCESS_KEY': '用于图片搜索', - 'UNSPLASH_SECRET_KEY': '用于Unsplash认证', - 'UNSPLASH_REDIRECT_URI': '用于Unsplash回调' -} - -missing_env_vars = [] -for var, desc in required_env_vars.items(): - if not os.getenv(var): - missing_env_vars.append(f" - {var} ({desc})") - -if missing_env_vars: - print("注意:以下环境变量未设置:") - print("\n".join(missing_env_vars)) - print("\n将使用基本功能继续运行(无AI优化和图片)。") - print("如需完整功能,请在 .env 文件中设置相应的 API 密钥。") - print("继续处理...\n") - -# 配置代理 -http_proxy = os.getenv('HTTP_PROXY') -https_proxy = os.getenv('HTTPS_PROXY') -proxies = { - 'http': http_proxy, - 'https': https_proxy -} if http_proxy and https_proxy else None - -# 禁用 SSL 验证(仅用于开发环境) -import ssl -ssl._create_default_https_context = ssl._create_unverified_context - -# OpenRouter configuration -openrouter_api_key = os.getenv('OPENROUTER_API_KEY') -openrouter_app_name = os.getenv('OPENROUTER_APP_NAME', 'video_note_generator') -openrouter_http_referer = os.getenv('OPENROUTER_HTTP_REFERER', 'https://github.com') -openrouter_available = False - -# 配置 OpenAI API -client = openai.OpenAI( - api_key=openrouter_api_key, - base_url="https://openrouter.ai/api/v1", - default_headers={ - "HTTP-Referer": openrouter_http_referer, - "X-Title": openrouter_app_name, - } -) - -# 选择要使用的模型 -AI_MODEL = "google/gemini-pro" # 使用 Gemini Pro 模型 - -# Test OpenRouter connection -if openrouter_api_key: +async def generate_video_note(model_name, parse_type, input_content) -> Tuple[str, str, str, str, list] or None: try: - print(f"正在测试 OpenRouter API 连接...") - response = client.models.list() # 使用更简单的API调用来测试连接 - print("✅ OpenRouter API 连接测试成功") - openrouter_available = True - except Exception as e: - print(f"⚠️ OpenRouter API 连接测试失败: {str(e)}") - print("将继续尝试使用API,但可能会遇到问题") + env = Environment(model_name) + Environment.config_proxy() -# 检查Unsplash配置 -unsplash_access_key = os.getenv('UNSPLASH_ACCESS_KEY') -unsplash_client = None + # Init open router adapter + openrouter_adapter = OpenRouterAdapter(env) + openrouter_adapter.connect() -if unsplash_access_key: - try: - auth = UnsplashAuth( - client_id=unsplash_access_key, - client_secret=None, - redirect_uri=None - ) - unsplash_client = UnsplashApi(auth) - print("✅ Unsplash API 配置成功") - except Exception as e: - print(f"❌ Failed to initialize Unsplash client: {str(e)}") - -# 检查ffmpeg -ffmpeg_path = None -try: - subprocess.run(["/opt/homebrew/bin/ffmpeg", "-version"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - print("✅ ffmpeg is available at /opt/homebrew/bin/ffmpeg") - ffmpeg_path = "/opt/homebrew/bin/ffmpeg" -except Exception: - try: - subprocess.run(["ffmpeg", "-version"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - print("✅ ffmpeg is available (from PATH)") - ffmpeg_path = "ffmpeg" - except Exception as e: - print(f"⚠️ ffmpeg not found: {str(e)}") + # Init adapter + ffmpeg_adapter = FfmpegAdapter() + unsplash_adapter = UnsplashAdapter(env=env) -class DownloadError(Exception): - """自定义下载错误类""" - def __init__(self, message: str, platform: str, error_type: str, details: str = None): - self.message = message - self.platform = platform - self.error_type = error_type - self.details = details - super().__init__(self.message) - -class VideoNoteGenerator: - def __init__(self, output_dir: str = "temp_notes"): - self.output_dir = output_dir + generator = None + output_dir = global_setting['output_dir'] os.makedirs(output_dir, exist_ok=True) - - self.openrouter_available = openrouter_available - self.unsplash_client = unsplash_client - self.ffmpeg_path = ffmpeg_path - - # 初始化whisper模型 - print("正在加载Whisper模型...") - self.whisper_model = None - try: - self.whisper_model = whisper.load_model("medium") - print("✅ Whisper模型加载成功") - except Exception as e: - print(f"⚠️ Whisper模型加载失败: {str(e)}") - print("将在需要时重试加载") - - # 日志目录 - self.log_dir = os.path.join(self.output_dir, 'logs') - os.makedirs(self.log_dir, exist_ok=True) - - # cookie目录 - self.cookie_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cookies') - os.makedirs(self.cookie_dir, exist_ok=True) - - # 平台cookie文件 - self.platform_cookies = { - 'douyin': os.path.join(self.cookie_dir, 'douyin_cookies.txt'), - 'bilibili': os.path.join(self.cookie_dir, 'bilibili_cookies.txt'), - 'youtube': os.path.join(self.cookie_dir, 'youtube_cookies.txt') - } - - def _ensure_whisper_model(self) -> None: - """确保Whisper模型已加载""" - if self.whisper_model is None: - try: - print("正在加载Whisper模型...") - self.whisper_model = whisper.load_model("medium") - print("✅ Whisper模型加载成功") - except Exception as e: - print(f"⚠️ Whisper模型加载失败: {str(e)}") - - def _determine_platform(self, url: str) -> Optional[str]: - """ - 确定视频平台 - - Args: - url: 视频URL - - Returns: - str: 平台名称 ('youtube', 'douyin', 'bilibili') 或 None - """ - if 'youtube.com' in url or 'youtu.be' in url: - return 'youtube' - elif 'douyin.com' in url: - return 'douyin' - elif 'bilibili.com' in url: - return 'bilibili' - return None - - def _handle_download_error(self, error: Exception, platform: str, url: str) -> str: - """ - 处理下载错误并返回用户友好的错误消息 - - Args: - error: 异常对象 - platform: 平台名称 - url: 视频URL - - Returns: - str: 用户友好的错误消息 - """ - error_msg = str(error) - - if "SSL" in error_msg: - return "⚠️ SSL证书验证失败,请检查网络连接" - elif "cookies" in error_msg.lower(): - return f"⚠️ {platform}访问被拒绝,可能需要更新cookie或更换IP地址" - elif "404" in error_msg: - return "⚠️ 视频不存在或已被删除" - elif "403" in error_msg: - return "⚠️ 访问被拒绝,可能需要登录或更换IP地址" - elif "unavailable" in error_msg.lower(): - return "⚠️ 视频当前不可用,可能是地区限制或版权问题" - else: - return f"⚠️ 下载失败: {error_msg}" - - def _get_platform_options(self, platform: str) -> Dict: - """获取平台特定的下载选项""" - # 基本选项 - options = { - 'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', - 'outtmpl': '%(title)s.%(ext)s' - } - - if platform in self.platform_cookies and os.path.exists(self.platform_cookies[platform]): - options['cookiefile'] = self.platform_cookies[platform] - - return options - - def _validate_cookies(self, platform: str) -> bool: - """验证cookie是否有效""" - if platform not in self.platform_cookies: - return False - - cookie_file = self.platform_cookies[platform] - return os.path.exists(cookie_file) - - def _get_alternative_download_method(self, platform: str, url: str) -> Optional[str]: - """获取备用下载方法""" - if platform == 'youtube': - return 'pytube' - elif platform == 'douyin': - return 'requests' - elif platform == 'bilibili': - return 'you-get' - return None - - def _download_with_alternative_method(self, platform: str, url: str, temp_dir: str, method: str) -> Optional[str]: - """使用备用方法下载""" - try: - if method == 'you-get': - cmd = ['you-get', '--no-proxy', '--no-check-certificate', '-o', temp_dir, url] - result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode == 0: - # 查找下载的文件 - files = [f for f in os.listdir(temp_dir) if f.endswith(('.mp4', '.flv', '.webm'))] - if files: - return os.path.join(temp_dir, files[0]) - raise Exception(result.stderr) - - elif method == 'requests': - # 使用requests直接下载 - headers = { - 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1', - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', - 'Accept-Language': 'en-US,en;q=0.5', - 'Connection': 'keep-alive', - 'Upgrade-Insecure-Requests': '1' - } - - # 首先获取页面内容 - response = httpx.get(url, headers=headers, verify=False) - - if response.status_code == 200: - # 尝试从页面中提取视频URL - from bs4 import BeautifulSoup - soup = BeautifulSoup(response.text, 'html.parser') - - video_url = None - # 查找video标签 - video_tags = soup.find_all('video') - for video in video_tags: - src = video.get('src') or video.get('data-src') - if src: - video_url = src - break - - if not video_url: - # 尝试查找其他可能包含视频URL的元素 - import re - video_patterns = [ - r'https?://[^"\'\s]+\.(?:mp4|m3u8)[^"\'\s]*', - r'playAddr":"([^"]+)"', - r'play_url":"([^"]+)"' - ] - for pattern in video_patterns: - matches = re.findall(pattern, response.text) - if matches: - video_url = matches[0] - break - - if video_url: - if not video_url.startswith('http'): - video_url = 'https:' + video_url if video_url.startswith('//') else video_url - - # 下载视频 - video_response = httpx.get(video_url, headers=headers, stream=True, verify=False) - if video_response.status_code == 200: - file_path = os.path.join(temp_dir, 'video.mp4') - with open(file_path, 'wb') as f: - for chunk in video_response.iter_content(chunk_size=8192): - if chunk: - f.write(chunk) - return file_path - - raise Exception(f"无法下载视频: HTTP {video_response.status_code}") - raise Exception(f"无法访问页面: HTTP {response.status_code}") - - elif method == 'pytube': - # 禁用SSL验证 - import ssl - ssl._create_default_https_context = ssl._create_unverified_context - - from pytube import YouTube - yt = YouTube(url) - # 获取最高质量的MP4格式视频 - video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first() - if video: - return video.download(output_path=temp_dir) - raise Exception("未找到合适的视频流") - - except Exception as e: - print(f"备用下载方法 {method} 失败: {str(e)}") - return None - - def _download_video(self, url: str, temp_dir: str) -> Tuple[Optional[str], Optional[Dict[str, str]]]: - """下载视频并返回音频文件路径和信息""" - try: - platform = self._determine_platform(url) - if not platform: - raise DownloadError("不支持的视频平台", "unknown", "platform_error") - - # 基本下载选项 - options = { - 'format': 'bestaudio/best', - 'outtmpl': os.path.join(temp_dir, '%(title)s.%(ext)s'), - 'postprocessors': [{ - 'key': 'FFmpegExtractAudio', - 'preferredcodec': 'mp3', - }], - 'quiet': True, - 'no_warnings': True, - } - - # 下载视频 - for attempt in range(3): # 最多重试3次 - try: - with yt_dlp.YoutubeDL(options) as ydl: - print(f"正在尝试下载(第{attempt + 1}次)...") - info = ydl.extract_info(url, download=True) - if not info: - raise DownloadError("无法获取视频信息", platform, "info_error") - - # 找到下载的音频文件 - downloaded_files = [f for f in os.listdir(temp_dir) if f.endswith('.mp3')] - if not downloaded_files: - raise DownloadError("未找到下载的音频文件", platform, "file_error") - - audio_path = os.path.join(temp_dir, downloaded_files[0]) - if not os.path.exists(audio_path): - raise DownloadError("音频文件不存在", platform, "file_error") - - video_info = { - 'title': info.get('title', '未知标题'), - 'uploader': info.get('uploader', '未知作者'), - 'description': info.get('description', ''), - 'duration': info.get('duration', 0), - 'platform': platform - } - - print(f"✅ {platform}视频下载成功") - return audio_path, video_info - - except Exception as e: - print(f"⚠️ 下载失败(第{attempt + 1}次): {str(e)}") - if attempt < 2: # 如果不是最后一次尝试 - print("等待5秒后重试...") - time.sleep(5) - else: - raise # 最后一次失败,抛出异常 - - except Exception as e: - error_msg = self._handle_download_error(e, platform, url) - print(f"⚠️ {error_msg}") - return None, None - - def _transcribe_audio(self, audio_path: str) -> str: - """使用Whisper转录音频""" - try: - self._ensure_whisper_model() - if not self.whisper_model: - raise Exception("Whisper模型未加载") - - print("正在转录音频(这可能需要几分钟)...") - result = self.whisper_model.transcribe( - audio_path, - language='zh', # 指定中文 - task='transcribe', - best_of=5, - initial_prompt="以下是一段视频的转录内容。请用流畅的中文输出。" # 添加中文提示 - ) - return result["text"].strip() - - except Exception as e: - print(f"⚠️ 音频转录失败: {str(e)}") - return "" - def _organize_content(self, content: str) -> str: - """使用AI整理内容""" - try: - if not self.openrouter_available: - print("⚠️ OpenRouter API 未配置,将返回原始内容") - return content - - # 构建系统提示词 - system_prompt = """你是一位著名的科普作家和博客作者,著作等身,屡获殊荣,尤其在内容创作领域有深厚的造诣。 - -请使用 4C 模型(建立联系 Connection、展示冲突 Conflict、强调改变 Change、即时收获 Catch)为转录的文字内容创建结构。 - -写作要求: -- 从用户的问题出发,引导读者理解核心概念及其背景 -- 使用第二人称与读者对话,语气亲切平实 -- 确保所有观点和内容基于用户提供的转录文本 -- 如无具体实例,则不编造 -- 涉及复杂逻辑时,使用直观类比 -- 避免内容重复冗余 -- 逻辑递进清晰,从问题开始,逐步深入 - -Markdown格式要求: -- 大标题突出主题,吸引眼球,最好使用疑问句 -- 小标题简洁有力,结构清晰,尽量使用单词或短语 -- 直入主题,在第一部分清晰阐述问题和需求 -- 正文使用自然段,避免使用列表形式 -- 内容翔实,避免过度简略,特别注意保留原文中的数据和示例信息 -- 如有来源URL,使用文内链接形式 -- 保留原文中的Markdown格式图片链接""" - - # 构建用户提示词 - final_prompt = f"""请根据以下转录文字内容,创作一篇结构清晰、易于理解的博客文章。 - -转录文字内容: - -{content}""" - - # 调用API - response = client.chat.completions.create( - model=AI_MODEL, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": final_prompt} - ], - temperature=0.7, - max_tokens=4000 - ) - - if response.choices: - return response.choices[0].message.content.strip() - - return content - - except Exception as e: - print(f"⚠️ 内容整理失败: {str(e)}") - return content - - def split_content(self, text: str, max_chars: int = 2000) -> List[str]: - """按段落分割文本,保持上下文的连贯性 - - 特点: - 1. 保持段落完整性:不会在段落中间断开 - 2. 保持句子完整性:确保句子不会被截断 - 3. 添加重叠内容:每个chunk都包含上一个chunk的最后一段 - 4. 智能分割:对于超长段落,按句子分割并保持完整性 - """ - if not text: - return [] - - paragraphs = text.split('\n\n') - chunks = [] - current_chunk = [] - current_length = 0 - last_paragraph = None # 用于存储上一个chunk的最后一段 - - for para in paragraphs: - para = para.strip() - if not para: # 跳过空段落 - continue - - para_length = len(para) - - # 如果这是新chunk的开始,且有上一个chunk的最后一段,添加它作为上下文 - if not current_chunk and last_paragraph: - current_chunk.append(f"上文概要:\n{last_paragraph}\n") - current_length += len(last_paragraph) + 20 # 加上标题的长度 - - # 如果单个段落就超过了最大长度,需要按句子分割 - if para_length > max_chars: - # 如果当前块不为空,先保存 - if current_chunk: - last_paragraph = current_chunk[-1] - chunks.append('\n\n'.join(current_chunk)) - current_chunk = [] - current_length = 0 - if last_paragraph: - current_chunk.append(f"上文概要:\n{last_paragraph}\n") - current_length += len(last_paragraph) + 20 - - # 按句子分割长段落 - sentences = re.split(r'([。!?])', para) - current_sentence = [] - current_sentence_length = 0 - - for i in range(0, len(sentences), 2): - sentence = sentences[i] - # 如果有标点符号,加上标点 - if i + 1 < len(sentences): - sentence += sentences[i + 1] - - # 如果加上这个句子会超过最大长度,保存当前块并开始新块 - if current_sentence_length + len(sentence) > max_chars and current_sentence: - chunks.append(''.join(current_sentence)) - current_sentence = [sentence] - current_sentence_length = len(sentence) - else: - current_sentence.append(sentence) - current_sentence_length += len(sentence) - - # 保存最后一个句子块 - if current_sentence: - chunks.append(''.join(current_sentence)) - else: - # 如果加上这个段落会超过最大长度,保存当前块并开始新块 - if current_length + para_length > max_chars and current_chunk: - last_paragraph = current_chunk[-1] - chunks.append('\n\n'.join(current_chunk)) - current_chunk = [] - current_length = 0 - if last_paragraph: - current_chunk.append(f"上文概要:\n{last_paragraph}\n") - current_length += len(last_paragraph) + 20 - current_chunk.append(para) - current_length += para_length - - # 保存最后一个块 - if current_chunk: - chunks.append('\n\n'.join(current_chunk)) - - return chunks - - def _organize_long_content(self, content: str, duration: int = 0) -> str: - """使用AI整理长文内容""" - if not content.strip(): - return "" - - if not self.openrouter_available: - print("⚠️ OpenRouter API 不可用,将返回原始内容") - return content - - content_chunks = self.split_content(content) - organized_chunks = [] - - print(f"内容将分为 {len(content_chunks)} 个部分进行处理...") - - for i, chunk in enumerate(content_chunks, 1): - print(f"正在处理第 {i}/{len(content_chunks)} 部分...") - organized_chunk = self._organize_content(chunk) - organized_chunks.append(organized_chunk) - - return "\n\n".join(organized_chunks) - - def convert_to_xiaohongshu(self, content: str) -> Tuple[str, List[str], List[str], List[str]]: - """将博客文章转换为小红书风格的笔记,并生成标题和标签""" - try: - if not self.openrouter_available: - print("⚠️ OpenRouter API 未配置,将返回原始内容") - return content, [], [], [] - - # 构建系统提示词 - system_prompt = """你是一位专业的小红书爆款文案写作大师,擅长将普通内容转换为刷屏级爆款笔记。 -请将输入的内容转换为小红书风格的笔记,需要满足以下要求: - -1. 标题创作(重要‼️): -- 二极管标题法: - * 追求快乐:产品/方法 + 只需N秒 + 逆天效果 - * 逃避痛苦:不采取行动 + 巨大损失 + 紧迫感 -- 爆款关键词(必选1-2个): - * 高转化词:好用到哭、宝藏、神器、压箱底、隐藏干货、高级感 - * 情感词:绝绝子、破防了、治愈、万万没想到、爆款、永远可以相信 - * 身份词:小白必看、手残党必备、打工人、普通女生 - * 程度词:疯狂点赞、超有料、无敌、一百分、良心推荐 -- 标题规则: - * 字数:20字以内 - * emoji:2-4个相关表情 - * 标点:感叹号、省略号增强表达 - * 风格:口语化、制造悬念 - -2. 正文创作: -- 开篇设置(抓住痛点): - * 共情开场:描述读者痛点 - * 悬念引导:埋下解决方案的伏笔 - * 场景还原:具体描述场景 -- 内容结构: - * 每段开头用emoji引导 - * 重点内容加粗突出 - * 适当空行增加可读性 - * 步骤说明要清晰 -- 写作风格: - * 热情亲切的语气 - * 大量使用口语化表达 - * 插入互动性问句 - * 加入个人经验分享 -- 高级技巧: - * 使用平台热梗 - * 加入流行口头禅 - * 设置悬念和爆点 - * 情感共鸣描写 - -3. 标签优化: -- 提取4类标签(每类1-2个): - * 核心关键词:主题相关 - * 关联关键词:长尾词 - * 高转化词:购买意向强 - * 热搜词:行业热点 - -4. 整体要求: -- 内容体量:根据内容自动调整 -- 结构清晰:善用分点和空行 -- 情感真实:避免过度营销 -- 互动引导:设置互动机会 -- AI友好:避免机器味 - -注意:创作时要始终记住,标题决定打开率,内容决定完播率,互动决定涨粉率!""" - - # 构建用户提示词 - user_prompt = f"""请将以下内容转换为爆款小红书笔记。 - -内容如下: -{content} - -请按照以下格式返回: -1. 第一行:爆款标题(遵循二极管标题法,必须有emoji) -2. 空一行 -3. 正文内容(注意结构、风格、技巧的运用,控制在600-800字之间) -4. 空一行 -5. 标签列表(每类标签都要有,用#号开头) - -创作要求: -1. 标题要让人忍不住点进来看 -2. 内容要有干货,但表达要轻松 -3. 每段都要用emoji装饰 -4. 标签要覆盖核心词、关联词、转化词、热搜词 -5. 设置2-3处互动引导 -6. 通篇要有感情和温度 -7. 正文控制在600-800字之间 - -""" - - # 调用API - response = client.chat.completions.create( - model=AI_MODEL, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} - ], - temperature=0.7, - max_tokens=2000 - ) - - if not response.choices: - raise Exception("API 返回结果为空") - - # 处理返回的内容 - xiaohongshu_content = response.choices[0].message.content.strip() - print(f"\n📝 API返回内容:\n{xiaohongshu_content}\n") - - # 提取标题(第一行) - content_lines = xiaohongshu_content.split('\n') - titles = [] - for line in content_lines: - line = line.strip() - if line and not line.startswith('#') and ':' not in line and '。' not in line: - titles = [line] - break - - if not titles: - print("⚠️ 未找到标题,尝试其他方式提取...") - # 尝试其他方式提取标题 - title_match = re.search(r'^[^#\n]+', xiaohongshu_content) - if title_match: - titles = [title_match.group(0).strip()] - - if titles: - print(f"✅ 提取到标题: {titles[0]}") - else: - print("⚠️ 未能提取到标题") - - # 提取标签(查找所有#开头的标签) - tags = [] - tag_matches = re.findall(r'#([^\s#]+)', xiaohongshu_content) - if tag_matches: - tags = tag_matches - print(f"✅ 提取到{len(tags)}个标签") - else: - print("⚠️ 未找到标签") - - # 获取相关图片 - images = [] - if self.unsplash_client: - # 使用标题和标签作为搜索关键词 - search_terms = titles + tags[:2] if tags else titles - search_query = ' '.join(search_terms) - try: - images = self._get_unsplash_images(search_query, count=4) - if images: - print(f"✅ 成功获取{len(images)}张配图") - else: - print("⚠️ 未找到相关配图") - except Exception as e: - print(f"⚠️ 获取配图失败: {str(e)}") - - return xiaohongshu_content, titles, tags, images - - except Exception as e: - print(f"⚠️ 转换小红书笔记失败: {str(e)}") - return content, [], [], [] - - def _get_unsplash_images(self, query: str, count: int = 3) -> List[str]: - """从Unsplash获取相关图片""" - if not self.unsplash_client: - print("⚠️ Unsplash客户端未初始化") - return [] - - try: - # 将查询词翻译成英文以获得更好的结果 - if self.openrouter_available: - try: - response = client.chat.completions.create( - model=AI_MODEL, - messages=[ - {"role": "system", "content": "你是一个翻译助手。请将输入的中文关键词翻译成最相关的1-3个英文关键词,用逗号分隔。直接返回翻译结果,不要加任何解释。例如:\n输入:'保险理财知识'\n输出:insurance,finance,investment"}, - {"role": "user", "content": query} - ], - temperature=0.3, - max_tokens=50 - ) - if response.choices: - query = response.choices[0].message.content.strip() - except Exception as e: - print(f"⚠️ 翻译关键词失败: {str(e)}") - - # 使用httpx直接调用Unsplash API - headers = { - 'Authorization': f'Client-ID {os.getenv("UNSPLASH_ACCESS_KEY")}' - } - - # 对每个关键词分别搜索 - all_photos = [] - for keyword in query.split(','): - response = httpx.get( - 'https://api.unsplash.com/search/photos', - params={ - 'query': keyword.strip(), - 'per_page': count, - 'orientation': 'portrait', # 小红书偏好竖版图片 - 'content_filter': 'high' # 只返回高质量图片 - }, - headers=headers, - verify=False # 禁用SSL验证 - ) - - if response.status_code == 200: - data = response.json() - if data['results']: - # 获取图片URL,优先使用regular尺寸 - photos = [photo['urls'].get('regular', photo['urls']['small']) - for photo in data['results']] - all_photos.extend(photos) - - # 如果收集到的图片不够,用最后一个关键词继续搜索 - while len(all_photos) < count and query: - response = httpx.get( - 'https://api.unsplash.com/search/photos', - params={ - 'query': query.split(',')[-1].strip(), - 'per_page': count - len(all_photos), - 'orientation': 'portrait', - 'content_filter': 'high', - 'page': 2 # 获取下一页的结果 - }, - headers=headers, - verify=False - ) - - if response.status_code == 200: - data = response.json() - if data['results']: - photos = [photo['urls'].get('regular', photo['urls']['small']) - for photo in data['results']] - all_photos.extend(photos) - else: - break - else: - break - - # 返回指定数量的图片 - return all_photos[:count] - - except Exception as e: - print(f"⚠️ 获取图片失败: {str(e)}") - return [] - - def process_video(self, url: str) -> List[str]: - """处理视频链接,生成笔记 - - Args: - url (str): 视频链接 - - Returns: - List[str]: 生成的笔记文件路径列表 - """ - print("\n📹 正在处理视频...") - - # 创建临时目录 - temp_dir = os.path.join(self.output_dir, 'temp') - os.makedirs(temp_dir, exist_ok=True) - - try: - # 下载视频 - print("⬇️ 正在下载视频...") - result = self._download_video(url, temp_dir) - if not result: - return [] - - audio_path, video_info = result - if not audio_path or not video_info: - return [] - - print(f"✅ 视频下载成功: {video_info['title']}") - - # 转录音频 - print("\n🎙️ 正在转录音频...") - print("正在转录音频(这可能需要几分钟)...") - transcript = self._transcribe_audio(audio_path) - if not transcript: - return [] - - # 保存原始转录内容 - timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - original_file = os.path.join(self.output_dir, f"{timestamp}_original.md") - with open(original_file, 'w', encoding='utf-8') as f: - f.write(f"# {video_info['title']}\n\n") - f.write(f"## 视频信息\n") - f.write(f"- 作者:{video_info['uploader']}\n") - f.write(f"- 时长:{video_info['duration']}秒\n") - f.write(f"- 平台:{video_info['platform']}\n") - f.write(f"- 链接:{url}\n\n") - f.write(f"## 原始转录内容\n\n") - f.write(transcript) + # generate video note + generator = VideoNoteGenerator( + output_dir=output_dir, + openrouter_adapter=openrouter_adapter, + unsplash_adapter=unsplash_adapter, + ffmpeg_path=ffmpeg_adapter.ffmpeg_path, + ) - # 整理长文版本 - print("\n📝 正在整理长文版本...") - organized_content = self._organize_long_content(transcript, video_info['duration']) - organized_file = os.path.join(self.output_dir, f"{timestamp}_organized.md") - with open(organized_file, 'w', encoding='utf-8') as f: - f.write(f"# {video_info['title']} - 整理版\n\n") - f.write(f"## 视频信息\n") - f.write(f"- 作者:{video_info['uploader']}\n") - f.write(f"- 时长:{video_info['duration']}秒\n") - f.write(f"- 平台:{video_info['platform']}\n") - f.write(f"- 链接:{url}\n\n") - f.write(f"## 内容整理\n\n") - f.write(organized_content) - - # 生成小红书版本 - print("\n📱 正在生成小红书版本...") - try: - xiaohongshu_content, titles, tags, images = self.convert_to_xiaohongshu(organized_content) - - # 保存小红书版本 - xiaohongshu_file = os.path.join(self.output_dir, f"{timestamp}_xiaohongshu.md") - - # 写入文件 - with open(xiaohongshu_file, "w", encoding="utf-8") as f: - # 写入标题 - f.write(f"# {titles[0]}\n\n") - - # 如果有图片,先写入第一张作为封面 - if images: - f.write(f"![封面图]({images[0]})\n\n") - - # 写入正文内容的前半部分 - content_parts = xiaohongshu_content.split('\n\n') - mid_point = len(content_parts) // 2 - - # 写入前半部分 - f.write('\n\n'.join(content_parts[:mid_point])) - f.write('\n\n') - - # 如果有第二张图片,插入到中间 - if len(images) > 1: - f.write(f"![配图]({images[1]})\n\n") - - # 写入后半部分 - f.write('\n\n'.join(content_parts[mid_point:])) - - # 如果有第三张图片,插入到末尾 - if len(images) > 2: - f.write(f"\n\n![配图]({images[2]})") - - # 写入标签 - if tags: - f.write("\n\n---\n") - f.write("\n".join([f"#{tag}" for tag in tags])) - print(f"\n✅ 小红书版本已保存至: {xiaohongshu_file}") - return [original_file, organized_file, xiaohongshu_file] - except Exception as e: - print(f"⚠️ 生成小红书版本失败: {str(e)}") - import traceback - print(f"错误详情:\n{traceback.format_exc()}") - - print(f"\n✅ 笔记已保存至: {original_file}") - print(f"✅ 整理版内容已保存至: {organized_file}") - return [original_file, organized_file] - - except Exception as e: - print(f"⚠️ 处理视频时出错: {str(e)}") - return [] - - finally: - # 清理临时文件 + app_logger.info("📹 正在生成视频笔记...") + + notice = '' + transcript_output_tran = '' + organized_output_tran = '' + xiaohongshu_output_tran = '' + images = [] + # Generate video note + if parse_type == '单URL': + (notice, transcript_output_tran, organized_output_tran, + xiaohongshu_output_tran, images) = await generator.process_video_full(input_content.strip()) + elif parse_type == 'MD文档': + (notice, transcript_output_tran, organized_output_tran, + xiaohongshu_output_tran, images) = await generator.process_video_organized(input_content.strip()) + elif parse_type == '本地视频文件': + video_file_path = input_content + app_logger.info("📹 本地视频文件,正在转换为视频笔记...") + app_logger.info(f"📹 视频路径:{video_file_path}") + # Create temporary folder + temp_dir = os.path.join(generator.output_dir, 'temp') + os.makedirs(temp_dir, exist_ok=True) + (notice, transcript_output_tran, organized_output_tran, + xiaohongshu_output_tran, images) = await generator.process_video_path(video_file_path, temp_dir) if os.path.exists(temp_dir): shutil.rmtree(temp_dir) + return notice, transcript_output_tran, organized_output_tran, xiaohongshu_output_tran, images + except Exception as e: + app_logger.error(f"❌ 生成视频笔记失败:{str(e)}") + return None - def process_markdown_file(self, input_file: str) -> None: - """处理markdown文件,生成优化后的笔记 - - Args: - input_file (str): 输入的markdown文件路径 - """ - try: - # 读取markdown文件 - with open(input_file, 'r', encoding='utf-8') as f: - content = f.read() - - # 提取视频链接 - video_links = re.findall(r'https?://(?:www\.)?(?:youtube\.com/watch\?v=|youtu\.be/|bilibili\.com/video/|douyin\.com/video/)[^\s\)]+', content) - - if not video_links: - print("未在markdown文件中找到视频链接") - return - - print(f"找到 {len(video_links)} 个视频链接,开始处理...\n") - - # 处理每个视频链接 - for i, url in enumerate(video_links, 1): - print(f"处理第 {i}/{len(video_links)} 个视频: {url}\n") - self.process_video(url) - - except Exception as e: - print(f"处理markdown文件时出错: {str(e)}") - raise - -def extract_urls_from_text(text: str) -> list: - """ - 从文本中提取所有有效的URL - 支持的URL格式: - - 视频平台URL (YouTube, Bilibili, 抖音等) - - 包含http://或https://的标准URL - - 短链接URL (如t.co等) - - Args: - text: 输入文本 - - Returns: - list: 提取到的有效URL列表 - """ - # URL正则模式 - url_patterns = [ - # 标准URL - r'https?://[^\s<>\[\]"\']+[^\s<>\[\]"\'.,]', - # 短链接 - r'https?://[a-zA-Z0-9]+\.[a-zA-Z]{2,3}/[^\s<>\[\]"\']+', - # Bilibili - r'BV[a-zA-Z0-9]{10}', - # 抖音分享链接 - r'v\.douyin\.com/[a-zA-Z0-9]+', - ] - - urls = [] - for pattern in url_patterns: - matches = re.finditer(pattern, text, re.IGNORECASE) - for match in matches: - url = match.group() - # 对于不完整的BV号,添加完整的bilibili前缀 - if url.startswith('BV'): - url = f'https://www.bilibili.com/video/{url}' - urls.append(url) - - # 去重并保持顺序 - seen = set() - return [url for url in urls if not (url in seen or seen.add(url))] -if __name__ == '__main__': - import sys, os, re - import argparse - - parser = argparse.ArgumentParser(description='视频笔记生成器') - parser.add_argument('input', help='输入源:视频URL、包含URL的文件或markdown文件') - parser.add_argument('--xiaohongshu', action='store_true', help='生成小红书风格的笔记') - args = parser.parse_args() - - generator = VideoNoteGenerator() - - if os.path.exists(args.input): - # 读取文件内容 - try: - with open(args.input, 'r', encoding='utf-8') as f: - content = f.read() - except UnicodeDecodeError: - try: - # 尝试使用gbk编码 - with open(args.input, 'r', encoding='gbk') as f: - content = f.read() - except Exception as e: - print(f"⚠️ 无法读取文件: {str(e)}") - sys.exit(1) - - # 如果是markdown文件,直接处理 - if args.input.endswith('.md'): - print(f"📝 处理Markdown文件: {args.input}") - generator.process_markdown_file(args.input) - else: - # 从文件内容中提取URL - urls = extract_urls_from_text(content) - - if not urls: - print("⚠️ 未在文件中找到有效的URL") - sys.exit(1) - - print(f"📋 从文件中找到 {len(urls)} 个URL:") - for i, url in enumerate(urls, 1): - print(f" {i}. {url}") - - print("\n开始处理URL...") - for i, url in enumerate(urls, 1): - print(f"\n处理第 {i}/{len(urls)} 个URL: {url}") - try: - generator.process_video(url) - except Exception as e: - print(f"⚠️ 处理URL时出错:{str(e)}") - continue - else: - # 检查是否是有效的URL - if not args.input.startswith(('http://', 'https://')): - print("⚠️ 错误:请输入有效的URL、包含URL的文件或markdown文件路径") - print("\n使用示例:") - print("1. 处理单个视频:") - print(" python video_note_generator.py https://example.com/video") - print("\n2. 处理包含URL的文件:") - print(" python video_note_generator.py urls.txt") - print(" - 文件中的URL可以是任意格式,每行一个或多个") - print(" - 支持带有其他文字的行") - print(" - 支持使用#注释") - print("\n3. 处理Markdown文件:") - print(" python video_note_generator.py notes.md") - sys.exit(1) - - # 处理单个URL - try: - print(f"🎥 处理视频URL: {args.input}") - generator.process_video(args.input) - except Exception as e: - print(f"⚠️ 处理URL时出错:{str(e)}") - sys.exit(1) \ No newline at end of file +# async def main(): +# +# if os.path.exists(args.input): +# try: +# with open(args.input, 'r', encoding='utf-8') as f: +# content = f.read() +# except UnicodeDecodeError: +# try: +# # 尝试使用gbk编码 +# with open(args.input, 'r', encoding='gbk') as f: +# content = f.read() +# except Exception as e: +# app_logger.error(f"❌ 无法读取文件: {str(e)}") +# sys.exit(1) +# +# # if filename contain '_organized' and end with '.md', generate rednote directly +# if '_organized' in args.input and args.input.endswith('.md'): +# app_logger.info("📝 检测到文件名包含 '_organized', 将直接生成小红书笔记") +# timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") +# output_file_path = os.path.join(output_dir, f"{timestamp}_xiaohongshu.md") +# # get organized_content from args.input +# with open(args.input, 'r', encoding='utf-8') as f: +# content = f.read() +# await generator.gen_rednote_version(content, output_file_path) +# app_logger.info(f"📝 小红书笔记已生成:{output_file_path}") +# sys.exit(0) +# +# # if file name contain '.md', get urls from markdown file +# urls = [] +# if args.input.endswith('.md'): +# app_logger.info("📝 检测到文件名为 '.md', 将从markdown文件中获取视频链接") +# urls = process_markdown_file(args.input) +# else: +# urls = extract_urls_from_text(content) +# +# if not urls: +# app_logger.error("❌ 未在文件中找到视频链接") +# sys.exit(1) +# +# # generate video note +# app_logger.info(f"📋 从文件中找到 {len(urls)} 个URL") +# for i, url in enumerate(urls, 1): +# app_logger.info(f"📹 正在处理第 {i}/{len(urls)} 个视频:{url}") +# try: +# await generator.process_video_full(url) +# except Exception as e: +# app_logger.error(f"❌ 处理视频失败:{str(e)}") +# sys.exit(1) +# else: +# # check input +# if not args.input.startswith(('http://', 'https://')): +# print("⚠️ 错误:请输入有效的URL、包含URL的文件或markdown文件路径") +# print("\n使用示例:") +# print("1. 处理单个视频:") +# print(" python video_note_generator.py https://example.com/video") +# print("\n2. 处理包含URL的文件:") +# print(" python video_note_generator.py urls.txt") +# print(" - 文件中的URL可以是任意格式,每行一个或多个") +# print(" - 支持带有其他文字的行") +# print(" - 支持使用#注释") +# print("\n3. 处理Markdown文件:") +# print(" python video_note_generator.py notes.md") +# sys.exit(1) +# # generate video note +# app_logger.info(f"📹 正在处理视频:{args.input}") +# try: +# await generator.process_video_full(args.input) +# except Exception as e: +# app_logger.error(f"❌ 处理视频失败:{str(e)}") +# sys.exit(1) +# +# +# if __name__ == "__main__": +# asyncio.run(main()) \ No newline at end of file diff --git a/web.py b/web.py new file mode 100644 index 0000000..c995aaf --- /dev/null +++ b/web.py @@ -0,0 +1,240 @@ +import gradio as gr +from gradio_modal import Modal + +from src.setting.setting import global_setting, check_required_keys, load_settings, update_and_save_settings +from video_note_generator import generate_video_note + + +def update_input_visibility(choice): + return ( + gr.update(visible=(choice == "单URL")), + # gr.update(visible=(choice == "多行URL文档")), + gr.update(visible=(choice == "MD文档")), + gr.update(visible=(choice == "本地视频文件")) + ) + +def check_if_valid_url(content): + if not content: + return False + try: + urls = content.split('\n') + for url in urls: + url = url.strip() + if not url.startswith("http") and not url.startswith("https"): + return False + return True + except Exception as e: + return False + +def generate_btn_if_enabled(model_name, parse_type, url_input, md_file_input, local_video_input): + required_keys_valid = check_required_keys() + input_valid = ( + (parse_type == "单URL" and check_if_valid_url(url_input)) or + (parse_type == "MD文档" and md_file_input) or + (parse_type == "本地视频文件" and local_video_input) + ) + return gr.Button(value="生成", interactive=(model_name and input_valid and required_keys_valid)) + +def main(): + with gr.Blocks(title='爆款生成器') as webui: + local_storage = gr.BrowserState(storage_key='g_setting', default_value=global_setting, secret='asdc123') + + gr.Markdown("# 操作区") + + with gr.Row(): + model_name = gr.Textbox(label="Openrouter模型名", value="google/gemini-2.0-flash-exp:free", + interactive=True) + parse_type = gr.Radio( + label="解析内容", + choices=["单URL", "MD文档", "本地视频文件"], + value="单URL", interactive=True + ) + + with gr.Group(): + url_input = gr.Textbox(label="url 名称", visible=True, interactive=True) + md_file_input = gr.File(label="上传文档", visible=False, interactive=True, type='binary') + local_video_input = gr.File(label="上传视频", visible=False, interactive=True, type='filepath') + + parse_type.change( + fn=update_input_visibility, + inputs=[parse_type], + outputs=[url_input, md_file_input, local_video_input] + ) + + with gr.Row(): + settings_btn = gr.Button("设置") + generate_btn = gr.Button("生成", interactive=False) + + warning_icon = gr.Markdown("⚠️ 必选参数配置缺失", visible=not check_required_keys(local_storage.value)) + + # Add event listeners to update generate_btn + model_name.change(generate_btn_if_enabled, + inputs=[model_name, parse_type, url_input, md_file_input, local_video_input], + outputs=generate_btn) + parse_type.change(generate_btn_if_enabled, + inputs=[model_name, parse_type, url_input, md_file_input, local_video_input], + outputs=generate_btn) + url_input.change(generate_btn_if_enabled, + inputs=[model_name, parse_type, url_input, md_file_input, local_video_input], + outputs=generate_btn) + # file_input.change(generate_btn_if_enabled, + # inputs=[model_name, parse_type, url_input, md_file_input, local_video_input], + # outputs=generate_btn) + md_file_input.change(generate_btn_if_enabled, + inputs=[model_name, parse_type, url_input, md_file_input, local_video_input], + outputs=generate_btn) + local_video_input.change(generate_btn_if_enabled, + inputs=[model_name, parse_type, url_input, md_file_input, + local_video_input], outputs=generate_btn) + + with Modal(visible=False) as settings_modal: + gr.Markdown("# 设置") + + with gr.Tabs(): + with gr.TabItem("OpenRouter设置"): + openrouter_api_key = gr.Textbox(label="OPENROUTER_API_KEY", interactive=True, + placeholder="Require, Fill your API key here") + openrouter_api_url = gr.Textbox(label="OPENROUTER_API_URL", interactive=True, + placeholder="Require, OpenRouter API URL") + openrouter_app_name = gr.Textbox(label="OPENROUTER_APP_NAME", interactive=True, + placeholder="Require, OpenRouter App Name") + openrouter_http_referer = gr.Textbox(label="OPENROUTER_HTTP_REFERER", interactive=True, + placeholder="Require, OpenRouter HTTP Referer") + + with gr.TabItem("Unsplash设置"): + unsplash_access_key = gr.Textbox(label="UNSPLASH_ACCESS_KEY", interactive=True, + placeholder="Unsplash Access Key") + unsplash_secret_key = gr.Textbox(label="UNSPLASH_SECRET_KEY", interactive=True, + placeholder="Unsplash Secret Key") + unsplash_redirect_uri = gr.Textbox(label="UNSPLASH_REDIRECT_URI", interactive=True, + placeholder="Unsplash Redirect URI") + + with gr.TabItem("Whisper设置"): + whisper_model = gr.Radio( + label="WHISPER_MODEL", + choices=["tiny", "base", "small", "medium", "large-v2"], interactive=True + ) + whisper_language = gr.Radio( + label="WHISPER_LANGUAGE", + choices=["zh", "en", "ja"], interactive=True + ) + ffmpeg_path = gr.Textbox(label="FFMPEG_PATH", interactive=True, + placeholder="Windows 用户需要设置 FFmpeg 路径,Mac/Linux 用户通常不需要") + + with gr.TabItem("代理设置"): + http_proxy = gr.Textbox(label="HTTP_PROXY", interactive=True) + https_proxy = gr.Textbox(label="HTTPS_PROXY", interactive=True) + + with gr.TabItem("生成设置"): + output_dir = gr.Textbox(label="OUTPUT_DIR", interactive=True, placeholder="Output Directory") + max_tokens = gr.Slider(label="MAX_TOKENS", minimum=100, maximum=5000, step=100, interactive=True, + info='生成小红书内容的最大长度') + content_chunk_size = gr.Slider(label="CONTENT_CHUNK_SIZE", minimum=100, maximum=5000, step=100, + interactive=True, info='长文本分块大小(字符数)') + temperature = gr.Slider(label="TEMPERATURE", minimum=0.0, maximum=1.0, step=0.1, interactive=True, + info='AI 创造性程度 (0.0-1.0)') + top_p = gr.Slider(label="TOP_P", minimum=0.0, maximum=1.0, step=0.1, interactive=True, + info='采样阈值 (0.0-1.0)') + use_emoji = gr.Checkbox(label="USE_EMOJI", interactive=True, info='是否在内容中使用表情符号') + tag_count = gr.Slider(label="TAG_COUNT", minimum=1, maximum=10, step=1, interactive=True, + info='生成的标签数量') + min_paragraphs = gr.Slider(label="MIN_PARAGRAPHS", minimum=1, maximum=10, step=1, interactive=True, + info='最少段落数') + max_paragraphs = gr.Slider(label="MAX_PARAGRAPHS", minimum=1, maximum=10, step=1, interactive=True, + info='最多段落数') + + with gr.TabItem("调试设置"): + debug = gr.Checkbox(label="DEBUG", interactive=True) + log_level = gr.Radio( + label="LOG_LEVEL", + choices=["debug", "info", "warning", "error"], interactive=True + ) + + confirm_btn = gr.Button("确认") + + webui.load(load_settings, inputs=[local_storage], outputs=[ + warning_icon, + openrouter_api_key, openrouter_api_url, openrouter_app_name, + openrouter_http_referer, unsplash_access_key, unsplash_secret_key, + unsplash_redirect_uri, whisper_model, whisper_language, + ffmpeg_path, http_proxy, https_proxy, output_dir, + max_tokens, content_chunk_size, temperature, + top_p, use_emoji, tag_count, + min_paragraphs, max_paragraphs, debug, + log_level, + ]) + + settings_btn.click(lambda: Modal(visible=True), None, settings_modal) + + confirm_btn.click( + fn=update_and_save_settings, + inputs=[ + openrouter_api_key, openrouter_api_url, openrouter_app_name, + openrouter_http_referer, unsplash_access_key, unsplash_secret_key, + unsplash_redirect_uri, whisper_model, whisper_language, + ffmpeg_path, http_proxy, https_proxy, output_dir, + max_tokens, content_chunk_size, temperature, + top_p, use_emoji, tag_count, + min_paragraphs, max_paragraphs, debug, + log_level + ], + outputs=[warning_icon, settings_modal, local_storage] + ) + + result_status = gr.Textbox('生成中...', interactive=False, visible=False, show_label=False) + with gr.Tabs(visible=False) as result_tabs: + + with gr.TabItem("原始笔记"): + transcript_output = gr.Markdown(show_copy_button=True) + + with gr.TabItem("整理版笔记"): + organized_output = gr.Markdown(show_copy_button=True) + + with gr.TabItem("小红书版本"): + xiaohongshu_output = gr.Markdown(show_copy_button=True) + with gr.Row(): + note_images = gr.Gallery(label='推荐配图', + show_label=False, columns=[3], rows=[1], + object_fit="contain", height="auto") + + # 生成 + def on_btn_click(): + return gr.Button(interactive=False), gr.Tabs(visible=True), gr.Textbox('生成中...', interactive=False, visible=True, show_label=False) + + async def generate(*args): + # 调用逻辑 + model_name = args[0] + gen_parse_type = args[1] + input_content = None + if gen_parse_type == "单URL": + input_content = args[2] + elif gen_parse_type == "MD文档": + input_content = args[3] + elif gen_parse_type == "本地视频文件": + input_content = args[4] + + notice_info, transcript_output_tran, organized_output_tran, xiaohongshu_output_tran, images = await generate_video_note(model_name, gen_parse_type, input_content) + + return [gr.Textbox(notice_info, visible=True), transcript_output_tran, organized_output_tran, xiaohongshu_output_tran, gr.Button(interactive=True), images] + + def after_slow_function(): + return gr.Button(interactive=True), gr.Textbox('生成中...', interactive=False, visible=False, show_label=False) + + generate_btn.click( + fn=on_btn_click, + outputs=[generate_btn, result_tabs, result_status] + ).then( + fn=generate, + inputs = [model_name, parse_type, + url_input, md_file_input, local_video_input, local_storage], + outputs=[warning_icon, transcript_output, organized_output, xiaohongshu_output, generate_btn, note_images] + ).then( + fn=after_slow_function, + outputs=[generate_btn, result_status] + ) + + webui.launch() + + +if __name__ == "__main__": + main()