diff --git a/README.md b/README.md index 389ee87..bc09a5a 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,7 @@ Most tools only answer this for Claude Code. cc-statistics answers it for all fo ### Prerequisites -- Python 3.8+ +- Python 3.10+ - At least one of: Claude Code CLI, Gemini CLI, Codex CLI, or Cursor installed and used ### 3 steps @@ -212,6 +212,23 @@ pipx install cc-statistics brew install androidZzT/tap/cc-statistics ``` +### Windows Tray Development Preview + +A Windows tray MVP lives in `desktop/cc-stats-tauri/`. It is a Tauri shell that starts the Python web dashboard with `python -m cc_stats_web --no-browser --json`, shows a tray menu, and opens the existing dashboard UI. Statistics, source discovery, parsing, pricing, and API responses stay in Python. + +This preview is for development builds. It does not replace the macOS Swift app, and it does not bundle Python, signing, automatic updates, or installer polish yet. + +```bash +cd desktop/cc-stats-tauri +npm install +npm test +npm run build:web + +cd src-tauri +cargo test +cargo check +``` + --- ## 📖 CLI Reference @@ -244,6 +261,16 @@ All data is read from local files. Nothing is sent over the network. | Cursor | `~/Library/Application Support/Cursor/User/globalStorage/state.vscdb` | | Git Changes | `git log --numstat` in project directory | +### Path Overrides (Cross-Platform / Testing) + +Set these environment variables to read source data from custom locations. Use paths visible to the shell or environment where `cc-stats` runs. + +| Variable | Purpose | +|----------|---------| +| `CC_STATS_CLAUDE_PROJECTS_DIR` | Claude Code project log directory | +| `CC_STATS_CODEX_HOME` | Codex home; `sessions/` is read below it | +| `CC_STATS_GEMINI_HOME` | Gemini home; `tmp/*/chats/` is read below it | + --- ## Acknowledgments diff --git a/README_CN.md b/README_CN.md index efe16e2..8c91762 100644 --- a/README_CN.md +++ b/README_CN.md @@ -184,7 +184,7 @@ ### 前置条件 -- Python 3.8+ +- Python 3.10+ - 已安装并使用过以下至少一种工具:Claude Code CLI、Gemini CLI、Codex CLI 或 Cursor ### 3 步搞定 @@ -212,6 +212,23 @@ pipx install cc-statistics brew install androidZzT/tap/cc-statistics ``` +### Windows 托盘开发预览 + +Windows 托盘 MVP 位于 `desktop/cc-stats-tauri/`。它是一个 Tauri 平台壳,启动 `python -m cc_stats_web --no-browser --json`,提供托盘菜单,并打开现有 Web 仪表盘。统计、数据源发现、解析、定价和 API 响应仍然由 Python 负责。 + +这个预览面向开发构建,不替代现有 macOS Swift 应用,也暂不包含 Python 打包、签名、自动更新或安装器打磨。 + +```bash +cd desktop/cc-stats-tauri +npm install +npm test +npm run build:web + +cd src-tauri +cargo test +cargo check +``` + --- ## 📖 CLI 参考 @@ -244,6 +261,16 @@ cc-stats-app # 启动 macOS 状态栏应用 | Cursor | `~/Library/Application Support/Cursor/User/globalStorage/state.vscdb` | | Git 变更 | 项目目录的 `git log --numstat` | +### 路径覆盖(跨平台 / 测试) + +设置以下环境变量可从自定义位置读取源数据。请使用运行 `cc-stats` 的 shell 或环境可访问的路径。 + +| 变量 | 作用 | +|------|------| +| `CC_STATS_CLAUDE_PROJECTS_DIR` | Claude Code 项目日志目录 | +| `CC_STATS_CODEX_HOME` | Codex home;读取其下的 `sessions/` | +| `CC_STATS_GEMINI_HOME` | Gemini home;读取其下的 `tmp/*/chats/` | + --- ## 致谢 diff --git a/cc_stats/analyzer.py b/cc_stats/analyzer.py index 349b6d5..c015165 100644 --- a/cc_stats/analyzer.py +++ b/cc_stats/analyzer.py @@ -10,7 +10,7 @@ from pathlib import Path from .parser import Message, Session, ToolCall -from .pricing import is_claude_model, match_model_pricing +from .pricing import match_model_pricing # 文件扩展名 → 语言映射 EXT_TO_LANG: dict[str, str] = { @@ -291,10 +291,8 @@ def compute_cache_stats( # savings = cache_read_tokens * (input_price - cache_read_price) / 1M savings_usd = 0.0 for model, usage in token_by_model.items(): - if not is_claude_model(model): - continue pricing = match_model_pricing(model) - savings_per_million = pricing["input"] - pricing["cache_read"] + savings_per_million = max(pricing["input"] - pricing["cache_read"], 0.0) savings_usd += usage.cache_read_input_tokens * savings_per_million / 1_000_000 # 按模型拆分命中率 @@ -368,6 +366,8 @@ def _collect_git_stats( cwd=project_path, capture_output=True, text=True, + encoding="utf-8", + errors="replace", timeout=10, ) if result.returncode != 0: @@ -467,7 +467,7 @@ def classify_work_mode(user_message_count: int, total_added: int, total_removed: return "Building" -def analyze_session(session: Session) -> SessionStats: +def analyze_session(session: Session, *, include_git: bool = True) -> SessionStats: """分析单个会话,返回统计结果""" stats = SessionStats( session_id=session.session_id, @@ -721,7 +721,7 @@ def analyze_session(session: Session) -> SessionStats: stats.lines_by_lang = dict(lang_stats) # -------- 4b. Git 变更统计 -------- - if stats.start_time and stats.end_time and session.project_path: + if include_git and stats.start_time and stats.end_time and session.project_path: git = _collect_git_stats( session.project_path, stats.start_time, stats.end_time ) diff --git a/cc_stats/cli.py b/cc_stats/cli.py index 5342117..687ff18 100644 --- a/cc_stats/cli.py +++ b/cc_stats/cli.py @@ -11,21 +11,19 @@ from . import __version__ from .analyzer import SessionStats, TokenUsage, analyze_session, merge_stats from .formatter import format_skill_stats, format_stats -from .parser import ( - _claude_session_entry_files, - find_codex_sessions, - find_codex_sessions_by_keyword, - find_gemini_sessions, - find_gemini_sessions_by_keyword, - find_sessions, - find_sessions_by_keyword, - parse_session_file, +from .parser import _claude_session_entry_files +from .sources import ( + SourceKind, + collect_session_files, + collect_session_files_by_keyword, + list_projects, + parse_file, ) def _parse_session(path: Path): """根据文件类型选择解析器""" - return parse_session_file(path) + return parse_file(path) def _parse_time_arg(value: str, *, as_end_of_day: bool = False) -> datetime: @@ -298,66 +296,31 @@ def _compare_projects(args) -> None: def _list_projects() -> None: """列出所有已知项目(Claude + Codex + Gemini)""" - has_any = False - - # Claude 项目 - claude_projects = Path.home() / ".claude" / "projects" - if claude_projects.exists(): - print("\n可用项目 (Claude Code):") - print("─" * 60) - for proj in sorted(claude_projects.iterdir()): - if not proj.is_dir(): - continue - jsonl_files = _claude_session_entry_files(proj) - if not jsonl_files: - continue - display_name = _resolve_project_name(proj, jsonl_files) - print(f" {display_name} ({len(jsonl_files)} 个会话)") - has_any = True - - # Codex 项目 - codex_sessions = find_codex_sessions() - if codex_sessions: - from collections import defaultdict - codex_by_dir: dict[str, list[Path]] = defaultdict(list) - for cf in codex_sessions: - try: - session = _parse_session(cf) - key = session.project_path or "Unknown" - except Exception: - key = "Unknown" - codex_by_dir[key].append(cf) - - print("\n可用项目 (Codex):") - print("─" * 60) - for name, files in sorted(codex_by_dir.items()): - display = Path(name).name if "/" in name else name - print(f" {display} ({len(files)} 个会话)") - has_any = True - - # Gemini 项目 - gemini_sessions = find_gemini_sessions() - if gemini_sessions: - # 按项目目录分组 - from collections import defaultdict - gemini_by_dir: dict[str, list[Path]] = defaultdict(list) - for gf in gemini_sessions: - try: - session = _parse_session(gf) - key = session.project_path or gf.parent.parent.name - except Exception: - key = gf.parent.parent.name - gemini_by_dir[key].append(gf) + projects = list_projects() + if not projects: + print("未找到项目数据") + print() + return - print("\n可用项目 (Gemini CLI):") + labels = { + SourceKind.CLAUDE: "Claude Code", + SourceKind.CODEX: "Codex", + SourceKind.GEMINI: "Gemini CLI", + } + by_source: dict[SourceKind, list] = {} + for project in projects: + by_source.setdefault(project.source, []).append(project) + + for source in (SourceKind.CLAUDE, SourceKind.CODEX, SourceKind.GEMINI): + items = by_source.get(source, []) + if not items: + continue + print(f"\n可用项目 ({labels[source]}):") print("─" * 60) - for name, files in sorted(gemini_by_dir.items()): - display = Path(name).name if "/" in name else name - print(f" {display} ({len(files)} 个会话)") - has_any = True - - if not has_any: - print("未找到项目数据") + for project in items: + display_name = project.display_name + display = Path(display_name).name if "/" in display_name or "\\" in display_name else display_name + print(f" {display} ({project.session_count} 个会话)") print() @@ -394,9 +357,7 @@ def _show_rate_limit(args) -> None: from .rate_limiter import analyze_rate_limit # 收集所有会话文件(Claude + Codex + Gemini) - session_files: list[Path] = find_sessions() - session_files.extend(find_codex_sessions()) - session_files.extend(find_gemini_sessions()) + session_files: list[Path] = collect_session_files() if not session_files: print("未找到会话文件。", file=sys.stderr) @@ -446,9 +407,7 @@ def _show_git_integration(args) -> None: sys.exit(1) # 收集所有会话文件 - session_files: list[Path] = find_sessions() - session_files.extend(find_codex_sessions()) - session_files.extend(find_gemini_sessions()) + session_files: list[Path] = collect_session_files() if not session_files: import sys @@ -702,24 +661,18 @@ def main(argv: list[str] | None = None) -> None: if p.is_file() and p.suffix in (".jsonl", ".json"): session_files = [p] elif p.is_dir(): - session_files = find_sessions(p) - session_files.extend(find_codex_sessions(p)) + session_files = collect_session_files(project_dir=p) if not session_files: # 作为关键词模糊搜索(Claude + Codex + Gemini) - session_files = find_sessions_by_keyword(args.path) - session_files.extend(find_codex_sessions_by_keyword(args.path)) - session_files.extend(find_gemini_sessions_by_keyword(args.path)) + session_files = collect_session_files_by_keyword(args.path) if not session_files: print(f"找不到: {args.path}", file=sys.stderr) sys.exit(1) elif args.all: - session_files = find_sessions() - session_files.extend(find_codex_sessions()) - session_files.extend(find_gemini_sessions()) + session_files = collect_session_files() else: # 默认:当前目录 - session_files = find_sessions(Path.cwd()) - session_files.extend(find_codex_sessions(Path.cwd())) + session_files = collect_session_files(project_dir=Path.cwd()) # 去重(保留原顺序) session_files = list(dict.fromkeys(session_files)) diff --git a/cc_stats/exporter.py b/cc_stats/exporter.py index d4f6dd5..7d9da3f 100644 --- a/cc_stats/exporter.py +++ b/cc_stats/exporter.py @@ -5,14 +5,8 @@ from datetime import datetime, timezone from pathlib import Path -from .parser import ( - Message, - Session, - find_codex_sessions, - find_gemini_sessions, - find_sessions, - parse_session_file, -) +from .parser import Message, Session +from .sources import collect_session_files, parse_file def _extract_text(content) -> str: @@ -121,9 +115,7 @@ def find_and_export(keyword: str, output: str | None = None, include_tools: 是否包含工具调用 """ # 搜索所有会话(Claude + Codex + Gemini) - all_files: list[Path] = list(find_sessions()) - all_files.extend(find_codex_sessions()) - all_files.extend(find_gemini_sessions()) + all_files: list[Path] = collect_session_files() # 先按 session ID 前缀匹配 matched = None @@ -136,7 +128,7 @@ def find_and_export(keyword: str, output: str | None = None, if not matched: for f in sorted(all_files, key=lambda p: p.stat().st_mtime, reverse=True): try: - session = parse_session_file(f) + session = parse_file(f) for msg in session.messages: text = _extract_text(msg.content) if keyword.lower() in text.lower(): @@ -150,7 +142,7 @@ def find_and_export(keyword: str, output: str | None = None, if not matched: return None - session = parse_session_file(matched) + session = parse_file(matched) md = export_session(session, include_tools=include_tools) if output: diff --git a/cc_stats/parser.py b/cc_stats/parser.py index df77b3a..a709861 100644 --- a/cc_stats/parser.py +++ b/cc_stats/parser.py @@ -3,9 +3,12 @@ from __future__ import annotations import json +import os +import sqlite3 from dataclasses import dataclass, field from pathlib import Path from typing import Any +from urllib.parse import unquote, urlparse @dataclass @@ -164,7 +167,22 @@ def _path_to_dirname(path: Path) -> str: 例如 /Users/foo/bar → -Users-foo-bar """ - return str(path.resolve()).replace("/", "-") + return str(path.resolve()).replace("\\", "-").replace("/", "-") + + +def _normalized_project_path(path: Path | str) -> str: + try: + resolved = str(Path(path).expanduser().resolve()) + except OSError: + resolved = str(Path(path).expanduser()) + return os.path.normcase(resolved) + + +def _home_dir() -> Path: + home = os.environ.get("HOME") + if home: + return Path(home).expanduser() + return Path.home() def _is_subagent_file(path: Path) -> bool: @@ -203,12 +221,16 @@ def _claude_session_entry_files(project_path: Path) -> list[Path]: return top_level + orphan_subagents -def find_sessions(project_dir: Path | None = None) -> list[Path]: +def find_sessions( + project_dir: Path | None = None, + *, + projects_dir: Path | None = None, +) -> list[Path]: """查找 ~/.claude/projects/ 下所有 JSONL 会话文件 如果指定 project_dir,只返回匹配的项目。 """ - claude_projects = Path.home() / ".claude" / "projects" + claude_projects = projects_dir or _home_dir() / ".claude" / "projects" if not claude_projects.exists(): return [] @@ -226,11 +248,15 @@ def find_sessions(project_dir: Path | None = None) -> list[Path]: return results -def find_sessions_by_keyword(keyword: str) -> list[Path]: +def find_sessions_by_keyword( + keyword: str, + *, + projects_dir: Path | None = None, +) -> list[Path]: """按关键词模糊匹配项目,在目录名和 JSONL 中的 cwd 中搜索""" import json - claude_projects = Path.home() / ".claude" / "projects" + claude_projects = projects_dir or _home_dir() / ".claude" / "projects" if not claude_projects.exists(): return [] @@ -683,9 +709,14 @@ def _read_codex_session_meta(path: Path) -> dict[str, Any]: return {} -def find_codex_sessions(project_dir: Path | None = None) -> list[Path]: +def find_codex_sessions( + project_dir: Path | None = None, + *, + codex_home_dir: Path | None = None, +) -> list[Path]: """查找 ~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl 会话文件""" - base = Path.home() / ".codex" / "sessions" + codex_home = codex_home_dir or _home_dir() / ".codex" + base = codex_home / "sessions" if not base.exists(): return [] @@ -693,10 +724,7 @@ def find_codex_sessions(project_dir: Path | None = None) -> list[Path]: if project_dir is None: return all_files - try: - target = str(project_dir.expanduser().resolve()) - except OSError: - target = str(project_dir) + target = _normalized_project_path(project_dir) results: list[Path] = [] for path in all_files: @@ -704,21 +732,22 @@ def find_codex_sessions(project_dir: Path | None = None) -> list[Path]: cwd = meta.get("cwd", "") if not isinstance(cwd, str) or not cwd: continue - try: - normalized = str(Path(cwd).expanduser().resolve()) - except OSError: - normalized = cwd + normalized = _normalized_project_path(cwd) if normalized == target: results.append(path) return results -def find_codex_sessions_by_keyword(keyword: str) -> list[Path]: +def find_codex_sessions_by_keyword( + keyword: str, + *, + codex_home_dir: Path | None = None, +) -> list[Path]: """按关键词搜索 Codex 会话(路径/cwd/用户消息内容)""" keyword_lower = keyword.lower() results: list[Path] = [] - for path in find_codex_sessions(): + for path in find_codex_sessions(codex_home_dir=codex_home_dir): if keyword_lower in str(path).lower(): results.append(path) continue @@ -845,6 +874,110 @@ def parse_gemini_json(path: Path) -> Session: ) +def parse_gemini_jsonl(path: Path) -> Session: + """Parse Gemini CLI JSONL session files written under ~/.gemini/tmp/*/chats.""" + session_id = path.stem + project_path = "" + messages: list[Message] = [] + + with open(path, encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + + if "sessionId" in record: + session_id = record.get("sessionId") or session_id + if not project_path: + project_path = _gemini_jsonl_project_path(path) + continue + + msg_type = record.get("type", "") + timestamp = record.get("timestamp", "") + if msg_type == "user": + messages.append(Message( + role="user", + timestamp=timestamp, + content=_extract_gemini_content(record.get("content")), + session_id=session_id, + message_id=record.get("id", ""), + )) + elif msg_type == "gemini": + tool_calls: list[ToolCall] = [] + for tc in record.get("toolCalls", []) or []: + if not isinstance(tc, dict): + continue + raw_name = tc.get("name", "") + mapped_name = _GEMINI_TOOL_MAP.get(raw_name, raw_name) + tool_calls.append(ToolCall( + name=mapped_name, + input=tc.get("args", {}), + timestamp=tc.get("timestamp", timestamp), + tool_use_id=tc.get("id", ""), + )) + + usage: dict[str, Any] = {} + tokens = record.get("tokens") + if isinstance(tokens, dict): + usage = { + "input_tokens": tokens.get("input", 0), + "output_tokens": tokens.get("output", 0), + "cache_read_input_tokens": tokens.get("cached", 0), + "cache_creation_input_tokens": 0, + } + + messages.append(Message( + role="assistant", + timestamp=timestamp, + content=_extract_gemini_content(record.get("content")), + model=record.get("model"), + usage=usage, + tool_calls=tool_calls, + session_id=session_id, + message_id=record.get("id", ""), + )) + + if not project_path: + project_path = _gemini_jsonl_project_path(path) + + return Session( + session_id=session_id, + project_path=project_path, + file_path=path, + source="gemini", + messages=messages, + ) + + +def _gemini_jsonl_project_path(path: Path) -> str: + project_root = path.parent.parent / ".project_root" + try: + value = project_root.read_text(encoding="utf-8").strip() + if value: + return value + except OSError: + pass + + project_slug = path.parent.parent.name + projects_json = path.parent.parent.parent.parent / "projects.json" + try: + data = json.loads(projects_json.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return "" + + projects = data.get("projects", {}) + if not isinstance(projects, dict): + return "" + for project_path, slug in projects.items(): + if slug == project_slug: + return project_path + return "" + + def _extract_gemini_content(raw: Any) -> Any: """提取 Gemini 消息内容(可能是字符串或 Part 列表)""" if isinstance(raw, str): @@ -858,9 +991,13 @@ def _extract_gemini_content(raw: Any) -> Any: return raw or "" -def find_gemini_sessions() -> list[Path]: +def find_gemini_sessions( + *, + gemini_home_dir: Path | None = None, +) -> list[Path]: """查找 ~/.gemini/tmp/*/chats/*.json 会话文件""" - gemini_dir = Path.home() / ".gemini" / "tmp" + gemini_home = gemini_home_dir or _home_dir() / ".gemini" + gemini_dir = gemini_home / "tmp" if not gemini_dir.exists(): return [] @@ -868,15 +1005,19 @@ def find_gemini_sessions() -> list[Path]: for chats_dir in gemini_dir.glob("*/chats"): if not chats_dir.is_dir(): continue - for json_file in sorted(chats_dir.glob("*.json")): - results.append(json_file) + results.extend(sorted(chats_dir.glob("*.json"))) + results.extend(sorted(chats_dir.glob("*.jsonl"))) return results -def find_gemini_sessions_by_keyword(keyword: str) -> list[Path]: +def find_gemini_sessions_by_keyword( + keyword: str, + *, + gemini_home_dir: Path | None = None, +) -> list[Path]: """按关键词搜索 Gemini 会话(在 directories 和内容中搜索)""" - all_sessions = find_gemini_sessions() + all_sessions = find_gemini_sessions(gemini_home_dir=gemini_home_dir) if not all_sessions: return [] @@ -885,25 +1026,338 @@ def find_gemini_sessions_by_keyword(keyword: str) -> list[Path]: for path in all_sessions: try: - with open(path, encoding="utf-8") as f: - data = json.load(f) - dirs = data.get("directories", []) - if any(keyword_lower in d.lower() for d in dirs): + session = parse_session_file(path) + if keyword_lower in session.project_path.lower(): results.append(path) continue - summary = data.get("summary", "") - if summary and keyword_lower in summary.lower(): + content = "\n".join( + str(message.content) + for message in session.messages + if message.content + ) + if keyword_lower in content.lower(): results.append(path) - except (json.JSONDecodeError, OSError): + except (ValueError, OSError): continue return results +def _looks_like_gemini_jsonl(path: Path) -> bool: + if path.suffix != ".jsonl" or path.parent.name != "chats": + return False + try: + with open(path, encoding="utf-8") as f: + for line in f: + if not line.strip(): + continue + obj = json.loads(line) + return "sessionId" in obj and "projectHash" in obj + except (json.JSONDecodeError, OSError): + return False + return False + + +# Cursor SQLite parsing + + +def find_cursor_sessions( + *, + cursor_state_db_path: Path | None = None, +) -> list[Path]: + db_path = cursor_state_db_path or _home_dir() / ".config" / "Cursor" / "User" / "globalStorage" / "state.vscdb" + return [db_path] if db_path.exists() else [] + + +def parse_cursor_db(path: Path) -> Session: + """Parse Cursor's global SQLite state DB as one aggregate session.""" + sessions = parse_cursor_sessions(path) + messages: list[Message] = [] + project_path = "" + for session in sessions: + if not project_path and session.project_path: + project_path = session.project_path + messages.extend(session.messages) + return Session( + session_id="cursor", + project_path=project_path or "Cursor", + file_path=path, + source="cursor", + messages=messages, + ) + + +def parse_cursor_sessions(path: Path) -> list[Session]: + """Parse Cursor composer sessions from User/globalStorage/state.vscdb.""" + if not path.exists(): + return [] + + try: + con = sqlite3.connect(f"file:{path}?mode=ro", uri=True) + except sqlite3.Error: + return [] + + try: + rows = con.execute( + "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'composerData:%'" + ).fetchall() + bubbles_by_composer = _cursor_bubbles_by_composer(con) + sessions: list[Session] = [] + for key, raw_value in rows: + composer = _cursor_json(raw_value) + if not isinstance(composer, dict): + continue + composer_id = str(composer.get("composerId") or str(key).split(":", 1)[-1]) + session = _parse_cursor_composer( + bubbles_by_composer.get(composer_id, {}), + path, + str(key), + composer, + ) + if session is not None: + sessions.append(session) + return sorted( + sessions, + key=lambda s: next((m.timestamp for m in s.messages if m.timestamp), ""), + ) + except sqlite3.Error: + return [] + finally: + con.close() + + +def _parse_cursor_composer( + bubbles: dict[str, dict[str, Any]], + db_path: Path, + key: str, + composer: dict[str, Any], +) -> Session | None: + composer_id = str(composer.get("composerId") or key.split(":", 1)[-1]) + if not composer_id: + return None + + model = _cursor_model(composer) + default_ts = _cursor_timestamp(composer.get("createdAt")) + messages: list[Message] = [] + project_path = "" + + headers = composer.get("fullConversationHeadersOnly") + if not isinstance(headers, list) or not headers: + conversation_map = composer.get("conversationMap") + if isinstance(conversation_map, dict): + headers = [ + {"bubbleId": bubble_id} + for bubble_id in conversation_map.keys() + if isinstance(bubble_id, str) + ] + else: + headers = [] + + for header in headers: + if not isinstance(header, dict): + continue + bubble_id = header.get("bubbleId") + if not isinstance(bubble_id, str) or not bubble_id: + continue + bubble = bubbles.get(bubble_id, {}) + if not isinstance(bubble, dict): + bubble = {} + bubble_type = bubble.get("type", header.get("type")) + role = "user" if bubble_type == 1 else "assistant" if bubble_type == 2 else "" + if not role: + continue + + if not project_path: + project_path = _cursor_project_path(bubble) or _cursor_project_path(composer) + + timestamp = _cursor_timestamp(bubble.get("createdAt")) or default_ts + bubble_model = _cursor_model(bubble) or model + usage: dict[str, Any] = {} + if role == "assistant": + usage = _cursor_usage(bubble.get("tokenCount")) + + messages.append(Message( + role=role, + timestamp=timestamp, + content=_cursor_text(bubble), + model=bubble_model or None, + usage=usage, + session_id=composer_id, + message_id=bubble_id, + )) + + added = _to_int(composer.get("totalLinesAdded", 0)) + removed = _to_int(composer.get("totalLinesRemoved", 0)) + if added or removed: + timestamp = _cursor_timestamp(composer.get("lastUpdatedAt")) or default_ts + messages.append(Message( + role="assistant", + timestamp=timestamp, + content="", + model=model or None, + tool_calls=[ + ToolCall( + name="Edit", + input={ + "target_file": "cursor://composer", + "old_string": _cursor_line_blob(removed), + "new_string": _cursor_line_blob(added), + }, + timestamp=timestamp, + ) + ], + is_meta=True, + session_id=composer_id, + )) + + if not messages: + return None + if not project_path: + project_path = _cursor_project_path(composer) or "Cursor" + + return Session( + session_id=composer_id, + project_path=project_path, + file_path=db_path, + source="cursor", + messages=messages, + ) + + +def _cursor_bubbles_by_composer( + con: sqlite3.Connection, +) -> dict[str, dict[str, dict[str, Any]]]: + bubbles: dict[str, dict[str, dict[str, Any]]] = {} + try: + rows = con.execute( + "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'", + ).fetchall() + except sqlite3.Error: + return bubbles + + for key, raw_value in rows: + parts = str(key).split(":", 2) + if len(parts) != 3: + continue + _, composer_id, bubble_id = parts + bubble = _cursor_json(raw_value) + if isinstance(bubble, dict): + bubbles.setdefault(composer_id, {})[bubble_id] = bubble + return bubbles + + +def _cursor_json(value: Any) -> Any: + if isinstance(value, bytes): + text = value.decode("utf-8", errors="replace") + else: + text = str(value) + try: + return json.loads(text) + except json.JSONDecodeError: + return None + + +def _cursor_model(record: dict[str, Any]) -> str: + model_info = record.get("modelInfo") + if isinstance(model_info, dict): + model_name = model_info.get("modelName") + if isinstance(model_name, str) and model_name: + return model_name + model_config = record.get("modelConfig") + if isinstance(model_config, dict): + model_name = model_config.get("modelName") + if isinstance(model_name, str) and model_name: + return model_name + return "" + + +def _cursor_usage(token_count: Any) -> dict[str, Any]: + if not isinstance(token_count, dict): + return {} + return { + "input_tokens": _to_int(token_count.get("inputTokens", 0)), + "output_tokens": _to_int(token_count.get("outputTokens", 0)), + "cache_read_input_tokens": 0, + "cache_creation_input_tokens": 0, + } + + +def _cursor_timestamp(value: Any) -> str: + if isinstance(value, (int, float)): + try: + from datetime import datetime, timezone + + return datetime.fromtimestamp(value / 1000, tz=timezone.utc).isoformat() + except (OSError, ValueError): + return "" + if isinstance(value, str): + return value + return "" + + +def _cursor_text(record: dict[str, Any]) -> str: + text = record.get("text") + if isinstance(text, str) and text: + return text + rich = record.get("richText") + if isinstance(rich, str) and rich: + return rich + return "" + + +def _cursor_project_path(record: dict[str, Any]) -> str: + uris = record.get("workspaceUris") + if isinstance(uris, list): + for uri in uris: + if not isinstance(uri, str): + continue + path = _file_uri_to_path(uri) + if path: + return path + + workspace = record.get("workspaceProjectDir") + if isinstance(workspace, str) and workspace: + return workspace + + attached = record.get("allAttachedFileCodeChunksUris") + if isinstance(attached, list): + for uri in attached: + if isinstance(uri, str): + path = _file_uri_to_path(uri) + if path: + return str(Path(path).parent) + + return "" + + +def _file_uri_to_path(uri: str) -> str: + parsed = urlparse(uri) + if parsed.scheme != "file": + return "" + raw_path = unquote(parsed.path) + if os.name == "nt" and raw_path.startswith("/") and len(raw_path) > 2 and raw_path[2] == ":": + raw_path = raw_path[1:] + return os.path.normpath(raw_path) + + +def _cursor_line_blob(count: int) -> str: + if count <= 0: + return "" + return "\n".join("x" for _ in range(count)) + + +def _looks_like_cursor_db(path: Path) -> bool: + return path.name == "state.vscdb" + + def parse_session_file(path: Path) -> Session: """自动识别并解析会话文件(Claude / Codex / Gemini)""" + if _looks_like_cursor_db(path): + return parse_cursor_db(path) if path.suffix == ".json": return parse_gemini_json(path) + if _looks_like_gemini_jsonl(path): + return parse_gemini_jsonl(path) if _looks_like_codex_jsonl(path): return parse_codex_jsonl(path) return parse_jsonl(path) diff --git a/cc_stats/pricing.py b/cc_stats/pricing.py index 37b137b..9dc3aed 100644 --- a/cc_stats/pricing.py +++ b/cc_stats/pricing.py @@ -14,19 +14,25 @@ class Pricing(TypedDict): cache_create: float -# 价格来源(2026-04-16 校准): +# 价格来源(2026-06-10 校准): # - OpenAI: https://developers.openai.com/api/docs/pricing # - Anthropic: https://platform.claude.com/docs/en/about-claude/pricing # - Gemini: https://ai.google.dev/gemini-api/docs/pricing # # 注: -# - Gemini 2.5 Pro/Flash 按 <=200k context 档位计算(日志中无法精确区分每次请求是否 >200k)。 +# - 默认按 Standard / Paid / short context 或 <=200k 档位计算。 +# - Batch/Flex/Priority/Fast mode/Data residency/长上下文等需要请求模式或上下文字段,当前日志无法稳定区分。 # - OpenAI 暂无“cache write”单独价格字段,cache_create 退化为 input 单价。 MODEL_PRICING: dict[str, Pricing] = { # Claude + "claude-fable-5": {"input": 10.0, "output": 50.0, "cache_read": 1.00, "cache_create": 12.50}, + "claude-mythos-5": {"input": 10.0, "output": 50.0, "cache_read": 1.00, "cache_create": 12.50}, + "claude-opus-4.8": {"input": 5.0, "output": 25.0, "cache_read": 0.50, "cache_create": 6.25}, + "claude-opus-4.7": {"input": 5.0, "output": 25.0, "cache_read": 0.50, "cache_create": 6.25}, "claude-opus-4.6": {"input": 5.0, "output": 25.0, "cache_read": 0.50, "cache_create": 6.25}, "claude-opus-4.5": {"input": 5.0, "output": 25.0, "cache_read": 0.50, "cache_create": 6.25}, "claude-opus-4.1": {"input": 15.0, "output": 75.0, "cache_read": 1.50, "cache_create": 18.75}, + "claude-opus-4": {"input": 15.0, "output": 75.0, "cache_read": 1.50, "cache_create": 18.75}, "claude-sonnet-4.6": {"input": 3.0, "output": 15.0, "cache_read": 0.30, "cache_create": 3.75}, "claude-sonnet-4.5": {"input": 3.0, "output": 15.0, "cache_read": 0.30, "cache_create": 3.75}, "claude-sonnet-4": {"input": 3.0, "output": 15.0, "cache_read": 0.30, "cache_create": 3.75}, @@ -34,11 +40,15 @@ class Pricing(TypedDict): # 兼容旧会话(历史模型) "claude-haiku-legacy": {"input": 0.8, "output": 4.0, "cache_read": 0.08, "cache_create": 1.0}, # OpenAI (GPT/Codex) + "gpt-5.5": {"input": 5.00, "output": 30.00, "cache_read": 0.50, "cache_create": 5.00}, + "gpt-5.5-pro": {"input": 30.00, "output": 180.00, "cache_read": 30.00, "cache_create": 30.00}, "gpt-5.4": {"input": 2.50, "output": 15.00, "cache_read": 0.25, "cache_create": 2.50}, "gpt-5.4-mini": {"input": 0.75, "output": 4.50, "cache_read": 0.075, "cache_create": 0.75}, "gpt-5.4-nano": {"input": 0.20, "output": 1.25, "cache_read": 0.020, "cache_create": 0.20}, + "gpt-5.4-pro": {"input": 30.00, "output": 180.00, "cache_read": 30.00, "cache_create": 30.00}, + "chat-latest": {"input": 5.00, "output": 30.00, "cache_read": 0.50, "cache_create": 5.00}, "gpt-5.3-codex": {"input": 1.75, "output": 14.00, "cache_read": 0.175, "cache_create": 1.75}, - "gpt-5.3-chat-latest": {"input": 1.75, "output": 14.00, "cache_read": 0.175, "cache_create": 1.75}, + "gpt-5.3-chat-latest": {"input": 5.00, "output": 30.00, "cache_read": 0.50, "cache_create": 5.00}, # 兼容旧会话(历史模型) "gpt-4o": {"input": 2.50, "output": 10.00, "cache_read": 1.25, "cache_create": 2.50}, "gpt-4o-mini": {"input": 0.15, "output": 0.60, "cache_read": 0.075, "cache_create": 0.15}, @@ -47,6 +57,10 @@ class Pricing(TypedDict): "o3-mini": {"input": 1.10, "output": 4.40, "cache_read": 0.55, "cache_create": 1.10}, "o4-mini": {"input": 1.10, "output": 4.40, "cache_read": 0.55, "cache_create": 1.10}, # Gemini + "gemini-3.5-flash": {"input": 1.50, "output": 9.00, "cache_read": 0.15, "cache_create": 1.50}, + "gemini-3.1-pro": {"input": 2.00, "output": 12.00, "cache_read": 0.20, "cache_create": 2.00}, + "gemini-3.1-flash-lite": {"input": 0.25, "output": 1.50, "cache_read": 0.025, "cache_create": 0.25}, + "gemini-3-flash": {"input": 0.50, "output": 3.00, "cache_read": 0.05, "cache_create": 0.50}, "gemini-2.5-pro": {"input": 1.25, "output": 10.00, "cache_read": 0.125, "cache_create": 1.25}, "gemini-2.5-flash": {"input": 0.30, "output": 2.50, "cache_read": 0.03, "cache_create": 0.30}, "gemini-2.5-flash-lite": {"input": 0.10, "output": 0.40, "cache_read": 0.01, "cache_create": 0.10}, @@ -60,18 +74,28 @@ def match_model_pricing(model: str) -> Pricing: lower = model.lower() # OpenAI / Codex + if "gpt-5.5-pro" in lower: + return MODEL_PRICING["gpt-5.5-pro"] + if "gpt-5.5" in lower: + return MODEL_PRICING["gpt-5.5"] + if "gpt-5.4-pro" in lower: + return MODEL_PRICING["gpt-5.4-pro"] if "gpt-5.4-mini" in lower: return MODEL_PRICING["gpt-5.4-mini"] if "gpt-5.4-nano" in lower: return MODEL_PRICING["gpt-5.4-nano"] if "gpt-5.4" in lower: return MODEL_PRICING["gpt-5.4"] + if "chat-latest" in lower: + return MODEL_PRICING["chat-latest"] if "gpt-5.3-chat-latest" in lower: return MODEL_PRICING["gpt-5.3-chat-latest"] if "gpt-5.3-codex" in lower: return MODEL_PRICING["gpt-5.3-codex"] if "gpt-5" in lower and "codex" in lower: return MODEL_PRICING["gpt-5.3-codex"] + if "gpt-5" in lower: + return MODEL_PRICING["gpt-5.5"] if "gpt-4o-mini" in lower: return MODEL_PRICING["gpt-4o-mini"] if "gpt-4o" in lower: @@ -86,6 +110,16 @@ def match_model_pricing(model: str) -> Pricing: return MODEL_PRICING["o1"] # Gemini + if "gemini-3.5-flash" in lower: + return MODEL_PRICING["gemini-3.5-flash"] + if "gemini-3.1-pro" in lower: + return MODEL_PRICING["gemini-3.1-pro"] + if "gemini-3.1-flash-lite" in lower: + return MODEL_PRICING["gemini-3.1-flash-lite"] + if "gemini-3-flash" in lower: + return MODEL_PRICING["gemini-3-flash"] + if "gemini-3" in lower: + return MODEL_PRICING["gemini-3.5-flash"] if "gemini-2.5-pro" in lower: return MODEL_PRICING["gemini-2.5-pro"] if "gemini-2.5-flash-lite" in lower: @@ -98,12 +132,24 @@ def match_model_pricing(model: str) -> Pricing: return MODEL_PRICING["gemini-2.5-flash"] # Claude + if "fable" in lower: + return MODEL_PRICING["claude-fable-5"] + if "mythos" in lower: + return MODEL_PRICING["claude-mythos-5"] if "opus" in lower: + if "4.8" in lower or "4-8" in lower: + return MODEL_PRICING["claude-opus-4.8"] + if "4.7" in lower or "4-7" in lower: + return MODEL_PRICING["claude-opus-4.7"] if "4.6" in lower or "4-6" in lower: return MODEL_PRICING["claude-opus-4.6"] if "4.5" in lower or "4-5" in lower: return MODEL_PRICING["claude-opus-4.5"] - return MODEL_PRICING["claude-opus-4.1"] + if "4.1" in lower or "4-1" in lower: + return MODEL_PRICING["claude-opus-4.1"] + if "4" in lower: + return MODEL_PRICING["claude-opus-4"] + return MODEL_PRICING["claude-opus-4.8"] if "haiku" in lower: if "4.5" in lower or "4-5" in lower: return MODEL_PRICING["claude-haiku-4.5"] @@ -117,15 +163,22 @@ def match_model_pricing(model: str) -> Pricing: # 厂商回退(防止历史脏数据导致费用完全丢失) if "gpt" in lower or lower.startswith("o"): - return MODEL_PRICING["gpt-5.3-codex"] + return MODEL_PRICING["gpt-5.5"] if "gemini" in lower: - return MODEL_PRICING["gemini-2.5-flash"] + return MODEL_PRICING["gemini-3.5-flash"] return MODEL_PRICING["claude-sonnet-4.6"] def is_claude_model(model: str) -> bool: lower = model.lower() - return "claude" in lower or "sonnet" in lower or "opus" in lower or "haiku" in lower + return ( + "claude" in lower + or "sonnet" in lower + or "opus" in lower + or "haiku" in lower + or "fable" in lower + or "mythos" in lower + ) def estimate_cost_from_token_by_model(token_by_model: dict[str, Any]) -> float: diff --git a/cc_stats/reporter.py b/cc_stats/reporter.py index dd59d7b..708bb28 100644 --- a/cc_stats/reporter.py +++ b/cc_stats/reporter.py @@ -7,17 +7,12 @@ from pathlib import Path from .analyzer import SessionStats, TokenUsage, analyze_session, merge_stats -from .parser import ( - find_codex_sessions, - find_gemini_sessions, - find_sessions, - parse_session_file, -) from .pricing import ( Pricing, estimate_cost_from_token_by_model, match_model_pricing, ) +from .sources import collect_session_files, parse_file def _match_pricing(model: str) -> Pricing: @@ -109,11 +104,7 @@ def generate_report(period: str = "week") -> str: end_str = now.astimezone().strftime("%Y-%m-%d") # 收集所有会话(Claude + Codex + Gemini) - session_files: list[Path] = [ - f for f in find_sessions() if not f.name.startswith("agent-") - ] - session_files.extend(find_codex_sessions()) - session_files.extend(find_gemini_sessions()) + session_files: list[Path] = collect_session_files() session_files.sort(key=lambda f: f.stat().st_mtime) all_stats: list[SessionStats] = [] @@ -121,7 +112,7 @@ def generate_report(period: str = "week") -> str: for f in session_files: try: - session = parse_session_file(f) + session = parse_file(f) stats = analyze_session(session) if stats.end_time and stats.end_time < since: continue @@ -268,7 +259,7 @@ def generate_report(period: str = "week") -> str: prev_stats: list[SessionStats] = [] for f in session_files: try: - session = parse_session_file(f) + session = parse_file(f) stats_item = analyze_session(session) if stats_item.end_time and prev_since <= stats_item.end_time < since: prev_stats.append(stats_item) diff --git a/cc_stats/sources.py b/cc_stats/sources.py new file mode 100644 index 0000000..2e80b8a --- /dev/null +++ b/cc_stats/sources.py @@ -0,0 +1,259 @@ +"""Unified session source registry for Claude, Codex, and Gemini.""" + +from __future__ import annotations + +import os +from dataclasses import dataclass +from enum import Enum +from pathlib import Path + +from cc_stats.parser import ( + Session, + find_cursor_sessions, + find_codex_sessions, + find_codex_sessions_by_keyword, + find_gemini_sessions, + find_gemini_sessions_by_keyword, + find_sessions, + find_sessions_by_keyword, + parse_cursor_sessions, + parse_session_file, +) + + +class SourceKind(str, Enum): + ALL = "all" + CLAUDE = "claude" + CODEX = "codex" + GEMINI = "gemini" + CURSOR = "cursor" + + +@dataclass(frozen=True) +class SourceProject: + source: SourceKind + key: str + display_name: str + session_count: int + last_modified: float + + +def claude_projects_dir() -> Path: + return _env_path("CC_STATS_CLAUDE_PROJECTS_DIR", Path.home() / ".claude" / "projects") + + +def codex_home() -> Path: + return _env_path("CC_STATS_CODEX_HOME", Path.home() / ".codex") + + +def gemini_home() -> Path: + return _env_path("CC_STATS_GEMINI_HOME", Path.home() / ".gemini") + + +def cursor_state_db() -> Path: + raw = os.environ.get("CC_STATS_CURSOR_STATE_DB", "").strip() + if raw: + return Path(raw).expanduser() + user_dir = _env_path("CC_STATS_CURSOR_USER_DIR", _default_cursor_user_dir()) + return user_dir / "globalStorage" / "state.vscdb" + + +def _default_cursor_user_dir() -> Path: + appdata = os.environ.get("APPDATA", "").strip() + if appdata: + return Path(appdata) / "Cursor" / "User" + if os.name == "nt": + return Path.home() / "AppData" / "Roaming" / "Cursor" / "User" + if sys_platform := os.environ.get("XDG_CONFIG_HOME", "").strip(): + return Path(sys_platform) / "Cursor" / "User" + return Path.home() / ".config" / "Cursor" / "User" + + +def _env_path(name: str, default: Path) -> Path: + raw = os.environ.get(name, "").strip() + return Path(raw).expanduser() if raw else default + + +def normalize_source(source: SourceKind | str | None) -> SourceKind: + if source is None or source == "": + return SourceKind.ALL + if isinstance(source, SourceKind): + return source + value = str(source).strip().lower() + if value in {"claude-code", "claude_code"}: + value = SourceKind.CLAUDE.value + try: + return SourceKind(value) + except ValueError as exc: + allowed = ", ".join(kind.value for kind in SourceKind) + raise ValueError(f"Unknown source {source!r}; expected one of: {allowed}") from exc + + +def active_sources(source: SourceKind | str | None = None) -> tuple[SourceKind, ...]: + normalized = normalize_source(source) + if normalized == SourceKind.ALL: + return (SourceKind.CLAUDE, SourceKind.CODEX, SourceKind.GEMINI, SourceKind.CURSOR) + return (normalized,) + + +def collect_session_files( + source: SourceKind | str | None = None, + project_dir: Path | None = None, +) -> list[Path]: + files: list[Path] = [] + for kind in active_sources(source): + if kind == SourceKind.CLAUDE: + files.extend(find_sessions(project_dir, projects_dir=claude_projects_dir())) + elif kind == SourceKind.CODEX: + files.extend(find_codex_sessions(project_dir, codex_home_dir=codex_home())) + elif kind == SourceKind.GEMINI: + if project_dir is None: + files.extend(find_gemini_sessions(gemini_home_dir=gemini_home())) + else: + files.extend(_filter_sessions_by_project( + find_gemini_sessions(gemini_home_dir=gemini_home()), + project_dir, + )) + elif kind == SourceKind.CURSOR: + cursor_files = find_cursor_sessions(cursor_state_db_path=cursor_state_db()) + if project_dir is None: + files.extend(cursor_files) + else: + files.extend(_filter_sessions_by_project(cursor_files, project_dir)) + return list(dict.fromkeys(files)) + + +def collect_session_files_by_keyword( + keyword: str, + source: SourceKind | str | None = None, +) -> list[Path]: + files: list[Path] = [] + for kind in active_sources(source): + if kind == SourceKind.CLAUDE: + files.extend(find_sessions_by_keyword(keyword, projects_dir=claude_projects_dir())) + elif kind == SourceKind.CODEX: + files.extend(find_codex_sessions_by_keyword(keyword, codex_home_dir=codex_home())) + elif kind == SourceKind.GEMINI: + files.extend(find_gemini_sessions_by_keyword(keyword, gemini_home_dir=gemini_home())) + elif kind == SourceKind.CURSOR: + files.extend(_find_cursor_sessions_by_keyword(keyword)) + return list(dict.fromkeys(files)) + + +def list_projects(source: SourceKind | str | None = None) -> list[SourceProject]: + groups: dict[tuple[SourceKind, str], _ProjectGroup] = {} + for path in collect_session_files(source=source): + try: + sessions = parse_sessions(path) + except (OSError, ValueError): + continue + for session in sessions: + kind = normalize_source(session.source) + key = _project_key(path, session, kind) + display_name = session.project_path or key + last_modified = _mtime(path) + group_key = (kind, key) + if group_key not in groups: + groups[group_key] = _ProjectGroup( + source=kind, + key=key, + display_name=display_name, + session_count=0, + last_modified=last_modified, + ) + group = groups[group_key] + group.session_count += 1 + group.last_modified = max(group.last_modified, last_modified) + if session.project_path: + group.display_name = session.project_path + + return [ + SourceProject( + source=group.source, + key=group.key, + display_name=group.display_name, + session_count=group.session_count, + last_modified=group.last_modified, + ) + for group in sorted( + groups.values(), + key=lambda group: (group.source.value, group.display_name.lower(), group.key), + ) + ] + + +def parse_file(path: Path) -> Session: + return parse_session_file(path) + + +def parse_sessions(path: Path) -> list[Session]: + if path.name == "state.vscdb": + return parse_cursor_sessions(path) + return [parse_session_file(path)] + + +@dataclass +class _ProjectGroup: + source: SourceKind + key: str + display_name: str + session_count: int + last_modified: float + + +def _filter_sessions_by_project(paths: list[Path], project_dir: Path) -> list[Path]: + target = _normalized_path(project_dir) + results: list[Path] = [] + for path in paths: + try: + sessions = parse_sessions(path) + except (OSError, ValueError): + continue + if any( + session.project_path + and _normalized_path(Path(session.project_path)) == target + for session in sessions + ): + results.append(path) + return results + + +def _find_cursor_sessions_by_keyword(keyword: str) -> list[Path]: + keyword_lower = keyword.lower() + db_files = find_cursor_sessions(cursor_state_db_path=cursor_state_db()) + if not db_files: + return [] + for db_file in db_files: + try: + sessions = parse_cursor_sessions(db_file) + except (OSError, ValueError): + continue + for session in sessions: + if keyword_lower in session.project_path.lower(): + return [db_file] + if any(keyword_lower in str(message.content).lower() for message in session.messages): + return [db_file] + return [] + + +def _project_key(path: Path, session: Session, source: SourceKind) -> str: + if source == SourceKind.CLAUDE: + return path.parent.name + if session.project_path: + return session.project_path + return str(path.parent) + + +def _normalized_path(path: Path) -> str: + try: + resolved = str(path.expanduser().resolve()) + except OSError: + resolved = str(path.expanduser()) + return os.path.normcase(resolved) + + +def _mtime(path: Path) -> float: + try: + return path.stat().st_mtime + except OSError: + return 0.0 diff --git a/cc_stats/webhook.py b/cc_stats/webhook.py index bd0adcf..5ac8b8f 100644 --- a/cc_stats/webhook.py +++ b/cc_stats/webhook.py @@ -8,13 +8,8 @@ from datetime import datetime, timezone from .analyzer import SessionStats, analyze_session, merge_stats -from .parser import ( - find_codex_sessions, - find_gemini_sessions, - find_sessions, - parse_session_file, -) from .pricing import estimate_cost_from_token_by_model +from .sources import collect_session_files, parse_file def _collect_today_stats() -> SessionStats | None: @@ -24,14 +19,12 @@ def _collect_today_stats() -> SessionStats | None: 落在今天,该 session 就会被纳入统计。 """ today_key = datetime.now().strftime("%Y-%m-%d") - all_files: list = list(find_sessions()) - all_files.extend(find_codex_sessions()) - all_files.extend(find_gemini_sessions()) + all_files = collect_session_files() today_stats = [] for f in all_files: try: - session = parse_session_file(f) + session = parse_file(f) stats = analyze_session(session) # 按消息时间戳归日:token_by_date 包含今天的 key has_today_tokens = today_key in stats.token_by_date diff --git a/cc_stats_web/__main__.py b/cc_stats_web/__main__.py index 28ae7df..aa28e8b 100644 --- a/cc_stats_web/__main__.py +++ b/cc_stats_web/__main__.py @@ -1,19 +1,54 @@ -"""cc-stats-web: start local web dashboard and open browser""" +"""cc-stats-web: start local web dashboard and optionally open browser.""" +import argparse +import json import threading import webbrowser from .server import start_server -def main(): +def _build_startup_payload(host: str, port: int) -> dict: + url = f"http://{host}:{port}/" + return { + "event": "cc_stats_web_started", + "host": host, + "port": port, + "url": url, + } + + +def _parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog="cc-stats-web", + description="Start the local CC Statistics web dashboard.", + ) + parser.add_argument( + "--no-browser", + action="store_true", + help="Start the server without opening the default browser.", + ) + parser.add_argument( + "--json", + action="store_true", + help="Print a structured startup JSON line for desktop shells.", + ) + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None): + args = _parse_args(argv) server, port = start_server() - url = f"http://127.0.0.1:{port}/" - print(f"CC Stats Web Dashboard: {url}") - print("Press Ctrl+C to stop.") - - # Open browser after short delay - threading.Timer(0.5, lambda: webbrowser.open(url)).start() + payload = _build_startup_payload("127.0.0.1", port) + url = payload["url"] + if args.json: + print(json.dumps(payload, ensure_ascii=False), flush=True) + else: + print(f"CC Stats Web Dashboard: {url}") + print("Press Ctrl+C to stop.") + + if not args.no_browser: + threading.Timer(0.5, lambda: webbrowser.open(url)).start() try: server.serve_forever() diff --git a/cc_stats_web/server.py b/cc_stats_web/server.py index aa85f78..89f56d4 100644 --- a/cc_stats_web/server.py +++ b/cc_stats_web/server.py @@ -5,11 +5,17 @@ import json import os import socket +import threading +import time from collections import defaultdict +from copy import deepcopy +from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from http.server import HTTPServer, SimpleHTTPRequestHandler +from pathlib import Path +from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer from urllib.parse import parse_qs, urlparse +from cc_stats.cli import _trim_stats_by_date_range from cc_stats.analyzer import ( SessionStats, TokenUsage, @@ -17,45 +23,192 @@ compute_cache_stats, merge_stats, ) -from cc_stats.parser import ( - find_gemini_sessions, - find_sessions, - parse_gemini_json, - parse_jsonl, -) +from cc_stats.pricing import match_model_pricing +from cc_stats.sources import collect_session_files, list_projects, parse_file, parse_sessions _web_dir = os.path.join(os.path.dirname(__file__), "web") -# Model pricing ($/M tokens) -_PRICING = { - "opus": {"input": 15, "output": 75, "cache_read": 1.5, "cache_create": 18.75}, - "sonnet": {"input": 3, "output": 15, "cache_read": 0.3, "cache_create": 3.75}, - "haiku": {"input": 0.8, "output": 4, "cache_read": 0.08, "cache_create": 1.0}, - "gpt-4o": {"input": 2.5, "output": 10, "cache_read": 1.25, "cache_create": 2.5}, - "o1": {"input": 15, "output": 60, "cache_read": 7.5, "cache_create": 15}, - "o3": {"input": 10, "output": 40, "cache_read": 2.5, "cache_create": 10}, - "gemini-2.5-pro": {"input": 1.25, "output": 10, "cache_read": 0.31, "cache_create": 1.25}, - "gemini-2.5-flash": {"input": 0.15, "output": 0.60, "cache_read": 0.04, "cache_create": 0.15}, - "gemini-2.0-flash": {"input": 0.10, "output": 0.40, "cache_read": 0.025, "cache_create": 0.10}, -} - - -def _match_pricing(model: str) -> dict: - lower = model.lower() - # Gemini models (exact match first) - for key in ("gemini-2.5-pro", "gemini-2.5-flash", "gemini-2.0-flash"): - if key in lower: - return _PRICING[key] - if "gemini" in lower: - return _PRICING["gemini-2.5-flash"] - for key in ["opus", "haiku", "sonnet", "gpt-4o", "o1", "o3"]: - if key in lower: - return _PRICING[key] - return _PRICING["sonnet"] + +@dataclass(frozen=True) +class _AnalyzedCacheEntry: + signature: tuple[tuple[str, int | None, int | None], ...] + stats: list[SessionStats] + created_at: float + + +@dataclass(frozen=True) +class _ProjectsCacheEntry: + signature: tuple[tuple[str, int | None, int | None], ...] + projects: list[dict] + created_at: float + + +@dataclass(frozen=True) +class _DashboardPeriodRange: + since_dt: datetime | None + since_date: str | None + until_date: str | None + daily_days: int + + +_CACHE_TTL_SECONDS = 45.0 +_ANALYZED_CACHE_LOCK = threading.Lock() +_PROJECTS_CACHE_LOCK = threading.Lock() +_ANALYZED_CACHE: dict[tuple[str, str], _AnalyzedCacheEntry] = {} +_PROJECTS_CACHE: dict[str, _ProjectsCacheEntry] = {} + + +def _session_files_signature(files: list[Path]) -> tuple[tuple[str, int | None, int | None], ...]: + signature = [] + for path in files: + try: + stat = path.stat() + signature.append((str(path), stat.st_mtime_ns, stat.st_size)) + except OSError: + signature.append((str(path), None, None)) + return tuple(signature) + + +def _cache_source_key(source: str | None) -> str: + env_parts = [ + os.environ.get("CC_STATS_CLAUDE_PROJECTS_DIR", ""), + os.environ.get("CC_STATS_CODEX_HOME", ""), + os.environ.get("CC_STATS_GEMINI_HOME", ""), + os.environ.get("CC_STATS_CURSOR_STATE_DB", ""), + os.environ.get("CC_STATS_CURSOR_USER_DIR", ""), + os.environ.get("HOME", ""), + ] + return "\0".join([source or "", *env_parts]) + + +def _cache_project_key(project_dir_name) -> str: + return str(project_dir_name or "") + + +def _is_cache_fresh(created_at: float) -> bool: + return time.monotonic() - created_at <= _CACHE_TTL_SECONDS + + +def _now_local() -> datetime: + return datetime.now().astimezone() + + +def _dashboard_period_range( + period: str | None, + now: datetime | None = None, +) -> _DashboardPeriodRange | None: + if not period: + return None + + normalized = period.strip().lower() + local_now = now if now is not None else _now_local() + if local_now.tzinfo is None: + local_now = local_now.astimezone() + + if normalized == "all": + return _DashboardPeriodRange(None, None, None, 30) + if normalized == "today": + start = local_now.replace(hour=0, minute=0, second=0, microsecond=0) + elif normalized == "week": + start = (local_now - timedelta(days=local_now.weekday())).replace( + hour=0, + minute=0, + second=0, + microsecond=0, + ) + elif normalized == "month": + start = local_now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + else: + raise ValueError(f"Unsupported dashboard period: {period}") + + since_date = start.date().strftime("%Y-%m-%d") + until_date = local_now.date().strftime("%Y-%m-%d") + daily_days = max((local_now.date() - start.date()).days + 1, 1) + return _DashboardPeriodRange( + start.astimezone(timezone.utc), + since_date, + until_date, + daily_days, + ) + + +def _date_key_in_range( + date_key: str, + since_date: str | None, + until_date: str | None, +) -> bool: + if since_date and date_key < since_date: + return False + if until_date and date_key > until_date: + return False + return True + + +def _stats_matches_local_date_range( + stats: SessionStats, + since_date: str | None, + until_date: str | None, +) -> bool: + if not since_date and not until_date: + return True + if stats.token_by_date: + return any( + usage.total > 0 and _date_key_in_range(date_key, since_date, until_date) + for date_key, usage in stats.token_by_date.items() + ) + if stats.start_time: + return _date_key_in_range( + stats.start_time.astimezone().strftime("%Y-%m-%d"), + since_date, + until_date, + ) + if stats.end_time: + return _date_key_in_range( + stats.end_time.astimezone().strftime("%Y-%m-%d"), + since_date, + until_date, + ) + return False + + +def _scale_timedelta(value: timedelta, fraction: float) -> timedelta: + return timedelta(seconds=value.total_seconds() * fraction) + + +def _scale_stats_durations(stats: SessionStats, fraction: float) -> None: + fraction = max(0.0, min(fraction, 1.0)) + stats.total_duration = _scale_timedelta(stats.total_duration, fraction) + stats.ai_duration = _scale_timedelta(stats.ai_duration, fraction) + stats.user_duration = _scale_timedelta(stats.user_duration, fraction) + stats.active_duration = _scale_timedelta(stats.active_duration, fraction) + + +def _stats_for_local_date_range( + all_stats: list[SessionStats], + since_date: str | None, + until_date: str | None, +) -> list[SessionStats]: + if not since_date and not until_date: + return all_stats + + filtered = [] + for stats in all_stats: + if not _stats_matches_local_date_range(stats, since_date, until_date): + continue + original_token_total = stats.token_usage.total + stats_copy = deepcopy(stats) + _trim_stats_by_date_range(stats_copy, since_date, until_date) + if original_token_total > 0: + _scale_stats_durations( + stats_copy, + stats_copy.token_usage.total / original_token_total, + ) + filtered.append(stats_copy) + return filtered def _estimate_cost(tu: TokenUsage, model: str = "") -> float: - p = _match_pricing(model) + p = match_model_pricing(model) cost = 0.0 cost += tu.input_tokens / 1e6 * p["input"] cost += tu.output_tokens / 1e6 * p["output"] @@ -64,23 +217,11 @@ def _estimate_cost(tu: TokenUsage, model: str = "") -> float: return cost -def _resolve_project_name(proj_dir, jsonl_files): - for jf in jsonl_files: - try: - with open(jf, encoding="utf-8") as fh: - for ln in fh: - try: - obj = json.loads(ln) - if obj.get("cwd"): - return obj["cwd"] - except (json.JSONDecodeError, UnicodeDecodeError): - continue - except OSError: - continue - return proj_dir.name - - -def _stats_to_dict(stats: SessionStats, session_count: int = 1) -> dict: +def _stats_to_dict( + stats: SessionStats, + session_count: int = 1, + git_scan_skipped: bool = False, +) -> dict: def _td_seconds(td): return td.total_seconds() @@ -115,6 +256,8 @@ def _token_dict(tu): total_cost = 0.0 model_tokens = [] for model, usage in sorted(stats.token_by_model.items(), key=lambda x: x[1].total, reverse=True): + if usage.total <= 0: + continue cost = _estimate_cost(usage, model) total_cost += cost model_tokens.append({ @@ -146,6 +289,7 @@ def _token_dict(tu): "git_total_added": stats.git_total_added, "git_total_removed": stats.git_total_removed, "git_commit_count": stats.git_commit_count, + "git_scan_skipped": git_scan_skipped, "token_usage": _token_dict(stats.token_usage), "token_by_model": model_tokens, "estimated_cost": round(total_cost, 2), @@ -161,138 +305,228 @@ def _token_dict(tu): } -def _get_projects(): - from pathlib import Path - projects = [] - - # Claude projects - claude_projects = Path.home() / ".claude" / "projects" - if claude_projects.exists(): - for proj in sorted(claude_projects.iterdir()): - if not proj.is_dir(): - continue - jsonl_files = [f for f in proj.glob("*.jsonl") if not f.name.startswith("agent-")] - if not jsonl_files: - continue - display_name = _resolve_project_name(proj, jsonl_files) - projects.append({ - "dir_name": proj.name, - "display_name": display_name, - "session_count": len(jsonl_files), - "source": "claude", - }) +def _get_projects(source: str | None = None): + cache_key = _cache_source_key(source) + with _PROJECTS_CACHE_LOCK: + cached = _PROJECTS_CACHE.get(cache_key) + if cached and _is_cache_fresh(cached.created_at): + return cached.projects + + files = collect_session_files(source=source) + files.sort(key=lambda path: str(path)) + signature = _session_files_signature(files) + cached = _PROJECTS_CACHE.get(cache_key) + if cached and cached.signature == signature: + _PROJECTS_CACHE[cache_key] = _ProjectsCacheEntry( + signature=cached.signature, + projects=cached.projects, + created_at=time.monotonic(), + ) + return cached.projects + + projects = [ + { + "dir_name": project.key, + "display_name": project.display_name, + "session_count": project.session_count, + "source": project.source.value, + } + for project in list_projects(source=source) + ] + projects.sort(key=lambda x: x["session_count"], reverse=True) + _PROJECTS_CACHE[cache_key] = _ProjectsCacheEntry( + signature=signature, + projects=projects, + created_at=time.monotonic(), + ) + return projects - # Gemini projects - gemini_files = find_gemini_sessions() - if gemini_files: - gemini_by_dir: dict[str, list] = {} - for gf in gemini_files: - dir_key = gf.parent.parent.name # project hash - gemini_by_dir.setdefault(dir_key, []).append(gf) - for dir_key, files in gemini_by_dir.items(): - # Try to get project path from first session - display_name = dir_key - try: - session = parse_gemini_json(files[0]) - if session.project_path: - display_name = session.project_path - except Exception: - pass - projects.append({ - "dir_name": f"gemini:{dir_key}", - "display_name": display_name, - "session_count": len(files), - "source": "gemini", - }) - projects.sort(key=lambda x: x["session_count"], reverse=True) - return projects - - -def _collect_session_files(project_dir_name=None): - """Collect session files (Claude JSONL + Gemini JSON)""" - from pathlib import Path - files = [] - - if project_dir_name and project_dir_name.startswith("gemini:"): - # Gemini project - dir_key = project_dir_name[7:] - for gf in find_gemini_sessions(): - if gf.parent.parent.name == dir_key: - files.append(gf) - elif project_dir_name: - # Claude project - claude_projects = Path.home() / ".claude" / "projects" - proj_dir = claude_projects / project_dir_name - files = sorted(f for f in proj_dir.glob("*.jsonl") if not f.name.startswith("agent-")) - else: - # All sources - files = [f for f in find_sessions() if not f.name.startswith("agent-")] - files.extend(find_gemini_sessions()) +def _collect_session_files(project_dir_name=None, source: str | None = None): + """Collect session files from the shared source registry.""" + files = collect_session_files(source=source) + if not project_dir_name: + return files - return files + filtered = [] + for f in files: + try: + sessions = _parse_sessions_from_file(f) + except Exception: + continue + if any(session.project_path == project_dir_name for session in sessions): + filtered.append(f) + continue + if ( + any(session.source == "claude" for session in sessions) + and f.parent.name == project_dir_name + ): + filtered.append(f) + return filtered def _parse_session_file(f): - """Parse a session file based on its extension""" - if f.suffix == ".json": - return parse_gemini_json(f) - return parse_jsonl(f) + """Parse a session file through the shared source parser.""" + return parse_file(f) -def _get_stats(project_dir_name=None, since_days=None): - files = _collect_session_files(project_dir_name) - if not files: - return {"error": "No sessions found"} +def _parse_sessions_from_file(f): + """Parse one source entry into one or more sessions.""" + if getattr(f, "name", "") == "state.vscdb": + return parse_sessions(f) + return [_parse_session_file(f)] - files.sort(key=lambda f: f.stat().st_mtime) - since_dt = None - if since_days: - since_dt = datetime.now(tz=timezone.utc) - timedelta(days=since_days) +def _filter_files_by_mtime(files: list, since_dt: datetime | None): + if since_dt is None: + return files + + threshold = since_dt.timestamp() + filtered = [] + for f in files: + try: + if f.stat().st_mtime >= threshold: + filtered.append(f) + except OSError: + filtered.append(f) + return filtered + +def _session_matches_project(session, path, project_dir_name) -> bool: + if not project_dir_name: + return True + if session.project_path == project_dir_name: + return True + return session.source == "claude" and path.parent.name == project_dir_name + + +def _analyze_session_files( + files: list, + since_dt: datetime | None = None, + project_dir_name=None, +) -> list[SessionStats]: all_stats = [] for f in files: try: - session = _parse_session_file(f) - stats = analyze_session(session) - if since_dt and stats.end_time and stats.end_time < since_dt: - continue - all_stats.append(stats) + sessions = _parse_sessions_from_file(f) + for session in sessions: + if not _session_matches_project(session, f, project_dir_name): + continue + stats = analyze_session(session, include_git=False) + if since_dt and stats.end_time and stats.end_time < since_dt: + continue + all_stats.append(stats) except Exception: continue + return all_stats + + +def _get_cached_analyzed_stats( + project_dir_name=None, + source: str | None = None, +) -> list[SessionStats]: + cache_key = (_cache_source_key(source), _cache_project_key(project_dir_name)) + with _ANALYZED_CACHE_LOCK: + cached = _ANALYZED_CACHE.get(cache_key) + if cached and _is_cache_fresh(cached.created_at): + return cached.stats + + files = _collect_session_files(project_dir_name, source=source) + if not files: + return [] + + files.sort(key=lambda f: f.stat().st_mtime) + signature = _session_files_signature(files) + cached = _ANALYZED_CACHE.get(cache_key) + if cached and cached.signature == signature: + _ANALYZED_CACHE[cache_key] = _AnalyzedCacheEntry( + signature=cached.signature, + stats=cached.stats, + created_at=time.monotonic(), + ) + return cached.stats + + all_stats = _analyze_session_files(files, project_dir_name=project_dir_name) + _ANALYZED_CACHE[cache_key] = _AnalyzedCacheEntry( + signature=signature, + stats=all_stats, + created_at=time.monotonic(), + ) + return all_stats + + +def _merged_stats(all_stats: list[SessionStats]) -> SessionStats | None: + if not all_stats: + return None + return all_stats[0] if len(all_stats) == 1 else merge_stats(all_stats) + + +def _daily_date_keys( + since_dt: datetime, + days: int, + now: datetime | None = None, +) -> list[str]: + now_dt = now or datetime.now(tz=timezone.utc) + if days <= 1: + start_date = since_dt.astimezone().date() + end_date = now_dt.astimezone().date() + if start_date > end_date: + start_date = end_date + span = (end_date - start_date).days + return [ + (start_date + timedelta(days=i)).strftime("%Y-%m-%d") + for i in range(span + 1) + ] + + today = now_dt.astimezone().date() + return [ + (today - timedelta(days=i)).strftime("%Y-%m-%d") + for i in range(days - 1, -1, -1) + ] + + +def _get_stats(project_dir_name=None, since_days=None, source: str | None = None): + files = _collect_session_files(project_dir_name, source=source) + if not files: + return {"error": "No sessions found"} + + since_dt = None + if since_days: + since_dt = datetime.now(tz=timezone.utc) - timedelta(days=since_days) + + files = _filter_files_by_mtime(files, since_dt) + files.sort(key=lambda f: f.stat().st_mtime) + + all_stats = _analyze_session_files(files, since_dt, project_dir_name) if not all_stats: return {"error": "No valid sessions"} - result = all_stats[0] if len(all_stats) == 1 else merge_stats(all_stats) - return _stats_to_dict(result, session_count=len(all_stats)) + result = _merged_stats(all_stats) + if result is None: + return {"error": "No valid sessions"} + return _stats_to_dict( + result, + session_count=len(all_stats), + git_scan_skipped=True, + ) -def _get_daily_stats(project_dir_name=None, days=14): - files = _collect_session_files(project_dir_name) +def _get_daily_stats(project_dir_name=None, days=14, source: str | None = None): + files = _collect_session_files(project_dir_name, source=source) since_dt = datetime.now(tz=timezone.utc) - timedelta(days=days) + files = _filter_files_by_mtime(files, since_dt) daily: dict[str, list] = defaultdict(list) - for f in files: - try: - session = _parse_session_file(f) - stats = analyze_session(session) - if stats.end_time and stats.end_time < since_dt: - continue - if not stats.start_time: - continue - day_key = stats.start_time.astimezone().strftime("%Y-%m-%d") - daily[day_key].append(stats) - except Exception: + for stats in _analyze_session_files(files, since_dt, project_dir_name): + if not stats.start_time: continue + day_key = stats.start_time.astimezone().strftime("%Y-%m-%d") + daily[day_key].append(stats) result = [] - today = datetime.now().date() - for i in range(days - 1, -1, -1): - d = today - timedelta(days=i) - day_key = d.strftime("%Y-%m-%d") + for day_key in _daily_date_keys(since_dt, days): day_stats = daily.get(day_key, []) if day_stats: merged = merge_stats(day_stats) if len(day_stats) > 1 else day_stats[0] @@ -316,31 +550,143 @@ def _get_daily_stats(project_dir_name=None, days=14): return result -def _get_skill_stats(project_dir_name=None, since_days=None): +def _add_token_usage(target: TokenUsage, source: TokenUsage) -> None: + target.input_tokens += source.input_tokens + target.output_tokens += source.output_tokens + target.cache_read_input_tokens += source.cache_read_input_tokens + target.cache_creation_input_tokens += source.cache_creation_input_tokens + + +def _daily_token_usage_and_cost( + stats_list: list[SessionStats], + day_key: str, + fallback_stats: SessionStats, +) -> tuple[TokenUsage, float]: + usage = TokenUsage() + cost = 0.0 + saw_token_dates = False + + for stats in stats_list: + day_usage = stats.token_by_date.get(day_key) + if day_usage is None: + continue + saw_token_dates = True + _add_token_usage(usage, day_usage) + + model_map = stats.token_by_model_by_date.get(day_key) + if model_map: + cost += sum( + _estimate_cost(model_usage, model) + for model, model_usage in model_map.items() + ) + elif stats.token_usage.total > 0: + stats_cost = sum( + _estimate_cost(model_usage, model) + for model, model_usage in stats.token_by_model.items() + ) + cost += stats_cost * day_usage.total / stats.token_usage.total + + if not saw_token_dates: + usage = fallback_stats.token_usage + cost = sum( + _estimate_cost(model_usage, model) + for model, model_usage in fallback_stats.token_by_model.items() + ) + + return usage, cost + + +def _daily_active_minutes( + stats_list: list[SessionStats], + day_key: str, + fallback_stats: SessionStats, +) -> float: + seconds = 0.0 + saw_token_dates = False + for stats in stats_list: + day_usage = stats.token_by_date.get(day_key) + if day_usage is None: + continue + saw_token_dates = True + if stats.token_usage.total > 0: + seconds += ( + stats.active_duration.total_seconds() + * day_usage.total + / stats.token_usage.total + ) + + if not saw_token_dates: + seconds = fallback_stats.active_duration.total_seconds() + + return round(seconds / 60, 1) + + +def _daily_stats_from_analyzed( + all_stats: list[SessionStats], + since_dt: datetime, + days: int, + now: datetime | None = None, +) -> list[dict]: + date_keys = _daily_date_keys(since_dt, days, now=now) + date_key_set = set(date_keys) + daily: dict[str, list] = defaultdict(list) + for stats in all_stats: + if stats.token_by_date: + for day_key, usage in stats.token_by_date.items(): + if usage.total > 0 and day_key in date_key_set: + daily[day_key].append(stats) + continue + if stats.start_time: + day_key = stats.start_time.astimezone().strftime("%Y-%m-%d") + if day_key in date_key_set: + daily[day_key].append(stats) + + result = [] + for day_key in date_keys: + day_stats = daily.get(day_key, []) + if day_stats: + merged = merge_stats(day_stats) if len(day_stats) > 1 else day_stats[0] + usage, cost = _daily_token_usage_and_cost(day_stats, day_key, merged) + active_minutes = _daily_active_minutes(day_stats, day_key, merged) + result.append({ + "date": day_key, + "sessions": len(day_stats), + "messages": merged.user_message_count, + "tool_calls": merged.tool_call_total, + "active_minutes": active_minutes, + "lines_added": merged.total_added, + "lines_removed": merged.total_removed, + "tokens": usage.total, + "cost": round(cost, 2), + }) + else: + result.append({ + "date": day_key, "sessions": 0, "messages": 0, "tool_calls": 0, + "active_minutes": 0, "lines_added": 0, "lines_removed": 0, "tokens": 0, "cost": 0, + }) + return result + + +def _get_skill_stats(project_dir_name=None, since_days=None, source: str | None = None): """Return skill usage statistics as a list sorted by call_count. Skill stats always cover ALL sessions (ignoring since_days) because skill usage patterns are more meaningful at the all-time level. """ - files = _collect_session_files(project_dir_name) + files = _collect_session_files(project_dir_name, source=source) if not files: return [] files.sort(key=lambda f: f.stat().st_mtime) - all_stats = [] - for f in files: - try: - session = _parse_session_file(f) - stats = analyze_session(session) - all_stats.append(stats) - except Exception: - continue + all_stats = _analyze_session_files(files, project_dir_name=project_dir_name) if not all_stats: return [] - result = all_stats[0] if len(all_stats) == 1 else merge_stats(all_stats) + result = _merged_stats(all_stats) + if result is None: + return [] skills = [] for name, su in sorted( @@ -361,8 +707,96 @@ def _get_skill_stats(project_dir_name=None, since_days=None): return skills +def _skill_stats_from_analyzed(all_stats: list[SessionStats]) -> list[dict]: + result = _merged_stats(all_stats) + if result is None: + return [] + + skills = [] + for name, su in sorted( + result.skill_stats.items(), key=lambda x: x[1].call_count, reverse=True + ): + resolved = su.success_count + su.error_count + success_rate = ( + round(su.success_count / resolved * 100) if resolved > 0 else None + ) + skills.append({ + "name": name, + "call_count": su.call_count, + "success_count": su.success_count, + "error_count": su.error_count, + "unknown_count": su.unknown_count, + "success_rate": success_rate, + }) + return skills + + +def _get_dashboard_payload( + project_dir_name=None, + since_days=None, + daily_days=30, + source: str | None = None, + period: str | None = None, +): + all_stats = _get_cached_analyzed_stats(project_dir_name, source=source) + if not all_stats: + return { + "stats": {"error": "No sessions found"}, + "daily_stats": [], + "skills": [], + } + + since_dt = None + period_range = _dashboard_period_range(period) + if period_range is not None: + since_dt = period_range.since_dt + stats_for_range = _stats_for_local_date_range( + all_stats, + period_range.since_date, + period_range.until_date, + ) + daily_days = period_range.daily_days + daily_source_stats = stats_for_range + elif since_days: + since_dt = datetime.now(tz=timezone.utc) - timedelta(days=since_days) + stats_for_range = [ + stats for stats in all_stats + if not since_dt or not stats.end_time or stats.end_time >= since_dt + ] + daily_source_stats = all_stats + else: + stats_for_range = all_stats + daily_source_stats = all_stats + + merged = _merged_stats(stats_for_range) + if merged is None: + if period_range is not None: + stats_payload = _stats_to_dict( + SessionStats(session_id="", project_path=str(project_dir_name or "")), + session_count=0, + git_scan_skipped=True, + ) + else: + stats_payload = {"error": "No valid sessions"} + else: + stats_payload = _stats_to_dict( + merged, + session_count=len(stats_for_range), + git_scan_skipped=True, + ) + + daily_since = since_dt or datetime.now(tz=timezone.utc) - timedelta(days=daily_days) + return { + "stats": stats_payload, + "daily_stats": _daily_stats_from_analyzed(daily_source_stats, daily_since, daily_days), + "skills": _skill_stats_from_analyzed(all_stats), + } + + def _get_version_update(): """检查版本更新(供 Web API 使用)""" + if os.environ.get("CC_STATS_DESKTOP_SHELL") == "1": + return {"has_update": False} try: from cc_stats.version_checker import check_for_update result = check_for_update() @@ -386,34 +820,59 @@ def do_GET(self): parsed = urlparse(self.path) path = parsed.path params = parse_qs(parsed.query) + source = params.get("source", [None])[0] - if path == "/api/projects": - self._json(_get_projects()) - elif path == "/api/stats": - project = params.get("project", [None])[0] - days = params.get("days", [None])[0] - self._json(_get_stats( - project_dir_name=project or None, - since_days=int(days) if days and days != "0" else None, - )) - elif path == "/api/daily_stats": - project = params.get("project", [None])[0] - days = params.get("days", ["14"])[0] - self._json(_get_daily_stats( - project_dir_name=project or None, - days=int(days), - )) - elif path == "/api/skills": - project = params.get("project", [None])[0] - days = params.get("days", [None])[0] - self._json(_get_skill_stats( - project_dir_name=project or None, - since_days=int(days) if days and days != "0" else None, - )) - elif path == "/api/version_check": - self._json(_get_version_update()) - else: - super().do_GET() + try: + if path in {"", "/"}: + self._serve_index() + elif path == "/api/health": + self._json({"status": "ok"}) + elif path == "/api/projects": + self._json(_get_projects(source=source)) + elif path == "/api/stats": + project = params.get("project", [None])[0] + days = params.get("days", [None])[0] + self._json(_get_stats( + project_dir_name=project or None, + since_days=int(days) if days and days != "0" else None, + source=source, + )) + elif path == "/api/dashboard": + project = params.get("project", [None])[0] + days = params.get("days", [None])[0] + daily_days = params.get("daily_days", ["30"])[0] + period = params.get("period", [None])[0] + self._json(_get_dashboard_payload( + project_dir_name=project or None, + since_days=( + int(days) if not period and days and days != "0" else None + ), + daily_days=int(daily_days), + source=source, + period=period, + )) + elif path == "/api/daily_stats": + project = params.get("project", [None])[0] + days = params.get("days", ["14"])[0] + self._json(_get_daily_stats( + project_dir_name=project or None, + days=int(days), + source=source, + )) + elif path == "/api/skills": + project = params.get("project", [None])[0] + days = params.get("days", [None])[0] + self._json(_get_skill_stats( + project_dir_name=project or None, + since_days=int(days) if days and days != "0" else None, + source=source, + )) + elif path == "/api/version_check": + self._json(_get_version_update()) + else: + super().do_GET() + except ValueError as exc: + self._json({"error": str(exc)}) def _json(self, data): body = json.dumps(data, ensure_ascii=False).encode("utf-8") @@ -423,17 +882,45 @@ def _json(self, data): self.end_headers() self.wfile.write(body) + def _serve_index(self): + index_path = Path(_web_dir) / "index.html" + try: + body = index_path.read_bytes() + except OSError: + self.send_error(404, "Dashboard index not found") + return + + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + def log_message(self, format, *args): pass +class CcStatsHTTPServer(ThreadingHTTPServer): + daemon_threads = True + + +def _warm_dashboard_cache() -> None: + try: + _get_cached_analyzed_stats() + _get_projects() + except Exception: + pass + + def find_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("127.0.0.1", 0)) return s.getsockname()[1] -def start_server() -> tuple[HTTPServer, int]: +def start_server(warm_cache: bool = True) -> tuple[CcStatsHTTPServer, int]: port = find_free_port() - server = HTTPServer(("127.0.0.1", port), ApiHandler) + server = CcStatsHTTPServer(("127.0.0.1", port), ApiHandler) + if warm_cache: + threading.Thread(target=_warm_dashboard_cache, daemon=True).start() return server, port diff --git a/cc_stats_web/web/index.html b/cc_stats_web/web/index.html index 03986fe..a4043f3 100644 --- a/cc_stats_web/web/index.html +++ b/cc_stats_web/web/index.html @@ -1,327 +1,1321 @@ - +