diff --git a/capiscio_sdk/__init__.py b/capiscio_sdk/__init__.py index afaf959..d467831 100644 --- a/capiscio_sdk/__init__.py +++ b/capiscio_sdk/__init__.py @@ -66,6 +66,23 @@ # Event emission from .events import EventEmitter +# RFC-005: PIP request/response types +from .pip import ( + PIPRequest, + PIPResponse, + SubjectAttributes, + ActionAttributes, + ResourceAttributes, + ContextAttributes, + EnvironmentAttributes, + Obligation as PIPObligation, + EnforcementMode, + PIP_VERSION, + DECISION_ALLOW, + DECISION_DENY, + DECISION_OBSERVE, +) + __all__ = [ "__version__", # Security middleware @@ -119,5 +136,19 @@ "AgentIdentity", # Event emission "EventEmitter", + # RFC-005: PIP types + "PIPRequest", + "PIPResponse", + "SubjectAttributes", + "ActionAttributes", + "ResourceAttributes", + "ContextAttributes", + "EnvironmentAttributes", + "PIPObligation", + "EnforcementMode", + "PIP_VERSION", + "DECISION_ALLOW", + "DECISION_DENY", + "DECISION_OBSERVE", ] diff --git a/capiscio_sdk/pip.py b/capiscio_sdk/pip.py new file mode 100644 index 0000000..49aa3dd --- /dev/null +++ b/capiscio_sdk/pip.py @@ -0,0 +1,377 @@ +"""RFC-005: PIP Request Builder — SDK convenience types. + +Provides dataclasses for constructing PDP Integration Profile (PIP) requests +and interpreting responses. These are thin data structures with serialization — +no business logic, no PDP client, no enforcement. Intended for SDK consumers +building custom PEP integrations. + +For the thin PDP client (Option B, delegating to Go core), see +``capiscio_mcp.pip.PolicyClient``. + +Usage:: + + from capiscio_sdk.pip import ( + PIPRequest, SubjectAttributes, ActionAttributes, + ResourceAttributes, ContextAttributes, EnvironmentAttributes, + PIPResponse, Obligation, EnforcementMode, + ) + + request = PIPRequest( + subject=SubjectAttributes( + did="did:web:example.com:agents:bot", + badge_jti="badge-session-id", + ial="IAL-1", + trust_level="2", + ), + action=ActionAttributes(operation="tools/call"), + resource=ResourceAttributes(identifier="database://prod/users"), + context=ContextAttributes( + txn_id="019471a2-...", + enforcement_mode="EM-OBSERVE", + ), + ) + payload = request.to_dict() +""" + +from __future__ import annotations + +import os +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Dict, List, Optional, Union + + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +PIP_VERSION = "capiscio.pip.v1" +"""Protocol version identifier. PEPs MUST include this in every request.""" + +DECISION_ALLOW = "ALLOW" +DECISION_DENY = "DENY" +DECISION_OBSERVE = "ALLOW_OBSERVE" +"""PEP-only telemetry value for EM-OBSERVE fallback on PDP unavailability.""" + + +# --------------------------------------------------------------------------- +# Enforcement Mode +# --------------------------------------------------------------------------- + + +class EnforcementMode(Enum): + """PEP enforcement strictness level (RFC-008 §10.5 total order). + + Values: + OBSERVE: Log only, never block. + GUARD: Block on verification failure, log PDP denials. + DELEGATE: Block on verification + PDP deny, best-effort obligations. + STRICT: Block on everything including obligation failures. + """ + + OBSERVE = "EM-OBSERVE" + GUARD = "EM-GUARD" + DELEGATE = "EM-DELEGATE" + STRICT = "EM-STRICT" + + def stricter_than(self, other: EnforcementMode) -> bool: + """Return True if this mode is stricter than *other*.""" + order = list(EnforcementMode) + return order.index(self) > order.index(other) + + @classmethod + def from_env(cls) -> EnforcementMode: + """Read enforcement mode from ``CAPISCIO_ENFORCEMENT_MODE``. + + Returns ``OBSERVE`` (the safe rollout default) when the variable + is unset or empty. + + Raises: + ValueError: If the variable is set but not a recognised mode. + """ + val = os.environ.get("CAPISCIO_ENFORCEMENT_MODE", "") + if not val: + return cls.OBSERVE + try: + return cls(val) + except ValueError: + valid = ", ".join(m.value for m in cls) + raise ValueError( + f"Unknown enforcement mode: {val!r} (valid: {valid})" + ) from None + + +# --------------------------------------------------------------------------- +# Request types (RFC-005 §5) +# --------------------------------------------------------------------------- + + +@dataclass +class SubjectAttributes: + """Identifies the acting agent (RFC-005 §5.1). + + Attributes: + did: Agent DID from badge ``sub`` claim. + badge_jti: Badge ``jti`` claim. + ial: Identity Assurance Level (e.g. ``"IAL-1"``). + trust_level: Badge trust level string (e.g. ``"1"``, ``"2"``, ``"3"``). + """ + + did: str = "" + badge_jti: str = "" + ial: str = "" + trust_level: str = "" + + def to_dict(self) -> Dict[str, str]: + return { + "did": self.did, + "badge_jti": self.badge_jti, + "ial": self.ial, + "trust_level": self.trust_level, + } + + +@dataclass +class ActionAttributes: + """Identifies what is being attempted (RFC-005 §5.1). + + Attributes: + operation: Tool name, HTTP method+route, etc. + capability_class: ``None`` in badge-only mode (RFC-008). + """ + + operation: str = "" + capability_class: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "capability_class": self.capability_class, + "operation": self.operation, + } + + +@dataclass +class ResourceAttributes: + """Identifies the target resource (RFC-005 §5.1). + + Attributes: + identifier: Target resource URI. + """ + + identifier: str = "" + + def to_dict(self) -> Dict[str, str]: + return {"identifier": self.identifier} + + +@dataclass +class ContextAttributes: + """Correlation and authority context (RFC-005 §5.1). + + Envelope-sourced fields (``envelope_id``, ``delegation_depth``, + ``constraints``, ``parent_constraints``) MUST be ``None`` in + badge-only mode. They serialise as JSON ``null``, not absent keys. + + Attributes: + txn_id: Transaction correlation ID (UUID v7 recommended). + enforcement_mode: PEP-level enforcement mode string. + hop_id: Optional hop attestation ID. + envelope_id: ``None`` until RFC-008. + delegation_depth: ``None`` until RFC-008. + constraints: ``None`` until RFC-008. + parent_constraints: ``None`` until RFC-008. + """ + + txn_id: str = "" + enforcement_mode: Union[str, EnforcementMode] = "EM-OBSERVE" + hop_id: Optional[str] = None + envelope_id: Optional[str] = None + delegation_depth: Optional[int] = None + constraints: Optional[Any] = None + parent_constraints: Optional[Any] = None + + def __post_init__(self) -> None: + if isinstance(self.enforcement_mode, EnforcementMode): + self.enforcement_mode = self.enforcement_mode.value + + def to_dict(self) -> Dict[str, Any]: + return { + "txn_id": self.txn_id, + "hop_id": self.hop_id, + "envelope_id": self.envelope_id, + "delegation_depth": self.delegation_depth, + "constraints": self.constraints, + "parent_constraints": self.parent_constraints, + "enforcement_mode": self.enforcement_mode, + } + + +@dataclass +class EnvironmentAttributes: + """PEP runtime context (RFC-005 §5.1). + + Attributes: + workspace: Optional workspace / tenant identifier. + pep_id: Optional PEP instance identifier. + time: ISO 8601 timestamp (RECOMMENDED). Auto-populated by + :meth:`PIPRequest.to_dict` if not set. + """ + + workspace: Optional[str] = None + pep_id: Optional[str] = None + time: Optional[str] = None + + def to_dict(self) -> Dict[str, Optional[str]]: + d: Dict[str, Optional[str]] = {} + if self.workspace is not None: + d["workspace"] = self.workspace + if self.pep_id is not None: + d["pep_id"] = self.pep_id + if self.time is not None: + d["time"] = self.time + return d + + +@dataclass +class PIPRequest: + """RFC-005 §5 Decision Request. + + Attributes: + subject: Agent identity attributes. + action: Attempted operation. + resource: Target resource. + context: Correlation / authority context. + environment: PEP runtime context. + pip_version: Protocol version (auto-set). + """ + + subject: SubjectAttributes = field(default_factory=SubjectAttributes) + action: ActionAttributes = field(default_factory=ActionAttributes) + resource: ResourceAttributes = field(default_factory=ResourceAttributes) + context: ContextAttributes = field(default_factory=ContextAttributes) + environment: EnvironmentAttributes = field(default_factory=EnvironmentAttributes) + pip_version: str = PIP_VERSION + + def to_dict(self) -> Dict[str, Any]: + """Serialise to PIP wire format (JSON-compatible dict). + + Automatically populates ``context.txn_id`` (UUID v7 via + :func:`uuid.uuid7` when available, else :func:`uuid.uuid4`) + and ``environment.time`` (ISO 8601 UTC) if not already set. + """ + # Auto-populate txn_id if empty + ctx = self.context + if not ctx.txn_id: + ctx.txn_id = _generate_uuid7() + + # Auto-populate environment time if missing + env = self.environment + if env.time is None: + env.time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + return { + "pip_version": self.pip_version, + "subject": self.subject.to_dict(), + "action": self.action.to_dict(), + "resource": self.resource.to_dict(), + "context": ctx.to_dict(), + "environment": env.to_dict(), + } + + +# --------------------------------------------------------------------------- +# Response types (RFC-005 §6) +# --------------------------------------------------------------------------- + + +@dataclass +class Obligation: + """A conditional contract returned by the PDP (RFC-005 §7.1). + + Attributes: + type: Obligation type (e.g. ``"rate_limit"``, ``"audit_log"``). + params: Opaque parameters dictionary. ``None`` serialises as + JSON ``null``. + """ + + type: str = "" + params: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "type": self.type, + "params": self.params, + } + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> Obligation: + params = d.get("params") + if params is not None and not isinstance(params, dict): + params = None + return cls(type=d.get("type", ""), params=params) + + +@dataclass +class PIPResponse: + """RFC-005 §6.1 Decision Response. + + Attributes: + decision: ``"ALLOW"`` or ``"DENY"`` (PDP values only). + decision_id: Globally unique decision identifier. + obligations: List of obligations to enforce. + reason: Optional human-readable explanation. + ttl: Optional cache lifetime in seconds. + """ + + decision: str = "" + decision_id: str = "" + obligations: List[Obligation] = field(default_factory=list) + reason: str = "" + ttl: Optional[int] = None + + @property + def is_allow(self) -> bool: + return self.decision == DECISION_ALLOW + + @property + def is_deny(self) -> bool: + return self.decision == DECISION_DENY + + def to_dict(self) -> Dict[str, Any]: + d: Dict[str, Any] = { + "decision": self.decision, + "decision_id": self.decision_id, + "obligations": [o.to_dict() for o in self.obligations], + } + if self.reason: + d["reason"] = self.reason + if self.ttl is not None: + d["ttl"] = self.ttl + return d + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> PIPResponse: + obligations_raw = d.get("obligations") or [] + obligations = [Obligation.from_dict(o) for o in obligations_raw if isinstance(o, dict)] + return cls( + decision=d.get("decision", ""), + decision_id=d.get("decision_id", ""), + obligations=obligations, + reason=d.get("reason", ""), + ttl=d.get("ttl"), + ) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _generate_uuid7() -> str: + """Generate a UUID v7 string if available, falling back to UUID v4.""" + # Python 3.14+ has uuid.uuid7(), earlier versions need fallback + if hasattr(uuid, "uuid7"): + return str(uuid.uuid7()) # type: ignore[attr-defined] + return str(uuid.uuid4()) diff --git a/tests/unit/test_pip.py b/tests/unit/test_pip.py new file mode 100644 index 0000000..c83a86b --- /dev/null +++ b/tests/unit/test_pip.py @@ -0,0 +1,444 @@ +"""Tests for capiscio_sdk.pip — RFC-005 PIP request builder types.""" + +import json +import os +from unittest import mock + +import pytest + +from capiscio_sdk.pip import ( + DECISION_ALLOW, + DECISION_DENY, + DECISION_OBSERVE, + PIP_VERSION, + ActionAttributes, + ContextAttributes, + EnforcementMode, + EnvironmentAttributes, + Obligation, + PIPRequest, + PIPResponse, + ResourceAttributes, + SubjectAttributes, +) + + +# --------------------------------------------------------------------------- +# PIP_VERSION constant +# --------------------------------------------------------------------------- + + +class TestPIPVersion: + def test_value(self) -> None: + assert PIP_VERSION == "capiscio.pip.v1" + + +# --------------------------------------------------------------------------- +# Decision constants +# --------------------------------------------------------------------------- + + +class TestDecisionConstants: + def test_allow(self) -> None: + assert DECISION_ALLOW == "ALLOW" + + def test_deny(self) -> None: + assert DECISION_DENY == "DENY" + + def test_observe(self) -> None: + assert DECISION_OBSERVE == "ALLOW_OBSERVE" + + +# --------------------------------------------------------------------------- +# EnforcementMode +# --------------------------------------------------------------------------- + + +class TestEnforcementMode: + def test_values_match_rfc(self) -> None: + assert EnforcementMode.OBSERVE.value == "EM-OBSERVE" + assert EnforcementMode.GUARD.value == "EM-GUARD" + assert EnforcementMode.DELEGATE.value == "EM-DELEGATE" + assert EnforcementMode.STRICT.value == "EM-STRICT" + + def test_from_string(self) -> None: + assert EnforcementMode("EM-OBSERVE") == EnforcementMode.OBSERVE + assert EnforcementMode("EM-STRICT") == EnforcementMode.STRICT + + def test_from_string_invalid(self) -> None: + with pytest.raises(ValueError): + EnforcementMode("INVALID") + + def test_stricter_than_ordering(self) -> None: + assert EnforcementMode.STRICT.stricter_than(EnforcementMode.OBSERVE) + assert EnforcementMode.DELEGATE.stricter_than(EnforcementMode.GUARD) + assert EnforcementMode.GUARD.stricter_than(EnforcementMode.OBSERVE) + assert not EnforcementMode.OBSERVE.stricter_than(EnforcementMode.STRICT) + assert not EnforcementMode.OBSERVE.stricter_than(EnforcementMode.OBSERVE) + + def test_from_env_default(self) -> None: + with mock.patch.dict(os.environ, {}, clear=True): + # Remove the key entirely if present + os.environ.pop("CAPISCIO_ENFORCEMENT_MODE", None) + assert EnforcementMode.from_env() == EnforcementMode.OBSERVE + + def test_from_env_empty_string(self) -> None: + with mock.patch.dict(os.environ, {"CAPISCIO_ENFORCEMENT_MODE": ""}): + assert EnforcementMode.from_env() == EnforcementMode.OBSERVE + + def test_from_env_valid(self) -> None: + with mock.patch.dict(os.environ, {"CAPISCIO_ENFORCEMENT_MODE": "EM-STRICT"}): + assert EnforcementMode.from_env() == EnforcementMode.STRICT + + def test_from_env_invalid(self) -> None: + with mock.patch.dict(os.environ, {"CAPISCIO_ENFORCEMENT_MODE": "BOGUS"}): + with pytest.raises(ValueError, match="Unknown enforcement mode"): + EnforcementMode.from_env() + + def test_all_four_modes_exist(self) -> None: + assert len(EnforcementMode) == 4 + + +# --------------------------------------------------------------------------- +# SubjectAttributes +# --------------------------------------------------------------------------- + + +class TestSubjectAttributes: + def test_to_dict(self) -> None: + s = SubjectAttributes( + did="did:web:example.com:agents:bot", + badge_jti="badge-123", + ial="IAL-1", + trust_level="2", + ) + d = s.to_dict() + assert d == { + "did": "did:web:example.com:agents:bot", + "badge_jti": "badge-123", + "ial": "IAL-1", + "trust_level": "2", + } + + def test_defaults_are_empty_strings(self) -> None: + s = SubjectAttributes() + assert s.did == "" + assert s.badge_jti == "" + assert s.ial == "" + assert s.trust_level == "" + + +# --------------------------------------------------------------------------- +# ActionAttributes +# --------------------------------------------------------------------------- + + +class TestActionAttributes: + def test_to_dict_badge_only(self) -> None: + a = ActionAttributes(operation="tools/call") + d = a.to_dict() + assert d["operation"] == "tools/call" + assert d["capability_class"] is None + + def test_to_dict_with_capability(self) -> None: + a = ActionAttributes(operation="GET /v1/agents", capability_class="read") + d = a.to_dict() + assert d["capability_class"] == "read" + + +# --------------------------------------------------------------------------- +# ResourceAttributes +# --------------------------------------------------------------------------- + + +class TestResourceAttributes: + def test_to_dict(self) -> None: + r = ResourceAttributes(identifier="database://prod/users") + assert r.to_dict() == {"identifier": "database://prod/users"} + + +# --------------------------------------------------------------------------- +# ContextAttributes +# --------------------------------------------------------------------------- + + +class TestContextAttributes: + def test_to_dict_badge_only_nulls(self) -> None: + """Envelope fields MUST serialize as null, not absent.""" + c = ContextAttributes(txn_id="txn-1", enforcement_mode="EM-OBSERVE") + d = c.to_dict() + assert d["txn_id"] == "txn-1" + assert d["enforcement_mode"] == "EM-OBSERVE" + # Envelope fields present as None (→ JSON null) + assert "envelope_id" in d + assert d["envelope_id"] is None + assert "delegation_depth" in d + assert d["delegation_depth"] is None + assert "constraints" in d + assert d["constraints"] is None + assert "parent_constraints" in d + assert d["parent_constraints"] is None + + def test_enforcement_mode_enum_normalised(self) -> None: + """EnforcementMode enum should be normalised to its string value.""" + c = ContextAttributes( + txn_id="txn-enum", + enforcement_mode=EnforcementMode.STRICT, + ) + assert c.enforcement_mode == "EM-STRICT" + d = c.to_dict() + assert d["enforcement_mode"] == "EM-STRICT" + # Must be JSON-serialisable + j = json.dumps(d) + assert '"EM-STRICT"' in j + + def test_json_null_serialisation(self) -> None: + """Verify JSON output has explicit null values for envelope fields.""" + c = ContextAttributes(txn_id="t") + j = json.dumps(c.to_dict()) + parsed = json.loads(j) + assert parsed["envelope_id"] is None + assert parsed["constraints"] is None + assert parsed["parent_constraints"] is None + + +# --------------------------------------------------------------------------- +# EnvironmentAttributes +# --------------------------------------------------------------------------- + + +class TestEnvironmentAttributes: + def test_to_dict_all_set(self) -> None: + e = EnvironmentAttributes(workspace="prod", pep_id="pep-1", time="2026-03-20T10:00:00Z") + d = e.to_dict() + assert d == {"workspace": "prod", "pep_id": "pep-1", "time": "2026-03-20T10:00:00Z"} + + def test_to_dict_omits_none(self) -> None: + """Optional fields are omitted when None (not serialised as null).""" + e = EnvironmentAttributes() + d = e.to_dict() + assert "workspace" not in d + assert "pep_id" not in d + assert "time" not in d + + def test_to_dict_partial(self) -> None: + e = EnvironmentAttributes(workspace="staging") + d = e.to_dict() + assert d == {"workspace": "staging"} + assert "pep_id" not in d + + +# --------------------------------------------------------------------------- +# PIPRequest +# --------------------------------------------------------------------------- + + +class TestPIPRequest: + def test_default_pip_version(self) -> None: + r = PIPRequest() + assert r.pip_version == PIP_VERSION + + def test_to_dict_auto_populates_txn_id(self) -> None: + r = PIPRequest( + subject=SubjectAttributes(did="did:web:x"), + action=ActionAttributes(operation="GET /"), + resource=ResourceAttributes(identifier="/"), + context=ContextAttributes(enforcement_mode="EM-GUARD"), + ) + d = r.to_dict() + assert d["context"]["txn_id"], "txn_id should be auto-populated" + assert len(d["context"]["txn_id"]) == 36 # UUID format + + def test_to_dict_preserves_txn_id(self) -> None: + """If txn_id is already set, do not overwrite it.""" + r = PIPRequest( + context=ContextAttributes(txn_id="existing-txn-id"), + ) + d = r.to_dict() + assert d["context"]["txn_id"] == "existing-txn-id" + + def test_to_dict_auto_populates_time(self) -> None: + r = PIPRequest() + d = r.to_dict() + assert d["environment"]["time"] is not None + assert d["environment"]["time"].endswith("Z") + + def test_to_dict_preserves_time(self) -> None: + r = PIPRequest( + environment=EnvironmentAttributes(time="2026-01-01T00:00:00Z"), + ) + d = r.to_dict() + assert d["environment"]["time"] == "2026-01-01T00:00:00Z" + + def test_full_roundtrip(self) -> None: + """Full request → dict → JSON → dict matches structure.""" + r = PIPRequest( + subject=SubjectAttributes( + did="did:web:example.com:agents:bot", + badge_jti="badge-123", + ial="IAL-1", + trust_level="2", + ), + action=ActionAttributes(operation="tools/call"), + resource=ResourceAttributes(identifier="db://users"), + context=ContextAttributes( + txn_id="fixed-txn", + enforcement_mode="EM-DELEGATE", + ), + environment=EnvironmentAttributes( + workspace="prod", + pep_id="pep-1", + time="2026-03-20T12:00:00Z", + ), + ) + d = r.to_dict() + j = json.dumps(d) + parsed = json.loads(j) + + assert parsed["pip_version"] == PIP_VERSION + assert parsed["subject"]["did"] == "did:web:example.com:agents:bot" + assert parsed["action"]["capability_class"] is None + assert parsed["context"]["envelope_id"] is None + assert parsed["environment"]["workspace"] == "prod" + + def test_to_dict_idempotent(self) -> None: + """Calling to_dict() twice produces the same result.""" + r = PIPRequest( + context=ContextAttributes(txn_id="txn-fixed"), + environment=EnvironmentAttributes(time="2026-01-01T00:00:00Z"), + ) + d1 = r.to_dict() + d2 = r.to_dict() + assert d1 == d2 + + +# --------------------------------------------------------------------------- +# Obligation +# --------------------------------------------------------------------------- + + +class TestObligation: + def test_to_dict(self) -> None: + o = Obligation(type="rate_limit", params={"max_rps": 100}) + d = o.to_dict() + assert d == {"type": "rate_limit", "params": {"max_rps": 100}} + + def test_to_dict_null_params(self) -> None: + o = Obligation(type="audit_log") + d = o.to_dict() + assert d["params"] is None + + def test_from_dict(self) -> None: + o = Obligation.from_dict({"type": "audit_log", "params": {"level": "full"}}) + assert o.type == "audit_log" + assert o.params == {"level": "full"} + + def test_from_dict_no_params(self) -> None: + o = Obligation.from_dict({"type": "log"}) + assert o.type == "log" + assert o.params is None + + def test_from_dict_invalid_params_type(self) -> None: + """params must be a dict; non-dict values are dropped to None.""" + o = Obligation.from_dict({"type": "x", "params": "not-a-dict"}) + assert o.params is None + + def test_from_dict_empty(self) -> None: + o = Obligation.from_dict({}) + assert o.type == "" + assert o.params is None + + +# --------------------------------------------------------------------------- +# PIPResponse +# --------------------------------------------------------------------------- + + +class TestPIPResponse: + def test_is_allow(self) -> None: + r = PIPResponse(decision="ALLOW", decision_id="d1") + assert r.is_allow + assert not r.is_deny + + def test_is_deny(self) -> None: + r = PIPResponse(decision="DENY", decision_id="d2") + assert r.is_deny + assert not r.is_allow + + def test_to_dict(self) -> None: + r = PIPResponse( + decision="ALLOW", + decision_id="d1", + obligations=[Obligation(type="log")], + reason="ok", + ttl=60, + ) + d = r.to_dict() + assert d["decision"] == "ALLOW" + assert d["decision_id"] == "d1" + assert len(d["obligations"]) == 1 + assert d["reason"] == "ok" + assert d["ttl"] == 60 + + def test_to_dict_omits_empty_reason_and_ttl(self) -> None: + r = PIPResponse(decision="DENY", decision_id="d2") + d = r.to_dict() + assert "reason" not in d + assert "ttl" not in d + + def test_from_dict(self) -> None: + raw = { + "decision": "ALLOW", + "decision_id": "pdp-1", + "obligations": [ + {"type": "rate_limit", "params": {"max": 10}}, + {"type": "audit"}, + ], + "reason": "approved", + "ttl": 120, + } + r = PIPResponse.from_dict(raw) + assert r.decision == "ALLOW" + assert r.decision_id == "pdp-1" + assert len(r.obligations) == 2 + assert r.obligations[0].type == "rate_limit" + assert r.obligations[0].params == {"max": 10} + assert r.obligations[1].type == "audit" + assert r.reason == "approved" + assert r.ttl == 120 + + def test_from_dict_minimal(self) -> None: + r = PIPResponse.from_dict({"decision": "DENY", "decision_id": "x"}) + assert r.is_deny + assert r.obligations == [] + assert r.reason == "" + assert r.ttl is None + + def test_from_dict_rejects_non_dict_obligations(self) -> None: + """Non-dict entries in obligations list are skipped.""" + r = PIPResponse.from_dict({ + "decision": "ALLOW", + "decision_id": "y", + "obligations": [{"type": "ok"}, "bad", 42], + }) + assert len(r.obligations) == 1 + assert r.obligations[0].type == "ok" + + def test_roundtrip(self) -> None: + """PIPResponse → to_dict → JSON → from_dict roundtrip.""" + original = PIPResponse( + decision="ALLOW", + decision_id="d1", + obligations=[Obligation(type="rl", params={"rps": 100})], + reason="test", + ttl=300, + ) + j = json.dumps(original.to_dict()) + restored = PIPResponse.from_dict(json.loads(j)) + assert restored.decision == original.decision + assert restored.decision_id == original.decision_id + assert len(restored.obligations) == 1 + assert restored.obligations[0].type == "rl" + assert restored.obligations[0].params == {"rps": 100} + assert restored.reason == original.reason + assert restored.ttl == original.ttl