Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions tools/job_guard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
job_guard — cron job 失败通知守护。

用法:
python job_guard.py <command> [args...]

做的事情很简单:
1. 运行你给它的命令
2. 命令成功 → 什么都不做
3. 命令失败 → 给你发一封邮件,告诉你哪个任务挂了、报了什么错

示例 crontab 配置:
0 8 * * * cd /your/workspace && .venv/bin/python tools/job_guard.py .venv/bin/python periodic_jobs/ai_heartbeat/src/v0/observer.py

环境变量(从 .env 自动加载):
GMAIL_USERNAME — 发件邮箱
GMAIL_APP_PASSWORD — Gmail 应用专用密码
GMAIL_RECIPIENTS — 收件人(默认发给自己)
"""
import os
import sys
import subprocess
import socket
from datetime import datetime
from pathlib import Path

# ---------------------------------------------------------------------------
# 邮件发送(复用 send_email_to_myself 的逻辑,但内联以避免循环依赖)
# ---------------------------------------------------------------------------

def _load_dotenv():
    """Load the nearest ``.env`` file into ``os.environ``.

    Walks from the current working directory up through its parents and
    parses the first regular file named ``.env``; directories further up are
    not consulted once one file has been read.

    Parsing rules:
      * blank lines, ``#`` comment lines and lines without ``=`` are skipped;
      * an optional leading ``export `` before the key is accepted;
      * surrounding single/double quotes are stripped from the value;
      * a key that already exists in the environment is never overwritten.
    """
    start = Path.cwd()
    for directory in (start, *start.parents):
        env_file = directory / ".env"
        # is_file() instead of exists(): a *directory* called ".env" would
        # otherwise be opened and raise, aborting the caller.
        if not env_file.is_file():
            continue
        with open(env_file, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                key = key.strip()
                # Tolerate shell-style "export KEY=VALUE" lines.
                if key.startswith("export "):
                    key = key[len("export "):].strip()
                value = value.strip().strip('"\'')
                if key and key not in os.environ:
                    os.environ[key] = value
        break


def _send_failure_email(subject, body):
    """Send a plain-text failure notification through Gmail SMTP.

    Credentials come from the environment (after ``_load_dotenv()`` has had a
    chance to populate it): ``GMAIL_USERNAME``, ``GMAIL_APP_PASSWORD`` and
    ``GMAIL_RECIPIENTS``. ``GMAIL_RECIPIENTS`` may list several addresses
    separated by commas or semicolons; when it is unset the message is sent
    to ``GMAIL_USERNAME`` itself.

    Args:
        subject: Subject line of the message.
        body: Plain-text message body.

    Returns:
        True when the message was handed to the SMTP server, False when the
        configuration is incomplete or sending failed. This function is
        best-effort: SMTP/network errors are reported on stderr and swallowed.
    """
    import smtplib
    from email.mime.text import MIMEText

    try:
        _load_dotenv()
    except (OSError, UnicodeError) as e:
        # An unreadable .env must not prevent the stderr fallback below.
        print(f"[job_guard] 读取 .env 失败: {e}", file=sys.stderr)

    username = os.getenv("GMAIL_USERNAME")
    password = os.getenv("GMAIL_APP_PASSWORD")
    # Default to the sender and accept a comma/semicolon separated list.
    raw_recipients = os.getenv("GMAIL_RECIPIENTS") or username or ""
    recipients = [
        addr.strip()
        for addr in raw_recipients.replace(";", ",").split(",")
        if addr.strip()
    ]

    if not (username and password and recipients):
        # With incomplete mail settings fall back to stderr so that cron's
        # own MAILTO mechanism can still deliver the report.
        print(f"[job_guard] 邮件配置不全,无法发送通知。Subject: {subject}", file=sys.stderr)
        print(body, file=sys.stderr)
        return False

    msg = MIMEText(body, "plain", "utf-8")
    msg["From"] = username
    msg["To"] = ", ".join(recipients)
    msg["Subject"] = subject

    try:
        # The timeout keeps an unreachable SMTP host from hanging the cron job.
        with smtplib.SMTP("smtp.gmail.com", 587, timeout=30) as server:
            server.starttls()
            server.login(username, password)
            server.sendmail(username, recipients, msg.as_string())
        return True
    except Exception as e:
        # Deliberately broad: notification must never raise into the caller.
        print(f"[job_guard] 发送邮件失败: {e}", file=sys.stderr)
        return False


# ---------------------------------------------------------------------------
# 主逻辑
# ---------------------------------------------------------------------------

def main():
    """Run the wrapped command and send a notification when it does not succeed.

    The command is taken from ``sys.argv[1:]`` and executed with its output
    captured. The process always terminates through ``sys.exit``:

      * 0   — the command succeeded (its stdout is passed through);
      * 1   — no command was given, or the command could not be started;
      * 124 — the command exceeded the 2-hour timeout;
      * otherwise the command's own return code, after a failure report
        containing the tail of stderr/stdout has been sent.
    """
    if len(sys.argv) < 2:
        print(f"用法: {sys.argv[0]} <command> [args...]", file=sys.stderr)
        sys.exit(1)

    cmd = sys.argv[1:]
    # cmd is guaranteed non-empty here; the last argument is used as the
    # job label (the script path in the documented crontab usage).
    job_name = Path(cmd[-1]).stem
    started_at = datetime.now()

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Undecodable bytes in the child's output must not raise inside
            # subprocess.run (which would kill the job and be misreported as
            # a launch failure); keep them visible as \xNN escapes instead.
            errors="backslashreplace",
            timeout=7200,  # 2-hour timeout guard
        )
    except subprocess.TimeoutExpired:
        elapsed = datetime.now() - started_at
        subject = f"[TIMEOUT] {job_name} 超时未完成"
        body = (
            f"任务: {' '.join(cmd)}\n"
            f"机器: {socket.gethostname()}\n"
            f"开始: {started_at:%Y-%m-%d %H:%M:%S}\n"
            f"耗时: {elapsed}\n"
            f"状态: 超过 2 小时未完成,已被强制终止\n"
        )
        _send_failure_email(subject, body)
        sys.exit(124)
    except Exception as e:
        # Typically the executable is missing or not permitted to run.
        subject = f"[ERROR] {job_name} 启动失败"
        body = f"任务: {' '.join(cmd)}\n异常: {e}\n"
        _send_failure_email(subject, body)
        sys.exit(1)

    # Success: pass stdout through, then exit quietly.
    if result.returncode == 0:
        if result.stdout:
            sys.stdout.write(result.stdout)
        sys.exit(0)

    # Failure: build and send the report.
    elapsed = datetime.now() - started_at
    subject = f"[FAILED] {job_name} 退出码 {result.returncode}"

    # Keep only the tail of each stream so the e-mail stays readable.
    stderr_tail = "\n".join(result.stderr.splitlines()[-200:]) if result.stderr else "(无 stderr 输出)"
    stdout_tail = "\n".join(result.stdout.splitlines()[-50:]) if result.stdout else "(无 stdout 输出)"

    body = (
        f"任务: {' '.join(cmd)}\n"
        f"机器: {socket.gethostname()}\n"
        f"开始: {started_at:%Y-%m-%d %H:%M:%S}\n"
        f"耗时: {elapsed}\n"
        f"退出码: {result.returncode}\n"
        f"\n{'='*60}\n"
        f"STDERR (最后 200 行):\n{stderr_tail}\n"
        f"\n{'='*60}\n"
        f"STDOUT (最后 50 行):\n{stdout_tail}\n"
    )

    _send_failure_email(subject, body)

    # Mirror the failure on stderr so the cron log records it as well.
    print(f"[job_guard] {job_name} failed with exit code {result.returncode}", file=sys.stderr)
    if result.stderr:
        sys.stderr.write(result.stderr)

    sys.exit(result.returncode)


# Run the guard only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
13 changes: 9 additions & 4 deletions tools/semantic_search/search/chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ def chunk(self, file_path: str, content: str) -> List[Chunk]:
"""按标题分块并保留元数据。"""
metadata, body = self.parse_yaml_frontmatter(content)
chunks = []

lines = body.split('\n')

# Calculate line offset: frontmatter lines + separator lines are skipped
all_lines = content.split('\n')
body_lines = body.split('\n')
frontmatter_offset = len(all_lines) - len(body_lines)

lines = body_lines
current_header = ""
current_chunk_lines = []
chunk_idx = 0
start_line = 1 # TODO: accurately track line numbers if needed
start_line = frontmatter_offset + 1

for i, line in enumerate(lines, 1):
for i, line in enumerate(lines, frontmatter_offset + 1):
if line.startswith('#'):
# Save previous chunk if it exists
if current_chunk_lines:
Expand Down
2 changes: 1 addition & 1 deletion tools/semantic_search/search/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Chunk:
source_file: str = "" # 源文件相对路径
header: str = "" # 所属标题
position: Tuple[int, int] = (0, 0) # (start_line, end_line)
metadata: Dict[str, Any] = None
metadata: Optional[Dict[str, Any]] = None

def to_dict(self, include_embedding: bool = False) -> Dict[str, Any]:
d = asdict(self)
Expand Down
Empty file.
75 changes: 75 additions & 0 deletions tools/semantic_search/tests/test_chunker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import pytest
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from search.chunker import MarkdownChunker


@pytest.fixture
def chunker():
    """Provide a MarkdownChunker built with ``max_chunk_size=200``."""
    instance = MarkdownChunker(max_chunk_size=200)
    return instance


class TestParseYamlFrontmatter:
    """Tests for ``MarkdownChunker.parse_yaml_frontmatter``."""

    def test_with_frontmatter(self, chunker):
        """A valid front-matter block is parsed and the body starts after it."""
        document = "---\ntitle: Test\ntags: [a, b]\n---\n# Hello\nBody text"
        parsed, remainder = chunker.parse_yaml_frontmatter(document)
        assert parsed == {"title": "Test", "tags": ["a", "b"]}
        assert remainder.startswith("# Hello")

    def test_without_frontmatter(self, chunker):
        """Without front matter the metadata is empty and the body is the stripped input."""
        document = "# Hello\nBody text"
        parsed, remainder = chunker.parse_yaml_frontmatter(document)
        assert parsed == {}
        assert remainder == document.strip()

    def test_invalid_yaml(self, chunker):
        """Unparseable YAML yields empty metadata."""
        document = "---\n: invalid: yaml: [[\n---\nBody"
        parsed, _ = chunker.parse_yaml_frontmatter(document)
        assert parsed == {}


class TestChunking:
    """Tests for ``MarkdownChunker.chunk``."""

    def test_basic_header_split(self, chunker):
        """Two headed sections produce two chunks, each holding its own text."""
        document = "# Section 1\nText A\n# Section 2\nText B"
        pieces = chunker.chunk("test.md", document)
        assert len(pieces) == 2
        first, second = pieces[0], pieces[1]
        assert "Text A" in first.text
        assert "Text B" in second.text

    def test_chunk_ids_are_sequential(self, chunker):
        """Chunk ids are ``<file>:<index>`` counted from zero."""
        document = "# A\nfoo\n# B\nbar\n# C\nbaz"
        pieces = chunker.chunk("doc.md", document)
        ids = [piece.id for piece in pieces]
        assert ids == ["doc.md:0", "doc.md:1", "doc.md:2"]

    def test_no_headers(self, chunker):
        """Text without any header becomes one chunk with an empty header."""
        document = "Just some plain text\nwith multiple lines."
        pieces = chunker.chunk("plain.md", document)
        assert len(pieces) == 1
        assert pieces[0].header == ""

    def test_large_chunk_is_split(self):
        """An oversized section is split and every part keeps the header."""
        small_chunker = MarkdownChunker(max_chunk_size=50)
        document = "# Header\n" + "word " * 100
        pieces = small_chunker.chunk("big.md", document)
        assert len(pieces) > 1
        for piece in pieces:
            assert piece.header == "# Header"

    def test_position_tracking(self, chunker):
        """Each chunk's start position is the line number of its header."""
        document = "# A\nline1\nline2\n# B\nline3"
        pieces = chunker.chunk("pos.md", document)
        assert pieces[0].position[0] == 1
        assert pieces[1].position[0] == 4

    def test_metadata_propagated(self, chunker):
        """Front-matter metadata is attached to every chunk."""
        document = "---\nauthor: test\n---\n# A\nfoo\n# B\nbar"
        pieces = chunker.chunk("meta.md", document)
        for piece in pieces:
            assert piece.metadata == {"author": "test"}

    def test_empty_content(self, chunker):
        """Empty input gives no chunks, or only chunks with blank text."""
        pieces = chunker.chunk("empty.md", "")
        assert len(pieces) == 0 or all(piece.text.strip() == "" for piece in pieces)
