From c105401306c848022a69823282ab583d25c1090e Mon Sep 17 00:00:00 2001 From: Michael Kuehl Date: Thu, 23 Apr 2026 14:21:39 +0200 Subject: [PATCH] feat(engine): add test coverage regression gate to preflight checks (fixes #520) --- agent_fox/engine/coverage.py | 252 +++++++++ agent_fox/engine/dispatch.py | 9 + agent_fox/engine/result_handler.py | 130 +++++ agent_fox/engine/state.py | 7 +- agent_fox/knowledge/migrations.py | 18 +- docs/memory.md | 9 + tests/integration/test_db_plan_state_smoke.py | 3 +- .../property/engine/test_plan_state_props.py | 3 +- tests/property/knowledge/test_db_props.py | 2 +- .../knowledge/test_review_store_props.py | 2 +- tests/test_knowledge_pruning.py | 2 +- tests/unit/engine/test_coverage.py | 507 ++++++++++++++++++ tests/unit/engine/test_db_plan_state.py | 3 +- tests/unit/knowledge/test_db.py | 4 +- tests/unit/knowledge/test_review_store.py | 2 +- 15 files changed, 941 insertions(+), 12 deletions(-) create mode 100644 agent_fox/engine/coverage.py create mode 100644 tests/unit/engine/test_coverage.py diff --git a/agent_fox/engine/coverage.py b/agent_fox/engine/coverage.py new file mode 100644 index 00000000..9f261d77 --- /dev/null +++ b/agent_fox/engine/coverage.py @@ -0,0 +1,252 @@ +"""Test coverage measurement and regression detection. + +Detects the appropriate coverage tool for the project language, +measures per-file line coverage, and compares baseline vs post-session +coverage to detect regressions on modified files. 
+""" + +from __future__ import annotations + +import json +import logging +import re +import subprocess +import tomllib +from dataclasses import dataclass +from pathlib import Path + +logger = logging.getLogger(__name__) + +_COVERAGE_TIMEOUT = 600 + + +@dataclass(frozen=True) +class CoverageTool: + name: str + command: list[str] + result_path: str + + +@dataclass(frozen=True) +class FileCoverage: + file_path: str + covered_lines: int + total_lines: int + + @property + def percentage(self) -> float: + if self.total_lines == 0: + return 100.0 + return (self.covered_lines / self.total_lines) * 100.0 + + +@dataclass(frozen=True) +class CoverageResult: + files: dict[str, FileCoverage] + + def coverage_for(self, path: str) -> FileCoverage | None: + return self.files.get(path) + + def to_json(self) -> str: + return json.dumps( + { + path: { + "covered_lines": fc.covered_lines, + "total_lines": fc.total_lines, + } + for path, fc in self.files.items() + } + ) + + @staticmethod + def from_json(raw: str) -> CoverageResult | None: + if not raw: + return None + try: + data = json.loads(raw) + except (json.JSONDecodeError, TypeError): + return None + files: dict[str, FileCoverage] = {} + for path, info in data.items(): + files[path] = FileCoverage( + file_path=path, + covered_lines=info.get("covered_lines", 0), + total_lines=info.get("total_lines", 0), + ) + return CoverageResult(files=files) + + +@dataclass(frozen=True) +class CoverageRegression: + file_path: str + baseline_pct: float + current_pct: float + delta: float + + +def detect_coverage_tool(project_root: Path) -> CoverageTool | None: + pyproject = project_root / "pyproject.toml" + if pyproject.exists(): + try: + data = tomllib.loads(pyproject.read_text(encoding="utf-8")) + tool = data.get("tool", {}) + if "pytest" in tool: + return CoverageTool( + name="pytest-cov", + command=[ + "uv", + "run", + "pytest", + "--cov", + "--cov-report=json", + "-q", + "--no-header", + "-x", + ], + result_path="coverage.json", + ) + 
except (tomllib.TOMLDecodeError, OSError): + pass + + cargo = project_root / "Cargo.toml" + if cargo.exists(): + try: + data = tomllib.loads(cargo.read_text(encoding="utf-8")) + if "package" in data: + return CoverageTool( + name="cargo-tarpaulin", + command=["cargo", "tarpaulin", "--out", "json", "--output-dir", "."], + result_path="tarpaulin-report.json", + ) + except (tomllib.TOMLDecodeError, OSError): + pass + + go_mod = project_root / "go.mod" + if go_mod.exists(): + return CoverageTool( + name="go-cover", + command=["go", "test", "-coverprofile=coverage.out", "-covermode=count", "./..."], + result_path="coverage.out", + ) + + return None + + +def measure_coverage(project_root: Path, tool: CoverageTool) -> CoverageResult | None: + try: + subprocess.run( + tool.command, + cwd=project_root, + capture_output=True, + text=True, + timeout=_COVERAGE_TIMEOUT, + ) + except subprocess.TimeoutExpired: + logger.warning("Coverage measurement timed out after %ds", _COVERAGE_TIMEOUT) + return None + except Exception: + logger.debug("Coverage measurement failed", exc_info=True) + return None + + result_path = project_root / tool.result_path + if not result_path.exists(): + logger.debug("Coverage result file not found: %s", result_path) + return None + + try: + if tool.name == "pytest-cov": + return _parse_pytest_cov(result_path) + if tool.name == "cargo-tarpaulin": + return _parse_tarpaulin(result_path) + if tool.name == "go-cover": + return _parse_go_cover(result_path) + except Exception: + logger.debug("Failed to parse coverage output from %s", tool.name, exc_info=True) + finally: + try: + result_path.unlink(missing_ok=True) + except Exception: + pass + + return None + + +def _parse_pytest_cov(result_path: Path) -> CoverageResult: + data = json.loads(result_path.read_text()) + files: dict[str, FileCoverage] = {} + for file_path, info in data.get("files", {}).items(): + rel_path = file_path + if Path(file_path).is_absolute(): + try: + rel_path = 
str(Path(file_path).relative_to(result_path.parent)) + except ValueError: + pass + summary = info.get("summary", {}) + files[rel_path] = FileCoverage( + file_path=rel_path, + covered_lines=summary.get("covered_lines", 0), + total_lines=summary.get("num_statements", 0), + ) + return CoverageResult(files=files) + + +def _parse_tarpaulin(result_path: Path) -> CoverageResult: + data = json.loads(result_path.read_text()) + files: dict[str, FileCoverage] = {} + for entry in data.get("files", []): + path = entry.get("path", "") + traces = entry.get("traces", []) + total = len(traces) + covered = sum(1 for t in traces if t.get("stats", {}).get("Line", 0) > 0) + files[path] = FileCoverage( + file_path=path, + covered_lines=covered, + total_lines=total, + ) + return CoverageResult(files=files) + + +def _parse_go_cover(result_path: Path) -> CoverageResult: + content = result_path.read_text() + file_stats: dict[str, tuple[int, int]] = {} + for line in content.splitlines(): + if line.startswith("mode:"): + continue + match = re.match(r"^(.+?):(\d+)\.\d+,(\d+)\.\d+\s+(\d+)\s+(\d+)$", line) + if match: + path = match.group(1) + num_statements = int(match.group(4)) + count = int(match.group(5)) + prev_c, prev_t = file_stats.get(path, (0, 0)) + file_stats[path] = ( + prev_c + (num_statements if count > 0 else 0), + prev_t + num_statements, + ) + files = {path: FileCoverage(file_path=path, covered_lines=c, total_lines=t) for path, (c, t) in file_stats.items()} + return CoverageResult(files=files) + + +def find_regressions( + baseline: CoverageResult, + current: CoverageResult, + modified_files: list[str], +) -> list[CoverageRegression]: + regressions: list[CoverageRegression] = [] + for file_path in modified_files: + base = baseline.coverage_for(file_path) + curr = current.coverage_for(file_path) + if base is None or curr is None: + continue + if base.total_lines == 0: + continue + delta = curr.percentage - base.percentage + if delta < 0: + regressions.append( + CoverageRegression( 
+ file_path=file_path, + baseline_pct=base.percentage, + current_pct=curr.percentage, + delta=delta, + ) + ) + return regressions diff --git a/agent_fox/engine/dispatch.py b/agent_fox/engine/dispatch.py index 7ba044d8..2a53ff53 100644 --- a/agent_fox/engine/dispatch.py +++ b/agent_fox/engine/dispatch.py @@ -11,6 +11,7 @@ import asyncio import logging +from pathlib import Path from typing import Any from agent_fox.engine.graph_sync import _is_auto_pre @@ -67,6 +68,10 @@ async def dispatch( orch._graph_sync.mark_in_progress(node_id) + # Capture coverage baseline before coder sessions + if node_archetype == "coder" and orch._result_handler is not None: + orch._result_handler.capture_coverage_baseline(node_id, Path.cwd()) + timeout_override: int | None = None max_turns_override: int | None = None if orch._result_handler is not None: @@ -235,6 +240,10 @@ async def fill_pool( orch._graph_sync.mark_in_progress(node_id) + # Capture coverage baseline before coder sessions + if archetype == "coder" and orch._result_handler is not None: + orch._result_handler.capture_coverage_baseline(node_id, Path.cwd()) + timeout_override_p: int | None = None max_turns_override_p: int | None = None if orch._result_handler is not None: diff --git a/agent_fox/engine/result_handler.py b/agent_fox/engine/result_handler.py index a7901cd7..6a35e1eb 100644 --- a/agent_fox/engine/result_handler.py +++ b/agent_fox/engine/result_handler.py @@ -15,6 +15,7 @@ import logging import math from collections.abc import Callable +from pathlib import Path from typing import Any from agent_fox.archetypes import get_archetype @@ -79,6 +80,10 @@ def __init__( self._node_max_turns: dict[str, int | None] = {} self._node_timeout: dict[str, int] = {} self._original_node_timeout: dict[str, int] = {} # per-node original timeouts + + # Coverage regression gate state + self._coverage_baselines: dict[str, Any] = {} + self._coverage_tool: Any = None # None = not checked, False = no tool self._max_timeout_retries: int = 
max_timeout_retries self._timeout_multiplier: float = timeout_multiplier self._timeout_ceiling_factor: float = timeout_ceiling_factor @@ -100,6 +105,125 @@ def _get_predecessors(self, node_id: str) -> list[str]: """Get predecessor node IDs for a given node.""" return self._graph_sync.predecessors(node_id) + def _get_coverage_tool(self, cwd: Path) -> Any: + """Lazy-detect the coverage tool once per run.""" + if self._coverage_tool is None: + from agent_fox.engine.coverage import detect_coverage_tool + + tool = detect_coverage_tool(cwd) + self._coverage_tool = tool if tool is not None else False + return self._coverage_tool if self._coverage_tool is not False else None + + def capture_coverage_baseline(self, node_id: str, cwd: Path) -> None: + """Measure and store baseline coverage before a coder session.""" + tool = self._get_coverage_tool(cwd) + if tool is None: + return + try: + from agent_fox.engine.coverage import measure_coverage + + result = measure_coverage(cwd, tool) + if result is not None: + self._coverage_baselines[node_id] = result + logger.debug("Captured coverage baseline for %s (%d files)", node_id, len(result.files)) + except Exception: + logger.debug("Failed to capture coverage baseline for %s", node_id, exc_info=True) + + def check_coverage_regression( + self, + record: SessionRecord, + state: ExecutionState, + cwd: Path, + ) -> str | None: + """Check for coverage regression after a successful coder session. + + Returns JSON coverage data for storage, or None if no measurement + was possible. Emits a blocking finding if coverage regressed. 
+ """ + baseline = self._coverage_baselines.pop(record.node_id, None) + if baseline is None: + return None + + tool = self._get_coverage_tool(cwd) + if tool is None: + return None + + try: + from agent_fox.engine.coverage import find_regressions, measure_coverage + + current = measure_coverage(cwd, tool) + if current is None: + return None + + modified_files = record.files_touched or [] + regressions = find_regressions(baseline, current, modified_files) + + if regressions: + self._emit_coverage_regression(record, regressions, state) + + return current.to_json() + except Exception: + logger.debug("Coverage regression check failed for %s", record.node_id, exc_info=True) + return None + + def _emit_coverage_regression( + self, + record: SessionRecord, + regressions: list[Any], + state: ExecutionState, + ) -> None: + """Record a coverage regression finding and block the node.""" + details = "; ".join( + f"{r.file_path}: {r.baseline_pct:.1f}% → {r.current_pct:.1f}% ({r.delta:+.1f}%)" for r in regressions + ) + reason = f"Coverage regression on {len(regressions)} file(s): {details}" + logger.warning("Coverage regression for %s: %s", record.node_id, reason) + + emit_audit_event( + self._sink, + self._run_id, + AuditEventType.TASK_STATUS_CHANGE, + node_id=record.node_id, + payload={ + "from_status": "completed", + "to_status": "blocked", + "reason": reason, + "regressions": [ + { + "file": r.file_path, + "baseline": r.baseline_pct, + "current": r.current_pct, + "delta": r.delta, + } + for r in regressions + ], + }, + ) + + if self._knowledge_db_conn is not None: + try: + from agent_fox.core.node_id import parse_node_id + + parsed = parse_node_id(record.node_id) + self._knowledge_db_conn.execute( + """ + INSERT INTO review_findings + (id, severity, description, spec_name, task_group, session_id, category) + VALUES + (gen_random_uuid(), 'critical', ?, ?, ?, ?, 'coverage_regression') + """, + [ + reason, + parsed.spec_name, + str(parsed.group_number) if parsed.group_number 
else "1", + f"{record.node_id}:{record.attempt}", + ], + ) + except Exception: + logger.debug("Failed to persist coverage regression finding", exc_info=True) + + self._block_task(record.node_id, state, reason) + def check_skeptic_blocking( self, record: SessionRecord, @@ -267,6 +391,11 @@ def process( """Process a completed session record and persist state.""" update_state_with_session(state, record) + # Compute coverage data for successful coder sessions before DB write + coverage_data: str | None = None + if record.status == "completed" and self._get_node_archetype(record.node_id) == "coder": + coverage_data = self.check_coverage_regression(record, state, Path.cwd()) + # 105-REQ-3.2: Record session outcome to DB (unified single source of truth). # 105-REQ-4.3: Accumulate run token/cost totals. if self._knowledge_db_conn is not None: @@ -306,6 +435,7 @@ def process( error_message=record.error_message, is_transport_error=record.is_transport_error, retrieval_summary=record.retrieval_summary, # 113-REQ-7.2 + coverage_data=coverage_data, ) _record_session_db(self._knowledge_db_conn, outcome) _update_run_totals( diff --git a/agent_fox/engine/state.py b/agent_fox/engine/state.py index 220c35c4..77a985ff 100644 --- a/agent_fox/engine/state.py +++ b/agent_fox/engine/state.py @@ -84,6 +84,7 @@ class SessionOutcomeRecord: error_message: str | None # SQL NULL for successful sessions (REQ-3.E1) is_transport_error: bool retrieval_summary: str | None = None # JSON: {"facts_injected": int, "signals_active": [...]} + coverage_data: str | None = None # JSON: per-file coverage for trend tracking @dataclass @@ -411,8 +412,9 @@ def record_session( id, spec_name, task_group, node_id, touched_path, status, input_tokens, output_tokens, duration_ms, created_at, run_id, attempt, cost, model, archetype, - commit_sha, error_message, is_transport_error, retrieval_summary - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ commit_sha, error_message, is_transport_error, retrieval_summary, + coverage_data + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ record.id, @@ -434,6 +436,7 @@ def record_session( record.error_message, # None -> SQL NULL (REQ-3.E1) record.is_transport_error, record.retrieval_summary, # 113-REQ-7.2 + record.coverage_data, ], ) diff --git a/agent_fox/knowledge/migrations.py b/agent_fox/knowledge/migrations.py index b472bf6a..bbaf2a2d 100644 --- a/agent_fox/knowledge/migrations.py +++ b/agent_fox/knowledge/migrations.py @@ -736,6 +736,16 @@ def _migrate_v19(conn: duckdb.DuckDBPyConnection) -> None: """) +def _migrate_v20(conn: duckdb.DuckDBPyConnection) -> None: + """Add coverage_data column to session_outcomes for trend tracking.""" + tables = { + r[0] + for r in conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'").fetchall() + } + if "session_outcomes" in tables: + conn.execute("ALTER TABLE session_outcomes ADD COLUMN IF NOT EXISTS coverage_data TEXT") + + # Registry of all migrations, ordered by version. 
MIGRATIONS: list[Migration] = [ Migration( @@ -828,6 +838,11 @@ def _migrate_v19(conn: duckdb.DuckDBPyConnection) -> None: description="add errata table for lightweight errata generation", apply=_migrate_v19, ), + Migration( + version=20, + description="add coverage_data column to session_outcomes for trend tracking", + apply=_migrate_v20, + ), ] @@ -861,7 +876,8 @@ def _migrate_v19(conn: duckdb.DuckDBPyConnection) -> None: commit_sha VARCHAR, error_message TEXT, is_transport_error BOOLEAN DEFAULT FALSE, - retrieval_summary TEXT + retrieval_summary TEXT, + coverage_data TEXT ); CREATE TABLE IF NOT EXISTS plan_nodes ( diff --git a/docs/memory.md b/docs/memory.md index c78b2fa4..b01a9532 100644 --- a/docs/memory.md +++ b/docs/memory.md @@ -2,6 +2,15 @@ _3176 facts | last updated: 2026-04-23_ +**2026-04-23 coverage regression gate (issue #520):** Added +`engine/coverage.py` — detects coverage tools (pytest-cov, cargo-tarpaulin, +go test -cover), measures per-file line coverage, and compares baseline vs +post-session to detect regressions. Integrated into engine dispatch (baseline +capture before coder sessions) and result handler (regression check after +successful sessions). Regressions emit blocking findings via review_findings +table. Coverage data stored in session_outcomes (migration v20: coverage_data +column) for trend tracking. +35 tests (4399 total pass). 
+ **2026-04-23 verifier checklist enforcement (issue #521):** Added `spec/verification_checklist.py` — builds a structured verification checklist from tasks.md checkboxes, requirements.md acceptance criteria, diff --git a/tests/integration/test_db_plan_state_smoke.py b/tests/integration/test_db_plan_state_smoke.py index f1ad8ea5..b9633ac3 100644 --- a/tests/integration/test_db_plan_state_smoke.py +++ b/tests/integration/test_db_plan_state_smoke.py @@ -93,7 +93,8 @@ commit_sha VARCHAR, error_message TEXT, is_transport_error BOOLEAN DEFAULT FALSE, - retrieval_summary TEXT + retrieval_summary TEXT, + coverage_data TEXT ); """ diff --git a/tests/property/engine/test_plan_state_props.py b/tests/property/engine/test_plan_state_props.py index d2190e4d..ddb67999 100644 --- a/tests/property/engine/test_plan_state_props.py +++ b/tests/property/engine/test_plan_state_props.py @@ -95,7 +95,8 @@ commit_sha VARCHAR, error_message TEXT, is_transport_error BOOLEAN DEFAULT FALSE, - retrieval_summary TEXT + retrieval_summary TEXT, + coverage_data TEXT ); """ diff --git a/tests/property/knowledge/test_db_props.py b/tests/property/knowledge/test_db_props.py index 64be9716..08f327d3 100644 --- a/tests/property/knowledge/test_db_props.py +++ b/tests/property/knowledge/test_db_props.py @@ -67,7 +67,7 @@ def test_n_open_close_cycles_produce_same_state(self, n: int, tmp_path_factory: version_count = db.connection.execute("SELECT COUNT(*) FROM schema_version").fetchone() assert version_count is not None - assert version_count[0] == 19 + assert version_count[0] == 20 tables = {r[0] for r in db.connection.execute("SHOW TABLES").fetchall()} assert tables == EXPECTED_TABLES diff --git a/tests/property/knowledge/test_review_store_props.py b/tests/property/knowledge/test_review_store_props.py index db8eab41..bc9e9548 100644 --- a/tests/property/knowledge/test_review_store_props.py +++ b/tests/property/knowledge/test_review_store_props.py @@ -115,7 +115,7 @@ def test_migration_idempotency(self, 
n_runs: int) -> None: # Version should be 18 (latest migration: v18 drop unused knowledge tables) version = conn.execute("SELECT MAX(version) FROM schema_version").fetchone() assert version is not None - assert version[0] == 19 + assert version[0] == 20 # Tables should exist (v2 + v4 migrations; v3 tables dropped by v14) tables = conn.execute( diff --git a/tests/test_knowledge_pruning.py b/tests/test_knowledge_pruning.py index d55843ce..fe4d13ed 100644 --- a/tests/test_knowledge_pruning.py +++ b/tests/test_knowledge_pruning.py @@ -475,7 +475,7 @@ def test_migration_v18_fresh_db(self) -> None: run_migrations(conn) version = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()[0] - assert version == 19 + assert version == 20 conn.close() diff --git a/tests/unit/engine/test_coverage.py b/tests/unit/engine/test_coverage.py new file mode 100644 index 00000000..813c865f --- /dev/null +++ b/tests/unit/engine/test_coverage.py @@ -0,0 +1,507 @@ +"""Tests for the test coverage regression gate. + +Verifies that the coverage module correctly detects coverage tools, +measures per-file coverage, and identifies regressions on modified files. 
+""" + +from __future__ import annotations + +import json +import textwrap +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from agent_fox.engine.coverage import ( + CoverageRegression, + CoverageResult, + CoverageTool, + FileCoverage, + detect_coverage_tool, + find_regressions, + measure_coverage, +) + + +class TestDetectCoverageTool: + def test_detects_pytest_cov(self, tmp_path: Path) -> None: + (tmp_path / "pyproject.toml").write_text( + '[tool.pytest.ini_options]\ntestpaths = ["tests"]\n' + ) + tool = detect_coverage_tool(tmp_path) + assert tool is not None + assert tool.name == "pytest-cov" + assert "--cov" in tool.command + + def test_detects_cargo_tarpaulin(self, tmp_path: Path) -> None: + (tmp_path / "Cargo.toml").write_text("[package]\nname = \"foo\"\n") + tool = detect_coverage_tool(tmp_path) + assert tool is not None + assert tool.name == "cargo-tarpaulin" + + def test_detects_go_cover(self, tmp_path: Path) -> None: + (tmp_path / "go.mod").write_text("module example.com/foo\n") + tool = detect_coverage_tool(tmp_path) + assert tool is not None + assert tool.name == "go-cover" + + def test_returns_none_for_unknown_project(self, tmp_path: Path) -> None: + assert detect_coverage_tool(tmp_path) is None + + def test_python_takes_precedence_over_makefile(self, tmp_path: Path) -> None: + (tmp_path / "pyproject.toml").write_text("[tool.pytest]\n") + (tmp_path / "Makefile").write_text("test:\n\techo hi\n") + tool = detect_coverage_tool(tmp_path) + assert tool is not None + assert tool.name == "pytest-cov" + + def test_handles_unparseable_pyproject(self, tmp_path: Path) -> None: + (tmp_path / "pyproject.toml").write_text("{{invalid toml}}") + assert detect_coverage_tool(tmp_path) is None + + def test_pyproject_without_pytest_section(self, tmp_path: Path) -> None: + (tmp_path / "pyproject.toml").write_text("[tool.ruff]\nline-length = 88\n") + assert detect_coverage_tool(tmp_path) is None + + +class TestFileCoverage: + def 
test_percentage_calculation(self) -> None: + fc = FileCoverage(file_path="a.py", covered_lines=75, total_lines=100) + assert fc.percentage == 75.0 + + def test_zero_total_lines_returns_100(self) -> None: + fc = FileCoverage(file_path="empty.py", covered_lines=0, total_lines=0) + assert fc.percentage == 100.0 + + def test_full_coverage(self) -> None: + fc = FileCoverage(file_path="a.py", covered_lines=50, total_lines=50) + assert fc.percentage == 100.0 + + +class TestCoverageResult: + def test_coverage_for_existing_file(self) -> None: + fc = FileCoverage(file_path="a.py", covered_lines=10, total_lines=20) + result = CoverageResult(files={"a.py": fc}) + assert result.coverage_for("a.py") is fc + + def test_coverage_for_missing_file(self) -> None: + result = CoverageResult(files={}) + assert result.coverage_for("missing.py") is None + + +class TestFindRegressions: + def test_no_regression_when_coverage_increases(self) -> None: + baseline = CoverageResult( + files={"a.py": FileCoverage("a.py", 50, 100)} + ) + current = CoverageResult( + files={"a.py": FileCoverage("a.py", 60, 100)} + ) + assert find_regressions(baseline, current, ["a.py"]) == [] + + def test_no_regression_when_coverage_unchanged(self) -> None: + baseline = CoverageResult( + files={"a.py": FileCoverage("a.py", 50, 100)} + ) + current = CoverageResult( + files={"a.py": FileCoverage("a.py", 50, 100)} + ) + assert find_regressions(baseline, current, ["a.py"]) == [] + + def test_detects_regression(self) -> None: + baseline = CoverageResult( + files={"a.py": FileCoverage("a.py", 80, 100)} + ) + current = CoverageResult( + files={"a.py": FileCoverage("a.py", 60, 100)} + ) + regressions = find_regressions(baseline, current, ["a.py"]) + assert len(regressions) == 1 + assert regressions[0].file_path == "a.py" + assert regressions[0].baseline_pct == 80.0 + assert regressions[0].current_pct == 60.0 + assert regressions[0].delta == -20.0 + + def test_ignores_unmodified_files(self) -> None: + baseline = 
CoverageResult( + files={ + "a.py": FileCoverage("a.py", 80, 100), + "b.py": FileCoverage("b.py", 90, 100), + } + ) + current = CoverageResult( + files={ + "a.py": FileCoverage("a.py", 80, 100), + "b.py": FileCoverage("b.py", 50, 100), + } + ) + regressions = find_regressions(baseline, current, ["a.py"]) + assert len(regressions) == 0 + + def test_skips_files_missing_from_baseline(self) -> None: + baseline = CoverageResult(files={}) + current = CoverageResult( + files={"new.py": FileCoverage("new.py", 50, 100)} + ) + assert find_regressions(baseline, current, ["new.py"]) == [] + + def test_skips_files_missing_from_current(self) -> None: + baseline = CoverageResult( + files={"deleted.py": FileCoverage("deleted.py", 80, 100)} + ) + current = CoverageResult(files={}) + assert find_regressions(baseline, current, ["deleted.py"]) == [] + + def test_skips_files_with_zero_baseline_lines(self) -> None: + baseline = CoverageResult( + files={"empty.py": FileCoverage("empty.py", 0, 0)} + ) + current = CoverageResult( + files={"empty.py": FileCoverage("empty.py", 0, 0)} + ) + assert find_regressions(baseline, current, ["empty.py"]) == [] + + def test_multiple_regressions(self) -> None: + baseline = CoverageResult( + files={ + "a.py": FileCoverage("a.py", 80, 100), + "b.py": FileCoverage("b.py", 70, 100), + } + ) + current = CoverageResult( + files={ + "a.py": FileCoverage("a.py", 60, 100), + "b.py": FileCoverage("b.py", 50, 100), + } + ) + regressions = find_regressions(baseline, current, ["a.py", "b.py"]) + assert len(regressions) == 2 + + +class TestMeasureCoverage: + def test_returns_none_on_timeout(self, tmp_path: Path) -> None: + import subprocess + + tool = CoverageTool( + name="pytest-cov", + command=["sleep", "999"], + result_path="coverage.json", + ) + with patch("agent_fox.engine.coverage._COVERAGE_TIMEOUT", 0): + with patch( + "agent_fox.engine.coverage.subprocess.run", + side_effect=subprocess.TimeoutExpired(cmd="sleep", timeout=0), + ): + result = 
measure_coverage(tmp_path, tool) + assert result is None + + def test_returns_none_when_result_file_missing(self, tmp_path: Path) -> None: + tool = CoverageTool( + name="pytest-cov", + command=["echo", "no-op"], + result_path="coverage.json", + ) + with patch("agent_fox.engine.coverage.subprocess.run"): + result = measure_coverage(tmp_path, tool) + assert result is None + + def test_parses_pytest_cov_output(self, tmp_path: Path) -> None: + cov_data = { + "files": { + "agent_fox/foo.py": { + "summary": { + "covered_lines": 80, + "num_statements": 100, + } + }, + "agent_fox/bar.py": { + "summary": { + "covered_lines": 45, + "num_statements": 50, + } + }, + } + } + result_path = tmp_path / "coverage.json" + result_path.write_text(json.dumps(cov_data)) + + tool = CoverageTool( + name="pytest-cov", + command=["echo", "done"], + result_path="coverage.json", + ) + with patch("agent_fox.engine.coverage.subprocess.run"): + result = measure_coverage(tmp_path, tool) + + assert result is not None + assert len(result.files) == 2 + foo = result.coverage_for("agent_fox/foo.py") + assert foo is not None + assert foo.covered_lines == 80 + assert foo.total_lines == 100 + assert foo.percentage == 80.0 + + def test_parses_go_cover_output(self, tmp_path: Path) -> None: + go_cov = textwrap.dedent("""\ + mode: count + example.com/foo/main.go:10.1,12.1 3 1 + example.com/foo/main.go:14.1,16.1 3 0 + """) + result_path = tmp_path / "coverage.out" + result_path.write_text(go_cov) + + tool = CoverageTool( + name="go-cover", + command=["echo", "done"], + result_path="coverage.out", + ) + with patch("agent_fox.engine.coverage.subprocess.run"): + result = measure_coverage(tmp_path, tool) + + assert result is not None + fc = result.coverage_for("example.com/foo/main.go") + assert fc is not None + assert fc.covered_lines == 3 + assert fc.total_lines == 6 + assert fc.percentage == 50.0 + + def test_parses_tarpaulin_output(self, tmp_path: Path) -> None: + tarp_data = { + "files": [ + { + "path": 
"src/main.rs", + "traces": [ + {"stats": {"Line": 1}}, + {"stats": {"Line": 0}}, + {"stats": {"Line": 1}}, + ], + } + ] + } + result_path = tmp_path / "tarpaulin-report.json" + result_path.write_text(json.dumps(tarp_data)) + + tool = CoverageTool( + name="cargo-tarpaulin", + command=["echo", "done"], + result_path="tarpaulin-report.json", + ) + with patch("agent_fox.engine.coverage.subprocess.run"): + result = measure_coverage(tmp_path, tool) + + assert result is not None + fc = result.coverage_for("src/main.rs") + assert fc is not None + assert fc.covered_lines == 2 + assert fc.total_lines == 3 + + def test_cleans_up_result_file(self, tmp_path: Path) -> None: + cov_data = {"files": {}} + result_path = tmp_path / "coverage.json" + result_path.write_text(json.dumps(cov_data)) + + tool = CoverageTool( + name="pytest-cov", + command=["echo", "done"], + result_path="coverage.json", + ) + with patch("agent_fox.engine.coverage.subprocess.run"): + measure_coverage(tmp_path, tool) + + assert not result_path.exists() + + +class TestCoverageResultSerialization: + def test_to_json_and_back(self) -> None: + result = CoverageResult( + files={ + "a.py": FileCoverage("a.py", 80, 100), + "b.py": FileCoverage("b.py", 45, 50), + } + ) + json_str = result.to_json() + parsed = json.loads(json_str) + assert "a.py" in parsed + assert parsed["a.py"]["covered_lines"] == 80 + assert parsed["a.py"]["total_lines"] == 100 + + def test_from_json(self) -> None: + data = { + "a.py": {"covered_lines": 80, "total_lines": 100}, + } + result = CoverageResult.from_json(json.dumps(data)) + assert result is not None + fc = result.coverage_for("a.py") + assert fc is not None + assert fc.percentage == 80.0 + + def test_from_json_returns_none_on_invalid(self) -> None: + assert CoverageResult.from_json("not json") is None + assert CoverageResult.from_json("") is None + + +class TestResultHandlerCoverageIntegration: + """Test coverage capture and regression check via SessionResultHandler.""" + + def 
_make_handler(self) -> "SessionResultHandler": + from agent_fox.engine.result_handler import SessionResultHandler + + mock_graph_sync = MagicMock() + mock_graph_sync.node_states = {} + mock_graph_sync.predecessors.return_value = [] + + return SessionResultHandler( + graph_sync=mock_graph_sync, + routing_ladders={}, + retries_before_escalation=1, + max_retries=2, + task_callback=None, + sink=None, + run_id="test-run", + graph=None, + archetypes_config=None, + knowledge_db_conn=None, + block_task_fn=MagicMock(), + check_block_budget_fn=MagicMock(), + ) + + def test_capture_baseline_stores_result(self, tmp_path: Path) -> None: + handler = self._make_handler() + baseline = CoverageResult( + files={"a.py": FileCoverage("a.py", 80, 100)} + ) + with patch( + "agent_fox.engine.coverage.detect_coverage_tool", + return_value=CoverageTool("pytest-cov", ["echo"], "coverage.json"), + ), patch( + "agent_fox.engine.coverage.measure_coverage", + return_value=baseline, + ): + handler.capture_coverage_baseline("spec:1", tmp_path) + + assert "spec:1" in handler._coverage_baselines + assert handler._coverage_baselines["spec:1"] is baseline + + def test_capture_baseline_skips_when_no_tool(self, tmp_path: Path) -> None: + handler = self._make_handler() + with patch( + "agent_fox.engine.coverage.detect_coverage_tool", + return_value=None, + ): + handler.capture_coverage_baseline("spec:1", tmp_path) + assert "spec:1" not in handler._coverage_baselines + + def test_check_regression_returns_json_on_no_regression( + self, tmp_path: Path + ) -> None: + from agent_fox.engine.state import SessionRecord + + handler = self._make_handler() + baseline = CoverageResult( + files={"a.py": FileCoverage("a.py", 80, 100)} + ) + handler._coverage_baselines["spec:1"] = baseline + handler._coverage_tool = CoverageTool("pytest-cov", ["echo"], "coverage.json") + + current = CoverageResult( + files={"a.py": FileCoverage("a.py", 90, 100)} + ) + record = SessionRecord( + node_id="spec:1", + attempt=1, + 
            status="completed",
            input_tokens=0,
            output_tokens=0,
            cost=0.0,
            duration_ms=0,
            error_message=None,
            timestamp="",
            files_touched=["a.py"],
        )
        state = MagicMock()
        with patch(
            "agent_fox.engine.coverage.measure_coverage",
            return_value=current,
        ):
            result = handler.check_coverage_regression(record, state, tmp_path)

        # No regression: the handler returns the serialized current coverage
        # (for persistence) instead of blocking the task.
        assert result is not None
        parsed = json.loads(result)
        assert "a.py" in parsed

    def test_check_regression_blocks_on_decrease(self, tmp_path: Path) -> None:
        from agent_fox.engine.state import SessionRecord

        handler = self._make_handler()
        baseline = CoverageResult(
            files={"a.py": FileCoverage("a.py", 80, 100)}
        )
        handler._coverage_baselines["spec:1"] = baseline
        handler._coverage_tool = CoverageTool("pytest-cov", ["echo"], "coverage.json")

        # Coverage on the touched file drops: 80% -> 60%.
        current = CoverageResult(
            files={"a.py": FileCoverage("a.py", 60, 100)}
        )
        record = SessionRecord(
            node_id="spec:1",
            attempt=1,
            status="completed",
            input_tokens=0,
            output_tokens=0,
            cost=0.0,
            duration_ms=0,
            error_message=None,
            timestamp="",
            files_touched=["a.py"],
        )
        state = MagicMock()
        state.blocked_reasons = {}
        with patch(
            "agent_fox.engine.coverage.measure_coverage",
            return_value=current,
        ):
            handler.check_coverage_regression(record, state, tmp_path)

        # A coverage drop on a touched file must route through _block_task,
        # with the node id and a "Coverage regression" reason.
        handler._block_task.assert_called_once()
        call_args = handler._block_task.call_args
        assert "spec:1" == call_args[0][0]
        assert "Coverage regression" in call_args[0][2]

    def test_check_regression_returns_none_without_baseline(
        self, tmp_path: Path
    ) -> None:
        from agent_fox.engine.state import SessionRecord

        handler = self._make_handler()
        record = SessionRecord(
            node_id="spec:1",
            attempt=1,
            status="completed",
            input_tokens=0,
            output_tokens=0,
            cost=0.0,
            duration_ms=0,
            error_message=None,
            timestamp="",
            files_touched=["a.py"],
        )
        state = MagicMock()
        # No baseline was ever captured for this node: the check is a no-op.
        result = handler.check_coverage_regression(record, state, tmp_path)
        assert result is None

class TestMigrationV20:
    def test_adds_coverage_data_column(self) -> None:
        """Migration v20 must add the session_outcomes.coverage_data column."""
        import duckdb

        from agent_fox.knowledge.migrations import run_migrations

        conn = duckdb.connect(":memory:")
        run_migrations(conn)
        cols = conn.execute(
            "SELECT column_name FROM information_schema.columns "
            "WHERE table_name = 'session_outcomes' AND column_name = 'coverage_data'"
        ).fetchall()
        # Exactly one matching column: added once, never duplicated.
        assert len(cols) == 1
        conn.close()
diff --git a/tests/unit/engine/test_db_plan_state.py b/tests/unit/engine/test_db_plan_state.py
index 0f8f5a6b..a861c3eb 100644
--- a/tests/unit/engine/test_db_plan_state.py
+++ b/tests/unit/engine/test_db_plan_state.py
@@ -97,7 +97,8 @@
     commit_sha VARCHAR,
     error_message TEXT,
     is_transport_error BOOLEAN DEFAULT FALSE,
-    retrieval_summary TEXT
+    retrieval_summary TEXT,
+    coverage_data TEXT
 );
 """
diff --git a/tests/unit/knowledge/test_db.py b/tests/unit/knowledge/test_db.py
index 401a6d26..12be1925 100644
--- a/tests/unit/knowledge/test_db.py
+++ b/tests/unit/knowledge/test_db.py
@@ -75,7 +75,7 @@ def test_version_1_recorded(self, knowledge_config: KnowledgeConfig) -> None:
         rows = db.connection.execute(
             "SELECT version, applied_at, description FROM schema_version ORDER BY version"
         ).fetchall()
-        assert len(rows) == 19
+        assert len(rows) == 20
         assert rows[0][0] == 1
         assert rows[0][1] is not None  # applied_at is a valid timestamp
         assert len(rows[0][2]) > 0  # description is non-empty
@@ -136,7 +136,7 @@ def test_double_open_does_not_duplicate(self, knowledge_config: KnowledgeConfig)
         db2.open()
         count = db2.connection.execute("SELECT COUNT(*) FROM schema_version").fetchone()
         assert count is not None
-        assert count[0] == 19
+        assert count[0] == 20
         db2.close()
diff --git a/tests/unit/knowledge/test_review_store.py b/tests/unit/knowledge/test_review_store.py
index 203de2f6..1b0ced1f 100644
--- a/tests/unit/knowledge/test_review_store.py
+++ b/tests/unit/knowledge/test_review_store.py
@@ -234,7 +234,7 @@ def test_migration_already_applied_skips(self) -> None:
        # Verify version is recorded
        version = conn.execute("SELECT MAX(version) FROM schema_version").fetchone()
        assert version is not None
-        assert version[0] == 19
+        assert version[0] == 20
        conn.close()