diff --git a/.gitignore b/.gitignore index b9aa668..a9e266e 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,10 @@ Thumbs.db store/ maelstrom/maelstrom/store/ +# Python +__pycache__/ +*.pyc + # Profiling profile.json diff --git a/gepa/README.md b/gepa/README.md new file mode 100644 index 0000000..1861881 --- /dev/null +++ b/gepa/README.md @@ -0,0 +1,155 @@ +# GEPA - Genetic Evolution for Prompt Artifacts + +Skill evaluation harness and evolutionary optimizer for Claude Code agent skills (`.claude/agents/*.md`) and DST fault configurations (`src/buggify/config.rs`). + +## Quick Start + +```bash +# Phase 1: Scoring +python -m gepa.harness --list-tasks # Available ground truth tasks +python -m gepa.harness --score-baseline # Score all skills +python -m gepa.harness --skill rust-dev --offline # Score one skill +python -m gepa.harness --detail rust-dev paper_review # Detailed breakdown +python -m gepa.harness --capture-reviews # Capture via Claude CLI + +# Phase 2: Skill Evolution +python -m gepa.harness --evolve rust-dev --mock # Test GA machinery (free) +python -m gepa.harness --evolve rust-dev --live # Real evolution (~$0.05/eval) +python -m gepa.harness --evolve rust-dev --live --budget 10.0 --generations 15 + +# Phase 2: DST Config Optimization +python -m gepa.harness --dst-optimize --mock # Test DST GA (free) +python -m gepa.harness --dst-optimize --test executor_dst_test --seeds 10 +python -m gepa.harness --dst-optimize --generations 15 --population 12 +``` + +## Architecture + +``` +gepa/ + __init__.py # Package exports + __main__.py # python -m gepa entry point + harness.py # CLI: argparse + orchestration + + # Phase 1: Scoring + scorer.py # Keyword matching + composite scoring + candidate.py # SkillCandidate: markdown parsing by section + evaluator.py # Offline + live evaluation modes + + # Phase 2: Skill Evolution + mutations.py # 5 text mutation operators + skill_fitness.py # Mock, Offline, Live fitness evaluators + CostTracker + evolution.py # 
SkillEvolutionEngine: GA loop for skill markdown + + # Phase 2: DST Config Optimization + dst_candidate.py # DstCandidate: 32 tunable fault parameters + dst_evaluator.py # Mock + cargo test fitness evaluators + dst_optimizer.py # DstEvolutionEngine: GA loop for fault configs + + # Data + ground_truth/ + schema.md # Ground truth format documentation + paper_review.json # Expert paper review findings + pr13_review.json # PR #13 review findings (ACL DRYRUN/LOG) + pr14_review.json # PR #14 review findings (ACL categories) + reviews/ # Cached review texts (git-ignored) + results/ # Skill evolution output (git-ignored) + dst_results/ # DST optimization output (git-ignored) +``` + +## Phase 1: Scoring + +Each review is scored on four components: + +| Component | Weight | Description | +|-----------|--------|-------------| +| Weighted Recall | 0.50 | Found required issues, weighted by severity | +| Precision | 0.25 | True positives / (TP + false positives) | +| Calibration | 0.15 | Correct severity classification | +| Coverage Bonus | 0.10 | Optional findings discovered | + +Scoring uses keyword co-occurrence (deterministic, no API calls): +- Each finding has a list of keywords +- A finding is "matched" if >= 50% of keywords appear in the review +- False positives (known-correct things incorrectly flagged) reduce precision + +## Phase 2: Skill Evolution + +Evolves skill markdown files using genetic algorithms: + +- **Crossover**: Section-level uniform (50/50 per section from each parent) +- **Mutation**: 5 text operators (sentence shuffle/drop/duplicate, keyword inject, section swap) +- **Fitness**: Composite score from ground truth evaluation +- **Budget**: CostTracker enforces spending limits for live evaluations + +### Evaluator Modes + +| Mode | Cost | Usage | +|------|------|-------| +| `--mock` | Free | Tests GA machinery with synthetic fitness | +| `--live` | ~$0.05/task | Calls Claude CLI, scores against ground truth | + +### Output + +Results saved to 
`gepa/results/{skill_name}/`: +- `gen_NNN.json` — Population data per generation +- `best_skill.md` — Best evolved skill markdown +- `evolution_history.json` — Full history with fitness curves + +## Phase 2: DST Config Optimization + +Evolves buggify fault injection probabilities to find configurations that surface the most invariant violations: + +- **Parameters**: 32 floats (31 fault probabilities + global_multiplier) +- **Crossover**: Uniform per parameter +- **Mutation**: `current += choice([-1, 0, 1]) * step`, clamped to bounds +- **Initialization**: calm + moderate + chaos presets + random variants +- **Fitness** (`--mock`): 0.5 * bug_discovery + 0.3 * fault_coverage + 0.2 * (1 - crash_penalty); real `cargo test` runs instead sum four equally weighted (0.25 each) subsystem scores: crash/recovery, streaming, WAL, and CRDT + +### Rust Integration + +The `BUGGIFY_CONFIG` env var overrides `FaultConfig::moderate()` defaults: + +```bash +# Run DST tests with custom fault config +BUGGIFY_CONFIG="global_multiplier=2.0,network.packet_drop=0.05" \ + cargo test --release --test executor_dst_test +``` + +Format: comma-separated `key=value` pairs. See `src/buggify/faults.rs` for all fault IDs. + +### Output + +Results saved to `gepa/dst_results/`: +- `gen_NNN.json` — Population data per generation +- `best_config.env` — Best config in `BUGGIFY_CONFIG` format +- `evolution_history.json` — Full history + +## Ground Truth + +Ground truth files capture expert knowledge from real reviews: +- **paper_review.json**: 13 findings from 6-expert paper review +- **pr13_review.json**: 6 findings from PR #13 (ACL DRYRUN/LOG) +- **pr14_review.json**: 4 findings from PR #14 (ACL categories) + +See `ground_truth/schema.md` for the full format specification. 
+ +## Testing + +```bash +# All Phase 1 tests +python3 -m unittest gepa.test_scorer gepa.test_candidate gepa.test_evaluator -v + +# Phase 2: Mutation + Evolution tests +python3 -m unittest gepa.test_mutations gepa.test_evolution -v + +# Phase 2: DST optimizer tests +python3 -m unittest gepa.test_dst_candidate gepa.test_dst_optimizer -v + +# Rust-side buggify config parsing +cargo test --release -p redis_sim config::tests +``` + +## Dependencies + +None. Python 3.10+ stdlib only (matching `evolve/` pattern). diff --git a/gepa/__init__.py b/gepa/__init__.py new file mode 100644 index 0000000..c5b2a2f --- /dev/null +++ b/gepa/__init__.py @@ -0,0 +1,24 @@ +""" +GEPA - Genetic Evolution for Prompt Artifacts + +Skill evaluation harness and DST configuration optimizer. +Uses ground truth from expert reviews to score and evolve +Claude Code skill files (.claude/agents/*.md). +""" + +from .scorer import Scorer +from .candidate import SkillCandidate +from .evaluator import OfflineEvaluator +from .evolution import SkillEvolutionEngine +from .dst_candidate import DstCandidate +from .dst_optimizer import DstEvolutionEngine + +__version__ = "0.2.0" +__all__ = [ + "Scorer", + "SkillCandidate", + "OfflineEvaluator", + "SkillEvolutionEngine", + "DstCandidate", + "DstEvolutionEngine", +] diff --git a/gepa/__main__.py b/gepa/__main__.py new file mode 100644 index 0000000..b9b26ad --- /dev/null +++ b/gepa/__main__.py @@ -0,0 +1,4 @@ +"""Allow running as: python -m gepa""" +from .harness import main + +main() diff --git a/gepa/candidate.py b/gepa/candidate.py new file mode 100644 index 0000000..c0feec5 --- /dev/null +++ b/gepa/candidate.py @@ -0,0 +1,177 @@ +""" +SkillCandidate - represents a skill markdown file as a mutable artifact. + +Parses skill markdown into sections by ## headings, enabling +section-level mutation and crossover for GEPA evolution. 
@dataclass
class SkillSection:
    """One heading-delimited section of a skill markdown file.

    Attributes:
        heading: Heading text without the leading ``#`` markers.
        content: Body text between this heading and the next one.
        level: Heading depth (``##`` is 2, ``###`` is 3, and so on).
    """
    heading: str
    content: str
    level: int = 2

    def to_markdown(self) -> str:
        """Render as ``<hashes> <heading>``, a blank line, then the content."""
        prefix = "#" * self.level
        return f"{prefix} {self.heading}\n\n{self.content}"

    def word_count(self) -> int:
        """Return the number of whitespace-separated words in the content."""
        return len(self.content.split())


@dataclass
class SkillCandidate:
    """
    A skill markdown file parsed into sections for mutation.

    Mirrors evolve/candidate.py pattern: serializable, identifiable,
    and fitness-trackable.

    Attributes:
        name: Skill name (e.g., "rust-dev").
        frontmatter: YAML frontmatter block (``---`` ... ``---``), or "".
        preamble: Content before the first heading.
        sections: Parsed sections in document order.
        fitness: Last evaluated fitness, or None when not yet scored.
        generation: Generation index this candidate belongs to.
        parent_ids: IDs of the candidates this one was derived from.
    """
    name: str
    frontmatter: str
    preamble: str
    sections: List[SkillSection] = field(default_factory=list)
    fitness: Optional[float] = None
    generation: int = 0
    parent_ids: List[int] = field(default_factory=list)
    _id: int = field(default_factory=lambda: SkillCandidate._next_id())

    # Class-level ID counter. Deliberately NOT annotated: an annotated name
    # inside a @dataclass becomes a per-instance field and an __init__
    # parameter, which is not what a shared counter should be.
    _id_counter = 0

    @classmethod
    def _next_id(cls) -> int:
        """Advance the shared counter and return the new ID."""
        cls._id_counter += 1
        return cls._id_counter

    @classmethod
    def reset_id_counter(cls):
        """Reset ID counter (useful for testing)."""
        cls._id_counter = 0

    @classmethod
    def from_file(cls, path: Path) -> "SkillCandidate":
        """Parse a skill markdown file; the skill name is the file stem.

        Raises:
            AssertionError: If ``path`` does not exist.
        """
        assert path.exists(), f"Skill file not found: {path}"
        # Explicit UTF-8 so parsing does not depend on the platform locale.
        text = path.read_text(encoding="utf-8")
        return cls.from_text(path.stem, text)

    @classmethod
    def from_text(cls, name: str, text: str) -> "SkillCandidate":
        """Parse skill markdown text into frontmatter, preamble and sections.

        Any ATX heading (``#`` through ``######``) starts a new section.
        Lines inside fenced code blocks are never treated as headings, so
        shell comments such as ``# build`` stay inside their section.

        Raises:
            AssertionError: If ``text`` is not a non-empty string.
        """
        assert isinstance(text, str) and len(text) > 0, "text must be non-empty"

        frontmatter = ""
        body = text

        # Extract YAML frontmatter
        fm_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', text, re.DOTALL)
        if fm_match:
            frontmatter = fm_match.group(0)
            body = text[fm_match.end():]

        heading_re = re.compile(r'^(#{1,6})\s+(.+)$')
        fence_re = re.compile(r'^\s*(`{3,}|~{3,})')

        preamble_lines: List[str] = []
        raw_sections = []            # (level, heading, content_lines)
        target = preamble_lines      # list currently receiving body lines
        open_fence = None            # fence marker while inside a code block

        for line in body.split("\n"):
            fence = fence_re.match(line)
            if open_fence is None:
                if fence:
                    open_fence = fence.group(1)
                else:
                    heading = heading_re.match(line)
                    if heading:
                        target = []
                        raw_sections.append(
                            (len(heading.group(1)), heading.group(2).strip(), target)
                        )
                        continue
            elif (
                fence
                and fence.group(1)[0] == open_fence[0]
                and len(fence.group(1)) >= len(open_fence)
            ):
                # Closing fence: same marker character, at least as long.
                open_fence = None
            target.append(line)

        sections = [
            SkillSection(
                heading=heading_text,
                content="\n".join(lines).strip(),
                level=level,
            )
            for level, heading_text, lines in raw_sections
        ]

        return cls(
            name=name,
            frontmatter=frontmatter,
            preamble="\n".join(preamble_lines).strip(),
            sections=sections,
        )

    def to_markdown(self) -> str:
        """Reconstruct the full markdown from sections."""
        parts = []
        if self.frontmatter:
            parts.append(self.frontmatter.rstrip())
        if self.preamble:
            parts.append(self.preamble)
        parts.extend(section.to_markdown() for section in self.sections)
        return "\n\n".join(parts) + "\n"

    def save(self, path: Path) -> None:
        """Save skill markdown to file (UTF-8)."""
        path.write_text(self.to_markdown(), encoding="utf-8")

    def section_names(self) -> List[str]:
        """List all section headings."""
        return [s.heading for s in self.sections]

    def get_section(self, heading: str) -> Optional[SkillSection]:
        """Find the first section with this heading (case-insensitive)."""
        heading_lower = heading.lower()
        for s in self.sections:
            if s.heading.lower() == heading_lower:
                return s
        return None

    def replace_section(self, heading: str, new_content: str) -> bool:
        """Replace a section's content by heading. Returns True if found."""
        section = self.get_section(heading)
        if section is None:
            return False
        section.content = new_content
        return True

    def word_count(self) -> int:
        """Total word count across the preamble and all sections."""
        total = len(self.preamble.split())
        total += sum(s.word_count() for s in self.sections)
        return total

    def to_dict(self) -> Dict:
        """Serialize summary metadata (not the markdown text) for JSON storage."""
        return {
            "id": self._id,
            "name": self.name,
            "fitness": self.fitness,
            "generation": self.generation,
            "parent_ids": self.parent_ids,
            "section_count": len(self.sections),
            "word_count": self.word_count(),
            "sections": [s.heading for s in self.sections],
        }

    @classmethod
    def from_dict(cls, data: Dict, agents_dir: Path) -> "SkillCandidate":
        """Load from dict by re-reading ``<agents_dir>/<name>.md``."""
        name = data["name"]
        path = agents_dir / f"{name}.md"
        candidate = cls.from_file(path)
        candidate.fitness = data.get("fitness")
        candidate.generation = data.get("generation", 0)
        candidate.parent_ids = data.get("parent_ids", [])
        return candidate

    def __repr__(self) -> str:
        fitness_str = f"{self.fitness:.3f}" if self.fitness is not None else "?"
        return (
            f"SkillCandidate(name={self.name!r}, fitness={fitness_str}, "
            f"sections={len(self.sections)}, words={self.word_count()})"
        )
@dataclass
class DstParamBounds:
    """Bounds for a single DST parameter.

    Attributes:
        min: Smallest allowed value.
        max: Largest allowed value.
        step: Increment used when mutating or sampling the parameter.
        default: Value used by the "moderate" preset.
    """
    min: float
    max: float
    step: float
    default: float

    def clamp(self, value: float) -> float:
        """Return ``value`` limited to the inclusive range [min, max]."""
        return max(self.min, min(self.max, value))


# All 32 tunable parameters with bounds
# Derived from src/buggify/config.rs moderate() preset
DST_PARAM_BOUNDS: Dict[str, DstParamBounds] = {
    # Global
    "global_multiplier": DstParamBounds(0.01, 5.0, 0.1, 1.0),
    # Network faults
    "network.packet_drop": DstParamBounds(0.0, 0.20, 0.005, 0.01),
    "network.packet_corrupt": DstParamBounds(0.0, 0.05, 0.001, 0.001),
    "network.partial_write": DstParamBounds(0.0, 0.10, 0.005, 0.005),
    "network.reorder": DstParamBounds(0.0, 0.20, 0.005, 0.02),
    "network.connection_reset": DstParamBounds(0.0, 0.10, 0.005, 0.005),
    "network.connect_timeout": DstParamBounds(0.0, 0.15, 0.005, 0.01),
    "network.delay": DstParamBounds(0.0, 0.30, 0.01, 0.05),
    "network.duplicate": DstParamBounds(0.0, 0.10, 0.005, 0.005),
    # Timer faults
    "timer.drift_fast": DstParamBounds(0.0, 0.10, 0.005, 0.01),
    "timer.drift_slow": DstParamBounds(0.0, 0.10, 0.005, 0.01),
    "timer.skip": DstParamBounds(0.0, 0.10, 0.005, 0.01),
    "timer.duplicate": DstParamBounds(0.0, 0.05, 0.001, 0.005),
    "timer.jump_forward": DstParamBounds(0.0, 0.05, 0.001, 0.001),
    "timer.jump_backward": DstParamBounds(0.0, 0.02, 0.0005, 0.0005),
    # Process faults
    "process.crash": DstParamBounds(0.0, 0.02, 0.001, 0.001),
    "process.pause": DstParamBounds(0.0, 0.10, 0.005, 0.01),
    "process.slow": DstParamBounds(0.0, 0.15, 0.005, 0.02),
    "process.oom": DstParamBounds(0.0, 0.005, 0.0001, 0.0001),
    "process.cpu_starvation": DstParamBounds(0.0, 0.10, 0.005, 0.01),
    # Disk faults
    "disk.write_fail": DstParamBounds(0.0, 0.02, 0.001, 0.001),
    "disk.partial_write": DstParamBounds(0.0, 0.02, 0.001, 0.001),
    "disk.corruption": DstParamBounds(0.0, 0.005, 0.0001, 0.0001),
    "disk.slow": DstParamBounds(0.0, 0.15, 0.005, 0.02),
    "disk.fsync_fail": DstParamBounds(0.0, 0.01, 0.0005, 0.0005),
    "disk.stale_read": DstParamBounds(0.0, 0.02, 0.001, 0.001),
    "disk.disk_full": DstParamBounds(0.0, 0.005, 0.0001, 0.0001),
    # Replication faults
    "replication.gossip_drop": DstParamBounds(0.0, 0.20, 0.005, 0.02),
    "replication.gossip_delay": DstParamBounds(0.0, 0.30, 0.01, 0.05),
    "replication.gossip_corrupt": DstParamBounds(0.0, 0.02, 0.001, 0.001),
    "replication.split_brain": DstParamBounds(0.0, 0.005, 0.0001, 0.0001),
    "replication.stale_replica": DstParamBounds(0.0, 0.10, 0.005, 0.01),
}