45 changes: 31 additions & 14 deletions tools/send_email_to_myself.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,48 +48,65 @@ def load_dotenv():
break

def md_to_html(md_content, title=None, css=None):
html = md_content

# Protect fenced code blocks from markdown transformation
code_blocks = []
def stash_code_block(match):
lang = match.group(1) or ""
code = match.group(2)
code = code.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
placeholder = f"\x00CODEBLOCK{len(code_blocks)}\x00"
code_blocks.append(f'<pre><code class="language-{lang}">{code}</code></pre>' if lang else f'<pre><code>{code}</code></pre>')
return placeholder

html = re.sub(r'```(\w*)\n(.*?)```', stash_code_block, md_content, flags=re.DOTALL)

html = re.sub(r'^# (.+)$', r'<h1>\1</h1>', html, flags=re.MULTILINE)
html = re.sub(r'^## (.+)$', r'<h2>\1</h2>', html, flags=re.MULTILINE)
html = re.sub(r'^### (.+)$', r'<h3>\1</h3>', html, flags=re.MULTILINE)

html = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', html)
html = re.sub(r'\*(.+?)\*', r'<em>\1</em>', html)
html = re.sub(r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)', r'<em>\1</em>', html)

html = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'<a href="\2">\1</a>', html)

html = re.sub(r'^- (.+)$', r'<li>\1</li>', html, flags=re.MULTILINE)
html = re.sub(r'(<li>.*</li>\n?)+', r'<ul>\g<0></ul>\n', html)

