#!/usr/bin/env python3
"""
job_guard -- failure-notification wrapper for cron jobs.

Usage:
    python job_guard.py <command> [args...]

What it does:
    1. Runs the command it is given.
    2. Command succeeds -> passes the command's stdout through and exits 0.
    3. Command fails, times out or cannot be started -> sends an e-mail that
       names the job and shows the tail of its output.

Example crontab entry:
    0 8 * * * cd /your/workspace && .venv/bin/python tools/job_guard.py .venv/bin/python periodic_jobs/ai_heartbeat/src/v0/observer.py

Environment variables (also loaded from the nearest .env file):
    GMAIL_USERNAME      -- sender address
    GMAIL_APP_PASSWORD  -- Gmail app password
    GMAIL_RECIPIENTS    -- recipient(s), comma-separated (defaults to the sender)
"""
import os
import socket
import subprocess
import sys
from datetime import datetime
from pathlib import Path

# Upper bound (seconds) for each blocking SMTP operation, so an unreachable
# mail server cannot keep the cron slot occupied indefinitely.
SMTP_TIMEOUT_SECONDS = 30

# Suffixes that mark an argument as "the script being run"; used to derive a
# readable job name even when the script is followed by its own arguments.
_SCRIPT_SUFFIXES = frozenset({".py", ".sh", ".bash", ".rb", ".pl", ".js"})


# ---------------------------------------------------------------------------
# Mail sending (same approach as send_email_to_myself, inlined here to avoid
# a circular dependency)
# ---------------------------------------------------------------------------

