-
Notifications
You must be signed in to change notification settings - Fork 2
Refactor monolithic core.py into focused modules and add response caching #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| """SHA256-based LLM response cache. | ||
|
|
||
| Caches API responses by (prompt_hash, model) to avoid redundant calls. | ||
| Stored as JSON files in a .autolean_cache/ directory. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import hashlib | ||
| import json | ||
| import os | ||
| import tempfile | ||
| import time | ||
| from pathlib import Path | ||
| from typing import Optional | ||
|
|
||
| from .util import CommandResult | ||
|
|
||
_DEFAULT_CACHE_DIR = ".autolean_cache"


class ResponseCache:
    """Disk-backed cache for LLM API responses."""

    def __init__(self, cache_dir: Optional[Path] = None, *, enabled: bool = True):
        """Create a cache rooted at *cache_dir* (default: ``.autolean_cache/``).

        When *enabled* is False, lookups always miss and writes are no-ops.
        """
        self.enabled = enabled
        self.cache_dir = Path(_DEFAULT_CACHE_DIR) if cache_dir is None else cache_dir
        # Lookup counters, surfaced through the hits/misses properties.
        self._hits = 0
        self._misses = 0
|
|
||
    @property
    def hits(self) -> int:
        """Number of get() calls served from the on-disk cache."""
        return self._hits

    @property
    def misses(self) -> int:
        """Number of get() calls that found no usable entry.

        Lookups made while the cache is disabled are not counted.
        """
        return self._misses
