Skip to content
Closed
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- **Dead code benchmark: filter feature-removal false positives from ground truth** (supermodeltools/supermodel-public-api#714):
The ground truth extractor now applies the existing `_is_feature_removal_fp` filter (which
was implemented but never called). Symbols deleted in a PR that are also imported by other
files deleted in the same PR are excluded from GT — they were live code co-removed with
their consumers, not dead code. Genuinely orphaned symbols with no deleted importer are
kept. This fixes 0-recall scores for PRs like n8n #23572 and prisma #28485 where whole
files were removed as part of a feature deletion.

## [0.14.0] - 2026-02-13

### Added
Expand Down
43 changes: 12 additions & 31 deletions src/mcpbr/benchmarks/supermodel/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import hashlib
import json
import logging
import os
import sys
import tempfile
import time
from typing import Any

logger = logging.getLogger("mcpbr.supermodel")

Expand Down Expand Up @@ -44,29 +43,19 @@ async def call_supermodel_api(
with open(zip_path, "rb") as f:
zip_hash = hashlib.sha256(f.read()).hexdigest()[:12]
ep_name = endpoint_path.strip("/").replace("/", "-")
idempotency_key = f"bench:{ep_name}:{zip_hash}:v2"
idempotency_key = f"bench:{ep_name}:{zip_hash}:v3"

headers = [
"-H",
"Accept: application/json",
"-H",
f"Idempotency-Key: {idempotency_key}",
]

# Pass API key via curl config file to avoid exposure in process table (ps aux)
api_key_config_path: str | None = None
if api_key:
with tempfile.NamedTemporaryFile(
mode="w", suffix=".cfg", prefix="mcpbr_curl_", delete=False
) as api_key_fd:
api_key_fd.write(f'header = "X-Api-Key: {api_key}"\n')
api_key_config_path = api_key_fd.name
os.chmod(api_key_config_path, 0o600)
headers.extend(["-H", f"X-Api-Key: {api_key}"])

# Initial request with file upload
upload_cmd = ["curl", "-s", "-X", "POST", url, "-F", f"file=@{zip_path}", *headers]
if api_key_config_path:
upload_cmd.extend(["--config", api_key_config_path])

start_time = time.time()
print(
Expand All @@ -83,10 +72,7 @@ async def call_supermodel_api(
if proc.returncode != 0:
raise RuntimeError(f"Supermodel API request failed: {stderr.decode()}")

try:
response = json.loads(stdout.decode())
except json.JSONDecodeError as e:
raise RuntimeError(f"Non-JSON response from Supermodel API: {stdout.decode()[:500]}") from e
response = json.loads(stdout.decode())

# Poll if async — use lightweight requests (1-byte dummy file instead of
# re-uploading the full zip). The API recognizes the idempotency key and
Expand All @@ -102,6 +88,8 @@ async def call_supermodel_api(

# Create poll dummy on first iteration only
if poll_dummy_path is None:
import tempfile

with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as poll_dummy:
poll_dummy.write(b"\n")
poll_dummy_path = poll_dummy.name
Expand All @@ -116,8 +104,6 @@ async def call_supermodel_api(
f"file=@{poll_dummy_path}",
*headers,
]
if api_key_config_path:
poll_cmd.extend(["--config", api_key_config_path])

retry_after = response.get("retryAfter", 10)
poll_count += 1
Expand All @@ -138,17 +124,12 @@ async def call_supermodel_api(

if proc.returncode != 0:
raise RuntimeError(f"Supermodel API poll failed: {stderr.decode()}")
try:
response = json.loads(stdout.decode())
except json.JSONDecodeError as e:
raise RuntimeError(
f"Non-JSON poll response from Supermodel API: {stdout.decode()[:500]}"
) from e
response = json.loads(stdout.decode())
finally:
if poll_dummy_path is not None:
os.unlink(poll_dummy_path)
if api_key_config_path is not None:
os.unlink(api_key_config_path)
import os as _os

_os.unlink(poll_dummy_path)

elapsed = time.time() - start_time

Expand All @@ -161,6 +142,6 @@ async def call_supermodel_api(
if isinstance(status, int) and status >= 400:
raise RuntimeError(f"Supermodel API HTTP {status}: {response.get('message', response)}")

api_result = response.get("result", response)
api_result: dict[str, Any] = response.get("result", response)
print(f" Supermodel API: completed in {elapsed:.1f}s", file=sys.stderr, flush=True)
return dict(api_result)
return api_result
60 changes: 45 additions & 15 deletions src/mcpbr/benchmarks/supermodel/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def __init__(
tasks: list[dict[str, Any]] | None = None,
supermodel_api_base: str = "https://api.supermodel.dev",
supermodel_api_key: str | None = None,
resolved_threshold: float = 0.8,
ground_truth_dir: str | Path | None = None,
supermodel_api_timeout: int = 900,
**kwargs: Any,
Expand All @@ -63,9 +62,6 @@ def __init__(
tasks: List of task config dicts from YAML.
supermodel_api_base: Base URL for Supermodel API.
supermodel_api_key: API key (or set SUPERMODEL_API_KEY env var).
resolved_threshold: Recall threshold to consider a task 'resolved' (precision is
reported but not required — the API returns many valid dead-code
candidates beyond the GT set, so precision is not a fair gate).
ground_truth_dir: Directory to cache ground truth JSON files.
supermodel_api_timeout: Max seconds to wait for Supermodel API (default 900).
**kwargs: Additional keyword arguments (ignored for forward compat).
Expand All @@ -75,7 +71,6 @@ def __init__(
self.api_base = supermodel_api_base
self.api_key = supermodel_api_key or os.environ.get("SUPERMODEL_API_KEY")
self.api_timeout = supermodel_api_timeout
self.resolved_threshold = resolved_threshold
self.gt_dir = Path(ground_truth_dir) if ground_truth_dir else DEFAULT_GT_DIR
self.gt_dir.mkdir(parents=True, exist_ok=True)

Expand Down Expand Up @@ -286,11 +281,14 @@ async def create_environment(
self,
task: dict[str, Any],
docker_manager: DockerEnvironmentManager,
is_mcp: bool = True,
) -> TaskEnvironment:
"""Create an isolated environment for the task.

Clones repo at pre-merge commit, calls Supermodel API (or uses cached
analysis), places analysis JSON in workdir, writes REPORT.json placeholder.
For baseline runs (is_mcp=False) the analysis file is written to a hidden
path so the agent cannot accidentally discover and use it.
"""
# Copy to avoid mutating the shared task dict (breaks A/B comparisons)
task = {**task}
Expand Down Expand Up @@ -478,13 +476,19 @@ async def create_environment(
{k: v for k, v in ep.items() if k in ep_keep} for ep in entry_points[:200]
]

# Write all candidates directly into a single analysis file.
# Write analysis file. For MCP the file goes at the well-known path
# that enhanced_prompt_v2 references. For baseline it goes to a hidden
# path so the agent cannot accidentally discover it — but evaluate()
# can still read it for the alive-set precision calculation.
analysis_data = {
"metadataSummary": metadata_summary,
"deadCodeCandidates": slimmed,
"entryPoints": slim_entry_points,
}
index_path = Path(host_workdir) / self._endpoint.analysis_filename
if is_mcp:
index_path = Path(host_workdir) / self._endpoint.analysis_filename
else:
index_path = Path(host_workdir) / f".{self._endpoint.analysis_filename}"
index_path.write_text(json.dumps(analysis_data, indent=2))

logger.info(
Expand All @@ -507,7 +511,10 @@ async def create_environment(
"deadCodeCandidates": [],
"entryPoints": [],
}
index_path = Path(host_workdir) / self._endpoint.analysis_filename
if is_mcp:
index_path = Path(host_workdir) / self._endpoint.analysis_filename
else:
index_path = Path(host_workdir) / f".{self._endpoint.analysis_filename}"
index_path.write_text(json.dumps(empty_analysis, indent=2))

# Start Docker container
Expand Down Expand Up @@ -623,29 +630,52 @@ async def evaluate(
else:
agent_findings = self._extract_findings_from_text(solution)

# Load entry points (confirmed-alive symbols) from the analysis file to
# use as the alive set for precision. FP = findings that are confirmed
# alive, not simply "findings not in GT" (which penalises correct dead
# code the PR happened not to remove).
# For baseline runs the file is at the hidden (.filename) path to prevent
# the agent from discovering it; check both paths.
alive_code: list[dict] | None = None
analysis_path = Path(env.host_workdir) / self._endpoint.analysis_filename
hidden_path = Path(env.host_workdir) / f".{self._endpoint.analysis_filename}"
if not analysis_path.exists() and hidden_path.exists():
analysis_path = hidden_path
if analysis_path.exists():
try:
with open(analysis_path) as f:
analysis_data = json.load(f)
alive_code = analysis_data.get("entryPoints", [])
except (json.JSONDecodeError, OSError):
pass

# Compute P/R/F1
metrics = compute_prf1(agent_findings, ground_truth, key_fields)
metrics = compute_prf1(agent_findings, ground_truth, key_fields, alive_code=alive_code)

precision = metrics["precision"]
recall = metrics["recall"]
resolved = recall >= self.resolved_threshold
# resolved = True means the agent completed without error (evaluate() was reached).
# Pass/fail is not gated on a recall threshold — use precision/recall to judge quality.
resolved = True
fp_mode = "vs alive set" if (alive_code is not None and alive_code) else "vs gt"

# Log results
print(f"\n{'=' * 50}")
print(f"SUPERMODEL EVALUATION - {env.instance_id} ({self.analysis_type})")
print(f" Found: {metrics['found']} items")
print(f" Expected: {metrics['expected']} items")
print(f" Found: {metrics['found']} items (unique file+name pairs)")
print(f" Expected: {metrics['expected']} items (GT from PR diff)")
print(f" True Positives: {metrics['true_positives']}")
print(f" False Positives: {metrics['false_positives']}")
print(f" False Positives: {metrics['false_positives']} ({fp_mode})")
print(f" False Negatives: {metrics['false_negatives']}")
print(f" Precision: {precision * 100:.1f}%")
print(f" Precision: {precision * 100:.1f}% ({fp_mode})")
print(f" Recall: {recall * 100:.1f}%")
print(f" F1 Score: {metrics['f1_score'] * 100:.1f}%")
print(f" Resolved: {resolved}")
print(f" Succeeded: {resolved}")
print(f"{'=' * 50}\n")

return {
"resolved": resolved,
"fp_mode": fp_mode,
**metrics,
}

Expand Down
105 changes: 105 additions & 0 deletions src/mcpbr/benchmarks/supermodel/endpoints/dead_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
(r"^-\s*export\s+const\s+(\w+)\s*[=:]", "const"),
(r"^-\s*export\s+default\s+(?:async\s+)?function\s+(\w+)", "function"),
(r"^-\s*export\s+default\s+class\s+(\w+)", "class"),
(r"^-\s*export\s+interface\s+(\w+)", "interface"),
(r"^-\s*export\s+type\s+(\w+)\s*[={<]", "type"),
(r"^-\s*export\s+(?:const\s+)?enum\s+(\w+)", "enum"),
]

# Patterns for Python declarations
Expand Down Expand Up @@ -44,6 +47,14 @@
r"\.rs$",
]

# Patterns for state-machine import parsing of deleted lines (unified-diff
# lines starting with '-'). Consumed by _parse_deleted_imports.
# Start of a (possibly multi-line) named import, e.g. `-import { ` or
# `-import type {` or `-import Default, {`.
_IMPORT_OPEN_RE = re.compile(r"^-\s*import\s+(?:type\s+)?(?:\w+\s*,\s*)?\{")
# Complete single-line named import, e.g. `-import { a, b } from 'mod'`.
# Group 1: comma-separated names; group 2: module specifier.
_IMPORT_SINGLE_RE = re.compile(
    r"^-\s*import\s+(?:type\s+)?(?:\w+\s*,\s*)?\{([^}]+)\}\s+from\s+['\"]([^'\"]+)['\"]"
)
# Default import, e.g. `-import Foo from 'mod'`. Group 1: name; group 2: specifier.
_IMPORT_DEFAULT_RE = re.compile(r"^-\s*import\s+(?:type\s+)?(\w+)\s+from\s+['\"]([^'\"]+)['\"]")
# Pulls the module specifier (`from '...'`) out of an accumulated import block.
_IMPORT_FROM_RE = re.compile(r"from\s+['\"]([^'\"]+)['\"]")

SKIP_NAMES = {
"default",
"module",
Expand Down Expand Up @@ -152,6 +163,8 @@ def enhanced_prompt_v2(self) -> str:
continue
if "Type/interface" in reason:
continue
if c.get("confidence") not in ("high", None):
continue

dead_code.append({
"file": c.get("file", ""),
Expand Down Expand Up @@ -220,6 +233,15 @@ def extract_ground_truth(
declarations = _parse_diff(diff, language)
if scope_prefix:
declarations = [d for d in declarations if d.file.startswith(scope_prefix)]

# Filter feature-removal false positives: if a deleted symbol is imported
# by another file also deleted in the same PR, it was live code (not dead
# code) that was removed together with its consumers. Such symbols should
# not appear in the ground truth because no static-analysis tool would
# ever report them as dead pre-merge. (#714)
deleted_imports = _parse_deleted_imports(diff)
declarations = [d for d in declarations if not _is_feature_removal_fp(d, deleted_imports)]

return [{"file": d.file, "name": d.name, "type": d.type} for d in declarations]


Expand Down Expand Up @@ -263,3 +285,86 @@ def _parse_diff(diff_text: str, language: str = "typescript") -> list[RemovedDec
break

return declarations


def _parse_deleted_imports(diff_text: str) -> dict[str, set[str]]:
    """Parse deleted import statements, returning symbol → set[module_specifier].

    Handles single-line and multi-line named import blocks plus default imports.
    Capturing the module specifier (the ``from '...'`` part) lets callers do
    file-aware filtering rather than just name-based filtering.
    """
    # imported symbol name -> set of module specifiers it was imported from
    symbol_sources: dict[str, set[str]] = {}
    in_block = False
    pending: list[str] = []

    def record(raw_name: str, spec: str) -> None:
        # `foo as bar` imports the export `foo`; keep the exported name.
        cleaned = raw_name.strip().split(" as ")[0].strip()
        if cleaned:
            symbol_sources.setdefault(cleaned, set()).add(spec)

    def emit_block(raw: str) -> None:
        # Extract the module specifier, then every name between the braces.
        from_match = _IMPORT_FROM_RE.search(raw)
        spec = from_match.group(1) if from_match else ""
        lo = raw.find("{")
        hi = raw.find("}")
        if lo == -1 or hi == -1:
            return
        for piece in raw[lo + 1 : hi].split(","):
            record(piece, spec)

    for line in diff_text.split("\n"):
        deleted = line.startswith("-") and not line.startswith("---")

        if in_block:
            # Inside a multi-line named import: keep collecting deleted lines
            # until the closing brace shows up in the accumulated text.
            if deleted:
                pending.append(line[1:])
            joined = " ".join(pending)
            if "}" in joined:
                in_block = False
                emit_block(joined)
                pending = []
            continue

        if not deleted:
            continue

        # Single-line named import: -import { foo, bar } from '...'
        single = _IMPORT_SINGLE_RE.match(line)
        if single:
            for piece in single.group(1).split(","):
                record(piece, single.group(2))
            continue

        # Default import: -import Foo from '...'
        default = _IMPORT_DEFAULT_RE.match(line)
        if default:
            symbol_sources.setdefault(default.group(1), set()).add(default.group(2))
            continue

        # Start of a multi-line named import block
        if _IMPORT_OPEN_RE.match(line):
            in_block = True
            pending = [line[1:]]  # strip the leading diff '-'

    return symbol_sources


def _is_feature_removal_fp(
decl: "RemovedDeclaration", deleted_imports: dict[str, set[str]]
) -> bool:
"""True if the declaration appears to be a feature-removal false positive.

A symbol is considered a false positive when another deleted file in the
same PR imports it FROM a module whose basename matches the declaration's
file. This ties the filter to the actual source file rather than just the
name, preventing spurious suppression of unrelated same-named exports.
"""
if decl.name not in deleted_imports:
return False
decl_stem = re.sub(r"\.(ts|tsx|js|jsx)$", "", decl.file.split("/")[-1])
for spec in deleted_imports[decl.name]:
spec_stem = re.sub(r"\.(ts|tsx|js|jsx)$", "", spec.rstrip("/").split("/")[-1])
if spec_stem and spec_stem == decl_stem:
return True
return False
Loading