From 3bb9e674c45e5feac5a42b1f85fb5296508d3598 Mon Sep 17 00:00:00 2001 From: "Vitaly D." Date: Fri, 12 Jun 2026 08:20:30 +0300 Subject: [PATCH 1/2] =?UTF-8?q?feat(v0.1):=20optional=20action=20scope=20o?= =?UTF-8?q?n=20PolicySpec=20=E2=80=94=20exp-004=20design=20finding?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- schemas/policy_spec.schema.json | 7 +++++++ src/cak/specs.py | 12 +++++++++++ src/cak/verifier.py | 2 ++ tests/test_verifier.py | 37 +++++++++++++++++++++++++++++++++ 4 files changed, 58 insertions(+) diff --git a/schemas/policy_spec.schema.json b/schemas/policy_spec.schema.json index aa549ec..dbb1daf 100644 --- a/schemas/policy_spec.schema.json +++ b/schemas/policy_spec.schema.json @@ -22,6 +22,13 @@ "requires": { "$ref": "#/$defs/predicate_list" }, + "actions": { + "type": "array", + "items": { + "type": "string", + "minLength": 1 + } + }, "enforcement": { "type": "string", "enum": [ diff --git a/src/cak/specs.py b/src/cak/specs.py index bbdfe51..fe4193d 100644 --- a/src/cak/specs.py +++ b/src/cak/specs.py @@ -59,6 +59,17 @@ class PolicySpec: when: tuple[str, ...] enforcement: str proof_level: str | None = None + # Optional action scope: empty = global; entries are exact action names + # or "prefix.*" globs (same matching as capabilities). + actions: tuple[str, ...] = () + + def applies_to(self, action: str) -> bool: + if not self.actions: + return True + return any( + entry == action or (entry.endswith(".*") and action.startswith(entry[:-1])) + for entry in self.actions + ) @dataclass(frozen=True, slots=True) @@ -150,6 +161,7 @@ def load_config(data: dict[str, Any]) -> GatewayConfig: when=_str_tuple(_require(raw, "when", f"policy {policy_id}"), policy_id), enforcement=enforcement, proof_level=raw.get("proof_level"), + actions=_str_tuple(raw.get("actions"), f"policy {policy_id}"), ) ) diff --git a/src/cak/verifier.py b/src/cak/verifier.py index 481324c..91e0b76 100644 --- a/src/cak/verifier.py +++ b/src/cak/verifier.py @@ -119,6 +119,8 @@ def verify(config: GatewayConfig, proposal: Proposal) -> Decision: fired: list[str] = [] enforcement = "allow" for policy in config.policies: + if not policy.applies_to(proposal.action): + continue results = evaluate_all(list(policy.when), proposal.arguments) if results and all(truth is Truth.TRUE for truth in results.values()): fired.append(policy.id) diff --git a/tests/test_verifier.py b/tests/test_verifier.py index e995149..4f509ed 100644 --- a/tests/test_verifier.py +++ b/tests/test_verifier.py @@ -47,6 +47,43 @@ def test_nonpositive_amount_blocked_by_precondition(config: GatewayConfig) -> No assert any("precondition failed: amount > 0" in reason for reason in decision.reasons) +def test_action_scoped_policy_fires_only_for_its_action() -> None: + from cak.specs import load_config + + scoped = load_config( + { + "actions": [ + {"name": "tickets.delete_ticket", "required_params": ["ticket_id"]}, + {"name": "tickets.close_ticket", "required_params": ["ticket_id"]}, + {"name": "billing.refund_order", "required_params": ["amount"]}, + ], + "policies": [ + {"id": "p.never_delete", "name": "never_delete", + "when": ["ticket_id.present"], "enforcement": "block", + "actions": ["tickets.delete_ticket"]}, + {"id": "p.ticket_ns_warn", "name": "ticket_ns_warn", + "when": ["ticket_id.present"], "enforcement": "warn", + "actions": ["tickets.*"]}, + ], + "capabilities": {"support-agent": ["tickets.*", "billing.*"]}, + } + ) + deleted = verify(scoped, Proposal("support-agent", "tickets.delete_ticket", + {"ticket_id": "tk_001"})) + assert deleted.enforcement == "block" + assert deleted.fired_policies == ("p.never_delete", "p.ticket_ns_warn") + + closed = verify(scoped, Proposal("support-agent", "tickets.close_ticket", + {"ticket_id": "tk_001"})) + assert closed.enforcement == "warn" + assert closed.fired_policies == ("p.ticket_ns_warn",) + + refund = verify(scoped, Proposal("support-agent", "billing.refund_order", + {"amount": 10, "ticket_id": "tk_001"})) + assert refund.enforcement == "allow" + assert refund.fired_policies == () + + def test_strictest_policy_wins() -> None: from cak.specs import load_config From edbed320cc41185673452313336f58c4c14f510c Mon Sep 17 00:00:00 2001 From: "Vitaly D." Date: Fri, 12 Jun 2026 08:30:34 +0300 Subject: [PATCH 2/2] =?UTF-8?q?fix(v0.1):=20empty=20policy=20when=20is=20u?= =?UTF-8?q?nconditional=20=E2=80=94=20second=20exp-004=20finding?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cak/verifier.py | 5 ++++- tests/test_verifier.py | 26 ++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/cak/verifier.py b/src/cak/verifier.py index 91e0b76..3fc3e41 100644 --- a/src/cak/verifier.py +++ b/src/cak/verifier.py @@ -122,7 +122,10 @@ def verify(config: GatewayConfig, proposal: Proposal) -> Decision: if not policy.applies_to(proposal.action): continue results = evaluate_all(list(policy.when), proposal.arguments) - if results and all(truth is Truth.TRUE for truth in results.values()): + # Empty `when` = unconditional (logical AND over the empty set): + # an action-scoped policy with no predicates always fires for its + # actions. Both cold drafters in exp-004 assumed this independently. + if all(truth is Truth.TRUE for truth in results.values()): fired.append(policy.id) reasons.append(f"policy '{policy.id}' fired -> {policy.enforcement}") if _SEVERITY[policy.enforcement] > _SEVERITY[enforcement]: diff --git a/tests/test_verifier.py b/tests/test_verifier.py index 4f509ed..9f1902d 100644 --- a/tests/test_verifier.py +++ b/tests/test_verifier.py @@ -84,6 +84,32 @@ def test_action_scoped_policy_fires_only_for_its_action() -> None: assert refund.fired_policies == () +def test_empty_when_is_unconditional() -> None: + from cak.specs import load_config + + unconditional = load_config( + { + "actions": [ + {"name": "deploy.delete_environment", "required_params": ["environment"]}, + {"name": "deploy.create_release", "required_params": ["service"]}, + ], + "policies": [ + {"id": "p.never_delete", "name": "never_delete", "when": [], + "enforcement": "block", "actions": ["deploy.delete_environment"]}, + ], + "capabilities": {"release-agent": ["deploy.*"]}, + } + ) + deleted = verify(unconditional, Proposal("release-agent", "deploy.delete_environment", + {"environment": "staging"})) + assert deleted.enforcement == "block" + assert deleted.fired_policies == ("p.never_delete",) + + created = verify(unconditional, Proposal("release-agent", "deploy.create_release", + {"service": "api"})) + assert created.enforcement == "allow" + + def test_strictest_policy_wins() -> None: from cak.specs import load_config