|
|
||
| @staticmethod | ||
| def _cache_key(prompt: str, model: str) -> str: | ||
| """Generate a deterministic cache key from prompt + model.""" | ||
| content = json.dumps({"prompt": prompt, "model": model}, sort_keys=True) | ||
| return hashlib.sha256(content.encode("utf-8")).hexdigest() | ||
|
|
||
| def _cache_path(self, key: str) -> Path: | ||
| # Use first 2 chars as subdirectory to avoid flat directory with thousands of files | ||
| subdir = self.cache_dir / key[:2] | ||
| subdir.mkdir(parents=True, exist_ok=True) | ||
| return subdir / f"{key}.json" | ||
|
|
||
| def get(self, prompt: str, model: str) -> Optional[CommandResult]: | ||
| """Look up a cached response. Returns None on miss.""" | ||
| if not self.enabled: | ||
| return None | ||
|
|
||
| key = self._cache_key(prompt, model) | ||
| path = self._cache_path(key) | ||
|
|
||
| if not path.exists(): | ||
| self._misses += 1 | ||
| return None | ||
|
|
||
| try: | ||
| data = json.loads(path.read_text(encoding="utf-8")) | ||
| self._hits += 1 | ||
| return CommandResult( | ||
| argv=data.get("argv", []), | ||
| returncode=data.get("returncode", 0), | ||
| stdout=data.get("stdout", ""), | ||
| stderr=data.get("stderr", ""), | ||
| ) | ||
| except (json.JSONDecodeError, OSError, KeyError): | ||
| self._misses += 1 | ||
| return None | ||
|
|
||
| def put(self, prompt: str, model: str, result: CommandResult) -> None: | ||
| """Store a successful response in the cache.""" | ||
| if not self.enabled: | ||
| return | ||
| # Only cache successful responses | ||
| if result.returncode != 0: | ||
| return | ||
|
|
||
| key = self._cache_key(prompt, model) | ||
| path = self._cache_path(key) | ||
|
|
||
| data = { | ||
| "prompt_sha256": hashlib.sha256(prompt.encode("utf-8")).hexdigest(), | ||
| "model": model, | ||
| "cached_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), | ||
| "argv": result.argv, | ||
| "returncode": result.returncode, | ||
| "stdout": result.stdout, | ||
| "stderr": result.stderr, | ||
| } | ||
| try: | ||
| # Atomic write: write to temp file then rename to prevent corruption | ||
| path.parent.mkdir(parents=True, exist_ok=True) | ||
| fd, tmp_path = tempfile.mkstemp(dir=path.parent, suffix=".tmp") | ||
| try: | ||
| with os.fdopen(fd, "w", encoding="utf-8") as f: | ||
| json.dump(data, f, ensure_ascii=True) | ||
| os.replace(tmp_path, path) | ||
| except BaseException: | ||
| try: | ||
| os.unlink(tmp_path) | ||
| except OSError: | ||
| pass | ||
| raise | ||
| except OSError: | ||
| pass # Cache write failure is non-fatal | ||
|
Comment on lines
+76
to
+111
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Writing directly to the cache file can lead to corrupted entries if the process is interrupted or if there is concurrent access. It is safer to write to a temporary file and then perform an atomic replace. References
|
||
|
|
||
| def clear(self) -> int: | ||
| """Remove all cached entries. Returns count of files removed.""" | ||
| if not self.cache_dir.exists(): | ||
| return 0 | ||
| count = 0 | ||
| for f in self.cache_dir.rglob("*.json"): | ||
| try: | ||
| f.unlink() | ||
| count += 1 | ||
| except OSError: | ||
| pass | ||
| return count | ||
|
|
||
| def stats(self) -> dict[str, int]: | ||
| """Return cache hit/miss statistics.""" | ||
| total = 0 | ||
| if self.cache_dir.exists(): | ||
| total = sum(1 for _ in self.cache_dir.rglob("*.json")) | ||
| return { | ||
| "hits": self._hits, | ||
| "misses": self._misses, | ||
| "total_entries": total, | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,208 @@ | ||
| """Lean compiler interaction, error extraction, and error memory.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import re | ||
| from collections import OrderedDict | ||
| from pathlib import Path | ||
| from typing import Optional | ||
|
|
||
| from .util import CommandResult | ||
| from .providers import run_subprocess | ||
|
|
||
# Matches a leading "<path>.lean:<line>:<col>: " diagnostic location prefix
# (optionally preceded by a Windows drive letter) so it can be stripped.
_LEAN_LOCATION_PREFIX_RE = re.compile(r"^(?:[A-Za-z]:)?[^:\s]*\.lean:\d+:\d+:\s*")
# Collapses runs of whitespace to a single space when normalizing error text.
_WHITESPACE_RE = re.compile(r"\s+")
# One dotted-module-name component: a Lean identifier (letter/underscore
# start, then letters, digits, underscores, or apostrophes).
_LEAN_MODULE_PART_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_']*$")

# Cap on remembered repair errors — presumably passed as `limit` to
# format_error_memory; confirm at call sites outside this module.
REPAIR_ERROR_MEMORY_LIMIT = 6
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Compile | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
def compile_lean(
    argv: list[str],
    *,
    cwd: Path,
    live: bool = False,
    stdout_sink=None,
    stderr_sink=None,
) -> CommandResult:
    """Run the Lean compiler command *argv* in *cwd* and return its result.

    Thin pass-through to providers.run_subprocess.  The *live* flag and the
    *stdout_sink*/*stderr_sink* arguments are forwarded verbatim — presumably
    they stream output as it arrives; confirm against run_subprocess.
    """
    return run_subprocess(
        argv, cwd=cwd, live=live,
        stdout_sink=stdout_sink, stderr_sink=stderr_sink,
    )
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Error extraction and memory | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
def extract_compact_error_lines(compiler_res: CommandResult) -> list[str]:
    """Collect the error-bearing lines from a compiler run's output.

    stdout and stderr are scanned together; a line qualifies when it mentions
    an error or one of the known failure markers.  If nothing matches but
    there IS output, the first non-blank line is returned so callers always
    get some context.  Empty output yields an empty list.
    """
    combined = (compiler_res.stdout + "\n" + compiler_res.stderr).strip()
    if not combined:
        return []

    markers = (
        "error",
        "parse failure",
        "policy failure",
        "failed before producing lean output",
    )
    stripped = [raw.strip() for raw in combined.splitlines()]
    matching = [
        line for line in stripped
        if line and any(marker in line.lower() for marker in markers)
    ]
    if matching:
        return matching

    # Fallback: surface the first non-blank line as minimal context.
    for line in stripped:
        if line:
            return [line]
    return []
|
|
||
|
|
||
def normalize_error_line(line: str) -> str:
    """Canonicalize one error line for use as a dedup key.

    Strips any leading `<file>.lean:<line>:<col>:` location prefix, then
    collapses internal whitespace runs to single spaces.
    """
    without_location = _LEAN_LOCATION_PREFIX_RE.sub("", line.strip())
    return _WHITESPACE_RE.sub(" ", without_location).strip()
|
|
||
|
|
||
def update_error_memory(
    memory: OrderedDict[str, tuple[str, int, int]],
    compiler_res: CommandResult,
    *,
    iter_no: int,
) -> None:
    """Fold the errors from *compiler_res* into *memory* in place.

    *memory* maps a normalized error line to (display_text, seen_count,
    last_iteration).  Repeated errors bump their count and move to the
    most-recent end of the OrderedDict; new errors append with count 1.
    """
    for raw in extract_compact_error_lines(compiler_res):
        normalized = normalize_error_line(raw)
        if not normalized:
            continue
        if normalized in memory:
            _, seen, _ = memory[normalized]
            memory[normalized] = (normalized, seen + 1, iter_no)
            memory.move_to_end(normalized)
        else:
            memory[normalized] = (normalized, 1, iter_no)
|
|
||
|
|
||
def format_error_memory(memory: OrderedDict[str, tuple[str, int, int]], *, limit: int) -> str:
    """Render the *limit* most recent error-memory entries, newest first.

    Returns "" when *limit* is non-positive or *memory* is empty.  Entries
    seen more than once are annotated with their repeat count.
    """
    if limit <= 0 or not memory:
        return ""
    newest_first = list(memory.items())[-limit:][::-1]
    rendered: list[str] = []
    for rank, (_, (text, seen, last_iter)) in enumerate(newest_first, start=1):
        if seen > 1:
            tag = f"[seen {seen}x, last iter {last_iter}]"
        else:
            tag = f"[iter {last_iter}]"
        rendered.append(f"{rank}. {tag} {text}")
    return "\n".join(rendered)
|
|
||
|
|
||
| # --------------------------------------------------------------------------- | ||
| # Lean code analysis | ||
| # --------------------------------------------------------------------------- | ||
|
|
||
def extract_top_level_prop_from_theorem_header(header: str) -> Optional[str]:
    """Extract the stated proposition from a Lean theorem header.

    *header* is everything from `theorem`/`lemma` up to (but excluding) `:=`.
    Binders in a Lean 4 signature are bracketed with (), {} or [], so the
    statement separator is the FIRST `:` at bracket depth 0 that is not part
    of a `:=` token.  Using the first — not the last — such colon matters
    because the proposition itself may contain top-level colons, e.g.
    `theorem t : ∀ x : Nat, p x`; the previous last-colon logic would have
    truncated that statement to `p x`.

    Returns None when no top-level `:` is present.
    """
    depth = 0
    i = 0
    n = len(header)
    while i < n:
        ch = header[i]
        if ch in "([{":
            depth += 1
        elif ch in ")]}":
            depth = max(0, depth - 1)  # tolerate unbalanced closers
        elif ch == ":" and depth == 0:
            if i + 1 < n and header[i + 1] == "=":
                i += 2  # skip over a ':=' token
                continue
            return header[i + 1:].strip()
        i += 1
    return None
|
Comment on lines
+112
to
+126
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The logic for extracting the proposition from the theorem header incorrectly uses the last colon at depth 0. In Lean 4, propositions often contain colons at the top level (e.g., in quantifiers like `∀ x : Nat, ...`). References
|
||
|
|
||
|
|
||
def detect_trivialized_statement(lean_code: str, *, theorem_name: str) -> Optional[str]:
    """Detect whether the named theorem's statement is a bare True/False.

    Returns "True" or "False" when the proposition of *theorem_name* starts
    with that literal (optionally parenthesized), and None when the theorem
    is absent, has no `:=`, or states something non-trivial.
    """
    header_re = re.compile(rf"\b(?:theorem|lemma)\s+{re.escape(theorem_name)}\b")
    start = header_re.search(lean_code)
    if start is None:
        return None
    body_sep = lean_code.find(":=", start.end())
    if body_sep < 0:
        return None
    prop = extract_top_level_prop_from_theorem_header(lean_code[start.start():body_sep])
    if not prop:
        return None
    trivial = re.match(r"^\(?\s*(True|False)\b", prop)
    if trivial is None:
        return None
    return trivial.group(1)
|
|
||
|
|
||
def module_name_from_lean_path(lean_path: Path, *, run_cwd: Path) -> Optional[str]:
    """Convert a .lean file under *run_cwd* into its dotted module name.

    Returns None when the path is outside *run_cwd*, does not end in .lean,
    or any path component is not a valid Lean identifier.
    """
    try:
        relative = lean_path.resolve().relative_to(run_cwd.resolve())
    except ValueError:
        return None  # not under the run directory
    if relative.suffix != ".lean":
        return None
    segments = relative.with_suffix("").parts
    if not segments:
        return None
    if any(_LEAN_MODULE_PART_RE.fullmatch(seg) is None for seg in segments):
        return None
    return ".".join(segments)
|
|
||
|
|
||
def inject_imports(lean_code: str, module_names: list[str]) -> str:
    """Ensure *lean_code* imports every module in *module_names*.

    Missing `import` lines are inserted right after the existing preamble of
    blank lines, `--` comments, and imports; modules already imported are
    skipped.  Requested names are de-duplicated in first-seen order, and the
    original trailing-newline state is preserved.
    """
    # De-duplicate requested modules, keeping first-seen order.
    wanted: list[str] = []
    seen: set[str] = set()
    for name in module_names:
        name = name.strip()
        if name and name not in seen:
            seen.add(name)
            wanted.append(name)
    if not wanted:
        return lean_code

    lines = lean_code.splitlines()
    already: set[str] = set()
    insert_at = 0
    for pos, text in enumerate(lines):
        bare = text.strip()
        if bare.startswith("import "):
            found = bare[len("import "):].strip()
            if found:
                already.add(found)
        elif bare and not bare.startswith("--"):
            break  # first real code line ends the preamble
        insert_at = pos + 1

    additions = [f"import {name}" for name in wanted if name not in already]
    if not additions:
        return lean_code

    merged = "\n".join(lines[:insert_at] + additions + lines[insert_at:])
    return merged + "\n" if lean_code.endswith("\n") else merged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling
mkdiron every cache lookup is inefficient and unnecessary for a read operation. This should be moved to theputmethod and only executed when a write is actually performed.References