Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions schemas/policy_spec.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@
"requires": {
"$ref": "#/$defs/predicate_list"
},
"actions": {
"type": "array",
"items": {
"type": "string",
"minLength": 1
}
},
"enforcement": {
"type": "string",
"enum": [
Expand Down
12 changes: 12 additions & 0 deletions src/cak/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ class PolicySpec:
when: tuple[str, ...]
enforcement: str
proof_level: str | None = None
# Optional action scope: empty = global; entries are exact action names
# or "prefix.*" globs (same matching as capabilities).
actions: tuple[str, ...] = ()

def applies_to(self, action: str) -> bool:
if not self.actions:
return True
return any(
entry == action or (entry.endswith(".*") and action.startswith(entry[:-1]))
for entry in self.actions
)


@dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -150,6 +161,7 @@ def load_config(data: dict[str, Any]) -> GatewayConfig:
when=_str_tuple(_require(raw, "when", f"policy {policy_id}"), policy_id),
enforcement=enforcement,
proof_level=raw.get("proof_level"),
actions=_str_tuple(raw.get("actions"), f"policy {policy_id}"),
)
)

Expand Down
7 changes: 6 additions & 1 deletion src/cak/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,13 @@ def verify(config: GatewayConfig, proposal: Proposal) -> Decision:
fired: list[str] = []
enforcement = "allow"
for policy in config.policies:
if not policy.applies_to(proposal.action):
continue
results = evaluate_all(list(policy.when), proposal.arguments)
if results and all(truth is Truth.TRUE for truth in results.values()):
# Empty `when` = unconditional (logical AND over the empty set):
# an action-scoped policy with no predicates always fires for its
# actions. Both cold drafters in exp-004 assumed this independently.
if all(truth is Truth.TRUE for truth in results.values()):
fired.append(policy.id)
reasons.append(f"policy '{policy.id}' fired -> {policy.enforcement}")
if _SEVERITY[policy.enforcement] > _SEVERITY[enforcement]:
Expand Down
63 changes: 63 additions & 0 deletions tests/test_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,69 @@ def test_nonpositive_amount_blocked_by_precondition(config: GatewayConfig) -> No
assert any("precondition failed: amount > 0" in reason for reason in decision.reasons)


def test_action_scoped_policy_fires_only_for_its_action() -> None:
from cak.specs import load_config

scoped = load_config(
{
"actions": [
{"name": "tickets.delete_ticket", "required_params": ["ticket_id"]},
{"name": "tickets.close_ticket", "required_params": ["ticket_id"]},
{"name": "billing.refund_order", "required_params": ["amount"]},
],
"policies": [
{"id": "p.never_delete", "name": "never_delete",
"when": ["ticket_id.present"], "enforcement": "block",
"actions": ["tickets.delete_ticket"]},
{"id": "p.ticket_ns_warn", "name": "ticket_ns_warn",
"when": ["ticket_id.present"], "enforcement": "warn",
"actions": ["tickets.*"]},
],
"capabilities": {"support-agent": ["tickets.*", "billing.*"]},
}
)
deleted = verify(scoped, Proposal("support-agent", "tickets.delete_ticket",
{"ticket_id": "tk_001"}))
assert deleted.enforcement == "block"
assert deleted.fired_policies == ("p.never_delete", "p.ticket_ns_warn")

closed = verify(scoped, Proposal("support-agent", "tickets.close_ticket",
{"ticket_id": "tk_001"}))
assert closed.enforcement == "warn"
assert closed.fired_policies == ("p.ticket_ns_warn",)

refund = verify(scoped, Proposal("support-agent", "billing.refund_order",
{"amount": 10, "ticket_id": "tk_001"}))
assert refund.enforcement == "allow"
assert refund.fired_policies == ()


def test_empty_when_is_unconditional() -> None:
from cak.specs import load_config

unconditional = load_config(
{
"actions": [
{"name": "deploy.delete_environment", "required_params": ["environment"]},
{"name": "deploy.create_release", "required_params": ["service"]},
],
"policies": [
{"id": "p.never_delete", "name": "never_delete", "when": [],
"enforcement": "block", "actions": ["deploy.delete_environment"]},
],
"capabilities": {"release-agent": ["deploy.*"]},
}
)
deleted = verify(unconditional, Proposal("release-agent", "deploy.delete_environment",
{"environment": "staging"}))
assert deleted.enforcement == "block"
assert deleted.fired_policies == ("p.never_delete",)

created = verify(unconditional, Proposal("release-agent", "deploy.create_release",
{"service": "api"}))
assert created.enforcement == "allow"


def test_strictest_policy_wins() -> None:
from cak.specs import load_config

Expand Down
Loading