Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions LIMITATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ The TEE-sealed signing key is generated inside the enclave and cannot be extract
**Phase 2 completeness: server-side attestation**
Phase 1 attests the gateway boundary. It does not attest what happens on the other side of that boundary. The `tool_transcript.hash` field in the TRACE Claim records a hash of the audit chain tip, but the tool transcript binding that ties a specific tool execution to a specific response is Phase 2 work. Phase 1 partially addresses P1.4 (transitive trust into upstream dependencies) and P4.1 (typosquatted packages added to catalog) -- both are fully closed by Phase 2. Any compliance claim that relies on server-side proof must wait for Phase 2.

**External execution evidence (issue #301)**
An audit entry may carry an optional `external_execution_evidence` receipt: a signature from an independent authority (for example a safety controller) attesting to an outcome, bound to a specific `call_id`. This is deliberately distinct from `response_payload_hash`, which records what the gateway forwarded. The receipt establishes that the named issuer signed an assertion about that call. It does not establish that a physical action occurred, that it was safe, or that it meets any functional-safety standard, and it is only as trustworthy as the issuer key behind it. cMCP does not observe the actuation; it records the receipt and, when a verifier is configured with the issuer trusted key, checks the signature and the `call_id` binding. Trust in the issuer key is an out-of-band PKI concern, the same shape as the manifest issuer trust anchor in issue #302. Verification is opt-in: receipt-less entries, and verifiers that do not configure issuer keys, are unaffected.

In the proxy path, cMCP binds the receipt when an allowed upstream tool response is a JSON object with a top-level `external_execution_evidence` object matching the audit schema. The full response, including that receipt if present, remains covered by `response_payload_hash`.

The TRACE Claim does not carry a separate "external evidence present" flag. Verifiers learn that external evidence was bound by fetching the committed audit bundle and checking entries under the TRACE Claim's `gateway.audit_chain.tip`.

**LLM inference and model output**
cMCP intercepts tool calls at the MCP protocol boundary. It does not observe or modify LLM inference, the contents of the agent's context window, or model outputs that do not produce a tool call. A model could hallucinate a response, leak sensitive context in a chat reply, or receive a poisoned tool response that influences subsequent reasoning -- none of these are visible to the gateway. cMCP controls the tool boundary, not the model boundary.

Expand Down
1 change: 1 addition & 0 deletions docs/spec/error-codes.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ The following error codes are defined and documented in [verification-library.md
| `ATTESTATION_STALE` |
| `CHAIN_BROKEN` |
| `CLAIM_MALFORMED` |
| `EXTERNAL_EVIDENCE_VERIFICATION_FAILED` |

> Note: `POLICY_HASH_MISMATCH`, `CATALOG_HASH_MISMATCH`, and `ATTESTATION_STALE` appear in both tables. The Runtime emits them during startup or request handling; the verification library emits them during offline or client-side verification. The semantics are consistent across both contexts.
38 changes: 38 additions & 0 deletions docs/spec/verification-library.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,43 @@ def verify_trace_claim(
...
```

### Audit Bundle Verification and External Execution Evidence

```python
@dataclass
class AuditBundleResult:
verified: bool
entry_count: int
failures: list[str]

def verify_audit_bundle(
bundle_json: dict,
claim_json: Optional[dict] = None,
*,
external_evidence_keys: Optional[dict[str, bytes]] = None,
) -> AuditBundleResult:
"""
Verify an exported audit bundle. When external_evidence_keys is supplied,
each key is issuer_key_id -> raw 32-byte Ed25519 public key. issuer_key_id
is lowercase hex SHA-256(public_key_bytes).
"""
...
```

`external_execution_evidence.evidence_hash` is the digest of the detached evidence payload attested by the issuer, not the digest of the receipt envelope. For JSON evidence payloads, the hash pre-image is the UTF-8 bytes of the RFC 8785/JCS canonical JSON representation. For non-JSON evidence payloads, the pre-image is the exact byte string identified by the issuer's evidence format. The field value is `sha256:<hex>` or `sha384:<hex>`.

Runtime ingestion convention: when an allowed upstream tool response is a JSON object with a top-level `external_execution_evidence` object matching the audit schema, cMCP copies that receipt into the `tool_call` audit entry. The response itself is not rewritten; `response_payload_hash` still covers the bytes returned to the caller.

The verifier computes the receipt signing input as canonical JSON over the receipt object excluding `signature`, with sorted keys and compact separators. It then checks:

1. `linked_call_id` equals the audit entry `call_id`.
2. `issuer_key_id` is lowercase hex SHA-256 of the trusted issuer public key.
3. `evidence_hash` has a supported hash prefix and hex digest.
4. `evidence_type` is one of the documented receipt types.
5. The Ed25519 signature verifies over the canonical receipt signing input.

If any external evidence check fails, the audit bundle result is `verified=False` and the failure string includes `EXTERNAL_EVIDENCE_VERIFICATION_FAILED`.

## Per-Provider Verification Steps

### TPM Verification
Expand Down Expand Up @@ -119,6 +156,7 @@ VerificationError enum:
- ATTESTATION_STALE: attestation_generated_at is older than max_attestation_age_seconds
- CHAIN_BROKEN: audit_chain_root -> audit_chain_tip traversal fails (missing entries or hash mismatch)
- CLAIM_MALFORMED: claim_json fails JSON Schema validation against the TRACE Claim schema
- EXTERNAL_EVIDENCE_VERIFICATION_FAILED: an audit bundle entry contains external_execution_evidence whose call binding, key id, evidence hash, evidence type, or issuer signature cannot be verified

## Phase 1 support matrix

Expand Down
49 changes: 48 additions & 1 deletion schemas/audit-entry.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,53 @@
],
"description": "Result of response inspection by the gateway; null for non-tool-call entries."
},
"external_execution_evidence": {
"type": ["object", "null"],
"additionalProperties": false,
"description": "Optional independent execution evidence bound to this call (issue #301). Distinct from response_payload_hash: response_payload_hash is what the gateway forwarded, this is what an independent authority (e.g. a safety controller) attested. Null when absent. Intentionally not in 'required' so entries that predate the field still validate.",
"required": [
"issuer",
"issuer_key_id",
"signature",
"evidence_hash",
"evidence_type",
"linked_call_id"
],
"properties": {
"issuer": {
"type": "string",
"description": "Identity (e.g. SPIFFE URI) of the authority that signed the receipt."
},
"issuer_key_id": {
"type": "string",
"pattern": "^[0-9a-f]{64}$",
"description": "Lowercase hex SHA-256 digest of the raw Ed25519 issuer public key used to verify the signature."
},
"signature": {
"type": "string",
"description": "base64url Ed25519 signature over the canonical receipt (all fields except signature)."
},
"evidence_hash": {
"type": "string",
"pattern": "^sha(256|384):[0-9a-f]+",
"description": "sha256:<hex> of the attested execution evidence (e.g. the controller decision payload)."
},
"evidence_type": {
"type": "string",
"enum": [
"controller-execution-receipt/v1",
"tee-signed-receipt",
"controller-jwt",
"opaque-receipt"
],
"description": "Receipt type and version, e.g. controller-execution-receipt/v1."
},
"linked_call_id": {
"type": "string",
"description": "The call_id this receipt is bound to; a verifier checks it equals the entry call_id."
}
}
},
"session_sensitivity_before": {
"type": ["string", "null"],
"description": "Session sensitivity level before this entry was processed."
Expand All @@ -125,4 +172,4 @@
"description": "SHA-256 hex of this entry's canonical JSON, excluding the entry_hash field itself."
}
}
}
}
8 changes: 8 additions & 0 deletions src/cmcp_runtime/audit/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class AuditEntry:
detail: dict[str, str | int | float] | None # optional structured detail (e.g. suspicious_call_sequence)
workflow_id: str | None
prev_entry_hash: str # "genesis" for first entry
# #301: optional independent execution evidence, e.g. a controller-signed
# receipt of a physical outcome. Distinct from response_payload_hash:
# response_payload_hash is what the gateway forwarded, this is what an
# independent authority attested. Serialized uniformly (null when absent),
# so receipt-less entries remain deterministic and schema-stable.
external_execution_evidence: dict[str, str] | None = None
entry_hash: str = field(default="") # computed after construction

def _canonical_body(self) -> bytes:
Expand Down Expand Up @@ -161,6 +167,7 @@ def append(
session_sensitivity_after: str | None = None,
detail: dict[str, str | int | float] | None = None,
workflow_id: str | None = None,
external_execution_evidence: dict[str, str] | None = None,
) -> AuditEntry:
prev_hash = self._entries[-1].entry_hash if self._entries else "genesis"
now = datetime.now(tz=UTC)
Expand All @@ -187,6 +194,7 @@ def append(
session_sensitivity_after=session_sensitivity_after,
detail=detail,
workflow_id=workflow_id,
external_execution_evidence=external_execution_evidence,
prev_entry_hash=prev_hash,
)
entry.entry_hash = entry.compute_hash()
Expand Down
41 changes: 41 additions & 0 deletions src/cmcp_runtime/mcp/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@

logger = logging.getLogger(__name__)

_EXTERNAL_EVIDENCE_FIELDS: frozenset[str] = frozenset({
"issuer",
"issuer_key_id",
"signature",
"evidence_hash",
"evidence_type",
"linked_call_id",
})


@dataclass
class CallResult:
Expand Down Expand Up @@ -73,6 +82,36 @@ def _cedar_safe(value: Any) -> Any:
return str(value)


def _extract_external_execution_evidence(response_text: str) -> dict[str, str] | None:
"""Return a well-formed external execution receipt from a JSON response, if present."""
try:
decoded = json.loads(response_text)
except json.JSONDecodeError:
return None
if not isinstance(decoded, dict):
return None

receipt = decoded.get("external_execution_evidence")
if receipt is None:
return None
if not isinstance(receipt, dict):
logger.warning(
"EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence is not an object"
)
return None
if set(receipt) != _EXTERNAL_EVIDENCE_FIELDS:
logger.warning(
"EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence fields mismatch"
)
return None
if not all(isinstance(receipt[field], str) for field in _EXTERNAL_EVIDENCE_FIELDS):
logger.warning(
"EXTERNAL_EVIDENCE_IGNORED: external_execution_evidence values must be strings"
)
return None
return {field: receipt[field] for field in sorted(_EXTERNAL_EVIDENCE_FIELDS)}


class CMCPProxy:
"""
Wraps AGT's MCPGateway so every tool call is:
Expand Down Expand Up @@ -813,6 +852,7 @@ async def call_tool(
# egress check saw (post-scan, possibly sanitized) so a verifier can match
# the audited response against what the caller actually received.
response_payload_hash = f"sha256:{hashlib.sha256(response_bytes).hexdigest()}"
external_execution_evidence = _extract_external_execution_evidence(agt_result)
# INJECT-003: include injection scanner and pattern in audit detail when detected
injection_detail: dict[str, str | int | float] | None = (
{
Expand All @@ -838,6 +878,7 @@ async def call_tool(
session_sensitivity_after=self._session.max_sensitivity,
workflow_id=workflow_id,
detail=injection_detail,
external_execution_evidence=external_execution_evidence,
)

# Step 6: call log record + suspicious-sequence check
Expand Down
99 changes: 99 additions & 0 deletions src/cmcp_verify/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import hashlib
import json
import logging
import re
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
Expand All @@ -26,6 +27,15 @@
logger = logging.getLogger(__name__)

_SW_ONLY_FIRMWARE = "software-only-dev-mode"
_EXTERNAL_EVIDENCE_ERROR = "EXTERNAL_EVIDENCE_VERIFICATION_FAILED"
_EXTERNAL_EVIDENCE_HASH_RE = re.compile(r"^sha(256|384):[0-9a-f]+$")
_ISSUER_KEY_ID_RE = re.compile(r"^[0-9a-f]{64}$")
_EXTERNAL_EVIDENCE_TYPES = frozenset({
"controller-execution-receipt/v1",
"tee-signed-receipt",
"controller-jwt",
"opaque-receipt",
})

_KNOWN_PLATFORMS = {
"amd-sev-snp",
Expand Down Expand Up @@ -261,9 +271,15 @@ class AuditBundleResult:
failures: list[str] = field(default_factory=list)


def _external_evidence_failure(entry_index: int, reason: str) -> str:
return f"entry {entry_index}: {_EXTERNAL_EVIDENCE_ERROR}: {reason}"


def verify_audit_bundle(
bundle_json: dict[str, Any],
claim_json: dict[str, Any] | None = None,
*,
external_evidence_keys: dict[str, bytes] | None = None,
) -> AuditBundleResult:
"""
Verify an exported audit bundle (GET /audit/export):
Expand All @@ -273,6 +289,12 @@ def verify_audit_bundle(
2. If a claim is provided, cross-check the bundle's root/tip/length
against gateway.audit_chain and verify the bundle_signature with the
claim's confirmation key (trace.cnf.jwk.x).
3. #301: if external_evidence_keys is provided (issuer_key_id -> raw Ed25519
public key bytes), verify any external_execution_evidence receipt bound to an
entry: linked_call_id must equal the entry call_id, and the issuer
signature must verify over the canonical receipt (all fields except
signature). This is opt-in: receipt-less entries and callers that do not
supply keys are unaffected, so existing evidence keeps verifying.
"""
failures: list[str] = []
entries = bundle_json.get("entries", [])
Expand All @@ -291,6 +313,83 @@ def verify_audit_bundle(
failures.append(f"entry {i}: chain link broken")
prev = entry.get("entry_hash", "")

# #301: verify independent execution receipts (opt-in via external_evidence_keys).
if external_evidence_keys is not None:
for i, entry in enumerate(entries):
ev = entry.get("external_execution_evidence")
if not ev:
continue
if not isinstance(ev, dict):
failures.append(
_external_evidence_failure(i, "external_execution_evidence is not an object")
)
continue
if ev.get("linked_call_id") != entry.get("call_id"):
failures.append(
_external_evidence_failure(
i,
"external_execution_evidence linked_call_id does not match "
"the entry call_id",
)
)
key_id = ev.get("issuer_key_id", "")
if not isinstance(key_id, str) or not _ISSUER_KEY_ID_RE.match(key_id):
failures.append(
_external_evidence_failure(
i,
"issuer_key_id must be lowercase hex SHA-256 of the issuer public key",
)
)
continue
evidence_hash = ev.get("evidence_hash", "")
if not isinstance(evidence_hash, str) or not _EXTERNAL_EVIDENCE_HASH_RE.match(evidence_hash):
failures.append(
_external_evidence_failure(
i, "evidence_hash must be sha256:<hex> or sha384:<hex>"
)
)
continue
evidence_type = ev.get("evidence_type", "")
if evidence_type not in _EXTERNAL_EVIDENCE_TYPES:
failures.append(
_external_evidence_failure(i, f"unsupported evidence_type '{evidence_type}'")
)
continue
pub_bytes = external_evidence_keys.get(key_id)
if not pub_bytes:
failures.append(
_external_evidence_failure(
i, f"no trusted key for external evidence issuer_key_id '{key_id}'"
)
)
continue
try:
if len(pub_bytes) != 32:
raise ValueError("trusted issuer key must be 32 raw Ed25519 public key bytes")
if hashlib.sha256(pub_bytes).hexdigest() != key_id:
raise ValueError("issuer_key_id does not match trusted issuer public key")
pub = Ed25519PublicKey.from_public_bytes(pub_bytes)
signing_input = json.dumps(
{k: v for k, v in ev.items() if k != "signature"},
sort_keys=True,
separators=(",", ":"),
ensure_ascii=True,
).encode()
sig_b64 = ev.get("signature", "")
pad = 4 - (len(sig_b64) % 4)
sig = base64.urlsafe_b64decode(sig_b64 + ("=" * pad if pad != 4 else ""))
pub.verify(sig, signing_input)
except InvalidSignature:
failures.append(
_external_evidence_failure(i, "external_execution_evidence signature is invalid")
)
except Exception as exc:
failures.append(
_external_evidence_failure(
i, f"external_execution_evidence could not be verified: {exc}"
)
)

if claim_json is not None:
chain = claim_json.get("gateway", {}).get("audit_chain", {})
if chain.get("root") != entries[0].get("entry_hash"):
Expand Down
Loading