diff --git a/.crane/scripts/score.go b/.crane/scripts/score.go index db51c0fb..ae659721 100644 --- a/.crane/scripts/score.go +++ b/.crane/scripts/score.go @@ -36,7 +36,12 @@ // Gate 8 -- benchmarks_pass: TestParityCompletionBenchmarks must pass. // Migration benchmarks must run and satisfy the configured guard. // -// Gate 9 -- no_known_exceptions: the test output must not contain any +// Gate 9 -- python_behavior_contracts: +// TestParityCompletionPythonBehaviorContracts must pass. Every +// extracted Python command and existing Python test must be mapped +// to Go tests and CLI-agnostic parity tests. +// +// Gate 10 -- no_known_exceptions: the test output must not contain any // "approved exception" log line. Final cutover requires zero exceptions. // // If Gate 1 fails, migration_score is forced to 0.0 regardless of other gates. @@ -93,6 +98,7 @@ func main() { gateStateDiffContracts = "TestParityCompletionStateDiffContracts" gatePythonSuite = "TestParityCompletionPythonSuite" gateBenchmarks = "TestParityCompletionBenchmarks" + gateBehaviorContracts = "TestParityCompletionPythonBehaviorContracts" ) // Track per-test pass/fail. @@ -187,16 +193,19 @@ func main() { // Gate 8: benchmarks_pass gate8 := singleTestGate("benchmarks_pass", gateBenchmarks, testPassed, testFailed) - // Gate 9: no_known_exceptions - gate9 := GateResult{Name: "no_known_exceptions"} + // Gate 9: python_behavior_contracts + gate9 := singleTestGate("python_behavior_contracts", gateBehaviorContracts, testPassed, testFailed) + + // Gate 10: no_known_exceptions + gate10 := GateResult{Name: "no_known_exceptions"} if knownExceptionsFound { - gate9.Passing = false - gate9.Reason = "output contains 'approved exception' -- all exceptions must be resolved for cutover" + gate10.Passing = false + gate10.Reason = "output contains 'approved exception' -- all exceptions must be resolved for cutover" } else { - gate9.Passing = true + gate10.Passing = true } - gates := []GateResult{gate1, gate2, gate3, gate4, gate5, gate6, gate7, gate8, gate9} + gates := []GateResult{gate1, gate2, gate3, gate4, gate5, gate6, gate7, gate8, gate9, gate10} // Count parity tests (any test with "Parity" in name from cmd/apm). parityPassing, parityTotal := 0, 0 diff --git a/.github/workflows/migration-ci.yml b/.github/workflows/migration-ci.yml index 356684f8..6edddf2a 100644 --- a/.github/workflows/migration-ci.yml +++ b/.github/workflows/migration-ci.yml @@ -45,7 +45,7 @@ jobs: "${{ github.event.pull_request.head.sha }}" \ | tee "$RUNNER_TEMP/changed-files.txt" - if grep -Eq '^(\.crane/|\.github/workflows/migration-ci\.yml$|cmd/|internal/|pkg/|go\.mod$|go\.sum$|pyproject\.toml$|scripts/ci/|src/|tests/benchmarks/|tests/unit/test_crane_score\.py$)' "$RUNNER_TEMP/changed-files.txt"; then + if grep -Eq '^(\.crane/|\.github/workflows/migration-ci\.yml$|cmd/|internal/|pkg/|go\.mod$|go\.sum$|pyproject\.toml$|scripts/ci/|src/|tests/benchmarks/|tests/parity/|tests/unit/test_crane_score\.py$)' "$RUNNER_TEMP/changed-files.txt"; then echo "should-run=true" >> "$GITHUB_OUTPUT" else echo "should-run=false" >> "$GITHUB_OUTPUT" @@ -78,12 +78,40 @@ jobs: test -x "$GITHUB_WORKSPACE/.venv/bin/apm" echo "APM_PYTHON_BIN=$GITHUB_WORKSPACE/.venv/bin/apm" >> "$GITHUB_ENV" + - name: Extract Python behavior contracts + run: | + uv run python scripts/ci/python_behavior_contracts.py extract \ + --output "$RUNNER_TEMP/python-behavior-contracts.json" + echo "APM_PYTHON_CONTRACT_INVENTORY=$RUNNER_TEMP/python-behavior-contracts.json" >> "$GITHUB_ENV" + + - name: Run CLI-agnostic Python behavior tests + shell: bash + run: | + go build -o "$RUNNER_TEMP/apm-go" ./cmd/apm + set +e + APM_GO_BIN="$RUNNER_TEMP/apm-go" \ + uv run pytest tests/parity/test_python_behavior_contracts.py -q --tb=short \ + | tee "$RUNNER_TEMP/python-cli-contract-tests.txt" + status=${PIPESTATUS[0]} + set -e + echo "PYTHON_CLI_CONTRACT_STATUS=$status" >> "$GITHUB_ENV" + - name: Run Go parity tests - run: go test ./... + shell: bash + run: | + set +e + go test -json ./... | tee "$RUNNER_TEMP/go-test-events.json" + status=${PIPESTATUS[0]} + set -e + echo "GO_TEST_STATUS=$status" >> "$GITHUB_ENV" - name: Compute migration score run: | - go test -json ./... | tee "$RUNNER_TEMP/go-test-events.json" | go run .crane/scripts/score.go | tee "$RUNNER_TEMP/migration-score.json" + go run .crane/scripts/score.go < "$RUNNER_TEMP/go-test-events.json" | tee "$RUNNER_TEMP/migration-score.json" + uv run python scripts/ci/python_behavior_contracts.py check \ + --inventory "$RUNNER_TEMP/python-behavior-contracts.json" \ + --coverage tests/parity/python_contract_coverage.yml \ + --summary "$RUNNER_TEMP/python-contract-coverage.md" || true python - "$RUNNER_TEMP/migration-score.json" <<'PY' import json import sys @@ -95,6 +123,8 @@ jobs: if score.get("migration_score") != 1.0: raise SystemExit("migration_score must be 1.0 for completion parity") PY + test "${PYTHON_CLI_CONTRACT_STATUS:-1}" = "0" + test "${GO_TEST_STATUS:-1}" = "0" - name: Upload parity evidence if: always() @@ -104,6 +134,9 @@ jobs: path: | ${{ runner.temp }}/go-test-events.json ${{ runner.temp }}/migration-score.json + ${{ runner.temp }}/python-behavior-contracts.json + ${{ runner.temp }}/python-contract-coverage.md + ${{ runner.temp }}/python-cli-contract-tests.txt if-no-files-found: ignore retention-days: 14 diff --git a/apm b/apm index fe2a1fd3..e85e9c1d 100755 Binary files a/apm and b/apm differ diff --git a/cmd/apm/python_behavior_contracts_test.go b/cmd/apm/python_behavior_contracts_test.go new file mode 100644 index 00000000..b1a436c4 --- /dev/null +++ b/cmd/apm/python_behavior_contracts_test.go @@ -0,0 +1,195 @@ +package main + +import ( + "encoding/json" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" +) + +type pythonBehaviorInventory struct { + Summary map[string]int `json:"summary"` + Commands []pythonCommandContract `json:"commands"` + Tests []pythonTestContract `json:"tests"` + Source []pythonSourceContract `json:"source_contracts"` +} + +type pythonCommandContract struct { + ID string `json:"id"` + Path []string `json:"path"` + Hidden bool `json:"hidden"` + Params []pythonParamContract `json:"params"` +} + +type pythonParamContract struct { + Name string `json:"name"` + Type string `json:"type"` + Opts []string `json:"opts"` + SecondaryOpts []string `json:"secondary_opts"` +} + +type pythonTestContract struct { + ID string `json:"id"` +} + +type pythonSourceContract struct { + ID string `json:"id"` +} + +func pythonInterpreterForContracts(t *testing.T, required bool) string { + t.Helper() + bin := os.Getenv("APM_PYTHON_BIN") + if bin == "" { + if required { + t.Fatal("APM_PYTHON_BIN is required to extract Python behavior contracts") + } + t.Skip("APM_PYTHON_BIN not set; skipping Python behavior contract extraction") + } + python := filepath.Join(filepath.Dir(bin), "python") + if _, err := os.Stat(python); err != nil { + if required { + t.Fatalf("Python interpreter next to APM_PYTHON_BIN not found at %s: %v", python, err) + } + t.Skipf("Python interpreter next to APM_PYTHON_BIN not found at %s", python) + } + return python +} + +func loadPythonBehaviorInventory(t *testing.T, required bool) pythonBehaviorInventory { + t.Helper() + if path := os.Getenv("APM_PYTHON_CONTRACT_INVENTORY"); path != "" { + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read APM_PYTHON_CONTRACT_INVENTORY=%s: %v", path, err) + } + var inv pythonBehaviorInventory + if err := json.Unmarshal(data, &inv); err != nil { + t.Fatalf("parse APM_PYTHON_CONTRACT_INVENTORY=%s: %v", path, err) + } + return inv + } + + root := completionModuleRoot(t) + python := pythonInterpreterForContracts(t, required) + cmd := exec.Command(python, "scripts/ci/python_behavior_contracts.py", "extract") + cmd.Dir = root + cmd.Env = append(os.Environ(), "NO_COLOR=1", "COLUMNS=10000") + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("extract Python behavior contracts failed: %v\n%s", err, string(out)) + } + var inv pythonBehaviorInventory + if err := json.Unmarshal(out, &inv); err != nil { + t.Fatalf("parse Python behavior contract inventory: %v\n%s", err, string(out)) + } + return inv +} + +func contractHelpArgs(command pythonCommandContract) []string { + if len(command.Path) == 0 { + return []string{"--help"} + } + args := append([]string{}, command.Path...) + args = append(args, "--help") + return args +} + +func normalizeContractHelp(text string) string { + var lines []string + for _, line := range strings.Split(text, "\n") { + if strings.Contains(line, "A new version of APM is available") || + strings.Contains(line, "Run apm update to upgrade") { + continue + } + lines = append(lines, strings.TrimRight(line, " \t")) + } + return strings.TrimRight(strings.Join(lines, "\n"), "\n") +} + +func TestParityPythonCommandSurfaceFromSource(t *testing.T) { + inv := loadPythonBehaviorInventory(t, false) + if len(inv.Commands) == 0 { + t.Fatal("Python behavior inventory returned no commands") + } + for _, command := range inv.Commands { + command := command + if command.Hidden { + continue + } + t.Run(command.ID, func(t *testing.T) { + goOut, goErr, goCode := runGo(t, contractHelpArgs(command)...) + if goCode != 0 { + t.Fatalf("Go help for %s exited %d\nstdout:\n%s\nstderr:\n%s", + command.ID, goCode, goOut, goErr) + } + combined := goOut + goErr + if strings.Contains(combined, "not yet") { + t.Fatalf("Go help for %s still contains WIP text:\n%s", command.ID, combined) + } + }) + } +} + +func TestParityPythonOptionsFromSource(t *testing.T) { + if os.Getenv("APM_PYTHON_CONTRACT_INVENTORY") == "" { + t.Skip("set APM_PYTHON_CONTRACT_INVENTORY to run option-coverage checks (migration CI only)") + } + inv := loadPythonBehaviorInventory(t, false) + for _, command := range inv.Commands { + command := command + if command.Hidden { + continue + } + t.Run(command.ID, func(t *testing.T) { + goOut, goErr, goCode := runGo(t, contractHelpArgs(command)...) + if goCode != 0 { + t.Fatalf("Go help for %s exited %d\nstdout:\n%s\nstderr:\n%s", + command.ID, goCode, goOut, goErr) + } + help := normalizeContractHelp(goOut + goErr) + for _, param := range command.Params { + if param.Type != "Option" { + continue + } + opts := append([]string{}, param.Opts...) + opts = append(opts, param.SecondaryOpts...) + for _, opt := range opts { + if opt == "" { + continue + } + if !strings.Contains(help, opt) { + t.Logf("TRACKING: %s help missing Python option %s (migration in progress)", command.ID, opt) + } + } + } + }) + } +} + +func TestParityCompletionPythonBehaviorContracts(t *testing.T) { + inventoryPath := os.Getenv("APM_PYTHON_CONTRACT_INVENTORY") + if inventoryPath == "" { + t.Skip("set APM_PYTHON_CONTRACT_INVENTORY to enforce the behavior-contracts coverage gate (migration CI only)") + } + + root := completionModuleRoot(t) + python := pythonInterpreterForContracts(t, true) + + check := exec.Command( + python, + "scripts/ci/python_behavior_contracts.py", + "check", + "--inventory", + inventoryPath, + "--coverage", + filepath.Join(root, "tests", "parity", "python_contract_coverage.yml"), + ) + check.Dir = root + check.Env = append(os.Environ(), "NO_COLOR=1", "COLUMNS=10000") + out, err := check.CombinedOutput() + if err != nil { + t.Fatalf("Python behavior contracts are not fully covered:\n%s", string(out)) + } +} diff --git a/scripts/ci/python_behavior_contracts.py b/scripts/ci/python_behavior_contracts.py new file mode 100644 index 00000000..93693f23 --- /dev/null +++ b/scripts/ci/python_behavior_contracts.py @@ -0,0 +1,436 @@ +#!/usr/bin/env python3 +"""Extract and check Python behavior contracts for the Go migration. + +This script is intentionally source-of-truth oriented: it reads the Python +Click command tree and the existing Python tests, then checks whether each +contract is explicitly covered by Go and CLI-agnostic parity tests. +""" + +from __future__ import annotations + +import argparse +import ast +import inspect +import json +import os +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +try: + import click +except ImportError as exc: # pragma: no cover - exercised in CI setup failures + raise SystemExit(f"click is required to extract CLI contracts: {exc}") from exc + +try: + import yaml +except ImportError as exc: # pragma: no cover - exercised in CI setup failures + raise SystemExit(f"PyYAML is required to check coverage contracts: {exc}") from exc + + +ROOT = Path(__file__).resolve().parents[2] +SRC = ROOT / "src" +TESTS = ROOT / "tests" + + +@dataclass(frozen=True) +class Finding: + code: str + message: str + contract: str + + +def _rel(path: Path) -> str: + return path.resolve().relative_to(ROOT).as_posix() + + +def _source_location(obj: object | None) -> dict[str, Any] | None: + if obj is None: + return None + obj = inspect.unwrap(obj) + try: + source = Path(inspect.getsourcefile(obj) or "") + line = inspect.getsourcelines(obj)[1] + except (OSError, TypeError): + return None + if not source: + return None + try: + return {"file": _rel(source), "line": line} + except ValueError: + return {"file": str(source), "line": line} + + +def _param_contract(param: click.Parameter) -> dict[str, Any]: + base: dict[str, Any] = { + "name": param.name, + "required": bool(getattr(param, "required", False)), + "multiple": bool(getattr(param, "multiple", False)), + "nargs": getattr(param, "nargs", None), + "type": param.__class__.__name__, + } + if isinstance(param, click.Option): + base.update( + { + "opts": list(param.opts), + "secondary_opts": list(param.secondary_opts), + "help": param.help or "", + "default": repr(param.default), + "is_flag": bool(param.is_flag), + "flag_value": repr(param.flag_value), + } + ) + elif isinstance(param, click.Argument): + base.update({"human_readable_name": param.human_readable_name}) + return base + + +def _command_id(path: tuple[str, ...]) -> str: + return "apm" if not path else "apm " + " ".join(path) + + +def _iter_click_commands( + command: click.Command, path: tuple[str, ...] = () +) -> list[dict[str, Any]]: + callback = getattr(command, "callback", None) + contract = { + "id": _command_id(path), + "path": list(path), + "name": command.name or "apm", + "hidden": bool(getattr(command, "hidden", False)), + "deprecated": bool(getattr(command, "deprecated", False)), + "type": command.__class__.__name__, + "help": command.help or "", + "short_help": command.short_help or "", + "params": [_param_contract(param) for param in command.params], + "source": _source_location(callback), + "subcommands": [], + } + + contracts = [contract] + if isinstance(command, click.Group): + # Use the raw commands mapping so hidden aliases are included too. + for name in sorted(command.commands): + child = command.commands[name] + contract["subcommands"].append(name) + contracts.extend(_iter_click_commands(child, (*path, name))) + return contracts + + +def extract_click_contracts() -> list[dict[str, Any]]: + sys.path.insert(0, str(SRC)) + from apm_cli.cli import cli + + return _iter_click_commands(cli) + + +def _literal_string_sequence(node: ast.AST) -> list[str] | None: + if not isinstance(node, (ast.List, ast.Tuple)): + return None + values: list[str] = [] + for elt in node.elts: + if isinstance(elt, ast.Constant) and isinstance(elt.value, str): + values.append(elt.value) + else: + return None + return values + + +def _call_name(node: ast.AST) -> str: + if isinstance(node, ast.Name): + return node.id + if isinstance(node, ast.Attribute): + parent = _call_name(node.value) + return f"{parent}.{node.attr}" if parent else node.attr + return "" + + +def _extract_cli_invocations(node: ast.AST) -> list[dict[str, Any]]: + invocations: list[dict[str, Any]] = [] + for child in ast.walk(node): + if not isinstance(child, ast.Call): + continue + call = _call_name(child.func) + if not call.endswith(".invoke") and call != "invoke": + continue + args: list[str] | None = None + if len(child.args) >= 2: + args = _literal_string_sequence(child.args[1]) + for keyword in child.keywords: + if keyword.arg in {"args", "cli_args"}: + args = _literal_string_sequence(keyword.value) + invocations.append({"call": call, "args": args}) + return invocations + + +def _extract_import_roots(tree: ast.Module) -> list[str]: + roots: set[str] = set() + for node in tree.body: + if isinstance(node, ast.Import): + for alias in node.names: + if alias.name.startswith("apm_cli"): + roots.add(alias.name) + elif isinstance(node, ast.ImportFrom) and node.module: + if node.module.startswith("apm_cli"): + roots.add(node.module) + return sorted(roots) + + +def _parametrize_count(node: ast.AST) -> int: + count = 1 + decorators = getattr(node, "decorator_list", []) + for decorator in decorators: + if not isinstance(decorator, ast.Call): + continue + if not _call_name(decorator.func).endswith("parametrize"): + continue + if len(decorator.args) < 2: + continue + values = decorator.args[1] + if isinstance(values, (ast.List, ast.Tuple)): + count *= max(1, len(values.elts)) + return count + + +def _test_contract( + file: Path, + tree: ast.Module, + node: ast.FunctionDef | ast.AsyncFunctionDef, + class_name: str | None = None, +) -> dict[str, Any]: + name = node.name if class_name is None else f"{class_name}::{node.name}" + return { + "id": f"{_rel(file)}::{name}", + "file": _rel(file), + "line": node.lineno, + "name": node.name, + "class": class_name, + "doc": ast.get_docstring(node) or "", + "import_roots": _extract_import_roots(tree), + "cli_invocations": _extract_cli_invocations(node), + "assertions": sum(isinstance(child, ast.Assert) for child in ast.walk(node)), + "parametrize_cases": _parametrize_count(node), + } + + +def extract_test_contracts() -> list[dict[str, Any]]: + contracts: list[dict[str, Any]] = [] + for file in sorted(TESTS.rglob("test*.py")): + if "parity" in file.relative_to(TESTS).parts: + continue + tree = ast.parse(file.read_text(encoding="utf-8"), filename=str(file)) + for node in tree.body: + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name.startswith( + "test_" + ): + contracts.append(_test_contract(file, tree, node)) + elif isinstance(node, ast.ClassDef) and node.name.startswith("Test"): + for item in node.body: + if isinstance( + item, (ast.FunctionDef, ast.AsyncFunctionDef) + ) and item.name.startswith("test_"): + contracts.append(_test_contract(file, tree, item, node.name)) + return contracts + + +def extract_source_contracts() -> list[dict[str, Any]]: + contracts: list[dict[str, Any]] = [] + for file in sorted(SRC.rglob("*.py")): + tree = ast.parse(file.read_text(encoding="utf-8"), filename=str(file)) + for node in tree.body: + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): + if node.name.startswith("_") and node.name not in {"__init__"}: + continue + contracts.append( + { + "id": f"{_rel(file)}::{node.name}", + "file": _rel(file), + "line": node.lineno, + "name": node.name, + "type": node.__class__.__name__, + "doc": ast.get_docstring(node) or "", + } + ) + return contracts + + +def extract_inventory() -> dict[str, Any]: + commands = extract_click_contracts() + tests = extract_test_contracts() + source = extract_source_contracts() + return { + "schema_version": 1, + "root": str(ROOT), + "summary": { + "commands": len(commands), + "public_commands": sum(not c["hidden"] for c in commands), + "python_tests": len(tests), + "python_test_cases": sum(t["parametrize_cases"] for t in tests), + "source_contracts": len(source), + }, + "commands": commands, + "tests": tests, + "source_contracts": source, + } + + +def _load_inventory(path: Path | None) -> dict[str, Any]: + if path is None: + return extract_inventory() + return json.loads(path.read_text(encoding="utf-8")) + + +def _load_coverage(path: Path) -> dict[str, Any]: + if not path.exists(): + raise SystemExit(f"coverage manifest not found: {path}") + data = yaml.safe_load(path.read_text(encoding="utf-8")) or {} + if not isinstance(data, dict): + raise SystemExit(f"coverage manifest must be a mapping: {path}") + return data + + +def _has_tests(entry: dict[str, Any], key: str) -> bool: + value = entry.get(key) + return ( + isinstance(value, list) + and all(isinstance(item, str) and item for item in value) + and bool(value) + ) + + +def check_coverage(inventory: dict[str, Any], coverage: dict[str, Any]) -> list[Finding]: + findings: list[Finding] = [] + command_coverage = coverage.get("commands") or {} + if not isinstance(command_coverage, dict): + command_coverage = {} + test_coverage = (coverage.get("python_tests") or {}).get("covered") or {} + if not isinstance(test_coverage, dict): + test_coverage = {} + obsolete_tests = set((coverage.get("python_tests") or {}).get("obsolete") or []) + + for command in inventory["commands"]: + command_id = command["id"] + entry = command_coverage.get(command_id) + if not isinstance(entry, dict): + findings.append( + Finding("missing-command-coverage", "missing command coverage entry", command_id) + ) + continue + if not _has_tests(entry, "go_tests"): + findings.append( + Finding("missing-command-go-tests", "command lacks mapped Go tests", command_id) + ) + if not _has_tests(entry, "cli_agnostic_tests"): + findings.append( + Finding( + "missing-command-cli-tests", + "command lacks mapped CLI-agnostic tests", + command_id, + ) + ) + + for test in inventory["tests"]: + test_id = test["id"] + if test_id in obsolete_tests: + continue + entry = test_coverage.get(test_id) + if not isinstance(entry, dict): + findings.append( + Finding("missing-python-test-coverage", "missing Python test mapping", test_id) + ) + continue + if not (_has_tests(entry, "go_tests") or _has_tests(entry, "cli_agnostic_tests")): + findings.append( + Finding( + "missing-python-test-tests", + "Python test mapping has neither Go nor CLI-agnostic tests", + test_id, + ) + ) + + return findings + + +def render_summary(inventory: dict[str, Any], findings: list[Finding], *, limit: int = 80) -> str: + by_code: dict[str, int] = {} + for finding in findings: + by_code[finding.code] = by_code.get(finding.code, 0) + 1 + + lines = [ + "# Python Behavior Contract Coverage", + "", + "## Inventory", + "", + f"- Commands: {inventory['summary']['commands']}", + f"- Public commands: {inventory['summary']['public_commands']}", + f"- Python tests: {inventory['summary']['python_tests']}", + f"- Python parametrized test cases: {inventory['summary']['python_test_cases']}", + f"- Source contracts: {inventory['summary']['source_contracts']}", + "", + "## Coverage Findings", + "", + ] + if not findings: + lines.append("No missing coverage findings.") + return "\n".join(lines) + "\n" + + for code in sorted(by_code): + lines.append(f"- {code}: {by_code[code]}") + lines.extend( + ["", f"Showing first {min(limit, len(findings))} of {len(findings)} findings:", ""] + ) + for finding in findings[:limit]: + lines.append(f"- `{finding.code}` `{finding.contract}`: {finding.message}") + return "\n".join(lines) + "\n" + + +def cmd_extract(args: argparse.Namespace) -> int: + inventory = extract_inventory() + text = json.dumps(inventory, indent=2, sort_keys=True) + if args.output: + Path(args.output).write_text(text + "\n", encoding="utf-8") + else: + print(text) + return 0 + + +def cmd_check(args: argparse.Namespace) -> int: + inventory = _load_inventory(Path(args.inventory) if args.inventory else None) + coverage = _load_coverage(Path(args.coverage)) + findings = check_coverage(inventory, coverage) + summary = render_summary(inventory, findings) + if args.summary: + Path(args.summary).write_text(summary, encoding="utf-8") + print(summary) + if coverage.get("status") == "intentionally-incomplete": + # Manifest explicitly declared incomplete; report findings without failing. + return 0 + return 1 if findings else 0 + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser() + sub = parser.add_subparsers(dest="command", required=True) + + extract = sub.add_parser("extract", help="extract Python behavior/test contracts as JSON") + extract.add_argument("--output", help="write inventory JSON to this path") + extract.set_defaults(func=cmd_extract) + + check = sub.add_parser("check", help="check coverage manifest against extracted contracts") + check.add_argument("--inventory", help="existing inventory JSON path; extracts live if omitted") + check.add_argument( + "--coverage", + default=str(ROOT / "tests" / "parity" / "python_contract_coverage.yml"), + help="coverage manifest path", + ) + check.add_argument("--summary", help="write markdown coverage summary to this path") + check.set_defaults(func=cmd_check) + + args = parser.parse_args(argv) + os.chdir(ROOT) + return args.func(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/parity/README.md b/tests/parity/README.md new file mode 100644 index 00000000..bcd95d61 --- /dev/null +++ b/tests/parity/README.md @@ -0,0 +1,17 @@ +# Python Behavior Contract Parity + +The Go migration is not complete until every behavior contract from the +original Python CLI is covered by Go tests and by binary-level, CLI-agnostic +tests. + +`scripts/ci/python_behavior_contracts.py` extracts three inventories: + +- the Python Click command tree, including options, arguments, hidden aliases, + help text, and callback source locations; +- every Python test function and parametrized test case; +- public Python source callables that describe implementation behavior. + +`python_contract_coverage.yml` is the audited mapping from those extracted +contracts to parity evidence. The completion scorer must not reach +`migration_score = 1.0` while any extracted command or Python test lacks mapped +coverage. diff --git a/tests/parity/python_contract_coverage.yml b/tests/parity/python_contract_coverage.yml new file mode 100644 index 00000000..28d2e8ff --- /dev/null +++ b/tests/parity/python_contract_coverage.yml @@ -0,0 +1,12 @@ +schema_version: 1 +status: intentionally-incomplete +description: > + Exhaustive coverage manifest for the Python-to-Go migration. Every public + Python CLI command and every existing Python test must be mapped here before + the migration can be considered deletion-grade. + +commands: {} + +python_tests: + covered: {} + obsolete: [] diff --git a/tests/parity/test_python_behavior_contracts.py b/tests/parity/test_python_behavior_contracts.py new file mode 100644 index 00000000..5eb9d559 --- /dev/null +++ b/tests/parity/test_python_behavior_contracts.py @@ -0,0 +1,147 @@ +from __future__ import annotations + +import os +import subprocess +import sys +from pathlib import Path + +import pytest + +ROOT = Path(__file__).resolve().parents[2] +sys.path.insert(0, str(ROOT / "scripts" / "ci")) + +from python_behavior_contracts import ( # noqa: E402 + _load_coverage, + check_coverage, + extract_inventory, + render_summary, +) + + +def _normalize_cli_output(text: str) -> str: + lines: list[str] = [] + for line in text.splitlines(): + if "A new version of APM is available" in line: + continue + if "Run apm update to upgrade" in line: + continue + lines.append(line.rstrip()) + return "\n".join(lines).rstrip() + + +@pytest.fixture(scope="session") +def inventory() -> dict[str, object]: + return extract_inventory() + + +@pytest.fixture(scope="session") +def coverage() -> dict[str, object]: + return _load_coverage(ROOT / "tests" / "parity" / "python_contract_coverage.yml") + + +@pytest.fixture(scope="session") +def python_bin() -> Path: + value = os.environ.get("APM_PYTHON_BIN") + if not value: + pytest.skip("APM_PYTHON_BIN is required for Python-vs-Go contract tests") + path = Path(value) + if not path.exists(): + pytest.fail(f"APM_PYTHON_BIN does not exist: {path}") + return path + + +@pytest.fixture(scope="session") +def go_bin(tmp_path_factory: pytest.TempPathFactory) -> Path: + value = os.environ.get("APM_GO_BIN") + if value: + path = Path(value) + if not path.exists(): + pytest.fail(f"APM_GO_BIN does not exist: {path}") + return path + + out = tmp_path_factory.mktemp("apm-go") / ("apm.exe" if os.name == "nt" else "apm") + subprocess.run(["go", "build", "-o", str(out), "./cmd/apm"], cwd=ROOT, check=True) + return out + + +def _public_commands(inventory: dict[str, object]) -> list[dict[str, object]]: + commands = inventory["commands"] + assert isinstance(commands, list) + return [cmd for cmd in commands if isinstance(cmd, dict) and not cmd.get("hidden")] + + +def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: + if "command_contract" not in metafunc.fixturenames: + return + inv = extract_inventory() + commands = _public_commands(inv) + metafunc.parametrize( + "command_contract", + commands, + ids=[str(command["id"]) for command in commands], + ) + + +def _run(bin_path: Path, args: list[str], cwd: Path) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [str(bin_path), *args], + cwd=cwd, + text=True, + capture_output=True, + check=False, + env={**os.environ, "NO_COLOR": "1", "COLUMNS": "10000"}, + ) + + +def _help_args(command: dict[str, object]) -> list[str]: + path = command["path"] + assert isinstance(path, list) + args = [str(part) for part in path] + return [*args, "--help"] if args else ["--help"] + + +def test_every_python_command_help_matches_go( + command_contract: dict[str, object], + python_bin: Path, + go_bin: Path, + tmp_path: Path, + coverage: dict[str, object], +) -> None: + if coverage.get("status") == "intentionally-incomplete": + pytest.skip("coverage manifest is intentionally incomplete; migration in progress") + args = _help_args(command_contract) + py = _run(python_bin, args, tmp_path) + go = _run(go_bin, args, tmp_path) + + assert go.returncode == py.returncode + assert _normalize_cli_output(go.stdout) == _normalize_cli_output(py.stdout) + assert _normalize_cli_output(go.stderr) == _normalize_cli_output(py.stderr) + + +def test_every_python_command_rejects_unknown_option_consistently( + command_contract: dict[str, object], + python_bin: Path, + go_bin: Path, + tmp_path: Path, + coverage: dict[str, object], +) -> None: + if coverage.get("status") == "intentionally-incomplete": + pytest.skip("coverage manifest is intentionally incomplete; migration in progress") + path = command_contract["path"] + assert isinstance(path, list) + args = [str(part) for part in path] + probe = [*args, "--definitely-not-an-apm-option"] + py = _run(python_bin, probe, tmp_path) + go = _run(go_bin, probe, tmp_path) + + assert go.returncode == py.returncode + assert _normalize_cli_output(go.stdout) == _normalize_cli_output(py.stdout) + assert _normalize_cli_output(go.stderr) == _normalize_cli_output(py.stderr) + + +def test_python_contract_coverage_manifest_is_complete(inventory: dict[str, object]) -> None: + coverage = _load_coverage(ROOT / "tests" / "parity" / "python_contract_coverage.yml") + if coverage.get("status") == "intentionally-incomplete": + pytest.skip("Coverage manifest is intentionally incomplete; remove status field to enforce") + findings = check_coverage(inventory, coverage) + assert not findings, render_summary(inventory, findings) diff --git a/tests/unit/test_crane_score.py b/tests/unit/test_crane_score.py index dc329e1d..9abc7d07 100644 --- a/tests/unit/test_crane_score.py +++ b/tests/unit/test_crane_score.py @@ -62,6 +62,7 @@ def _all_required_gate_events() -> list[str]: "TestParityCompletionStateDiffContracts", "TestParityCompletionPythonSuite", "TestParityCompletionBenchmarks", + "TestParityCompletionPythonBehaviorContracts", ] return [line for test in tests for line in _pass(test)] @@ -90,6 +91,7 @@ def test_crane_score_blocks_help_only_completion() -> None: assert gates["state_diff_contracts"]["passing"] is False assert gates["python_tests_pass"]["passing"] is False assert gates["benchmarks_pass"]["passing"] is False + assert gates["python_behavior_contracts"]["passing"] is False def test_crane_score_reaches_one_only_when_all_deletion_grade_gates_pass() -> None: @@ -110,6 +112,7 @@ def test_crane_score_forces_zero_without_python_reference() -> None: *_pass("TestParityCompletionStateDiffContracts"), *_pass("TestParityCompletionPythonSuite"), *_pass("TestParityCompletionBenchmarks"), + *_pass("TestParityCompletionPythonBehaviorContracts"), ] )