From e3ab28b73d15053cec8517b2cea0a6fc112a6658 Mon Sep 17 00:00:00 2001 From: chen Date: Thu, 28 May 2026 05:22:23 +0800 Subject: [PATCH 1/5] Extract JSONL parser monolith into focused modules --- docs/architecture.md | 2 +- utils/jsonl_helpers.py | 94 +++++++++ utils/jsonl_parser.py | 452 +++-------------------------------------- utils/session_peek.py | 77 +++++++ utils/tool_dispatch.py | 283 ++++++++++++++++++++++++++ 5 files changed, 483 insertions(+), 425 deletions(-) create mode 100644 utils/jsonl_helpers.py create mode 100644 utils/session_peek.py create mode 100644 utils/tool_dispatch.py diff --git a/docs/architecture.md b/docs/architecture.md index a64ff92..8831461 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -71,7 +71,7 @@ ## Dispatch table -In `utils/jsonl_parser.py`, tool results are classified through `_parse_tool_result`, a **predicate-ordered dispatch table** (not a simple `if tool_name == ...` chain). **Order is load-bearing**: the first matching predicate wins. Tests in `tests/test_jsonl_parser.py` guard ordering regressions. +In `utils/tool_dispatch.py`, tool results are classified through `_parse_tool_result`, a **predicate-ordered dispatch table** (not a simple `if tool_name == ...` chain). **Order is load-bearing**: the first matching predicate wins. Tests in `tests/test_jsonl_parser.py` and `tests/test_real_session_fixtures.py` guard ordering regressions. When adding a new tool renderer: diff --git a/utils/jsonl_helpers.py b/utils/jsonl_helpers.py new file mode 100644 index 0000000..1e4c2e1 --- /dev/null +++ b/utils/jsonl_helpers.py @@ -0,0 +1,94 @@ +"""Shared content helpers for JSONL parsing and session peek.""" + +import re +from typing import Any + +from models.session import MessageDict + + +def entry_message(entry: dict[str, Any]) -> dict[str, Any]: + m = entry.get("message") + return m if isinstance(m, dict) else {} + + +def normalize_content(content: Any) -> list[dict[str, Any]]: + """Content can be a plain string, a list of strings, or a list of typed + blocks. Normalize everything into [{type, text}, ...] form.""" + if isinstance(content, str): + return [{"type": "text", "text": content}] + if isinstance(content, list): + result = [] + for part in content: + if isinstance(part, str): + result.append({"type": "text", "text": part}) + elif isinstance(part, dict): + result.append(part) + return result + return [] + + +def extract_text(content_parts: Any) -> str: + """Grab just the text blocks out of a content array, ignore tool_use/thinking.""" + parts = normalize_content(content_parts) + texts = [] + for part in parts: + if part.get("type") == "text": + texts.append(part.get("text", "")) + return "\n".join(texts) + + +def extract_images(content_parts: Any) -> list[dict[str, Any]]: + """Pull base64 image blocks out of a content array. + Also looks inside nested tool_result content blocks.""" + parts = normalize_content(content_parts) + images = [] + for part in parts: + if part.get("type") == "image": + source = part.get("source", {}) + if source.get("type") == "base64" and source.get("data"): + images.append({ + "media_type": source.get("media_type", "image/png"), + "data": source["data"], + }) + elif part.get("type") == "tool_result": + nested = part.get("content", []) + if isinstance(nested, list): + for sub in nested: + if isinstance(sub, dict) and sub.get("type") == "image": + source = sub.get("source", {}) + if source.get("type") == "base64" and source.get("data"): + images.append({ + "media_type": source.get("media_type", "image/png"), + "data": source["data"], + }) + return images + + +def infer_title(messages: list[MessageDict]) -> str: + """Use the first line of the first real user message as the session title.""" + for msg in messages: + if msg["role"] == "user" and msg.get("text"): + text = strip_system_tags(msg["text"]).strip() + first_line = text.split("\n")[0][:100] + if first_line: + return first_line + return "Untitled Session" + + +def strip_system_tags(text: str) -> str: + """Strip out the internal XML tags Claude Code injects (system-reminder, + ide_opened_file, etc.) so exported text is clean.""" + # Remove block tags and their content + for tag in ( + "system-reminder", "ide_opened_file", "user-prompt-submit-hook", + "claude_background_info", "fast_mode_info", "env", + ): + text = re.sub(rf"<{tag}>[\s\S]*?", "", text) + # Strip remaining known opening/closing tags + text = re.sub( + r"", + "", + text, + ) + return text.strip() diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index 14b71c3..4d477d6 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -6,9 +6,36 @@ from datetime import datetime from typing import Any -from models.session import MessageDict, QuickSessionInfoDict, SessionDict +from models.session import MessageDict, SessionDict +from utils.jsonl_helpers import ( + entry_message as _entry_message, + extract_images as _extract_images, + extract_text as _extract_text, + infer_title as _infer_title, + normalize_content as _normalize_content, + strip_system_tags as _strip_system_tags, +) +from utils.session_peek import quick_session_info +from utils.tool_dispatch import _TOOL_RESULT_DISPATCH, _parse_tool_result from utils.validation import validate_session_dict +__all__ = [ + "parse_session", + "quick_session_info", + "_parse_tool_result", + "_TOOL_RESULT_DISPATCH", + "_process_user", + "_process_assistant", + "_process_system", + "_process_progress", + "_normalize_content", + "_extract_text", + "_extract_images", + "_infer_title", + "_strip_system_tags", + "_track_file_activity", +] + def parse_session(filepath: str) -> SessionDict: """Main entry point. Reads every line from a .jsonl file and builds up @@ -131,11 +158,6 @@ def parse_session(filepath: str) -> SessionDict: ) -def _entry_message(entry: dict[str, Any]) -> dict[str, Any]: - m = entry.get("message") - return m if isinstance(m, dict) else {} - - def _process_user( entry: dict[str, Any], messages: list[MessageDict], metadata: dict[str, Any] ) -> None: @@ -341,421 +363,3 @@ def _track_file_activity( url_or_query = tool_input.get("url") or tool_input.get("query", "") if url_or_query: metadata["web_fetches"].append(url_or_query) - - -def _tool_result_pred_bash(tr: dict[str, Any]) -> bool: - return "stdout" in tr or "stderr" in tr - - -def _tool_result_build_bash(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "bash" - result["stdout"] = tr.get("stdout", "") - result["stderr"] = tr.get("stderr", "") - result["exit_code"] = tr.get("exitCode") - result["interrupted"] = tr.get("interrupted", False) - result["is_error"] = tr.get("is_error", False) - result["return_code_interpretation"] = tr.get("returnCodeInterpretation") - return result - - -def _tool_result_pred_file_edit(tr: dict[str, Any]) -> bool: - return "structuredPatch" in tr or ( - "filePath" in tr and "newString" in tr - ) - - -def _tool_result_build_file_edit(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "file_edit" - result["file_path"] = tr.get("filePath", "") - result["replace_all"] = tr.get("replaceAll", False) - return result - - -def _tool_result_pred_file_write(tr: dict[str, Any]) -> bool: - return "filePath" in tr and "content" in tr - - -def _tool_result_build_file_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "file_write" - result["file_path"] = tr.get("filePath", "") - return result - - -def _tool_result_pred_glob(tr: dict[str, Any]) -> bool: - return "filenames" in tr and isinstance(tr.get("filenames"), list) - - -def _tool_result_build_glob(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - filenames = tr["filenames"] - result["result_type"] = "glob" - result["num_files"] = tr.get("numFiles", len(filenames)) - result["truncated"] = tr.get("truncated", False) - result["duration_ms"] = tr.get("durationMs") - result["filenames"] = filenames - return result - - -def _tool_result_pred_grep(tr: dict[str, Any]) -> bool: - return "mode" in tr and "numFiles" in tr - - -def _tool_result_build_grep(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "grep" - result["mode"] = tr.get("mode") - result["num_files"] = tr.get("numFiles", 0) - result["num_lines"] = tr.get("numLines", 0) - result["duration_ms"] = tr.get("durationMs") - content = tr.get("content", "") - if content and isinstance(content, str): - result["content"] = content - return result - - -def _tool_result_pred_file_read(tr: dict[str, Any]) -> bool: - return "file" in tr and isinstance(tr["file"], dict) - - -def _tool_result_build_file_read(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - file_obj = tr["file"] - result["result_type"] = "file_read" - result["file_path"] = file_obj.get("filePath", "") - result["num_lines"] = file_obj.get("numLines") - content = file_obj.get("content", "") - if content and isinstance(content, str): - result["content"] = content - return result - - -def _tool_result_pred_web_search(tr: dict[str, Any]) -> bool: - return "query" in tr and "results" in tr - - -def _tool_result_build_web_search(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "web_search" - result["query"] = tr.get("query", "") - # Defensive: legacy ``len(tr.get("results", []))`` crashed when key existed - # with value None (``len(None)``). Non-sized ``results`` → count 0. - raw_results = tr.get("results") - if isinstance(raw_results, (list, tuple, set, dict)): - result["result_count"] = len(raw_results) - else: - result["result_count"] = 0 - result["duration_seconds"] = tr.get("durationSeconds") - return result - - -def _tool_result_pred_web_fetch(tr: dict[str, Any]) -> bool: - return "url" in tr and "code" in tr - - -def _tool_result_build_web_fetch(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "web_fetch" - result["url"] = tr.get("url", "") - result["status_code"] = tr.get("code") - result["duration_ms"] = tr.get("durationMs") - return result - - -def _tool_result_pred_task_message(tr: dict[str, Any]) -> bool: - # Broad: matches ``task_id`` OR ``message``. Runs before retrieval/completed/async - # arms below — same short-circuit order as the original if/elif chain. Payloads - # that also carry e.g. ``agentId`` still classify here if they have ``message``. - # Refining order needs golden fixtures; track as follow-up if real collisions appear. - return "task_id" in tr or "message" in tr - - -def _tool_result_build_task_message(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "task" - result["task_id"] = tr.get("task_id") - result["task_type"] = tr.get("task_type") - return result - - -def _tool_result_pred_task_retrieval(tr: dict[str, Any]) -> bool: - return "retrieval_status" in tr and "task" in tr - - -def _tool_result_build_task_retrieval(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - task_obj = tr["task"] if isinstance(tr["task"], dict) else {} - result["result_type"] = "task" - result["retrieval_status"] = tr.get("retrieval_status") - result["task_id"] = task_obj.get("task_id") - return result - - -def _tool_result_pred_task_completed(tr: dict[str, Any]) -> bool: - return "agentId" in tr and "totalDurationMs" in tr - - -def _tool_result_build_task_completed(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "task" - result["agent_id"] = tr.get("agentId") - result["status"] = tr.get("status") - result["total_duration_ms"] = tr.get("totalDurationMs") - result["total_tokens"] = tr.get("totalTokens") - result["total_tool_use_count"] = tr.get("totalToolUseCount") - return result - - -def _tool_result_pred_task_async(tr: dict[str, Any]) -> bool: - return "agentId" in tr and "isAsync" in tr - - -def _tool_result_build_task_async(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "task" - result["agent_id"] = tr.get("agentId") - result["status"] = tr.get("status") - result["description"] = tr.get("description") - return result - - -def _tool_result_pred_todo_write(tr: dict[str, Any]) -> bool: - return "newTodos" in tr or "oldTodos" in tr - - -def _tool_result_build_todo_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - new_todos = tr.get("newTodos", []) - result["result_type"] = "todo_write" - result["todo_count"] = len(new_todos) if isinstance(new_todos, list) else 0 - result["todos"] = new_todos if isinstance(new_todos, list) else [] - return result - - -def _tool_result_pred_user_input(tr: dict[str, Any]) -> bool: - return "questions" in tr and "answers" in tr - - -def _tool_result_build_user_input(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "user_input" - result["questions"] = tr.get("questions", []) - result["answers"] = tr.get("answers", {}) - return result - - -def _tool_result_pred_plan(tr: dict[str, Any]) -> bool: - return "plan" in tr and "filePath" in tr - - -def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "plan" - result["file_path"] = tr.get("filePath", "") - return result - - -# Dispatch registry: **first matching predicate wins** (same as legacy if/elif). -# Order is load-bearing — do not sort alphabetically or “more specific first” -# without replaying tests and real session fixtures. -# -# Notably ``task_message`` is intentionally broad (``task_id`` or ``message``) -# and sits before ``task_retrieval`` / ``task_completed`` / ``task_async`` so -# payloads that include overlapping keys still match the legacy branch order. -# -# To add a shape: append ``(pred, build)`` here, or insert only after verifying -# predicates above would not steal intended matches. -_TOOL_RESULT_DISPATCH = ( - (_tool_result_pred_bash, _tool_result_build_bash), - (_tool_result_pred_file_edit, _tool_result_build_file_edit), - (_tool_result_pred_file_write, _tool_result_build_file_write), - (_tool_result_pred_glob, _tool_result_build_glob), - (_tool_result_pred_grep, _tool_result_build_grep), - (_tool_result_pred_file_read, _tool_result_build_file_read), - (_tool_result_pred_web_search, _tool_result_build_web_search), - (_tool_result_pred_web_fetch, _tool_result_build_web_fetch), - (_tool_result_pred_task_message, _tool_result_build_task_message), - (_tool_result_pred_task_retrieval, _tool_result_build_task_retrieval), - (_tool_result_pred_task_completed, _tool_result_build_task_completed), - (_tool_result_pred_task_async, _tool_result_build_task_async), - (_tool_result_pred_todo_write, _tool_result_build_todo_write), - (_tool_result_pred_user_input, _tool_result_build_user_input), - (_tool_result_pred_plan, _tool_result_build_plan), -) - - -def _parse_tool_result( - tool_result: Any, slug: str | None = None -) -> dict[str, Any] | None: - """Figure out what kind of tool result this is (bash, file edit, glob, etc.) - by looking at which keys are present, since the JSONL doesn't always tag them. - - Classification uses ``_TOOL_RESULT_DISPATCH``: ordered ``(predicate, builder)`` - pairs; the **first** predicate that matches wins (parity with the historical - ``if``/``elif`` chain — order is not strictly “specific before generic”). - - Append a new pair at the end to register a shape, or insert mid-table only - after checking interactions with broader predicates above (see notes on the - tuple).""" - if not isinstance(tool_result, dict): - return None - - base = {"slug": slug} - for pred, build in _TOOL_RESULT_DISPATCH: - if pred(tool_result): - return build(tool_result, base) - - result = dict(base) - result["result_type"] = "unknown" - return result - - -def quick_session_info(filepath: str) -> QuickSessionInfoDict: - """Lightweight peek at a session file -- returns title and last_timestamp - without fully parsing all messages. Much faster than parse_session() for - large files. - - Strategy: read the first ~50 lines for the title, then seek to the end of - the file and read the last chunk to find the last timestamp.""" - title = None - first_ts = None - last_ts = None - - # --- Pass 1: read first lines to find the title and first_timestamp --- - with open(filepath, "r", encoding="utf-8", errors="replace") as f: - lines_read = 0 - for line in f: - lines_read += 1 - if lines_read > 80: - break - line = line.strip() - if not line: - continue - try: - entry = json.loads(line) - except json.JSONDecodeError: - continue - - ts = entry.get("timestamp") - if ts: - if first_ts is None: - first_ts = ts - last_ts = ts # keep updating in case file is small - - if title is None and entry.get("type") == "user": - msg = _entry_message(entry) - text = _extract_text(msg.get("content", [])) - if text: - clean = _strip_system_tags(text).strip() - first_line = clean.split("\n")[0][:100] - if first_line: - title = first_line - - # --- Pass 2: read last chunk for the last timestamp --- - file_size = os.path.getsize(filepath) - if file_size > 10000: - # Only bother with tail-read for non-tiny files - chunk_size = min(file_size, 32768) - with open(filepath, "rb") as f: - f.seek(file_size - chunk_size) - tail = f.read().decode("utf-8", errors="replace") - # Parse lines in reverse to find latest timestamp - for line in reversed(tail.splitlines()): - line = line.strip() - if not line: - continue - try: - entry = json.loads(line) - except json.JSONDecodeError: - continue - ts = entry.get("timestamp") - if ts: - last_ts = ts - break - - return { - "title": title or "Untitled Session", - "first_timestamp": first_ts, - "last_timestamp": last_ts, - } - - -def _normalize_content(content: Any) -> list[dict[str, Any]]: - """Content can be a plain string, a list of strings, or a list of typed - blocks. Normalize everything into [{type, text}, ...] form.""" - if isinstance(content, str): - return [{"type": "text", "text": content}] - if isinstance(content, list): - result = [] - for part in content: - if isinstance(part, str): - result.append({"type": "text", "text": part}) - elif isinstance(part, dict): - result.append(part) - return result - return [] - - -def _extract_text(content_parts: Any) -> str: - """Grab just the text blocks out of a content array, ignore tool_use/thinking.""" - parts = _normalize_content(content_parts) - texts = [] - for part in parts: - if part.get("type") == "text": - texts.append(part.get("text", "")) - return "\n".join(texts) - - -def _extract_images(content_parts: Any) -> list[dict[str, Any]]: - """Pull base64 image blocks out of a content array. - Also looks inside nested tool_result content blocks.""" - parts = _normalize_content(content_parts) - images = [] - for part in parts: - if part.get("type") == "image": - source = part.get("source", {}) - if source.get("type") == "base64" and source.get("data"): - images.append({ - "media_type": source.get("media_type", "image/png"), - "data": source["data"], - }) - elif part.get("type") == "tool_result": - nested = part.get("content", []) - if isinstance(nested, list): - for sub in nested: - if isinstance(sub, dict) and sub.get("type") == "image": - source = sub.get("source", {}) - if source.get("type") == "base64" and source.get("data"): - images.append({ - "media_type": source.get("media_type", "image/png"), - "data": source["data"], - }) - return images - - -def _infer_title(messages: list[MessageDict]) -> str: - """Use the first line of the first real user message as the session title.""" - for msg in messages: - if msg["role"] == "user" and msg.get("text"): - text = _strip_system_tags(msg["text"]).strip() - first_line = text.split("\n")[0][:100] - if first_line: - return first_line - return "Untitled Session" - - -def _strip_system_tags(text: str) -> str: - """Strip out the internal XML tags Claude Code injects (system-reminder, - ide_opened_file, etc.) so exported text is clean.""" - import re - # Remove block tags and their content - for tag in ( - "system-reminder", "ide_opened_file", "user-prompt-submit-hook", - "claude_background_info", "fast_mode_info", "env", - ): - text = re.sub(rf"<{tag}>[\s\S]*?", "", text) - # Strip remaining known opening/closing tags - text = re.sub(r"", "", text) - return text.strip() diff --git a/utils/session_peek.py b/utils/session_peek.py new file mode 100644 index 0000000..7cf7c9c --- /dev/null +++ b/utils/session_peek.py @@ -0,0 +1,77 @@ +"""Fast metadata peek for Claude Code JSONL session files.""" + +import json +import os + +from models.session import QuickSessionInfoDict +from utils.jsonl_helpers import entry_message, extract_text, strip_system_tags + + +def quick_session_info(filepath: str) -> QuickSessionInfoDict: + """Lightweight peek at a session file -- returns title and last_timestamp + without fully parsing all messages. Much faster than parse_session() for + large files. + + Strategy: read the first ~50 lines for the title, then seek to the end of + the file and read the last chunk to find the last timestamp.""" + title = None + first_ts = None + last_ts = None + + # --- Pass 1: read first lines to find the title and first_timestamp --- + with open(filepath, "r", encoding="utf-8", errors="replace") as f: + lines_read = 0 + for line in f: + lines_read += 1 + if lines_read > 80: + break + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + + ts = entry.get("timestamp") + if ts: + if first_ts is None: + first_ts = ts + last_ts = ts # keep updating in case file is small + + if title is None and entry.get("type") == "user": + msg = entry_message(entry) + text = extract_text(msg.get("content", [])) + if text: + clean = strip_system_tags(text).strip() + first_line = clean.split("\n")[0][:100] + if first_line: + title = first_line + + # --- Pass 2: read last chunk for the last timestamp --- + file_size = os.path.getsize(filepath) + if file_size > 10000: + # Only bother with tail-read for non-tiny files + chunk_size = min(file_size, 32768) + with open(filepath, "rb") as f: + f.seek(file_size - chunk_size) + tail = f.read().decode("utf-8", errors="replace") + # Parse lines in reverse to find latest timestamp + for line in reversed(tail.splitlines()): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + ts = entry.get("timestamp") + if ts: + last_ts = ts + break + + return { + "title": title or "Untitled Session", + "first_timestamp": first_ts, + "last_timestamp": last_ts, + } diff --git a/utils/tool_dispatch.py b/utils/tool_dispatch.py new file mode 100644 index 0000000..a67752a --- /dev/null +++ b/utils/tool_dispatch.py @@ -0,0 +1,283 @@ +"""Tool-result classification for Claude Code JSONL toolUseResult blobs. + +Dispatch registry: **first matching predicate wins** (legacy if/elif parity). +Order is load-bearing — do not sort alphabetically or "more specific first" +without replaying tests and real session fixtures. + +Notably ``task_message`` is broad (``task_id`` or ``message``) and sits before +``task_retrieval`` / ``task_completed`` / ``task_async``. + +To add a shape: append ``(pred, build)`` at the end, or insert only after +verifying predicates above would not steal intended matches. +""" + +from typing import Any + + +def _tool_result_pred_bash(tr: dict[str, Any]) -> bool: + return "stdout" in tr or "stderr" in tr + + +def _tool_result_build_bash(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "bash" + result["stdout"] = tr.get("stdout", "") + result["stderr"] = tr.get("stderr", "") + result["exit_code"] = tr.get("exitCode") + result["interrupted"] = tr.get("interrupted", False) + result["is_error"] = tr.get("is_error", False) + result["return_code_interpretation"] = tr.get("returnCodeInterpretation") + return result + + +def _tool_result_pred_file_edit(tr: dict[str, Any]) -> bool: + return "structuredPatch" in tr or ( + "filePath" in tr and "newString" in tr + ) + + +def _tool_result_build_file_edit(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "file_edit" + result["file_path"] = tr.get("filePath", "") + result["replace_all"] = tr.get("replaceAll", False) + return result + + +def _tool_result_pred_file_write(tr: dict[str, Any]) -> bool: + return "filePath" in tr and "content" in tr + + +def _tool_result_build_file_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "file_write" + result["file_path"] = tr.get("filePath", "") + return result + + +def _tool_result_pred_glob(tr: dict[str, Any]) -> bool: + return "filenames" in tr and isinstance(tr.get("filenames"), list) + + +def _tool_result_build_glob(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + filenames = tr["filenames"] + result["result_type"] = "glob" + result["num_files"] = tr.get("numFiles", len(filenames)) + result["truncated"] = tr.get("truncated", False) + result["duration_ms"] = tr.get("durationMs") + result["filenames"] = filenames + return result + + +def _tool_result_pred_grep(tr: dict[str, Any]) -> bool: + return "mode" in tr and "numFiles" in tr + + +def _tool_result_build_grep(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "grep" + result["mode"] = tr.get("mode") + result["num_files"] = tr.get("numFiles", 0) + result["num_lines"] = tr.get("numLines", 0) + result["duration_ms"] = tr.get("durationMs") + content = tr.get("content", "") + if content and isinstance(content, str): + result["content"] = content + return result + + +def _tool_result_pred_file_read(tr: dict[str, Any]) -> bool: + return "file" in tr and isinstance(tr["file"], dict) + + +def _tool_result_build_file_read(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + file_obj = tr["file"] + result["result_type"] = "file_read" + result["file_path"] = file_obj.get("filePath", "") + result["num_lines"] = file_obj.get("numLines") + content = file_obj.get("content", "") + if content and isinstance(content, str): + result["content"] = content + return result + + +def _tool_result_pred_web_search(tr: dict[str, Any]) -> bool: + return "query" in tr and "results" in tr + + +def _tool_result_build_web_search(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "web_search" + result["query"] = tr.get("query", "") + # Defensive: legacy ``len(tr.get("results", []))`` crashed when key existed + # with value None (``len(None)``). Non-sized ``results`` → count 0. + raw_results = tr.get("results") + if isinstance(raw_results, (list, tuple, set, dict)): + result["result_count"] = len(raw_results) + else: + result["result_count"] = 0 + result["duration_seconds"] = tr.get("durationSeconds") + return result + + +def _tool_result_pred_web_fetch(tr: dict[str, Any]) -> bool: + return "url" in tr and "code" in tr + + +def _tool_result_build_web_fetch(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "web_fetch" + result["url"] = tr.get("url", "") + result["status_code"] = tr.get("code") + result["duration_ms"] = tr.get("durationMs") + return result + + +def _tool_result_pred_task_message(tr: dict[str, Any]) -> bool: + # Broad: matches ``task_id`` OR ``message``. Runs before retrieval/completed/async + # arms below — same short-circuit order as the original if/elif chain. Payloads + # that also carry e.g. ``agentId`` still classify here if they have ``message``. + # Refining order needs golden fixtures; track as follow-up if real collisions appear. + return "task_id" in tr or "message" in tr + + +def _tool_result_build_task_message(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "task" + result["task_id"] = tr.get("task_id") + result["task_type"] = tr.get("task_type") + return result + + +def _tool_result_pred_task_retrieval(tr: dict[str, Any]) -> bool: + return "retrieval_status" in tr and "task" in tr + + +def _tool_result_build_task_retrieval(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + task_obj = tr["task"] if isinstance(tr["task"], dict) else {} + result["result_type"] = "task" + result["retrieval_status"] = tr.get("retrieval_status") + result["task_id"] = task_obj.get("task_id") + return result + + +def _tool_result_pred_task_completed(tr: dict[str, Any]) -> bool: + return "agentId" in tr and "totalDurationMs" in tr + + +def _tool_result_build_task_completed(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "task" + result["agent_id"] = tr.get("agentId") + result["status"] = tr.get("status") + result["total_duration_ms"] = tr.get("totalDurationMs") + result["total_tokens"] = tr.get("totalTokens") + result["total_tool_use_count"] = tr.get("totalToolUseCount") + return result + + +def _tool_result_pred_task_async(tr: dict[str, Any]) -> bool: + return "agentId" in tr and "isAsync" in tr + + +def _tool_result_build_task_async(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "task" + result["agent_id"] = tr.get("agentId") + result["status"] = tr.get("status") + result["description"] = tr.get("description") + return result + + +def _tool_result_pred_todo_write(tr: dict[str, Any]) -> bool: + return "newTodos" in tr or "oldTodos" in tr + + +def _tool_result_build_todo_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + new_todos = tr.get("newTodos", []) + result["result_type"] = "todo_write" + result["todo_count"] = len(new_todos) if isinstance(new_todos, list) else 0 + result["todos"] = new_todos if isinstance(new_todos, list) else [] + return result + + +def _tool_result_pred_user_input(tr: dict[str, Any]) -> bool: + return "questions" in tr and "answers" in tr + + +def _tool_result_build_user_input(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "user_input" + result["questions"] = tr.get("questions", []) + result["answers"] = tr.get("answers", {}) + return result + + +def _tool_result_pred_plan(tr: dict[str, Any]) -> bool: + return "plan" in tr and "filePath" in tr + + +def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "plan" + result["file_path"] = tr.get("filePath", "") + return result + + +# Dispatch registry: **first matching predicate wins** (same as legacy if/elif). +# Order is load-bearing — do not sort alphabetically or “more specific first” +# without replaying tests and real session fixtures. +# +# Notably ``task_message`` is intentionally broad (``task_id`` or ``message``) +# and sits before ``task_retrieval`` / ``task_completed`` / ``task_async`` so +# payloads that include overlapping keys still match the legacy branch order. +# +# To add a shape: append ``(pred, build)`` here, or insert only after verifying +# predicates above would not steal intended matches. +_TOOL_RESULT_DISPATCH = ( + (_tool_result_pred_bash, _tool_result_build_bash), + (_tool_result_pred_file_edit, _tool_result_build_file_edit), + (_tool_result_pred_file_write, _tool_result_build_file_write), + (_tool_result_pred_glob, _tool_result_build_glob), + (_tool_result_pred_grep, _tool_result_build_grep), + (_tool_result_pred_file_read, _tool_result_build_file_read), + (_tool_result_pred_web_search, _tool_result_build_web_search), + (_tool_result_pred_web_fetch, _tool_result_build_web_fetch), + (_tool_result_pred_task_message, _tool_result_build_task_message), + (_tool_result_pred_task_retrieval, _tool_result_build_task_retrieval), + (_tool_result_pred_task_completed, _tool_result_build_task_completed), + (_tool_result_pred_task_async, _tool_result_build_task_async), + (_tool_result_pred_todo_write, _tool_result_build_todo_write), + (_tool_result_pred_user_input, _tool_result_build_user_input), + (_tool_result_pred_plan, _tool_result_build_plan), +) + + +def _parse_tool_result( + tool_result: Any, slug: str | None = None +) -> dict[str, Any] | None: + """Figure out what kind of tool result this is (bash, file edit, glob, etc.) + by looking at which keys are present, since the JSONL doesn't always tag them. + + Classification uses ``_TOOL_RESULT_DISPATCH``: ordered ``(predicate, builder)`` + pairs; the **first** predicate that matches wins (parity with the historical + ``if``/``elif`` chain — order is not strictly “specific before generic”). + + Append a new pair at the end to register a shape, or insert mid-table only + after checking interactions with broader predicates above (see notes on the + tuple).""" + if not isinstance(tool_result, dict): + return None + + base = {"slug": slug} + for pred, build in _TOOL_RESULT_DISPATCH: + if pred(tool_result): + return build(tool_result, base) + + result = dict(base) + result["result_type"] = "unknown" + return result From b0b712b5d56780bf2bba9e9e2efe47bb3a9fb697 Mon Sep 17 00:00:00 2001 From: yu-med Date: Fri, 29 May 2026 03:14:58 +0800 Subject: [PATCH 2/5] =?UTF-8?q?fix(session=5Fpeek):=20scan=20full=20file?= =?UTF-8?q?=20when=20size=20=E2=89=A410KB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/architecture.md | 6 +++--- utils/session_peek.py | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index 8831461..b82dc2a 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -75,9 +75,9 @@ In `utils/tool_dispatch.py`, tool results are classified through `_parse_tool_re When adding a new tool renderer: -1. Add predicate + builder pair in the dispatch table in the correct order (specific before generic). -2. Add or extend a JSONL fixture under `tests/fixtures/` if needed. -3. Run `pytest tests/test_jsonl_parser.py -v`. +1. Add a `(predicate, builder)` pair to `_TOOL_RESULT_DISPATCH` in `utils/tool_dispatch.py`, preserving existing predicate order unless you also update fixtures and ordering tests (`tests/test_jsonl_parser.py`, `tests/test_real_session_fixtures.py`). Order is **not** “specific before generic” in general — the first match wins. `_tool_result_pred_task_message` is the intentional broad-before-narrow exception (`task_id` or `message` before retrieval/completed/async). +2. Add or extend a JSONL fixture under `tests/fixtures/` (especially for overlaps with existing predicates). +3. Run `pytest tests/test_jsonl_parser.py tests/test_real_session_fixtures.py -v`. ## Export state machine diff --git a/utils/session_peek.py b/utils/session_peek.py index 7cf7c9c..99a1b98 100644 --- a/utils/session_peek.py +++ b/utils/session_peek.py @@ -6,6 +6,9 @@ from models.session import QuickSessionInfoDict from utils.jsonl_helpers import entry_message, extract_text, strip_system_tags +_TAIL_READ_MIN_BYTES = 10 * 1024 +_MAX_HEAD_LINES = 80 + def quick_session_info(filepath: str) -> QuickSessionInfoDict: """Lightweight peek at a session file -- returns title and last_timestamp @@ -17,13 +20,15 @@ def quick_session_info(filepath: str) -> QuickSessionInfoDict: title = None first_ts = None last_ts = None + file_size = os.path.getsize(filepath) # --- Pass 1: read first lines to find the title and first_timestamp --- with open(filepath, "r", encoding="utf-8", errors="replace") as f: lines_read = 0 for line in f: lines_read += 1 - if lines_read > 80: + # Large files use pass-2 tail read for last_timestamp; cap head scan only then. + if file_size > _TAIL_READ_MIN_BYTES and lines_read > _MAX_HEAD_LINES: break line = line.strip() if not line: @@ -49,8 +54,7 @@ def quick_session_info(filepath: str) -> QuickSessionInfoDict: title = first_line # --- Pass 2: read last chunk for the last timestamp --- - file_size = os.path.getsize(filepath) - if file_size > 10000: + if file_size > _TAIL_READ_MIN_BYTES: # Only bother with tail-read for non-tiny files chunk_size = min(file_size, 32768) with open(filepath, "rb") as f: From ddefa46fddfeb7fbe11717e581487cb85f277ff6 Mon Sep 17 00:00:00 2001 From: yu-med Date: Fri, 29 May 2026 03:30:00 +0800 Subject: [PATCH 3/5] refactor(jsonl): address parser-split review nits --- docs/architecture.md | 8 ++++---- utils/jsonl_helpers.py | 9 +++++++-- utils/jsonl_parser.py | 1 + utils/session_peek.py | 6 +++--- utils/tool_dispatch.py | 30 ++++++++++++++++-------------- 5 files changed, 31 insertions(+), 23 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index b82dc2a..3463132 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -15,9 +15,9 @@ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────────┐ ┌──────────────────┐ │ session_path │ │ jsonl_parser │ │ exclusion_rules │ -│ list_projects │ │ parse_session │ │ load + match │ -│ list_sessions │ │ quick_session_info │ └────────┬─────────┘ -│ safe_join │ │ _parse_tool_result │ │ +│ list_projects │ │ session_peek │ │ load + match │ +│ list_sessions │ │ tool_dispatch │ └────────┬─────────┘ +│ safe_join │ │ jsonl_helpers │ │ └────────┬────────┘ └──────────┬──────────┘ │ │ │ │ └────────────┬───────────┴────────────────────────┘ @@ -48,7 +48,7 @@ | Layer | Responsibility | Key modules | |-------|----------------|-------------| | **Data discovery** | Resolve `~/.claude/projects/`, list projects and sessions, prevent path traversal | `utils/session_path.py` | -| **Parsing** | JSONL → session dict (messages, metadata, tool rendering) | `utils/jsonl_parser.py` | +| **Parsing** | JSONL → session dict (messages, metadata, tool rendering) | `utils/jsonl_parser.py`, `utils/tool_dispatch.py`, `utils/session_peek.py`, `utils/jsonl_helpers.py` | | **Filtering** | Exclude sensitive sessions via rules file | `utils/exclusion_rules.py` | | **Statistics** | Aggregates for API and exporters | `utils/session_stats.py` | | **Export — Markdown** | Session → YAML-frontmatter Markdown | `utils/md_exporter.py` | diff --git a/utils/jsonl_helpers.py b/utils/jsonl_helpers.py index 1e4c2e1..8fcc3ed 100644 --- a/utils/jsonl_helpers.py +++ b/utils/jsonl_helpers.py @@ -51,6 +51,7 @@ def extract_images(content_parts: Any) -> list[dict[str, Any]]: "data": source["data"], }) elif part.get("type") == "tool_result": + # Nested content is usually a block list; string content is not normalized here. nested = part.get("content", []) if isinstance(nested, list): for sub in nested: @@ -64,12 +65,16 @@ def extract_images(content_parts: Any) -> list[dict[str, Any]]: return images +def first_title_line(text: str, max_chars: int = 100) -> str: + """First non-empty line after system-tag strip, truncated for session titles.""" + return strip_system_tags(text).strip().split("\n")[0][:max_chars] + + def infer_title(messages: list[MessageDict]) -> str: """Use the first line of the first real user message as the session title.""" for msg in messages: if msg["role"] == "user" and msg.get("text"): - text = strip_system_tags(msg["text"]).strip() - first_line = text.split("\n")[0][:100] + first_line = first_title_line(msg["text"]) if first_line: return first_line return "Untitled Session" diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index 4d477d6..bfb6088 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -24,6 +24,7 @@ "quick_session_info", "_parse_tool_result", "_TOOL_RESULT_DISPATCH", + "_entry_message", "_process_user", "_process_assistant", "_process_system", diff --git a/utils/session_peek.py b/utils/session_peek.py index 99a1b98..e4dbaeb 100644 --- a/utils/session_peek.py +++ b/utils/session_peek.py @@ -4,7 +4,7 @@ import os from models.session import QuickSessionInfoDict -from utils.jsonl_helpers import entry_message, extract_text, strip_system_tags +from utils.jsonl_helpers import entry_message, extract_text, first_title_line _TAIL_READ_MIN_BYTES = 10 * 1024 _MAX_HEAD_LINES = 80 @@ -48,8 +48,7 @@ def quick_session_info(filepath: str) -> QuickSessionInfoDict: msg = entry_message(entry) text = extract_text(msg.get("content", [])) if text: - clean = strip_system_tags(text).strip() - first_line = clean.split("\n")[0][:100] + first_line = first_title_line(text) if first_line: title = first_line @@ -60,6 +59,7 @@ def quick_session_info(filepath: str) -> QuickSessionInfoDict: with open(filepath, "rb") as f: f.seek(file_size - chunk_size) tail = f.read().decode("utf-8", errors="replace") + # First line in tail is often a partial record after seek; json.loads skips it. # Parse lines in reverse to find latest timestamp for line in reversed(tail.splitlines()): line = line.strip() diff --git a/utils/tool_dispatch.py b/utils/tool_dispatch.py index a67752a..2998283 100644 --- a/utils/tool_dispatch.py +++ b/utils/tool_dispatch.py @@ -37,6 +37,7 @@ def _tool_result_pred_file_edit(tr: dict[str, Any]) -> bool: def _tool_result_build_file_edit(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + # Summary fields only; full blob (e.g. structuredPatch) stays on message tool_result. result = dict(base) result["result_type"] = "file_edit" result["file_path"] = tr.get("filePath", "") @@ -44,6 +45,17 @@ def _tool_result_build_file_edit(tr: dict[str, Any], base: dict[str, Any]) -> di return result +def _tool_result_pred_plan(tr: dict[str, Any]) -> bool: + return "plan" in tr and "filePath" in tr + + +def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: + result = dict(base) + result["result_type"] = "plan" + result["file_path"] = tr.get("filePath", "") + return result + + def _tool_result_pred_file_write(tr: dict[str, Any]) -> bool: return "filePath" in tr and "content" in tr @@ -82,7 +94,7 @@ def _tool_result_build_grep(tr: dict[str, Any], base: dict[str, Any]) -> dict[st result["num_lines"] = tr.get("numLines", 0) result["duration_ms"] = tr.get("durationMs") content = tr.get("content", "") - if content and isinstance(content, str): + if isinstance(content, str): result["content"] = content return result @@ -98,7 +110,7 @@ def _tool_result_build_file_read(tr: dict[str, Any], base: dict[str, Any]) -> di result["file_path"] = file_obj.get("filePath", "") result["num_lines"] = file_obj.get("numLines") content = file_obj.get("content", "") - if content and isinstance(content, str): + if isinstance(content, str): result["content"] = content return result @@ -217,17 +229,6 @@ def _tool_result_build_user_input(tr: dict[str, Any], base: dict[str, Any]) -> d return result -def _tool_result_pred_plan(tr: dict[str, Any]) -> bool: - return "plan" in tr and "filePath" in tr - - -def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: - result = dict(base) - result["result_type"] = "plan" - result["file_path"] = tr.get("filePath", "") - return result - - # Dispatch registry: **first matching predicate wins** (same as legacy if/elif). # Order is load-bearing — do not sort alphabetically or “more specific first” # without replaying tests and real session fixtures. @@ -241,6 +242,8 @@ def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[st _TOOL_RESULT_DISPATCH = ( (_tool_result_pred_bash, _tool_result_build_bash), (_tool_result_pred_file_edit, _tool_result_build_file_edit), + # plan before file_write: plan blobs may also carry filePath + content + (_tool_result_pred_plan, _tool_result_build_plan), (_tool_result_pred_file_write, _tool_result_build_file_write), (_tool_result_pred_glob, _tool_result_build_glob), (_tool_result_pred_grep, _tool_result_build_grep), @@ -253,7 +256,6 @@ def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[st (_tool_result_pred_task_async, _tool_result_build_task_async), (_tool_result_pred_todo_write, _tool_result_build_todo_write), (_tool_result_pred_user_input, _tool_result_build_user_input), - (_tool_result_pred_plan, _tool_result_build_plan), ) From 942fb0a4ca7c7847f58cf99a7f0fcfc99f0cfacf Mon Sep 17 00:00:00 2001 From: yu-med Date: Fri, 29 May 2026 03:32:49 +0800 Subject: [PATCH 4/5] fix(jsonl): address parser-split review follow-ups --- tests/test_jsonl_parser.py | 10 ++++++++++ utils/jsonl_parser.py | 9 ++++++++- utils/session_peek.py | 4 ++-- utils/tool_dispatch.py | 13 ++----------- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/tests/test_jsonl_parser.py b/tests/test_jsonl_parser.py index fea0a6e..8accf2f 100644 --- a/tests/test_jsonl_parser.py +++ b/tests/test_jsonl_parser.py @@ -234,6 +234,16 @@ def test_plan_result(self): r = _parse_tool_result({"plan": [], "filePath": "/plan.md"}) assert r["result_type"] == "plan" + def test_plan_with_content_not_classified_as_file_write(self): + """plan is registered before file_write in _TOOL_RESULT_DISPATCH.""" + r = _parse_tool_result({ + "plan": [], + "filePath": "/plan.md", + "content": "plan body", + }) + assert r["result_type"] == "plan" + assert r["file_path"] == "/plan.md" + def test_unknown_fallback(self): r = _parse_tool_result({"unexpected": True}) assert r["result_type"] == "unknown" diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index bfb6088..dd27798 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -15,7 +15,6 @@ normalize_content as _normalize_content, strip_system_tags as _strip_system_tags, ) -from utils.session_peek import quick_session_info from utils.tool_dispatch import _TOOL_RESULT_DISPATCH, _parse_tool_result from utils.validation import validate_session_dict @@ -38,6 +37,14 @@ ] +def __getattr__(name: str) -> Any: + if name == "quick_session_info": + from utils.session_peek import quick_session_info + + return quick_session_info + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + def parse_session(filepath: str) -> SessionDict: """Main entry point. Reads every line from a .jsonl file and builds up a session dict with messages, metadata (tokens, models, tool counts), diff --git a/utils/session_peek.py b/utils/session_peek.py index e4dbaeb..4e81855 100644 --- a/utils/session_peek.py +++ b/utils/session_peek.py @@ -15,8 +15,8 @@ def quick_session_info(filepath: str) -> QuickSessionInfoDict: without fully parsing all messages. Much faster than parse_session() for large files. - Strategy: read the first ~50 lines for the title, then seek to the end of - the file and read the last chunk to find the last timestamp.""" + Strategy: files over 10 KiB cap the head scan at 80 lines for title, then + tail-read for last_timestamp; smaller files are scanned fully in pass 1.""" title = None first_ts = None last_ts = None diff --git a/utils/tool_dispatch.py b/utils/tool_dispatch.py index 2998283..bd42578 100644 --- a/utils/tool_dispatch.py +++ b/utils/tool_dispatch.py @@ -229,20 +229,11 @@ def _tool_result_build_user_input(tr: dict[str, Any], base: dict[str, Any]) -> d return result -# Dispatch registry: **first matching predicate wins** (same as legacy if/elif). -# Order is load-bearing — do not sort alphabetically or “more specific first” -# without replaying tests and real session fixtures. -# -# Notably ``task_message`` is intentionally broad (``task_id`` or ``message``) -# and sits before ``task_retrieval`` / ``task_completed`` / ``task_async`` so -# payloads that include overlapping keys still match the legacy branch order. -# -# To add a shape: append ``(pred, build)`` here, or insert only after verifying -# predicates above would not steal intended matches. +# Registry order is load-bearing (see module docstring). +# ``plan`` before ``file_write``: plan blobs may carry ``filePath`` + ``content``. _TOOL_RESULT_DISPATCH = ( (_tool_result_pred_bash, _tool_result_build_bash), (_tool_result_pred_file_edit, _tool_result_build_file_edit), - # plan before file_write: plan blobs may also carry filePath + content (_tool_result_pred_plan, _tool_result_build_plan), (_tool_result_pred_file_write, _tool_result_build_file_write), (_tool_result_pred_glob, _tool_result_build_glob), From e7d7dfc17dbb2a526bca6519a77060edfb11298c Mon Sep 17 00:00:00 2001 From: yu-med Date: Fri, 29 May 2026 04:58:46 +0800 Subject: [PATCH 5/5] fix(session_peek): restore quick_session_info parity with monolith --- utils/jsonl_parser.py | 9 +-------- utils/session_peek.py | 9 ++++----- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index dd27798..bfb6088 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -15,6 +15,7 @@ normalize_content as _normalize_content, strip_system_tags as _strip_system_tags, ) +from utils.session_peek import quick_session_info from utils.tool_dispatch import _TOOL_RESULT_DISPATCH, _parse_tool_result from utils.validation import validate_session_dict @@ -37,14 +38,6 @@ ] -def __getattr__(name: str) -> Any: - if name == "quick_session_info": - from utils.session_peek import quick_session_info - - return quick_session_info - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") - - def parse_session(filepath: str) -> SessionDict: """Main entry point. Reads every line from a .jsonl file and builds up a session dict with messages, metadata (tokens, models, tool counts), diff --git a/utils/session_peek.py b/utils/session_peek.py index 4e81855..afa9b11 100644 --- a/utils/session_peek.py +++ b/utils/session_peek.py @@ -6,7 +6,7 @@ from models.session import QuickSessionInfoDict from utils.jsonl_helpers import entry_message, extract_text, first_title_line -_TAIL_READ_MIN_BYTES = 10 * 1024 +_TAIL_READ_MIN_BYTES = 10000 _MAX_HEAD_LINES = 80 @@ -15,8 +15,8 @@ def quick_session_info(filepath: str) -> QuickSessionInfoDict: without fully parsing all messages. Much faster than parse_session() for large files. - Strategy: files over 10 KiB cap the head scan at 80 lines for title, then - tail-read for last_timestamp; smaller files are scanned fully in pass 1.""" + Strategy: read at most the first 80 lines for title, then tail-read the end + of files larger than 10_000 bytes for last_timestamp.""" title = None first_ts = None last_ts = None @@ -27,8 +27,7 @@ def quick_session_info(filepath: str) -> QuickSessionInfoDict: lines_read = 0 for line in f: lines_read += 1 - # Large files use pass-2 tail read for last_timestamp; cap head scan only then. - if file_size > _TAIL_READ_MIN_BYTES and lines_read > _MAX_HEAD_LINES: + if lines_read > _MAX_HEAD_LINES: break line = line.strip() if not line: