diff --git a/LIMITATIONS.md b/LIMITATIONS.md index 3708f16..18c362f 100644 --- a/LIMITATIONS.md +++ b/LIMITATIONS.md @@ -16,6 +16,13 @@ The TEE-sealed signing key is generated inside the enclave and cannot be extract **Phase 2 completeness: server-side attestation** Phase 1 attests the gateway boundary. It does not attest what happens on the other side of that boundary. The `tool_transcript.hash` field in the TRACE Claim records a hash of the audit chain tip, but the tool transcript binding that ties a specific tool execution to a specific response is Phase 2 work. Phase 1 partially addresses P1.4 (transitive trust into upstream dependencies) and P4.1 (typosquatted packages added to catalog) -- both are fully closed by Phase 2. Any compliance claim that relies on server-side proof must wait for Phase 2. +**External execution evidence (issue #301)** +An audit entry may carry an optional `external_execution_evidence` receipt: a signature from an independent authority (for example a safety controller) attesting to an outcome, bound to a specific `call_id`. This is deliberately distinct from `response_payload_hash`, which records what the gateway forwarded. The receipt establishes that the named issuer signed an assertion about that call. It does not establish that a physical action occurred, that it was safe, or that it meets any functional-safety standard, and it is only as trustworthy as the issuer key behind it. cMCP does not observe the actuation; it records the receipt and, when a verifier is configured with the issuer trusted key, checks the signature and the `call_id` binding. Trust in the issuer key is an out-of-band PKI concern, the same shape as the manifest issuer trust anchor in issue #302. Verification is opt-in: receipt-less entries, and verifiers that do not configure issuer keys, are unaffected. + +In the proxy path, cMCP binds the receipt when an allowed upstream tool response is a JSON object with a top-level `external_execution_evidence` object matching the audit schema. The full response, including that receipt if present, remains covered by `response_payload_hash`. + +The TRACE Claim does not carry a separate "external evidence present" flag. Verifiers learn that external evidence was bound by fetching the committed audit bundle and checking entries under the TRACE Claim's `gateway.audit_chain.tip`. + **LLM inference and model output** cMCP intercepts tool calls at the MCP protocol boundary. It does not observe or modify LLM inference, the contents of the agent's context window, or model outputs that do not produce a tool call. A model could hallucinate a response, leak sensitive context in a chat reply, or receive a poisoned tool response that influences subsequent reasoning -- none of these are visible to the gateway. cMCP controls the tool boundary, not the model boundary. diff --git a/docs/spec/error-codes.md b/docs/spec/error-codes.md index fd71100..de49ec2 100644 --- a/docs/spec/error-codes.md +++ b/docs/spec/error-codes.md @@ -38,5 +38,6 @@ The following error codes are defined and documented in [verification-library.md | `ATTESTATION_STALE` | | `CHAIN_BROKEN` | | `CLAIM_MALFORMED` | +| `EXTERNAL_EVIDENCE_VERIFICATION_FAILED` | > Note: `POLICY_HASH_MISMATCH`, `CATALOG_HASH_MISMATCH`, and `ATTESTATION_STALE` appear in both tables. The Runtime emits them during startup or request handling; the verification library emits them during offline or client-side verification. The semantics are consistent across both contexts. diff --git a/docs/spec/verification-library.md b/docs/spec/verification-library.md index 52b4c6a..4affc3a 100644 --- a/docs/spec/verification-library.md +++ b/docs/spec/verification-library.md @@ -63,6 +63,43 @@ def verify_trace_claim( ... ``` +### Audit Bundle Verification and External Execution Evidence + +```python +@dataclass +class AuditBundleResult: + verified: bool + entry_count: int + failures: list[str] + +def verify_audit_bundle( + bundle_json: dict, + claim_json: Optional[dict] = None, + *, + external_evidence_keys: Optional[dict[str, bytes]] = None, +) -> AuditBundleResult: + """ + Verify an exported audit bundle. When external_evidence_keys is supplied, + each key is issuer_key_id -> raw 32-byte Ed25519 public key. issuer_key_id + is lowercase hex SHA-256(public_key_bytes). + """ + ... +``` + +`external_execution_evidence.evidence_hash` is the digest of the detached evidence payload attested by the issuer, not the digest of the receipt envelope. For JSON evidence payloads, the hash pre-image is the UTF-8 bytes of the RFC 8785/JCS canonical JSON representation. For non-JSON evidence payloads, the pre-image is the exact byte string identified by the issuer's evidence format. The field value is `sha256:` or `sha384:`. + +Runtime ingestion convention: when an allowed upstream tool response is a JSON object with a top-level `external_execution_evidence` object matching the audit schema, cMCP copies that receipt into the `tool_call` audit entry. The response itself is not rewritten; `response_payload_hash` still covers the bytes returned to the caller. + +The verifier computes the receipt signing input as canonical JSON over the receipt object excluding `signature`, with sorted keys and compact separators. It then checks: + +1. `linked_call_id` equals the audit entry `call_id`. +2. `issuer_key_id` is lowercase hex SHA-256 of the trusted issuer public key. +3. `evidence_hash` has a supported hash prefix and hex digest. +4. `evidence_type` is one of the documented receipt types. +5. The Ed25519 signature verifies over the canonical receipt signing input. + +If any external evidence check fails, the audit bundle result is `verified=False` and the failure string includes `EXTERNAL_EVIDENCE_VERIFICATION_FAILED`. + ## Per-Provider Verification Steps ### TPM Verification @@ -119,6 +156,7 @@ VerificationError enum: - ATTESTATION_STALE: attestation_generated_at is older than max_attestation_age_seconds - CHAIN_BROKEN: audit_chain_root -> audit_chain_tip traversal fails (missing entries or hash mismatch) - CLAIM_MALFORMED: claim_json fails JSON Schema validation against the TRACE Claim schema +- EXTERNAL_EVIDENCE_VERIFICATION_FAILED: an audit bundle entry contains external_execution_evidence whose call binding, key id, evidence hash, evidence type, or issuer signature cannot be verified ## Phase 1 support matrix diff --git a/schemas/audit-entry.schema.json b/schemas/audit-entry.schema.json index 37b4651..d7312e9 100644 --- a/schemas/audit-entry.schema.json +++ b/schemas/audit-entry.schema.json @@ -108,6 +108,53 @@ ], "description": "Result of response inspection by the gateway; null for non-tool-call entries." }, + "external_execution_evidence": { + "type": ["object", "null"], + "additionalProperties": false, + "description": "Optional independent execution evidence bound to this call (issue #301). Distinct from response_payload_hash: response_payload_hash is what the gateway forwarded, this is what an independent authority (e.g. a safety controller) attested. Null when absent. Intentionally not in 'required' so entries that predate the field still validate.", + "required": [ + "issuer", + "issuer_key_id", + "signature", + "evidence_hash", + "evidence_type", + "linked_call_id" + ], + "properties": { + "issuer": { + "type": "string", + "description": "Identity (e.g. SPIFFE URI) of the authority that signed the receipt." + }, + "issuer_key_id": { + "type": "string", + "pattern": "^[0-9a-f]{64}$", + "description": "Lowercase hex SHA-256 digest of the raw Ed25519 issuer public key used to verify the signature." + }, + "signature": { + "type": "string", + "description": "base64url Ed25519 signature over the canonical receipt (all fields except signature)." + }, + "evidence_hash": { + "type": "string", + "pattern": "^sha(256|384):[0-9a-f]+", + "description": "sha256: of the attested execution evidence (e.g. the controller decision payload)." + }, + "evidence_type": { + "type": "string", + "enum": [ + "controller-execution-receipt/v1", + "tee-signed-receipt", + "controller-jwt", + "opaque-receipt" + ], + "description": "Receipt type and version, e.g. controller-execution-receipt/v1." + }, + "linked_call_id": { + "type": "string", + "description": "The call_id this receipt is bound to; a verifier checks it equals the entry call_id." + } + } + }, "session_sensitivity_before": { "type": ["string", "null"], "description": "Session sensitivity level before this entry was processed." @@ -125,4 +172,4 @@ "description": "SHA-256 hex of this entry's canonical JSON, excluding the entry_hash field itself." } } -} +} diff --git a/src/cmcp_runtime/audit/chain.py b/src/cmcp_runtime/audit/chain.py index 624e63d..51ed49f 100644 --- a/src/cmcp_runtime/audit/chain.py +++ b/src/cmcp_runtime/audit/chain.py @@ -61,6 +61,12 @@ class AuditEntry: detail: dict[str, str | int | float] | None # optional structured detail (e.g. suspicious_call_sequence) workflow_id: str | None prev_entry_hash: str # "genesis" for first entry + # #301: optional independent execution evidence, e.g. a controller-signed + # receipt of a physical outcome. Distinct from response_payload_hash: + # response_payload_hash is what the gateway forwarded, this is what an + # independent authority attested. Serialized uniformly (null when absent), + # so receipt-less entries remain deterministic and schema-stable. + external_execution_evidence: dict[str, str] | None = None entry_hash: str = field(default="") # computed after construction def _canonical_body(self) -> bytes: @@ -161,6 +167,7 @@ def append( session_sensitivity_after: str | None = None, detail: dict[str, str | int | float] | None = None, workflow_id: str | None = None, + external_execution_evidence: dict[str, str] | None = None, ) -> AuditEntry: prev_hash = self._entries[-1].entry_hash if self._entries else "genesis" now = datetime.now(tz=UTC) @@ -187,6 +194,7 @@ def append( session_sensitivity_after=session_sensitivity_after, detail=detail, workflow_id=workflow_id, + external_execution_evidence=external_execution_evidence, prev_entry_hash=prev_hash, ) entry.entry_hash = entry.compute_hash() diff --git a/src/cmcp_runtime/mcp/proxy.py b/src/cmcp_runtime/mcp/proxy.py index b6c5458..28cc5fe 100644 --- a/src/cmcp_runtime/mcp/proxy.py +++ b/src/cmcp_runtime/mcp/proxy.py @@ -35,6 +35,15 @@ logger = logging.getLogger(__name__) +_EXTERNAL_EVIDENCE_FIELDS: frozenset[str] = frozenset({ + "issuer", + "issuer_key_id", + "signature", + "evidence_hash", + "evidence_type", + "linked_call_id", +}) + @dataclass class CallResult: @@ -73,6 +82,36 @@ def _cedar_safe(value: Any) -> Any: return str(value) +def _extract_external_execution_evidence(response_text: str) -> dict[str, str] | None: + """Return a well-formed external execution receipt from a JSON response, if present.""" + try: + decoded = json.loads(response_text) + except json.JSONDecodeError: + return None + if not isinstance(decoded, dict): + return None + + receipt = decoded.get("external_execution_evidence") + if receipt is None: + return None + if not isinstance(receipt, dict): + logger.warning( + "EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence is not an object" + ) + return None + if set(receipt) != _EXTERNAL_EVIDENCE_FIELDS: + logger.warning( + "EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence fields mismatch" + ) + return None + if not all(isinstance(receipt[field], str) for field in _EXTERNAL_EVIDENCE_FIELDS): + logger.warning( + "EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence values must be strings" + ) + return None + return {field: receipt[field] for field in sorted(_EXTERNAL_EVIDENCE_FIELDS)} + + class CMCPProxy: """ Wraps AGT's MCPGateway so every tool call is: @@ -813,6 +852,7 @@ async def call_tool( # egress check saw (post-scan, possibly sanitized) so a verifier can match # the audited response against what the caller actually received. response_payload_hash = f"sha256:{hashlib.sha256(response_bytes).hexdigest()}" + external_execution_evidence = _extract_external_execution_evidence(agt_result) # INJECT-003: include injection scanner and pattern in audit detail when detected injection_detail: dict[str, str | int | float] | None = ( { @@ -838,6 +878,7 @@ async def call_tool( session_sensitivity_after=self._session.max_sensitivity, workflow_id=workflow_id, detail=injection_detail, + external_execution_evidence=external_execution_evidence, ) # Step 6: call log record + suspicious-sequence check diff --git a/src/cmcp_verify/verify.py b/src/cmcp_verify/verify.py index f0c6c1f..0423a15 100644 --- a/src/cmcp_verify/verify.py +++ b/src/cmcp_verify/verify.py @@ -12,6 +12,7 @@ import hashlib import json import logging +import re from dataclasses import dataclass, field from datetime import UTC, datetime from enum import StrEnum @@ -26,6 +27,15 @@ logger = logging.getLogger(__name__) _SW_ONLY_FIRMWARE = "software-only-dev-mode" +_EXTERNAL_EVIDENCE_ERROR = "EXTERNAL_EVIDENCE_VERIFICATION_FAILED" +_EXTERNAL_EVIDENCE_HASH_RE = re.compile(r"^sha(256|384):[0-9a-f]+$") +_ISSUER_KEY_ID_RE = re.compile(r"^[0-9a-f]{64}$") +_EXTERNAL_EVIDENCE_TYPES = frozenset({ + "controller-execution-receipt/v1", + "tee-signed-receipt", + "controller-jwt", + "opaque-receipt", +}) _KNOWN_PLATFORMS = { "amd-sev-snp", @@ -261,9 +271,15 @@ class AuditBundleResult: failures: list[str] = field(default_factory=list) +def _external_evidence_failure(entry_index: int, reason: str) -> str: + return f"entry {entry_index}: {_EXTERNAL_EVIDENCE_ERROR}: {reason}" + + def verify_audit_bundle( bundle_json: dict[str, Any], claim_json: dict[str, Any] | None = None, + *, + external_evidence_keys: dict[str, bytes] | None = None, ) -> AuditBundleResult: """ Verify an exported audit bundle (GET /audit/export): @@ -273,6 +289,12 @@ def verify_audit_bundle( 2. If a claim is provided, cross-check the bundle's root/tip/length against gateway.audit_chain and verify the bundle_signature with the claim's confirmation key (trace.cnf.jwk.x). + 3. #301: if external_evidence_keys is provided (issuer_key_id -> raw Ed25519 + public key bytes), verify any external_execution_evidence receipt bound to an + entry: linked_call_id must equal the entry call_id, and the issuer + signature must verify over the canonical receipt (all fields except + signature). This is opt-in: receipt-less entries and callers that do not + supply keys are unaffected, so existing evidence keeps verifying. """ failures: list[str] = [] entries = bundle_json.get("entries", []) @@ -291,6 +313,83 @@ def verify_audit_bundle( failures.append(f"entry {i}: chain link broken") prev = entry.get("entry_hash", "") + # #301: verify independent execution receipts (opt-in via external_evidence_keys). + if external_evidence_keys is not None: + for i, entry in enumerate(entries): + ev = entry.get("external_execution_evidence") + if not ev: + continue + if not isinstance(ev, dict): + failures.append( + _external_evidence_failure(i, "external_execution_evidence is not an object") + ) + continue + if ev.get("linked_call_id") != entry.get("call_id"): + failures.append( + _external_evidence_failure( + i, + "external_execution_evidence linked_call_id does not match " + "the entry call_id", + ) + ) + key_id = ev.get("issuer_key_id", "") + if not isinstance(key_id, str) or not _ISSUER_KEY_ID_RE.match(key_id): + failures.append( + _external_evidence_failure( + i, + "issuer_key_id must be lowercase hex SHA-256 of the issuer public key", + ) + ) + continue + evidence_hash = ev.get("evidence_hash", "") + if not isinstance(evidence_hash, str) or not _EXTERNAL_EVIDENCE_HASH_RE.match(evidence_hash): + failures.append( + _external_evidence_failure( + i, "evidence_hash must be sha256: or sha384:" + ) + ) + continue + evidence_type = ev.get("evidence_type", "") + if evidence_type not in _EXTERNAL_EVIDENCE_TYPES: + failures.append( + _external_evidence_failure(i, f"unsupported evidence_type '{evidence_type}'") + ) + continue + pub_bytes = external_evidence_keys.get(key_id) + if not pub_bytes: + failures.append( + _external_evidence_failure( + i, f"no trusted key for external evidence issuer_key_id '{key_id}'" + ) + ) + continue + try: + if len(pub_bytes) != 32: + raise ValueError("trusted issuer key must be 32 raw Ed25519 public key bytes") + if hashlib.sha256(pub_bytes).hexdigest() != key_id: + raise ValueError("issuer_key_id does not match trusted issuer public key") + pub = Ed25519PublicKey.from_public_bytes(pub_bytes) + signing_input = json.dumps( + {k: v for k, v in ev.items() if k != "signature"}, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=True, + ).encode() + sig_b64 = ev.get("signature", "") + pad = 4 - (len(sig_b64) % 4) + sig = base64.urlsafe_b64decode(sig_b64 + ("=" * pad if pad != 4 else "")) + pub.verify(sig, signing_input) + except InvalidSignature: + failures.append( + _external_evidence_failure(i, "external_execution_evidence signature is invalid") + ) + except Exception as exc: + failures.append( + _external_evidence_failure( + i, f"external_execution_evidence could not be verified: {exc}" + ) + ) + if claim_json is not None: chain = claim_json.get("gateway", {}).get("audit_chain", {}) if chain.get("root") != entries[0].get("entry_hash"): diff --git a/tests/conformance/test_audit_conformance.py b/tests/conformance/test_audit_conformance.py index 9f236c5..9618f81 100644 --- a/tests/conformance/test_audit_conformance.py +++ b/tests/conformance/test_audit_conformance.py @@ -4,6 +4,7 @@ """ from __future__ import annotations +import base64 import hashlib import json import logging @@ -12,6 +13,8 @@ from unittest.mock import patch import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cmcp_runtime.audit.chain import AuditChain from cmcp_runtime.audit.keys import SigningKey @@ -23,6 +26,7 @@ ToolCatalogInfo, generate_trace_claim, ) +from cmcp_verify.verify import verify_audit_bundle # ---- helpers ---------------------------------------------------------------- @@ -315,3 +319,173 @@ def test_sequence_numbers_strictly_increasing(self): d = _claim_dict(chain, key, seq=s) assert d["gateway"]["sequence_number"] == s assert seqs == sorted(set(seqs)) + + +# ---- #301: independent execution evidence ----------------------------------- + + +def _ed25519_keypair() -> tuple[Ed25519PrivateKey, bytes, str]: + priv = Ed25519PrivateKey.generate() + pub = priv.public_key().public_bytes( + encoding=Encoding.Raw, format=PublicFormat.Raw + ) + return priv, pub, hashlib.sha256(pub).hexdigest() + + +def _signed_receipt( + priv: Ed25519PrivateKey, + call_id: str, + *, + issuer_key_id: str, +) -> dict[str, str]: + """Build a controller-signed execution receipt for a call.""" + receipt = { + "issuer": "spiffe://factory.example/controller/robot-cell-7", + "issuer_key_id": issuer_key_id, + "evidence_hash": "sha256:" + "ab" * 32, + "evidence_type": "controller-execution-receipt/v1", + "linked_call_id": call_id, + } + signing_input = json.dumps( + receipt, sort_keys=True, separators=(",", ":"), ensure_ascii=True + ).encode() + receipt["signature"] = ( + base64.urlsafe_b64encode(priv.sign(signing_input)).rstrip(b"=").decode() + ) + return receipt + + +def _bundle(chain: AuditChain) -> dict: + return {"entries": [asdict(e) for e in chain.entries]} + + +def _signed_bundle(chain: AuditChain, key: SigningKey) -> dict: + entries = [asdict(e) for e in chain.entries] + digest = hashlib.sha256( + json.dumps(entries, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode() + ).digest() + return { + "session_id": chain._session_id, + "entries": entries, + "bundle_signature": base64.urlsafe_b64encode(key.sign(digest)).rstrip(b"=").decode(), + } + + +class TestExternalExecutionEvidence301: + """#301: optional external_execution_evidence bound to an audit entry.""" + + def test_absent_receipt_verifies_with_null_evidence_field(self): + # Receipt-less entries serialize and hash deterministically, and a + # bundle with no receipts verifies without any keys configured. + chain = AuditChain("sess-301-a") + entry = chain.append("tool_call", call_id="c1", tool_name="t", policy_decision="allow") + d = asdict(entry) + d.pop("entry_hash") + expected = hashlib.sha256( + json.dumps(d, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode() + ).hexdigest() + assert entry.entry_hash == expected + assert verify_audit_bundle(_bundle(chain)).verified + + def test_populated_receipt_verifies(self): + priv, pub, key_id = _ed25519_keypair() + chain = AuditChain("sess-301-b") + chain.append( + "tool_call", + call_id="c1", + tool_name="robot.request_motion", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id=key_id), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={key_id: pub} + ) + assert result.verified, result.failures + + def test_exported_bundle_with_receipt_verifies_end_to_end(self): + priv, pub, key_id = _ed25519_keypair() + signing_key = SigningKey() + chain = AuditChain("sess-301-export") + chain.append( + "tool_call", + call_id="c1", + tool_name="robot.request_motion", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id=key_id), + ) + result = verify_audit_bundle( + _signed_bundle(chain, signing_key), + _claim_dict(chain, signing_key), + external_evidence_keys={key_id: pub}, + ) + assert result.verified, result.failures + + def test_tampered_receipt_fails(self): + priv, pub, key_id = _ed25519_keypair() + receipt = _signed_receipt(priv, "c1", issuer_key_id=key_id) + receipt["evidence_hash"] = "sha256:" + "cd" * 32 # tamper after signing + chain = AuditChain("sess-301-c") + chain.append( + "tool_call", + call_id="c1", + tool_name="robot.request_motion", + policy_decision="allow", + external_execution_evidence=receipt, + ) + # The chain hash still matches (append sealed the tampered receipt), so + # only the receipt signature check fails. + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={key_id: pub} + ) + assert not result.verified + assert any("signature is invalid" in f for f in result.failures) + + def test_linked_call_id_mismatch_fails(self): + priv, pub, key_id = _ed25519_keypair() + chain = AuditChain("sess-301-d") + chain.append( + "tool_call", + call_id="c1", + tool_name="t", + policy_decision="allow", + external_execution_evidence=_signed_receipt( + priv, "a-different-call", issuer_key_id=key_id + ), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={key_id: pub} + ) + assert not result.verified + assert any("linked_call_id" in f for f in result.failures) + + def test_unknown_issuer_key_fails(self): + priv, _, key_id = _ed25519_keypair() + chain = AuditChain("sess-301-e") + chain.append( + "tool_call", + call_id="c1", + tool_name="t", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id=key_id), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={"0" * 64: b"\x00" * 32} + ) + assert not result.verified + assert any("no trusted key" in f for f in result.failures) + + def test_key_id_mismatch_fails(self): + priv, pub, key_id = _ed25519_keypair() + chain = AuditChain("sess-301-f") + chain.append( + "tool_call", + call_id="c1", + tool_name="t", + policy_decision="allow", + external_execution_evidence=_signed_receipt(priv, "c1", issuer_key_id=key_id), + ) + result = verify_audit_bundle( + _bundle(chain), external_evidence_keys={key_id: b"\x00" * len(pub)} + ) + assert not result.verified + assert any("issuer_key_id does not match" in f for f in result.failures) diff --git a/tests/unit/test_mcp_proxy.py b/tests/unit/test_mcp_proxy.py index 697f015..91e14a6 100644 --- a/tests/unit/test_mcp_proxy.py +++ b/tests/unit/test_mcp_proxy.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json from unittest.mock import MagicMock, patch import pytest @@ -323,6 +324,53 @@ async def test_audit_response_payload_hash_on_success(): assert entry.response_payload_hash == _sha256_of_text("upstream says hi") +@pytest.mark.asyncio +async def test_audit_binds_external_execution_evidence_from_json_response(): + """#301 follow-up - a well-formed upstream receipt is copied into the audit entry.""" + receipt = { + "issuer": "spiffe://factory.example/controller/robot-cell-7", + "issuer_key_id": "a" * 64, + "signature": "sig", + "evidence_hash": "sha256:" + "b" * 64, + "evidence_type": "controller-execution-receipt/v1", + "linked_call_id": "c1", + } + proxy, _, chain = _make_proxy() + wire_mock_gateway( + proxy, + response_text=json.dumps( + { + "controller_decision": "rejected", + "reason": "human_detected", + "external_execution_evidence": receipt, + } + ), + ) + result = await proxy.call_tool("c1", "test.tool", {}) + assert result.allowed is True + entry = next(e for e in reversed(chain.entries) if e.entry_type == "tool_call") + assert entry.external_execution_evidence == receipt + + +@pytest.mark.asyncio +async def test_audit_ignores_malformed_external_execution_evidence(): + """Malformed receipt-looking response fields are not bound as audit evidence.""" + proxy, _, chain = _make_proxy() + wire_mock_gateway( + proxy, + response_text=json.dumps({ + "external_execution_evidence": { + "issuer": "spiffe://factory.example/controller/robot-cell-7", + "linked_call_id": "c1", + } + }), + ) + result = await proxy.call_tool("c1", "test.tool", {}) + assert result.allowed is True + entry = next(e for e in reversed(chain.entries) if e.entry_type == "tool_call") + assert entry.external_execution_evidence is None + + @pytest.mark.asyncio async def test_audit_response_payload_hash_uses_sanitized_content(): """#293 - when the scanner sanitizes the response, the audit hash must cover