diff --git a/docs/architecture.md b/docs/architecture.md index a64ff92..3463132 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -15,9 +15,9 @@ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────────┐ ┌──────────────────┐ │ session_path │ │ jsonl_parser │ │ exclusion_rules │ -│ list_projects │ │ parse_session │ │ load + match │ -│ list_sessions │ │ quick_session_info │ └────────┬─────────┘ -│ safe_join │ │ _parse_tool_result │ │ +│ list_projects │ │ session_peek │ │ load + match │ +│ list_sessions │ │ tool_dispatch │ └────────┬─────────┘ +│ safe_join │ │ jsonl_helpers │ │ └────────┬────────┘ └──────────┬──────────┘ │ │ │ │ └────────────┬───────────┴────────────────────────┘ @@ -48,7 +48,7 @@ | Layer | Responsibility | Key modules | |-------|----------------|-------------| | **Data discovery** | Resolve `~/.claude/projects/`, list projects and sessions, prevent path traversal | `utils/session_path.py` | -| **Parsing** | JSONL → session dict (messages, metadata, tool rendering) | `utils/jsonl_parser.py` | +| **Parsing** | JSONL → session dict (messages, metadata, tool rendering) | `utils/jsonl_parser.py`, `utils/tool_dispatch.py`, `utils/session_peek.py`, `utils/jsonl_helpers.py` | | **Filtering** | Exclude sensitive sessions via rules file | `utils/exclusion_rules.py` | | **Statistics** | Aggregates for API and exporters | `utils/session_stats.py` | | **Export — Markdown** | Session → YAML-frontmatter Markdown | `utils/md_exporter.py` | @@ -71,13 +71,13 @@ ## Dispatch table -In `utils/jsonl_parser.py`, tool results are classified through `_parse_tool_result`, a **predicate-ordered dispatch table** (not a simple `if tool_name == ...` chain). **Order is load-bearing**: the first matching predicate wins. Tests in `tests/test_jsonl_parser.py` guard ordering regressions. +In `utils/tool_dispatch.py`, tool results are classified through `_parse_tool_result`, a **predicate-ordered dispatch table** (not a simple `if tool_name == ...` chain). 
**Order is load-bearing**: the first matching predicate wins. Tests in `tests/test_jsonl_parser.py` and `tests/test_real_session_fixtures.py` guard ordering regressions. When adding a new tool renderer: -1. Add predicate + builder pair in the dispatch table in the correct order (specific before generic). -2. Add or extend a JSONL fixture under `tests/fixtures/` if needed. -3. Run `pytest tests/test_jsonl_parser.py -v`. +1. Add a `(predicate, builder)` pair to `_TOOL_RESULT_DISPATCH` in `utils/tool_dispatch.py`, preserving existing predicate order unless you also update fixtures and ordering tests (`tests/test_jsonl_parser.py`, `tests/test_real_session_fixtures.py`). Order is **not** “specific before generic” in general — the first match wins. `_tool_result_pred_task_message` is the intentional broad-before-narrow exception (`task_id` or `message` before retrieval/completed/async). +2. Add or extend a JSONL fixture under `tests/fixtures/` (especially for overlaps with existing predicates). +3. Run `pytest tests/test_jsonl_parser.py tests/test_real_session_fixtures.py -v`. 
## Export state machine diff --git a/tests/test_jsonl_parser.py b/tests/test_jsonl_parser.py index fea0a6e..8accf2f 100644 --- a/tests/test_jsonl_parser.py +++ b/tests/test_jsonl_parser.py @@ -234,6 +234,16 @@ def test_plan_result(self): r = _parse_tool_result({"plan": [], "filePath": "/plan.md"}) assert r["result_type"] == "plan" + def test_plan_with_content_not_classified_as_file_write(self): + """plan is registered before file_write in _TOOL_RESULT_DISPATCH.""" + r = _parse_tool_result({ + "plan": [], + "filePath": "/plan.md", + "content": "plan body", + }) + assert r["result_type"] == "plan" + assert r["file_path"] == "/plan.md" + def test_unknown_fallback(self): r = _parse_tool_result({"unexpected": True}) assert r["result_type"] == "unknown" diff --git a/utils/jsonl_helpers.py b/utils/jsonl_helpers.py new file mode 100644 index 0000000..8fcc3ed --- /dev/null +++ b/utils/jsonl_helpers.py @@ -0,0 +1,99 @@ +"""Shared content helpers for JSONL parsing and session peek.""" + +import re +from typing import Any + +from models.session import MessageDict + + +def entry_message(entry: dict[str, Any]) -> dict[str, Any]: + m = entry.get("message") + return m if isinstance(m, dict) else {} + + +def normalize_content(content: Any) -> list[dict[str, Any]]: + """Content can be a plain string, a list of strings, or a list of typed + blocks. Normalize everything into [{type, text}, ...] 
form.""" + if isinstance(content, str): + return [{"type": "text", "text": content}] + if isinstance(content, list): + result = [] + for part in content: + if isinstance(part, str): + result.append({"type": "text", "text": part}) + elif isinstance(part, dict): + result.append(part) + return result + return [] + + +def extract_text(content_parts: Any) -> str: + """Grab just the text blocks out of a content array, ignore tool_use/thinking.""" + parts = normalize_content(content_parts) + texts = [] + for part in parts: + if part.get("type") == "text": + texts.append(part.get("text", "")) + return "\n".join(texts) + + +def extract_images(content_parts: Any) -> list[dict[str, Any]]: + """Pull base64 image blocks out of a content array. + Also looks inside nested tool_result content blocks.""" + parts = normalize_content(content_parts) + images = [] + for part in parts: + if part.get("type") == "image": + source = part.get("source", {}) + if source.get("type") == "base64" and source.get("data"): + images.append({ + "media_type": source.get("media_type", "image/png"), + "data": source["data"], + }) + elif part.get("type") == "tool_result": + # Nested content is usually a block list; string content is not normalized here. 
+ nested = part.get("content", []) + if isinstance(nested, list): + for sub in nested: + if isinstance(sub, dict) and sub.get("type") == "image": + source = sub.get("source", {}) + if source.get("type") == "base64" and source.get("data"): + images.append({ + "media_type": source.get("media_type", "image/png"), + "data": source["data"], + }) + return images + + +def first_title_line(text: str, max_chars: int = 100) -> str: + """First non-empty line after system-tag strip, truncated for session titles.""" + return strip_system_tags(text).strip().split("\n")[0][:max_chars] + + +def infer_title(messages: list[MessageDict]) -> str: + """Use the first line of the first real user message as the session title.""" + for msg in messages: + if msg["role"] == "user" and msg.get("text"): + first_line = first_title_line(msg["text"]) + if first_line: + return first_line + return "Untitled Session" + + +def strip_system_tags(text: str) -> str: + """Strip out the internal XML tags Claude Code injects (system-reminder, + ide_opened_file, etc.) 
so exported text is clean.""" + # Remove block tags and their content + for tag in ( + "system-reminder", "ide_opened_file", "user-prompt-submit-hook", + "claude_background_info", "fast_mode_info", "env", + ): + text = re.sub(rf"<{tag}>[\s\S]*?</{tag}>", "", text) + # Strip remaining known opening/closing tags + text = re.sub( + r"</?(?:system-reminder|ide_opened_file|user-prompt-submit-hook|claude_background_info|fast_mode_info|env)[^>]*>", + "", + text, + ) + return text.strip() diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index 14b71c3..bfb6088 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -6,9 +6,37 @@ from datetime import datetime from typing import Any -from models.session import MessageDict, QuickSessionInfoDict, SessionDict +from models.session import MessageDict, SessionDict +from utils.jsonl_helpers import ( + entry_message as _entry_message, + extract_images as _extract_images, + extract_text as _extract_text, + infer_title as _infer_title, + normalize_content as _normalize_content, + strip_system_tags as _strip_system_tags, +) +from utils.session_peek import quick_session_info +from utils.tool_dispatch import _TOOL_RESULT_DISPATCH, _parse_tool_result from utils.validation import validate_session_dict +__all__ = [ + "parse_session", + "quick_session_info", + "_parse_tool_result", + "_TOOL_RESULT_DISPATCH", + "_entry_message", + "_process_user", + "_process_assistant", + "_process_system", + "_process_progress", + "_normalize_content", + "_extract_text", + "_extract_images", + "_infer_title", + "_strip_system_tags", + "_track_file_activity", +] + def parse_session(filepath: str) -> SessionDict: """Main entry point. 
Reads every line from a .jsonl file and builds up @@ -131,11 +159,6 @@ def parse_session(filepath: str) -> SessionDict: ) -def _entry_message(entry: dict[str, Any]) -> dict[str, Any]: - m = entry.get("message") - return m if isinstance(m, dict) else {} - - def _process_user( entry: dict[str, Any], messages: list[MessageDict], metadata: dict[str, Any] ) -> None: @@ -341,421 +364,3 @@ def _track_file_activity( url_or_query = tool_input.get("url") or tool_input.get("query", "") if url_or_query: metadata["web_fetches"].append(url_or_query) - - -def _tool_result_pred_bash(tr: dict[str, Any]) -> bool: - return "stdout" in tr or "stderr" in tr - - -def _tool_result_build_bash(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "bash" - result["stdout"] = tr.get("stdout", "") - result["stderr"] = tr.get("stderr", "") - result["exit_code"] = tr.get("exitCode") - result["interrupted"] = tr.get("interrupted", False) - result["is_error"] = tr.get("is_error", False) - result["return_code_interpretation"] = tr.get("returnCodeInterpretation") - return result - - -def _tool_result_pred_file_edit(tr: dict[str, Any]) -> bool: - return "structuredPatch" in tr or ( - "filePath" in tr and "newString" in tr - ) - - -def _tool_result_build_file_edit(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "file_edit" - result["file_path"] = tr.get("filePath", "") - result["replace_all"] = tr.get("replaceAll", False) - return result - - -def _tool_result_pred_file_write(tr: dict[str, Any]) -> bool: - return "filePath" in tr and "content" in tr - - -def _tool_result_build_file_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "file_write" - result["file_path"] = tr.get("filePath", "") - return result - - -def _tool_result_pred_glob(tr: dict[str, Any]) -> bool: - return "filenames" in tr and 
isinstance(tr.get("filenames"), list) - - -def _tool_result_build_glob(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - filenames = tr["filenames"] - result["result_type"] = "glob" - result["num_files"] = tr.get("numFiles", len(filenames)) - result["truncated"] = tr.get("truncated", False) - result["duration_ms"] = tr.get("durationMs") - result["filenames"] = filenames - return result - - -def _tool_result_pred_grep(tr: dict[str, Any]) -> bool: - return "mode" in tr and "numFiles" in tr - - -def _tool_result_build_grep(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "grep" - result["mode"] = tr.get("mode") - result["num_files"] = tr.get("numFiles", 0) - result["num_lines"] = tr.get("numLines", 0) - result["duration_ms"] = tr.get("durationMs") - content = tr.get("content", "") - if content and isinstance(content, str): - result["content"] = content - return result - - -def _tool_result_pred_file_read(tr: dict[str, Any]) -> bool: - return "file" in tr and isinstance(tr["file"], dict) - - -def _tool_result_build_file_read(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - file_obj = tr["file"] - result["result_type"] = "file_read" - result["file_path"] = file_obj.get("filePath", "") - result["num_lines"] = file_obj.get("numLines") - content = file_obj.get("content", "") - if content and isinstance(content, str): - result["content"] = content - return result - - -def _tool_result_pred_web_search(tr: dict[str, Any]) -> bool: - return "query" in tr and "results" in tr - - -def _tool_result_build_web_search(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "web_search" - result["query"] = tr.get("query", "") - # Defensive: legacy ``len(tr.get("results", []))`` crashed when key existed - # with value None (``len(None)``). Non-sized ``results`` → count 0. 
- raw_results = tr.get("results") - if isinstance(raw_results, (list, tuple, set, dict)): - result["result_count"] = len(raw_results) - else: - result["result_count"] = 0 - result["duration_seconds"] = tr.get("durationSeconds") - return result - - -def _tool_result_pred_web_fetch(tr: dict[str, Any]) -> bool: - return "url" in tr and "code" in tr - - -def _tool_result_build_web_fetch(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "web_fetch" - result["url"] = tr.get("url", "") - result["status_code"] = tr.get("code") - result["duration_ms"] = tr.get("durationMs") - return result - - -def _tool_result_pred_task_message(tr: dict[str, Any]) -> bool: - # Broad: matches ``task_id`` OR ``message``. Runs before retrieval/completed/async - # arms below — same short-circuit order as the original if/elif chain. Payloads - # that also carry e.g. ``agentId`` still classify here if they have ``message``. - # Refining order needs golden fixtures; track as follow-up if real collisions appear. 
- return "task_id" in tr or "message" in tr - - -def _tool_result_build_task_message(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "task" - result["task_id"] = tr.get("task_id") - result["task_type"] = tr.get("task_type") - return result - - -def _tool_result_pred_task_retrieval(tr: dict[str, Any]) -> bool: - return "retrieval_status" in tr and "task" in tr - - -def _tool_result_build_task_retrieval(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - task_obj = tr["task"] if isinstance(tr["task"], dict) else {} - result["result_type"] = "task" - result["retrieval_status"] = tr.get("retrieval_status") - result["task_id"] = task_obj.get("task_id") - return result - - -def _tool_result_pred_task_completed(tr: dict[str, Any]) -> bool: - return "agentId" in tr and "totalDurationMs" in tr - - -def _tool_result_build_task_completed(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "task" - result["agent_id"] = tr.get("agentId") - result["status"] = tr.get("status") - result["total_duration_ms"] = tr.get("totalDurationMs") - result["total_tokens"] = tr.get("totalTokens") - result["total_tool_use_count"] = tr.get("totalToolUseCount") - return result - - -def _tool_result_pred_task_async(tr: dict[str, Any]) -> bool: - return "agentId" in tr and "isAsync" in tr - - -def _tool_result_build_task_async(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "task" - result["agent_id"] = tr.get("agentId") - result["status"] = tr.get("status") - result["description"] = tr.get("description") - return result - - -def _tool_result_pred_todo_write(tr: dict[str, Any]) -> bool: - return "newTodos" in tr or "oldTodos" in tr - - -def _tool_result_build_todo_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - new_todos = tr.get("newTodos", []) - 
result["result_type"] = "todo_write" - result["todo_count"] = len(new_todos) if isinstance(new_todos, list) else 0 - result["todos"] = new_todos if isinstance(new_todos, list) else [] - return result - - -def _tool_result_pred_user_input(tr: dict[str, Any]) -> bool: - return "questions" in tr and "answers" in tr - - -def _tool_result_build_user_input(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "user_input" - result["questions"] = tr.get("questions", []) - result["answers"] = tr.get("answers", {}) - return result - - -def _tool_result_pred_plan(tr: dict[str, Any]) -> bool: - return "plan" in tr and "filePath" in tr - - -def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "plan" - result["file_path"] = tr.get("filePath", "") - return result - - -# Dispatch registry: **first matching predicate wins** (same as legacy if/elif). -# Order is load-bearing — do not sort alphabetically or “more specific first” -# without replaying tests and real session fixtures. -# -# Notably ``task_message`` is intentionally broad (``task_id`` or ``message``) -# and sits before ``task_retrieval`` / ``task_completed`` / ``task_async`` so -# payloads that include overlapping keys still match the legacy branch order. -# -# To add a shape: append ``(pred, build)`` here, or insert only after verifying -# predicates above would not steal intended matches. 
-_TOOL_RESULT_DISPATCH = ( - (_tool_result_pred_bash, _tool_result_build_bash), - (_tool_result_pred_file_edit, _tool_result_build_file_edit), - (_tool_result_pred_file_write, _tool_result_build_file_write), - (_tool_result_pred_glob, _tool_result_build_glob), - (_tool_result_pred_grep, _tool_result_build_grep), - (_tool_result_pred_file_read, _tool_result_build_file_read), - (_tool_result_pred_web_search, _tool_result_build_web_search), - (_tool_result_pred_web_fetch, _tool_result_build_web_fetch), - (_tool_result_pred_task_message, _tool_result_build_task_message), - (_tool_result_pred_task_retrieval, _tool_result_build_task_retrieval), - (_tool_result_pred_task_completed, _tool_result_build_task_completed), - (_tool_result_pred_task_async, _tool_result_build_task_async), - (_tool_result_pred_todo_write, _tool_result_build_todo_write), - (_tool_result_pred_user_input, _tool_result_build_user_input), - (_tool_result_pred_plan, _tool_result_build_plan), -) - - -def _parse_tool_result( - tool_result: Any, slug: str | None = None -) -> dict[str, Any] | None: - """Figure out what kind of tool result this is (bash, file edit, glob, etc.) - by looking at which keys are present, since the JSONL doesn't always tag them. - - Classification uses ``_TOOL_RESULT_DISPATCH``: ordered ``(predicate, builder)`` - pairs; the **first** predicate that matches wins (parity with the historical - ``if``/``elif`` chain — order is not strictly “specific before generic”). 
- - Append a new pair at the end to register a shape, or insert mid-table only - after checking interactions with broader predicates above (see notes on the - tuple).""" - if not isinstance(tool_result, dict): - return None - - base = {"slug": slug} - for pred, build in _TOOL_RESULT_DISPATCH: - if pred(tool_result): - return build(tool_result, base) - - result = dict(base) - result["result_type"] = "unknown" - return result - - -def quick_session_info(filepath: str) -> QuickSessionInfoDict: - """Lightweight peek at a session file -- returns title and last_timestamp - without fully parsing all messages. Much faster than parse_session() for - large files. - - Strategy: read the first ~50 lines for the title, then seek to the end of - the file and read the last chunk to find the last timestamp.""" - title = None - first_ts = None - last_ts = None - - # --- Pass 1: read first lines to find the title and first_timestamp --- - with open(filepath, "r", encoding="utf-8", errors="replace") as f: - lines_read = 0 - for line in f: - lines_read += 1 - if lines_read > 80: - break - line = line.strip() - if not line: - continue - try: - entry = json.loads(line) - except json.JSONDecodeError: - continue - - ts = entry.get("timestamp") - if ts: - if first_ts is None: - first_ts = ts - last_ts = ts # keep updating in case file is small - - if title is None and entry.get("type") == "user": - msg = _entry_message(entry) - text = _extract_text(msg.get("content", [])) - if text: - clean = _strip_system_tags(text).strip() - first_line = clean.split("\n")[0][:100] - if first_line: - title = first_line - - # --- Pass 2: read last chunk for the last timestamp --- - file_size = os.path.getsize(filepath) - if file_size > 10000: - # Only bother with tail-read for non-tiny files - chunk_size = min(file_size, 32768) - with open(filepath, "rb") as f: - f.seek(file_size - chunk_size) - tail = f.read().decode("utf-8", errors="replace") - # Parse lines in reverse to find latest timestamp - for line 
in reversed(tail.splitlines()): - line = line.strip() - if not line: - continue - try: - entry = json.loads(line) - except json.JSONDecodeError: - continue - ts = entry.get("timestamp") - if ts: - last_ts = ts - break - - return { - "title": title or "Untitled Session", - "first_timestamp": first_ts, - "last_timestamp": last_ts, - } - - -def _normalize_content(content: Any) -> list[dict[str, Any]]: - """Content can be a plain string, a list of strings, or a list of typed - blocks. Normalize everything into [{type, text}, ...] form.""" - if isinstance(content, str): - return [{"type": "text", "text": content}] - if isinstance(content, list): - result = [] - for part in content: - if isinstance(part, str): - result.append({"type": "text", "text": part}) - elif isinstance(part, dict): - result.append(part) - return result - return [] - - -def _extract_text(content_parts: Any) -> str: - """Grab just the text blocks out of a content array, ignore tool_use/thinking.""" - parts = _normalize_content(content_parts) - texts = [] - for part in parts: - if part.get("type") == "text": - texts.append(part.get("text", "")) - return "\n".join(texts) - - -def _extract_images(content_parts: Any) -> list[dict[str, Any]]: - """Pull base64 image blocks out of a content array. 
- Also looks inside nested tool_result content blocks.""" - parts = _normalize_content(content_parts) - images = [] - for part in parts: - if part.get("type") == "image": - source = part.get("source", {}) - if source.get("type") == "base64" and source.get("data"): - images.append({ - "media_type": source.get("media_type", "image/png"), - "data": source["data"], - }) - elif part.get("type") == "tool_result": - nested = part.get("content", []) - if isinstance(nested, list): - for sub in nested: - if isinstance(sub, dict) and sub.get("type") == "image": - source = sub.get("source", {}) - if source.get("type") == "base64" and source.get("data"): - images.append({ - "media_type": source.get("media_type", "image/png"), - "data": source["data"], - }) - return images - - -def _infer_title(messages: list[MessageDict]) -> str: - """Use the first line of the first real user message as the session title.""" - for msg in messages: - if msg["role"] == "user" and msg.get("text"): - text = _strip_system_tags(msg["text"]).strip() - first_line = text.split("\n")[0][:100] - if first_line: - return first_line - return "Untitled Session" - - -def _strip_system_tags(text: str) -> str: - """Strip out the internal XML tags Claude Code injects (system-reminder, - ide_opened_file, etc.) 
so exported text is clean.""" - import re - # Remove block tags and their content - for tag in ( - "system-reminder", "ide_opened_file", "user-prompt-submit-hook", - "claude_background_info", "fast_mode_info", "env", - ): - text = re.sub(rf"<{tag}>[\s\S]*?</{tag}>", "", text) - # Strip remaining known opening/closing tags - text = re.sub(r"</?(?:system-reminder|ide_opened_file|user-prompt-submit-hook|claude_background_info|fast_mode_info|env)[^>]*>", "", text) - return text.strip() diff --git a/utils/session_peek.py b/utils/session_peek.py new file mode 100644 index 0000000..afa9b11 --- /dev/null +++ b/utils/session_peek.py @@ -0,0 +1,80 @@ +"""Fast metadata peek for Claude Code JSONL session files.""" + +import json +import os + +from models.session import QuickSessionInfoDict +from utils.jsonl_helpers import entry_message, extract_text, first_title_line + +_TAIL_READ_MIN_BYTES = 10000 +_MAX_HEAD_LINES = 80 + + +def quick_session_info(filepath: str) -> QuickSessionInfoDict: + """Lightweight peek at a session file -- returns title and last_timestamp + without fully parsing all messages. Much faster than parse_session() for + large files. 
+ + Strategy: read at most the first 80 lines for title, then tail-read the end + of files larger than 10_000 bytes for last_timestamp.""" + title = None + first_ts = None + last_ts = None + file_size = os.path.getsize(filepath) + + # --- Pass 1: read first lines to find the title and first_timestamp --- + with open(filepath, "r", encoding="utf-8", errors="replace") as f: + lines_read = 0 + for line in f: + lines_read += 1 + if lines_read > _MAX_HEAD_LINES: + break + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + + ts = entry.get("timestamp") + if ts: + if first_ts is None: + first_ts = ts + last_ts = ts # keep updating in case file is small + + if title is None and entry.get("type") == "user": + msg = entry_message(entry) + text = extract_text(msg.get("content", [])) + if text: + first_line = first_title_line(text) + if first_line: + title = first_line + + # --- Pass 2: read last chunk for the last timestamp --- + if file_size > _TAIL_READ_MIN_BYTES: + # Only bother with tail-read for non-tiny files + chunk_size = min(file_size, 32768) + with open(filepath, "rb") as f: + f.seek(file_size - chunk_size) + tail = f.read().decode("utf-8", errors="replace") + # First line in tail is often a partial record after seek; json.loads skips it. + # Parse lines in reverse to find latest timestamp + for line in reversed(tail.splitlines()): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + ts = entry.get("timestamp") + if ts: + last_ts = ts + break + + return { + "title": title or "Untitled Session", + "first_timestamp": first_ts, + "last_timestamp": last_ts, + } diff --git a/utils/tool_dispatch.py b/utils/tool_dispatch.py new file mode 100644 index 0000000..bd42578 --- /dev/null +++ b/utils/tool_dispatch.py @@ -0,0 +1,276 @@ +"""Tool-result classification for Claude Code JSONL toolUseResult blobs. 
+ +Dispatch registry: **first matching predicate wins** (legacy if/elif parity). +Order is load-bearing — do not sort alphabetically or "more specific first" +without replaying tests and real session fixtures. + +Notably ``task_message`` is broad (``task_id`` or ``message``) and sits before +``task_retrieval`` / ``task_completed`` / ``task_async``. + +To add a shape: append ``(pred, build)`` at the end, or insert only after +verifying predicates above would not steal intended matches. +""" + +from typing import Any + + +def _tool_result_pred_bash(tr: dict[str, Any]) -> bool: + return "stdout" in tr or "stderr" in tr + + +def _tool_result_build_bash(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "bash" + result["stdout"] = tr.get("stdout", "") + result["stderr"] = tr.get("stderr", "") + result["exit_code"] = tr.get("exitCode") + result["interrupted"] = tr.get("interrupted", False) + result["is_error"] = tr.get("is_error", False) + result["return_code_interpretation"] = tr.get("returnCodeInterpretation") + return result + + +def _tool_result_pred_file_edit(tr: dict[str, Any]) -> bool: + return "structuredPatch" in tr or ( + "filePath" in tr and "newString" in tr + ) + + +def _tool_result_build_file_edit(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + # Summary fields only; full blob (e.g. structuredPatch) stays on message tool_result. 
+ result = dict(base) + result["result_type"] = "file_edit" + result["file_path"] = tr.get("filePath", "") + result["replace_all"] = tr.get("replaceAll", False) + return result + + +def _tool_result_pred_plan(tr: dict[str, Any]) -> bool: + return "plan" in tr and "filePath" in tr + + +def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "plan" + result["file_path"] = tr.get("filePath", "") + return result + + +def _tool_result_pred_file_write(tr: dict[str, Any]) -> bool: + return "filePath" in tr and "content" in tr + + +def _tool_result_build_file_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "file_write" + result["file_path"] = tr.get("filePath", "") + return result + + +def _tool_result_pred_glob(tr: dict[str, Any]) -> bool: + return "filenames" in tr and isinstance(tr.get("filenames"), list) + + +def _tool_result_build_glob(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + filenames = tr["filenames"] + result["result_type"] = "glob" + result["num_files"] = tr.get("numFiles", len(filenames)) + result["truncated"] = tr.get("truncated", False) + result["duration_ms"] = tr.get("durationMs") + result["filenames"] = filenames + return result + + +def _tool_result_pred_grep(tr: dict[str, Any]) -> bool: + return "mode" in tr and "numFiles" in tr + + +def _tool_result_build_grep(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "grep" + result["mode"] = tr.get("mode") + result["num_files"] = tr.get("numFiles", 0) + result["num_lines"] = tr.get("numLines", 0) + result["duration_ms"] = tr.get("durationMs") + content = tr.get("content", "") + if isinstance(content, str): + result["content"] = content + return result + + +def _tool_result_pred_file_read(tr: dict[str, Any]) -> bool: + return "file" in tr and isinstance(tr["file"], 
dict) + + +def _tool_result_build_file_read(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + file_obj = tr["file"] + result["result_type"] = "file_read" + result["file_path"] = file_obj.get("filePath", "") + result["num_lines"] = file_obj.get("numLines") + content = file_obj.get("content", "") + if isinstance(content, str): + result["content"] = content + return result + + +def _tool_result_pred_web_search(tr: dict[str, Any]) -> bool: + return "query" in tr and "results" in tr + + +def _tool_result_build_web_search(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "web_search" + result["query"] = tr.get("query", "") + # Defensive: legacy ``len(tr.get("results", []))`` crashed when key existed + # with value None (``len(None)``). Non-sized ``results`` → count 0. + raw_results = tr.get("results") + if isinstance(raw_results, (list, tuple, set, dict)): + result["result_count"] = len(raw_results) + else: + result["result_count"] = 0 + result["duration_seconds"] = tr.get("durationSeconds") + return result + + +def _tool_result_pred_web_fetch(tr: dict[str, Any]) -> bool: + return "url" in tr and "code" in tr + + +def _tool_result_build_web_fetch(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "web_fetch" + result["url"] = tr.get("url", "") + result["status_code"] = tr.get("code") + result["duration_ms"] = tr.get("durationMs") + return result + + +def _tool_result_pred_task_message(tr: dict[str, Any]) -> bool: + # Broad: matches ``task_id`` OR ``message``. Runs before retrieval/completed/async + # arms below — same short-circuit order as the original if/elif chain. Payloads + # that also carry e.g. ``agentId`` still classify here if they have ``message``. + # Refining order needs golden fixtures; track as follow-up if real collisions appear. 
+ return "task_id" in tr or "message" in tr + + +def _tool_result_build_task_message(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "task" + result["task_id"] = tr.get("task_id") + result["task_type"] = tr.get("task_type") + return result + + +def _tool_result_pred_task_retrieval(tr: dict[str, Any]) -> bool: + return "retrieval_status" in tr and "task" in tr + + +def _tool_result_build_task_retrieval(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + task_obj = tr["task"] if isinstance(tr["task"], dict) else {} + result["result_type"] = "task" + result["retrieval_status"] = tr.get("retrieval_status") + result["task_id"] = task_obj.get("task_id") + return result + + +def _tool_result_pred_task_completed(tr: dict[str, Any]) -> bool: + return "agentId" in tr and "totalDurationMs" in tr + + +def _tool_result_build_task_completed(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "task" + result["agent_id"] = tr.get("agentId") + result["status"] = tr.get("status") + result["total_duration_ms"] = tr.get("totalDurationMs") + result["total_tokens"] = tr.get("totalTokens") + result["total_tool_use_count"] = tr.get("totalToolUseCount") + return result + + +def _tool_result_pred_task_async(tr: dict[str, Any]) -> bool: + return "agentId" in tr and "isAsync" in tr + + +def _tool_result_build_task_async(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "task" + result["agent_id"] = tr.get("agentId") + result["status"] = tr.get("status") + result["description"] = tr.get("description") + return result + + +def _tool_result_pred_todo_write(tr: dict[str, Any]) -> bool: + return "newTodos" in tr or "oldTodos" in tr + + +def _tool_result_build_todo_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + new_todos = tr.get("newTodos", []) + 
result["result_type"] = "todo_write" + result["todo_count"] = len(new_todos) if isinstance(new_todos, list) else 0 + result["todos"] = new_todos if isinstance(new_todos, list) else [] + return result + + +def _tool_result_pred_user_input(tr: dict[str, Any]) -> bool: + return "questions" in tr and "answers" in tr + + +def _tool_result_build_user_input(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "user_input" + result["questions"] = tr.get("questions", []) + result["answers"] = tr.get("answers", {}) + return result + + +# Registry order is load-bearing (see module docstring). +# ``plan`` before ``file_write``: plan blobs may carry ``filePath`` + ``content``. +_TOOL_RESULT_DISPATCH = ( + (_tool_result_pred_bash, _tool_result_build_bash), + (_tool_result_pred_file_edit, _tool_result_build_file_edit), + (_tool_result_pred_plan, _tool_result_build_plan), + (_tool_result_pred_file_write, _tool_result_build_file_write), + (_tool_result_pred_glob, _tool_result_build_glob), + (_tool_result_pred_grep, _tool_result_build_grep), + (_tool_result_pred_file_read, _tool_result_build_file_read), + (_tool_result_pred_web_search, _tool_result_build_web_search), + (_tool_result_pred_web_fetch, _tool_result_build_web_fetch), + (_tool_result_pred_task_message, _tool_result_build_task_message), + (_tool_result_pred_task_retrieval, _tool_result_build_task_retrieval), + (_tool_result_pred_task_completed, _tool_result_build_task_completed), + (_tool_result_pred_task_async, _tool_result_build_task_async), + (_tool_result_pred_todo_write, _tool_result_build_todo_write), + (_tool_result_pred_user_input, _tool_result_build_user_input), +) + + +def _parse_tool_result( + tool_result: Any, slug: str | None = None +) -> dict[str, Any] | None: + """Figure out what kind of tool result this is (bash, file edit, glob, etc.) + by looking at which keys are present, since the JSONL doesn't always tag them. 
+ + Classification uses ``_TOOL_RESULT_DISPATCH``: ordered ``(predicate, builder)`` + pairs; the **first** predicate that matches wins (parity with the historical + ``if``/``elif`` chain — order is not strictly “specific before generic”). + + Append a new pair at the end to register a shape, or insert mid-table only + after checking interactions with broader predicates above (see notes on the + tuple).""" + if not isinstance(tool_result, dict): + return None + + base = {"slug": slug} + for pred, build in _TOOL_RESULT_DISPATCH: + if pred(tool_result): + return build(tool_result, base) + + result = dict(base) + result["result_type"] = "unknown" + return result