html = re.sub(r'^\d+\. (.+)$', r'<li>\1</li>', html, flags=re.MULTILINE)

html = re.sub(r'^---$', r'<hr>', html, flags=re.MULTILINE)

html = re.sub(r'`([^`]+)`', r'<code>\1</code>', html)

def convert_table(match):
table_content = match.group(0)
lines = table_content.strip().split('\n')
html_table = '<table>\n'
for i, line in enumerate(lines):
if re.match(r'^[\|\-\s]+$', line):
if re.match(r'^[\|\-\:\s]+$', line):
continue
cells = [c.strip() for c in line.split('|')]
cells = [c for c in cells if c] # drop empty from leading/trailing |
if not cells:
continue
cells = [c.strip() for c in line.split('|') if c.strip()]
tag = 'th' if i == 0 else 'td'
row = ''.join(f'<{tag}>{c}</{tag}>' for c in cells)
html_table += f'<tr>{row}</tr>\n'
html_table += '</table>'
return html_table

html = re.sub(r'(\|.+\|\n)+', convert_table, html)

html = re.sub(r'\n\n', '</p><p>', html)
html = '<p>' + html + '</p>'
html = re.sub(r'<p>(<h[123]>.*?</h[123]>)</p>', r'\1', html, flags=re.DOTALL)
html = re.sub(r'<p>(<ul>.*?</ul>)</p>', r'\1', html, flags=re.DOTALL)
html = re.sub(r'<p>(<table>.*?</table>)</p>', r'\1', html, flags=re.DOTALL)
html = re.sub(r'<p>(<hr>)</p>', r'\1', html)

# Restore code blocks
for i, block in enumerate(code_blocks):
html = html.replace(f"\x00CODEBLOCK{i}\x00", block)

title_html = f'<title>{title}</title>' if title else ''
css_html = css if css else CSS_STYLES
Expand Down