diff --git a/dotscope/cli/__init__.py b/dotscope/cli/__init__.py index 8a66edd..e703726 100644 --- a/dotscope/cli/__init__.py +++ b/dotscope/cli/__init__.py @@ -3,6 +3,7 @@ from .ingest import _cmd_ingest, _cmd_impact, _cmd_backtest, _cmd_conventions, _cmd_diff, _cmd_bootstrap from .hooks import _cmd_observe, _cmd_incremental, _cmd_hook, _cmd_refresh, _cmd_check, _cmd_check_backtest, _cmd_voice from .serve import _cmd_serve +from .ops import _cmd_ops from .trial import _cmd_trial from .cut_score import _cmd_cut_score from .orchestrator import _cmd_orchestrator @@ -16,6 +17,44 @@ import sys +_REFRESH_ACTIONS = {"scope", "repo", "enqueue", "run", "status", "recover"} + + +def _normalize_legacy_refresh_args(args_list): + """Rewrite legacy `dotscope refresh ` into the v1 grammar.""" + args_list = list(args_list) + try: + idx = args_list.index("refresh") + except ValueError: + return args_list + + tail = args_list[idx + 1:] + if not tail: + args_list[idx + 1:idx + 1] = ["--legacy-refresh-syntax", "repo"] + return args_list + + first = tail[0] + if first in _REFRESH_ACTIONS: + return args_list + + if first == "--repo": + del args_list[idx + 1] + args_list[idx + 1:idx + 1] = ["--legacy-refresh-syntax", "repo"] + return args_list + + if first.startswith("-"): + if "--repo" in tail: + repo_idx = idx + 1 + tail.index("--repo") + del args_list[repo_idx] + args_list[idx + 1:idx + 1] = ["--legacy-refresh-syntax", "repo"] + elif "--async" in tail: + args_list[idx + 1:idx + 1] = ["--legacy-refresh-syntax", "repo"] + return args_list + + args_list[idx + 1:idx + 1] = ["--legacy-refresh-syntax", "scope"] + return args_list + + def _safe_print(text, **kwargs): """Print with ASCII fallback for Windows cp1252 terminals.""" try: @@ -29,7 +68,7 @@ def main(argv=None): consume_decode_warnings() # Intercept help before argparse touches it - args_list = argv if argv is not None else sys.argv[1:] + args_list = _normalize_legacy_refresh_args(argv if argv is not None else sys.argv[1:]) if not args_list or args_list == ["help"] or args_list == ["--help"] or args_list == ["-h"]: from ..ux.help import print_help print_help() @@ -49,6 +88,15 @@ def main(argv=None): ) parser.add_argument("--version", action="version", version=f"%(prog)s {_version()}") parser.add_argument("-h", "--help", action="store_true", dest="show_help") + parser.add_argument("--root", default=None, help="Repository root path") + parser.add_argument( + "--format", + dest="output_format", + choices=["human", "json", "legacy-json"], + default="human", + help="Output format for control-plane commands", + ) + parser.add_argument("--json", dest="output_format", action="store_const", const="json") sub = parser.add_subparsers(dest="command") @@ -140,11 +188,18 @@ def main(argv=None): hook_sub.add_parser("claude", help="Install Claude Code pre-commit enforcement") # --- refresh --- - p_refresh = sub.add_parser("refresh", help="Refresh scopes (synchronous by default)") - p_refresh.add_argument("scopes", nargs="*", help="Scope names to refresh (omit for full repo)") - p_refresh.add_argument("--repo", action="store_true", help="Force full repo refresh") - p_refresh.add_argument("--async", dest="run_async", action="store_true", help="Queue and return (legacy async mode)") + p_refresh = sub.add_parser("refresh", help="Refresh scopes and runtime state") + p_refresh.add_argument( + "--legacy-refresh-syntax", + action="store_true", + help=argparse.SUPPRESS, + ) refresh_sub = p_refresh.add_subparsers(dest="refresh_action") + p_refresh_scope = refresh_sub.add_parser("scope", help="Refresh specific scopes") + p_refresh_scope.add_argument("scopes", nargs="+", help="Scope names to refresh") + p_refresh_scope.add_argument("--async", dest="run_async", action="store_true", help="Queue and return") + p_refresh_repo = refresh_sub.add_parser("repo", help="Refresh the entire repo") + p_refresh_repo.add_argument("--async", dest="run_async", action="store_true", help="Queue and return") p_refresh_enqueue = refresh_sub.add_parser("enqueue", help="Queue runtime refresh work") p_refresh_enqueue.add_argument("scopes", nargs="*", help="Scope names to refresh") p_refresh_enqueue.add_argument("--commit", default=None, help="Classify a commit into refresh work") @@ -152,7 +207,55 @@ def main(argv=None): p_refresh_enqueue.add_argument("--reason", default="", help="Reason stored with the queued job") p_refresh_run = refresh_sub.add_parser("run", help="Run queued refresh work") p_refresh_run.add_argument("--drain", action="store_true", help="Drain the entire queue") - refresh_sub.add_parser("status", help="Show refresh worker and queue status") + p_refresh_status = refresh_sub.add_parser("status", help="Show refresh worker and queue status") + p_refresh_status.add_argument( + "--json", + dest="output_format", + action="store_const", + const="json", + default=argparse.SUPPRESS, + ) + p_refresh_status.add_argument( + "--format", + dest="output_format", + choices=["human", "json", "legacy-json"], + default=argparse.SUPPRESS, + ) + p_refresh_recover = refresh_sub.add_parser("recover", help="Recover stale refresh locks and non-terminal jobs") + p_refresh_recover.add_argument("--dry-run", action="store_true", help="Plan recovery without mutating state") + p_refresh_recover.add_argument( + "--json", + dest="output_format", + action="store_const", + const="json", + default=argparse.SUPPRESS, + ) + p_refresh_recover.add_argument( + "--format", + dest="output_format", + choices=["human", "json", "legacy-json"], + default=argparse.SUPPRESS, + ) + + # --- ops --- + p_ops = sub.add_parser("ops", help="Operator status and recovery checks") + ops_sub = p_ops.add_subparsers(dest="ops_action") + p_ops_status = ops_sub.add_parser("status", help="Show production health status") + p_ops_status.add_argument( + "--json", + dest="output_format", + action="store_const", + const="json", + default=argparse.SUPPRESS, + help="Output as JSON", + ) + p_ops_status.add_argument( + "--format", + dest="output_format", + choices=["human", "json", "legacy-json"], + default=argparse.SUPPRESS, + help="Output format", + ) # --- utility --- p_utility = sub.add_parser("utility", help="Show utility scores for a scope") @@ -401,7 +504,21 @@ def main(argv=None): print_help() return + previous_dotscope_root = os.environ.get("DOTSCOPE_ROOT") + root_env_overridden = False + try: + if getattr(args, "root", None): + from ..models.control_plane import canonical_root + from ..paths.repo import find_repo_root + + resolved_root = find_repo_root(args.root) + if resolved_root is None: + raise ValueError("Could not find repository root") + args.root = canonical_root(resolved_root) + os.environ["DOTSCOPE_ROOT"] = args.root + root_env_overridden = True + handler = { "resolve": _cmd_resolve, "context": _cmd_context, @@ -420,6 +537,7 @@ def main(argv=None): "incremental": _cmd_incremental, "hook": _cmd_hook, "refresh": _cmd_refresh, + "ops": _cmd_ops, "utility": _cmd_utility, "virtual": _cmd_virtual, "lessons": _cmd_lessons, @@ -444,6 +562,11 @@ def main(argv=None): print(f"Error: {e}", file=sys.stderr) sys.exit(1) finally: + if root_env_overridden: + if previous_dotscope_root is None: + os.environ.pop("DOTSCOPE_ROOT", None) + else: + os.environ["DOTSCOPE_ROOT"] = previous_dotscope_root warnings = consume_decode_warnings() if warnings: count = len(warnings) diff --git a/dotscope/cli/__main__.py b/dotscope/cli/__main__.py new file mode 100644 index 0000000..473d203 --- /dev/null +++ b/dotscope/cli/__main__.py @@ -0,0 +1,7 @@ +"""Allow python -m dotscope.cli.""" + +from . import main + + +if __name__ == "__main__": + main() diff --git a/dotscope/cli/control_plane.py b/dotscope/cli/control_plane.py new file mode 100644 index 0000000..10453bc --- /dev/null +++ b/dotscope/cli/control_plane.py @@ -0,0 +1,49 @@ +"""CLI helpers for rendering control-plane operation results.""" + +from __future__ import annotations + +import json +import sys +from typing import Callable, Optional + +from ..models.control_plane import OperationResult +from ..paths.repo import find_repo_root + + +def cli_output_format(args) -> str: + fmt = getattr(args, "output_format", None) + if fmt: + return fmt + if getattr(args, "json_output", False): + return "json" + return "human" + + +def cli_root(args) -> str: + root_hint = getattr(args, "root", None) + root = find_repo_root(root_hint) if root_hint else find_repo_root() + if root is None: + raise ValueError("Could not find repository root") + from ..models.control_plane import canonical_root + + return canonical_root(root) or root + + +def emit_operation_result( + args, + result: OperationResult, + *, + render_human: Callable[[OperationResult], None], + legacy_payload: Optional[Callable[[OperationResult], dict]] = None, +) -> None: + fmt = cli_output_format(args) + if fmt == "json": + print(result.to_json(include_root=True)) + sys.exit(result.exit_code) + if fmt == "legacy-json": + payload = legacy_payload(result) if legacy_payload else result.data + print(json.dumps(payload, indent=2)) + sys.exit(result.exit_code) + + render_human(result) + sys.exit(result.exit_code) diff --git a/dotscope/cli/hooks.py b/dotscope/cli/hooks.py index e456907..85f8b56 100644 --- a/dotscope/cli/hooks.py +++ b/dotscope/cli/hooks.py @@ -178,20 +178,27 @@ def _cmd_hook(args): print("Usage: dotscope hook {install|uninstall|status|claude}") def _cmd_refresh(args): - from ..paths.repo import find_repo_root + import sys + from .control_plane import cli_root, emit_operation_result from ..workflows.refresh import ( enqueue_commit_refresh, enqueue_repo_refresh, enqueue_scope_refresh, kick_refresh_worker, - refresh_status_summary, + refresh_status_result, + recover_refresh_state_result, run_refresh_inline, run_refresh_queue, ) - root = find_repo_root() - if root is None: - raise ValueError("Could not find repository root") + root = cli_root(args) + + if getattr(args, "legacy_refresh_syntax", False): + print( + "dotscope: legacy `dotscope refresh ` syntax is deprecated; " + "use `dotscope refresh scope ` or `dotscope refresh repo`.", + file=sys.stderr, + ) if args.refresh_action == "enqueue": job = None @@ -217,45 +224,85 @@ def _cmd_refresh(args): return if args.refresh_action == "status": - status = refresh_status_summary(root) - current_targets = ", ".join(status.get("current_targets", [])) or "-" - print(f"running: {status.get('running')}") - print(f"current_job: {status.get('current_job') or '-'}") - print(f"current_targets: {current_targets}") - print(f"queued_job_count: {status.get('queued_job_count', 0)}") - if status.get("last_success_at"): - print(f"last_success_at: {status['last_success_at']}") - if status.get("last_error"): - print(f"last_error: {status['last_error']}") + result = refresh_status_result(root) + + def _render_human(op): + status = op.data.get("refresh", {}) + current_targets = ", ".join(status.get("current_targets", [])) or "-" + print(f"running: {status.get('running')}") + print(f"status: {op.status}") + print(f"current_job: {status.get('current_job') or '-'}") + print(f"current_targets: {current_targets}") + print(f"queued_job_count: {status.get('queued_job_count', 0)}") + if status.get("last_success_at"): + print(f"last_success_at: {status['last_success_at']}") + if status.get("last_error"): + print(f"last_error: {status['last_error']}") + + emit_operation_result( + args, + result, + render_human=_render_human, + legacy_payload=lambda op: op.data.get("refresh", {}), + ) + return + + if args.refresh_action == "recover": + result = recover_refresh_state_result(root, dry_run=getattr(args, "dry_run", False)) + + def _render_human(op): + report = op.data.get("recovery", {}) + actions = report.get("actions", []) + if actions: + label = "Planned refresh recovery:" if report.get("dry_run") else "Recovered refresh state:" + print(label) + for action in actions: + print(f" - {action}") + else: + print("Refresh state already recoverable; no changes needed.") + if report.get("needs_full_rebuild"): + print("Cache integrity is degraded; run `dotscope refresh repo` or `dotscope rebuild`.") + + emit_operation_result( + args, + result, + render_human=_render_human, + legacy_payload=lambda op: op.data.get("recovery", {}), + ) return - # Default: synchronous refresh (no sub-action) - scopes = getattr(args, "scopes", []) or [] - is_repo = getattr(args, "repo", False) - run_async = getattr(args, "run_async", False) + if args.refresh_action in {"scope", "repo"}: + scopes = getattr(args, "scopes", []) or [] + is_repo = args.refresh_action == "repo" + run_async = getattr(args, "run_async", False) - if run_async: - if is_repo or not scopes: - enqueue_repo_refresh(root, reason="cli-async") + if run_async: + if is_repo: + enqueue_repo_refresh(root, reason="cli-async") + else: + enqueue_scope_refresh(root, scopes, reason="cli-async") + kick_refresh_worker(root) + targets_label = ", ".join(scopes) if scopes else "repo" + print(f"Queued refresh for {targets_label} (async).") + return + + result = run_refresh_inline( + root, + targets=scopes if not is_repo else None, + repo=is_repo, + ) + + targets_label = ", ".join(result.get("targets_refreshed", [])) or "repo" + if result.get("success"): + print(f"Refreshed {targets_label} in {result['duration_ms']}ms.") else: - enqueue_scope_refresh(root, scopes, reason="cli-async") - kick_refresh_worker(root) - targets_label = ", ".join(scopes) if scopes else "repo" - print(f"Queued refresh for {targets_label} (async).") + error = result.get("error", "unknown error") + print(f"Refresh failed for {targets_label}: {error}") return - result = run_refresh_inline( - root, - targets=scopes if scopes else None, - repo=is_repo, - ) - - targets_label = ", ".join(result.get("targets_refreshed", [])) or "repo" - if result.get("success"): - print(f"Refreshed {targets_label} in {result['duration_ms']}ms.") - else: - error = result.get("error", "unknown error") - print(f"Refresh failed for {targets_label}: {error}") + if args.refresh_action is None: + print("Usage: dotscope refresh {scope|repo|enqueue|run|status|recover}") + return def _cmd_check(args): import json as json_mod @@ -488,4 +535,4 @@ def _cmd_voice(args): print("Stats:") for key, val in stats.items(): if val is not None: - print(f" {key}: {val}") \ No newline at end of file + print(f" {key}: {val}") diff --git a/dotscope/cli/ops.py b/dotscope/cli/ops.py new file mode 100644 index 0000000..4ec098e --- /dev/null +++ b/dotscope/cli/ops.py @@ -0,0 +1,39 @@ +"""CLI operator commands.""" + +from __future__ import annotations + + +def _cmd_ops(args): + from .control_plane import cli_root, emit_operation_result + from ..models.control_plane import legacy_status_dict + from ..workflows.ops import ops_status_result + + if args.ops_action != "status": + print("Usage: dotscope ops status") + return + + root = cli_root(args) + result = ops_status_result(root) + + def _render_human(op): + data = op.data + issue_codes = [issue.code for issue in op.issues] + print(f"healthy: {op.ok}") + print(f"status: {op.status}") + print(f"issues: {', '.join(issue_codes) or '-'}") + refresh = data.get("refresh", {}) + print(f"refresh_running: {refresh.get('running')}") + print(f"refresh_lock_stale: {refresh.get('lock_stale')}") + print(f"queued_job_count: {refresh.get('queued_job_count', 0)}") + cache = data.get("cache", {}) + print(f"cache_state: {cache.get('state')}") + print(f"cache_generation: {cache.get('generation_id')}") + if refresh.get("last_error"): + print(f"last_refresh_error: {refresh.get('last_error')}") + + emit_operation_result( + args, + result, + render_human=_render_human, + legacy_payload=legacy_status_dict, + ) diff --git a/dotscope/engine/runtime_overlay.py b/dotscope/engine/runtime_overlay.py index 4370699..bd440f0 100644 --- a/dotscope/engine/runtime_overlay.py +++ b/dotscope/engine/runtime_overlay.py @@ -8,20 +8,74 @@ import os import posixpath import shutil +import time +import uuid from typing import Iterable, List, Optional, Tuple from ..models import ScopeConfig, ScopeEntry, ScopesIndex from ..paths import normalize_relative_path, normalize_scope_ref, scope_storage_key +from ..storage.atomic import atomic_write_json, atomic_write_text _RUNTIME_SCOPES_DIR = os.path.join(".dotscope", "runtime_scopes") +_RUNTIME_GENERATIONS_DIR = os.path.join(".dotscope", "runtime_scope_generations") +_RUNTIME_ACTIVE_FILE = os.path.join(".dotscope", "runtime_scopes.active.json") def runtime_scopes_root(root: str) -> str: """Return the runtime overlay root for a repository.""" + active = _load_active_runtime_generation(root) + if active: + path = os.path.join(root, _RUNTIME_GENERATIONS_DIR, active) + if os.path.isdir(path): + return path + return _runtime_legacy_root(root) + + +def _runtime_legacy_root(root: str) -> str: return os.path.join(root, _RUNTIME_SCOPES_DIR) +def _runtime_generations_root(root: str) -> str: + return os.path.join(root, _RUNTIME_GENERATIONS_DIR) + + +def _runtime_active_pointer_path(root: str) -> str: + return os.path.join(root, _RUNTIME_ACTIVE_FILE) + + +def _load_active_runtime_generation(root: str) -> Optional[str]: + path = _runtime_active_pointer_path(root) + if not os.path.isfile(path): + return None + try: + import json + + data = json.loads(open(path, "r", encoding="utf-8").read()) + except (OSError, ValueError): + return None + generation_id = str(data.get("generation_id", "")) + if not generation_id or os.path.sep in generation_id or "/" in generation_id: + return None + return generation_id + + +def _new_runtime_generation_root(root: str) -> Tuple[str, str]: + generation_id = f"{int(time.time())}-{uuid.uuid4().hex[:12]}" + path = os.path.join(_runtime_generations_root(root), generation_id) + os.makedirs(path, exist_ok=False) + return generation_id, path + + +def _activate_runtime_generation(root: str, generation_id: str) -> None: + pointer = { + "schema": 1, + "generation_id": generation_id, + "activated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + } + atomic_write_json(_runtime_active_pointer_path(root), pointer) + + def runtime_index_path(root: str) -> str: """Return the runtime overlay index path.""" return os.path.join(runtime_scopes_root(root), ".scopes") @@ -91,8 +145,7 @@ def save_runtime_index(root: str, index: ScopesIndex) -> None: path = runtime_index_path(root) os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, "w", encoding="utf-8") as f: - f.write(_serialize_index(index)) + atomic_write_text(path, _serialize_index(index)) def load_effective_index(root: str) -> Optional[ScopesIndex]: @@ -313,8 +366,7 @@ def write_runtime_scope( logical = scope_storage_key(config.path, root=root) or logical_scope_path(config.path, root=root) destination = runtime_scope_path(root, logical) os.makedirs(os.path.dirname(destination), exist_ok=True) - with open(destination, "w", encoding="utf-8") as f: - f.write(serialize_scope(config)) + atomic_write_text(destination, serialize_scope(config)) tracked_index = load_effective_index(root) runtime_index = load_runtime_index(root) @@ -354,22 +406,20 @@ def sync_runtime_overlay(root: str) -> None: """Replace the runtime overlay with the current tracked scope snapshot.""" from ..engine.discovery import find_all_scopes - runtime_root = runtime_scopes_root(root) - if os.path.isdir(runtime_root): - shutil.rmtree(runtime_root) - os.makedirs(runtime_root, exist_ok=True) + generation_id, runtime_root = _new_runtime_generation_root(root) for tracked in find_all_scopes(root): logical = scope_storage_key(tracked, root=root) if not logical: continue - destination = runtime_scope_path(root, logical) + destination = os.path.join(runtime_root, logical.replace("/", os.sep)) os.makedirs(os.path.dirname(destination), exist_ok=True) shutil.copyfile(tracked, destination) tracked_index = os.path.join(root, ".scopes") if os.path.isfile(tracked_index): - shutil.copyfile(tracked_index, runtime_index_path(root)) + shutil.copyfile(tracked_index, os.path.join(runtime_root, ".scopes")) + _activate_runtime_generation(root, generation_id) def replace_runtime_overlay( @@ -378,20 +428,16 @@ def replace_runtime_overlay( index: Optional[ScopesIndex] = None, ) -> None: """Replace the runtime overlay with generated scope configs.""" - runtime_root = runtime_scopes_root(root) - if os.path.isdir(runtime_root): - shutil.rmtree(runtime_root) - os.makedirs(runtime_root, exist_ok=True) + generation_id, runtime_root = _new_runtime_generation_root(root) entries = {} for config in scopes: logical = scope_storage_key(config.path, root=root) or logical_scope_path(config.path, root=root) - destination = runtime_scope_path(root, logical) + destination = os.path.join(runtime_root, logical.replace("/", os.sep)) os.makedirs(os.path.dirname(destination), exist_ok=True) from ..engine.parser import serialize_scope - with open(destination, "w", encoding="utf-8") as f: - f.write(serialize_scope(config)) + atomic_write_text(destination, serialize_scope(config)) name = scope_name_from_logical_path(logical) entries[name] = _build_scope_entry(name, logical, config) @@ -412,4 +458,8 @@ def replace_runtime_overlay( total_repo_tokens=index.total_repo_tokens, ) - save_runtime_index(root, index) + index_path = os.path.join(runtime_root, ".scopes") + from ..workflows.ingest import _serialize_index + + atomic_write_text(index_path, _serialize_index(index)) + _activate_runtime_generation(root, generation_id) diff --git a/dotscope/engine/search.py b/dotscope/engine/search.py index 4d236f9..3b9db99 100644 --- a/dotscope/engine/search.py +++ b/dotscope/engine/search.py @@ -1,149 +1,368 @@ +"""Bounded semantic search with topology enrichment.""" + +from __future__ import annotations + import json import mmap import os +import re import struct import subprocess -from typing import Dict, Any, List +import time from collections import defaultdict +from typing import Dict, List -def execute_semantic_search(root: str, query: str) -> str: - """The two-step Semantic Intercept pipeline: fast Git grep + zero-copy topology cast.""" - - # STEP 1: The Fast Grep Pass via git grep - try: - cmd = [ - "git", "--no-pager", "grep", - "-I", # Ignore binary - "-E", # Extended Regex - "-n", # Line Numbers - "--heading", # Group by file - "--break", # Blank lines between files - query - ] - # Ignore case by default to maximize recall for the agent - cmd.insert(3, "-i") - - result = subprocess.run(cmd, cwd=root, capture_output=True, text=True, timeout=15) - # 1 means no match, 0 means match. >1 is error - if result.returncode > 1: - return json.dumps({"error": f"Git grep crashed: {result.stderr}"}) - if not result.stdout.strip(): - return json.dumps({"results": [], "query": query, "message": "No semantic matches found."}) - - except Exception as e: - return json.dumps({"error": f"Search Exception: {str(e)}"}) - - raw_output = result.stdout.splitlines() - - # Parser state machine for --heading format - parsed_files = [] - current_file = None - current_snippets = [] - - for line in raw_output: - if not line: - if current_file and current_snippets: - parsed_files.append({"file": current_file, "snippets": current_snippets}) - current_file = None - current_snippets = [] - elif current_file is None: - # It's a heading - if os.path.exists(os.path.join(root, line.strip())): - current_file = line.strip() - else: - # It's a line match: "14: fn test()" - current_snippets.append(line.strip()[:200]) # Cap snippet length safely - - if current_file and current_snippets: - parsed_files.append({"file": current_file, "snippets": current_snippets}) - - # Limit to top 15 results mapped - parsed_files = parsed_files[:15] +from ..models.control_plane import OperationResult, make_issue, make_result + + +MAX_SEARCH_BYTES = int(os.environ.get("DOTSCOPE_SEARCH_MAX_BYTES", "1000000")) +MAX_SEARCH_FILES = int(os.environ.get("DOTSCOPE_SEARCH_MAX_FILES", "15")) +MAX_SEARCH_SNIPPETS = int(os.environ.get("DOTSCOPE_SEARCH_MAX_SNIPPETS", "200")) +MAX_SEARCH_SECONDS = float(os.environ.get("DOTSCOPE_SEARCH_TIMEOUT_SECONDS", "15")) + + +def execute_search_result( + root: str, + query: str, + *, + mode: str = "literal", + request_id: str = "", +) -> OperationResult: + """Run bounded git-grep search and return a v1 control-plane result.""" + started = time.perf_counter() + mode = mode or "literal" + guard = _validate_query(query, mode) + limits = _limits() + if guard: + return make_result( + root=root, + data=_search_data(query, mode, [], _meta(False, "", [], 0), [], limits, error=guard), + issues=[make_issue("invalid_query", message=guard)], + request_id=request_id, + ) + + parsed_files, meta, grep_error = _stream_git_grep(root, query, mode=mode) + meta["elapsed_ms"] = round((time.perf_counter() - started) * 1000, 2) + + if grep_error: + return make_result( + root=root, + data=_search_data(query, mode, [], meta, [], limits, error=grep_error), + issues=[make_issue("operation_failed", message=grep_error)], + request_id=request_id, + ) if not parsed_files: - return json.dumps({"results": [], "query": query}) + issues = _search_issues(meta) + return make_result( + root=root, + data=_search_data( + query, + mode, + [], + meta, + [], + limits, + message="No semantic matches found.", + ), + issues=issues, + metrics=_search_metrics(meta), + request_id=request_id, + ) + + gravity_map, dep_map, degraded = _load_topology(root) - # STEP 2: The Topology Matrix Enrichment - # We are under the MW lock - so we can read safely! + final_output = [] + for p in parsed_files: + file = p["file"] + lookup_name = file.replace("\\", "/") + final_output.append({ + "file": file, + "architectural_gravity": gravity_map.get(lookup_name, "UNKNOWN (Not in pure graph)"), + "structural_dependencies": dep_map.get(lookup_name, [])[:10], + "snippets": p["snippets"], + }) + + return make_result( + root=root, + data=_search_data(query, mode, final_output, meta, degraded, limits), + issues=_search_issues(meta), + metrics=_search_metrics(meta), + request_id=request_id, + ) + + +def execute_semantic_search(root: str, query: str, mode: str = "literal") -> str: + """Compatibility wrapper returning the legacy search JSON object string.""" + result = execute_search_result(root, query, mode=mode) + return json.dumps(_legacy_search_payload(result), indent=2) + + +def _validate_query(query: str, mode: str) -> str: + if query is None: + return "Query must not be null." + if len(query.strip()) < 2: + return "Query must contain at least 2 non-whitespace characters." + if mode not in {"literal", "regex"}: + return "Search mode must be 'literal' or 'regex'." + if mode == "regex": + try: + re.compile(query) + except re.error as exc: + return f"Invalid regular expression: {exc}" + return "" + + +def _stream_git_grep(root: str, query: str, *, mode: str): + matcher = "-F" if mode == "literal" else "-E" + cmd = [ + "git", "--no-pager", "grep", + "-i", + "-I", + matcher, + "-n", + "--heading", + "--break", + query, + ] + proc = None + parsed_files: List[Dict[str, object]] = [] + current_file = None + current_snippets: List[str] = [] + bytes_read = 0 + snippet_count = 0 + truncated = False + limit_reason = "" + deadline = time.monotonic() + MAX_SEARCH_SECONDS + + try: + proc = subprocess.Popen( + cmd, + cwd=root, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + encoding="utf-8", + errors="replace", + ) + assert proc.stdout is not None + for line in proc.stdout: + bytes_read += len(line.encode("utf-8", errors="replace")) + if bytes_read > MAX_SEARCH_BYTES: + truncated = True + limit_reason = "max_bytes" + break + if time.monotonic() > deadline: + truncated = True + limit_reason = "timeout" + break + + clean = line.rstrip("\n") + if not clean: + if current_file and current_snippets: + parsed_files.append({"file": current_file, "snippets": current_snippets}) + if len(parsed_files) >= MAX_SEARCH_FILES: + truncated = True + limit_reason = "max_files" + break + current_file = None + current_snippets = [] + continue + + if current_file is None: + if os.path.exists(os.path.join(root, clean.strip())): + current_file = clean.strip() + continue + + if snippet_count >= MAX_SEARCH_SNIPPETS: + truncated = True + limit_reason = "max_snippets" + break + current_snippets.append(clean.strip()[:200]) + snippet_count += 1 + + if current_file and current_snippets and len(parsed_files) < MAX_SEARCH_FILES: + parsed_files.append({"file": current_file, "snippets": current_snippets}) + + if truncated and proc.poll() is None: + proc.kill() + stderr = "" + try: + _stdout_tail, stderr = proc.communicate(timeout=1) + except subprocess.TimeoutExpired: + proc.kill() + _stdout_tail, stderr = proc.communicate() + + if proc.returncode not in (0, 1, -9) and not truncated: + return [], _meta(truncated, limit_reason, parsed_files, bytes_read), f"Git grep crashed: {stderr[:500]}" + except Exception as exc: + if proc and proc.poll() is None: + proc.kill() + return [], _meta(truncated, limit_reason, parsed_files, bytes_read), f"Search Exception: {exc}" + + return parsed_files, _meta(truncated, limit_reason, parsed_files, bytes_read), "" + + +def _meta(truncated: bool, limit_reason: str, parsed_files: list, bytes_read: int) -> Dict[str, object]: + return { + "truncated": truncated, + "limit_reason": limit_reason, + "elapsed_ms": 0, + "matched_files": len(parsed_files), + "bytes_read": bytes_read, + } + + +def _limits() -> Dict[str, object]: + return { + "max_bytes": MAX_SEARCH_BYTES, + "max_files": MAX_SEARCH_FILES, + "max_snippets": MAX_SEARCH_SNIPPETS, + "timeout_seconds": MAX_SEARCH_SECONDS, + } + + +def _search_data( + query: str, + mode: str, + results: List[Dict[str, object]], + meta: Dict[str, object], + degraded: List[str], + limits: Dict[str, object], + *, + error: str = "", + message: str = "", +) -> Dict[str, object]: + data = { + "results": results, + "query": query, + "mode": mode, + "counts": { + "matched_files": int(meta.get("matched_files", 0) or 0), + "result_count": len(results), + "bytes_read": int(meta.get("bytes_read", 0) or 0), + }, + "limits": limits, + "truncation": { + "truncated": bool(meta.get("truncated", False)), + "limit_reason": str(meta.get("limit_reason", "")), + }, + "elapsed_ms": meta.get("elapsed_ms", 0), + "degraded": degraded, + } + if error: + data["error"] = error + if message: + data["message"] = message + return data + + +def _search_issues(meta: Dict[str, object]) -> List[object]: + if not meta.get("truncated"): + return [] + reason = str(meta.get("limit_reason", "")) + return [make_issue( + "search_truncated", + severity=1, + message=f"Search truncated by {reason or 'limit'}.", + )] + + +def _search_metrics(meta: Dict[str, object]) -> Dict[str, object]: + return { + "elapsed_ms": meta.get("elapsed_ms", 0), + "matched_files": int(meta.get("matched_files", 0) or 0), + "bytes_read": int(meta.get("bytes_read", 0) or 0), + "truncated": bool(meta.get("truncated", False)), + } + + +def _legacy_search_payload(result: OperationResult) -> Dict[str, object]: + data = dict(result.data) + counts = data.get("counts", {}) + truncation = data.get("truncation", {}) + payload = { + "results": data.get("results", []), + "query": data.get("query", ""), + "mode": data.get("mode", "literal"), + "degraded": data.get("degraded", []), + "truncated": bool(truncation.get("truncated", False)) if isinstance(truncation, dict) else False, + "limit_reason": str(truncation.get("limit_reason", "")) if isinstance(truncation, dict) else "", + "elapsed_ms": data.get("elapsed_ms", 0), + "matched_files": int(counts.get("matched_files", 0) or 0) if isinstance(counts, dict) else 0, + "bytes_read": int(counts.get("bytes_read", 0) or 0) if isinstance(counts, dict) else 0, + "request_id": result.request_id, + } + if data.get("error"): + payload["error"] = data.get("error") + if data.get("message"): + payload["message"] = data.get("message") + return payload + + +def _load_topology(root: str): active_buffer_id = 0 control_mmap = os.path.join(root, ".dotscope", "control.mmap") - + degraded = [] + if os.path.exists(control_mmap): try: with open(control_mmap, "r+b") as f: mm = mmap.mmap(f.fileno(), 4096) active_buffer_id = mm[0] mm.close() - except: - pass + except Exception: + degraded.append("mvcc_control_mmap_unreadable") - target_bin = "topology_A.bin" if active_buffer_id == 0 else "topology_B.bin" - bin_path = os.path.join(root, ".dotscope", target_bin) + candidates = [ + os.path.join(root, ".dotscope", "topology_A.bin" if active_buffer_id == 0 else "topology_B.bin"), + os.path.join(root, ".dotscope", "topology.bin"), + ] manifest_path = os.path.join(root, ".dotscope", "structural_manifest.json") - + gravity_map = {} dep_map = defaultdict(list) - - # Try zero-copy enrichment mapping - if os.path.exists(bin_path) and os.path.exists(manifest_path): - try: - with open(manifest_path, "r", encoding="utf-8") as f: - manifest = json.load(f) - nodes = manifest.get("nodes", []) - - with open(bin_path, "rb") as f: - # The file is just 3 continuous block arrays of sizes N * 4. We can stat it to find N. - file_size = os.path.getsize(bin_path) - N = file_size // 12 # (sources + targets + weights) = 3 arrays of u32 (4 bytes). 12 bytes per edge! - - mm = mmap.mmap(f.fileno(), file_size, access=mmap.ACCESS_READ) - sources_bytes = mm[0 : N*4] - targets_bytes = mm[N*4 : N*8] - # weights_bytes = mm[N*8 : N*12] - - # Zero-copy the byte-aligned array directly into memory structs natively mapped - sources = struct.unpack(f'<{N}I', sources_bytes) - targets = struct.unpack(f'<{N}I', targets_bytes) - - in_edges = defaultdict(int) - out_edges = defaultdict(int) - - for s_idx, t_idx in zip(sources, targets): - in_edges[t_idx] += 1 - out_edges[s_idx] += 1 - - if s_idx < len(nodes) and t_idx < len(nodes): - # Node T depends on Node S implicitly since S provides functionality to T. - # Wait, edge weights are standard (if source imports target, source depends on target). - dep_map[nodes[s_idx]].append(nodes[t_idx]) - - for i, node in enumerate(nodes): - gravity = in_edges[i] + out_edges[i] - if gravity > 50: - gravity_map[node] = f"CRITICAL HUB (Connections: {gravity})" - elif gravity > 10: - gravity_map[node] = f"HIGH (Connections: {gravity})" - elif gravity > 3: - gravity_map[node] = f"MODERATE (Connections: {gravity})" - else: - gravity_map[node] = f"LOW (Connections: {gravity})" - - mm.close() - except: - pass - # STEP 3: Return Formatting (Agent Gratification) - final_output = [] - for p in parsed_files: - file = p["file"] - # Standardize slashes for dict lookup - lookup_name = file.replace("\\", "/") - - result_node = { - "file": file, - "architectural_gravity": gravity_map.get(lookup_name, "UNKNOWN (Not in pure graph)"), - "structural_dependencies": dep_map.get(lookup_name, [])[:10], - "snippets": p["snippets"] - } - final_output.append(result_node) - - return json.dumps({"results": final_output}, indent=2) + bin_path = next((path for path in candidates if os.path.exists(path)), "") + if not bin_path or not os.path.exists(manifest_path): + degraded.append("topology_artifacts_missing") + return gravity_map, dep_map, degraded + + try: + with open(manifest_path, "r", encoding="utf-8") as f: + manifest = json.load(f) + nodes = manifest.get("nodes", []) + + file_size = os.path.getsize(bin_path) + if file_size <= 0 or file_size % 12 != 0: + degraded.append("topology_binary_invalid") + return gravity_map, dep_map, degraded + edge_count = file_size // 12 + + with open(bin_path, "rb") as f: + mm = mmap.mmap(f.fileno(), file_size, access=mmap.ACCESS_READ) + sources = struct.unpack(f"<{edge_count}I", mm[0: edge_count * 4]) + targets = struct.unpack(f"<{edge_count}I", mm[edge_count * 4: edge_count * 8]) + in_edges = defaultdict(int) + out_edges = defaultdict(int) + + for s_idx, t_idx in zip(sources, targets): + in_edges[t_idx] += 1 + out_edges[s_idx] += 1 + if s_idx < len(nodes) and t_idx < len(nodes): + dep_map[nodes[s_idx]].append(nodes[t_idx]) + + for i, node in enumerate(nodes): + gravity = in_edges[i] + out_edges[i] + if gravity > 50: + gravity_map[node] = f"CRITICAL HUB (Connections: {gravity})" + elif gravity > 10: + gravity_map[node] = f"HIGH (Connections: {gravity})" + elif gravity > 3: + gravity_map[node] = f"MODERATE (Connections: {gravity})" + else: + gravity_map[node] = f"LOW (Connections: {gravity})" + mm.close() + except Exception: + degraded.append("topology_enrichment_failed") + + return gravity_map, dep_map, degraded diff --git a/dotscope/mcp/core.py b/dotscope/mcp/core.py index 7fa0fab..67d1914 100644 --- a/dotscope/mcp/core.py +++ b/dotscope/mcp/core.py @@ -7,7 +7,6 @@ def register_core_tools(mcp, **kwargs): tracker = kwargs.get('tracker') - client_id = kwargs.get('client_id') _repo_tokens = kwargs.get('_repo_tokens') _cached_history = kwargs.get('_cached_history') _cached_graph_hubs = kwargs.get('_cached_graph_hubs') @@ -83,16 +82,67 @@ def resolve_scope( return json.dumps(data, indent=2) return final_state.get("raw_output", "") + @mcp.tool() + @mcp_tool_route(legacy=True) + def resolve_scope_legacy( + scope: str, + budget: Optional[int] = None, + follow_related: bool = True, + format: str = "json", + task: Optional[str] = None, + root: Optional[str] = None, + ) -> str: + """Legacy JSON-string resolve response. Prefer resolve_scope.""" + return resolve_scope.__wrapped__( + scope=scope, + budget=budget, + follow_related=follow_related, + format=format, + task=task, + root=root, + ) + @mcp.tool() @mcp_tool_route - def dotscope_search(query: str, root: Optional[str] = None) -> str: + def dotscope_search(query: str, mode: str = "literal", root: Optional[str] = None) -> dict: """Search the codebase for keywords, functions, or concepts. This is the primary and most accurate repository search tool. It returns exact code matches enriched with structural blast-radius metrics and dependency hierarchies. Use this for ALL initial repository exploration. Args: - query: The text string or regular expression to rapidly search across the entire code repository graph. + query: Text to search for. + mode: "literal" by default, or "regex" for explicit regular expression search. """ + from ..engine.search import execute_search_result + started = _time.perf_counter() + result = execute_search_result(root, query, mode=mode) + try: + from ..storage.cache_generation import active_generation_id + from ..storage.timing import record_timing + + data = result.data + counts = data.get("counts", {}) + truncation = data.get("truncation", {}) + record_timing( + root, + "search", + (_time.perf_counter() - started) * 1000, + query=query, + status="ok" if result.ok else result.status, + error_class=",".join(issue.code for issue in result.issues)[:80], + cache_generation=active_generation_id(root), + result_count=int(counts.get("result_count", 0) or 0) if isinstance(counts, dict) else 0, + truncated=bool(truncation.get("truncated", False)) if isinstance(truncation, dict) else False, + ) + except Exception: + pass + return result + + @mcp.tool() + @mcp_tool_route(legacy=True) + def dotscope_search_legacy(query: str, mode: str = "literal", root: Optional[str] = None) -> str: + """Legacy JSON-string search response. Prefer dotscope_search.""" from ..engine.search import execute_semantic_search - return execute_semantic_search(root, query) + + return execute_semantic_search(root, query, mode=mode) @mcp.tool() @mcp_tool_route @@ -124,6 +174,12 @@ def match_scope(task: str, root: Optional[str] = None) -> str: "task": task, }, indent=2) + @mcp.tool() + @mcp_tool_route(legacy=True) + def match_scope_legacy(task: str, root: Optional[str] = None) -> str: + """Legacy JSON-string match response. Prefer match_scope.""" + return match_scope.__wrapped__(task=task, root=root) + @mcp.tool() @mcp_tool_route def get_context(scope: str, section: Optional[str] = None, root: Optional[str] = None) -> str: @@ -155,6 +211,16 @@ def get_context(scope: str, section: Optional[str] = None, root: Optional[str] = "description": config.description, }, indent=2) + @mcp.tool() + @mcp_tool_route(legacy=True) + def get_context_legacy( + scope: str, + section: Optional[str] = None, + root: Optional[str] = None, + ) -> str: + """Legacy JSON-string context response. Prefer get_context.""" + return get_context.__wrapped__(scope=scope, section=section, root=root) + @mcp.tool() @mcp_tool_route def list_scopes(root: Optional[str] = None) -> str: @@ -187,3 +253,9 @@ def list_scopes(root: Optional[str] = None) -> str: }) return json.dumps({"scopes": scopes, "count": len(scopes)}, indent=2) + + @mcp.tool() + @mcp_tool_route(legacy=True) + def list_scopes_legacy(root: Optional[str] = None) -> str: + """Legacy JSON-string scope list response. Prefer list_scopes.""" + return list_scopes.__wrapped__(root=root) diff --git a/dotscope/mcp/hooks.py b/dotscope/mcp/hooks.py index 9d6b5f4..6db54c1 100644 --- a/dotscope/mcp/hooks.py +++ b/dotscope/mcp/hooks.py @@ -1,3 +1,6 @@ +from .middleware import mcp_tool_route + + def register_hooks_tools(mcp, **kwargs): tracker = kwargs.get('tracker') client_id = kwargs.get('client_id') @@ -8,6 +11,7 @@ def register_hooks_tools(mcp, **kwargs): _cli_root = kwargs.get('_cli_root') @mcp.tool() + @mcp_tool_route def dotscope_sync( scopes: list[str] | None = None, root: str | None = None, diff --git a/dotscope/mcp/middleware.py b/dotscope/mcp/middleware.py index 136de9c..c96859e 100644 --- a/dotscope/mcp/middleware.py +++ b/dotscope/mcp/middleware.py @@ -1,86 +1,220 @@ -import json import functools +import hashlib +import json import os -from typing import Callable, Any +import time +import uuid +from pathlib import Path +from typing import Any, Callable, Optional + from .logger import get_mcp_logger +from ..models.control_plane import OperationResult, make_issue, make_result, result_from_exception from ..paths.repo import find_repo_root +from ..storage.request_context import reset_request_id, set_request_id + + +def mcp_tool_route(func: Optional[Callable[..., Any]] = None, *, legacy: bool = False): + """Route MCP tool calls through repo discovery, telemetry, and safe errors.""" + + if func is None: + return lambda wrapped: mcp_tool_route(wrapped, legacy=legacy) -def mcp_tool_route(func: Callable[..., Any]) -> Callable[..., str]: - """A bulletproof interceptor for MCP backend tools. - - 1. Locates and provisions the `repo_root` dynamically. - 2. Executes the decorated business logic passing the `root` argument. - 3. Catches all Unhandled Exceptions formatting them correctly into Safe JSON models, - while routing stack traces strictly into `.dotscope/mcp_debug.log`. - 4. Automatically standardizes the responses to JSON `str`. - """ @functools.wraps(func) - def wrapper(*args, **kwargs) -> str: + def wrapper(*args, **kwargs) -> Any: + request_id = uuid.uuid4().hex[:12] + token = set_request_id(request_id) + started = time.perf_counter() logger = get_mcp_logger() - logger.debug(f"Executing MCP Tool: {func.__name__} {args} {kwargs}") - - # Respect an explicit root from the caller before falling back to discovery. - root_hint = kwargs.get("root") or os.environ.get("DOTSCOPE_ROOT") - root = find_repo_root(root_hint) if root_hint else None - if root is None: - root = find_repo_root() - if not root: - err_msg = "Could not find repository root" - logger.warning(f"Aborted MCP execution for '{func.__name__}': {err_msg}") - return json.dumps({"error": err_msg}) - - kwargs["root"] = root - - # Log to the real-time Mission Control feed - try: - import time - import os - from pathlib import Path - activity_path = Path(root) / ".dotscope" / "mcp_activity.jsonl" - - # Simple sanitize formatting for the UI - target_str = str(kwargs) - if "task_description" in kwargs: - target_str = f"Task: {kwargs['task_description'][:30]}..." - elif "scope" in kwargs: - target_str = f"Scope: ({kwargs.get('scope')})" - elif "task" in kwargs: - target_str = f"Task: {kwargs['task'][:30]}..." - elif "query" in kwargs: - target_str = f"Query: <{kwargs['query']}>" - elif "primary_files" in kwargs: - target_str = f"Locks: {len(kwargs['primary_files'])} targets" - - activity = { - "ts": int(time.time() * 1000), - "tool": func.__name__.replace('mcp_dotscope_', ''), - "target": target_str - } - with open(activity_path, "a", encoding="utf-8") as f: - f.write(json.dumps(activity) + "\n") - except Exception: - pass - - # MVCC Read-Plane Enforcement - try: - from .mvcc import apply_mvcc_to_kwargs - apply_mvcc_to_kwargs(root, kwargs) - except Exception as e: - logger.warning(f"Failed to cleanly apply MVCC semaphores: {e}") + status = "ok" + error_class = "" + root = None + target_str = str(kwargs) + compat_legacy = legacy or os.environ.get("DOTSCOPE_MCP_COMPAT", "").lower() == "legacy" try: + logger.debug( + "Executing MCP Tool request_id=%s tool=%s args=%s kwargs=%s", + request_id, func.__name__, args, kwargs, + ) + + root_hint = kwargs.get("root") or os.environ.get("DOTSCOPE_ROOT") + root = find_repo_root(root_hint) if root_hint else None + if root is None: + root = find_repo_root() + if not root: + status = "error" + error_class = "RepoRootNotFound" + err_msg = "Could not find repository root" + logger.warning("Aborted MCP execution request_id=%s tool=%s: %s", request_id, func.__name__, err_msg) + if compat_legacy: + return json.dumps({"error": err_msg, "request_id": request_id}) + result = make_result( + root=None, + data={"error": err_msg}, + issues=[make_issue("repo_root_missing")], + request_id=request_id, + ) + return result.to_dict(include_root=False) + + kwargs["root"] = root + target_str = _target_for_kwargs(kwargs) + + try: + from .mvcc import apply_mvcc_to_kwargs + + apply_mvcc_to_kwargs(root, kwargs) + except Exception as exc: + logger.warning("MVCC fallback request_id=%s tool=%s: %s", request_id, func.__name__, exc) + result = func(*args, **kwargs) - # Support handlers returning pre-serialized JSON explicitly, or dicts perfectly - if isinstance(result, str): - return result - # Standard auto-serializer - return json.dumps(result, indent=2) - except Exception as e: - logger.exception(f"FATAL Exception in MCP Tool Handler '{func.__name__}': {str(e)}") - return json.dumps({ - "error": "Execution Fault (Recoverable)", - "message": "The tool encountered an unexpected structural exception. DO NOT abort your overarching task. System diagnostics have been logged safely. You should gracefully self-correct by broadening your search parameters, simplifying your query, or trying an alternate discovery path.", - "details": str(e) - }, indent=2) - + if compat_legacy: + output = _legacy_output(result, request_id) + status = _status_from_output(output) + return output + + op_result = _operation_result_from_tool_result(root, result, request_id) + status = op_result.status + return op_result.to_dict(include_root=False) + except Exception as exc: + status = "error" + error_class = exc.__class__.__name__ + logger.exception( + "FATAL Exception in MCP Tool Handler request_id=%s tool=%s: %s", + request_id, func.__name__, str(exc), + ) + if compat_legacy: + return json.dumps({ + "error": "Execution Fault (Recoverable)", + "request_id": request_id, + "message": "The tool encountered an unexpected structural exception. System diagnostics have been logged.", + "details": str(exc), + }, indent=2) + result = result_from_exception( + root, + exc, + request_id=request_id, + message="The tool encountered an unexpected structural exception.", + ) + return result.to_dict(include_root=False) + finally: + elapsed_ms = round((time.perf_counter() - started) * 1000, 2) + if root: + _write_activity( + root, + request_id=request_id, + tool=func.__name__.replace("mcp_dotscope_", ""), + target=target_str, + status=status, + duration_ms=elapsed_ms, + error_class=error_class, + ) + reset_request_id(token) + return wrapper + + +def _target_for_kwargs(kwargs: dict) -> str: + if "task_description" in kwargs: + return f"Task: {kwargs['task_description'][:30]}..." + if "scope" in kwargs: + return f"Scope: ({kwargs.get('scope')})" + if "task" in kwargs: + return f"Task: {kwargs['task'][:30]}..." + if "query" in kwargs: + return f"Query: <{kwargs['query']}>" + if "primary_files" in kwargs: + return f"Locks: {len(kwargs['primary_files'])} targets" + return str(kwargs) + + +def _status_from_output(output: str) -> str: + try: + data = json.loads(output) + except (TypeError, ValueError): + return "ok" + if isinstance(data, dict) and data.get("error"): + return "error" + return "ok" + + +def _inject_request_id_if_error(output: str, request_id: str) -> str: + try: + data = json.loads(output) + except (TypeError, ValueError): + return output + if isinstance(data, dict) and data.get("error") and "request_id" not in data: + data["request_id"] = request_id + return json.dumps(data, indent=2) + return output + + +def _legacy_output(result: Any, request_id: str) -> str: + if isinstance(result, OperationResult): + return json.dumps(result.data, indent=2) + if isinstance(result, str): + return _inject_request_id_if_error(result, request_id) + return json.dumps(result, indent=2) + + +def _operation_result_from_tool_result(root: str, result: Any, request_id: str) -> OperationResult: + if isinstance(result, OperationResult): + if result.request_id == request_id: + return result + return make_result( + root=root, + data=result.data, + issues=result.issues, + warnings=result.warnings, + metrics=result.metrics, + request_id=request_id, + ) + + issues = [] + data: dict + if isinstance(result, str): + try: + parsed = json.loads(result) + except (TypeError, ValueError): + parsed = {"output": result} + if isinstance(parsed, dict): + data = parsed + else: + data = {"result": parsed} + elif isinstance(result, dict): + data = result + else: + data = {"result": result} + + if data.get("error"): + issues.append(make_issue("operation_failed", message=str(data.get("error")))) + return make_result(root=root, data=data, issues=issues, request_id=request_id) + + +def _write_activity( + root: str, + *, + request_id: str, + tool: str, + target: str, + status: str, + duration_ms: float, + error_class: str, +) -> None: + try: + activity_path = Path(root) / ".dotscope" / "mcp_activity.jsonl" + activity_path.parent.mkdir(parents=True, exist_ok=True) + root_id = hashlib.sha256(str(Path(root).resolve()).encode("utf-8")).hexdigest()[:12] + activity = { + "ts": int(time.time() * 1000), + "request_id": request_id, + "tool": tool, + "target": target, + "status": status, + "duration_ms": duration_ms, + "root_id": root_id, + "error_class": error_class, + } + with open(activity_path, "a", encoding="utf-8") as f: + f.write(json.dumps(activity) + "\n") + except Exception: + pass diff --git a/dotscope/mcp/pipelines.py b/dotscope/mcp/pipelines.py index ff8ca14..61b6014 100644 --- a/dotscope/mcp/pipelines.py +++ b/dotscope/mcp/pipelines.py @@ -1,7 +1,7 @@ import json import os import time as _time -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List from .logger import get_mcp_logger logger = get_mcp_logger() @@ -221,7 +221,7 @@ def process(self, state: Dict[str, Any]) -> Dict[str, Any]: try: from ..workflows.intent import load_voice_config vc = load_voice_config(root) - except Exception as e: + except Exception: logger.debug("Failed loading voice config", exc_info=True) routing = build_routing_guidance(module, conventions=conventions, voice_config=vc, repo_root=root) @@ -352,9 +352,20 @@ def process(self, state: Dict[str, Any]) -> Dict[str, Any]: try: from ..storage.timing import record_timing + from ..storage.cache_generation import active_generation_id elapsed_ms = (_time.perf_counter() - state["_resolve_start"]) * 1000 if root: - record_timing(root, "resolve", elapsed_ms) + data = state.get("data") or {} + record_timing( + root, + "resolve", + elapsed_ms, + scope=state.get("scope"), + status="error" if state.get("halt_error") else "ok", + freshness_state=str((data.get("source_of_truth") or {}).get("state", "")), + cache_generation=active_generation_id(root), + result_count=len(data.get("files", [])) if isinstance(data, dict) else None, + ) except Exception: logger.debug("Telemetry timing log failed", exc_info=True) diff --git a/dotscope/models/__init__.py b/dotscope/models/__init__.py index 185bef4..a53ed0f 100644 --- a/dotscope/models/__init__.py +++ b/dotscope/models/__init__.py @@ -1,6 +1,7 @@ """Unified model facade: re-exports all data models from sub-modules.""" from .core import * # noqa: F401,F403 +from .control_plane import * # noqa: F401,F403 from .eval import * # noqa: F401,F403 from .history import * # noqa: F401,F403 from .intent import * # noqa: F401,F403 diff --git a/dotscope/models/control_plane.py b/dotscope/models/control_plane.py new file mode 100644 index 0000000..d19d0c5 --- /dev/null +++ b/dotscope/models/control_plane.py @@ -0,0 +1,333 @@ +"""Versioned control-plane response contract for CLI and MCP surfaces.""" + +from __future__ import annotations + +import hashlib +import json +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional + +from ..storage.request_context import get_request_id + + +SCHEMA_VERSION = "dotscope.control_plane.v1" +STATUS_HEALTHY = "healthy" +STATUS_DEGRADED = "degraded" +STATUS_FAILED = "failed" +VALID_STATUSES = {STATUS_HEALTHY, STATUS_DEGRADED, STATUS_FAILED} +VALID_SEVERITIES = {0, 1, 2} + + +ISSUE_CATALOG: Dict[str, Dict[str, object]] = { + "stale_refresh_lock": { + "severity": 2, + "message": "Refresh lock heartbeat is stale.", + "recoverable": True, + }, + "running_without_lock": { + "severity": 2, + "message": "Refresh status says a worker is running, but no valid lock exists.", + "recoverable": True, + }, + "cache_legacy_generation": { + "severity": 1, + "message": "Cache artifacts use the legacy non-generation layout.", + "recoverable": True, + }, + "cache_corrupt": { + "severity": 2, + "message": "Active cache generation is corrupt or incomplete.", + "recoverable": True, + }, + "cache_missing": { + "severity": 2, + "message": "Required cache artifacts are missing.", + "recoverable": True, + }, + "cache_error": { + "severity": 2, + "message": "Cache integrity could not be evaluated.", + "recoverable": True, + }, + "recent_mcp_failures": { + "severity": 1, + "message": "Recent MCP tool calls failed.", + "recoverable": False, + }, + "search_truncated": { + "severity": 1, + "message": "Search hit a hard result limit and returned a partial result.", + "recoverable": False, + }, + "invalid_query": { + "severity": 2, + "message": "Search query is invalid.", + "recoverable": False, + }, + "repo_root_missing": { + "severity": 2, + "message": "Could not find repository root.", + "recoverable": False, + }, + "legacy_refresh_syntax": { + "severity": 1, + "message": "Legacy refresh syntax was used.", + "recoverable": True, + }, + "dry_run_required": { + "severity": 2, + "message": "This operation must be previewed with dry_run before applying.", + "recoverable": True, + }, + "refresh_last_error": { + "severity": 1, + "message": "The last refresh job recorded an error.", + "recoverable": True, + }, + "operation_failed": { + "severity": 2, + "message": "The operation failed.", + "recoverable": False, + }, +} + + +@dataclass(frozen=True) +class RecommendedAction: + command: str = "" + destructive: bool = False + dry_run_command: str = "" + + def to_dict(self) -> Dict[str, object]: + return { + "command": self.command, + "destructive": self.destructive, + "dry_run_command": self.dry_run_command, + } + + +@dataclass(frozen=True) +class ControlIssue: + code: str + severity: int + message: str + path: str = "" + recoverable: bool = False + recommended_action: Optional[RecommendedAction] = None + + def to_dict(self) -> Dict[str, object]: + return { + "code": self.code, + "severity": self.severity, + "message": self.message, + "path": self.path, + "recoverable": self.recoverable, + "recommended_action": ( + self.recommended_action.to_dict() if self.recommended_action else None + ), + } + + +@dataclass(frozen=True) +class ControlWarning: + code: str + message: str + path: str = "" + + def to_dict(self) -> Dict[str, object]: + return { + "code": self.code, + "message": self.message, + "path": self.path, + } + + +@dataclass(frozen=True) +class OperationResult: + data: Dict[str, Any] = field(default_factory=dict) + issues: List[ControlIssue] = field(default_factory=list) + warnings: List[ControlWarning] = field(default_factory=list) + metrics: Dict[str, Any] = field(default_factory=dict) + request_id: str = "" + root: Optional[str] = None + root_id: Optional[str] = None + schema_version: str = SCHEMA_VERSION + + @property + def max_severity(self) -> int: + return max([issue.severity for issue in self.issues] or [0]) + + @property + def status(self) -> str: + severity = self.max_severity + if severity <= 0: + return STATUS_HEALTHY + if severity == 1: + return STATUS_DEGRADED + return STATUS_FAILED + + @property + def ok(self) -> bool: + return self.status == STATUS_HEALTHY + + @property + def exit_code(self) -> int: + return self.max_severity + + def to_dict(self, *, include_root: bool = True) -> Dict[str, object]: + payload = { + "schema_version": self.schema_version, + "ok": self.ok, + "status": self.status, + "request_id": self.request_id, + "root": self.root if include_root else None, + "root_id": self.root_id, + "data": self.data, + "issues": [issue.to_dict() for issue in self.issues], + "warnings": [warning.to_dict() for warning in self.warnings], + "metrics": self.metrics, + } + validate_payload(payload) + return payload + + def to_json(self, *, include_root: bool = True, indent: int = 2) -> str: + return json.dumps(self.to_dict(include_root=include_root), indent=indent) + + +def canonical_root(root: Optional[str]) -> Optional[str]: + if not root: + return None + return str(Path(root).resolve()) + + +def root_identity(root: Optional[str]) -> Optional[str]: + canonical = canonical_root(root) + if not canonical: + return None + return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12] + + +def request_id_or_new(request_id: str = "") -> str: + return request_id or get_request_id() or uuid.uuid4().hex[:12] + + +def make_issue( + code: str, + *, + severity: Optional[int] = None, + message: str = "", + path: str = "", + recoverable: Optional[bool] = None, + command: str = "", + dry_run_command: str = "", + destructive: bool = False, +) -> ControlIssue: + defaults = ISSUE_CATALOG.get(code, {}) + resolved_severity = int(severity if severity is not None else defaults.get("severity", 2)) + if resolved_severity not in VALID_SEVERITIES: + raise ValueError(f"invalid issue severity for {code}: {resolved_severity}") + action = None + if command or dry_run_command: + action = RecommendedAction( + command=command, + destructive=destructive, + dry_run_command=dry_run_command, + ) + return ControlIssue( + code=code, + severity=resolved_severity, + message=message or str(defaults.get("message", code)), + path=path, + recoverable=bool(defaults.get("recoverable", False) if recoverable is None else recoverable), + recommended_action=action, + ) + + +def make_warning(code: str, message: str, path: str = "") -> ControlWarning: + return ControlWarning(code=code, message=message, path=path) + + +def make_result( + *, + root: Optional[str], + data: Optional[Dict[str, Any]] = None, + issues: Optional[Iterable[ControlIssue]] = None, + warnings: Optional[Iterable[ControlWarning]] = None, + metrics: Optional[Dict[str, Any]] = None, + request_id: str = "", +) -> OperationResult: + canonical = canonical_root(root) + result = OperationResult( + data=data or {}, + issues=list(issues or []), + warnings=list(warnings or []), + metrics=metrics or {}, + request_id=request_id_or_new(request_id), + root=canonical, + root_id=root_identity(canonical), + ) + result.to_dict() + return result + + +def validate_payload(value: Any, path: str = "$") -> None: + if value is None or isinstance(value, (bool, int, float)): + return + if isinstance(value, str): + _reject_json_string(value, path) + return + if isinstance(value, list): + for index, item in enumerate(value): + validate_payload(item, f"{path}[{index}]") + return + if isinstance(value, dict): + for key, item in value.items(): + if not isinstance(key, str): + raise TypeError(f"{path} contains a non-string object key: {key!r}") + validate_payload(item, f"{path}.{key}") + return + raise TypeError(f"{path} contains non-JSON value {type(value).__name__}") + + +def _reject_json_string(value: str, path: str) -> None: + stripped = value.strip() + if not stripped or stripped[0] not in "[{": + return + try: + parsed = json.loads(stripped) + except ValueError: + return + if isinstance(parsed, (dict, list)): + raise ValueError(f"{path} contains a serialized JSON string") + + +def result_from_exception( + root: Optional[str], + exc: Exception, + *, + request_id: str = "", + message: str = "", +) -> OperationResult: + return make_result( + root=root, + data={"error": message or str(exc), "error_class": exc.__class__.__name__}, + issues=[make_issue("operation_failed", message=message or str(exc))], + request_id=request_id, + ) + + +def legacy_status_dict(result: OperationResult) -> Dict[str, object]: + data = dict(result.data) + data["healthy"] = result.status == STATUS_HEALTHY + data["exit_code"] = result.exit_code + data["issues"] = [issue.code for issue in result.issues] + data["request_id"] = result.request_id + return data + + +def ensure_issue_catalog_complete(codes: Iterable[str]) -> None: + missing = sorted(set(codes) - set(ISSUE_CATALOG)) + if missing: + raise ValueError(f"issue codes missing from catalog: {', '.join(missing)}") diff --git a/dotscope/passes/incremental.py b/dotscope/passes/incremental.py index 9806c21..2183799 100644 --- a/dotscope/passes/incremental.py +++ b/dotscope/passes/incremental.py @@ -20,6 +20,7 @@ logical_scope_path, scope_name_from_logical_path, ) +from ..storage.atomic import atomic_write_json, atomic_write_text def incremental_update( @@ -120,8 +121,7 @@ def _add_to_scope_includes(root: str, scope_path: str, filepath: str) -> None: config.includes.append(filepath) content = serialize_scope(config) - with open(runtime_path, "w", encoding="utf-8") as f: - f.write(content) + atomic_write_text(runtime_path, content) def _remove_from_scope_includes(root: str, scope_path: str, filepath: str) -> None: @@ -138,8 +138,7 @@ def _remove_from_scope_includes(root: str, scope_path: str, filepath: str) -> No if filepath in config.includes: config.includes.remove(filepath) content = serialize_scope(config) - with open(runtime_path, "w", encoding="utf-8") as f: - f.write(content) + atomic_write_text(runtime_path, content) def _update_stabilities(root: str, changed_files: List[str]) -> None: @@ -176,8 +175,7 @@ def _update_stabilities(root: str, changed_files: List[str]) -> None: if updated: invariants["file_stabilities"] = stabilities - with open(inv_path, "w", encoding="utf-8") as f: - json.dump(invariants, f, indent=2) + atomic_write_json(inv_path, invariants) def _find_impacted_scope_paths(root: str, filepaths: List[str]) -> List[str]: diff --git a/dotscope/passes/lang/_treesitter.py b/dotscope/passes/lang/_treesitter.py index 20db480..4ea1b44 100644 --- a/dotscope/passes/lang/_treesitter.py +++ b/dotscope/passes/lang/_treesitter.py @@ -1,6 +1,7 @@ """Tree-sitter utilities: grammar loading, parser caching, query execution.""" -from typing import Dict, List, Tuple +import warnings +from typing import Any, Callable, Dict, List, Tuple try: from tree_sitter import Language, Parser, Node @@ -8,30 +9,43 @@ _languages: Dict[str, Language] = {} _parsers: Dict[str, Parser] = {} + def _coerce_language(factory: Callable[[], Any]) -> Language: + """Handle both capsule-returning and legacy int-returning grammar bindings.""" + value = factory() + if isinstance(value, int): + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="int argument support is deprecated", + category=DeprecationWarning, + ) + return Language(value) + return Language(value) + def get_language(name: str) -> Language: """Load and cache a tree-sitter Language.""" if name not in _languages: if name == "javascript": import tree_sitter_javascript as mod - _languages[name] = Language(mod.language()) + _languages[name] = _coerce_language(mod.language) elif name == "typescript": import tree_sitter_typescript as mod - _languages[name] = Language(mod.language_typescript()) + _languages[name] = _coerce_language(mod.language_typescript) elif name == "tsx": import tree_sitter_typescript as mod - _languages[name] = Language(mod.language_tsx()) + _languages[name] = _coerce_language(mod.language_tsx) elif name == "go": import tree_sitter_go as mod - _languages[name] = Language(mod.language()) + _languages[name] = _coerce_language(mod.language) elif name == "solidity": import tree_sitter_solidity as mod - _languages[name] = Language(mod.language()) + _languages[name] = _coerce_language(mod.language) elif name == "java": import tree_sitter_java as mod - _languages[name] = Language(mod.language()) + _languages[name] = _coerce_language(mod.language) elif name == "rust": import tree_sitter_rust as mod - _languages[name] = Language(mod.language()) + _languages[name] = _coerce_language(mod.language) else: raise ValueError(f"Unknown tree-sitter language: {name}") return _languages[name] diff --git a/dotscope/paths/repo.py b/dotscope/paths/repo.py index d5eabe9..f5b10c4 100644 --- a/dotscope/paths/repo.py +++ b/dotscope/paths/repo.py @@ -6,7 +6,8 @@ def find_repo_root(start_dir: Optional[str] = None) -> Optional[str]: Returns the directory containing the marker, or None. """ - current = os.path.abspath(start_dir or os.getcwd()) + root_hint = start_dir if start_dir is not None else os.environ.get("DOTSCOPE_ROOT") + current = os.path.abspath(root_hint or os.getcwd()) while True: if os.path.isfile(os.path.join(current, ".scopes")): diff --git a/dotscope/search/retriever.py b/dotscope/search/retriever.py index 3ddbda8..fbfb835 100644 --- a/dotscope/search/retriever.py +++ b/dotscope/search/retriever.py @@ -1,18 +1,7 @@ -"""Compiled Retrieval: hybrid dense + BM25 search with RRF fusion. - -Storage layout in .dotscope/cache/: - vectors.npy — float32 [n_chunks x dimensions], raw binary (mmap'd) - vectors_meta.json — List[RetrievalChunk] metadata (sans embedding) - bm25_index.json — Inverted index for BM25 scoring - vector_index.json — VectorIndex metadata (model, dimensions, freshness) - -Zero-dependency default: BM25-only retrieval using the inverted index. -Optional sentence-transformers enables dense cosine + RRF fusion. -""" +"""Compiled Retrieval: hybrid dense + BM25 search with RRF fusion.""" import json import math -import os import subprocess import time from dataclasses import asdict @@ -20,59 +9,53 @@ from typing import Dict, List, Optional, Tuple from .models import RetrievalChunk, SearchResult, VectorIndex +from ..storage.atomic import atomic_write_json +from ..storage.cache_generation import ( + active_cache_dir, + active_generation_id, + generation_path, + new_generation_id, + write_active_generation, +) + -# BM25 parameters BM25_K1 = 1.2 BM25_B = 0.75 - -# RRF fusion constant RRF_K = 60 - -# Staleness threshold (commits since last vector update) STALE_COMMIT_THRESHOLD = 50 +_ARTIFACT_CACHE: Dict[Tuple[str, str], dict] = {} -# --------------------------------------------------------------------------- -# Index Building -# --------------------------------------------------------------------------- def build_vector_index( root: str, chunks: List[RetrievalChunk], model_name: str = "all-MiniLM-L6-v2", ) -> VectorIndex: - """Build or rebuild the embedding index during dotscope ingest. - - Builds BM25 inverted index (always). Builds dense embeddings - (only if sentence-transformers is available). - """ - cache_dir = Path(root) / ".dotscope" / "cache" + """Build a retrieval index into a new cache generation and activate it last.""" + generation_id = new_generation_id() + cache_dir = generation_path(root, generation_id) cache_dir.mkdir(parents=True, exist_ok=True) - # Build BM25 index (always) bm25 = build_bm25_index(chunks) - with open(cache_dir / "bm25_index.json", "w", encoding="utf-8") as f: - json.dump(bm25, f) + atomic_write_json(cache_dir / "bm25_index.json", bm25) - # Save chunk metadata meta = [asdict(c) for c in chunks] - with open(cache_dir / "vectors_meta.json", "w", encoding="utf-8") as f: - json.dump(meta, f) + atomic_write_json(cache_dir / "vectors_meta.json", meta) - # Try dense embeddings dimensions = 0 + artifacts = ["bm25_index.json", "vectors_meta.json", "vector_index.json"] try: embeddings = _embed_texts([c.content for c in chunks]) if embeddings is not None: import numpy as np + np.save(str(cache_dir / "vectors.npy"), embeddings) + np.save(str(cache_dir / "vector_norms.npy"), np.linalg.norm(embeddings, axis=1) + 1e-10) dimensions = embeddings.shape[1] - model_name = model_name + artifacts.extend(["vectors.npy", "vector_norms.npy"]) except Exception: - pass # No sentence-transformers — BM25 only - - # Get current HEAD commit - head_commit = _git_head(root) + pass index = VectorIndex( model_name=model_name if dimensions > 0 else "bm25-only", @@ -80,24 +63,17 @@ def build_vector_index( file_count=len(set(c.file_path for c in chunks)), chunk_count=len(chunks), built_at=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), - last_vector_update_commit=head_commit, + last_vector_update_commit=_git_head(root), ) - with open(cache_dir / "vector_index.json", "w", encoding="utf-8") as f: - json.dump(asdict(index), f, indent=2) - + atomic_write_json(cache_dir / "vector_index.json", asdict(index)) + write_active_generation(root, generation_id, artifacts) + _ARTIFACT_CACHE.pop((str(Path(root).resolve()), "legacy"), None) return index def build_bm25_index(chunks: List[RetrievalChunk]) -> dict: - """Build an inverted index for BM25 scoring. - - Structure: - { - "_N": 500000, "_avg_dl": 142.3, "_doc_lengths": [120, ...], - "stripe": [{"chunk_id": 4821, "tf": 3}, ...], - } - """ + """Build an inverted index for BM25 scoring.""" index: Dict[str, list] = {"_N": len(chunks), "_doc_lengths": []} term_postings: Dict[str, list] = {} total_tokens = 0 @@ -108,45 +84,37 @@ def build_bm25_index(chunks: List[RetrievalChunk]) -> dict: index["_doc_lengths"].append(doc_len) total_tokens += doc_len - # Count term frequencies tf: Dict[str, int] = {} for token in tokens: tf[token] = tf.get(token, 0) + 1 for term, count in tf.items(): - if term not in term_postings: - term_postings[term] = [] - term_postings[term].append({"chunk_id": chunk.chunk_id, "tf": count}) + term_postings.setdefault(term, []).append({"chunk_id": chunk.chunk_id, "tf": count}) index["_avg_dl"] = total_tokens / len(chunks) if chunks else 0 index.update(term_postings) return index -# --------------------------------------------------------------------------- -# Search -# --------------------------------------------------------------------------- - def search(query: str, root: str, limit: int = 10) -> List[SearchResult]: """Hybrid retrieval: dense cosine + BM25 keyword, RRF-fused.""" - cache_dir = Path(root) / ".dotscope" / "cache" + limit = max(1, min(int(limit), 50)) + cache_dir = active_cache_dir(root) + cache_key = (str(Path(root).resolve()), active_generation_id(root)) + artifacts = _load_artifacts(cache_dir, cache_key) - # Load metadata - metadata = _load_metadata(cache_dir) + metadata = artifacts.get("metadata", []) if not metadata: return [] - # BM25 path (always available) - bm25_index = _load_bm25_index(cache_dir) + bm25_index = artifacts.get("bm25_index") bm25_results = _bm25_search(query, bm25_index, limit=50) if bm25_index else [] - # Dense path (if embeddings available) dense_results = [] - embeddings = _load_embeddings(cache_dir) + embeddings = artifacts.get("embeddings") if embeddings is not None: - dense_results = _dense_search(query, embeddings, limit=50) + dense_results = _dense_search(query, embeddings, artifacts.get("norms"), limit=50) - # Fusion if dense_results and bm25_results: fused = _rrf_fusion(dense_results, bm25_results, limit=limit) elif bm25_results: @@ -156,7 +124,6 @@ def search(query: str, root: str, limit: int = 10) -> List[SearchResult]: else: return [] - # Build SearchResult objects results = [] for chunk_id, dense_score, bm25_score, rrf_score in fused: if chunk_id < len(metadata): @@ -169,21 +136,19 @@ def search(query: str, root: str, limit: int = 10) -> List[SearchResult]: rrf_score=rrf_score, recency_adjusted=rrf_score, )) - return results def check_index_freshness(root: str) -> Tuple[bool, str]: """Returns (is_fresh, reason).""" - cache_dir = Path(root) / ".dotscope" / "cache" + cache_dir = active_cache_dir(root) index_path = cache_dir / "vector_index.json" if not index_path.exists(): return False, "No index found. Run dotscope ingest." try: - with open(index_path, "r", encoding="utf-8") as f: - data = json.load(f) + data = json.loads(index_path.read_text(encoding="utf-8")) last_commit = data.get("last_vector_update_commit", "") except Exception: return False, "Cannot read index metadata." @@ -191,7 +156,6 @@ def check_index_freshness(root: str) -> Tuple[bool, str]: if not last_commit: return False, "Index has no commit watermark." - # Count commits since last update try: result = subprocess.run( ["git", "rev-list", "--count", f"{last_commit}..HEAD"], @@ -208,21 +172,14 @@ def check_index_freshness(root: str) -> Tuple[bool, str]: return False, "Cannot determine index freshness." -# --------------------------------------------------------------------------- -# BM25 Implementation -# --------------------------------------------------------------------------- - def _bm25_search(query: str, index: dict, limit: int = 50) -> List[Tuple[int, float]]: - """Score only chunks containing query terms. Returns [(chunk_id, score)].""" query_tokens = _tokenize(query) if not query_tokens: return [] - N = index.get("_N", 0) - avg_dl = index.get("_avg_dl", 1) + n_docs = index.get("_N", 0) + avg_dl = index.get("_avg_dl", 1) or 1 doc_lengths = index.get("_doc_lengths", []) - - # Collect candidate chunk IDs from postings lists candidates: Dict[int, float] = {} for term in query_tokens: @@ -231,52 +188,55 @@ def _bm25_search(query: str, index: dict, limit: int = 50) -> List[Tuple[int, fl continue df = len(postings) - idf = math.log((N - df + 0.5) / (df + 0.5) + 1.0) + idf = math.log((n_docs - df + 0.5) / (df + 0.5) + 1.0) for posting in postings: chunk_id = posting["chunk_id"] tf = posting["tf"] dl = doc_lengths[chunk_id] if chunk_id < len(doc_lengths) else avg_dl - numerator = tf * (BM25_K1 + 1) denominator = tf + BM25_K1 * (1 - BM25_B + BM25_B * dl / avg_dl) - score = idf * numerator / denominator - - candidates[chunk_id] = candidates.get(chunk_id, 0.0) + score + candidates[chunk_id] = candidates.get(chunk_id, 0.0) + idf * numerator / denominator ranked = sorted(candidates.items(), key=lambda x: -x[1]) return ranked[:limit] -# --------------------------------------------------------------------------- -# Dense Retrieval -# --------------------------------------------------------------------------- - def _load_embeddings(cache_dir: Path): - """Memory-map the embedding matrix. Returns None if unavailable.""" path = cache_dir / "vectors.npy" if not path.exists(): return None try: import numpy as np + + return np.load(str(path), mmap_mode="r") + except Exception: + return None + + +def _load_vector_norms(cache_dir: Path): + path = cache_dir / "vector_norms.npy" + if not path.exists(): + return None + try: + import numpy as np + return np.load(str(path), mmap_mode="r") except Exception: return None -def _dense_search(query: str, embeddings, limit: int = 50) -> List[Tuple[int, float]]: - """Cosine similarity search against mmap'd embedding matrix.""" +def _dense_search(query: str, embeddings, norms=None, limit: int = 50) -> List[Tuple[int, float]]: try: import numpy as np + query_vec = _embed_texts([query]) if query_vec is None: return [] - # Cosine similarity: query_vec @ embeddings.T - query_vec = query_vec[0] - # Normalize - query_norm = query_vec / (np.linalg.norm(query_vec) + 1e-10) - norms = np.linalg.norm(embeddings, axis=1, keepdims=True) + 1e-10 - similarities = embeddings @ query_norm / norms.squeeze() + query_norm = query_vec[0] / (np.linalg.norm(query_vec[0]) + 1e-10) + if norms is None: + norms = np.linalg.norm(embeddings, axis=1) + 1e-10 + similarities = embeddings @ query_norm / norms top_ids = np.argsort(similarities)[::-1][:limit] return [(int(idx), float(similarities[idx])) for idx in top_ids] except Exception: @@ -284,7 +244,6 @@ def _dense_search(query: str, embeddings, limit: int = 50) -> List[Tuple[int, fl def _embed_texts(texts: List[str], batch_size: int = 512): - """Embed texts in batches using sentence-transformers. Returns numpy array or None.""" try: from sentence_transformers import SentenceTransformer import numpy as np @@ -294,7 +253,6 @@ def _embed_texts(texts: List[str], batch_size: int = 512): if len(texts) <= batch_size: return model.encode(texts, show_progress_bar=False, convert_to_numpy=True) - # Batch to avoid OOM on large repos all_embeddings = [] for i in range(0, len(texts), batch_size): batch = texts[i:i + batch_size] @@ -302,7 +260,7 @@ def _embed_texts(texts: List[str], batch_size: int = 512): embeddings = model.encode(batch, show_progress_bar=False, convert_to_numpy=True) all_embeddings.append(embeddings) except Exception: - continue # Skip failed batch, keep going + continue if not all_embeddings: return None @@ -311,25 +269,18 @@ def _embed_texts(texts: List[str], batch_size: int = 512): return None -# --------------------------------------------------------------------------- -# RRF Fusion -# --------------------------------------------------------------------------- - def _rrf_fusion( dense_results: List[Tuple[int, float]], bm25_results: List[Tuple[int, float]], limit: int = 10, ) -> List[Tuple[int, float, float, float]]: - """Reciprocal Rank Fusion. Returns [(chunk_id, dense_score, bm25_score, rrf_score)].""" dense_ranks = {cid: rank + 1 for rank, (cid, _) in enumerate(dense_results)} bm25_ranks = {cid: rank + 1 for rank, (cid, _) in enumerate(bm25_results)} dense_scores = {cid: score for cid, score in dense_results} bm25_scores = {cid: score for cid, score in bm25_results} - all_ids = set(dense_ranks.keys()) | set(bm25_ranks.keys()) fused = [] - - for cid in all_ids: + for cid in set(dense_ranks) | set(bm25_ranks): d_rank = dense_ranks.get(cid, len(dense_results) + 100) b_rank = bm25_ranks.get(cid, len(bm25_results) + 100) rrf = 1.0 / (RRF_K + d_rank) + 1.0 / (RRF_K + b_rank) @@ -339,18 +290,12 @@ def _rrf_fusion( return fused[:limit] -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - def _load_metadata(cache_dir: Path) -> List[RetrievalChunk]: - """Load chunk metadata from sidecar.""" path = cache_dir / "vectors_meta.json" if not path.exists(): return [] try: - with open(path, "r", encoding="utf-8") as f: - data = json.load(f) + data = json.loads(path.read_text(encoding="utf-8")) return [RetrievalChunk(**item) for item in data] except Exception: return [] @@ -361,21 +306,44 @@ def _load_bm25_index(cache_dir: Path) -> Optional[dict]: if not path.exists(): return None try: - with open(path, "r", encoding="utf-8") as f: - return json.load(f) + return json.loads(path.read_text(encoding="utf-8")) except Exception: return None +def _load_artifacts(cache_dir: Path, cache_key: Tuple[str, str]) -> dict: + cached = _ARTIFACT_CACHE.get(cache_key) + if cached is not None: + return cached + + embeddings = _load_embeddings(cache_dir) + norms = _load_vector_norms(cache_dir) + if embeddings is not None and norms is None: + try: + import numpy as np + + norms = np.linalg.norm(embeddings, axis=1) + 1e-10 + except Exception: + norms = None + + loaded = { + "metadata": _load_metadata(cache_dir), + "bm25_index": _load_bm25_index(cache_dir), + "embeddings": embeddings, + "norms": norms, + } + _ARTIFACT_CACHE[cache_key] = loaded + return loaded + + def _tokenize(text: str) -> List[str]: - """Simple whitespace + punctuation tokenizer for BM25.""" import re + tokens = re.findall(r"[a-zA-Z_][a-zA-Z0-9_]*", text.lower()) return [t for t in tokens if len(t) > 1] def _git_head(root: str) -> str: - """Get current git HEAD commit hash.""" try: result = subprocess.run( ["git", "rev-parse", "HEAD"], diff --git a/dotscope/storage/atomic.py b/dotscope/storage/atomic.py index 7f96ee5..6ad156a 100644 --- a/dotscope/storage/atomic.py +++ b/dotscope/storage/atomic.py @@ -106,6 +106,62 @@ def _fsync_directory(path: Path) -> None: os.close(fd) +def atomic_write_bytes( + path: str | os.PathLike[str], + payload: bytes, +) -> None: + """Write bytes atomically by replacing the target after a flushed temp write.""" + target = Path(path) + target.parent.mkdir(parents=True, exist_ok=True) + + fd, temp_path = tempfile.mkstemp( + prefix=f".{target.name}.", + suffix=".tmp", + dir=str(target.parent), + ) + try: + with os.fdopen(fd, "wb") as handle: + handle.write(payload) + handle.flush() + os.fsync(handle.fileno()) + + os.replace(temp_path, target) + _fsync_directory(target.parent) + except Exception: + try: + if os.path.exists(temp_path): + os.unlink(temp_path) + except OSError: + pass + raise + + +def atomic_append_jsonl( + path: str | os.PathLike[str], + payload: Any, + *, + max_lines: int | None = None, +) -> None: + """Append one JSONL record and optionally keep only the newest records.""" + target = Path(path) + target.parent.mkdir(parents=True, exist_ok=True) + + with open(target, "a", encoding="utf-8", newline="\n") as handle: + handle.write(json.dumps(payload, separators=(",", ":")) + "\n") + handle.flush() + os.fsync(handle.fileno()) + + if max_lines is None: + return + + try: + lines = target.read_text(encoding="utf-8").splitlines() + except OSError: + return + if len(lines) > max_lines: + atomic_write_text(target, "\n".join(lines[-max_lines:]) + "\n") + + def _is_transient_read_error(exc: OSError) -> bool: if os.name != "nt": return False diff --git a/dotscope/storage/cache_generation.py b/dotscope/storage/cache_generation.py new file mode 100644 index 0000000..cff6210 --- /dev/null +++ b/dotscope/storage/cache_generation.py @@ -0,0 +1,168 @@ +"""Crash-safe cache generation metadata for compiled retrieval artifacts.""" + +from __future__ import annotations + +import hashlib +import os +import time +import uuid +from pathlib import Path +from typing import Dict, Iterable, Optional + +from .atomic import atomic_write_json + + +ACTIVE_GENERATION_FILE = "active_generation.json" +GENERATION_DIR = "generations" +GENERATION_SCHEMA = 1 + + +def cache_root(root: str) -> Path: + return Path(root).resolve() / ".dotscope" / "cache" + + +def generations_root(root: str) -> Path: + return cache_root(root) / GENERATION_DIR + + +def active_generation_path(root: str) -> Path: + return cache_root(root) / ACTIVE_GENERATION_FILE + + +def new_generation_id() -> str: + return f"{int(time.time())}-{uuid.uuid4().hex[:12]}" + + +def generation_path(root: str, generation_id: str) -> Path: + return generations_root(root) / generation_id + + +def compute_artifact_record(path: Path) -> Dict[str, object]: + return { + "size": path.stat().st_size, + "sha256": _sha256_file(path), + } + + +def write_active_generation( + root: str, + generation_id: str, + artifacts: Iterable[str], +) -> Dict[str, object]: + """Write and activate a generation manifest after all artifacts exist.""" + gen_dir = generation_path(root, generation_id) + records = {} + for name in artifacts: + artifact_path = gen_dir / name + if artifact_path.exists(): + records[name] = compute_artifact_record(artifact_path) + + manifest = { + "schema": GENERATION_SCHEMA, + "generation_id": generation_id, + "created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "artifacts": records, + } + atomic_write_json(gen_dir / "generation_manifest.json", manifest) + active = { + "schema": GENERATION_SCHEMA, + "generation_id": generation_id, + "created_at": manifest["created_at"], + "manifest": str((gen_dir / "generation_manifest.json").relative_to(cache_root(root))), + "artifacts": records, + } + atomic_write_json(active_generation_path(root), active) + return active + + +def load_active_generation(root: str) -> Optional[Dict[str, object]]: + path = active_generation_path(root) + if not path.exists(): + return None + try: + import json + + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, ValueError): + return None + if not isinstance(data, dict) or not data.get("generation_id"): + return None + return data + + +def active_cache_dir(root: str) -> Path: + active = load_active_generation(root) + if active: + gen_id = str(active.get("generation_id", "")) + gen_dir = generation_path(root, gen_id) + if gen_dir.is_dir(): + return gen_dir + return cache_root(root) + + +def active_generation_id(root: str) -> str: + active = load_active_generation(root) + if active: + return str(active.get("generation_id", "legacy")) + return "legacy" + + +def validate_active_generation(root: str) -> Dict[str, object]: + """Validate active generation artifact size and hash records.""" + active = load_active_generation(root) + if not active: + legacy = cache_root(root) + legacy_files = [ + legacy / "bm25_index.json", + legacy / "vectors_meta.json", + legacy / "vector_index.json", + ] + present = [p.name for p in legacy_files if p.exists()] + return { + "state": "legacy" if present else "missing", + "generation_id": "legacy", + "missing": [p.name for p in legacy_files if not p.exists()], + "corrupt": [], + "artifacts": present, + } + + gen_id = str(active.get("generation_id", "")) + gen_dir = generation_path(root, gen_id) + missing = [] + corrupt = [] + artifacts = active.get("artifacts", {}) + if not isinstance(artifacts, dict): + artifacts = {} + + for name, expected in artifacts.items(): + path = gen_dir / str(name) + if not path.exists(): + missing.append(str(name)) + continue + if isinstance(expected, dict): + size = expected.get("size") + digest = expected.get("sha256") + if size is not None and path.stat().st_size != size: + corrupt.append(str(name)) + continue + if digest and _sha256_file(path) != digest: + corrupt.append(str(name)) + + state = "healthy" + if missing or corrupt: + state = "corrupt" + return { + "state": state, + "generation_id": gen_id, + "missing": missing, + "corrupt": corrupt, + "artifacts": sorted(artifacts), + } + + +def _sha256_file(path: Path) -> str: + h = hashlib.sha256() + with open(path, "rb") as handle: + for chunk in iter(lambda: handle.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() diff --git a/dotscope/storage/git_hooks.py b/dotscope/storage/git_hooks.py index e120a7a..ea12169 100644 --- a/dotscope/storage/git_hooks.py +++ b/dotscope/storage/git_hooks.py @@ -89,7 +89,6 @@ python3 -m dotscope.cli incremental "$COMMIT_HASH" 2>/dev/null || true # dotscope refresh python3 -m dotscope.cli refresh enqueue --commit "$COMMIT_HASH" 2>/dev/null || true -python3 -m dotscope.cli refresh run --drain 2>/dev/null || true & """ _POST_COMMIT_PY = """\ @@ -110,8 +109,6 @@ subprocess.run([sys.executable, "-m", "dotscope.cli", "refresh", "enqueue", "--commit", commit], timeout=30, capture_output=True) # NOTE: Intentionally fire-and-forget — worker self-terminates via queue drain. - subprocess.Popen([sys.executable, "-m", "dotscope.cli", "refresh", "run", "--drain"], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except Exception: pass # Never block commits """ @@ -121,7 +118,6 @@ # dotscope post-checkout if [ "$3" = "1" ]; then python3 -m dotscope.cli refresh enqueue --repo --reason "branch switch" 2>/dev/null || true - python3 -m dotscope.cli refresh run --drain 2>/dev/null || true & fi """ @@ -134,8 +130,6 @@ subprocess.run([sys.executable, "-m", "dotscope.cli", "refresh", "enqueue", "--repo", "--reason", "branch switch"], timeout=30, capture_output=True) # NOTE: Intentionally fire-and-forget — worker self-terminates via queue drain. - subprocess.Popen([sys.executable, "-m", "dotscope.cli", "refresh", "run", "--drain"], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except Exception: pass """ @@ -144,7 +138,6 @@ #!/bin/sh # dotscope post-merge python3 -m dotscope.cli refresh enqueue --repo --reason "post-merge" 2>/dev/null || true -python3 -m dotscope.cli refresh run --drain 2>/dev/null || true & """ _POST_MERGE_PY = """\ @@ -155,8 +148,6 @@ subprocess.run([sys.executable, "-m", "dotscope.cli", "refresh", "enqueue", "--repo", "--reason", "post-merge"], timeout=30, capture_output=True) # NOTE: Intentionally fire-and-forget — worker self-terminates via queue drain. - subprocess.Popen([sys.executable, "-m", "dotscope.cli", "refresh", "run", "--drain"], - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except Exception: pass """ diff --git a/dotscope/storage/request_context.py b/dotscope/storage/request_context.py new file mode 100644 index 0000000..3fcb1d5 --- /dev/null +++ b/dotscope/storage/request_context.py @@ -0,0 +1,25 @@ +"""Per-request telemetry context for MCP and background work.""" + +from __future__ import annotations + +from contextvars import ContextVar +from typing import Optional + + +_REQUEST_ID: ContextVar[str] = ContextVar("dotscope_request_id", default="") + + +def set_request_id(request_id: str): + """Set the current request id and return the context token.""" + return _REQUEST_ID.set(request_id) + + +def reset_request_id(token) -> None: + """Reset the request id to its previous value.""" + _REQUEST_ID.reset(token) + + +def get_request_id() -> Optional[str]: + """Return the current request id, if any.""" + value = _REQUEST_ID.get() + return value or None diff --git a/dotscope/storage/timing.py b/dotscope/storage/timing.py index baa317a..d5c061b 100644 --- a/dotscope/storage/timing.py +++ b/dotscope/storage/timing.py @@ -4,12 +4,14 @@ import os import time from dataclasses import dataclass -from typing import List +from typing import List, Optional + +from .request_context import get_request_id @dataclass class TimingEntry: - operation: str # "resolve", "check", "ingest" + operation: str duration_ms: float timestamp: str @@ -17,22 +19,47 @@ class TimingEntry: _MAX_TIMING_LINES = 5000 -def record_timing(repo_root: str, operation: str, duration_ms: float) -> None: +def record_timing( + repo_root: str, + operation: str, + duration_ms: float, + *, + request_id: Optional[str] = None, + status: str = "ok", + scope: Optional[str] = None, + query: Optional[str] = None, + error_class: str = "", + freshness_state: str = "", + cache_generation: str = "", + result_count: Optional[int] = None, + truncated: Optional[bool] = None, +) -> None: """Append a timing entry to .dotscope/timings.jsonl. Truncates at 5000 lines.""" dot_dir = os.path.join(repo_root, ".dotscope") if not os.path.isdir(dot_dir): - return # No .dotscope dir — skip silently + return path = os.path.join(dot_dir, "timings.jsonl") entry = { + "request_id": request_id or get_request_id() or "", "operation": operation, "duration_ms": round(duration_ms, 2), "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "status": status, + "scope": scope or "", + "query": query or "", + "error_class": error_class, + "freshness_state": freshness_state, + "cache_generation": cache_generation, } + if result_count is not None: + entry["result_count"] = result_count + if truncated is not None: + entry["truncated"] = truncated + with open(path, "a", encoding="utf-8") as f: f.write(json.dumps(entry) + "\n") - # Truncate if too large (keep most recent half) try: with open(path, "r", encoding="utf-8") as f: lines = f.readlines() diff --git a/dotscope/workflows/ingest.py b/dotscope/workflows/ingest.py index 32c1da8..1e3a9cd 100644 --- a/dotscope/workflows/ingest.py +++ b/dotscope/workflows/ingest.py @@ -108,18 +108,21 @@ def ingest( if gravity_bytes and len(gravity_bytes) == len(node_names) * 4: gravity_scores = list(struct.unpack(f"{len(node_names)}I", gravity_bytes)) - with open(manifest_path, "w", encoding="utf-8") as f: - json.dump({ + from ..storage.atomic import atomic_write_bytes, atomic_write_json + + atomic_write_json(manifest_path, { "nodes": node_names, "gravity_scores": gravity_scores, "commits_analyzed": topology_raw.get("commits_analyzed", 0) - }, f) + }) # Flush the zero-copy array entirely bypassing serialization - with open(bin_path, "wb") as f: - f.write(topology_raw.get("edge_sources", b"")) - f.write(topology_raw.get("edge_targets", b"")) - f.write(topology_raw.get("edge_weights", b"")) + atomic_write_bytes( + bin_path, + topology_raw.get("edge_sources", b"") + + topology_raw.get("edge_targets", b"") + + topology_raw.get("edge_weights", b""), + ) progress.finish("Zero-copy WebGPU pipeline synchronized") except ImportError: pass # Fallback to python standard pipeline if deployed in non-native environments @@ -705,6 +708,8 @@ def append_to_index(root: str, planned: PlannedScope) -> None: def _write_scopes(plan: IngestPlan) -> None: """Write all planned .scope files and the .scopes index to disk.""" + from ..storage.atomic import atomic_write_text + written = 0 for ps in plan.scopes: @@ -715,16 +720,14 @@ def _write_scopes(plan: IngestPlan) -> None: os.makedirs(os.path.dirname(scope_path), exist_ok=True) content = serialize_scope(ps.config) - with open(scope_path, "w", encoding="utf-8") as f: - f.write(content) + atomic_write_text(scope_path, content) written += 1 # Write .scopes index (only if it doesn't exist) index_path = os.path.join(plan.root, ".scopes") if plan.index and not os.path.exists(index_path): content = _serialize_index(plan.index) - with open(index_path, "w", encoding="utf-8") as f: - f.write(content) + atomic_write_text(index_path, content) written += 1 import json @@ -749,8 +752,8 @@ def _write_scopes(plan: IngestPlan) -> None: try: manifest_path = os.path.join(plan.root, ".dotscope", "manifest.json") os.makedirs(os.path.dirname(manifest_path), exist_ok=True) - with open(manifest_path, "w", encoding="utf-8") as f: - json.dump(manifest_data, f, indent=2) + from ..storage.atomic import atomic_write_json + atomic_write_json(manifest_path, manifest_data) except Exception as e: _log_ingest_error(plan.root, f"Failed to write manifest.json: {e}") @@ -1093,5 +1096,5 @@ def _cache_invariants(root: str, history: Optional[HistoryAnalysis]) -> None: import json path = os.path.join(dot_dir, "invariants.json") - with open(path, "w", encoding="utf-8") as f: - json.dump(invariants, f, indent=2) + from ..storage.atomic import atomic_write_json + atomic_write_json(path, invariants) diff --git a/dotscope/workflows/ops.py b/dotscope/workflows/ops.py new file mode 100644 index 0000000..6d20c9f --- /dev/null +++ b/dotscope/workflows/ops.py @@ -0,0 +1,98 @@ +"""Operator health summary for Dotscope local state.""" + +from __future__ import annotations + +import os +from typing import Dict + +from ..models.control_plane import OperationResult, legacy_status_dict, make_issue, make_result +from ..storage.cache_generation import validate_active_generation +from .refresh import refresh_status_summary + + +def ops_status_result(root: str, *, request_id: str = "") -> OperationResult: + """Return operator health as a v1 control-plane result.""" + refresh = refresh_status_summary(root) + cache = validate_active_generation(root) + issues = [] + + if refresh.get("lock_stale"): + issues.append(make_issue( + "stale_refresh_lock", + path=".dotscope/refresh.lock", + command=f"dotscope --root {root} refresh recover", + dry_run_command=f"dotscope --root {root} refresh recover --dry-run", + )) + if refresh.get("running") and not refresh.get("lock"): + issues.append(make_issue( + "running_without_lock", + path=".dotscope/refresh_status.json", + command=f"dotscope --root {root} refresh recover", + dry_run_command=f"dotscope --root {root} refresh recover --dry-run", + )) + if refresh.get("last_error"): + issues.append(make_issue( + "refresh_last_error", + path=".dotscope/refresh_status.json", + command=f"dotscope --root {root} refresh status", + )) + + cache_state = str(cache.get("state", "")) + if cache_state in {"corrupt", "missing", "error"}: + code = f"cache_{cache_state}" + issues.append(make_issue( + code, + path=".dotscope/cache", + command=f"dotscope --root {root} refresh repo", + )) + elif cache_state == "legacy": + issues.append(make_issue( + "cache_legacy_generation", + path=".dotscope/cache", + command=f"dotscope --root {root} refresh repo", + )) + + recent_mcp_failures = _recent_mcp_failures(root) + if recent_mcp_failures: + issues.append(make_issue("recent_mcp_failures", path=".dotscope/mcp_activity.jsonl")) + + return make_result( + root=root, + data={ + "refresh": refresh, + "cache": cache, + "recent_mcp_failures": recent_mcp_failures, + }, + issues=issues, + metrics={ + "queued_job_count": int(refresh.get("queued_job_count", 0) or 0), + "recent_mcp_failure_count": len(recent_mcp_failures), + }, + request_id=request_id, + ) + + +def ops_status(root: str) -> Dict[str, object]: + """Compatibility wrapper returning the previous parseable dict shape.""" + return legacy_status_dict(ops_status_result(root)) + + +def _recent_mcp_failures(root: str) -> List[dict]: + path = os.path.join(root, ".dotscope", "mcp_activity.jsonl") + if not os.path.isfile(path): + return [] + failures = [] + try: + import json + + with open(path, "r", encoding="utf-8") as f: + for line in f.readlines()[-100:]: + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + if entry.get("status") not in {"", None, "ok", "started"}: + failures.append(entry) + except OSError: + return [] + return failures[-10:] diff --git a/dotscope/workflows/refresh.py b/dotscope/workflows/refresh.py index 9614560..b606bfb 100644 --- a/dotscope/workflows/refresh.py +++ b/dotscope/workflows/refresh.py @@ -2,9 +2,13 @@ import json import os +import socket import subprocess import sys +import threading import time +import traceback +import uuid from typing import Dict, Iterable, List, Optional, Tuple from ..paths import normalize_relative_path, scope_storage_key @@ -21,10 +25,26 @@ save_incremental_state, utc_now_iso, ) -from ..storage.atomic import atomic_write_json +from ..storage.atomic import atomic_append_jsonl, atomic_write_json +from ..models.control_plane import OperationResult, legacy_status_dict, make_issue, make_result REFRESH_WAIT_SECONDS = 1.0 +REFRESH_HEARTBEAT_SECONDS = 5.0 +MAX_REFRESH_QUEUE_LENGTH = 100 +MAX_REFRESH_JOURNAL_LINES = 5000 +MAX_WORKER_LOG_BYTES = 2 * 1024 * 1024 +TERMINAL_REFRESH_EVENTS = {"succeeded", "failed", "abandoned"} +VALID_REFRESH_EVENTS = { + "queued", + "started", + "heartbeat", + "phase", + "succeeded", + "failed", + "abandoned", + "replayed", +} def refresh_queue_path(root: str) -> str: @@ -39,12 +59,82 @@ def refresh_lock_path(root: str) -> str: return os.path.join(root, ".dotscope", "refresh.lock") +def refresh_journal_path(root: str) -> str: + return os.path.join(root, ".dotscope", "refresh_journal.jsonl") + + def _ensure_dot_dir(root: str) -> str: dot_dir = os.path.join(root, ".dotscope") os.makedirs(dot_dir, exist_ok=True) return dot_dir +def _hostname() -> str: + try: + return socket.gethostname() + except OSError: + return "unknown" + + +def _refresh_stale_seconds() -> float: + raw = os.environ.get("DOTSCOPE_REFRESH_STALE_SECONDS") + if raw: + try: + return max(10.0, float(raw)) + except ValueError: + pass + return 120.0 + + +def _parse_iso_epoch(timestamp: str) -> float: + if not timestamp: + return 0.0 + try: + from datetime import datetime + + return datetime.fromisoformat(timestamp.replace("Z", "+00:00")).timestamp() + except (TypeError, ValueError): + return 0.0 + + +def _process_start_token(pid: int) -> str: + if pid <= 0: + return "" + if os.name == "nt": + return "" + try: + with open(f"/proc/{pid}/stat", "r", encoding="utf-8") as f: + return f.read().split()[21] + except (OSError, IndexError): + return "" + + +def _pid_alive(pid: int) -> bool: + if pid <= 0: + return False + if os.name == "nt": + try: + result = subprocess.run( + ["tasklist", "/FI", f"PID eq {pid}", "/NH"], + capture_output=True, + text=True, + timeout=2, + check=False, + ) + return str(pid) in result.stdout + except Exception: + return False + try: + os.kill(pid, 0) + return True + except ProcessLookupError: + return False + except PermissionError: + return True + except OSError: + return False + + def _default_status() -> Dict[str, object]: return { "running": False, @@ -55,6 +145,12 @@ def _default_status() -> Dict[str, object]: "last_error": "", "last_job_kind": None, "last_targets": [], + "job_id": "", + "pid": None, + "heartbeat_at": "", + "phase": "", + "worker_log": "", + "last_traceback": "", } @@ -73,7 +169,7 @@ def load_refresh_queue(root: str) -> List[Dict[str, object]]: def save_refresh_queue(root: str, jobs: Iterable[Dict[str, object]]) -> None: _ensure_dot_dir(root) - atomic_write_json(refresh_queue_path(root), {"jobs": list(jobs)}) + atomic_write_json(refresh_queue_path(root), {"jobs": list(jobs)[:MAX_REFRESH_QUEUE_LENGTH]}) def load_refresh_status(root: str) -> Dict[str, object]: @@ -109,19 +205,103 @@ def _normalize_targets(targets: Iterable[str]) -> List[str]: return sorted(normalized) -def _make_job(kind: str, targets: Iterable[str], reason: str = "") -> Dict[str, object]: +def _make_job( + kind: str, + targets: Iterable[str], + reason: str = "", + *, + job_id: Optional[str] = None, + parent_request_id: str = "", + trigger: str = "", + commit_hash: str = "", +) -> Dict[str, object]: return { + "job_id": job_id or uuid.uuid4().hex, "kind": kind, "targets": _normalize_targets(targets), "reason": reason, "enqueued_at": utc_now_iso(), + "parent_request_id": parent_request_id, + "trigger": trigger, + "commit_hash": commit_hash, + } + + +def append_refresh_journal( + root: str, + event: str, + job: Dict[str, object], + *, + phase: str = "", + error: str = "", + worker_log: str = "", + request_id: str = "", +) -> None: + if event not in VALID_REFRESH_EVENTS: + raise ValueError(f"invalid refresh journal event: {event}") + record = { + "event": event, + "job_id": job.get("job_id", ""), + "kind": job.get("kind", ""), + "targets": job.get("targets", []), + "reason": job.get("reason", ""), + "ts": utc_now_iso(), + "pid": os.getpid(), + "hostname": _hostname(), + "phase": phase, + "error": error, + "worker_log": worker_log, + "request_id": request_id or job.get("parent_request_id", ""), + "trigger": job.get("trigger", ""), + "commit_hash": job.get("commit_hash", ""), } + atomic_append_jsonl( + refresh_journal_path(root), + record, + max_lines=MAX_REFRESH_JOURNAL_LINES, + ) + + +def load_refresh_journal(root: str, limit: Optional[int] = None) -> List[Dict[str, object]]: + path = refresh_journal_path(root) + if not os.path.isfile(path): + return [] + entries: List[Dict[str, object]] = [] + try: + with open(path, "r", encoding="utf-8") as f: + lines = f.readlines() + except OSError: + return [] + if limit is not None: + lines = lines[-limit:] + for line in lines: + try: + entry = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(entry, dict): + entries.append(entry) + return entries + + +def _terminal_job_ids(root: str) -> set: + terminal = set() + for entry in load_refresh_journal(root): + event = entry.get("event") + job_id = entry.get("job_id") + if event in TERMINAL_REFRESH_EVENTS and job_id: + terminal.add(job_id) + return terminal def enqueue_scope_refresh( root: str, targets: Iterable[str], reason: str = "", + *, + parent_request_id: str = "", + trigger: str = "", + commit_hash: str = "", ) -> Optional[Dict[str, object]]: targets = _normalize_targets(targets) if not targets: @@ -138,23 +318,54 @@ def enqueue_scope_refresh( if job.get("kind") == "scope" and _normalize_targets(job.get("targets", [])) == targets: return job - job = _make_job("scope", targets, reason=reason) + job = _make_job( + "scope", + targets, + reason=reason, + parent_request_id=parent_request_id, + trigger=trigger, + commit_hash=commit_hash, + ) queue.append(job) save_refresh_queue(root, queue) + append_refresh_journal(root, "queued", job) return job -def enqueue_repo_refresh(root: str, reason: str = "") -> Dict[str, object]: +def enqueue_repo_refresh( + root: str, + reason: str = "", + *, + parent_request_id: str = "", + trigger: str = "", + commit_hash: str = "", +) -> Dict[str, object]: queue = load_refresh_queue(root) status = load_refresh_status(root) if status.get("running") and status.get("current_job") == "repo": - return _make_job("repo", [], reason=reason) + return _make_job( + "repo", + [], + reason=reason, + parent_request_id=parent_request_id, + trigger=trigger, + commit_hash=commit_hash, + ) - job = _make_job("repo", [], reason=reason) - queue = [job_entry for job_entry in queue if job_entry.get("kind") != "repo"] - queue = [job_entry for job_entry in queue if job_entry.get("kind") == "repo"] + job = _make_job( + "repo", + [], + reason=reason, + parent_request_id=parent_request_id, + trigger=trigger, + commit_hash=commit_hash, + ) + for queued in queue: + if queued.get("job_id"): + append_refresh_journal(root, "abandoned", queued, phase="superseded_by_repo") queue = [job] save_refresh_queue(root, queue) + append_refresh_journal(root, "queued", job) return job @@ -335,8 +546,19 @@ def enqueue_commit_refresh(root: str, commit_hash: str) -> Optional[Dict[str, ob job = classify_refresh_job(root, changed, added, deleted, renamed=renamed) if job["kind"] == "repo": - return enqueue_repo_refresh(root, reason=str(job.get("reason", ""))) - return enqueue_scope_refresh(root, job.get("targets", []), reason=str(job.get("reason", ""))) + return enqueue_repo_refresh( + root, + reason=str(job.get("reason", "")), + trigger="git-hook", + commit_hash=commit_hash, + ) + return enqueue_scope_refresh( + root, + job.get("targets", []), + reason=str(job.get("reason", "")), + trigger="git-hook", + commit_hash=commit_hash, + ) def run_scope_refresh(root: str, targets: Iterable[str], quiet: bool = True) -> bool: @@ -404,102 +626,337 @@ def run_repo_refresh(root: str, quiet: bool = True) -> bool: return True -def _acquire_refresh_lock(root: str) -> Optional[int]: +def _read_refresh_lock(root: str) -> Tuple[Optional[Dict[str, object]], str]: + path = refresh_lock_path(root) + if not os.path.exists(path): + return None, "missing" + try: + raw = open(path, "r", encoding="utf-8").read().strip() + except OSError: + return None, "unreadable" + if not raw: + return None, "malformed" + try: + data = json.loads(raw) + except json.JSONDecodeError: + if raw.isdigit(): + return { + "legacy": True, + "pid": int(raw), + "heartbeat_at": "", + "hostname": _hostname(), + }, "legacy" + return None, "malformed" + if isinstance(data, dict): + return data, "json" + return None, "malformed" + + +def _lock_is_stale(lock: Optional[Dict[str, object]], kind: str) -> bool: + if not lock: + return True + heartbeat_epoch = _parse_iso_epoch(str(lock.get("heartbeat_at", ""))) + heartbeat_stale = (time.time() - heartbeat_epoch) > _refresh_stale_seconds() if heartbeat_epoch else True + if heartbeat_stale: + return True + + if str(lock.get("hostname", "")) != _hostname(): + return False + + pid = int(lock.get("pid") or 0) + if not _pid_alive(pid): + return True + expected_start = str(lock.get("process_start", "")) + actual_start = _process_start_token(pid) + if expected_start and actual_start and expected_start != actual_start: + return True + return False + + +def _refresh_lock_quarantine_path(root: str, reason: str) -> str: + return f"{refresh_lock_path(root)}.dead.{int(time.time())}.{reason}" + + +def _quarantine_refresh_lock(root: str, reason: str, dest: Optional[str] = None) -> Optional[str]: + path = refresh_lock_path(root) + if not os.path.exists(path): + return None + dest = dest or _refresh_lock_quarantine_path(root, reason) + try: + os.replace(path, dest) + return dest + except OSError: + return None + + +def _acquire_refresh_lock(root: str, job: Dict[str, object]) -> Optional[Tuple[int, Dict[str, object]]]: _ensure_dot_dir(root) path = refresh_lock_path(root) + existing, existing_kind = _read_refresh_lock(root) + if existing is not None or existing_kind in {"legacy", "malformed", "unreadable"}: + if _lock_is_stale(existing, existing_kind): + _quarantine_refresh_lock(root, existing_kind) + else: + return None + + owner = { + "job_id": job.get("job_id", ""), + "owner_token": uuid.uuid4().hex, + "pid": os.getpid(), + "process_start": _process_start_token(os.getpid()), + "hostname": _hostname(), + "started_at": utc_now_iso(), + "heartbeat_at": utc_now_iso(), + "kind": job.get("kind", ""), + "targets": job.get("targets", []), + "phase": "acquired", + } try: fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY) except FileExistsError: return None - os.write(fd, str(os.getpid()).encode("utf-8")) - return fd + os.write(fd, json.dumps(owner, indent=2).encode("utf-8")) + os.fsync(fd) + os.close(fd) + return None, owner -def _release_refresh_lock(root: str, fd: Optional[int]) -> None: +def _lock_matches_owner(root: str, owner: Dict[str, object]) -> bool: + current, _kind = _read_refresh_lock(root) + if not current: + return False + return ( + current.get("job_id") == owner.get("job_id") + and current.get("owner_token") == owner.get("owner_token") + ) + + +def _heartbeat_refresh_lock( + root: str, + owner: Dict[str, object], + *, + phase: str, + job: Dict[str, object], + worker_log: str = "", +) -> None: + if not _lock_matches_owner(root, owner): + return + owner = dict(owner) + owner["heartbeat_at"] = utc_now_iso() + owner["phase"] = phase + atomic_write_json(refresh_lock_path(root), owner) + + status = load_refresh_status(root) + status.update({ + "running": True, + "current_job": job.get("kind"), + "current_targets": job.get("targets", []), + "job_id": job.get("job_id", ""), + "pid": os.getpid(), + "heartbeat_at": owner["heartbeat_at"], + "phase": phase, + "worker_log": worker_log, + }) + save_refresh_status(root, status) + append_refresh_journal(root, "heartbeat", job, phase=phase, worker_log=worker_log) + + +def _release_refresh_lock(root: str, fd: Optional[int], owner: Optional[Dict[str, object]]) -> None: if fd is not None: try: os.close(fd) except OSError: pass + if owner and _lock_matches_owner(root, owner): + try: + os.remove(refresh_lock_path(root)) + except OSError: + pass + + +def _worker_log_path(root: str, job: Dict[str, object]) -> str: + safe_job_id = str(job.get("job_id", "unknown")).replace(os.sep, "_").replace("/", "_") + started = str(job.get("enqueued_at", utc_now_iso())).replace(":", "").replace("-", "") + return os.path.join(root, ".dotscope", f"refresh_worker.{safe_job_id}.{started}.log") + + +def _remove_job_from_queue(root: str, job_id: str) -> None: + if not job_id: + return + queue = [job for job in load_refresh_queue(root) if job.get("job_id") != job_id] + save_refresh_queue(root, queue) + + +def _write_bounded_worker_log(log_path: str, text: str) -> None: + if not text: + return + os.makedirs(os.path.dirname(log_path), exist_ok=True) + with open(log_path, "a", encoding="utf-8", errors="replace") as f: + f.write(text) try: - os.remove(refresh_lock_path(root)) + if os.path.getsize(log_path) > MAX_WORKER_LOG_BYTES: + with open(log_path, "rb") as f: + f.seek(-MAX_WORKER_LOG_BYTES, os.SEEK_END) + tail = f.read() + with open(log_path, "wb") as f: + f.write(tail) except OSError: pass +def _run_with_periodic_heartbeat( + root: str, + owner: Dict[str, object], + *, + phase: str, + job: Dict[str, object], + worker_log: str, + func, + func_args: Tuple[object, ...] = (), + func_kwargs: Optional[Dict[str, object]] = None, +): + call_kwargs = func_kwargs or {} + stop_event = threading.Event() + + def _beat() -> None: + while not stop_event.wait(REFRESH_HEARTBEAT_SECONDS): + try: + _heartbeat_refresh_lock(root, owner, phase=phase, job=job, worker_log=worker_log) + except Exception as exc: + _write_bounded_worker_log( + worker_log, + f"[{utc_now_iso()}] heartbeat failed: {type(exc).__name__}: {exc}\n", + ) + + thread = threading.Thread( + target=_beat, + name=f"dotscope-refresh-heartbeat-{job.get('job_id', 'unknown')}", + daemon=True, + ) + thread.start() + try: + return func(*func_args, **call_kwargs) + finally: + stop_event.set() + thread.join(timeout=1.0) + + def run_refresh_queue(root: str, drain: bool = False) -> bool: """Run queued refresh work. Returns True if a job ran.""" - lock_fd = _acquire_refresh_lock(root) - if lock_fd is None: - return False - ran_job = False - status = load_refresh_status(root) - try: - while True: - queue = load_refresh_queue(root) - if not queue: - break - - job = queue.pop(0) + while True: + queue = load_refresh_queue(root) + if not queue: + break + + job = queue[0] + if "job_id" not in job: + job["job_id"] = uuid.uuid4().hex + queue[0] = job save_refresh_queue(root, queue) + append_refresh_journal(root, "queued", job, phase="legacy_job_id_backfill") + + acquired = _acquire_refresh_lock(root, job) + if acquired is None: + return ran_job + lock_fd, owner = acquired + worker_log = _worker_log_path(root, job) + + try: + append_refresh_journal(root, "started", job, phase="start", worker_log=worker_log) + _heartbeat_refresh_lock(root, owner, phase="start", job=job, worker_log=worker_log) + + if job.get("kind") == "repo": + append_refresh_journal(root, "phase", job, phase="repo_refresh", worker_log=worker_log) + _heartbeat_refresh_lock(root, owner, phase="repo_refresh", job=job, worker_log=worker_log) + _run_with_periodic_heartbeat( + root, + owner, + phase="repo_refresh", + job=job, + worker_log=worker_log, + func=run_repo_refresh, + func_args=(root,), + func_kwargs={"quiet": True}, + ) + else: + append_refresh_journal(root, "phase", job, phase="scope_refresh", worker_log=worker_log) + _heartbeat_refresh_lock(root, owner, phase="scope_refresh", job=job, worker_log=worker_log) + _run_with_periodic_heartbeat( + root, + owner, + phase="scope_refresh", + job=job, + worker_log=worker_log, + func=run_scope_refresh, + func_args=(root, job.get("targets", [])), + func_kwargs={"quiet": True}, + ) + + append_refresh_journal(root, "succeeded", job, phase="complete", worker_log=worker_log) + status = load_refresh_status(root) status.update({ - "running": True, - "current_job": job.get("kind"), - "current_targets": job.get("targets", []), + "running": False, + "current_job": None, + "current_targets": [], + "last_success_at": utc_now_iso(), + "last_job_kind": job.get("kind"), + "last_targets": job.get("targets", []), + "last_error": "", + "job_id": job.get("job_id", ""), + "phase": "complete", + "worker_log": worker_log, + "last_traceback": "", }) save_refresh_status(root, status) - - try: - if job.get("kind") == "repo": - run_repo_refresh(root, quiet=True) - else: - run_scope_refresh(root, job.get("targets", []), quiet=True) - - status.update({ - "last_success_at": utc_now_iso(), - "last_job_kind": job.get("kind"), - "last_targets": job.get("targets", []), - "last_error": "", - }) - except Exception as exc: - status.update({ - "last_error_at": utc_now_iso(), - "last_error": str(exc), - "last_job_kind": job.get("kind"), - "last_targets": job.get("targets", []), - }) - save_refresh_status(root, status) - raise - finally: - status.update({ - "running": False, - "current_job": None, - "current_targets": [], - }) - save_refresh_status(root, status) - - ran_job = True - if not drain: - break - finally: - _release_refresh_lock(root, lock_fd) - + _remove_job_from_queue(root, str(job.get("job_id", ""))) + except Exception as exc: + tb = traceback.format_exc() + append_refresh_journal(root, "failed", job, phase="failed", error=str(exc), worker_log=worker_log) + status = load_refresh_status(root) + status.update({ + "running": False, + "current_job": None, + "current_targets": [], + "last_error_at": utc_now_iso(), + "last_error": str(exc), + "last_job_kind": job.get("kind"), + "last_targets": job.get("targets", []), + "job_id": job.get("job_id", ""), + "phase": "failed", + "worker_log": worker_log, + "last_traceback": tb[-4000:], + }) + save_refresh_status(root, status) + _remove_job_from_queue(root, str(job.get("job_id", ""))) + raise + finally: + _release_refresh_lock(root, lock_fd, owner) + + ran_job = True + if not drain: + break return ran_job def kick_refresh_worker(root: str) -> None: """Spawn a detached worker to drain the refresh queue.""" - if not load_refresh_queue(root): + queue = load_refresh_queue(root) + if not queue: return # NOTE: Intentionally fire-and-forget — worker self-terminates via queue drain. # Popen does not support timeout=; the child process exits once the queue is empty. + log_path = _worker_log_path(root, queue[0]) + os.makedirs(os.path.dirname(log_path), exist_ok=True) + log_handle = open(log_path, "a", encoding="utf-8", errors="replace") + log_handle.write(f"[{utc_now_iso()}] starting refresh worker\n") + log_handle.flush() subprocess.Popen( [sys.executable, "-m", "dotscope.cli", "refresh", "run", "--drain"], cwd=root, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, + stdout=log_handle, + stderr=log_handle, ) @@ -507,12 +964,195 @@ def refresh_status_summary(root: str) -> Dict[str, object]: """Return the current queue + worker status.""" status = load_refresh_status(root) queue = load_refresh_queue(root) + lock, lock_kind = _read_refresh_lock(root) summary = dict(status) summary["queued_job_count"] = len(queue) summary["queued_jobs"] = queue + summary["lock"] = lock or {} + summary["lock_kind"] = lock_kind + summary["lock_stale"] = _lock_is_stale(lock, lock_kind) if lock or lock_kind != "missing" else False + summary["journal_tail"] = load_refresh_journal(root, limit=10) return summary +def refresh_status_result(root: str, *, request_id: str = "") -> OperationResult: + """Return refresh status as a v1 control-plane result.""" + summary = refresh_status_summary(root) + issues = [] + if summary.get("lock_stale"): + issues.append(make_issue( + "stale_refresh_lock", + path=".dotscope/refresh.lock", + command=f"dotscope --root {root} refresh recover", + dry_run_command=f"dotscope --root {root} refresh recover --dry-run", + )) + if summary.get("running") and not summary.get("lock"): + issues.append(make_issue( + "running_without_lock", + path=".dotscope/refresh_status.json", + command=f"dotscope --root {root} refresh recover", + dry_run_command=f"dotscope --root {root} refresh recover --dry-run", + )) + if summary.get("last_error"): + issues.append(make_issue( + "refresh_last_error", + path=".dotscope/refresh_status.json", + command=f"dotscope --root {root} refresh status", + )) + return make_result( + root=root, + data={"refresh": summary}, + issues=issues, + metrics={"queued_job_count": int(summary.get("queued_job_count", 0) or 0)}, + request_id=request_id, + ) + + +def _plan_refresh_recovery(root: str) -> Dict[str, object]: + actions: List[str] = [] + quarantine_dest = "" + status_update: Optional[Dict[str, object]] = None + queue_changed = False + + lock, lock_kind = _read_refresh_lock(root) + if lock or lock_kind in {"legacy", "malformed", "unreadable"}: + if _lock_is_stale(lock, lock_kind): + quarantine_dest = _refresh_lock_quarantine_path(root, lock_kind) + actions.append(f"quarantined_lock:{os.path.basename(quarantine_dest)}") + + status = load_refresh_status(root) + effective_lock = None if quarantine_dest else lock + effective_lock_kind = "missing" if quarantine_dest else lock_kind + if status.get("running") and (not effective_lock or _lock_is_stale(effective_lock, effective_lock_kind)): + status_update = dict(status) + status_update.update({ + "running": False, + "current_job": None, + "current_targets": [], + "phase": "recovered", + "last_error_at": utc_now_iso(), + "last_error": "Recovered stale running refresh status", + }) + actions.append("cleared_stale_running_status") + + terminal = _terminal_job_ids(root) + latest_by_job: Dict[str, Dict[str, object]] = {} + for entry in load_refresh_journal(root): + job_id = str(entry.get("job_id", "")) + if job_id: + latest_by_job[job_id] = entry + + queue = load_refresh_queue(root) + queued_ids = {str(job.get("job_id", "")) for job in queue} + replay_jobs = [] + for job_id, entry in latest_by_job.items(): + if job_id in terminal or job_id in queued_ids: + continue + if entry.get("event") in {"queued", "started", "heartbeat", "phase", "replayed"}: + job = _make_job( + str(entry.get("kind", "scope")), + entry.get("targets", []), + reason=str(entry.get("reason", "recovered")), + job_id=job_id, + parent_request_id=str(entry.get("request_id", "")), + trigger=str(entry.get("trigger", "")), + commit_hash=str(entry.get("commit_hash", "")), + ) + queue.append(job) + replay_jobs.append(job) + queue_changed = True + actions.append(f"requeued:{job_id}") + + cache_state = {"state": "unknown"} + try: + from ..storage.cache_generation import validate_active_generation + + cache_state = validate_active_generation(root) + except Exception as exc: + cache_state = {"state": "error", "error": str(exc)} + + return { + "actions": actions, + "quarantine_dest": quarantine_dest, + "status_update": status_update, + "queue": queue, + "queue_changed": queue_changed, + "replay_jobs": replay_jobs, + "cache": cache_state, + } + + +def recover_refresh_state(root: str, dry_run: bool = False) -> Dict[str, object]: + """Recover refresh queue/status from lock and journal state.""" + plan = _plan_refresh_recovery(root) + if not dry_run: + if plan.get("actions"): + _ensure_dot_dir(root) + quarantine_dest = str(plan.get("quarantine_dest", "")) + if quarantine_dest: + _quarantine_refresh_lock(root, _read_refresh_lock(root)[1], quarantine_dest) + + status_update = plan.get("status_update") + if isinstance(status_update, dict): + save_refresh_status(root, status_update) + + for job in plan.get("replay_jobs", []): + append_refresh_journal(root, "replayed", job, phase="recover") + if plan.get("queue_changed"): + save_refresh_queue(root, plan.get("queue", [])) + + cache_state = plan.get("cache", {"state": "unknown"}) + return { + "actions": plan.get("actions", []), + "dry_run": dry_run, + "status": refresh_status_summary(root), + "cache": cache_state, + "needs_full_rebuild": cache_state.get("state") in {"missing", "corrupt", "error"}, + } + + +def recover_refresh_state_result( + root: str, + *, + dry_run: bool = False, + request_id: str = "", +) -> OperationResult: + report = recover_refresh_state(root, dry_run=dry_run) + issues = [] + status = report.get("status", {}) + if status.get("lock_stale"): + issues.append(make_issue( + "stale_refresh_lock", + path=".dotscope/refresh.lock", + command=f"dotscope --root {root} refresh recover", + dry_run_command=f"dotscope --root {root} refresh recover --dry-run", + )) + if status.get("running") and not status.get("lock"): + issues.append(make_issue( + "running_without_lock", + path=".dotscope/refresh_status.json", + command=f"dotscope --root {root} refresh recover", + dry_run_command=f"dotscope --root {root} refresh recover --dry-run", + )) + cache_state = str(report.get("cache", {}).get("state", "")) + if cache_state in {"corrupt", "missing", "error"}: + issues.append(make_issue( + f"cache_{cache_state}", + path=".dotscope/cache", + command=f"dotscope --root {root} refresh repo", + )) + return make_result( + root=root, + data={"recovery": report}, + issues=issues, + metrics={ + "action_count": len(report.get("actions", [])), + "dry_run": dry_run, + }, + request_id=request_id, + ) + + def run_refresh_inline( root: str, targets: Optional[List[str]] = None, @@ -595,6 +1235,7 @@ def ensure_resolution_freshness( """Attempt to self-heal stale or missing scopes before resolution.""" from ..engine.composer import parse_expression from ..ux.health import check_staleness + from ..storage.request_context import get_request_id try: refs = [op.ref.name for op in parse_expression(scope_expr)] @@ -653,14 +1294,19 @@ def ensure_resolution_freshness( if needs_repo: job_kind = "repo" - enqueue_repo_refresh(root, reason=f"resolve:{scope_expr}") + enqueue_repo_refresh(root, reason=f"resolve:{scope_expr}", parent_request_id=get_request_id() or "") kick_refresh_worker(root) if _wait_for_repo_refresh(root, timeout_seconds): healed = True needs_repo = False if stale_scope_targets: - enqueue_scope_refresh(root, stale_scope_targets, reason=f"resolve:{scope_expr}") + enqueue_scope_refresh( + root, + stale_scope_targets, + reason=f"resolve:{scope_expr}", + parent_request_id=get_request_id() or "", + ) kick_refresh_worker(root) config, source = find_effective_scope_with_source(primary_ref or scope_expr, root=root) diff --git a/tests/test_control_plane_contract.py b/tests/test_control_plane_contract.py new file mode 100644 index 0000000..5db4936 --- /dev/null +++ b/tests/test_control_plane_contract.py @@ -0,0 +1,188 @@ +import json +import subprocess + +import pytest + +from dotscope.engine.search import execute_search_result, execute_semantic_search +from dotscope.cli import main +from dotscope.mcp.middleware import mcp_tool_route +from dotscope.models.control_plane import ( + ISSUE_CATALOG, + OperationResult, + make_issue, + make_result, +) +from dotscope.workflows.refresh import recover_refresh_state, recover_refresh_state_result + + +def test_operation_result_truth_table(tmp_path): + healthy = make_result(root=str(tmp_path), data={"value": 1}) + degraded = make_result(root=str(tmp_path), issues=[make_issue("cache_legacy_generation")]) + failed = make_result(root=str(tmp_path), issues=[make_issue("stale_refresh_lock")]) + + assert (healthy.ok, healthy.status, healthy.exit_code) == (True, "healthy", 0) + assert (degraded.ok, degraded.status, degraded.exit_code) == (False, "degraded", 1) + assert (failed.ok, failed.status, failed.exit_code) == (False, "failed", 2) + + +def test_operation_result_rejects_json_string_payload(tmp_path): + with pytest.raises(ValueError): + make_result(root=str(tmp_path), data={"opaque": '{"old": "json"}'}) + + +def test_issue_catalog_contains_initial_codes(): + expected = { + "stale_refresh_lock", + "running_without_lock", + "cache_legacy_generation", + "cache_corrupt", + "recent_mcp_failures", + "search_truncated", + "invalid_query", + "repo_root_missing", + "legacy_refresh_syntax", + "dry_run_required", + } + + assert expected <= set(ISSUE_CATALOG) + + +def test_search_literal_default_treats_regex_chars_as_text(tmp_path): + subprocess.run(["git", "init"], cwd=str(tmp_path), capture_output=True, check=False) + subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=str(tmp_path), capture_output=True, check=False) + subprocess.run(["git", "config", "user.name", "Test"], cwd=str(tmp_path), capture_output=True, check=False) + target = tmp_path / "query.py" + target.write_text("value = 'a.b'\nother = 'acb'\n") + subprocess.run(["git", "add", "-A"], cwd=str(tmp_path), capture_output=True, check=False) + subprocess.run(["git", "commit", "-m", "init", "--allow-empty"], cwd=str(tmp_path), capture_output=True, check=False) + + literal = json.loads(execute_semantic_search(str(tmp_path), "a.b")) + regex = json.loads(execute_semantic_search(str(tmp_path), "a.b", mode="regex")) + + assert literal["mode"] == "literal" + assert literal["results"] + assert all("acb" not in "\n".join(result["snippets"]) for result in literal["results"]) + assert regex["mode"] == "regex" + + +def test_search_regex_mode_rejects_invalid_regex(tmp_path): + result = execute_search_result(str(tmp_path), "[", mode="regex") + payload = result.to_dict() + + assert payload["status"] == "failed" + assert payload["issues"][0]["code"] == "invalid_query" + + +def test_recover_dry_run_does_not_mutate(tmp_path): + dot_dir = tmp_path / ".dotscope" + dot_dir.mkdir() + lock_path = dot_dir / "refresh.lock" + status_path = dot_dir / "refresh_status.json" + lock_path.write_text("999999") + status_path.write_text('{"running": true, "current_job": "repo", "current_targets": []}') + + before = { + path.name: path.read_text() + for path in dot_dir.iterdir() + if path.is_file() + } + result = recover_refresh_state_result(str(tmp_path), dry_run=True) + after = { + path.name: path.read_text() + for path in dot_dir.iterdir() + if path.is_file() + } + + assert result.data["recovery"]["actions"] + assert before == after + assert not list(dot_dir.glob("refresh.lock.dead.*")) + + +def test_recover_applied_uses_same_planned_actions(tmp_path): + dot_dir = tmp_path / ".dotscope" + dot_dir.mkdir() + (dot_dir / "refresh.lock").write_text("999999") + + dry = recover_refresh_state(str(tmp_path), dry_run=True) + applied = recover_refresh_state(str(tmp_path), dry_run=False) + + assert dry["actions"] == applied["actions"] + assert not (dot_dir / "refresh.lock").exists() + + +def test_mcp_route_returns_v1_envelope_on_success(tmp_path, monkeypatch): + (tmp_path / ".git").mkdir() + monkeypatch.chdir(tmp_path) + + @mcp_tool_route + def tool(root=None): + return {"value": 1} + + result = tool() + + assert isinstance(result, dict) + assert result["schema_version"] == "dotscope.control_plane.v1" + assert result["request_id"] + assert result["root"] is None + assert result["data"] == {"value": 1} + + +def test_mcp_route_legacy_returns_json_string(tmp_path, monkeypatch): + (tmp_path / ".git").mkdir() + monkeypatch.chdir(tmp_path) + + @mcp_tool_route(legacy=True) + def tool(root=None): + return {"value": 1} + + result = tool() + + assert isinstance(result, str) + assert json.loads(result) == {"value": 1} + + +def test_mcp_route_root_failure_is_failed_envelope(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + @mcp_tool_route + def tool(root=None): + return {"value": 1} + + result = tool() + + assert result["status"] == "failed" + assert result["issues"][0]["code"] == "repo_root_missing" + + +def test_cli_ops_status_json_uses_root_without_chdir(tmp_path, monkeypatch, capsys): + repo = tmp_path / "repo" + outside = tmp_path / "outside" + repo.mkdir() + outside.mkdir() + (repo / ".git").mkdir() + monkeypatch.chdir(outside) + + with pytest.raises(SystemExit): + main(["--root", str(repo), "ops", "status", "--json"]) + + payload = json.loads(capsys.readouterr().out) + assert payload["schema_version"] == "dotscope.control_plane.v1" + assert payload["root"] == str(repo.resolve()) + assert payload["issues"][0]["code"] == "cache_missing" + + +def test_cli_refresh_recover_dry_run_json_is_non_mutating(tmp_path, capsys): + (tmp_path / ".git").mkdir() + dot_dir = tmp_path / ".dotscope" + dot_dir.mkdir() + lock_path = dot_dir / "refresh.lock" + lock_path.write_text("999999") + + with pytest.raises(SystemExit): + main(["--root", str(tmp_path), "refresh", "recover", "--dry-run", "--json"]) + + payload = json.loads(capsys.readouterr().out) + assert payload["schema_version"] == "dotscope.control_plane.v1" + assert payload["data"]["recovery"]["dry_run"] is True + assert lock_path.exists() + assert not list(dot_dir.glob("refresh.lock.dead.*")) diff --git a/tests/test_production_hardening.py b/tests/test_production_hardening.py new file mode 100644 index 0000000..bad9ce6 --- /dev/null +++ b/tests/test_production_hardening.py @@ -0,0 +1,54 @@ +"""Production hardening regression tests.""" + +import json +import subprocess + +from dotscope.engine import search as engine_search +from dotscope.workflows.ops import ops_status + + +def _git_init(path): + subprocess.run(["git", "init"], cwd=str(path), capture_output=True, check=False) + subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=str(path), capture_output=True, check=False) + subprocess.run(["git", "config", "user.name", "Test"], cwd=str(path), capture_output=True, check=False) + + +def _git_commit(path): + subprocess.run(["git", "add", "-A"], cwd=str(path), capture_output=True, check=False) + subprocess.run(["git", "commit", "-m", "initial"], cwd=str(path), capture_output=True, check=False) + + +def test_search_rejects_too_short_query(tmp_path): + _git_init(tmp_path) + + payload = json.loads(engine_search.execute_semantic_search(str(tmp_path), "x")) + + assert payload["error"].startswith("Query must contain") + assert payload["truncated"] is False + + +def test_search_streaming_caps_matched_files(tmp_path, monkeypatch): + _git_init(tmp_path) + for idx in range(5): + (tmp_path / f"f{idx}.py").write_text(f"def fn_{idx}():\n return {idx}\n") + _git_commit(tmp_path) + + monkeypatch.setattr(engine_search, "MAX_SEARCH_FILES", 2) + payload = json.loads(engine_search.execute_semantic_search(str(tmp_path), "def")) + + assert payload["truncated"] is True + assert payload["limit_reason"] == "max_files" + assert payload["matched_files"] == 2 + assert len(payload["results"]) == 2 + + +def test_ops_status_reports_stale_refresh_lock(tmp_path): + dot_dir = tmp_path / ".dotscope" + dot_dir.mkdir() + (dot_dir / "refresh.lock").write_text("999999") + + status = ops_status(str(tmp_path)) + + assert status["healthy"] is False + assert status["exit_code"] == 2 + assert "stale_refresh_lock" in status["issues"] diff --git a/tests/test_refresh.py b/tests/test_refresh.py index 3832f23..95b4ba4 100644 --- a/tests/test_refresh.py +++ b/tests/test_refresh.py @@ -1,9 +1,11 @@ """Tests for runtime refresh queueing, overlay precedence, and self-heal.""" +import json import os import subprocess import time +import dotscope.workflows.refresh as refresh_workflow from dotscope.cli import main from dotscope.composer import compose from dotscope.context import parse_context @@ -11,12 +13,15 @@ from dotscope.parser import parse_scope_file from dotscope.passes.incremental import incremental_update from dotscope.refresh import ( + append_refresh_journal, enqueue_commit_refresh, enqueue_repo_refresh, enqueue_scope_refresh, ensure_resolution_freshness, load_refresh_queue, + recover_refresh_state, run_refresh_queue, + save_refresh_queue, ) from dotscope.runtime_overlay import ( runtime_index_path, @@ -136,7 +141,59 @@ def test_worker_respects_refresh_lock(self, tmp_path): lock_path.parent.mkdir(parents=True, exist_ok=True) lock_path.write_text("busy") - assert run_refresh_queue(str(tmp_path), drain=True) is False + assert run_refresh_queue(str(tmp_path), drain=True) is True + assert not lock_path.exists() + assert list((tmp_path / ".dotscope").glob("refresh.lock.dead.*")) + + def test_recover_quarantines_legacy_stale_lock_and_clears_status(self, tmp_path): + dot_dir = tmp_path / ".dotscope" + dot_dir.mkdir() + (dot_dir / "refresh.lock").write_text("999999") + (dot_dir / "refresh_status.json").write_text( + '{"running": true, "current_job": "repo", "current_targets": []}' + ) + + result = recover_refresh_state(str(tmp_path)) + + assert any(action.startswith("quarantined_lock:") for action in result["actions"]) + assert "cleared_stale_running_status" in result["actions"] + assert result["status"]["running"] is False + assert not (dot_dir / "refresh.lock").exists() + + def test_recover_requeues_non_terminal_jobs_idempotently(self, tmp_path): + job = enqueue_scope_refresh(str(tmp_path), ["auth"], reason="test") + save_refresh_queue(str(tmp_path), []) + append_refresh_journal(str(tmp_path), "started", job, phase="scope_refresh") + + first = recover_refresh_state(str(tmp_path)) + second = recover_refresh_state(str(tmp_path)) + + queue = load_refresh_queue(str(tmp_path)) + assert len(queue) == 1 + assert queue[0]["job_id"] == job["job_id"] + assert any(action.startswith("requeued:") for action in first["actions"]) + assert not any(action.startswith("requeued:") for action in second["actions"]) + + def test_worker_heartbeats_during_long_phase(self, tmp_path, monkeypatch): + enqueue_scope_refresh(str(tmp_path), ["auth"], reason="test") + monkeypatch.setattr(refresh_workflow, "REFRESH_HEARTBEAT_SECONDS", 0.01) + + def slow_scope_refresh(root, targets, quiet=True): + time.sleep(0.05) + return True + + monkeypatch.setattr(refresh_workflow, "run_scope_refresh", slow_scope_refresh) + + assert run_refresh_queue(str(tmp_path), drain=True) is True + + journal_path = tmp_path / ".dotscope" / "refresh_journal.jsonl" + events = [json.loads(line) for line in journal_path.read_text().splitlines()] + phase_heartbeats = [ + event + for event in events + if event.get("event") == "heartbeat" and event.get("phase") == "scope_refresh" + ] + assert len(phase_heartbeats) >= 2 def test_modify_only_commit_queues_scope_refresh(self, tmp_path): _git_init(tmp_path) diff --git a/tests/test_search.py b/tests/test_search.py index 14c9ce5..b931ba7 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -68,11 +68,24 @@ def test_semantic_search_zero_copy_enrichment(tmp_path): # byte 0 is 0 -> topology_A.bin active f.write(b'\x00' * 4096) - with patch('subprocess.run') as mock_run: - # Create a mock result - import subprocess - mock_result = subprocess.CompletedProcess(args=[], returncode=0, stdout=mocked_git_stdout, stderr="") - mock_run.return_value = mock_result + class FakePopen: + call_count = 0 + + def __init__(self, *args, **kwargs): + FakePopen.call_count += 1 + self.stdout = iter(mocked_git_stdout.splitlines(True)) + self.returncode = 0 + + def poll(self): + return self.returncode + + def communicate(self, timeout=None): + return "", "" + + def kill(self): + self.returncode = -9 + + with patch('subprocess.Popen', FakePopen): # Test the execution result_str = execute_semantic_search(root, "def ") @@ -90,4 +103,4 @@ def test_semantic_search_zero_copy_enrichment(tmp_path): assert results[1]["file"] == "libs/db/connection.py" assert "HIGH" in results[1]["architectural_gravity"] - assert mock_run.call_count == 1 + assert FakePopen.call_count == 1 diff --git a/tests/test_treesitter.py b/tests/test_treesitter.py index e326a83..14a2d20 100644 --- a/tests/test_treesitter.py +++ b/tests/test_treesitter.py @@ -3,9 +3,10 @@ import os import pytest import tempfile +import warnings from dotscope.passes.lang import get_analyzer -from dotscope.passes.lang._treesitter import AVAILABLE +from dotscope.passes.lang._treesitter import AVAILABLE, get_language, _languages from dotscope.passes.lang.javascript import JavaScriptAnalyzer from dotscope.passes.lang.go import GoAnalyzer, resolve_go_import from dotscope.models.core import ResolvedImport @@ -305,6 +306,15 @@ def test_python_returns_none(self): """Python uses stdlib ast, not tree-sitter.""" assert get_analyzer("python") is None + def test_solidity_language_load_is_warning_free(self): + _languages.pop("solidity", None) + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + get_language("solidity") + assert not [ + warning for warning in caught if issubclass(warning.category, DeprecationWarning) + ] + # --------------------------------------------------------------------------- # Integration: analyze_file dispatches to tree-sitter