def _load_dotenv():
    """Load KEY=VALUE pairs from the nearest ``.env`` file into ``os.environ``.

    The current working directory is checked first, then each parent
    directory; only the first ``.env`` found is read. Blank lines, ``#``
    comment lines and lines without ``=`` are skipped. Surrounding quote
    characters are stripped from values. Variables that already exist in the
    environment are never overwritten.
    """
    current = Path.cwd()
    for directory in (current, *current.parents):
        env_file = directory / ".env"
        if not env_file.exists():
            continue
        with open(env_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                key, value = key.strip(), value.strip().strip('"\'')
                if key and key not in os.environ:
                    os.environ[key] = value
        return  # stop at the first .env found, even if it set nothing


def _parse_recipients(raw, fallback):
    """Split a comma-separated address list; fall back to ``[fallback]`` if empty."""
    addresses = [part.strip() for part in (raw or "").split(",") if part.strip()]
    if not addresses and fallback:
        addresses = [fallback]
    return addresses


def _send_failure_email(subject, body):
    """Send a plain-text failure notification through Gmail SMTP.

    Args:
        subject: Subject line of the message.
        body: Plain-text body of the message.

    Returns:
        True when the message was handed to the SMTP server. False when the
        configuration is incomplete (the report is printed to stderr instead)
        or when sending failed. This function never raises, because it runs
        while a failure is already being reported.
    """
    import smtplib
    from email.mime.text import MIMEText

    try:
        _load_dotenv()
    except (OSError, UnicodeDecodeError) as e:
        # An unreadable .env must not abort the report; the variables may
        # still be present in the real environment.
        print(f"[job_guard] 读取 .env 失败: {e}", file=sys.stderr)

    username = os.getenv("GMAIL_USERNAME")
    password = os.getenv("GMAIL_APP_PASSWORD")
    # Documented default: with no explicit recipients, mail the sender.
    recipients = _parse_recipients(os.getenv("GMAIL_RECIPIENTS"), username)

    if not (username and password and recipients):
        # Incomplete mail configuration: degrade to stderr so that cron's own
        # MAILTO can still act as a safety net.
        print(f"[job_guard] 邮件配置不全,无法发送通知。Subject: {subject}", file=sys.stderr)
        print(body, file=sys.stderr)
        return False

    msg = MIMEText(body, "plain", "utf-8")
    msg["From"] = username
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = subject

    try:
        with smtplib.SMTP("smtp.gmail.com", 587, timeout=SMTP_TIMEOUT_SECONDS) as server:
            server.starttls()
            server.login(username, password)
            # The envelope needs one entry per address, not one joined string.
            server.sendmail(username, recipients, msg.as_string())
        return True
    except Exception as e:
        # Top-level boundary: a notification failure must not mask the job's
        # own exit status, so report it and carry on.
        print(f"[job_guard] 发送邮件失败: {e}", file=sys.stderr)
        return False


# ---------------------------------------------------------------------------
# Main logic
# ---------------------------------------------------------------------------

def _job_name(cmd):
    """Derive a short job name from the command line.

    Prefers the first argument that looks like a script file, so that
    ``python job.py --flag`` is reported as ``job`` rather than ``--flag``.
    Falls back to the stem of the last argument.
    """
    for arg in cmd:
        candidate = Path(arg)
        if candidate.suffix.lower() in _SCRIPT_SUFFIXES:
            return candidate.stem
    return Path(cmd[-1]).stem if cmd else "unknown"


def _tail(text, limit, placeholder):
    """Return the last ``limit`` lines of ``text``, or ``placeholder`` if empty."""
    if not text:
        return placeholder
    return "\n".join(text.splitlines()[-limit:])


def _exit_status(returncode):
    """Map a ``subprocess`` return code to a valid process exit status.

    ``subprocess`` reports death-by-signal as ``-N``; this uses the shell
    convention ``128 + N`` instead of passing a negative number to
    ``sys.exit``.
    """
    return returncode if returncode >= 0 else 128 - returncode


def main():
    """Run the command given in ``sys.argv[1:]`` and report any failure.

    Exit status:
        0        the command succeeded
        1        no command was given, or the command could not be started
        124      the command exceeded the 2-hour limit and was terminated
        128 + N  the command was killed by signal N
        other    the command's own non-zero exit code
    """
    if len(sys.argv) < 2:
        print(f"用法: {sys.argv[0]} <command> [args...]", file=sys.stderr)
        sys.exit(1)

    cmd = sys.argv[1:]
    job_name = _job_name(cmd)
    started_at = datetime.now()

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=7200,  # 2-hour safety limit
        )
    except subprocess.TimeoutExpired:
        elapsed = datetime.now() - started_at
        subject = f"[TIMEOUT] {job_name} 超时未完成"
        body = (
            f"任务: {' '.join(cmd)}\n"
            f"机器: {socket.gethostname()}\n"
            f"开始: {started_at:%Y-%m-%d %H:%M:%S}\n"
            f"耗时: {elapsed}\n"
            f"状态: 超过 2 小时未完成,已被强制终止\n"
        )
        _send_failure_email(subject, body)
        sys.exit(124)
    except Exception as e:
        # The command could not be launched at all (missing binary,
        # permission denied, ...).
        subject = f"[ERROR] {job_name} 启动失败"
        body = f"任务: {' '.join(cmd)}\n异常: {e}\n"
        _send_failure_email(subject, body)
        sys.exit(1)

    # Success: pass stdout through, then exit quietly.
    if result.returncode == 0:
        if result.stdout:
            sys.stdout.write(result.stdout)
        sys.exit(0)

    # Failure: send the notification.
    elapsed = datetime.now() - started_at
    subject = f"[FAILED] {job_name} 退出码 {result.returncode}"

    # Only the tail of each stream is mailed, to keep the message short.
    stderr_tail = _tail(result.stderr, 200, "(无 stderr 输出)")
    stdout_tail = _tail(result.stdout, 50, "(无 stdout 输出)")

    body = (
        f"任务: {' '.join(cmd)}\n"
        f"机器: {socket.gethostname()}\n"
        f"开始: {started_at:%Y-%m-%d %H:%M:%S}\n"
        f"耗时: {elapsed}\n"
        f"退出码: {result.returncode}\n"
        f"\n{'='*60}\n"
        f"STDERR (最后 200 行):\n{stderr_tail}\n"
        f"\n{'='*60}\n"
        f"STDOUT (最后 50 行):\n{stdout_tail}\n"
    )

    _send_failure_email(subject, body)

    # Also write to stderr so the cron log keeps a record.
    print(f"[job_guard] {job_name} failed with exit code {result.returncode}", file=sys.stderr)
    if result.stderr:
        sys.stderr.write(result.stderr)

    sys.exit(_exit_status(result.returncode))


if __name__ == "__main__":
    main()
"""Tests for ``MarkdownChunker``: front-matter parsing and header-based chunking."""
import sys
from pathlib import Path

import pytest

# Put the directory two levels above this file on sys.path so that the
# ``search`` package can be imported when the tests are run directly.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from search.chunker import MarkdownChunker


@pytest.fixture
def chunker():
    """Return a ``MarkdownChunker`` constructed with ``max_chunk_size=200``."""
    return MarkdownChunker(max_chunk_size=200)


class TestParseYamlFrontmatter:
    """Checks for ``MarkdownChunker.parse_yaml_frontmatter``."""

    def test_with_frontmatter(self, chunker):
        # A leading '---' block is parsed into metadata; the body follows it.
        document = "---\ntitle: Test\ntags: [a, b]\n---\n# Hello\nBody text"
        meta, remainder = chunker.parse_yaml_frontmatter(document)
        assert meta == {"title": "Test", "tags": ["a", "b"]}
        assert remainder.startswith("# Hello")

    def test_without_frontmatter(self, chunker):
        # No front matter: empty metadata, body is the stripped input.
        document = "# Hello\nBody text"
        meta, remainder = chunker.parse_yaml_frontmatter(document)
        assert meta == {}
        assert remainder == document.strip()

    def test_invalid_yaml(self, chunker):
        # Unparseable front matter yields empty metadata.
        document = "---\n: invalid: yaml: [[\n---\nBody"
        meta, _ = chunker.parse_yaml_frontmatter(document)
        assert meta == {}


