Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 105 additions & 9 deletions agent_fox/session/review_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Security keyword detection for automatic category classification
# Multi-category keyword classification for automatic category detection
# ---------------------------------------------------------------------------

_SECURITY_KEYWORDS: frozenset[str] = frozenset(
Expand Down Expand Up @@ -80,20 +80,116 @@
}
)

_CORRECTNESS_KEYWORDS: frozenset[str] = frozenset(
{
"wrong behavior",
"wrong behaviour",
"incorrect behavior",
"incorrect behaviour",
"missing functionality",
"missing feature",
"wrong output",
"incorrect output",
"wrong result",
"incorrect result",
"not implemented",
"undefined behavior",
"undefined behaviour",
"logic error",
"wrong logic",
"incorrect logic",
"regression",
"spec compliance",
}
)

def _detect_security_category(description: str) -> str | None:
"""Return 'security' if the description contains security-related keywords.
_COMPATIBILITY_KEYWORDS: frozenset[str] = frozenset(
{
"api mismatch",
"proto field",
"protobuf",
"field disagreement",
"api disagreement",
"interface mismatch",
"type mismatch",
"schema mismatch",
"incompatible",
"backward compatibility",
"breaking change",
"contract violation",
"interface violation",
}
)

Case-insensitive substring match against a known set of security keywords.
Returns None if no security keywords are found.
_TESTING_KEYWORDS: frozenset[str] = frozenset(
{
"missing test",
"no test",
"test coverage",
"untested",
"test infrastructure",
"test suite",
"test case",
"lacks test",
"not tested",
"needs test",
"add test",
"unit test",
"integration test",
"regression test",
}
)

_CONFIGURATION_KEYWORDS: frozenset[str] = frozenset(
{
"wrong port",
"missing config",
"misconfigured",
"misconfiguration",
"config error",
"wrong url",
"wrong endpoint",
"wrong address",
"deployment config",
"environment variable",
"env var",
}
)

# Ordered list of (category, keywords) — security is first because it is the
# highest-priority category; a finding that matches both security and
# correctness keywords is a security finding.
_CATEGORY_RULES: tuple[tuple[str, frozenset[str]], ...] = (
("security", _SECURITY_KEYWORDS),
("correctness", _CORRECTNESS_KEYWORDS),
("compatibility", _COMPATIBILITY_KEYWORDS),
("testing", _TESTING_KEYWORDS),
("configuration", _CONFIGURATION_KEYWORDS),
)


def _classify_category(description: str) -> str | None:
"""Return the category for *description* based on keyword matching.

Checks each category's keywords in priority order (security first, then
correctness, compatibility, testing, configuration). Returns the first
matching category name, or ``None`` if no keywords match.

