diff --git a/tools/job_guard.py b/tools/job_guard.py
new file mode 100644
index 00000000..55f9efa9
--- /dev/null
+++ b/tools/job_guard.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python3
+"""
+job_guard — cron job 失败通知守护。
+
+用法:
+ python job_guard.py <command> [args...]
+
+做的事情很简单:
+ 1. 运行你给它的命令
+ 2. 命令成功 → 什么都不做
+ 3. 命令失败 → 给你发一封邮件,告诉你哪个任务挂了、报了什么错
+
+示例 crontab 配置:
+ 0 8 * * * cd /your/workspace && .venv/bin/python tools/job_guard.py .venv/bin/python periodic_jobs/ai_heartbeat/src/v0/observer.py
+
+环境变量(从 .env 自动加载):
+ GMAIL_USERNAME — 发件邮箱
+ GMAIL_APP_PASSWORD — Gmail 应用专用密码
+ GMAIL_RECIPIENTS — 收件人(默认发给自己)
+"""
+import os
+import sys
+import subprocess
+import socket
+from datetime import datetime
+from pathlib import Path
+
+# ---------------------------------------------------------------------------
+# 邮件发送(复用 send_email_to_myself 的逻辑,但内联以避免循环依赖)
+# ---------------------------------------------------------------------------
+
+def _load_dotenv():
+    """Load the nearest .env (cwd, then each parent dir) into os.environ; existing vars win."""
+    for folder in (Path.cwd(), *Path.cwd().parents):
+        env_file = folder / ".env"
+        if not env_file.is_file():  # a directory named .env (e.g. a virtualenv) must not match
+            continue
+        try:  # an unreadable .env must not crash the notifier while it reports a failure
+            lines = env_file.read_text(encoding="utf-8").splitlines()
+        except (OSError, UnicodeDecodeError) as e:
+            print(f"[job_guard] cannot read {env_file}: {e}", file=sys.stderr)
+            return
+        for raw in lines:
+            key, sep, value = (part.strip() for part in raw.partition("="))
+            if sep and key and not key.startswith("#") and key not in os.environ:
+                os.environ[key] = value.strip('"\'')
+        return  # only the first .env found while walking upward is used
+
+
+def _send_failure_email(subject, body):
+    """Email a plain-text failure notice via Gmail SMTP; return True if sent.
+
+    GMAIL_RECIPIENTS is comma-separated and defaults to GMAIL_USERNAME.
+    Missing config or a send error is reported on stderr and yields False."""
+    import smtplib
+    from email.mime.text import MIMEText
+    _load_dotenv()
+    username = os.getenv("GMAIL_USERNAME")
+    password = os.getenv("GMAIL_APP_PASSWORD")
+    raw_to = os.getenv("GMAIL_RECIPIENTS") or username or ""
+    recipients = [addr.strip() for addr in raw_to.split(",") if addr.strip()]
+    if not (username and password and recipients):
+        # Incomplete mail config: fall back to stderr so cron's MAILTO can still deliver it.
+        print(f"[job_guard] 邮件配置不全,无法发送通知。Subject: {subject}\n{body}", file=sys.stderr)
+        return False
+
+    msg = MIMEText(body, "plain", "utf-8")
+    msg["From"] = username
+    msg["To"] = ", ".join(recipients)
+    msg["Subject"] = subject
+    try:
+        with smtplib.SMTP("smtp.gmail.com", 587, timeout=30) as server:  # bounded: never hang cron
+            server.starttls()
+            server.login(username, password)
+            server.sendmail(username, recipients, msg.as_string())
+        return True
+    except Exception as e:
+        print(f"[job_guard] 发送邮件失败: {e}", file=sys.stderr)
+        return False
+
+
+# ---------------------------------------------------------------------------
+# 主逻辑
+# ---------------------------------------------------------------------------
+
+def main():
+    """Run the wrapped command; on failure or timeout, email a report.
+
+    Exit code: 1 = usage/launch error, 124 = timeout, else the command's own."""
+    if len(sys.argv) < 2:
+        print(f"用法: {sys.argv[0]} <command> [args...]", file=sys.stderr)
+        sys.exit(1)
+
+    cmd = sys.argv[1:]
+    job_name = Path(cmd[-1]).stem  # cmd is non-empty here (argv length checked above)
+    started_at = datetime.now()
+
+    try:
+        result = subprocess.run(
+            cmd,
+            capture_output=True,
+            text=True,
+            errors="replace",  # non-UTF-8 output must not raise and be misreported as a launch failure
+            timeout=7200,  # 2-hour safety limit
+        )
+    except subprocess.TimeoutExpired:
+        elapsed = datetime.now() - started_at
+        subject = f"[TIMEOUT] {job_name} 超时未完成"
+        body = (
+            f"任务: {' '.join(cmd)}\n"
+            f"机器: {socket.gethostname()}\n"
+            f"开始: {started_at:%Y-%m-%d %H:%M:%S}\n"
+            f"耗时: {elapsed}\n"
+            f"状态: 超过 2 小时未完成,已被强制终止\n"
+        )
+        _send_failure_email(subject, body)
+        sys.exit(124)
+    except Exception as e:
+        subject = f"[ERROR] {job_name} 启动失败"
+        body = f"任务: {' '.join(cmd)}\n异常: {e}\n"
+        _send_failure_email(subject, body)
+        sys.exit(1)
+
+    # Success: pass stdout through, then exit quietly.
+    if result.returncode == 0:
+        if result.stdout:
+            sys.stdout.write(result.stdout)
+        sys.exit(0)
+
+    # Failure: build and send the notification.
+    elapsed = datetime.now() - started_at
+    subject = f"[FAILED] {job_name} 退出码 {result.returncode}"
+    # Keep only the tail of each stream so the email stays a readable size.
+    stderr_tail = "\n".join(result.stderr.splitlines()[-200:]) if result.stderr else "(无 stderr 输出)"
+    stdout_tail = "\n".join(result.stdout.splitlines()[-50:]) if result.stdout else "(无 stdout 输出)"
+    body = (
+        f"任务: {' '.join(cmd)}\n"
+        f"机器: {socket.gethostname()}\n"
+        f"开始: {started_at:%Y-%m-%d %H:%M:%S}\n"
+        f"耗时: {elapsed}\n"
+        f"退出码: {result.returncode}\n"
+        f"\n{'='*60}\n"
+        f"STDERR (最后 200 行):\n{stderr_tail}\n"
+        f"\n{'='*60}\n"
+        f"STDOUT (最后 50 行):\n{stdout_tail}\n"
+    )
+    _send_failure_email(subject, body)
+
+    # Mirror the failure to stderr so the cron log records it too.
+    print(f"[job_guard] {job_name} failed with exit code {result.returncode}", file=sys.stderr)
+    if result.stderr:
+        sys.stderr.write(result.stderr)
+    sys.exit(result.returncode)
+
+
+if __name__ == "__main__":  # run the guard only when executed as a script, not on import
+ main()
diff --git a/tools/semantic_search/search/chunker.py b/tools/semantic_search/search/chunker.py
index ebb39863..00f2aaaa 100644
--- a/tools/semantic_search/search/chunker.py
+++ b/tools/semantic_search/search/chunker.py
@@ -25,14 +25,19 @@ def chunk(self, file_path: str, content: str) -> List[Chunk]:
"""按标题分块并保留元数据。"""
metadata, body = self.parse_yaml_frontmatter(content)
chunks = []
-
- lines = body.split('\n')
+
+ # Calculate line offset: frontmatter lines + separator lines are skipped
+ all_lines = content.split('\n')
+ body_lines = body.split('\n')
+ frontmatter_offset = len(all_lines) - len(body_lines)
+
+ lines = body_lines
current_header = ""
current_chunk_lines = []
chunk_idx = 0
- start_line = 1 # TODO: accurately track line numbers if needed
+ start_line = frontmatter_offset + 1
- for i, line in enumerate(lines, 1):
+ for i, line in enumerate(lines, frontmatter_offset + 1):
if line.startswith('#'):
# Save previous chunk if it exists
if current_chunk_lines:
diff --git a/tools/semantic_search/search/models.py b/tools/semantic_search/search/models.py
index 4ac99adb..994b6c5e 100644
--- a/tools/semantic_search/search/models.py
+++ b/tools/semantic_search/search/models.py
@@ -10,7 +10,7 @@ class Chunk:
source_file: str = "" # 源文件相对路径
header: str = "" # 所属标题
position: Tuple[int, int] = (0, 0) # (start_line, end_line)
- metadata: Dict[str, Any] = None
+ metadata: Optional[Dict[str, Any]] = None
def to_dict(self, include_embedding: bool = False) -> Dict[str, Any]:
d = asdict(self)
diff --git a/tools/semantic_search/tests/__init__.py b/tools/semantic_search/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tools/semantic_search/tests/test_chunker.py b/tools/semantic_search/tests/test_chunker.py
new file mode 100644
index 00000000..e52d0c83
--- /dev/null
+++ b/tools/semantic_search/tests/test_chunker.py
@@ -0,0 +1,75 @@
+import pytest
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from search.chunker import MarkdownChunker
+
+
+@pytest.fixture
+def chunker():
+ return MarkdownChunker(max_chunk_size=200)  # chunker with max_chunk_size=200 for tests that request `chunker`
+
+
+class TestParseYamlFrontmatter:
+    """parse_yaml_frontmatter is expected to return a (metadata, body) pair."""
+    def test_with_frontmatter(self, chunker):
+        meta, rest = chunker.parse_yaml_frontmatter("---\ntitle: Test\ntags: [a, b]\n---\n# Hello\nBody text")
+        assert meta == {"title": "Test", "tags": ["a", "b"]}
+        assert rest.startswith("# Hello")
+
+    def test_without_frontmatter(self, chunker):
+        text = "# Hello\nBody text"
+        meta, rest = chunker.parse_yaml_frontmatter(text)
+        assert meta == {}
+        assert rest == text.strip()
+
+    def test_invalid_yaml(self, chunker):
+        """Malformed YAML between the separators must yield empty metadata."""
+        meta, _ = chunker.parse_yaml_frontmatter("---\n: invalid: yaml: [[\n---\nBody")
+        assert meta == {}
+
+
+class TestChunking:
+    """Header-based chunking: splitting, ids, size limits, positions, metadata."""
+    def test_basic_header_split(self, chunker):
+        chunks = chunker.chunk("test.md", "# Section 1\nText A\n# Section 2\nText B")
+        assert len(chunks) == 2
+        assert "Text A" in chunks[0].text
+        assert "Text B" in chunks[1].text
+
+    def test_chunk_ids_are_sequential(self, chunker):
+        chunks = chunker.chunk("doc.md", "# A\nfoo\n# B\nbar\n# C\nbaz")
+        assert [c.id for c in chunks] == ["doc.md:0", "doc.md:1", "doc.md:2"]
+
+    def test_no_headers(self, chunker):
+        chunks = chunker.chunk("plain.md", "Just some plain text\nwith multiple lines.")
+        assert len(chunks) == 1
+        assert chunks[0].header == ""
+
+    def test_large_chunk_is_split(self):
+        small = MarkdownChunker(max_chunk_size=50)
+        chunks = small.chunk("big.md", "# Header\n" + "word " * 100)
+        assert len(chunks) > 1
+        assert all(c.header == "# Header" for c in chunks)
+
+    def test_position_tracking(self, chunker):
+        chunks = chunker.chunk("pos.md", "# A\nline1\nline2\n# B\nline3")
+        assert chunks[0].position[0] == 1
+        assert chunks[1].position[0] == 4
+
+    def test_position_tracking_with_frontmatter(self, chunker):
+        # "---", "author: test", "---" occupy file lines 1-3, so "# A" is line 4 and "# B" line 6.
+        chunks = chunker.chunk("fm.md", "---\nauthor: test\n---\n# A\nfoo\n# B\nbar")
+        assert chunks[0].position[0] == 4
+        assert chunks[1].position[0] == 6
+
+    def test_metadata_propagated(self, chunker):
+        chunks = chunker.chunk("meta.md", "---\nauthor: test\n---\n# A\nfoo\n# B\nbar")
+        assert all(c.metadata == {"author": "test"} for c in chunks)
+
+    def test_empty_content(self, chunker):
+        chunks = chunker.chunk("empty.md", "")
+        assert len(chunks) == 0 or all(c.text.strip() == "" for c in chunks)
diff --git a/tools/send_email_to_myself.py b/tools/send_email_to_myself.py
index 91123a6e..1cd9f621 100644
--- a/tools/send_email_to_myself.py
+++ b/tools/send_email_to_myself.py
@@ -48,48 +48,65 @@ def load_dotenv():
break
def md_to_html(md_content, title=None, css=None):
- html = md_content
-
+ # Protect fenced code blocks from markdown transformation
+ code_blocks = []
+ def stash_code_block(match):
+ lang = match.group(1) or ""
+ code = match.group(2)
+ code = code.replace("&", "&").replace("<", "<").replace(">", ">")
+ placeholder = f"\x00CODEBLOCK{len(code_blocks)}\x00"
+ code_blocks.append(f'{code}
' if lang else f'{code}
')
+ return placeholder
+
+ html = re.sub(r'```(\w*)\n(.*?)```', stash_code_block, md_content, flags=re.DOTALL)
+
html = re.sub(r'^# (.+)$', r'\1
', html, flags=re.MULTILINE)
html = re.sub(r'^## (.+)$', r'\1
', html, flags=re.MULTILINE)
html = re.sub(r'^### (.+)$', r'\1
', html, flags=re.MULTILINE)
-
+
html = re.sub(r'\*\*(.+?)\*\*', r'\1', html)
- html = re.sub(r'\*(.+?)\*', r'\1', html)
-
+ html = re.sub(r'(?\1', html)
+
html = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'\1', html)
-
+
html = re.sub(r'^- (.+)$', r'\1', html, flags=re.MULTILINE)
html = re.sub(r'(.*\n?)+', r'\n', html)
-
+
html = re.sub(r'^\d+\. (.+)$', r'\1', html, flags=re.MULTILINE)
-
+
html = re.sub(r'^---$', r'
', html, flags=re.MULTILINE)
-
+
html = re.sub(r'`([^`]+)`', r'\1', html)
-
+
def convert_table(match):
table_content = match.group(0)
lines = table_content.strip().split('\n')
html_table = '\n'
for i, line in enumerate(lines):
- if re.match(r'^[\|\-\s]+$', line):
+ if re.match(r'^[\|\-\:\s]+$', line):
+ continue
+ cells = [c.strip() for c in line.split('|')]
+ cells = [c for c in cells if c] # drop empty from leading/trailing |
+ if not cells:
continue
- cells = [c.strip() for c in line.split('|') if c.strip()]
tag = 'th' if i == 0 else 'td'
row = ''.join(f'<{tag}>{c}{tag}>' for c in cells)
html_table += f'{row}
\n'
html_table += '
'
return html_table
-
+
html = re.sub(r'(\|.+\|\n)+', convert_table, html)
-
+
html = re.sub(r'\n\n', '
', html)
html = '
' + html + '
'
html = re.sub(r'(.*?)
', r'\1', html, flags=re.DOTALL)
html = re.sub(r'(
)', r'\1', html, flags=re.DOTALL)
html = re.sub(r'(
)', r'\1', html, flags=re.DOTALL)
html = re.sub(r'(
)', r'\1', html)
+
+ # Restore code blocks
+ for i, block in enumerate(code_blocks):
+ html = html.replace(f"\x00CODEBLOCK{i}\x00", block)
title_html = f'{title}' if title else ''
css_html = css if css else CSS_STYLES