diff --git a/schemas/policy_spec.schema.json b/schemas/policy_spec.schema.json index aa549ec..dbb1daf 100644 --- a/schemas/policy_spec.schema.json +++ b/schemas/policy_spec.schema.json @@ -22,6 +22,13 @@ "requires": { "$ref": "#/$defs/predicate_list" }, + "actions": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, "enforcement": { "type": "string", "enum": [ diff --git a/src/cak/specs.py b/src/cak/specs.py index bbdfe51..fe4193d 100644 --- a/src/cak/specs.py +++ b/src/cak/specs.py @@ -59,6 +59,17 @@ class PolicySpec: when: tuple[str, ...] enforcement: str proof_level: str | None = None + # Optional action scope: empty = global; entries are exact action names + # or "prefix.*" globs (same matching as capabilities). + actions: tuple[str, ...] = () + + def applies_to(self, action: str) -> bool: + if not self.actions: + return True + return any( + entry == action or (entry.endswith(".*") and action.startswith(entry[:-1])) + for entry in self.actions + ) @dataclass(frozen=True, slots=True) @@ -150,6 +161,7 @@ def load_config(data: dict[str, Any]) -> GatewayConfig: when=_str_tuple(_require(raw, "when", f"policy {policy_id}"), policy_id), enforcement=enforcement, proof_level=raw.get("proof_level"), + actions=_str_tuple(raw.get("actions"), f"policy {policy_id}"), ) ) diff --git a/src/cak/verifier.py b/src/cak/verifier.py index 481324c..3fc3e41 100644 --- a/src/cak/verifier.py +++ b/src/cak/verifier.py @@ -119,8 +119,13 @@ def verify(config: GatewayConfig, proposal: Proposal) -> Decision: fired: list[str] = [] enforcement = "allow" for policy in config.policies: + if not policy.applies_to(proposal.action): + continue results = evaluate_all(list(policy.when), proposal.arguments) - if results and all(truth is Truth.TRUE for truth in results.values()): + # Empty `when` = unconditional (logical AND over the empty set): + # an action-scoped policy with no predicates always fires for its + # actions. Both cold drafters in exp-004 assumed this independently. + if all(truth is Truth.TRUE for truth in results.values()): fired.append(policy.id) reasons.append(f"policy '{policy.id}' fired -> {policy.enforcement}") if _SEVERITY[policy.enforcement] > _SEVERITY[enforcement]: diff --git a/tests/test_verifier.py b/tests/test_verifier.py index e995149..9f1902d 100644 --- a/tests/test_verifier.py +++ b/tests/test_verifier.py @@ -47,6 +47,69 @@ def test_nonpositive_amount_blocked_by_precondition(config: GatewayConfig) -> No assert any("precondition failed: amount > 0" in reason for reason in decision.reasons) +def test_action_scoped_policy_fires_only_for_its_action() -> None: + from cak.specs import load_config + + scoped = load_config( + { + "actions": [ + {"name": "tickets.delete_ticket", "required_params": ["ticket_id"]}, + {"name": "tickets.close_ticket", "required_params": ["ticket_id"]}, + {"name": "billing.refund_order", "required_params": ["amount"]}, + ], + "policies": [ + {"id": "p.never_delete", "name": "never_delete", + "when": ["ticket_id.present"], "enforcement": "block", + "actions": ["tickets.delete_ticket"]}, + {"id": "p.ticket_ns_warn", "name": "ticket_ns_warn", + "when": ["ticket_id.present"], "enforcement": "warn", + "actions": ["tickets.*"]}, + ], + "capabilities": {"support-agent": ["tickets.*", "billing.*"]}, + } + ) + deleted = verify(scoped, Proposal("support-agent", "tickets.delete_ticket", + {"ticket_id": "tk_001"})) + assert deleted.enforcement == "block" + assert deleted.fired_policies == ("p.never_delete", "p.ticket_ns_warn") + + closed = verify(scoped, Proposal("support-agent", "tickets.close_ticket", + {"ticket_id": "tk_001"})) + assert closed.enforcement == "warn" + assert closed.fired_policies == ("p.ticket_ns_warn",) + + refund = verify(scoped, Proposal("support-agent", "billing.refund_order", + {"amount": 10, "ticket_id": "tk_001"})) + assert refund.enforcement == "allow" + assert refund.fired_policies == () + + +def test_empty_when_is_unconditional() -> None: + from cak.specs import load_config + + unconditional = load_config( + { + "actions": [ + {"name": "deploy.delete_environment", "required_params": ["environment"]}, + {"name": "deploy.create_release", "required_params": ["service"]}, + ], + "policies": [ + {"id": "p.never_delete", "name": "never_delete", "when": [], + "enforcement": "block", "actions": ["deploy.delete_environment"]}, + ], + "capabilities": {"release-agent": ["deploy.*"]}, + } + ) + deleted = verify(unconditional, Proposal("release-agent", "deploy.delete_environment", + {"environment": "staging"})) + assert deleted.enforcement == "block" + assert deleted.fired_policies == ("p.never_delete",) + + created = verify(unconditional, Proposal("release-agent", "deploy.create_release", + {"service": "api"})) + assert created.enforcement == "allow" + + def test_strictest_policy_wins() -> None: from cak.specs import load_config