class TestChunking:
    """Checks for ``MarkdownChunker.chunk``."""

    def test_basic_header_split(self, chunker):
        # Two headers produce two chunks, each holding its own text.
        pieces = chunker.chunk("test.md", "# Section 1\nText A\n# Section 2\nText B")
        assert len(pieces) == 2
        first, second = pieces
        assert "Text A" in first.text
        assert "Text B" in second.text

    def test_chunk_ids_are_sequential(self, chunker):
        # Ids are "<file>:<index>", counting from zero.
        pieces = chunker.chunk("doc.md", "# A\nfoo\n# B\nbar\n# C\nbaz")
        expected_ids = [f"doc.md:{n}" for n in range(3)]
        assert [piece.id for piece in pieces] == expected_ids

    def test_no_headers(self, chunker):
        # Header-less text becomes a single chunk with an empty header.
        pieces = chunker.chunk("plain.md", "Just some plain text\nwith multiple lines.")
        assert len(pieces) == 1
        assert pieces[0].header == ""

    def test_large_chunk_is_split(self):
        # Uses its own instance with a smaller max_chunk_size than the fixture.
        small_chunker = MarkdownChunker(max_chunk_size=50)
        document = "# Header\n" + "word " * 100
        pieces = small_chunker.chunk("big.md", document)
        assert len(pieces) > 1
        for piece in pieces:
            assert piece.header == "# Header"

    def test_position_tracking(self, chunker):
        # position[0] is the 1-based line on which each chunk starts.
        pieces = chunker.chunk("pos.md", "# A\nline1\nline2\n# B\nline3")
        assert pieces[0].position[0] == 1
        assert pieces[1].position[0] == 4

    def test_metadata_propagated(self, chunker):
        # Front-matter metadata is attached to every chunk.
        document = "---\nauthor: test\n---\n# A\nfoo\n# B\nbar"
        for piece in chunker.chunk("meta.md", document):
            assert piece.metadata == {"author": "test"}

    def test_empty_content(self, chunker):
        # Empty input gives no chunks, or only whitespace-only chunks.
        pieces = chunker.chunk("empty.md", "")
        assert all(not piece.text.strip() for piece in pieces)
{code}
' if lang else f'
{code}
') + return placeholder + + html = re.sub(r'```(\w*)\n(.*?)```', stash_code_block, md_content, flags=re.DOTALL) + html = re.sub(r'^# (.+)$', r'

\1

', html, flags=re.MULTILINE) html = re.sub(r'^## (.+)$', r'

\1

', html, flags=re.MULTILINE) html = re.sub(r'^### (.+)$', r'

\1

', html, flags=re.MULTILINE) - + html = re.sub(r'\*\*(.+?)\*\*', r'\1', html) - html = re.sub(r'\*(.+?)\*', r'\1', html) - + html = re.sub(r'(?\1', html) + html = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', html) - + html = re.sub(r'^- (.+)$', r'
  • \1
  • ', html, flags=re.MULTILINE) html = re.sub(r'(
  • .*
  • \n?)+', r'\n', html) - + html = re.sub(r'^\d+\. (.+)$', r'
  • \1
  • ', html, flags=re.MULTILINE) - + html = re.sub(r'^---$', r'
    ', html, flags=re.MULTILINE) - + html = re.sub(r'`([^`]+)`', r'\1', html) - + def convert_table(match): table_content = match.group(0) lines = table_content.strip().split('\n') html_table = '\n' for i, line in enumerate(lines): - if re.match(r'^[\|\-\s]+$', line): + if re.match(r'^[\|\-\:\s]+$', line): + continue + cells = [c.strip() for c in line.split('|')] + cells = [c for c in cells if c] # drop empty from leading/trailing | + if not cells: continue - cells = [c.strip() for c in line.split('|') if c.strip()] tag = 'th' if i == 0 else 'td' row = ''.join(f'<{tag}>{c}' for c in cells) html_table += f'{row}\n' html_table += '
    ' return html_table - + html = re.sub(r'(\|.+\|\n)+', convert_table, html) - + html = re.sub(r'\n\n', '

    ', html) html = '

    ' + html + '

    ' html = re.sub(r'

    (.*?)

    ', r'\1', html, flags=re.DOTALL) html = re.sub(r'

    (

    )

    ', r'\1', html, flags=re.DOTALL) html = re.sub(r'

    (.*?
    )

    ', r'\1', html, flags=re.DOTALL) html = re.sub(r'

    (


    )

    ', r'\1', html) + + # Restore code blocks + for i, block in enumerate(code_blocks): + html = html.replace(f"\x00CODEBLOCK{i}\x00", block) title_html = f'{title}' if title else '' css_html = css if css else CSS_STYLES