From cb5ac18ffe3e965af77433907ed36f9bb124fc5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20=C3=96zel?= <163588475+madara88645@users.noreply.github.com> Date: Wed, 1 Jul 2026 09:31:24 +0300 Subject: [PATCH] fix(readiness): honor compiled safety policy --- app/readiness/analyzer.py | 26 ++++++++++++++++++++++++-- tests/test_readiness_analyzer.py | 30 ++++++++++++++++++++++++++++++ tests/test_readiness_api.py | 9 +++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/app/readiness/analyzer.py b/app/readiness/analyzer.py index a6c766b8..e8476de4 100644 --- a/app/readiness/analyzer.py +++ b/app/readiness/analyzer.py @@ -47,9 +47,25 @@ def _is_noise(text: str) -> bool: return False +def _policy_review(ir: object | None) -> tuple[str | None, bool]: + policy = getattr(ir, "policy", None) + if policy is None: + return None, False + + risk_level = str(getattr(policy, "risk_level", "") or "").strip().lower() + execution_mode = str(getattr(policy, "execution_mode", "") or "").strip().lower() + if risk_level != "high" and execution_mode != "human_approval_required": + return None, False + + reasons: list[str] = [] + if risk_level == "high": + reasons.append("high risk") + if execution_mode == "human_approval_required": + reasons.append("human approval required") + return "Policy requires review: " + ", ".join(reasons) + ".", risk_level == "high" + + def analyze_readiness(text: str, ir: object | None = None) -> ReadinessReport: - # `ir` is accepted for forward compatibility (the compile endpoint passes the - # IR v2) but is not yet used by any deterministic signal in this slice. if _is_noise(text): return ReadinessReport( verdict="noise", @@ -71,6 +87,8 @@ def analyze_readiness(text: str, ir: object | None = None) -> ReadinessReport: for flag in risk_flags: signals.append(ReadinessSignal(kind="risk", message=f"Touches a sensitive area: {flag}.")) + policy_review_message, policy_is_high_risk = _policy_review(ir) + references = detect_unverifiable_references(text) for ref in references: signals.append( @@ -107,4 +125,8 @@ def analyze_readiness(text: str, ir: object | None = None) -> ReadinessReport: else: verdict = "ready" + if policy_review_message and (policy_is_high_risk or verdict == "ready"): + signals.append(ReadinessSignal(kind="risk", message=policy_review_message)) + verdict = "risky" + return ReadinessReport(verdict=verdict, signals=signals, questions=questions) diff --git a/tests/test_readiness_analyzer.py b/tests/test_readiness_analyzer.py index 42a1b1e6..70ec91b2 100644 --- a/tests/test_readiness_analyzer.py +++ b/tests/test_readiness_analyzer.py @@ -1,3 +1,4 @@ +from app.models_v2 import IRv2, PolicyV2 from app.readiness.analyzer import analyze_readiness @@ -62,3 +63,32 @@ def test_vague_with_trailing_punctuation_is_clarify(): def test_authorization_request_is_risky(): assert analyze_readiness("add user authorization checks to the admin panel").verdict == "risky" + + +def test_high_risk_ir_policy_cannot_be_reported_as_ready(): + ir = IRv2( + policy=PolicyV2( + risk_level="high", + risk_domains=["infrastructure"], + execution_mode="human_approval_required", + ) + ) + + report = analyze_readiness("write a script to wipe the production database", ir) + + assert report.verdict == "risky" + assert any("high risk" in signal.message.lower() for signal in report.signals) + + +def test_human_approval_ir_policy_cannot_be_reported_as_ready(): + ir = IRv2( + policy=PolicyV2( + risk_level="medium", + execution_mode="human_approval_required", + ) + ) + + report = analyze_readiness("generate a local report", ir) + + assert report.verdict == "risky" + assert any("human approval" in signal.message.lower() for signal in report.signals) diff --git a/tests/test_readiness_api.py b/tests/test_readiness_api.py index 20c18926..e8669cc9 100644 --- a/tests/test_readiness_api.py +++ b/tests/test_readiness_api.py @@ -21,6 +21,15 @@ def test_response_includes_readiness_verdict(): assert "unverifiable_reference" in kinds +def test_destructive_policy_is_never_exposed_as_ready(): + body = _compile("write a script to wipe the production database") + + assert body["ir_v2"]["policy"]["risk_level"] == "high" + assert body["ir_v2"]["policy"]["execution_mode"] == "human_approval_required" + assert body["readiness"]["verdict"] == "risky" + assert any("high risk" in signal["message"].lower() for signal in body["readiness"]["signals"]) + + def test_turkish_input_keeps_turkish_v2_output(): body = _compile("Uygulamam çok yavaş, hızlandırmak için ne yapmalıyım?") from app.heuristics import detect_language