Case-insensitive substring match.
"""
lower = description.lower()
for keyword in _SECURITY_KEYWORDS:
if keyword in lower:
return "security"
for category, keywords in _CATEGORY_RULES:
for keyword in keywords:
if keyword in lower:
return category
return None


# Backward-compatibility alias — callers that imported _detect_security_category
# directly continue to work unchanged.
_detect_security_category = _classify_category


# ---------------------------------------------------------------------------
# Field-level key normalization (74-REQ-2.4)
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -152,7 +248,7 @@ def parse_review_findings(
req_ref = obj.get("requirement_ref")
if isinstance(req_ref, str):
req_ref = truncate_field(req_ref, max_length=MAX_REF_LENGTH, field_name="finding.requirement_ref")
category = _detect_security_category(description)
category = _classify_category(description)
results.append(
ReviewFinding(
id=str(uuid.uuid4()),
Expand Down
101 changes: 101 additions & 0 deletions tests/unit/session/test_review_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pathlib import Path

from agent_fox.session.review_parser import (
_classify_category,
parse_auditor_output,
parse_legacy_review_md,
parse_legacy_verification_md,
Expand Down Expand Up @@ -417,3 +418,103 @@ def test_empty_audit_array(self) -> None:
assert result is not None
assert result.overall_verdict == "PASS"
assert len(result.entries) == 0


# ---------------------------------------------------------------------------
# Multi-category classification (issue #485)
# ---------------------------------------------------------------------------


class TestClassifyCategory:
"""_classify_category() returns the correct category for each keyword group."""

def test_security_keywords_return_security(self) -> None:
"""Security keywords produce category='security'."""
assert _classify_category("sql injection found in query builder") == "security"
assert _classify_category("Path traversal in file upload handler") == "security"
assert _classify_category("Remote code execution via deserialization") == "security"

def test_correctness_keywords_return_correctness(self) -> None:
"""Correctness keywords produce category='correctness'."""
assert _classify_category("Wrong behavior when input is empty") == "correctness"
assert _classify_category("This is a logic error in the parser") == "correctness"
assert _classify_category("Missing functionality: no retry on 429") == "correctness"

def test_compatibility_keywords_return_compatibility(self) -> None:
"""Compatibility keywords produce category='compatibility'."""
assert _classify_category("API mismatch between client and server") == "compatibility"
assert _classify_category("Proto field disagreement in response") == "compatibility"
assert _classify_category("Breaking change in the public interface") == "compatibility"

def test_testing_keywords_return_testing(self) -> None:
"""Testing keywords produce category='testing'."""
assert _classify_category("Missing test for error path") == "testing"
assert _classify_category("No test coverage for edge cases") == "testing"
assert _classify_category("The function is untested") == "testing"

def test_configuration_keywords_return_configuration(self) -> None:
"""Configuration keywords produce category='configuration'."""
assert _classify_category("Wrong port configured for service") == "configuration"
assert _classify_category("Missing config for database connection") == "configuration"
assert _classify_category("Misconfigured environment variable") == "configuration"

def test_no_match_returns_none(self) -> None:
"""Descriptions with no recognized keywords return None."""
assert _classify_category("The code looks fine overall") is None
assert _classify_category("General observation about code style") is None

def test_security_takes_priority_over_correctness(self) -> None:
"""Security is detected even when correctness keywords also appear."""
result = _classify_category("Wrong behavior due to sql injection vulnerability")
assert result == "security"

def test_case_insensitive(self) -> None:
"""Keyword matching is case-insensitive."""
assert _classify_category("MISSING TEST for this function") == "testing"
assert _classify_category("SQL INJECTION risk") == "security"


class TestCategoryPopulatedInFindings:
"""parse_review_output() populates category on each ReviewFinding."""

def test_security_finding_has_category(self) -> None:
"""Finding with security keyword has category='security'."""
response = '{"findings": [{"severity": "critical", "description": "SQL injection in login form"}]}'
findings = parse_review_output(response, "s", "1", "sid")
assert len(findings) == 1
assert findings[0].category == "security"

def test_correctness_finding_has_category(self) -> None:
"""Finding with correctness keyword has category='correctness'."""
response = '{"findings": [{"severity": "major", "description": "Wrong behavior when retrying"}]}'
findings = parse_review_output(response, "s", "1", "sid")
assert len(findings) == 1
assert findings[0].category == "correctness"

def test_testing_finding_has_category(self) -> None:
"""Finding with testing keyword has category='testing'."""
response = '{"findings": [{"severity": "minor", "description": "Missing test for timeout path"}]}'
findings = parse_review_output(response, "s", "1", "sid")
assert len(findings) == 1
assert findings[0].category == "testing"

def test_compatibility_finding_has_category(self) -> None:
"""Finding with compatibility keyword has category='compatibility'."""
response = '{"findings": [{"severity": "major", "description": "API mismatch in response schema"}]}'
findings = parse_review_output(response, "s", "1", "sid")
assert len(findings) == 1
assert findings[0].category == "compatibility"

def test_configuration_finding_has_category(self) -> None:
"""Finding with configuration keyword has category='configuration'."""
response = '{"findings": [{"severity": "major", "description": "Wrong port in deployment config"}]}'
findings = parse_review_output(response, "s", "1", "sid")
assert len(findings) == 1
assert findings[0].category == "configuration"

def test_unclassified_finding_has_none_category(self) -> None:
"""Finding with no recognized keywords has category=None."""
response = '{"findings": [{"severity": "observation", "description": "Consider renaming this variable"}]}'
findings = parse_review_output(response, "s", "1", "sid")
assert len(findings) == 1
assert findings[0].category is None