# Preset configurations matching Rust config.rs
PRESETS = {
    "calm": {
        "global_multiplier": 0.1,
        "network.packet_drop": 0.001,
        "network.delay": 0.01,
        "timer.drift_fast": 0.001,
        "timer.drift_slow": 0.001,
    },
    "moderate": {k: v.default for k, v in DST_PARAM_BOUNDS.items()},
    "chaos": {
        "global_multiplier": 3.0,
        "network.packet_drop": 0.05,
        "network.packet_corrupt": 0.01,
        "network.partial_write": 0.02,
        "network.reorder": 0.10,
        "network.connection_reset": 0.02,
        "network.connect_timeout": 0.05,
        "network.delay": 0.15,
        "network.duplicate": 0.02,
        "timer.drift_fast": 0.05,
        "timer.drift_slow": 0.05,
        "timer.skip": 0.05,
        "timer.duplicate": 0.02,
        "timer.jump_forward": 0.01,
        "timer.jump_backward": 0.005,
        "process.crash": 0.005,
        "process.pause": 0.05,
        "process.slow": 0.10,
        "process.oom": 0.001,
        "process.cpu_starvation": 0.05,
        "disk.write_fail": 0.005,
        "disk.partial_write": 0.005,
        "disk.corruption": 0.001,
        "disk.slow": 0.10,
        "disk.fsync_fail": 0.002,
        "disk.stale_read": 0.005,
        "disk.disk_full": 0.001,
        "replication.gossip_drop": 0.10,
        "replication.gossip_delay": 0.15,
        "replication.gossip_corrupt": 0.005,
        "replication.split_brain": 0.001,
        "replication.stale_replica": 0.05,
    },
}


@dataclass
class DstCandidate:
    """A buggify fault configuration as a GA individual.

    Attributes:
        config: Mapping of parameter name to value (keys of DST_PARAM_BOUNDS).
        fitness: Last evaluated fitness, or None when not yet scored.
        generation: Generation index this candidate belongs to.
        parent_ids: IDs of the candidates this one was derived from.
    """
    config: Dict[str, float] = field(default_factory=dict)
    fitness: Optional[float] = None
    generation: int = 0
    parent_ids: List[int] = field(default_factory=list)
    _id: int = field(default_factory=lambda: DstCandidate._next_id())

    # Class-level ID counter. Deliberately NOT annotated: an annotated name
    # inside a @dataclass becomes a per-instance field and an __init__
    # parameter, which is not what a shared counter should be.
    _id_counter = 0

    @classmethod
    def _next_id(cls) -> int:
        """Advance the shared counter and return the new ID."""
        cls._id_counter += 1
        return cls._id_counter

    @classmethod
    def reset_id_counter(cls):
        """Reset ID counter (useful for testing)."""
        cls._id_counter = 0

    @classmethod
    def from_preset(cls, name: str) -> "DstCandidate":
        """Create from a preset (calm, moderate, chaos).

        Raises:
            AssertionError: If ``name`` is not a key of PRESETS.
        """
        assert name in PRESETS, f"Unknown preset: {name}. Available: {list(PRESETS.keys())}"
        # Start with moderate defaults, then overlay preset
        config = {k: v.default for k, v in DST_PARAM_BOUNDS.items()}
        config.update(PRESETS[name])
        return cls(config=config)

    @classmethod
    def from_env_string(cls, s: str) -> "DstCandidate":
        """Parse from BUGGIFY_CONFIG env var format (``key=value,key=value``).

        Unlisted parameters keep their defaults, unknown keys are ignored,
        and parsed values are clamped to the parameter's bounds.

        Raises:
            ValueError: If the value of a known key is missing, is not a
                number, or is NaN.
        """
        config = {k: v.default for k, v in DST_PARAM_BOUNDS.items()}
        for pair in s.split(","):
            pair = pair.strip()
            if not pair:
                continue
            key, _, raw = pair.partition("=")
            key = key.strip()
            raw = raw.strip()
            bounds = DST_PARAM_BOUNDS.get(key)
            if bounds is None:
                continue
            try:
                value = float(raw)
            except ValueError:
                raise ValueError(
                    f"Invalid BUGGIFY_CONFIG value for {key!r}: {raw!r}"
                ) from None
            # NaN compares unequal to itself; clamp() would silently turn it
            # into the parameter's maximum, so reject it explicitly.
            if value != value:
                raise ValueError(f"BUGGIFY_CONFIG value for {key!r} is NaN")
            config[key] = bounds.clamp(value)
        return cls(config=config)

    def to_env_string(self) -> str:
        """Serialize to BUGGIFY_CONFIG format: sorted keys, six decimal places."""
        return ",".join(
            f"{key}={self.config[key]:.6f}" for key in sorted(self.config)
        )

    def to_dict(self) -> Dict:
        """Serialize for JSON storage."""
        return {
            "id": self._id,
            "config": {k: round(v, 6) for k, v in sorted(self.config.items())},
            "fitness": self.fitness,
            "generation": self.generation,
            "parent_ids": self.parent_ids,
        }

    def is_valid(self) -> bool:
        """Check all params are within bounds (missing keys count as defaults)."""
        for key, bounds in DST_PARAM_BOUNDS.items():
            val = self.config.get(key, bounds.default)
            # Small tolerance absorbs float rounding from stepwise mutation.
            if val < bounds.min - 1e-9 or val > bounds.max + 1e-9:
                return False
        return True

    def __repr__(self) -> str:
        fitness_str = f"{self.fitness:.4f}" if self.fitness is not None else "?"
        gm = self.config.get("global_multiplier", 1.0)
        return f"DstCandidate(id={self._id}, fitness={fitness_str}, gm={gm:.2f})"
def _extract_int(output: str, key: str) -> int:
    """Return the integer following the first ``<key>=`` in ``output``, or 0."""
    m = re.search(rf'{re.escape(key)}=(\d+)', output)
    return int(m.group(1)) if m else 0


def _extract_float(output: str, key: str) -> float:
    """Return the decimal number following the first ``<key>=``, or 0.0.

    Only a well-formed number is captured, so trailing punctuation such as
    ``KEY=2.5.`` cannot reach ``float()`` and raise.
    """
    m = re.search(rf'{re.escape(key)}=(\d+(?:\.\d+)?|\.\d+)', output)
    return float(m.group(1)) if m else 0.0


@dataclass
class TestResult:
    """Result of a single test run."""
    test_name: str
    seed: int
    passed: bool
    invariant_violations: int = 0
    panicked: bool = False
    output: str = ""


@dataclass
class BatchResult:
    """Parsed structured output from test_env_config_batch."""
    total_runs: int = 0
    total_ops: int = 0
    total_crashes: int = 0
    total_recoveries: int = 0
    failures: int = 0
    buggify_checks: int = 0
    buggify_triggers: int = 0
    faults_triggered: int = 0
    global_multiplier: float = 1.0
    test_passed: bool = False
    raw_output: str = ""

    @classmethod
    def parse(cls, output: str, test_passed: bool) -> "BatchResult":
        """Parse GEPA_* structured output from cargo test.

        Missing counters parse as 0 (0.0 for the multiplier). Only the
        first 3000 characters of ``output`` are kept in ``raw_output``.
        """
        return cls(
            total_runs=_extract_int(output, "GEPA_TOTAL_RUNS"),
            total_ops=_extract_int(output, "GEPA_TOTAL_OPS"),
            total_crashes=_extract_int(output, "GEPA_TOTAL_CRASHES"),
            total_recoveries=_extract_int(output, "GEPA_TOTAL_RECOVERIES"),
            failures=_extract_int(output, "GEPA_FAILURES"),
            buggify_checks=_extract_int(output, "GEPA_BUGGIFY_CHECKS"),
            buggify_triggers=_extract_int(output, "GEPA_BUGGIFY_TRIGGERS"),
            faults_triggered=_extract_int(output, "GEPA_FAULTS_TRIGGERED"),
            global_multiplier=_extract_float(output, "GEPA_GLOBAL_MULTIPLIER"),
            test_passed=test_passed,
            raw_output=output[:3000],
        )


@dataclass
class StreamingResult:
    """Parsed output from test_env_config_streaming."""
    total_ops: int = 0
    put_failures: int = 0
    get_failures: int = 0
    flushes: int = 0
    crashes: int = 0
    invariant_violations: int = 0
    test_passed: bool = False

    @classmethod
    def parse(cls, output: str, test_passed: bool) -> "StreamingResult":
        """Parse GEPA_STREAMING_* counters; missing counters parse as 0."""
        return cls(
            total_ops=_extract_int(output, "GEPA_STREAMING_OPS"),
            put_failures=_extract_int(output, "GEPA_STREAMING_PUT_FAILURES"),
            get_failures=_extract_int(output, "GEPA_STREAMING_GET_FAILURES"),
            flushes=_extract_int(output, "GEPA_STREAMING_FLUSHES"),
            crashes=_extract_int(output, "GEPA_STREAMING_CRASHES"),
            invariant_violations=_extract_int(
                output, "GEPA_STREAMING_INVARIANT_VIOLATIONS"
            ),
            test_passed=test_passed,
        )


@dataclass
class WalResult:
    """Parsed output from test_env_config_wal."""
    total_writes: int = 0
    acknowledged: int = 0
    missing_after_recovery: int = 0
    write_failures: int = 0
    sync_failures: int = 0
    disk_full: int = 0
    invariant_failures: int = 0
    test_passed: bool = False

    @classmethod
    def parse(cls, output: str, test_passed: bool) -> "WalResult":
        """Parse GEPA_WAL_* counters; missing counters parse as 0."""
        return cls(
            total_writes=_extract_int(output, "GEPA_WAL_TOTAL_WRITES"),
            acknowledged=_extract_int(output, "GEPA_WAL_ACKNOWLEDGED"),
            missing_after_recovery=_extract_int(
                output, "GEPA_WAL_MISSING_AFTER_RECOVERY"
            ),
            write_failures=_extract_int(output, "GEPA_WAL_WRITE_FAILURES"),
            sync_failures=_extract_int(output, "GEPA_WAL_SYNC_FAILURES"),
            disk_full=_extract_int(output, "GEPA_WAL_DISK_FULL"),
            invariant_failures=_extract_int(output, "GEPA_WAL_INVARIANT_FAILURES"),
            test_passed=test_passed,
        )


@dataclass
class CrdtResult:
    """Parsed output from test_env_config_crdt."""
    total_ops: int = 0
    message_drops: int = 0
    convergence_failures: int = 0
    invariant_violations: int = 0
    test_passed: bool = False

    @classmethod
    def parse(cls, output: str, test_passed: bool) -> "CrdtResult":
        """Parse GEPA_CRDT_* counters; missing counters parse as 0."""
        return cls(
            total_ops=_extract_int(output, "GEPA_CRDT_TOTAL_OPS"),
            message_drops=_extract_int(output, "GEPA_CRDT_MESSAGE_DROPS"),
            convergence_failures=_extract_int(
                output, "GEPA_CRDT_CONVERGENCE_FAILURES"
            ),
            invariant_violations=_extract_int(
                output, "GEPA_CRDT_INVARIANT_VIOLATIONS"
            ),
            test_passed=test_passed,
        )
class DstFitnessEvaluator(ABC):
    """Abstract base class for DST fitness evaluation."""

    @abstractmethod
    def evaluate(self, candidate: "DstCandidate") -> float:
        """Evaluate a DST candidate. Returns fitness score 0.0-1.0."""
        ...


class MockDstEvaluator(DstFitnessEvaluator):
    """
    Mock evaluator for testing GA machinery.

    Simulates: higher global_multiplier and moderate fault rates find more bugs,
    but too-high rates cause crashes (penalty).
    """

    def __init__(self, rng=None):
        """Use ``rng`` for the noise term, or a fresh ``random.Random()``."""
        import random
        self._rng = rng or random.Random()

    def evaluate(self, candidate: "DstCandidate") -> float:
        """Return a synthetic fitness in [0.0, 1.0] derived from the config.

        Score = 0.50 * bug_discovery + 0.30 * coverage
        + 0.20 * (1 - crash_penalty), plus Gaussian noise (sigma 0.02).
        """
        gm = candidate.config.get("global_multiplier", 1.0)

        # Simulate: moderate configs find more bugs
        # Sweet spot around gm=1.5-2.5
        bug_discovery = 1.0 - abs(gm - 2.0) / 3.0
        bug_discovery = max(0.0, min(1.0, bug_discovery))

        # Average over the fault parameters actually present, so a config
        # without a global_multiplier entry is not divided by the wrong count.
        fault_rates = [
            v for k, v in candidate.config.items()
            if k != "global_multiplier"
        ]
        avg_fault_rate = sum(fault_rates) / max(1, len(fault_rates))

        # Coverage increases with fault rate up to a point
        coverage = min(1.0, avg_fault_rate * 20.0)

        # Crash penalty for very high rates
        crash_penalty = 0.0
        if avg_fault_rate > 0.05:
            crash_penalty = (avg_fault_rate - 0.05) * 5.0

        score = (
            0.50 * bug_discovery
            + 0.30 * coverage
            + 0.20 * (1.0 - min(1.0, crash_penalty))
        )

        noise = self._rng.gauss(0, 0.02)
        return max(0.0, min(1.0, score + noise))


class CargoDstEvaluator(DstFitnessEvaluator):
    """
    Multi-target evaluator: runs all 4 DST subsystem tests with BUGGIFY_CONFIG.

    Targets:
    1. dst_batch_verification::test_env_config_batch -> crash/recovery metrics
    2. streaming_dst_test::test_env_config_streaming -> object store fault metrics
    3. wal_dst_test::test_env_config_wal -> WAL disk fault metrics
    4. crdt_dst_test::test_env_config_crdt -> replication fault metrics

    Fitness score (0.0-1.0):
    0.25 * crash_score (process crashes / recovery cycles)
    0.25 * streaming_score (object store fault coverage + ops surviving)
    0.25 * wal_score (disk faults triggered, no data loss)
    0.25 * crdt_score (gossip faults, convergence maintained)
    """

    # Test targets: (test_file, test_name). The order is significant:
    # evaluate() pairs each entry with the batch, streaming, WAL and CRDT
    # result parsers, in that order.
    TARGETS = [
        ("dst_batch_verification", "test_env_config_batch"),
        ("streaming_dst_test", "test_env_config_streaming"),
        ("wal_dst_test", "test_env_config_wal"),
        ("crdt_dst_test", "test_env_config_crdt"),
    ]

    def __init__(
        self,
        timeout: int = 300,
        project_dir: Optional[Path] = None,
    ):
        """
        Args:
            timeout: Per-target timeout in seconds, passed to subprocess.run.
            project_dir: Directory to run cargo in; defaults to the parent
                of this package's directory.
        """
        self.timeout = timeout
        self.project_dir = project_dir or Path(__file__).parent.parent

    def evaluate(self, candidate: "DstCandidate") -> float:
        """Run every target with the candidate's config and combine the scores.

        A target that times out contributes a subscore of 0.0.
        """
        env_string = candidate.to_env_string()

        # Run all targets first, then parse; driven by TARGETS so the list
        # of tests lives in exactly one place.
        outcomes = [
            self._run_target(env_string, test_file, test_name)
            for test_file, test_name in self.TARGETS
        ]
        parsers = (BatchResult, StreamingResult, WalResult, CrdtResult)
        batch_result, streaming_result, wal_result, crdt_result = [
            parser.parse(outcome[0], outcome[1]) if outcome else None
            for parser, outcome in zip(parsers, outcomes)
        ]

        # Compute per-subsystem scores
        crash_score = self._crash_subscore(batch_result, candidate)
        streaming_score = self._streaming_subscore(streaming_result)
        wal_score = self._wal_subscore(wal_result)
        crdt_score = self._crdt_subscore(crdt_result)

        fitness = (
            0.25 * crash_score
            + 0.25 * streaming_score
            + 0.25 * wal_score
            + 0.25 * crdt_score
        )

        logger.info(
            "Fitness=%.4f (crash=%.3f stream=%.3f wal=%.3f crdt=%.3f)",
            fitness, crash_score, streaming_score, wal_score, crdt_score,
        )

        return max(0.0, min(1.0, fitness))

    def _run_target(
        self, env_string: str, test_file: str, test_name: str
    ) -> Optional[tuple]:
        """Run a single test target. Returns (output, passed) or None on timeout."""
        env = os.environ.copy()
        env["BUGGIFY_CONFIG"] = env_string

        cmd = [
            "cargo", "test", "--release",
            "--test", test_file,
            test_name,
            "--", "--nocapture",
        ]

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout,
                cwd=str(self.project_dir),
                env=env,
            )
            # The GEPA_* markers may be on either stream, so scan both.
            output = result.stdout + result.stderr
            test_passed = result.returncode == 0
            logger.debug("Target %s::%s passed=%s", test_file, test_name, test_passed)
            return (output, test_passed)

        except subprocess.TimeoutExpired:
            logger.warning("Target %s::%s timed out after %ds", test_file, test_name, self.timeout)
            return None

    def _crash_subscore(self, result: "Optional[BatchResult]", candidate: "DstCandidate") -> float:
        """Score for crash/recovery subsystem (dst_batch_verification)."""
        if result is None:
            return 0.0

        # Crash intensity: more crashes = stressing harder
        if result.total_runs > 0:
            crashes_per_run = result.total_crashes / result.total_runs
            intensity = min(1.0, crashes_per_run / 20.0)
        else:
            intensity = 0.0

        # Fault trigger rate
        if result.buggify_checks > 0:
            trigger_rate = result.buggify_triggers / result.buggify_checks
            trigger_score = min(1.0, trigger_rate * 10.0)
        else:
            trigger_score = 0.0

        # Fault coverage: distinct faults triggered vs. faults enabled (> 0)
        configured_faults = sum(
            1 for k, v in candidate.config.items()
            if k != "global_multiplier" and v > 0
        )
        if configured_faults > 0:
            coverage = min(1.0, result.faults_triggered / configured_faults)
        else:
            coverage = 0.0

        # Stability
        stability = 1.0 if result.failures == 0 else 0.0

        return (
            0.30 * intensity
            + 0.25 * trigger_score
            + 0.25 * coverage
            + 0.20 * stability
        )

    def _streaming_subscore(self, result: "Optional[StreamingResult]") -> float:
        """Score for object store fault subsystem (streaming_dst_test)."""
        if result is None:
            return 0.0

        # Fault intensity: total store failures / total ops
        total_failures = result.put_failures + result.get_failures
        if result.total_ops > 0:
            fault_rate = total_failures / result.total_ops
            intensity = min(1.0, fault_rate * 10.0)
        else:
            intensity = 0.0

        # Operations surviving: high ops count = system resilient under faults
        ops_score = min(1.0, result.total_ops / 2000.0)

        # Stability: penalize invariant violations
        stability = 1.0 if result.invariant_violations == 0 else 0.0

        return (
            0.40 * intensity
            + 0.30 * ops_score
            + 0.30 * stability
        )

    def _wal_subscore(self, result: "Optional[WalResult]") -> float:
        """Score for WAL disk fault subsystem (wal_dst_test)."""
        if result is None:
            return 0.0

        # Fault intensity: disk faults triggered
        total_faults = result.write_failures + result.sync_failures + result.disk_full
        if result.total_writes > 0:
            fault_rate = total_faults / result.total_writes
            intensity = min(1.0, fault_rate * 10.0)
        else:
            intensity = 0.0

        # Data durability: acknowledged writes with zero missing
        if result.acknowledged > 0:
            durability = 1.0 if result.missing_after_recovery == 0 else 0.0
        else:
            durability = 0.5  # No acked writes = system handled all faults

        # Stability: no invariant failures
        stability = 1.0 if result.invariant_failures == 0 else 0.0

        return (
            0.35 * intensity
            + 0.35 * durability
            + 0.30 * stability
        )

    def _crdt_subscore(self, result: "Optional[CrdtResult]") -> float:
        """Score for CRDT replication fault subsystem (crdt_dst_test)."""
        if result is None:
            return 0.0

        # Fault intensity: message drops / total ops
        if result.total_ops > 0:
            drop_rate = result.message_drops / result.total_ops
            intensity = min(1.0, drop_rate * 5.0)
        else:
            intensity = 0.0

        # Operations completed
        ops_score = min(1.0, result.total_ops / 12000.0)

        # Convergence: penalize convergence failures
        stability = 1.0 if result.convergence_failures == 0 else 0.0

        return (
            0.35 * intensity
            + 0.30 * ops_score
            + 0.35 * stability
        )
