From d5b681872a9247c8f3a93073e8c019ae848c4b3e Mon Sep 17 00:00:00 2001 From: Carlos Hernandez Date: Tue, 16 Jun 2026 20:20:21 +0200 Subject: [PATCH 1/3] feat(audit): add optional external_execution_evidence on AuditEntry (#301) Introduce an optional, independently-signed execution receipt bound to an audit entry, distinct from response_payload_hash. response_payload_hash is what the gateway forwarded; external_execution_evidence is what an independent authority (for example a safety controller) attested. Confirmed direction: Option A. - chain.py: add the optional external_execution_evidence field to AuditEntry and an append() keyword. Serialized uniformly via asdict (null when absent), so receipt-less entries hash exactly as before and existing evidence keeps verifying. - schemas/audit-entry.schema.json: add the optional receipt object (issuer, issuer_key_id, signature, evidence_hash, evidence_type, linked_call_id), not in required so entries that predate the field still validate. - cmcp_verify: opt-in receipt verification. When external_evidence_keys is supplied, check linked_call_id == call_id and the issuer Ed25519 signature over the canonical receipt. Receipt-less entries and callers without keys are unaffected. - LIMITATIONS.md: state what the receipt does and does not prove. - conformance tests: absent verifies and keeps old hashing, populated verifies, tampered fails, linked_call_id mismatch fails, unknown issuer key fails. Scope note: this lands the data model, schema, verification, and tests. Proxy ingestion (how a controller receipt rides in the upstream response) and the industrial-embodied-ai example follow next; the transport convention is flagged for maintainer input. Pre-existing audit-entry.schema.json drift (detail, workflow_id, extra entry_type enum values) is noted and left out of scope. 
Signed-off-by: Carlos Hernandez --- LIMITATIONS.md | 3 + schemas/audit-entry.schema.json | 41 ++++++- src/cmcp_runtime/audit/chain.py | 8 ++ src/cmcp_verify/verify.py | 47 ++++++++ tests/conformance/test_audit_conformance.py | 126 ++++++++++++++++++++ 5 files changed, 224 insertions(+), 1 deletion(-) diff --git a/LIMITATIONS.md b/LIMITATIONS.md index 3708f16..d19bf67 100644 --- a/LIMITATIONS.md +++ b/LIMITATIONS.md @@ -16,6 +16,9 @@ The TEE-sealed signing key is generated inside the enclave and cannot be extract **Phase 2 completeness: server-side attestation** Phase 1 attests the gateway boundary. It does not attest what happens on the other side of that boundary. The `tool_transcript.hash` field in the TRACE Claim records a hash of the audit chain tip, but the tool transcript binding that ties a specific tool execution to a specific response is Phase 2 work. Phase 1 partially addresses P1.4 (transitive trust into upstream dependencies) and P4.1 (typosquatted packages added to catalog) -- both are fully closed by Phase 2. Any compliance claim that relies on server-side proof must wait for Phase 2. +**External execution evidence (issue #301)** +An audit entry may carry an optional `external_execution_evidence` receipt: a signature from an independent authority (for example a safety controller) attesting to an outcome, bound to a specific `call_id`. This is deliberately distinct from `response_payload_hash`, which records what the gateway forwarded. The receipt establishes that the named issuer signed an assertion about that call. It does not establish that a physical action occurred, that it was safe, or that it meets any functional-safety standard, and it is only as trustworthy as the issuer key behind it. cMCP does not observe the actuation; it records the receipt and, when a verifier is configured with the issuer trusted key, checks the signature and the `call_id` binding. 
Trust in the issuer key is an out-of-band PKI concern, the same shape as the manifest issuer trust anchor in issue #302. Verification is opt-in: receipt-less entries, and verifiers that do not configure issuer keys, are unaffected. + **LLM inference and model output** cMCP intercepts tool calls at the MCP protocol boundary. It does not observe or modify LLM inference, the contents of the agent's context window, or model outputs that do not produce a tool call. A model could hallucinate a response, leak sensitive context in a chat reply, or receive a poisoned tool response that influences subsequent reasoning -- none of these are visible to the gateway. cMCP controls the tool boundary, not the model boundary. diff --git a/schemas/audit-entry.schema.json b/schemas/audit-entry.schema.json index 37b4651..d143fbc 100644 --- a/schemas/audit-entry.schema.json +++ b/schemas/audit-entry.schema.json @@ -108,6 +108,45 @@ ], "description": "Result of response inspection by the gateway; null for non-tool-call entries." }, + "external_execution_evidence": { + "type": ["object", "null"], + "additionalProperties": false, + "description": "Optional independent execution evidence bound to this call (issue #301). Distinct from response_payload_hash: response_payload_hash is what the gateway forwarded, this is what an independent authority (e.g. a safety controller) attested. Null when absent. Intentionally not in 'required' so entries that predate the field still validate.", + "required": [ + "issuer", + "issuer_key_id", + "signature", + "evidence_hash", + "evidence_type", + "linked_call_id" + ], + "properties": { + "issuer": { + "type": "string", + "description": "Identity (e.g. SPIFFE URI) of the authority that signed the receipt." + }, + "issuer_key_id": { + "type": "string", + "description": "Key identifier for the issuer public key used to verify the signature." 
+ }, + "signature": { + "type": "string", + "description": "base64url Ed25519 signature over the canonical receipt (all fields except signature)." + }, + "evidence_hash": { + "type": "string", + "description": "sha256: of the attested execution evidence (e.g. the controller decision payload)." + }, + "evidence_type": { + "type": "string", + "description": "Receipt type and version, e.g. controller-execution-receipt/v1." + }, + "linked_call_id": { + "type": "string", + "description": "The call_id this receipt is bound to; a verifier checks it equals the entry call_id." + } + } + }, "session_sensitivity_before": { "type": ["string", "null"], "description": "Session sensitivity level before this entry was processed." @@ -125,4 +164,4 @@ "description": "SHA-256 hex of this entry's canonical JSON, excluding the entry_hash field itself." } } -} +} diff --git a/src/cmcp_runtime/audit/chain.py b/src/cmcp_runtime/audit/chain.py index 624e63d..c728199 100644 --- a/src/cmcp_runtime/audit/chain.py +++ b/src/cmcp_runtime/audit/chain.py @@ -61,6 +61,12 @@ class AuditEntry: detail: dict[str, str | int | float] | None # optional structured detail (e.g. suspicious_call_sequence) workflow_id: str | None prev_entry_hash: str # "genesis" for first entry + # #301: optional independent execution evidence, e.g. a controller-signed + # receipt of a physical outcome. Distinct from response_payload_hash: + # response_payload_hash is what the gateway forwarded, this is what an + # independent authority attested. Serialized uniformly (null when absent), + # so entries without a receipt hash exactly as before. 
+ external_execution_evidence: dict[str, str] | None = None entry_hash: str = field(default="") # computed after construction def _canonical_body(self) -> bytes: @@ -161,6 +167,7 @@ def append( session_sensitivity_after: str | None = None, detail: dict[str, str | int | float] | None = None, workflow_id: str | None = None, + external_execution_evidence: dict[str, str] | None = None, ) -> AuditEntry: prev_hash = self._entries[-1].entry_hash if self._entries else "genesis" now = datetime.now(tz=UTC) @@ -187,6 +194,7 @@ def append( session_sensitivity_after=session_sensitivity_after, detail=detail, workflow_id=workflow_id, + external_execution_evidence=external_execution_evidence, prev_entry_hash=prev_hash, ) entry.entry_hash = entry.compute_hash() diff --git a/src/cmcp_verify/verify.py b/src/cmcp_verify/verify.py index f0c6c1f..5255605 100644 --- a/src/cmcp_verify/verify.py +++ b/src/cmcp_verify/verify.py @@ -264,6 +264,8 @@ class AuditBundleResult: def verify_audit_bundle( bundle_json: dict[str, Any], claim_json: dict[str, Any] | None = None, + *, + external_evidence_keys: dict[str, str] | None = None, ) -> AuditBundleResult: """ Verify an exported audit bundle (GET /audit/export): @@ -273,6 +275,12 @@ def verify_audit_bundle( 2. If a claim is provided, cross-check the bundle's root/tip/length against gateway.audit_chain and verify the bundle_signature with the claim's confirmation key (trace.cnf.jwk.x). + 3. #301: if external_evidence_keys is provided (issuer_key_id -> hex Ed25519 + public key), verify any external_execution_evidence receipt bound to an + entry: linked_call_id must equal the entry call_id, and the issuer + signature must verify over the canonical receipt (all fields except + signature). This is opt-in: receipt-less entries and callers that do not + supply keys are unaffected, so existing evidence keeps verifying. 
""" failures: list[str] = [] entries = bundle_json.get("entries", []) @@ -291,6 +299,45 @@ def verify_audit_bundle( failures.append(f"entry {i}: chain link broken") prev = entry.get("entry_hash", "") + # #301: verify independent execution receipts (opt-in via external_evidence_keys). + if external_evidence_keys is not None: + for i, entry in enumerate(entries): + ev = entry.get("external_execution_evidence") + if not ev: + continue + if ev.get("linked_call_id") != entry.get("call_id"): + failures.append( + f"entry {i}: external_execution_evidence linked_call_id does not " + "match the entry call_id" + ) + key_id = ev.get("issuer_key_id", "") + pub_hex = external_evidence_keys.get(key_id) + if not pub_hex: + failures.append( + f"entry {i}: no trusted key for external evidence issuer_key_id '{key_id}'" + ) + continue + try: + pub = Ed25519PublicKey.from_public_bytes(bytes.fromhex(pub_hex)) + signing_input = json.dumps( + {k: v for k, v in ev.items() if k != "signature"}, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=True, + ).encode() + sig_b64 = ev.get("signature", "") + pad = 4 - (len(sig_b64) % 4) + sig = base64.urlsafe_b64decode(sig_b64 + ("=" * pad if pad != 4 else "")) + pub.verify(sig, signing_input) + except InvalidSignature: + failures.append( + f"entry {i}: external_execution_evidence signature is invalid" + ) + except Exception as exc: + failures.append( + f"entry {i}: external_execution_evidence could not be verified: {exc}" + ) + if claim_json is not None: chain = claim_json.get("gateway", {}).get("audit_chain", {}) if chain.get("root") != entries[0].get("entry_hash"): diff --git a/tests/conformance/test_audit_conformance.py b/tests/conformance/test_audit_conformance.py index 9f236c5..ca1a035 100644 --- a/tests/conformance/test_audit_conformance.py +++ b/tests/conformance/test_audit_conformance.py @@ -4,6 +4,7 @@ """ from __future__ import annotations +import base64 import hashlib import json import logging @@ -12,6 +13,8 @@ from 
unittest.mock import patch import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cmcp_runtime.audit.chain import AuditChain from cmcp_runtime.audit.keys import SigningKey @@ -23,6 +26,7 @@ ToolCatalogInfo, generate_trace_claim, ) +from cmcp_verify.verify import verify_audit_bundle # ---- helpers ---------------------------------------------------------------- @@ -315,3 +319,125 @@ def test_sequence_numbers_strictly_increasing(self): d = _claim_dict(chain, key, seq=s) assert d["gateway"]["sequence_number"] == s assert seqs == sorted(set(seqs)) + + +# ---- #301: independent execution evidence ----------------------------------- + + +def _ed25519_keypair() -> tuple[Ed25519PrivateKey, str]: + priv = Ed25519PrivateKey.generate() + pub_hex = priv.public_key().public_bytes( + encoding=Encoding.Raw, format=PublicFormat.Raw + ).hex() + return priv, pub_hex + + +def _signed_receipt( + priv: Ed25519PrivateKey, + call_id: str, + *, + issuer_key_id: str = "ctrl-key-1", +) -> dict[str, str]: + """Build a controller-signed execution receipt for a call.""" + receipt = { + "issuer": "spiffe://factory.example/controller/robot-cell-7", + "issuer_key_id": issuer_key_id, + "evidence_hash": "sha256:" + "ab" * 32, + "evidence_type": "controller-execution-receipt/v1", + "linked_call_id": call_id, + } + signing_input = json.dumps( + receipt, sort_keys=True, separators=(",", ":"), ensure_ascii=True + ).encode() + receipt["signature"] = ( + base64.urlsafe_b64encode(priv.sign(signing_input)).rstrip(b"=").decode() + ) + return receipt + + +def _bundle(chain: AuditChain) -> dict: + return {"entries": [asdict(e) for e in chain.entries]} + + +class TestExternalExecutionEvidence301: + """#301: optional external_execution_evidence bound to an audit entry.""" + + def test_absent_receipt_verifies_and_keeps_old_hashing(self): + # Receipt-less entries serialize and hash exactly 
as before, and a + # bundle with no receipts verifies without any keys configured. + chain = AuditChain("sess-301-a") + entry = chain.append("tool_call", call_id="c1", tool_name="t", policy_decision="allow") + d = asdict(entry) + d.pop("entry_hash") + expected = hashlib.sha256( + json.dumps(d, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode() + ).hexdigest() + assert entry.entry_hash == expected + assert verify_audit_bundle(_bundle(chain)).verified + + def test_populated_receipt_verifies(self): + priv, pub_hex = _ed25519_keypair() + chain = AuditChain("sess-301-b") + chain.append( + "tool_call", + call_id="c1", + tool_name="robot.request_motion", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "c1"), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={"ctrl-key-1": pub_hex} + ) + assert result.verified, result.failures + + def test_tampered_receipt_fails(self): + priv, pub_hex = _ed25519_keypair() + receipt = _signed_receipt(priv, "c1") + receipt["evidence_hash"] = "sha256:" + "cd" * 32 # tamper after signing + chain = AuditChain("sess-301-c") + chain.append( + "tool_call", + call_id="c1", + tool_name="robot.request_motion", + policy_decision="allow", + external_execution_evidence=receipt, + ) + # The chain hash still matches (append sealed the tampered receipt), so + # only the receipt signature check fails. 
+ result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={"ctrl-key-1": pub_hex} + ) + assert not result.verified + assert any("signature is invalid" in f for f in result.failures) + + def test_linked_call_id_mismatch_fails(self): + priv, pub_hex = _ed25519_keypair() + chain = AuditChain("sess-301-d") + chain.append( + "tool_call", + call_id="c1", + tool_name="t", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "a-different-call"), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={"ctrl-key-1": pub_hex} + ) + assert not result.verified + assert any("linked_call_id" in f for f in result.failures) + + def test_unknown_issuer_key_fails(self): + priv, _ = _ed25519_keypair() + chain = AuditChain("sess-301-e") + chain.append( + "tool_call", + call_id="c1", + tool_name="t", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id="unknown"), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={"ctrl-key-1": "00" * 32} + ) + assert not result.verified + assert any("no trusted key" in f for f in result.failures) From 4f82a66e01bcff0b140bff6a8cc3ea53f2503b4f Mon Sep 17 00:00:00 2001 From: Carlos Hernandez Date: Wed, 17 Jun 2026 21:32:03 +0200 Subject: [PATCH 2/3] fix(audit): tighten external evidence verification Signed-off-by: Carlos Hernandez --- LIMITATIONS.md | 2 + docs/spec/error-codes.md | 1 + docs/spec/verification-library.md | 36 +++++++++ schemas/audit-entry.schema.json | 10 ++- src/cmcp_runtime/audit/chain.py | 2 +- src/cmcp_verify/verify.py | 74 +++++++++++++++--- tests/conformance/test_audit_conformance.py | 86 ++++++++++++++++----- 7 files changed, 179 insertions(+), 32 deletions(-) diff --git a/LIMITATIONS.md b/LIMITATIONS.md index d19bf67..07b3df8 100644 --- a/LIMITATIONS.md +++ b/LIMITATIONS.md @@ -19,6 +19,8 @@ Phase 1 attests the gateway boundary. 
It does not attest what happens on the oth **External execution evidence (issue #301)** An audit entry may carry an optional `external_execution_evidence` receipt: a signature from an independent authority (for example a safety controller) attesting to an outcome, bound to a specific `call_id`. This is deliberately distinct from `response_payload_hash`, which records what the gateway forwarded. The receipt establishes that the named issuer signed an assertion about that call. It does not establish that a physical action occurred, that it was safe, or that it meets any functional-safety standard, and it is only as trustworthy as the issuer key behind it. cMCP does not observe the actuation; it records the receipt and, when a verifier is configured with the issuer trusted key, checks the signature and the `call_id` binding. Trust in the issuer key is an out-of-band PKI concern, the same shape as the manifest issuer trust anchor in issue #302. Verification is opt-in: receipt-less entries, and verifiers that do not configure issuer keys, are unaffected. +The TRACE Claim does not carry a separate "external evidence present" flag. Verifiers learn that external evidence was bound by fetching the committed audit bundle and checking entries under the TRACE Claim's `gateway.audit_chain.tip`. + **LLM inference and model output** cMCP intercepts tool calls at the MCP protocol boundary. It does not observe or modify LLM inference, the contents of the agent's context window, or model outputs that do not produce a tool call. A model could hallucinate a response, leak sensitive context in a chat reply, or receive a poisoned tool response that influences subsequent reasoning -- none of these are visible to the gateway. cMCP controls the tool boundary, not the model boundary. 
diff --git a/docs/spec/error-codes.md b/docs/spec/error-codes.md index fd71100..de49ec2 100644 --- a/docs/spec/error-codes.md +++ b/docs/spec/error-codes.md @@ -38,5 +38,6 @@ The following error codes are defined and documented in [verification-library.md | `ATTESTATION_STALE` | | `CHAIN_BROKEN` | | `CLAIM_MALFORMED` | +| `EXTERNAL_EVIDENCE_VERIFICATION_FAILED` | > Note: `POLICY_HASH_MISMATCH`, `CATALOG_HASH_MISMATCH`, and `ATTESTATION_STALE` appear in both tables. The Runtime emits them during startup or request handling; the verification library emits them during offline or client-side verification. The semantics are consistent across both contexts. diff --git a/docs/spec/verification-library.md b/docs/spec/verification-library.md index 52b4c6a..9438d1a 100644 --- a/docs/spec/verification-library.md +++ b/docs/spec/verification-library.md @@ -63,6 +63,41 @@ def verify_trace_claim( ... ``` +### Audit Bundle Verification and External Execution Evidence + +```python +@dataclass +class AuditBundleResult: + verified: bool + entry_count: int + failures: list[str] + +def verify_audit_bundle( + bundle_json: dict, + claim_json: Optional[dict] = None, + *, + external_evidence_keys: Optional[dict[str, bytes]] = None, +) -> AuditBundleResult: + """ + Verify an exported audit bundle. When external_evidence_keys is supplied, + each key is issuer_key_id -> raw 32-byte Ed25519 public key. issuer_key_id + is lowercase hex SHA-256(public_key_bytes). + """ + ... +``` + +`external_execution_evidence.evidence_hash` is the digest of the detached evidence payload attested by the issuer, not the digest of the receipt envelope. For JSON evidence payloads, the hash pre-image is the UTF-8 bytes of the RFC 8785/JCS canonical JSON representation. For non-JSON evidence payloads, the pre-image is the exact byte string identified by the issuer's evidence format. The field value is `sha256:` or `sha384:`. 
+ +The verifier computes the receipt signing input as canonical JSON over the receipt object excluding `signature`, with sorted keys and compact separators. It then checks: + +1. `linked_call_id` equals the audit entry `call_id`. +2. `issuer_key_id` is lowercase hex SHA-256 of the trusted issuer public key. +3. `evidence_hash` has a supported hash prefix and hex digest. +4. `evidence_type` is one of the documented receipt types. +5. The Ed25519 signature verifies over the canonical receipt signing input. + +If any external evidence check fails, the audit bundle result is `verified=False` and the failure string includes `EXTERNAL_EVIDENCE_VERIFICATION_FAILED`. + ## Per-Provider Verification Steps ### TPM Verification @@ -119,6 +154,7 @@ VerificationError enum: - ATTESTATION_STALE: attestation_generated_at is older than max_attestation_age_seconds - CHAIN_BROKEN: audit_chain_root -> audit_chain_tip traversal fails (missing entries or hash mismatch) - CLAIM_MALFORMED: claim_json fails JSON Schema validation against the TRACE Claim schema +- EXTERNAL_EVIDENCE_VERIFICATION_FAILED: an audit bundle entry contains external_execution_evidence whose call binding, key id, evidence hash, evidence type, or issuer signature cannot be verified ## Phase 1 support matrix diff --git a/schemas/audit-entry.schema.json b/schemas/audit-entry.schema.json index d143fbc..d7312e9 100644 --- a/schemas/audit-entry.schema.json +++ b/schemas/audit-entry.schema.json @@ -127,7 +127,8 @@ }, "issuer_key_id": { "type": "string", - "description": "Key identifier for the issuer public key used to verify the signature." + "pattern": "^[0-9a-f]{64}$", + "description": "Lowercase hex SHA-256 digest of the raw Ed25519 issuer public key used to verify the signature." }, "signature": { "type": "string", @@ -135,10 +136,17 @@ }, "evidence_hash": { "type": "string", + "pattern": "^sha(256|384):[0-9a-f]+$", "description": "sha256: of the attested execution evidence (e.g. the controller decision payload)."
}, "evidence_type": { "type": "string", + "enum": [ + "controller-execution-receipt/v1", + "tee-signed-receipt", + "controller-jwt", + "opaque-receipt" + ], "description": "Receipt type and version, e.g. controller-execution-receipt/v1." }, "linked_call_id": { diff --git a/src/cmcp_runtime/audit/chain.py b/src/cmcp_runtime/audit/chain.py index c728199..51ed49f 100644 --- a/src/cmcp_runtime/audit/chain.py +++ b/src/cmcp_runtime/audit/chain.py @@ -65,7 +65,7 @@ class AuditEntry: # receipt of a physical outcome. Distinct from response_payload_hash: # response_payload_hash is what the gateway forwarded, this is what an # independent authority attested. Serialized uniformly (null when absent), - # so entries without a receipt hash exactly as before. + # so receipt-less entries remain deterministic and schema-stable. external_execution_evidence: dict[str, str] | None = None entry_hash: str = field(default="") # computed after construction diff --git a/src/cmcp_verify/verify.py b/src/cmcp_verify/verify.py index 5255605..0423a15 100644 --- a/src/cmcp_verify/verify.py +++ b/src/cmcp_verify/verify.py @@ -12,6 +12,7 @@ import hashlib import json import logging +import re from dataclasses import dataclass, field from datetime import UTC, datetime from enum import StrEnum @@ -26,6 +27,15 @@ logger = logging.getLogger(__name__) _SW_ONLY_FIRMWARE = "software-only-dev-mode" +_EXTERNAL_EVIDENCE_ERROR = "EXTERNAL_EVIDENCE_VERIFICATION_FAILED" +_EXTERNAL_EVIDENCE_HASH_RE = re.compile(r"^sha(256|384):[0-9a-f]+$") +_ISSUER_KEY_ID_RE = re.compile(r"^[0-9a-f]{64}$") +_EXTERNAL_EVIDENCE_TYPES = frozenset({ + "controller-execution-receipt/v1", + "tee-signed-receipt", + "controller-jwt", + "opaque-receipt", +}) _KNOWN_PLATFORMS = { "amd-sev-snp", @@ -261,11 +271,15 @@ class AuditBundleResult: failures: list[str] = field(default_factory=list) +def _external_evidence_failure(entry_index: int, reason: str) -> str: + return f"entry {entry_index}: {_EXTERNAL_EVIDENCE_ERROR}: {reason}" + + def 
verify_audit_bundle( bundle_json: dict[str, Any], claim_json: dict[str, Any] | None = None, *, - external_evidence_keys: dict[str, str] | None = None, + external_evidence_keys: dict[str, bytes] | None = None, ) -> AuditBundleResult: """ Verify an exported audit bundle (GET /audit/export): @@ -275,8 +289,8 @@ def verify_audit_bundle( 2. If a claim is provided, cross-check the bundle's root/tip/length against gateway.audit_chain and verify the bundle_signature with the claim's confirmation key (trace.cnf.jwk.x). - 3. #301: if external_evidence_keys is provided (issuer_key_id -> hex Ed25519 - public key), verify any external_execution_evidence receipt bound to an + 3. #301: if external_evidence_keys is provided (issuer_key_id -> raw Ed25519 + public key bytes), verify any external_execution_evidence receipt bound to an entry: linked_call_id must equal the entry call_id, and the issuer signature must verify over the canonical receipt (all fields except signature). This is opt-in: receipt-less entries and callers that do not @@ -305,20 +319,56 @@ def verify_audit_bundle( ev = entry.get("external_execution_evidence") if not ev: continue + if not isinstance(ev, dict): + failures.append( + _external_evidence_failure(i, "external_execution_evidence is not an object") + ) + continue if ev.get("linked_call_id") != entry.get("call_id"): failures.append( - f"entry {i}: external_execution_evidence linked_call_id does not " - "match the entry call_id" + _external_evidence_failure( + i, + "external_execution_evidence linked_call_id does not match " + "the entry call_id", + ) ) key_id = ev.get("issuer_key_id", "") - pub_hex = external_evidence_keys.get(key_id) - if not pub_hex: + if not isinstance(key_id, str) or not _ISSUER_KEY_ID_RE.match(key_id): + failures.append( + _external_evidence_failure( + i, + "issuer_key_id must be lowercase hex SHA-256 of the issuer public key", + ) + ) + continue + evidence_hash = ev.get("evidence_hash", "") + if not isinstance(evidence_hash, str) or 
not _EXTERNAL_EVIDENCE_HASH_RE.match(evidence_hash): + failures.append( + _external_evidence_failure( + i, "evidence_hash must be sha256: or sha384:" + ) + ) + continue + evidence_type = ev.get("evidence_type", "") + if evidence_type not in _EXTERNAL_EVIDENCE_TYPES: + failures.append( + _external_evidence_failure(i, f"unsupported evidence_type '{evidence_type}'") + ) + continue + pub_bytes = external_evidence_keys.get(key_id) + if not pub_bytes: failures.append( - f"entry {i}: no trusted key for external evidence issuer_key_id '{key_id}'" + _external_evidence_failure( + i, f"no trusted key for external evidence issuer_key_id '{key_id}'" + ) ) continue try: - pub = Ed25519PublicKey.from_public_bytes(bytes.fromhex(pub_hex)) + if len(pub_bytes) != 32: + raise ValueError("trusted issuer key must be 32 raw Ed25519 public key bytes") + if hashlib.sha256(pub_bytes).hexdigest() != key_id: + raise ValueError("issuer_key_id does not match trusted issuer public key") + pub = Ed25519PublicKey.from_public_bytes(pub_bytes) signing_input = json.dumps( {k: v for k, v in ev.items() if k != "signature"}, sort_keys=True, @@ -331,11 +381,13 @@ def verify_audit_bundle( pub.verify(sig, signing_input) except InvalidSignature: failures.append( - f"entry {i}: external_execution_evidence signature is invalid" + _external_evidence_failure(i, "external_execution_evidence signature is invalid") ) except Exception as exc: failures.append( - f"entry {i}: external_execution_evidence could not be verified: {exc}" + _external_evidence_failure( + i, f"external_execution_evidence could not be verified: {exc}" + ) ) if claim_json is not None: diff --git a/tests/conformance/test_audit_conformance.py b/tests/conformance/test_audit_conformance.py index ca1a035..9618f81 100644 --- a/tests/conformance/test_audit_conformance.py +++ b/tests/conformance/test_audit_conformance.py @@ -324,19 +324,19 @@ def test_sequence_numbers_strictly_increasing(self): # ---- #301: independent execution evidence 
----------------------------------- -def _ed25519_keypair() -> tuple[Ed25519PrivateKey, str]: +def _ed25519_keypair() -> tuple[Ed25519PrivateKey, bytes, str]: priv = Ed25519PrivateKey.generate() - pub_hex = priv.public_key().public_bytes( + pub = priv.public_key().public_bytes( encoding=Encoding.Raw, format=PublicFormat.Raw - ).hex() - return priv, pub_hex + ) + return priv, pub, hashlib.sha256(pub).hexdigest() def _signed_receipt( priv: Ed25519PrivateKey, call_id: str, *, - issuer_key_id: str = "ctrl-key-1", + issuer_key_id: str, ) -> dict[str, str]: """Build a controller-signed execution receipt for a call.""" receipt = { @@ -359,11 +359,23 @@ def _bundle(chain: AuditChain) -> dict: return {"entries": [asdict(e) for e in chain.entries]} +def _signed_bundle(chain: AuditChain, key: SigningKey) -> dict: + entries = [asdict(e) for e in chain.entries] + digest = hashlib.sha256( + json.dumps(entries, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode() + ).digest() + return { + "session_id": chain._session_id, + "entries": entries, + "bundle_signature": base64.urlsafe_b64encode(key.sign(digest)).rstrip(b"=").decode(), + } + + class TestExternalExecutionEvidence301: """#301: optional external_execution_evidence bound to an audit entry.""" - def test_absent_receipt_verifies_and_keeps_old_hashing(self): - # Receipt-less entries serialize and hash exactly as before, and a + def test_absent_receipt_verifies_with_null_evidence_field(self): + # Receipt-less entries serialize and hash deterministically, and a # bundle with no receipts verifies without any keys configured. 
chain = AuditChain("sess-301-a") entry = chain.append("tool_call", call_id="c1", tool_name="t", policy_decision="allow") @@ -376,23 +388,41 @@ def test_absent_receipt_verifies_and_keeps_old_hashing(self): assert verify_audit_bundle(_bundle(chain)).verified def test_populated_receipt_verifies(self): - priv, pub_hex = _ed25519_keypair() + priv, pub, key_id = _ed25519_keypair() chain = AuditChain("sess-301-b") chain.append( "tool_call", call_id="c1", tool_name="robot.request_motion", policy_decision="allow", - external_execution_evidence=_signed_receipt(priv, "c1"), + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id=key_id), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={key_id: pub} + ) + assert result.verified, result.failures + + def test_exported_bundle_with_receipt_verifies_end_to_end(self): + priv, pub, key_id = _ed25519_keypair() + signing_key = SigningKey() + chain = AuditChain("sess-301-export") + chain.append( + "tool_call", + call_id="c1", + tool_name="robot.request_motion", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id=key_id), ) result = verify_audit_bundle( - _bundle(chain), external_evidence_keys={"ctrl-key-1": pub_hex} + _signed_bundle(chain, signing_key), + _claim_dict(chain, signing_key), + external_evidence_keys={key_id: pub}, ) assert result.verified, result.failures def test_tampered_receipt_fails(self): - priv, pub_hex = _ed25519_keypair() - receipt = _signed_receipt(priv, "c1") + priv, pub, key_id = _ed25519_keypair() + receipt = _signed_receipt(priv, "c1", issuer_key_id=key_id) receipt["evidence_hash"] = "sha256:" + "cd" * 32 # tamper after signing chain = AuditChain("sess-301-c") chain.append( @@ -405,39 +435,57 @@ def test_tampered_receipt_fails(self): # The chain hash still matches (append sealed the tampered receipt), so # only the receipt signature check fails. 
result = verify_audit_bundle( - _bundle(chain), external_evidence_keys={"ctrl-key-1": pub_hex} + _bundle(chain), external_evidence_keys={key_id: pub} ) assert not result.verified assert any("signature is invalid" in f for f in result.failures) def test_linked_call_id_mismatch_fails(self): - priv, pub_hex = _ed25519_keypair() + priv, pub, key_id = _ed25519_keypair() chain = AuditChain("sess-301-d") chain.append( "tool_call", call_id="c1", tool_name="t", policy_decision="allow", - external_execution_evidence=_signed_receipt(priv, "a-different-call"), + external_execution_evidence=_signed_receipt( + priv, "a-different-call", issuer_key_id=key_id + ), ) result = verify_audit_bundle( - _bundle(chain), external_evidence_keys={"ctrl-key-1": pub_hex} + _bundle(chain), external_evidence_keys={key_id: pub} ) assert not result.verified assert any("linked_call_id" in f for f in result.failures) def test_unknown_issuer_key_fails(self): - priv, _ = _ed25519_keypair() + priv, _, key_id = _ed25519_keypair() chain = AuditChain("sess-301-e") chain.append( "tool_call", call_id="c1", tool_name="t", policy_decision="allow", - external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id="unknown"), + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id=key_id), ) result = verify_audit_bundle( - _bundle(chain), external_evidence_keys={"ctrl-key-1": "00" * 32} + _bundle(chain), external_evidence_keys={"0" * 64: b"\x00" * 32} ) assert not result.verified assert any("no trusted key" in f for f in result.failures) + + def test_key_id_mismatch_fails(self): + priv, pub, key_id = _ed25519_keypair() + chain = AuditChain("sess-301-f") + chain.append( + "tool_call", + call_id="c1", + tool_name="t", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id=key_id), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={key_id: b"\x00" * len(pub)} + ) + assert not result.verified + assert any("issuer_key_id 
does not match" in f for f in result.failures) From 29ef0379882a4d79505ffeeaf066498c43f5c35e Mon Sep 17 00:00:00 2001 From: Carlos Hernandez Date: Wed, 17 Jun 2026 22:01:30 +0200 Subject: [PATCH 3/3] feat(audit): ingest external execution evidence from tool responses Signed-off-by: Carlos Hernandez --- LIMITATIONS.md | 2 ++ docs/spec/verification-library.md | 2 ++ src/cmcp_runtime/mcp/proxy.py | 41 ++++++++++++++++++++++++++ tests/unit/test_mcp_proxy.py | 48 +++++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+) diff --git a/LIMITATIONS.md b/LIMITATIONS.md index 07b3df8..18c362f 100644 --- a/LIMITATIONS.md +++ b/LIMITATIONS.md @@ -19,6 +19,8 @@ Phase 1 attests the gateway boundary. It does not attest what happens on the oth **External execution evidence (issue #301)** An audit entry may carry an optional `external_execution_evidence` receipt: a signature from an independent authority (for example a safety controller) attesting to an outcome, bound to a specific `call_id`. This is deliberately distinct from `response_payload_hash`, which records what the gateway forwarded. The receipt establishes that the named issuer signed an assertion about that call. It does not establish that a physical action occurred, that it was safe, or that it meets any functional-safety standard, and it is only as trustworthy as the issuer key behind it. cMCP does not observe the actuation; it records the receipt and, when a verifier is configured with the issuer trusted key, checks the signature and the `call_id` binding. Trust in the issuer key is an out-of-band PKI concern, the same shape as the manifest issuer trust anchor in issue #302. Verification is opt-in: receipt-less entries, and verifiers that do not configure issuer keys, are unaffected. +In the proxy path, cMCP binds the receipt when an allowed upstream tool response is a JSON object with a top-level `external_execution_evidence` object matching the audit schema. 
The full response, including that receipt if present, remains covered by `response_payload_hash`. + The TRACE Claim does not carry a separate "external evidence present" flag. Verifiers learn that external evidence was bound by fetching the committed audit bundle and checking entries under the TRACE Claim's `gateway.audit_chain.tip`. **LLM inference and model output** diff --git a/docs/spec/verification-library.md b/docs/spec/verification-library.md index 9438d1a..4affc3a 100644 --- a/docs/spec/verification-library.md +++ b/docs/spec/verification-library.md @@ -88,6 +88,8 @@ def verify_audit_bundle( `external_execution_evidence.evidence_hash` is the digest of the detached evidence payload attested by the issuer, not the digest of the receipt envelope. For JSON evidence payloads, the hash pre-image is the UTF-8 bytes of the RFC 8785/JCS canonical JSON representation. For non-JSON evidence payloads, the pre-image is the exact byte string identified by the issuer's evidence format. The field value is `sha256:<hex>` or `sha384:<hex>`. +Runtime ingestion convention: when an allowed upstream tool response is a JSON object with a top-level `external_execution_evidence` object matching the audit schema, cMCP copies that receipt into the `tool_call` audit entry. The response itself is not rewritten; `response_payload_hash` still covers the bytes returned to the caller. + The verifier computes the receipt signing input as canonical JSON over the receipt object excluding `signature`, with sorted keys and compact separators. It then checks: 1. `linked_call_id` equals the audit entry `call_id`. 
diff --git a/src/cmcp_runtime/mcp/proxy.py b/src/cmcp_runtime/mcp/proxy.py index b6c5458..28cc5fe 100644 --- a/src/cmcp_runtime/mcp/proxy.py +++ b/src/cmcp_runtime/mcp/proxy.py @@ -35,6 +35,15 @@ logger = logging.getLogger(__name__) +_EXTERNAL_EVIDENCE_FIELDS: frozenset[str] = frozenset({ + "issuer", + "issuer_key_id", + "signature", + "evidence_hash", + "evidence_type", + "linked_call_id", +}) + @dataclass class CallResult: @@ -73,6 +82,36 @@ def _cedar_safe(value: Any) -> Any: return str(value) +def _extract_external_execution_evidence(response_text: str) -> dict[str, str] | None: + """Return a well-formed external execution receipt from a JSON response, if present.""" + try: + decoded = json.loads(response_text) + except json.JSONDecodeError: + return None + if not isinstance(decoded, dict): + return None + + receipt = decoded.get("external_execution_evidence") + if receipt is None: + return None + if not isinstance(receipt, dict): + logger.warning( + "EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence is not an object" + ) + return None + if set(receipt) != _EXTERNAL_EVIDENCE_FIELDS: + logger.warning( + "EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence fields mismatch" + ) + return None + if not all(isinstance(receipt[field], str) for field in _EXTERNAL_EVIDENCE_FIELDS): + logger.warning( + "EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence values must be strings" + ) + return None + return {field: receipt[field] for field in sorted(_EXTERNAL_EVIDENCE_FIELDS)} + + class CMCPProxy: """ Wraps AGT's MCPGateway so every tool call is: @@ -813,6 +852,7 @@ async def call_tool( # egress check saw (post-scan, possibly sanitized) so a verifier can match # the audited response against what the caller actually received. 
response_payload_hash = f"sha256:{hashlib.sha256(response_bytes).hexdigest()}" + external_execution_evidence = _extract_external_execution_evidence(agt_result) # INJECT-003: include injection scanner and pattern in audit detail when detected injection_detail: dict[str, str | int | float] | None = ( { @@ -838,6 +878,7 @@ async def call_tool( session_sensitivity_after=self._session.max_sensitivity, workflow_id=workflow_id, detail=injection_detail, + external_execution_evidence=external_execution_evidence, ) # Step 6: call log record + suspicious-sequence check diff --git a/tests/unit/test_mcp_proxy.py b/tests/unit/test_mcp_proxy.py index 697f015..91e14a6 100644 --- a/tests/unit/test_mcp_proxy.py +++ b/tests/unit/test_mcp_proxy.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json from unittest.mock import MagicMock, patch import pytest @@ -323,6 +324,53 @@ async def test_audit_response_payload_hash_on_success(): assert entry.response_payload_hash == _sha256_of_text("upstream says hi") +@pytest.mark.asyncio +async def test_audit_binds_external_execution_evidence_from_json_response(): + """#301 follow-up - a well-formed upstream receipt is copied into the audit entry.""" + receipt = { + "issuer": "spiffe://factory.example/controller/robot-cell-7", + "issuer_key_id": "a" * 64, + "signature": "sig", + "evidence_hash": "sha256:" + "b" * 64, + "evidence_type": "controller-execution-receipt/v1", + "linked_call_id": "c1", + } + proxy, _, chain = _make_proxy() + wire_mock_gateway( + proxy, + response_text=json.dumps( + { + "controller_decision": "rejected", + "reason": "human_detected", + "external_execution_evidence": receipt, + } + ), + ) + result = await proxy.call_tool("c1", "test.tool", {}) + assert result.allowed is True + entry = next(e for e in reversed(chain.entries) if e.entry_type == "tool_call") + assert entry.external_execution_evidence == receipt + + +@pytest.mark.asyncio +async def test_audit_ignores_malformed_external_execution_evidence(): + 
"""Malformed receipt-looking response fields are not bound as audit evidence.""" + proxy, _, chain = _make_proxy() + wire_mock_gateway( + proxy, + response_text=json.dumps({ + "external_execution_evidence": { + "issuer": "spiffe://factory.example/controller/robot-cell-7", + "linked_call_id": "c1", + } + }), + ) + result = await proxy.call_tool("c1", "test.tool", {}) + assert result.allowed is True + entry = next(e for e in reversed(chain.entries) if e.entry_type == "tool_call") + assert entry.external_execution_evidence is None + + @pytest.mark.asyncio async def test_audit_response_payload_hash_uses_sanitized_content(): """#293 - when the scanner sanitizes the response, the audit hash must cover