+ """ + + def __init__( + self, + evaluator: DstFitnessEvaluator, + population_size: int = 8, + elite_count: int = 2, + mutation_rate: float = 0.3, + generations: int = 10, + tournament_k: int = 3, + results_dir: Path = Path("gepa/dst_results"), + seed: int = None, + ): + assert population_size >= 4, "population_size must be >= 4" + assert elite_count < population_size, "elite_count must be < population_size" + assert 0.0 <= mutation_rate <= 1.0 + + self.evaluator = evaluator + self.population_size = population_size + self.elite_count = elite_count + self.mutation_rate = mutation_rate + self.generations = generations + self.tournament_k = tournament_k + self.results_dir = Path(results_dir) + self.results_dir.mkdir(parents=True, exist_ok=True) + self.rng = random.Random(seed) + + self._best_ever: Optional[DstCandidate] = None + self._history: List[Dict] = [] + + def run(self) -> DstCandidate: + """Run the evolutionary optimization. Returns the best candidate.""" + logger.info("Starting DST optimization: %d generations, %d population", + self.generations, self.population_size) + + population = self._initialize_population() + + for gen in range(self.generations): + logger.info("=== Generation %d/%d ===", gen + 1, self.generations) + + # Evaluate + for candidate in population: + if candidate.fitness is None: + candidate.fitness = self.evaluator.evaluate(candidate) + candidate.generation = gen + + # Sort by fitness descending + population.sort(key=lambda c: c.fitness or 0.0, reverse=True) + + best = population[0] + avg_fitness = sum(c.fitness or 0 for c in population) / len(population) + + logger.info("Gen %d: best=%.4f avg=%.4f gm=%.2f", + gen + 1, best.fitness or 0, avg_fitness, + best.config.get("global_multiplier", 1.0)) + + if self._best_ever is None or (best.fitness or 0) > (self._best_ever.fitness or 0): + self._best_ever = best + logger.info("NEW BEST: %.4f", best.fitness) + + self._history.append({ + "generation": gen + 1, + "best_fitness": best.fitness, + 
"avg_fitness": avg_fitness, + "best_global_multiplier": best.config.get("global_multiplier", 1.0), + "population_fitness": [c.fitness for c in population], + }) + + self._save_generation(gen, population) + + # Create next generation (unless last) + if gen < self.generations - 1: + population = self._evolve(population) + + if self._best_ever: + self._save_final_results() + + logger.info("DST optimization complete. Best: %.4f", + self._best_ever.fitness if self._best_ever else 0) + + return self._best_ever or DstCandidate.from_preset("moderate") + + def _initialize_population(self) -> List[DstCandidate]: + """Create initial population: presets + random variants.""" + population = [] + + # Include all 3 presets + for preset in ["calm", "moderate", "chaos"]: + population.append(DstCandidate.from_preset(preset)) + + # Fill with random variants + while len(population) < self.population_size: + candidate = self._random_candidate() + if candidate.is_valid(): + population.append(candidate) + + return population[:self.population_size] + + def _random_candidate(self) -> DstCandidate: + """Generate a random valid configuration.""" + config = {} + for key, bounds in DST_PARAM_BOUNDS.items(): + steps = int((bounds.max - bounds.min) / bounds.step) + step_idx = self.rng.randint(0, max(1, steps)) + config[key] = bounds.clamp(bounds.min + step_idx * bounds.step) + return DstCandidate(config=config) + + def _evolve(self, population: List[DstCandidate]) -> List[DstCandidate]: + """Create next generation.""" + new_population = [] + + # Elitism + for candidate in population[:self.elite_count]: + elite = copy.deepcopy(candidate) + elite._id = DstCandidate._next_id() + elite.parent_ids = [candidate._id] + new_population.append(elite) + + # Offspring + while len(new_population) < self.population_size: + parent1 = self._tournament_select(population) + parent2 = self._tournament_select(population) + child = self._crossover(parent1, parent2) + child = self._mutate(child) + child.parent_ids = 
[parent1._id, parent2._id] + if child.is_valid(): + new_population.append(child) + + return new_population + + def _tournament_select(self, population: List[DstCandidate]) -> DstCandidate: + k = min(self.tournament_k, len(population)) + tournament = self.rng.sample(population, k) + return max(tournament, key=lambda c: c.fitness or 0) + + def _crossover(self, parent1: DstCandidate, parent2: DstCandidate) -> DstCandidate: + """Uniform crossover per parameter.""" + config = {} + for key in DST_PARAM_BOUNDS: + if self.rng.random() < 0.5: + config[key] = parent1.config.get(key, DST_PARAM_BOUNDS[key].default) + else: + config[key] = parent2.config.get(key, DST_PARAM_BOUNDS[key].default) + child = DstCandidate(config=config) + child.fitness = None + return child + + def _mutate(self, candidate: DstCandidate) -> DstCandidate: + """Mutate: current += choice([-1, 0, 1]) * step, clamped.""" + for key, bounds in DST_PARAM_BOUNDS.items(): + if self.rng.random() < self.mutation_rate: + current = candidate.config.get(key, bounds.default) + delta = self.rng.choice([-1, 0, 1]) * bounds.step + candidate.config[key] = bounds.clamp(current + delta) + candidate.fitness = None + return candidate + + def _save_generation(self, gen: int, population: List[DstCandidate]) -> None: + filename = self.results_dir / f"gen_{gen:03d}.json" + data = { + "generation": gen + 1, + "timestamp": datetime.now().isoformat(), + "population": [c.to_dict() for c in population], + } + filename.write_text(json.dumps(data, indent=2)) + + def _save_final_results(self) -> None: + if self._best_ever: + # Save as env string + env_path = self.results_dir / "best_config.env" + env_path.write_text(self._best_ever.to_env_string()) + logger.info("Best config saved to: %s", env_path) + + history_file = self.results_dir / "evolution_history.json" + data = { + "completed_at": datetime.now().isoformat(), + "generations": self.generations, + "population_size": self.population_size, + "mutation_rate": self.mutation_rate, + 
"best_candidate": self._best_ever.to_dict() if self._best_ever else None, + "history": self._history, + } + history_file.write_text(json.dumps(data, indent=2)) + + @property + def best_ever(self) -> Optional[DstCandidate]: + return self._best_ever + + @property + def history(self) -> List[Dict]: + return self._history diff --git a/gepa/evaluator.py b/gepa/evaluator.py new file mode 100644 index 0000000..74597f6 --- /dev/null +++ b/gepa/evaluator.py @@ -0,0 +1,262 @@ +""" +Evaluator - offline and live evaluation modes for skill quality. + +Offline mode: score cached review text against ground truth (free, instant). +Live mode: call Claude CLI with skill as context, score response (costs ~$0.05/eval). +""" + +import json +import logging +import subprocess +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional + +from .scorer import Scorer, ScoreResult +from .candidate import SkillCandidate + +logger = logging.getLogger(__name__) + +# Default locations +GROUND_TRUTH_DIR = Path(__file__).parent / "ground_truth" +AGENTS_DIR = Path(__file__).parent.parent / ".claude" / "agents" +REVIEWS_DIR = Path(__file__).parent / "reviews" + + +@dataclass +class EvaluationResult: + """Result of evaluating a skill across multiple ground truth tasks.""" + skill_name: str + task_scores: Dict[str, ScoreResult] # task_name -> ScoreResult + aggregate_score: float # Mean composite across tasks + + def summary(self) -> str: + lines = [f"=== {self.skill_name} (aggregate: {self.aggregate_score:.3f}) ==="] + for task_name, result in sorted(self.task_scores.items()): + lines.append(f"\n--- {task_name} ---") + lines.append(result.summary()) + return "\n".join(lines) + + +class OfflineEvaluator: + """ + Score skills against ground truth using cached review texts. + + Cached reviews are stored in gepa/reviews/{skill_name}/{task_name}.txt. 
+ These are captured once (from real Claude reviews) and reused for + deterministic, zero-cost scoring iterations. + """ + + def __init__( + self, + ground_truth_dir: Optional[Path] = None, + reviews_dir: Optional[Path] = None, + scorer: Optional[Scorer] = None, + ): + self.ground_truth_dir = ground_truth_dir or GROUND_TRUTH_DIR + self.reviews_dir = reviews_dir or REVIEWS_DIR + self.scorer = scorer or Scorer() + + assert self.ground_truth_dir.exists(), ( + f"Ground truth directory not found: {self.ground_truth_dir}" + ) + + def load_ground_truth(self, task_name: str) -> Dict: + """Load a ground truth JSON file by task name.""" + path = self.ground_truth_dir / f"{task_name}.json" + assert path.exists(), f"Ground truth file not found: {path}" + return json.loads(path.read_text()) + + def list_tasks(self) -> List[str]: + """List available ground truth tasks.""" + return sorted( + p.stem for p in self.ground_truth_dir.glob("*.json") + ) + + def get_review_text(self, skill_name: str, task_name: str) -> Optional[str]: + """Load cached review text for a skill+task combination.""" + path = self.reviews_dir / skill_name / f"{task_name}.txt" + if path.exists(): + return path.read_text() + return None + + def save_review_text(self, skill_name: str, task_name: str, text: str) -> Path: + """Save review text for later offline scoring.""" + dir_path = self.reviews_dir / skill_name + dir_path.mkdir(parents=True, exist_ok=True) + path = dir_path / f"{task_name}.txt" + path.write_text(text) + return path + + def evaluate_skill( + self, + skill_name: str, + tasks: Optional[List[str]] = None, + ) -> EvaluationResult: + """ + Evaluate a skill across all (or specified) ground truth tasks. + + Args: + skill_name: Name of the skill (e.g., "rust-dev"). + tasks: Specific tasks to evaluate. Defaults to all available. + + Returns: + EvaluationResult with per-task and aggregate scores. 
+ """ + if tasks is None: + tasks = self.list_tasks() + + task_scores = {} + for task_name in tasks: + review_text = self.get_review_text(skill_name, task_name) + if review_text is None: + logger.warning( + "No cached review for %s/%s, skipping", skill_name, task_name + ) + continue + + ground_truth = self.load_ground_truth(task_name) + score = self.scorer.score(review_text, ground_truth, skill_name=skill_name) + task_scores[task_name] = score + + aggregate = 0.0 + if task_scores: + aggregate = sum(s.composite for s in task_scores.values()) / len(task_scores) + + return EvaluationResult( + skill_name=skill_name, + task_scores=task_scores, + aggregate_score=aggregate, + ) + + def evaluate_all_skills(self) -> Dict[str, EvaluationResult]: + """Evaluate all skills that have cached reviews.""" + results = {} + if not self.reviews_dir.exists(): + logger.warning("Reviews directory does not exist: %s", self.reviews_dir) + return results + + for skill_dir in sorted(self.reviews_dir.iterdir()): + if skill_dir.is_dir(): + result = self.evaluate_skill(skill_dir.name) + if result.task_scores: + results[skill_dir.name] = result + + return results + + +class LiveEvaluator: + """ + Evaluate skills by running Claude CLI with skill context. + + Generates a review by invoking Claude with the skill file as context, + then scores the response against ground truth. Costs ~$0.05 per eval. 
+ """ + + def __init__( + self, + ground_truth_dir: Optional[Path] = None, + agents_dir: Optional[Path] = None, + reviews_dir: Optional[Path] = None, + scorer: Optional[Scorer] = None, + model: str = "sonnet", + ): + self.ground_truth_dir = ground_truth_dir or GROUND_TRUTH_DIR + self.agents_dir = agents_dir or AGENTS_DIR + self.reviews_dir = reviews_dir or REVIEWS_DIR + self.scorer = scorer or Scorer() + self.model = model + + def evaluate_skill_live( + self, + skill_name: str, + task_name: str, + save_review: bool = True, + ) -> Optional[ScoreResult]: + """ + Run a live evaluation: invoke Claude, score the response. + + Args: + skill_name: Skill to evaluate (e.g., "rust-dev"). + task_name: Ground truth task (e.g., "paper_review"). + save_review: Whether to cache the review text. + + Returns: + ScoreResult, or None if the CLI call fails. + """ + ground_truth = json.loads( + (self.ground_truth_dir / f"{task_name}.json").read_text() + ) + + # Build the prompt + context_files = ground_truth.get("context_files", []) + prompt = self._build_review_prompt(task_name, context_files) + + # Invoke Claude CLI + skill_path = self.agents_dir / f"{skill_name}.md" + if not skill_path.exists(): + logger.error("Skill file not found: %s", skill_path) + return None + + review_text = self._call_claude(prompt, skill_path) + if review_text is None: + return None + + # Optionally save the review + if save_review: + evaluator = OfflineEvaluator( + ground_truth_dir=self.ground_truth_dir, + reviews_dir=self.reviews_dir, + ) + evaluator.save_review_text(skill_name, task_name, review_text) + + return self.scorer.score(review_text, ground_truth, skill_name=skill_name) + + def _build_review_prompt(self, task_name: str, context_files: List[str]) -> str: + """Build a review prompt for the given task.""" + if "paper" in task_name: + return ( + "Review the technical paper (docs/PAPER.md) for accuracy. 
" + "Focus on: factual claims about the architecture, benchmark " + "methodology, and testing coverage. Flag any inaccuracies, " + "misleading claims, or important gaps. Be specific and cite " + "the relevant section." + ) + elif "pr" in task_name: + files_str = ", ".join(context_files) + return ( + f"Review this PR's changes to: {files_str}. " + "Check for: correctness, TigerStyle compliance (assertions, " + "checked arithmetic), DST coverage gaps, and Redis compatibility. " + "Flag violations, bugs, and missing test coverage." + ) + return "Review the code for correctness, style, and test coverage." + + def _call_claude(self, prompt: str, skill_path: Path) -> Optional[str]: + """Invoke Claude CLI with skill context. Returns response text.""" + cmd = [ + "claude", + "--print", + "--model", self.model, + "--system", skill_path.read_text(), + prompt, + ] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=120, + cwd=str(Path(__file__).parent.parent), + ) + if result.returncode != 0: + logger.error("Claude CLI failed: %s", result.stderr[:500]) + return None + return result.stdout + except subprocess.TimeoutExpired: + logger.error("Claude CLI timed out after 120s") + return None + except FileNotFoundError: + logger.error("Claude CLI not found. Install: npm i -g @anthropic-ai/claude-code") + return None diff --git a/gepa/evolution.py b/gepa/evolution.py new file mode 100644 index 0000000..2a2370d --- /dev/null +++ b/gepa/evolution.py @@ -0,0 +1,245 @@ +""" +Skill Evolution Engine - GA loop for evolving skill markdown files. 
+ +Mirrors evolve/evolution.py patterns: +population -> evaluate -> elitism -> tournament -> crossover -> mutate -> repeat + +Key differences from config evolution: +- Candidate type: SkillCandidate (sections) not numeric config dict +- Crossover: Section-level uniform (50/50 per section from each parent) +- Mutation: Apply random text operator per section with mutation_rate probability +""" + +import copy +import json +import logging +import random +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional + +from .candidate import SkillCandidate +from .mutations import ALL_MUTATIONS +from .skill_fitness import SkillFitnessEvaluator, CostTracker + +logger = logging.getLogger(__name__) + + +class SkillEvolutionEngine: + """ + Evolutionary optimization engine for skill markdown files. + + Uses tournament selection, section-level crossover, and text mutations + to evolve skills towards better review quality scores. + """ + + def __init__( + self, + skill_name: str, + evaluator: SkillFitnessEvaluator, + population_size: int = 8, + elite_count: int = 2, + mutation_rate: float = 0.3, + generations: int = 10, + tournament_k: int = 3, + results_dir: Path = Path("gepa/results"), + budget_cap: float = 5.0, + seed: int = None, + ): + assert population_size >= 4, "population_size must be >= 4" + assert elite_count < population_size, "elite_count must be < population_size" + assert 0.0 <= mutation_rate <= 1.0, "mutation_rate must be in [0, 1]" + + self.skill_name = skill_name + self.evaluator = evaluator + self.population_size = population_size + self.elite_count = elite_count + self.mutation_rate = mutation_rate + self.generations = generations + self.tournament_k = tournament_k + self.results_dir = Path(results_dir) / skill_name + self.results_dir.mkdir(parents=True, exist_ok=True) + self.cost_tracker = CostTracker(budget_cap=budget_cap) + self.rng = random.Random(seed) + + self._best_ever: Optional[SkillCandidate] = None + 
self._history: List[Dict] = [] + + def run(self) -> SkillCandidate: + """Run the evolutionary optimization. Returns the best candidate found.""" + logger.info("Starting skill evolution: %d generations, %d population, skill=%s", + self.generations, self.population_size, self.skill_name) + + # Load original skill + from .evaluator import AGENTS_DIR + original = SkillCandidate.from_file(AGENTS_DIR / f"{self.skill_name}.md") + population = self._initialize_population(original) + + for gen in range(self.generations): + logger.info("=== Generation %d/%d ===", gen + 1, self.generations) + + # Budget check + eval_cost = self.evaluator.cost_per_eval + needed = sum(1 for c in population if c.fitness is None) * eval_cost + if eval_cost > 0 and not self.cost_tracker.can_afford(needed): + logger.warning("Budget exhausted at generation %d. %s", + gen + 1, self.cost_tracker.summary()) + break + + # Evaluate unevaluated candidates + for candidate in population: + if candidate.fitness is None: + candidate.fitness = self.evaluator.evaluate(candidate) + candidate.generation = gen + if eval_cost > 0: + self.cost_tracker.charge(eval_cost) + + # Sort by fitness descending + population.sort(key=lambda c: c.fitness or 0.0, reverse=True) + + best = population[0] + avg_fitness = sum(c.fitness or 0 for c in population) / len(population) + + logger.info("Gen %d: best=%.3f avg=%.3f [%s]", + gen + 1, best.fitness or 0, avg_fitness, + ", ".join(f"{c.fitness:.3f}" for c in population[:4])) + + # Track best ever + if self._best_ever is None or (best.fitness or 0) > (self._best_ever.fitness or 0): + self._best_ever = best + logger.info("NEW BEST: %.3f", best.fitness) + + self._history.append({ + "generation": gen + 1, + "best_fitness": best.fitness, + "avg_fitness": avg_fitness, + "population_fitness": [c.fitness for c in population], + }) + + self._save_generation(gen, population) + + # Create next generation (unless last) + if gen < self.generations - 1: + population = self._evolve(population, 
original) + + # Save final results + if self._best_ever: + self._save_final_results() + + logger.info("Evolution complete. Best: %.3f. %s", + self._best_ever.fitness if self._best_ever else 0, + self.cost_tracker.summary()) + + return self._best_ever or original + + def _initialize_population(self, original: SkillCandidate) -> List[SkillCandidate]: + """Create initial population: original + mutated variants.""" + population = [self._clone_candidate(original)] + + while len(population) < self.population_size: + mutant = self._clone_candidate(original) + mutant = self._mutate(mutant) + mutant.parent_ids = [original._id] + population.append(mutant) + + return population + + def _evolve(self, population: List[SkillCandidate], original: SkillCandidate) -> List[SkillCandidate]: + """Create next generation through selection, crossover, and mutation.""" + new_population = [] + + # Elitism: keep top candidates + for candidate in population[:self.elite_count]: + elite = self._clone_candidate(candidate) + elite.fitness = candidate.fitness # Preserve to avoid re-evaluation + elite.parent_ids = [candidate._id] + new_population.append(elite) + + # Fill rest with offspring + while len(new_population) < self.population_size: + parent1 = self._tournament_select(population) + parent2 = self._tournament_select(population) + child = self._crossover(parent1, parent2) + child = self._mutate(child) + child.parent_ids = [parent1._id, parent2._id] + new_population.append(child) + + return new_population + + def _tournament_select(self, population: List[SkillCandidate]) -> SkillCandidate: + """Select a candidate using tournament selection.""" + k = min(self.tournament_k, len(population)) + tournament = self.rng.sample(population, k) + return max(tournament, key=lambda c: c.fitness or 0) + + def _crossover(self, parent1: SkillCandidate, parent2: SkillCandidate) -> SkillCandidate: + """Section-level uniform crossover: 50/50 per section from each parent.""" + child = 
self._clone_candidate(parent1) + + # Match sections by heading + p2_sections = {s.heading.lower(): s for s in parent2.sections} + for i, section in enumerate(child.sections): + key = section.heading.lower() + if key in p2_sections and self.rng.random() < 0.5: + child.sections[i] = copy.deepcopy(p2_sections[key]) + + child.fitness = None # Must re-evaluate + return child + + def _mutate(self, candidate: SkillCandidate) -> SkillCandidate: + """Apply random mutation operator per section with mutation_rate probability.""" + for section in candidate.sections: + if self.rng.random() < self.mutation_rate: + # Pick random mutation (excluding section_swap for individual sections) + mutations = [m for m in ALL_MUTATIONS if m.__name__ != "section_swap"] + mutation = self.rng.choice(mutations) + section.content = mutation(section.content, self.rng) + + candidate.fitness = None # Must re-evaluate + return candidate + + def _clone_candidate(self, original: SkillCandidate) -> SkillCandidate: + """Deep clone a candidate.""" + clone = copy.deepcopy(original) + clone._id = SkillCandidate._next_id() + clone.fitness = None + clone.parent_ids = [] + return clone + + def _save_generation(self, gen: int, population: List[SkillCandidate]) -> None: + """Save generation results to JSON.""" + filename = self.results_dir / f"gen_{gen:03d}.json" + data = { + "generation": gen + 1, + "timestamp": datetime.now().isoformat(), + "population": [c.to_dict() for c in population], + } + filename.write_text(json.dumps(data, indent=2)) + + def _save_final_results(self) -> None: + """Save best skill and evolution history.""" + if self._best_ever: + best_path = self.results_dir / "best_skill.md" + self._best_ever.save(best_path) + logger.info("Best skill saved to: %s", best_path) + + history_file = self.results_dir / "evolution_history.json" + data = { + "completed_at": datetime.now().isoformat(), + "skill_name": self.skill_name, + "generations": self.generations, + "population_size": 
self.population_size, + "mutation_rate": self.mutation_rate, + "budget": self.cost_tracker.summary(), + "best_candidate": self._best_ever.to_dict() if self._best_ever else None, + "history": self._history, + } + history_file.write_text(json.dumps(data, indent=2)) + + @property + def best_ever(self) -> Optional[SkillCandidate]: + return self._best_ever + + @property + def history(self) -> List[Dict]: + return self._history diff --git a/gepa/harness.py b/gepa/harness.py new file mode 100644 index 0000000..7e80df4 --- /dev/null +++ b/gepa/harness.py @@ -0,0 +1,392 @@ +""" +GEPA Harness - CLI entry point for skill evaluation and evolution. + +Usage: + python -m gepa.harness --score-baseline # Score all skills with cached reviews + python -m gepa.harness --skill rust-dev --offline # Score one skill offline + python -m gepa.harness --capture-reviews # Record baseline reviews via Claude CLI + python -m gepa.harness --list-tasks # Show available ground truth tasks + python -m gepa.harness --detail rust-dev paper_review # Detailed score for one skill+task + + # Phase 2: Evolution + python -m gepa.harness --evolve rust-dev --mock # Test GA with mock evaluator + python -m gepa.harness --evolve rust-dev --live # Real evolution via Claude CLI + python -m gepa.harness --evolve rust-dev --live --budget 10 --generations 15 + + # Phase 2: DST Config Optimization + python -m gepa.harness --dst-optimize --mock # Test DST GA with mock evaluator + python -m gepa.harness --dst-optimize --test executor_dst_test --seeds 10 +""" + +import argparse +import json +import logging +import sys +from pathlib import Path + +from .evaluator import OfflineEvaluator, LiveEvaluator, GROUND_TRUTH_DIR, REVIEWS_DIR, AGENTS_DIR + +logger = logging.getLogger(__name__) + +SKILL_NAMES = [ + "actor-model", + "distributed-systems", + "dst", + "formal-verification", + "rust-dev", + "tigerstyle", +] + + +def score_baseline(evaluator: OfflineEvaluator) -> None: + """Score all skills against all available ground 
truth tasks.""" + results = evaluator.evaluate_all_skills() + + if not results: + print("No cached reviews found. Run --capture-reviews first.") + print(f" Expected reviews in: {evaluator.reviews_dir}/") + print(f" Structure: reviews/{{skill_name}}/{{task_name}}.txt") + return + + print("=" * 60) + print("GEPA Baseline Scores") + print("=" * 60) + + # Sort by aggregate score descending + ranked = sorted(results.items(), key=lambda x: x[1].aggregate_score, reverse=True) + for rank, (name, result) in enumerate(ranked, 1): + print(f"\n{rank}. {result.summary()}") + + # Leaderboard summary + print("\n" + "=" * 60) + print("Leaderboard") + print("-" * 60) + print(f"{'Rank':<6}{'Skill':<25}{'Score':<10}{'Tasks':<6}") + print("-" * 60) + for rank, (name, result) in enumerate(ranked, 1): + n_tasks = len(result.task_scores) + print(f"{rank:<6}{name:<25}{result.aggregate_score:<10.3f}{n_tasks:<6}") + + +def score_skill(evaluator: OfflineEvaluator, skill_name: str, tasks=None) -> None: + """Score a single skill.""" + result = evaluator.evaluate_skill(skill_name, tasks=tasks) + if not result.task_scores: + print(f"No cached reviews for skill '{skill_name}'.") + print(f" Expected in: {evaluator.reviews_dir}/{skill_name}/") + return + print(result.summary()) + + +def show_detail(evaluator: OfflineEvaluator, skill_name: str, task_name: str) -> None: + """Show detailed scoring for one skill + task combination.""" + review_text = evaluator.get_review_text(skill_name, task_name) + if review_text is None: + print(f"No cached review for {skill_name}/{task_name}") + return + + ground_truth = evaluator.load_ground_truth(task_name) + result = evaluator.scorer.score(review_text, ground_truth, skill_name=skill_name) + + print(f"=== {skill_name} on {task_name} ===\n") + print(result.summary()) + + print("\nFinding Details:") + print("-" * 60) + for match in result.finding_matches: + status = "HIT" if match.matched else "MISS" + print( + f" [{status}] {match.finding_id}: " + 
f"{match.keyword_hits}/{match.keyword_total} keywords " + f"(confidence={match.confidence:.2f}, weight={match.weight:.1f})" + ) + + if result.false_positive_matches: + print("\nFalse Positive Checks:") + for fp in result.false_positive_matches: + status = "TRIGGERED" if fp.triggered else "ok" + penalty_str = f" (penalty={fp.penalty:.1f})" if fp.triggered else "" + print(f" [{status}] {fp.fp_id}{penalty_str}") + + +def capture_reviews(live_evaluator: LiveEvaluator) -> None: + """Capture baseline reviews for all skills by running Claude CLI.""" + tasks = sorted( + p.stem for p in GROUND_TRUTH_DIR.glob("*.json") + ) + + print(f"Capturing reviews for {len(SKILL_NAMES)} skills x {len(tasks)} tasks") + print(f"Estimated cost: ~${len(SKILL_NAMES) * len(tasks) * 0.05:.2f}") + print() + + for skill_name in SKILL_NAMES: + for task_name in tasks: + print(f" {skill_name}/{task_name}...", end=" ", flush=True) + result = live_evaluator.evaluate_skill_live( + skill_name, task_name, save_review=True + ) + if result: + print(f"score={result.composite:.3f}") + else: + print("FAILED") + + +def list_tasks() -> None: + """List available ground truth tasks with details.""" + tasks = sorted(GROUND_TRUTH_DIR.glob("*.json")) + if not tasks: + print("No ground truth files found.") + return + + print("Available ground truth tasks:") + print("-" * 60) + for path in tasks: + data = json.loads(path.read_text()) + n_findings = len(data.get("expected_findings", [])) + n_fp = len(data.get("known_false_positives", [])) + skills = ", ".join(data.get("skills_under_test", [])) + print(f" {path.stem}") + print(f" {data.get('description', 'No description')}") + print(f" Findings: {n_findings}, False positives: {n_fp}") + print(f" Skills: {skills}") + print() + + +def run_evolution(args) -> None: + """Run skill evolution with GA.""" + from .evolution import SkillEvolutionEngine + from .skill_fitness import MockSkillEvaluator, LiveSkillEvaluator, CostTracker + + skill_name = args.evolve + + if skill_name 
not in SKILL_NAMES: + print(f"Unknown skill: {skill_name}") + print(f"Available: {', '.join(SKILL_NAMES)}") + return + + if args.mock: + import random + evaluator = MockSkillEvaluator( + baseline=0.5, noise_std=0.05, + rng=random.Random(42), + ) + print(f"Running mock evolution for '{skill_name}'...") + elif args.live: + gt_dir = args.ground_truth_dir or GROUND_TRUTH_DIR + evaluator = LiveSkillEvaluator( + ground_truth_dir=gt_dir, + model=args.model, + cost_tracker=CostTracker(budget_cap=args.budget), + ) + print(f"Running live evolution for '{skill_name}' (budget=${args.budget:.2f})...") + else: + print("Specify --mock or --live for evolution mode.") + return + + engine = SkillEvolutionEngine( + skill_name=skill_name, + evaluator=evaluator, + population_size=args.population, + generations=args.generations, + budget_cap=args.budget, + seed=args.seed, + ) + best = engine.run() + + print(f"\nBest candidate: {best}") + print(f"Results saved to: {engine.results_dir}/") + + +def run_dst_optimize(args) -> None: + """Run DST config optimization with GA.""" + from .dst_optimizer import DstEvolutionEngine + from .dst_evaluator import MockDstEvaluator, CargoDstEvaluator + + if args.mock: + import random + evaluator = MockDstEvaluator(rng=random.Random(42)) + print("Running mock DST optimization...") + else: + evaluator = CargoDstEvaluator( + test_targets=[args.test] if args.test else ["executor_dst_test"], + seeds=range(0, args.seeds), + timeout=args.timeout, + ) + print(f"Running DST optimization (test={args.test or 'executor_dst_test'}, " + f"seeds={args.seeds})...") + + engine = DstEvolutionEngine( + evaluator=evaluator, + population_size=args.population, + generations=args.generations, + seed=args.seed, + ) + best = engine.run() + + print(f"\nBest candidate: {best}") + print(f"Best config: {best.to_env_string()[:100]}...") + print(f"Results saved to: {engine.results_dir}/") + + +def main() -> None: + parser = argparse.ArgumentParser( + description="GEPA - Genetic 
Evolution for Prompt Artifacts", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Phase 1: Scoring + python -m gepa.harness --score-baseline + python -m gepa.harness --skill rust-dev --offline + python -m gepa.harness --detail rust-dev paper_review + python -m gepa.harness --capture-reviews --model sonnet + python -m gepa.harness --list-tasks + + # Phase 2: Skill Evolution + python -m gepa.harness --evolve rust-dev --mock + python -m gepa.harness --evolve rust-dev --live --budget 10.0 --generations 15 + + # Phase 2: DST Config Optimization + python -m gepa.harness --dst-optimize --mock + python -m gepa.harness --dst-optimize --test executor_dst_test --seeds 10 +""", + ) + + # Phase 1: Scoring + parser.add_argument( + "--score-baseline", action="store_true", + help="Score all skills with cached reviews", + ) + parser.add_argument( + "--skill", type=str, + help="Evaluate a specific skill", + ) + parser.add_argument( + "--offline", action="store_true", + help="Use offline scoring (cached reviews only)", + ) + parser.add_argument( + "--detail", nargs=2, metavar=("SKILL", "TASK"), + help="Show detailed score for skill+task", + ) + parser.add_argument( + "--capture-reviews", action="store_true", + help="Capture baseline reviews via Claude CLI", + ) + parser.add_argument( + "--list-tasks", action="store_true", + help="List available ground truth tasks", + ) + parser.add_argument( + "--model", type=str, default="sonnet", + help="Model for live evaluation (default: sonnet)", + ) + parser.add_argument( + "--ground-truth-dir", type=Path, default=None, + help="Override ground truth directory", + ) + parser.add_argument( + "--reviews-dir", type=Path, default=None, + help="Override reviews directory", + ) + + # Phase 2: Skill Evolution + parser.add_argument( + "--evolve", type=str, metavar="SKILL", + help="Evolve a skill using GA (e.g., --evolve rust-dev)", + ) + parser.add_argument( + "--mock", action="store_true", + help="Use mock evaluator 
(free, for testing GA machinery)", + ) + parser.add_argument( + "--live", action="store_true", + help="Use live evaluator (calls Claude CLI, costs ~$0.05/task)", + ) + parser.add_argument( + "--budget", type=float, default=5.0, + help="Budget cap in USD for live evolution (default: 5.0)", + ) + parser.add_argument( + "--generations", type=int, default=10, + help="Number of GA generations (default: 10)", + ) + parser.add_argument( + "--population", type=int, default=8, + help="Population size per generation (default: 8)", + ) + parser.add_argument( + "--seed", type=int, default=None, + help="Random seed for reproducibility", + ) + + # Phase 2: DST Config Optimization + parser.add_argument( + "--dst-optimize", action="store_true", + help="Optimize DST buggify config using GA", + ) + parser.add_argument( + "--test", type=str, default=None, + help="Cargo test target for DST optimization (default: executor_dst_test)", + ) + parser.add_argument( + "--seeds", type=int, default=10, + help="Number of DST seeds to run per evaluation (default: 10)", + ) + parser.add_argument( + "--timeout", type=int, default=300, + help="Timeout in seconds per cargo test run (default: 300)", + ) + + parser.add_argument( + "-v", "--verbose", action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(levelname)s: %(message)s", + ) + + gt_dir = args.ground_truth_dir or GROUND_TRUTH_DIR + rev_dir = args.reviews_dir or REVIEWS_DIR + + if args.list_tasks: + list_tasks() + return + + # Phase 2: Evolution + if args.evolve: + run_evolution(args) + return + + if args.dst_optimize: + run_dst_optimize(args) + return + + # Phase 1: Scoring + offline_eval = OfflineEvaluator(ground_truth_dir=gt_dir, reviews_dir=rev_dir) + + if args.score_baseline: + score_baseline(offline_eval) + elif args.detail: + show_detail(offline_eval, args.detail[0], args.detail[1]) + elif args.skill and 
args.offline: + score_skill(offline_eval, args.skill) + elif args.capture_reviews: + live_eval = LiveEvaluator( + ground_truth_dir=gt_dir, + reviews_dir=rev_dir, + model=args.model, + ) + capture_reviews(live_eval) + elif args.skill: + # Default to offline for a single skill + score_skill(offline_eval, args.skill) + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/gepa/mutations.py b/gepa/mutations.py new file mode 100644 index 0000000..c64b058 --- /dev/null +++ b/gepa/mutations.py @@ -0,0 +1,119 @@ +""" +Text mutation operators for skill evolution. + +All operators are pure functions: (content: str, rng: random.Random) -> str. +No external dependencies. +""" + +import re +import random as _random_module +from typing import List + + +def sentence_shuffle(content: str, rng: _random_module.Random) -> str: + """Shuffle sentences within the content. Skip if content has >=2 code fences.""" + if content.count("```") >= 2: + return content + sentences = re.split(r'(?<=[.!?])\s+', content) + if len(sentences) <= 1: + return content + rng.shuffle(sentences) + return " ".join(sentences) + + +def sentence_drop(content: str, rng: _random_module.Random) -> str: + """Remove one random sentence (only if at least 2 sentences).""" + sentences = re.split(r'(?<=[.!?])\s+', content) + if len(sentences) < 2: + return content + idx = rng.randrange(len(sentences)) + sentences.pop(idx) + return " ".join(sentences) + + +def sentence_duplicate(content: str, rng: _random_module.Random) -> str: + """Duplicate one random sentence.""" + if not content.strip(): + return content + sentences = re.split(r'(?<=[.!?])\s+', content) + if not sentences: + return content + idx = rng.randrange(len(sentences)) + insert_pos = rng.randrange(len(sentences) + 1) + sentences.insert(insert_pos, sentences[idx]) + return " ".join(sentences) + + +def keyword_inject(content: str, rng: _random_module.Random, keywords: List[str] = None) -> str: + """Insert a templated sentence with 
keywords from missed findings.""" + if not keywords: + # Default keyword pairs for common review concerns + keyword_pairs = [ + ("assertions", "postconditions"), + ("checked arithmetic", "overflow"), + ("error handling", "unwrap"), + ("DST coverage", "fault injection"), + ("Redis compatibility", "error format"), + ("TigerStyle", "invariants"), + ("borrow checker", "lifetime"), + ("shadow state", "drift"), + ] + kw1, kw2 = rng.choice(keyword_pairs) + else: + if len(keywords) >= 2: + kw1, kw2 = rng.sample(keywords, 2) + elif len(keywords) == 1: + kw1, kw2 = keywords[0], "correctness" + else: + return content + + templates = [ + f"When reviewing, pay special attention to {kw1} vs {kw2} issues.", + f"Check for {kw1} and {kw2} problems in the implementation.", + f"Verify that {kw1} is handled correctly, especially regarding {kw2}.", + ] + sentence = rng.choice(templates) + + sentences = re.split(r'(?<=[.!?])\s+', content) if content.strip() else [] + if sentences: + insert_pos = rng.randrange(len(sentences) + 1) + sentences.insert(insert_pos, sentence) + return " ".join(sentences) + return sentence + + +def section_swap(content: str, rng: _random_module.Random) -> str: + """Swap two sections at the same heading level within the content.""" + # This operates on multi-section content with ## headings + parts = re.split(r'^(#{2,3}\s+.+)$', content, flags=re.MULTILINE) + + if len(parts) < 5: # Need at least 2 sections (preamble + heading + content + heading + content) + return content + + # Collect section indices (heading is at odd indices in split result) + section_indices = [i for i in range(1, len(parts), 2)] + + if len(section_indices) < 2: + return content + + # Pick two random section indices to swap + i, j = rng.sample(range(len(section_indices)), 2) + si, sj = section_indices[i], section_indices[j] + + # Swap heading + content pairs + parts[si], parts[sj] = parts[sj], parts[si] + # Swap their content blocks too + if si + 1 < len(parts) and sj + 1 < len(parts): + 
parts[si + 1], parts[sj + 1] = parts[sj + 1], parts[si + 1] + + return "".join(parts) + + +# All mutation operators for convenient iteration +ALL_MUTATIONS = [ + sentence_shuffle, + sentence_drop, + sentence_duplicate, + keyword_inject, + section_swap, +] diff --git a/gepa/scorer.py b/gepa/scorer.py new file mode 100644 index 0000000..ffc4cd1 --- /dev/null +++ b/gepa/scorer.py @@ -0,0 +1,307 @@ +""" +Scorer - deterministic keyword-matching scorer for review quality. + +Scores a review text against ground truth findings using keyword +co-occurrence. No external dependencies, no API calls. +""" + +import re +from dataclasses import dataclass, field +from typing import Dict, List, Optional + + +# Severity → default weight multiplier (used when finding has no explicit weight) +SEVERITY_WEIGHTS = { + "inaccurate": 2.0, + "verified": 0.5, + "misleading": 1.5, + "gap": 1.0, + "violation": 1.5, + "nit": 0.5, +} + +# Score component weights (must sum to 1.0) +COMPONENT_WEIGHTS = { + "weighted_recall": 0.50, + "precision": 0.25, + "calibration": 0.15, + "coverage_bonus": 0.10, +} + + +@dataclass +class FindingMatch: + """Result of matching a single finding against review text.""" + finding_id: str + matched: bool + keyword_hits: int + keyword_total: int + confidence: float # 0.0-1.0, fraction of keywords found + weight: float + + +@dataclass +class FalsePositiveMatch: + """Result of checking for a known false positive in review text.""" + fp_id: str + triggered: bool + penalty: float + + +@dataclass +class ScoreResult: + """Complete scoring result for a review.""" + weighted_recall: float # 0.0-1.0 + precision: float # 0.0-1.0 + calibration: float # 0.0-1.0 + coverage_bonus: float # 0.0-1.0 + composite: float # weighted combination + finding_matches: List[FindingMatch] = field(default_factory=list) + false_positive_matches: List[FalsePositiveMatch] = field(default_factory=list) + true_positives: int = 0 + false_positives: int = 0 + missed_required: List[str] = 
field(default_factory=list) + + def summary(self) -> str: + lines = [ + f"Composite: {self.composite:.3f}", + f" Recall: {self.weighted_recall:.3f} (weight {COMPONENT_WEIGHTS['weighted_recall']})", + f" Precision: {self.precision:.3f} (weight {COMPONENT_WEIGHTS['precision']})", + f" Calibration: {self.calibration:.3f} (weight {COMPONENT_WEIGHTS['calibration']})", + f" Coverage: {self.coverage_bonus:.3f} (weight {COMPONENT_WEIGHTS['coverage_bonus']})", + f" TP={self.true_positives} FP={self.false_positives}", + ] + if self.missed_required: + lines.append(f" Missed required: {', '.join(self.missed_required)}") + return "\n".join(lines) + + +class Scorer: + """ + Scores review text against ground truth using keyword co-occurrence. + + A finding is considered "matched" if at least `min_keyword_fraction` + of its keywords appear in the review text (case-insensitive). + """ + + def __init__(self, min_keyword_fraction: float = 0.5): + assert 0.0 < min_keyword_fraction <= 1.0, ( + f"min_keyword_fraction must be in (0, 1], got {min_keyword_fraction}" + ) + self.min_keyword_fraction = min_keyword_fraction + + def score( + self, + review_text: str, + ground_truth: Dict, + skill_name: Optional[str] = None, + ) -> ScoreResult: + """ + Score a review against ground truth. + + Args: + review_text: The review text to score. + ground_truth: Parsed ground truth dict (from JSON). + skill_name: If provided, only score findings relevant to this skill. + + Returns: + ScoreResult with component scores and details. 
+ """ + assert isinstance(review_text, str), "review_text must be a string" + assert isinstance(ground_truth, dict), "ground_truth must be a dict" + assert "expected_findings" in ground_truth, ( + "ground_truth must have 'expected_findings'" + ) + + normalized = _normalize_text(review_text) + + findings = ground_truth["expected_findings"] + false_positives = ground_truth.get("known_false_positives", []) + + # Filter by skill relevance if specified + if skill_name: + findings = [ + f for f in findings + if skill_name in f.get("skill_relevance", []) + ] + + # Match expected findings + finding_matches = [] + for finding in findings: + match = self._match_finding(normalized, finding) + finding_matches.append(match) + + # Check for false positives + fp_matches = [] + for fp in false_positives: + fp_match = self._match_false_positive(normalized, fp) + fp_matches.append(fp_match) + + # Compute component scores + weighted_recall = self._compute_weighted_recall(finding_matches) + true_positives = sum(1 for m in finding_matches if m.matched) + false_positive_count = sum(1 for m in fp_matches if m.triggered) + precision = self._compute_precision(true_positives, false_positive_count) + calibration = self._compute_calibration(finding_matches, findings) + coverage_bonus = self._compute_coverage_bonus(finding_matches, findings) + missed_required = [ + m.finding_id for m, f in zip(finding_matches, findings) + if not m.matched and f.get("required", False) + ] + + composite = ( + COMPONENT_WEIGHTS["weighted_recall"] * weighted_recall + + COMPONENT_WEIGHTS["precision"] * precision + + COMPONENT_WEIGHTS["calibration"] * calibration + + COMPONENT_WEIGHTS["coverage_bonus"] * coverage_bonus + ) + + return ScoreResult( + weighted_recall=weighted_recall, + precision=precision, + calibration=calibration, + coverage_bonus=coverage_bonus, + composite=composite, + finding_matches=finding_matches, + false_positive_matches=fp_matches, + true_positives=true_positives, + 
false_positives=false_positive_count, + missed_required=missed_required, + ) + + def _match_finding(self, normalized: str, finding: Dict) -> FindingMatch: + """Check if a finding's keywords appear in the normalized text.""" + keywords = finding.get("keywords", []) + if not keywords: + return FindingMatch( + finding_id=finding["id"], + matched=False, + keyword_hits=0, + keyword_total=0, + confidence=0.0, + weight=finding.get("weight", 1.0), + ) + + hits = sum(1 for kw in keywords if kw.lower() in normalized) + fraction = hits / len(keywords) + matched = fraction >= self.min_keyword_fraction + + return FindingMatch( + finding_id=finding["id"], + matched=matched, + keyword_hits=hits, + keyword_total=len(keywords), + confidence=fraction, + weight=finding.get("weight", SEVERITY_WEIGHTS.get(finding.get("severity", "nit"), 1.0)), + ) + + def _match_false_positive(self, normalized: str, fp: Dict) -> FalsePositiveMatch: + """Check if a known false positive was flagged in the review.""" + keywords = fp.get("keywords", []) + if not keywords: + return FalsePositiveMatch(fp_id=fp["id"], triggered=False, penalty=0.0) + + hits = sum(1 for kw in keywords if kw.lower() in normalized) + fraction = hits / len(keywords) if keywords else 0.0 + triggered = fraction >= self.min_keyword_fraction + + return FalsePositiveMatch( + fp_id=fp["id"], + triggered=triggered, + penalty=fp.get("penalty", 1.0) if triggered else 0.0, + ) + + def _compute_weighted_recall(self, matches: List[FindingMatch]) -> float: + """Weighted recall: fraction of weighted findings matched.""" + if not matches: + return 0.0 + total_weight = sum(m.weight for m in matches) + if total_weight == 0.0: + return 0.0 + matched_weight = sum(m.weight for m in matches if m.matched) + return matched_weight / total_weight + + def _compute_precision(self, true_positives: int, false_positives: int) -> float: + """Precision: TP / (TP + FP). 
Returns 1.0 if no positives.""" + total = true_positives + false_positives + if total == 0: + return 1.0 + return true_positives / total + + def _compute_calibration( + self, matches: List[FindingMatch], findings: List[Dict] + ) -> float: + """ + Calibration: how well the review distinguishes severity levels. + + Higher-severity findings should have higher confidence scores. + Score is 1.0 - mean_absolute_deviation of severity-vs-confidence ranking. + """ + if len(matches) < 2: + return 0.5 # Not enough data to calibrate + + severity_order = { + "inaccurate": 5, "violation": 4, "misleading": 3, + "gap": 2, "nit": 1, "verified": 0, + } + + pairs = [] + for match, finding in zip(matches, findings): + sev = severity_order.get(finding.get("severity", "nit"), 1) + pairs.append((sev, match.confidence)) + + if not pairs: + return 0.5 + + # Compute rank correlation (simplified Spearman) + n = len(pairs) + sev_ranked = _rank([p[0] for p in pairs]) + conf_ranked = _rank([p[1] for p in pairs]) + + # Spearman's rho + d_sq_sum = sum((s - c) ** 2 for s, c in zip(sev_ranked, conf_ranked)) + if n <= 1: + return 0.5 + rho = 1 - (6 * d_sq_sum) / (n * (n * n - 1)) + + # Map rho from [-1, 1] to [0, 1] + return max(0.0, min(1.0, (rho + 1) / 2)) + + def _compute_coverage_bonus( + self, matches: List[FindingMatch], findings: List[Dict] + ) -> float: + """Bonus for finding optional (non-required) issues.""" + optional_matches = [ + m for m, f in zip(matches, findings) + if not f.get("required", False) + ] + if not optional_matches: + return 0.0 + found = sum(1 for m in optional_matches if m.matched) + return found / len(optional_matches) + + +def _normalize_text(text: str) -> str: + """Lowercase and normalize whitespace for keyword matching.""" + text = text.lower() + text = re.sub(r'\s+', ' ', text) + return text + + +def _rank(values: List[float]) -> List[float]: + """Assign fractional ranks to values (for Spearman correlation).""" + n = len(values) + indexed = sorted(enumerate(values), 
key=lambda x: x[1]) + ranks = [0.0] * n + + i = 0 + while i < n: + j = i + while j < n and indexed[j][1] == indexed[i][1]: + j += 1 + avg_rank = (i + j - 1) / 2.0 + 1 + for k in range(i, j): + ranks[indexed[k][0]] = avg_rank + i = j + + return ranks diff --git a/gepa/skill_fitness.py b/gepa/skill_fitness.py new file mode 100644 index 0000000..d33baaa --- /dev/null +++ b/gepa/skill_fitness.py @@ -0,0 +1,201 @@ +""" +Skill fitness evaluators for evolution. + +Three evaluator types: +- MockSkillEvaluator: baseline + gaussian noise (free, for testing GA machinery) +- OfflineSkillEvaluator: wraps existing OfflineEvaluator (free, deterministic) +- LiveSkillEvaluator: calls Claude CLI, scores against ground truth (~$0.05/eval) +""" + +import logging +import shutil +import tempfile +from abc import ABC, abstractmethod +from dataclasses import dataclass +from pathlib import Path +from typing import List, Optional + +from .candidate import SkillCandidate +from .evaluator import OfflineEvaluator, LiveEvaluator, GROUND_TRUTH_DIR, REVIEWS_DIR, AGENTS_DIR +from .scorer import Scorer + +logger = logging.getLogger(__name__) + + +class SkillFitnessEvaluator(ABC): + """Abstract base class for skill fitness evaluation.""" + + @abstractmethod + def evaluate(self, candidate: SkillCandidate) -> float: + """Evaluate a skill candidate. Returns fitness score 0.0-1.0.""" + ... + + @property + @abstractmethod + def cost_per_eval(self) -> float: + """Estimated cost in USD per evaluation.""" + ... 
+ + +@dataclass +class CostTracker: + """Track API spending against a budget cap.""" + budget_cap: float = 5.0 + spent: float = 0.0 + + def can_afford(self, cost: float) -> bool: + return (self.spent + cost) <= self.budget_cap + + def charge(self, cost: float) -> None: + self.spent += cost + + @property + def remaining(self) -> float: + return max(0.0, self.budget_cap - self.spent) + + def summary(self) -> str: + return f"${self.spent:.2f} / ${self.budget_cap:.2f} ({self.remaining:.2f} remaining)" + + +class MockSkillEvaluator(SkillFitnessEvaluator): + """ + Mock evaluator for testing GA machinery. + + Score = baseline + gaussian_noise(std=0.05), penalized if word count + deviates too far from original. + """ + + def __init__(self, baseline: float = 0.5, noise_std: float = 0.05, + original_word_count: int = 500, rng=None): + self.baseline = baseline + self.noise_std = noise_std + self.original_word_count = original_word_count + self._rng = rng # random.Random instance for determinism + + def evaluate(self, candidate: SkillCandidate) -> float: + import random as _random + rng = self._rng or _random.Random() + + noise = rng.gauss(0, self.noise_std) + score = self.baseline + noise + + # Penalize word count deviation + wc = candidate.word_count() + if self.original_word_count > 0: + ratio = wc / self.original_word_count + if ratio < 0.5 or ratio > 2.0: + score -= 0.1 * abs(1.0 - ratio) + + return max(0.0, min(1.0, score)) + + @property + def cost_per_eval(self) -> float: + return 0.0 + + +class OfflineSkillEvaluator(SkillFitnessEvaluator): + """ + Wraps existing OfflineEvaluator for scoring stability testing. + + Note: Cannot improve scores since review text is cached and doesn't + change with the skill content. Useful for verifying GA doesn't degrade. 
+ """ + + def __init__(self, ground_truth_dir: Path = None, reviews_dir: Path = None, + tasks: List[str] = None): + self._evaluator = OfflineEvaluator( + ground_truth_dir=ground_truth_dir or GROUND_TRUTH_DIR, + reviews_dir=reviews_dir or REVIEWS_DIR, + ) + self._tasks = tasks + + def evaluate(self, candidate: SkillCandidate) -> float: + result = self._evaluator.evaluate_skill( + candidate.name, tasks=self._tasks + ) + return result.aggregate_score + + @property + def cost_per_eval(self) -> float: + return 0.0 + + +class LiveSkillEvaluator(SkillFitnessEvaluator): + """ + Live evaluator: writes skill to temp file, calls Claude CLI, scores response. + + Cost: ~$0.05 per task evaluation. + """ + + def __init__(self, ground_truth_dir: Path = None, agents_dir: Path = None, + reviews_dir: Path = None, model: str = "sonnet", + tasks: List[str] = None, cost_tracker: CostTracker = None): + self._ground_truth_dir = ground_truth_dir or GROUND_TRUTH_DIR + self._agents_dir = agents_dir or AGENTS_DIR + self._reviews_dir = reviews_dir or REVIEWS_DIR + self._model = model + self._tasks = tasks + self._scorer = Scorer() + self.cost_tracker = cost_tracker or CostTracker() + self._live_eval = LiveEvaluator( + ground_truth_dir=self._ground_truth_dir, + agents_dir=self._agents_dir, + reviews_dir=self._reviews_dir, + scorer=self._scorer, + model=self._model, + ) + + def evaluate(self, candidate: SkillCandidate) -> float: + tasks = self._tasks or self._list_tasks() + cost = 0.05 * len(tasks) + + if not self.cost_tracker.can_afford(cost): + logger.warning("Budget exhausted (spent=%s, cap=%s). 
Returning 0.0", + self.cost_tracker.spent, self.cost_tracker.budget_cap) + return 0.0 + + # Write candidate to temp file + with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: + f.write(candidate.to_markdown()) + temp_path = Path(f.name) + + try: + # Temporarily point agents dir to temp location + original_agents = self._live_eval.agents_dir + temp_agents = temp_path.parent + # Copy temp file with correct name + skill_path = temp_agents / f"{candidate.name}.md" + if skill_path != temp_path: + shutil.copy2(temp_path, skill_path) + + self._live_eval.agents_dir = temp_agents + + scores = [] + for task_name in tasks: + result = self._live_eval.evaluate_skill_live( + candidate.name, task_name, save_review=False + ) + if result: + scores.append(result.composite) + + self.cost_tracker.charge(cost) + finally: + temp_path.unlink(missing_ok=True) + skill_path_cleanup = temp_path.parent / f"{candidate.name}.md" + if skill_path_cleanup.exists() and skill_path_cleanup != temp_path: + skill_path_cleanup.unlink(missing_ok=True) + self._live_eval.agents_dir = original_agents + + if not scores: + return 0.0 + return sum(scores) / len(scores) + + def _list_tasks(self) -> List[str]: + return sorted( + p.stem for p in self._ground_truth_dir.glob("*.json") + ) + + @property + def cost_per_eval(self) -> float: + tasks = self._tasks or self._list_tasks() + return 0.05 * len(tasks) diff --git a/gepa/test_candidate.py b/gepa/test_candidate.py new file mode 100644 index 0000000..6e6ed72 --- /dev/null +++ b/gepa/test_candidate.py @@ -0,0 +1,157 @@ +"""Unit tests for candidate.py""" + +import unittest +from pathlib import Path +from .candidate import SkillCandidate, SkillSection + + +SAMPLE_SKILL = """--- +name: test-skill +description: A test skill +user_invocable: true +--- + +# Test Skill + +Preamble text before any sections. + +## Section One + +Content of section one. + +## Section Two + +Content of section two with **bold** and `code`. 
+ +### Subsection 2.1 + +Nested content here. + +## Section Three + +Final section. +""" + + +class TestSkillSection(unittest.TestCase): + def test_to_markdown(self): + s = SkillSection(heading="My Section", content="Some content.", level=2) + assert s.to_markdown() == "## My Section\n\nSome content." + + def test_word_count(self): + s = SkillSection(heading="X", content="one two three four") + assert s.word_count() == 4 + + def test_level_3(self): + s = SkillSection(heading="Sub", content="text", level=3) + assert s.to_markdown().startswith("### Sub") + + +class TestSkillCandidate(unittest.TestCase): + def setUp(self): + SkillCandidate.reset_id_counter() + + def test_parse_sample(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + assert c.name == "test-skill" + assert "---" in c.frontmatter + # The # heading becomes L1 section; preamble text is in that section's content + title_section = c.get_section("Test Skill") + assert title_section is not None + assert "Preamble" in title_section.content + + def test_section_count(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + # # (L1), ## Section One, ## Section Two, ### Subsection 2.1, ## Section Three + assert len(c.sections) == 5 + + def test_section_names(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + names = c.section_names() + assert "Section One" in names + assert "Section Two" in names + assert "Section Three" in names + assert "Subsection 2.1" in names + + def test_get_section(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + s = c.get_section("Section One") + assert s is not None + assert "Content of section one" in s.content + + def test_get_section_case_insensitive(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + s = c.get_section("section one") + assert s is not None + + def test_get_section_missing(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + assert c.get_section("Nonexistent") is None + + def 
test_replace_section(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + result = c.replace_section("Section One", "New content.") + assert result is True + assert c.get_section("Section One").content == "New content." + + def test_replace_missing_section(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + result = c.replace_section("Nope", "content") + assert result is False + + def test_roundtrip(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + md = c.to_markdown() + c2 = SkillCandidate.from_text("test-skill", md) + assert len(c.sections) == len(c2.sections) + for s1, s2 in zip(c.sections, c2.sections): + assert s1.heading == s2.heading + assert s1.level == s2.level + + def test_to_dict(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + d = c.to_dict() + assert d["name"] == "test-skill" + assert d["section_count"] == len(c.sections) + assert isinstance(d["word_count"], int) + + def test_auto_id(self): + c1 = SkillCandidate.from_text("a", SAMPLE_SKILL) + c2 = SkillCandidate.from_text("b", SAMPLE_SKILL) + assert c1._id < c2._id + + def test_repr(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + r = repr(c) + assert "test-skill" in r + assert "sections=" in r + + def test_no_frontmatter(self): + text = "# Title\n\nContent.\n\n## Section\n\nBody." 
+ c = SkillCandidate.from_text("bare", text) + assert c.frontmatter == "" + + def test_word_count(self): + c = SkillCandidate.from_text("test-skill", SAMPLE_SKILL) + wc = c.word_count() + assert wc > 0 + + def test_from_real_file(self): + """Test parsing actual skill files in the repo.""" + agents = Path(__file__).parent.parent / ".claude" / "agents" + if not agents.exists(): + self.skipTest("agents directory not found") + for path in agents.glob("*.md"): + c = SkillCandidate.from_file(path) + assert c.name == path.stem + assert len(c.sections) > 0 + # Roundtrip should preserve section count + md = c.to_markdown() + c2 = SkillCandidate.from_text(c.name, md) + assert len(c.sections) == len(c2.sections), ( + f"{c.name}: {len(c.sections)} != {len(c2.sections)}" + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/gepa/test_dst_candidate.py b/gepa/test_dst_candidate.py new file mode 100644 index 0000000..c42e480 --- /dev/null +++ b/gepa/test_dst_candidate.py @@ -0,0 +1,112 @@ +"""Unit tests for dst_candidate.py""" + +import unittest +from .dst_candidate import DstCandidate, DstParamBounds, DST_PARAM_BOUNDS, PRESETS + + +class TestDstParamBounds(unittest.TestCase): + def test_clamp_within(self): + b = DstParamBounds(0.0, 1.0, 0.1, 0.5) + assert b.clamp(0.5) == 0.5 + + def test_clamp_below(self): + b = DstParamBounds(0.0, 1.0, 0.1, 0.5) + assert b.clamp(-0.5) == 0.0 + + def test_clamp_above(self): + b = DstParamBounds(0.0, 1.0, 0.1, 0.5) + assert b.clamp(1.5) == 1.0 + + +class TestDstCandidate(unittest.TestCase): + def setUp(self): + DstCandidate.reset_id_counter() + + def test_from_preset_moderate(self): + c = DstCandidate.from_preset("moderate") + assert c.config["global_multiplier"] == 1.0 + assert c.config["network.packet_drop"] == 0.01 + assert c.is_valid() + + def test_from_preset_calm(self): + c = DstCandidate.from_preset("calm") + assert c.config["global_multiplier"] == 0.1 + assert c.is_valid() + + def test_from_preset_chaos(self): + c = 
DstCandidate.from_preset("chaos") + assert c.config["global_multiplier"] == 3.0 + assert c.is_valid() + + def test_from_preset_invalid(self): + with self.assertRaises(AssertionError): + DstCandidate.from_preset("nonexistent") + + def test_env_string_roundtrip(self): + c = DstCandidate.from_preset("moderate") + env = c.to_env_string() + c2 = DstCandidate.from_env_string(env) + for key in DST_PARAM_BOUNDS: + assert abs(c.config[key] - c2.config[key]) < 1e-5, ( + f"Mismatch for {key}: {c.config[key]} != {c2.config[key]}" + ) + + def test_env_string_format(self): + c = DstCandidate.from_preset("moderate") + env = c.to_env_string() + assert "global_multiplier=" in env + assert "network.packet_drop=" in env + # Should be comma-separated + parts = env.split(",") + assert len(parts) == len(DST_PARAM_BOUNDS) + + def test_to_dict(self): + c = DstCandidate.from_preset("moderate") + c.fitness = 0.75 + d = c.to_dict() + assert d["fitness"] == 0.75 + assert "config" in d + assert "global_multiplier" in d["config"] + + def test_is_valid(self): + c = DstCandidate.from_preset("moderate") + assert c.is_valid() + + def test_is_valid_out_of_bounds(self): + c = DstCandidate.from_preset("moderate") + c.config["global_multiplier"] = 100.0 # Way above max of 5.0 + assert not c.is_valid() + + def test_auto_id(self): + c1 = DstCandidate.from_preset("moderate") + c2 = DstCandidate.from_preset("chaos") + assert c1._id < c2._id + + def test_repr(self): + c = DstCandidate.from_preset("moderate") + c.fitness = 0.42 + r = repr(c) + assert "0.42" in r + assert "gm=" in r + + def test_param_count(self): + """Should have 32 parameters (31 faults + global_multiplier).""" + assert len(DST_PARAM_BOUNDS) == 32 + + def test_all_presets_have_all_params(self): + """All presets should produce candidates with all params.""" + for name in PRESETS: + c = DstCandidate.from_preset(name) + for key in DST_PARAM_BOUNDS: + assert key in c.config, f"Preset '{name}' missing key '{key}'" + + def 
test_from_env_string_partial(self): + """Parsing partial env string should fill defaults.""" + c = DstCandidate.from_env_string("global_multiplier=2.5") + assert c.config["global_multiplier"] == 2.5 + # Other params should be at defaults + assert c.config["network.packet_drop"] == DST_PARAM_BOUNDS["network.packet_drop"].default + + +if __name__ == "__main__": + unittest.main() diff --git a/gepa/test_dst_optimizer.py b/gepa/test_dst_optimizer.py new file mode 100644 index 0000000..fc15cbb --- /dev/null +++ b/gepa/test_dst_optimizer.py @@ -0,0 +1,95 @@ +"""Unit tests for dst_optimizer.py""" + +import random +import tempfile +import unittest +from pathlib import Path + +from .dst_candidate import DstCandidate +from .dst_evaluator import MockDstEvaluator +from .dst_optimizer import DstEvolutionEngine + + +class TestDstEvolutionEngine(unittest.TestCase): + def setUp(self): + DstCandidate.reset_id_counter() + self.tmpdir = tempfile.mkdtemp() + self.results_dir = Path(self.tmpdir) / "dst_results" + + def _make_engine(self, **kwargs): + evaluator = MockDstEvaluator(rng=random.Random(42)) + defaults = dict( + evaluator=evaluator, + population_size=6, + elite_count=2, + mutation_rate=0.3, + generations=3, + results_dir=self.results_dir, + seed=42, + ) + defaults.update(kwargs) + return DstEvolutionEngine(**defaults) + + def test_basic_run(self): + engine = self._make_engine() + best = engine.run() + assert best is not None + assert best.fitness is not None + assert best.fitness > 0 + + def test_deterministic_with_seed(self): + engine1 = self._make_engine(seed=99) + best1 = engine1.run() + + DstCandidate.reset_id_counter() + engine2 = self._make_engine(seed=99) + best2 = engine2.run() + + assert abs(best1.fitness - best2.fitness) < 1e-6 + + def test_population_size(self): + engine = self._make_engine(population_size=8, generations=2) + engine.run() + for entry in engine.history: + assert len(entry["population_fitness"]) == 8 + + def test_elite_preserved(self): + engine = 
self._make_engine(elite_count=2, generations=4) + engine.run() + fitnesses = [h["best_fitness"] for h in engine.history] + for i in range(1, len(fitnesses)): + # Best should never decrease (elitism) + assert fitnesses[i] >= fitnesses[i-1] - 0.001 + + def test_results_saved(self): + engine = self._make_engine(generations=2) + engine.run() + assert (self.results_dir / "gen_000.json").exists() + assert (self.results_dir / "gen_001.json").exists() + assert (self.results_dir / "best_config.env").exists() + assert (self.results_dir / "evolution_history.json").exists() + + def test_best_config_is_valid(self): + engine = self._make_engine() + best = engine.run() + assert best.is_valid() + + def test_crossover_produces_valid(self): + engine = self._make_engine() + p1 = DstCandidate.from_preset("calm") + p2 = DstCandidate.from_preset("chaos") + p1.fitness = 0.5 + p2.fitness = 0.5 + child = engine._crossover(p1, p2) + assert child.is_valid() + + def test_mutation_stays_in_bounds(self): + engine = self._make_engine() + for _ in range(100): + c = DstCandidate.from_preset("moderate") + c = engine._mutate(c) + assert c.is_valid(), f"Mutation produced invalid: {c.config}" + + +if __name__ == "__main__": + unittest.main() diff --git a/gepa/test_evaluator.py b/gepa/test_evaluator.py new file mode 100644 index 0000000..d3e262d --- /dev/null +++ b/gepa/test_evaluator.py @@ -0,0 +1,121 @@ +"""Unit tests for evaluator.py""" + +import json +import tempfile +import unittest +from pathlib import Path + +from .evaluator import OfflineEvaluator +from .scorer import Scorer + + +class TestOfflineEvaluator(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.gt_dir = Path(self.tmpdir) / "ground_truth" + self.gt_dir.mkdir() + self.reviews_dir = Path(self.tmpdir) / "reviews" + self.reviews_dir.mkdir() + + # Create a simple ground truth file + gt = { + "task": "test_task", + "description": "Test task", + "context_files": [], + "skills_under_test": ["skill-a", 
"skill-b"], + "expected_findings": [ + { + "id": "f-01", + "severity": "inaccurate", + "claim": "test", + "ground_truth": "correct", + "keywords": ["alpha", "beta"], + "required": True, + "weight": 2.0, + "skill_relevance": ["skill-a"], + }, + ], + "known_false_positives": [], + } + (self.gt_dir / "test_task.json").write_text(json.dumps(gt)) + + def test_list_tasks(self): + evaluator = OfflineEvaluator( + ground_truth_dir=self.gt_dir, + reviews_dir=self.reviews_dir, + ) + tasks = evaluator.list_tasks() + assert tasks == ["test_task"] + + def test_load_ground_truth(self): + evaluator = OfflineEvaluator( + ground_truth_dir=self.gt_dir, + reviews_dir=self.reviews_dir, + ) + gt = evaluator.load_ground_truth("test_task") + assert gt["task"] == "test_task" + + def test_save_and_get_review(self): + evaluator = OfflineEvaluator( + ground_truth_dir=self.gt_dir, + reviews_dir=self.reviews_dir, + ) + path = evaluator.save_review_text("skill-a", "test_task", "alpha beta review") + assert path.exists() + text = evaluator.get_review_text("skill-a", "test_task") + assert text == "alpha beta review" + + def test_get_missing_review(self): + evaluator = OfflineEvaluator( + ground_truth_dir=self.gt_dir, + reviews_dir=self.reviews_dir, + ) + assert evaluator.get_review_text("nope", "nope") is None + + def test_evaluate_skill(self): + evaluator = OfflineEvaluator( + ground_truth_dir=self.gt_dir, + reviews_dir=self.reviews_dir, + ) + evaluator.save_review_text("skill-a", "test_task", "alpha beta found the issue") + result = evaluator.evaluate_skill("skill-a") + assert "test_task" in result.task_scores + assert result.aggregate_score > 0 + + def test_evaluate_skill_no_reviews(self): + evaluator = OfflineEvaluator( + ground_truth_dir=self.gt_dir, + reviews_dir=self.reviews_dir, + ) + result = evaluator.evaluate_skill("no-reviews") + assert len(result.task_scores) == 0 + assert result.aggregate_score == 0.0 + + def test_evaluate_all_skills(self): + evaluator = OfflineEvaluator( + 
ground_truth_dir=self.gt_dir, + reviews_dir=self.reviews_dir, + ) + evaluator.save_review_text("skill-a", "test_task", "alpha beta") + evaluator.save_review_text("skill-b", "test_task", "gamma delta") + + results = evaluator.evaluate_all_skills() + assert "skill-a" in results + assert "skill-b" in results + # skill-a should score higher (has relevant keywords) + assert results["skill-a"].aggregate_score > results["skill-b"].aggregate_score + + def test_evaluation_result_summary(self): + evaluator = OfflineEvaluator( + ground_truth_dir=self.gt_dir, + reviews_dir=self.reviews_dir, + ) + evaluator.save_review_text("skill-a", "test_task", "alpha beta") + result = evaluator.evaluate_skill("skill-a") + summary = result.summary() + assert "skill-a" in summary + assert "aggregate" in summary + + +if __name__ == "__main__": + unittest.main() diff --git a/gepa/test_evolution.py b/gepa/test_evolution.py new file mode 100644 index 0000000..da1d2a9 --- /dev/null +++ b/gepa/test_evolution.py @@ -0,0 +1,151 @@ +"""Unit tests for evolution.py""" + +import random +import tempfile +import unittest +from pathlib import Path + +from .candidate import SkillCandidate +from .evolution import SkillEvolutionEngine +from .skill_fitness import MockSkillEvaluator + + +SAMPLE_SKILL = """--- +name: test-skill +description: Test +user_invocable: true +--- + +# Test Skill + +Preamble text. + +## Section One + +First section content. Multiple sentences here. And another one. + +## Section Two + +Second section content. With more text. And details. + +## Section Three + +Third section content. 
+""" + + +class TestSkillEvolutionEngine(unittest.TestCase): + def setUp(self): + SkillCandidate.reset_id_counter() + self.tmpdir = tempfile.mkdtemp() + self.results_dir = Path(self.tmpdir) / "results" + + # Create a mock agents dir with the test skill + self.agents_dir = Path(self.tmpdir) / "agents" + self.agents_dir.mkdir(parents=True) + (self.agents_dir / "test-skill.md").write_text(SAMPLE_SKILL) + + def _make_engine(self, **kwargs): + evaluator = MockSkillEvaluator( + baseline=0.5, noise_std=0.05, + rng=random.Random(42), + ) + defaults = dict( + skill_name="test-skill", + evaluator=evaluator, + population_size=4, + elite_count=1, + mutation_rate=0.3, + generations=3, + results_dir=self.results_dir, + budget_cap=100.0, + seed=42, + ) + defaults.update(kwargs) + + # Monkey-patch AGENTS_DIR for test + import gepa.evaluator as eval_mod + self._orig_agents = eval_mod.AGENTS_DIR + eval_mod.AGENTS_DIR = self.agents_dir + + return SkillEvolutionEngine(**defaults) + + def tearDown(self): + import gepa.evaluator as eval_mod + if hasattr(self, '_orig_agents'): + eval_mod.AGENTS_DIR = self._orig_agents + + def test_basic_run(self): + engine = self._make_engine() + best = engine.run() + assert best is not None + assert best.fitness is not None + assert best.fitness > 0 + + def test_deterministic_with_seed(self): + engine1 = self._make_engine(seed=99) + best1 = engine1.run() + + SkillCandidate.reset_id_counter() + engine2 = self._make_engine(seed=99) + best2 = engine2.run() + + assert best1.fitness == best2.fitness + + def test_population_size_maintained(self): + engine = self._make_engine(population_size=6, generations=2) + engine.run() + assert len(engine.history) == 2 + for entry in engine.history: + assert len(entry["population_fitness"]) == 6 + + def test_elite_preserved(self): + engine = self._make_engine(elite_count=2, generations=3) + engine.run() + # Best fitness should not decrease between generations + fitnesses = [h["best_fitness"] for h in engine.history] + for 
i in range(1, len(fitnesses)): + assert fitnesses[i] >= fitnesses[i-1] - 0.001 # tiny tolerance for float + + def test_results_saved(self): + engine = self._make_engine(generations=2) + engine.run() + # Check generation files exist + assert (self.results_dir / "test-skill" / "gen_000.json").exists() + assert (self.results_dir / "test-skill" / "gen_001.json").exists() + # Check final results + assert (self.results_dir / "test-skill" / "best_skill.md").exists() + assert (self.results_dir / "test-skill" / "evolution_history.json").exists() + + def test_budget_enforcement(self): + """Engine should stop early if budget is exhausted.""" + # Use a mock that pretends to cost money + class ExpensiveEval(MockSkillEvaluator): + @property + def cost_per_eval(self): + return 1.0 + + engine = self._make_engine( + evaluator=ExpensiveEval(baseline=0.5, rng=random.Random(42)), + budget_cap=2.0, + population_size=4, + generations=10, + ) + best = engine.run() + # Should have stopped early due to budget + assert len(engine.history) < 10 + + def test_crossover_preserves_sections(self): + engine = self._make_engine() + from .evaluator import AGENTS_DIR + parent1 = SkillCandidate.from_file(AGENTS_DIR / "test-skill.md") + parent2 = SkillCandidate.from_file(AGENTS_DIR / "test-skill.md") + parent2.replace_section("Section One", "Modified content for crossover test.") + + child = engine._crossover(parent1, parent2) + # Child should have same number of sections + assert len(child.sections) == len(parent1.sections) + + +if __name__ == "__main__": + unittest.main() diff --git a/gepa/test_mutations.py b/gepa/test_mutations.py new file mode 100644 index 0000000..bec5185 --- /dev/null +++ b/gepa/test_mutations.py @@ -0,0 +1,142 @@ +"""Unit tests for mutations.py""" + +import random +import unittest +from .mutations import ( + sentence_shuffle, sentence_drop, sentence_duplicate, + keyword_inject, section_swap, ALL_MUTATIONS, +) + + +class TestSentenceShuffle(unittest.TestCase): + def 
test_shuffles_sentences(self): + rng = random.Random(42) + content = "First sentence. Second sentence. Third sentence." + result = sentence_shuffle(content, rng) + # Should contain all original sentences + assert "First sentence." in result + assert "Second sentence." in result + assert "Third sentence." in result + + def test_single_sentence_unchanged(self): + rng = random.Random(42) + content = "Only one sentence here." + assert sentence_shuffle(content, rng) == content + + def test_skips_code_fences(self): + rng = random.Random(42) + content = "Text. More text.\n```rust\ncode\n```\nEnd." + assert sentence_shuffle(content, rng) == content + + def test_empty_content(self): + rng = random.Random(42) + assert sentence_shuffle("", rng) == "" + + +class TestSentenceDrop(unittest.TestCase): + def test_drops_one_sentence(self): + rng = random.Random(42) + content = "First. Second. Third." + result = sentence_drop(content, rng) + parts = [p.strip() for p in result.split(".") if p.strip()] + assert len(parts) == 2 + + def test_single_sentence_unchanged(self): + rng = random.Random(42) + content = "Only one." + assert sentence_drop(content, rng) == content + + def test_deterministic(self): + content = "A. B. C. D." + r1 = sentence_drop(content, random.Random(99)) + r2 = sentence_drop(content, random.Random(99)) + assert r1 == r2 + + +class TestSentenceDuplicate(unittest.TestCase): + def test_adds_one_sentence(self): + rng = random.Random(42) + content = "First. Second." + result = sentence_duplicate(content, rng) + # Should have 3 sentence-like chunks now + assert len(result) > len(content) + + def test_empty_content(self): + rng = random.Random(42) + assert sentence_duplicate("", rng) == "" + + def test_deterministic(self): + content = "A. B. C." 
+ r1 = sentence_duplicate(content, random.Random(7)) + r2 = sentence_duplicate(content, random.Random(7)) + assert r1 == r2 + + +class TestKeywordInject(unittest.TestCase): + def test_injects_sentence(self): + rng = random.Random(42) + content = "Existing content here." + result = keyword_inject(content, rng) + assert len(result) > len(content) + + def test_custom_keywords(self): + rng = random.Random(42) + content = "Text." + result = keyword_inject(content, rng, keywords=["alpha", "beta"]) + assert "alpha" in result or "beta" in result + + def test_empty_content(self): + rng = random.Random(42) + result = keyword_inject("", rng) + assert len(result) > 0 + + def test_single_keyword(self): + rng = random.Random(42) + result = keyword_inject("Base.", rng, keywords=["only"]) + assert "only" in result + + +class TestSectionSwap(unittest.TestCase): + def test_swaps_sections(self): + rng = random.Random(42) + content = "## A\n\nContent A.\n\n## B\n\nContent B." + result = section_swap(content, rng) + # Should still contain both sections + assert "Content A." in result + assert "Content B." in result + + def test_single_section_unchanged(self): + rng = random.Random(42) + content = "## Only\n\nOne section." + assert section_swap(content, rng) == content + + def test_no_headings_unchanged(self): + rng = random.Random(42) + content = "Just plain text without sections." + assert section_swap(content, rng) == content + + +class TestAllMutations(unittest.TestCase): + def test_all_mutations_list(self): + assert len(ALL_MUTATIONS) == 5 + + def test_all_return_string(self): + rng = random.Random(42) + content = "First sentence. Second sentence. Third sentence." + for mut in ALL_MUTATIONS: + if mut == keyword_inject: + result = mut(content, rng) + else: + result = mut(content, rng) + assert isinstance(result, str), f"{mut.__name__} returned {type(result)}" + + def test_deterministic_with_same_seed(self): + content = "Alpha. Beta. Gamma. Delta." 
+ for mut in ALL_MUTATIONS: + r1 = mut(content, random.Random(123)) + r2 = mut(content, random.Random(123)) + assert r1 == r2, f"{mut.__name__} not deterministic" + + +if __name__ == "__main__": + unittest.main() diff --git a/gepa/test_scorer.py b/gepa/test_scorer.py new file mode 100644 index 0000000..bd9fa47 --- /dev/null +++ b/gepa/test_scorer.py @@ -0,0 +1,188 @@ +"""Unit tests for scorer.py""" + +import unittest +from .scorer import Scorer, _normalize_text, _rank, COMPONENT_WEIGHTS + + +class TestNormalizeText(unittest.TestCase): + def test_lowercase(self): + assert _normalize_text("Hello WORLD") == "hello world" + + def test_whitespace_collapse(self): + assert _normalize_text("foo bar\n\nbaz") == "foo bar baz" + + def test_empty(self): + assert _normalize_text("") == "" + + +class TestRank(unittest.TestCase): + def test_distinct_values(self): + assert _rank([3.0, 1.0, 2.0]) == [3.0, 1.0, 2.0] + + def test_tied_values(self): + ranks = _rank([1.0, 1.0, 3.0]) + assert ranks == [1.5, 1.5, 3.0] + + def test_all_same(self): + ranks = _rank([5.0, 5.0, 5.0]) + assert ranks == [2.0, 2.0, 2.0] + + +class TestScorer(unittest.TestCase): + def setUp(self): + self.scorer = Scorer(min_keyword_fraction=0.5) + self.ground_truth = { + "expected_findings": [ + { + "id": "test-01", + "severity": "inaccurate", + "claim": "test claim", + "ground_truth": "correction", + "keywords": ["bounded", "channel", "unbounded"], + "required": True, + "weight": 2.0, + "skill_relevance": ["rust-dev"], + }, + { + "id": "test-02", + "severity": "nit", + "claim": "minor issue", + "ground_truth": "correction", + "keywords": ["format", "error", "prefix"], + "required": False, + "weight": 0.5, + "skill_relevance": ["rust-dev"], + }, + ], + "known_false_positives": [ + { + "id": "fp-01", + "claim": "buggify is broken", + "why_correct": "intentional", + "keywords": ["buggify", "default", "enabled"], + "penalty": 1.0, + }, + ], + } + + def test_perfect_review(self): + """Review that hits all keywords 
should score high.""" + text = ( + "The bounded channel claim is wrong. Channels are unbounded. " + "Also the error format has wrong prefix." + ) + result = self.scorer.score(text, self.ground_truth) + assert result.true_positives == 2 + assert result.false_positives == 0 + assert result.weighted_recall == 1.0 + assert result.composite > 0.8 + + def test_empty_review(self): + """Empty review should score zero recall.""" + result = self.scorer.score("", self.ground_truth) + assert result.true_positives == 0 + assert result.weighted_recall == 0.0 + # precision=1.0 (no positives) + calibration=0.5 gives ~0.325 + assert result.composite < 0.4 + + def test_partial_review(self): + """Review hitting only some keywords.""" + text = "The channel implementation uses unbounded channels." + result = self.scorer.score(text, self.ground_truth) + # Should match test-01 (2/3 keywords = 0.67 > 0.5) + assert result.true_positives >= 1 + + def test_false_positive_detected(self): + """Review flagging a known false positive should lose precision.""" + text = ( + "The bounded channel claim is wrong. Channels are unbounded. " + "Also buggify default being enabled is a bug." 
+ ) + result = self.scorer.score(text, self.ground_truth) + assert result.false_positives == 1 + assert result.precision < 1.0 + + def test_skill_filter(self): + """Filtering by skill should only score relevant findings.""" + gt = { + "expected_findings": [ + { + "id": "a", + "severity": "inaccurate", + "keywords": ["foo", "bar"], + "required": True, + "weight": 1.0, + "skill_relevance": ["rust-dev"], + }, + { + "id": "b", + "severity": "gap", + "keywords": ["baz", "qux"], + "required": True, + "weight": 1.0, + "skill_relevance": ["dst"], + }, + ], + "known_false_positives": [], + } + result = self.scorer.score("foo bar baz qux", gt, skill_name="rust-dev") + # Should only evaluate finding "a" + assert len(result.finding_matches) == 1 + assert result.finding_matches[0].finding_id == "a" + + def test_missed_required(self): + """Missing a required finding should be tracked.""" + text = "The error format has wrong prefix." + result = self.scorer.score(text, self.ground_truth) + assert "test-01" in result.missed_required + + def test_component_weights_sum_to_one(self): + total = sum(COMPONENT_WEIGHTS.values()) + assert abs(total - 1.0) < 1e-6, f"weights sum to {total}" + + def test_no_findings(self): + """Ground truth with no findings should return sensible defaults.""" + gt = {"expected_findings": [], "known_false_positives": []} + result = self.scorer.score("anything here", gt) + assert result.weighted_recall == 0.0 + assert result.precision == 1.0 + + +class TestScorerMinFraction(unittest.TestCase): + def test_high_threshold(self): + """Higher threshold requires more keywords to match.""" + scorer = Scorer(min_keyword_fraction=1.0) + gt = { + "expected_findings": [{ + "id": "x", + "severity": "gap", + "keywords": ["alpha", "beta", "gamma"], + "required": True, + "weight": 1.0, + }], + "known_false_positives": [], + } + # Only 2/3 keywords present -> should NOT match with threshold=1.0 + result = scorer.score("alpha beta", gt) + assert result.true_positives == 0 + + 
def test_low_threshold(self):
+        """Lower threshold matches with fewer keywords."""
+        scorer = Scorer(min_keyword_fraction=0.3)
+        gt = {
+            "expected_findings": [{
+                "id": "x",
+                "severity": "gap",
+                "keywords": ["alpha", "beta", "gamma"],
+                "required": True,
+                "weight": 1.0,
+            }],
+            "known_false_positives": [],
+        }
+        # 1/3 keywords present -> should match with threshold=0.3
+        result = scorer.score("alpha", gt)
+        assert result.true_positives == 1
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/src/buggify/config.rs b/src/buggify/config.rs
index cbd5fca..8769d17 100644
--- a/src/buggify/config.rs
+++ b/src/buggify/config.rs
@@ -207,6 +207,99 @@ impl FaultConfig {
         self.global_multiplier = multiplier.max(0.0);
         self
     }
+
+    /// Load from BUGGIFY_CONFIG env var, or fall back to moderate().
+    /// Format: "global_multiplier=1.5,network.packet_drop=0.05,..."
+    ///
+    /// The fallback to `moderate()` applies when the variable is unset, unreadable,
+    /// empty or whitespace-only, and also when its value fails to parse (the parse
+    /// error is reported on stderr in that case).
+    pub fn from_env_or_default() -> Self {
+        match std::env::var("BUGGIFY_CONFIG") {
+            // Trim before the emptiness check: a blank value would otherwise parse
+            // "successfully" into a bare `new()` instead of the documented fallback.
+            Ok(s) if !s.trim().is_empty() => {
+                Self::parse_config_string(&s).unwrap_or_else(|e| {
+                    eprintln!("BUGGIFY_CONFIG parse error: {e}. Using moderate().");
+                    Self::moderate()
+                })
+            }
+            _ => Self::moderate(),
+        }
+    }
+
+    /// Parse a config string in "key=value,key=value" format.
+    ///
+    /// Parsing starts from `Self::new()`. Whitespace around pairs, keys and values
+    /// is ignored, and empty pairs (e.g. a trailing comma) are skipped.
+    /// `global_multiplier` is floored at 0.0; every other recognised key is applied
+    /// through `set`. Unknown keys are silently ignored (forward compatibility).
+    ///
+    /// # Errors
+    /// Returns a message when a pair has no `=`, or when a value is not a finite
+    /// number.
+    pub fn parse_config_string(s: &str) -> Result<Self, String> {
+        let mut config = Self::new();
+        for pair in s.split(',').map(str::trim).filter(|p| !p.is_empty()) {
+            let (key, val) = pair
+                .split_once('=')
+                .ok_or_else(|| format!("invalid pair: {pair}"))?;
+            // Trim the key first so error messages show the clean key.
+            let key = key.trim();
+            let val: f64 = val
+                .trim()
+                .parse()
+                .map_err(|e| format!("invalid value for {key}: {e}"))?;
+            // `f64::from_str` accepts "nan" / "inf"; neither is a usable probability
+            // or multiplier (inf * 0.0 is NaN, which survives `clamp`).
+            if !val.is_finite() {
+                return Err(format!("invalid value for {key}: {val} is not finite"));
+            }
+            if key == "global_multiplier" {
+                config.global_multiplier = val.max(0.0);
+            } else if let Some(fault_id) = fault_key_to_static(key) {
+                config.set(fault_id, val);
+            }
+            // Unknown keys are silently ignored (forward compatibility)
+        }
+        Ok(config)
+    }
+}
+
+/// Map dotted fault key string to the corresponding static fault constant. 
+///
+/// Returns `None` for any key without an arm below; `parse_config_string` treats
+/// that as "ignore this key".
+fn fault_key_to_static(key: &str) -> Option<&'static str> {
+    match key {
+        // Network
+        "network.packet_drop" => Some(faults::network::PACKET_DROP),
+        "network.packet_corrupt" => Some(faults::network::PACKET_CORRUPT),
+        "network.partial_write" => Some(faults::network::PARTIAL_WRITE),
+        "network.reorder" => Some(faults::network::REORDER),
+        "network.connection_reset" => Some(faults::network::CONNECTION_RESET),
+        "network.connect_timeout" => Some(faults::network::CONNECT_TIMEOUT),
+        "network.delay" => Some(faults::network::DELAY),
+        "network.duplicate" => Some(faults::network::DUPLICATE),
+        // Timer
+        "timer.drift_fast" => Some(faults::timer::DRIFT_FAST),
+        "timer.drift_slow" => Some(faults::timer::DRIFT_SLOW),
+        "timer.skip" => Some(faults::timer::SKIP),
+        "timer.duplicate" => Some(faults::timer::DUPLICATE),
+        "timer.jump_forward" => Some(faults::timer::JUMP_FORWARD),
+        "timer.jump_backward" => Some(faults::timer::JUMP_BACKWARD),
+        // Process
+        "process.crash" => Some(faults::process::CRASH),
+        "process.pause" => Some(faults::process::PAUSE),
+        "process.slow" => Some(faults::process::SLOW),
+        "process.oom" => Some(faults::process::OOM),
+        "process.cpu_starvation" => Some(faults::process::CPU_STARVATION),
+        // Disk
+        "disk.write_fail" => Some(faults::disk::WRITE_FAIL),
+        "disk.partial_write" => Some(faults::disk::PARTIAL_WRITE),
+        "disk.corruption" => Some(faults::disk::CORRUPTION),
+        "disk.slow" => Some(faults::disk::SLOW),
+        "disk.fsync_fail" => Some(faults::disk::FSYNC_FAIL),
+        "disk.stale_read" => Some(faults::disk::STALE_READ),
+        "disk.disk_full" => Some(faults::disk::DISK_FULL),
+        // Object Store
+        "object_store.put_fail" => Some(faults::object_store::PUT_FAIL),
+        "object_store.get_fail" => Some(faults::object_store::GET_FAIL),
+        "object_store.get_corrupt" => Some(faults::object_store::GET_CORRUPT),
+        "object_store.timeout" => Some(faults::object_store::TIMEOUT),
+        "object_store.partial_write" => Some(faults::object_store::PARTIAL_WRITE),
+        "object_store.delete_fail" => Some(faults::object_store::DELETE_FAIL),
+        "object_store.list_incomplete" => Some(faults::object_store::LIST_INCOMPLETE),
+        "object_store.rename_fail" => Some(faults::object_store::RENAME_FAIL),
+        "object_store.slow" => Some(faults::object_store::SLOW),
+        // Replication
+        "replication.gossip_drop" => Some(faults::replication::GOSSIP_DROP),
+        "replication.gossip_delay" => Some(faults::replication::GOSSIP_DELAY),
+        "replication.gossip_corrupt" => Some(faults::replication::GOSSIP_CORRUPT),
+        "replication.split_brain" => Some(faults::replication::SPLIT_BRAIN),
+        "replication.stale_replica" => Some(faults::replication::STALE_REPLICA),
+        _ => None,
+    }
 }
 
 #[cfg(test)]
@@ -257,4 +350,87 @@ mod tests {
         assert!(config.get(faults::network::PACKET_DROP) > 0.0);
         assert_eq!(config.global_multiplier, 2.0);
     }
+
+    /// A multiplier and one fault key are both picked up from a plain string.
+    #[test]
+    fn test_parse_config_string_basic() {
+        let config = FaultConfig::parse_config_string(
+            "global_multiplier=2.0,network.packet_drop=0.05"
+        ).unwrap();
+        assert_eq!(config.global_multiplier, 2.0);
+        assert!((config.probabilities[faults::network::PACKET_DROP] - 0.05).abs() < 1e-9);
+    }
+
+    /// An empty string parses successfully and leaves the `new()` defaults alone.
+    #[test]
+    fn test_parse_config_string_empty() {
+        let config = FaultConfig::parse_config_string("").unwrap();
+        assert_eq!(config.global_multiplier, 1.0); // default from new()
+    }
+
+    /// Whitespace around pairs, keys and values is tolerated.
+    #[test]
+    fn test_parse_config_string_whitespace() {
+        let config = FaultConfig::parse_config_string(
+            " global_multiplier = 1.5 , network.delay = 0.1 "
+        ).unwrap();
+        assert_eq!(config.global_multiplier, 1.5);
+        assert!((config.probabilities[faults::network::DELAY] - 0.1).abs() < 1e-9);
+    }
+
+    /// Every entry of `ALL_FAULTS` round-trips through the parser.
+    #[test]
+    fn test_parse_config_string_all_faults() {
+        // Build a config string with all known faults
+        let pairs: Vec<String> = faults::ALL_FAULTS.iter()
+            .map(|f| format!("{f}=0.123"))
+            .collect();
+        let s = pairs.join(",");
+        let config = FaultConfig::parse_config_string(&s).unwrap();
+        for fault in faults::ALL_FAULTS {
+            assert!(
+                (config.probabilities[*fault] - 0.123).abs() < 1e-9,
+                "Fault {fault} not parsed correctly"
+            );
+        }
+    }
+
+    /// A pair without `=` is rejected.
+    #[test]
+    fn test_parse_config_string_invalid_pair() {
+        let result = FaultConfig::parse_config_string("no_equals_sign");
+        assert!(result.is_err());
+    }
+
+    /// A non-numeric value is rejected.
+    #[test]
+    fn test_parse_config_string_invalid_value() {
+        let result = FaultConfig::parse_config_string("global_multiplier=notanumber");
+        assert!(result.is_err());
+    }
+
+    /// Unknown keys neither fail the parse nor leak into the probability table.
+    #[test]
+    fn test_parse_config_string_unknown_key_ignored() {
+        let config = FaultConfig::parse_config_string(
+            "global_multiplier=1.0,unknown.key=0.5"
+        ).unwrap();
+        assert_eq!(config.global_multiplier, 1.0);
+        // unknown key should be silently ignored
+        assert!(!config.probabilities.contains_key("unknown.key"));
+    }
+
+    /// With the variable absent the loader returns the `moderate()` preset.
+    #[test]
+    fn test_from_env_or_default_no_env() {
+        // When env var is not set, should return moderate()
+        // NOTE(review): `remove_var` mutates process-wide state while the test
+        // harness runs tests on several threads (and is `unsafe` from edition
+        // 2024) — confirm no sibling test reads or sets BUGGIFY_CONFIG.
+        std::env::remove_var("BUGGIFY_CONFIG");
+        let config = FaultConfig::from_env_or_default();
+        let moderate = FaultConfig::moderate();
+        assert_eq!(config.global_multiplier, moderate.global_multiplier);
+    }
+
+    /// Each `ALL_FAULTS` entry must map back to itself, so a fault constant
+    /// added without a matching arm in `fault_key_to_static` is caught here.
+    #[test]
+    fn test_fault_key_to_static_coverage() {
+        // Every fault in ALL_FAULTS should be mappable from its string value
+        for fault in faults::ALL_FAULTS {
+            assert_eq!(
+                fault_key_to_static(fault),
+                Some(*fault),
+                "fault_key_to_static missing mapping for: {fault}"
+            );
+        }
+    }
 }
diff --git a/src/replication/crdt_dst.rs b/src/replication/crdt_dst.rs
index 60064c2..8f86603 100644
--- a/src/replication/crdt_dst.rs
+++ b/src/replication/crdt_dst.rs
@@ -50,6 +50,26 @@ impl Default for CRDTDSTConfig {
 }
 
 impl CRDTDSTConfig {
+    /// Load from BUGGIFY_CONFIG env var, or fall back to moderate(seed).
+    /// Maps `replication.*` fault keys and applies `global_multiplier`. 
+    ///
+    /// NOTE(review): when `BUGGIFY_CONFIG` is unset or unparsable,
+    /// `FaultConfig::from_env_or_default()` yields `FaultConfig::moderate()`; any
+    /// `replication.*` probabilities that preset defines would win over
+    /// `Self::moderate(seed)` here — confirm that is the intended fallback.
+    pub fn from_env_or_default(seed: u64) -> Self {
+        use crate::buggify::faults;
+        use crate::buggify::FaultConfig;
+
+        let fault_config = FaultConfig::from_env_or_default();
+        let multiplier = fault_config.global_multiplier;
+        let fallback = Self::moderate(seed);
+
+        // One probability: the configured value when the key is present, otherwise
+        // the preset's value; either way scaled by the multiplier and kept in [0, 1].
+        let scaled = |fault_key: &str, preset_prob: f64| -> f64 {
+            let raw = match fault_config.probabilities.get(fault_key) {
+                Some(&configured) => configured,
+                None => preset_prob,
+            };
+            (raw * multiplier).clamp(0.0, 1.0)
+        };
+
+        CRDTDSTConfig {
+            seed,
+            message_drop_prob: scaled(faults::replication::GOSSIP_DROP, fallback.message_drop_prob),
+            partition_prob: scaled(faults::replication::SPLIT_BRAIN, fallback.partition_prob),
+            // Everything not driven by a fault key comes straight from the preset.
+            ..fallback
+        }
+    }
+
     pub fn new(seed: u64, num_replicas: usize) -> Self {
         CRDTDSTConfig {
             seed,
diff --git a/src/streaming/simulated_store.rs b/src/streaming/simulated_store.rs
index 7e2c141..210ea8c 100644
--- a/src/streaming/simulated_store.rs
+++ b/src/streaming/simulated_store.rs
@@ -66,6 +66,37 @@ impl SimulatedStoreConfig {
         }
     }
 
+    /// Load from BUGGIFY_CONFIG env var, or fall back to default().
+    /// Maps `object_store.*` fault keys and applies `global_multiplier`. 
+    ///
+    /// Keys absent from the loaded `FaultConfig` keep the `default()` value; every
+    /// resulting probability is multiplied by `global_multiplier` and clamped to
+    /// `[0, 1]`. `latency_range_us` is copied from `default()` unchanged.
+    pub fn from_env_or_default() -> Self {
+        use crate::buggify::faults;
+        use crate::buggify::FaultConfig;
+
+        let loaded = FaultConfig::from_env_or_default();
+        let multiplier = loaded.global_multiplier;
+        let defaults = Self::default();
+
+        // Shared lookup / scale / clamp step for every probability field.
+        let prob = |fault_key: &str, default_prob: f64| -> f64 {
+            let raw = loaded
+                .probabilities
+                .get(fault_key)
+                .map_or(default_prob, |&configured| configured);
+            (raw * multiplier).clamp(0.0, 1.0)
+        };
+
+        SimulatedStoreConfig {
+            put_fail_prob: prob(faults::object_store::PUT_FAIL, defaults.put_fail_prob),
+            get_fail_prob: prob(faults::object_store::GET_FAIL, defaults.get_fail_prob),
+            get_corrupt_prob: prob(faults::object_store::GET_CORRUPT, defaults.get_corrupt_prob),
+            timeout_prob: prob(faults::object_store::TIMEOUT, defaults.timeout_prob),
+            partial_write_prob: prob(
+                faults::object_store::PARTIAL_WRITE,
+                defaults.partial_write_prob,
+            ),
+            delete_fail_prob: prob(faults::object_store::DELETE_FAIL, defaults.delete_fail_prob),
+            list_incomplete_prob: prob(
+                faults::object_store::LIST_INCOMPLETE,
+                defaults.list_incomplete_prob,
+            ),
+            rename_fail_prob: prob(faults::object_store::RENAME_FAIL, defaults.rename_fail_prob),
+            latency_range_us: defaults.latency_range_us,
+        }
+    }
+
     /// No faults - for baseline testing
     pub fn no_faults() -> Self {
         SimulatedStoreConfig {
diff --git a/src/streaming/wal_store.rs b/src/streaming/wal_store.rs
index 0a58206..3c5a2cc 100644
--- a/src/streaming/wal_store.rs
+++ b/src/streaming/wal_store.rs
@@ -413,6 +413,30 @@ impl Default for SimulatedWalStoreConfig {
 }
 
 impl SimulatedWalStoreConfig {
+    /// Load from BUGGIFY_CONFIG env var, or fall back to default().
+    /// Maps `disk.*` fault keys and applies `global_multiplier`. 
+    ///
+    /// Keys absent from the loaded `FaultConfig` keep the `default()` value; every
+    /// resulting probability is multiplied by `global_multiplier` and clamped to
+    /// `[0, 1]`.
+    pub fn from_env_or_default() -> Self {
+        use crate::buggify::faults;
+        use crate::buggify::FaultConfig;
+
+        let loaded = FaultConfig::from_env_or_default();
+        let multiplier = loaded.global_multiplier;
+        let defaults = Self::default();
+
+        // Shared lookup / scale / clamp step for every probability field.
+        let prob = |fault_key: &str, default_prob: f64| -> f64 {
+            let raw = match loaded.probabilities.get(fault_key) {
+                Some(&configured) => configured,
+                None => default_prob,
+            };
+            (raw * multiplier).clamp(0.0, 1.0)
+        };
+
+        SimulatedWalStoreConfig {
+            write_fail_prob: prob(faults::disk::WRITE_FAIL, defaults.write_fail_prob),
+            partial_write_prob: prob(faults::disk::PARTIAL_WRITE, defaults.partial_write_prob),
+            fsync_fail_prob: prob(faults::disk::FSYNC_FAIL, defaults.fsync_fail_prob),
+            corruption_prob: prob(faults::disk::CORRUPTION, defaults.corruption_prob),
+            disk_full_prob: prob(faults::disk::DISK_FULL, defaults.disk_full_prob),
+        }
+    }
+
     /// No faults - for baseline testing
     pub fn no_faults() -> Self {
         SimulatedWalStoreConfig {
diff --git a/tests/crdt_dst_test.rs b/tests/crdt_dst_test.rs
index 9b95cb6..e9d22fb 100644
--- a/tests/crdt_dst_test.rs
+++ b/tests/crdt_dst_test.rs
@@ -269,3 +269,87 @@ fn test_crdt_dst_determinism_all_types() {
 
     assert_eq!(ops1, ops2, "ORSet: Same seed should produce same results");
 }
+
+// =============================================================================
+// GEPA Optimizer Entry Point
+// =============================================================================
+
+/// Test using BUGGIFY_CONFIG env var for CRDT replication fault configuration.
+/// This is a GEPA DST optimizer target.
+///
+/// Run with: BUGGIFY_CONFIG="global_multiplier=2.0,..." 
cargo test --release --test crdt_dst_test test_env_config -- --nocapture
+#[test]
+fn test_env_config_crdt() {
+    let num_seeds: usize = 20;
+    let ops_per_seed: usize = 200;
+
+    let mut total_ops: u64 = 0;
+    let mut total_message_drops: u64 = 0;
+    let mut total_convergence_failures: u64 = 0;
+    let mut total_invariant_violations: u64 = 0;
+
+    // Fold one batch of per-seed results into the running totals. A macro rather
+    // than a typed helper because the element types returned by the three batch
+    // runners are not visible from this file.
+    macro_rules! tally {
+        ($batch:expr) => {
+            for run in &$batch {
+                total_ops += run.total_operations;
+                total_message_drops += run.messages_dropped;
+                if !run.converged {
+                    total_convergence_failures += 1;
+                }
+                total_invariant_violations += run.invariant_violations.len() as u64;
+            }
+        };
+    }
+
+    // GCounter
+    let gc_results = run_gcounter_batch(
+        50000,
+        num_seeds,
+        ops_per_seed,
+        CRDTDSTConfig::from_env_or_default,
+    );
+    tally!(gc_results);
+
+    // PNCounter
+    let pn_results = run_pncounter_batch(
+        60000,
+        num_seeds,
+        ops_per_seed,
+        CRDTDSTConfig::from_env_or_default,
+    );
+    tally!(pn_results);
+
+    // ORSet
+    let or_results = run_orset_batch(
+        70000,
+        num_seeds,
+        ops_per_seed,
+        CRDTDSTConfig::from_env_or_default,
+    );
+    tally!(or_results);
+
+    // Structured output for GEPA optimizer parsing
+    println!("\n=== GEPA CRDT Evaluation ===");
+    println!("GCounter: {}", summarize_batch(&gc_results));
+    println!("PNCounter: {}", summarize_batch(&pn_results));
+    println!("ORSet: {}", summarize_batch(&or_results));
+    println!("GEPA_CRDT_SEEDS={}", num_seeds * 3);
+    println!("GEPA_CRDT_TOTAL_OPS={}", total_ops);
+    println!("GEPA_CRDT_MESSAGE_DROPS={}", total_message_drops);
+    println!(
+        "GEPA_CRDT_CONVERGENCE_FAILURES={}",
+        total_convergence_failures
+    );
+    println!(
+        "GEPA_CRDT_INVARIANT_VIOLATIONS={}",
+        total_invariant_violations
+    );
+}
diff --git a/tests/dst_batch_verification.rs b/tests/dst_batch_verification.rs
index d68c314..3e7eff0 100644
--- a/tests/dst_batch_verification.rs
+++ b/tests/dst_batch_verification.rs
@@ -104,3 +104,39 @@ fn test_batch_runner_core_dst() {
     );
     assert!(results.total_operations >= 100_000);
 }
+
+/// Runs the Redis DST batch under the fault configuration taken from
+/// BUGGIFY_CONFIG and prints `GEPA_*` key=value lines for the GEPA DST
+/// optimizer to parse.
+///
+/// Run with: BUGGIFY_CONFIG="global_multiplier=2.0,..." cargo test --release --test dst_batch_verification test_env_config
+#[test]
+fn test_env_config_batch() {
+    let config = FaultConfig::from_env_or_default();
+    // Read the multiplier before `config` is moved into the batch runner.
+    let multiplier = config.global_multiplier;
+
+    let batch = run_redis_dst_batch(40000, 50, 200, config);
+
+    // Structured output for GEPA optimizer parsing
+    println!("\n=== GEPA DST Evaluation ===");
+    println!("GEPA_GLOBAL_MULTIPLIER={:.4}", multiplier);
+    println!("GEPA_TOTAL_RUNS={}", batch.total_runs);
+    println!("GEPA_TOTAL_OPS={}", batch.total_operations);
+    println!("GEPA_TOTAL_CRASHES={}", batch.total_crashes);
+    println!("GEPA_TOTAL_RECOVERIES={}", batch.total_recoveries);
+    println!("GEPA_FAILURES={}", batch.failed_seeds.len());
+    println!("GEPA_FAILED_SEEDS={:?}", batch.failed_seeds);
+    println!("{}", batch.summary());
+
+    // Report buggify stats for coverage tracking
+    // NOTE(review): `triggers.len()` counts map entries; if an entry can exist
+    // with a zero count this overstates the faults actually triggered — confirm.
+    let coverage = buggify::get_stats();
+    println!("GEPA_BUGGIFY_CHECKS={}", coverage.checks.values().sum::<u64>());
+    println!("GEPA_BUGGIFY_TRIGGERS={}", coverage.triggers.values().sum::<u64>());
+    println!("GEPA_FAULTS_TRIGGERED={}", coverage.triggers.len());
+
+    // No assertions on purpose: the optimizer computes fitness from the lines
+    // printed above. A DST "failure" means a bug was found, which is exactly
+    // what the optimizer is trying to maximize.
+}
diff --git a/tests/streaming_dst_test.rs b/tests/streaming_dst_test.rs
index c79f463..4b21fa0 100644
--- a/tests/streaming_dst_test.rs
+++ b/tests/streaming_dst_test.rs
@@ -17,7 +17,7 @@
 //! - **Chaos tests**: Many faults, stress test
 
 use redis_sim::streaming::{
-    run_dst_batch, summarize_batch, StreamingDSTConfig, StreamingDSTHarness,
+    run_dst_batch, summarize_batch, SimulatedStoreConfig, StreamingDSTConfig, StreamingDSTHarness,
 };
 
 // =============================================================================
@@ -263,3 +263,61 @@ async fn test_streaming_dst_rapid_crash_recovery() {
     // Should handle rapid crashes gracefully
     assert!(result.total_operations >= 200);
 }
+
+// =============================================================================
+// GEPA Optimizer Entry Point
+// =============================================================================
+
+/// Test using BUGGIFY_CONFIG env var for object store fault configuration.
+/// This is a GEPA DST optimizer target.
+///
+/// Run with: BUGGIFY_CONFIG="global_multiplier=2.0,..." 
cargo test --release --test streaming_dst_test test_env_config -- --nocapture +#[tokio::test] +async fn test_env_config_streaming() { + let store_config = SimulatedStoreConfig::from_env_or_default(); + + let num_seeds: u64 = 20; + let ops_per_seed: usize = 100; + + let mut total_ops: u64 = 0; + let mut total_put_failures: u64 = 0; + let mut total_get_failures: u64 = 0; + let mut total_invariant_violations: u64 = 0; + let mut total_flushes: u64 = 0; + let mut total_crashes: u64 = 0; + + for seed in 0..num_seeds { + let config = StreamingDSTConfig { + seed, + store_config: store_config.clone(), + flush_probability: 0.15, + crash_probability: 0.03, + ..StreamingDSTConfig::default() + }; + + let mut harness = StreamingDSTHarness::new(config).await; + harness.run(ops_per_seed).await; + harness.check_invariants().await; + + let result = harness.result(); + total_ops += result.total_operations; + total_put_failures += result.store_stats.put_failures; + total_get_failures += result.store_stats.get_failures; + total_invariant_violations += result.invariant_violations.len() as u64; + total_flushes += result.flushes; + total_crashes += result.crashes; + } + + // Structured output for GEPA optimizer parsing + println!("\n=== GEPA Streaming Evaluation ==="); + println!("GEPA_STREAMING_SEEDS={}", num_seeds); + println!("GEPA_STREAMING_OPS={}", total_ops); + println!("GEPA_STREAMING_PUT_FAILURES={}", total_put_failures); + println!("GEPA_STREAMING_GET_FAILURES={}", total_get_failures); + println!("GEPA_STREAMING_FLUSHES={}", total_flushes); + println!("GEPA_STREAMING_CRASHES={}", total_crashes); + println!( + "GEPA_STREAMING_INVARIANT_VIOLATIONS={}", + total_invariant_violations + ); +} diff --git a/tests/wal_dst_test.rs b/tests/wal_dst_test.rs index 44f78a8..58a11db 100644 --- a/tests/wal_dst_test.rs +++ b/tests/wal_dst_test.rs @@ -203,6 +203,51 @@ fn test_wal_dst_truncation_correctness() { } } +// ============================================================================= +// 
GEPA Optimizer Entry Point
+// =============================================================================
+
+/// GEPA DST optimizer target: WAL batch with disk faults configured via the BUGGIFY_CONFIG env var.
+///
+/// Run with: BUGGIFY_CONFIG="global_multiplier=2.0,..." cargo test --release --test wal_dst_test test_env_config -- --nocapture
+#[test]
+fn test_env_config_wal() {
+    let seed_count: u64 = 30;
+    let config = WalDSTConfig {
+        num_writes: 100,
+        max_file_size: 512,
+        store_config: SimulatedWalStoreConfig::from_env_or_default(),
+        simulate_crash: true,
+        fsync_after_write: true,
+    };
+    let outcomes = run_wal_dst_batch(0..seed_count, config);
+    let batch_summary = summarize_wal_dst_batch(&outcomes);
+    // Fold every per-seed result into the batch totals in a single pass.
+    let (mut writes, mut acked, mut missing, mut failed_runs) = (0usize, 0usize, 0usize, 0usize);
+    let (mut write_errs, mut sync_errs, mut disk_full) = (0u64, 0u64, 0u64);
+    for run in outcomes.iter() {
+        writes += run.total_writes;
+        acked += run.acknowledged_writes;
+        missing += run.missing_after_recovery;
+        write_errs += run.store_stats.write_failures;
+        sync_errs += run.store_stats.sync_failures;
+        disk_full += run.store_stats.disk_full_errors;
+        failed_runs += usize::from(!run.passed);
+    }
+
+    // Structured KEY=VALUE lines for GEPA optimizer parsing.
+    println!("\n=== GEPA WAL Evaluation ===");
+    println!("{}", batch_summary);
+    println!("GEPA_WAL_SEEDS={}", seed_count);
+    println!("GEPA_WAL_TOTAL_WRITES={}", writes);
+    println!("GEPA_WAL_ACKNOWLEDGED={}", acked);
+    println!("GEPA_WAL_MISSING_AFTER_RECOVERY={}", missing);
+    println!("GEPA_WAL_WRITE_FAILURES={}", write_errs);
+    println!("GEPA_WAL_SYNC_FAILURES={}", sync_errs);
+    println!("GEPA_WAL_DISK_FULL={}", disk_full);
+    println!("GEPA_WAL_INVARIANT_FAILURES={}", failed_runs);
+}
+
 #[test]
 #[ignore] // Stress test — run manually with `cargo test --release --
--ignored` fn test_wal_dst_1000_seeds_chaos() {