Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions LIMITATIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ The TEE-sealed signing key is generated inside the enclave and cannot be extract
**Phase 2 completeness: server-side attestation**
Phase 1 attests the gateway boundary. It does not attest what happens on the other side of that boundary. The `tool_transcript.hash` field in the TRACE Claim records a hash of the audit chain tip, but the tool transcript binding that ties a specific tool execution to a specific response is Phase 2 work. Phase 1 partially addresses P1.4 (transitive trust into upstream dependencies) and P4.1 (typosquatted packages added to catalog) -- both are fully closed by Phase 2. Any compliance claim that relies on server-side proof must wait for Phase 2.

**Tool server non-repudiation**
The audit chain records a `response_payload_hash` for each tool call and an `evidence_class` that indicates the assurance level of the recorded response:

- **`tls-pinned`**: The tool server URL uses HTTPS and has a non-placeholder TLS certificate fingerprint in the catalog. The response was received over a TLS connection whose certificate was pinned at catalog-load time. A verifier can confirm the server identity against the catalog fingerprint.
- **`hash-only`**: The tool server uses HTTP, has no TLS fingerprint assigned in the catalog (dev placeholder), or TLS pinning could not be enforced on the current platform. The hash proves what the gateway received, but the server identity cannot be independently verified from the audit record alone.

Tool servers do not sign their individual responses. A `tls-pinned` entry proves the response came from a server holding the catalog-pinned certificate but does not prevent the server itself from later denying it produced a specific response. For strong non-repudiation, configure non-placeholder TLS fingerprints for all upstream servers so all evidence is `tls-pinned`, and treat the TEE attestation as the binding authority for what the gateway recorded.

**LLM inference and model output**
cMCP intercepts tool calls at the MCP protocol boundary. It does not observe or modify LLM inference, the contents of the agent's context window, or model outputs that do not produce a tool call. A model could hallucinate a response, leak sensitive context in a chat reply, or receive a poisoned tool response that influences subsequent reasoning -- none of these are visible to the gateway. cMCP controls the tool boundary, not the model boundary.

Expand Down
3 changes: 3 additions & 0 deletions src/cmcp_runtime/audit/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class AuditEntry:
detail: dict[str, str | int | float] | None # optional structured detail (e.g. suspicious_call_sequence)
workflow_id: str | None
prev_entry_hash: str # "genesis" for first entry
evidence_class: str = field(default="hash-only") # "tls-pinned" when server TLS cert pin is enforced
entry_hash: str = field(default="") # computed after construction

def _canonical_body(self) -> bytes:
Expand Down Expand Up @@ -156,6 +157,7 @@ def append(
latency_us: int | None = None,
request_payload_hash: str | None = None,
response_payload_hash: str | None = None,
evidence_class: str = "hash-only",
response_inspection_result: InspectionResult | None = None,
session_sensitivity_before: str | None = None,
session_sensitivity_after: str | None = None,
Expand Down Expand Up @@ -183,6 +185,7 @@ def append(
request_payload_hash=request_payload_hash,
response_payload_hash=response_payload_hash,
response_inspection_result=response_inspection_result,
evidence_class=evidence_class,
session_sensitivity_before=session_sensitivity_before,
session_sensitivity_after=session_sensitivity_after,
detail=detail,
Expand Down
12 changes: 12 additions & 0 deletions src/cmcp_runtime/mcp/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,17 @@ async def call_tool(
# egress check saw (post-scan, possibly sanitized) so a verifier can match
# the audited response against what the caller actually received.
response_payload_hash = f"sha256:{hashlib.sha256(response_bytes).hexdigest()}"
# Evidence class: tls-pinned when the upstream server has a real cert pin in the catalog.
from cmcp_runtime.mcp import tls_pinning as _tls_mod
_fp = entry.server.tls_fingerprint if entry else ""
evidence_class = (
"tls-pinned"
if entry
and entry.server.url.startswith("https://")
and _fp
and _fp != _tls_mod.PLACEHOLDER_FINGERPRINT
else "hash-only"
)
# INJECT-003: include injection scanner and pattern in audit detail when detected
injection_detail: dict[str, str | int | float] | None = (
{
Expand All @@ -834,6 +845,7 @@ async def call_tool(
latency_us=latency_us,
request_payload_hash=request_payload_hash,
response_payload_hash=response_payload_hash,
evidence_class=evidence_class,
session_sensitivity_before=sensitivity_before,
session_sensitivity_after=self._session.max_sensitivity,
workflow_id=workflow_id,
Expand Down
26 changes: 20 additions & 6 deletions src/cmcp_runtime/startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

from __future__ import annotations

import base64
import hashlib
import json
import logging
import os
import secrets
import sys
from dataclasses import dataclass
from typing import Any
Expand Down Expand Up @@ -54,6 +58,16 @@ class RuntimeContext:
nras_appraisal: AppraisalResult | None = None


def _jwk_thumbprint_sha256(x_b64url: str) -> bytes:
"""RFC 7638 §3 JWK Thumbprint — SHA-256(UTF-8(JSON of sorted required OKP members))."""
canonical = json.dumps(
{"crv": "Ed25519", "kty": "OKP", "x": x_b64url},
separators=(",", ":"),
sort_keys=True,
).encode()
return hashlib.sha256(canonical).digest()


def _fatal(code: str, message: str, **fields: Any) -> None:
"""Log a FATAL structured entry and exit with code 1."""
entry = {
Expand Down Expand Up @@ -101,14 +115,14 @@ def run_startup(config_path: str) -> RuntimeContext:
signing_key = SigningKey()
logger.info("Signing key generated: %s...", signing_key.public_key_hex[:16])

# CRYPTO-001 + CRYPTO-002: the first 32 bytes of the nonce are SHA-256(public_key_bytes)
# so verifiers can re-derive the fingerprint from the public key in cnf.jwk and confirm
# it matches report_data[:32] -- binding the attestation report to this specific keypair.
# CRYPTO-001 + CRYPTO-002: the first 32 bytes of the nonce are the RFC 7638 JWK Thumbprint
# (SHA-256 of the sorted JSON OKP key members) so verifiers can re-derive the fingerprint
# from cnf.jwk and confirm it matches report_data[:32] -- binding the attestation report
# to this specific keypair.
# The remaining 32 bytes are a random salt so two gateways with different random bytes
# produce different nonces even if they share the same keypair (blue-green deploy).
import hashlib
import secrets
key_fingerprint = hashlib.sha256(signing_key.public_key_bytes).digest()
_x_b64 = base64.urlsafe_b64encode(signing_key.public_key_bytes).rstrip(b"=").decode()
key_fingerprint = _jwk_thumbprint_sha256(_x_b64)
random_salt = secrets.token_bytes(32)
nonce = key_fingerprint + random_salt
try:
Expand Down
24 changes: 18 additions & 6 deletions src/cmcp_verify/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@

logger = logging.getLogger(__name__)


def _jwk_thumbprint_sha256(x_b64url: str) -> bytes:
"""RFC 7638 §3 JWK Thumbprint — SHA-256(UTF-8(JSON of sorted required OKP members))."""
canonical = json.dumps(
{"crv": "Ed25519", "kty": "OKP", "x": x_b64url},
separators=(",", ":"),
sort_keys=True,
).encode()
return hashlib.sha256(canonical).digest()


_SW_ONLY_FIRMWARE = "software-only-dev-mode"

_KNOWN_PLATFORMS = {
Expand Down Expand Up @@ -147,12 +158,13 @@ def _verify_key_binding(
"""
CRYPTO-001: verify that cnf.jwk public key fingerprint matches report_data[:32].

The gateway embeds SHA-256(public_key_bytes) as the first 32 bytes of the nonce
it submits to the TEE when requesting the attestation report. The TEE hardware
The gateway embeds the RFC 7638 JWK Thumbprint (SHA-256 of the JSON representation
of required OKP key members, sorted lexicographically) as the first 32 bytes of the
nonce it submits to the TEE when requesting the attestation report. The TEE hardware
commits that nonce into the signed report_data field. The nonce is stored as
trace.runtime.nonce (base64url of the full 64-byte value).

Verifiers re-derive SHA-256(cnf.jwk.x public key bytes) and compare it against
Verifiers re-derive the RFC 7638 JWK Thumbprint from cnf.jwk.x and compare it against
nonce[:32]. A mismatch means the public key was substituted after attestation;
the claim must be rejected with PUBLIC_KEY_NOT_BOUND.

Expand All @@ -176,12 +188,12 @@ def _verify_key_binding(
try:
padding = 4 - (len(x_b64) % 4)
padded = x_b64 + ("=" * padding if padding != 4 else "")
pub_key_bytes = base64.urlsafe_b64decode(padded)
base64.urlsafe_b64decode(padded) # validate encoding; bytes not needed
except Exception as exc:
return False, f"cannot decode trace.cnf.jwk.x: {exc}"

# Compute SHA-256(public_key_bytes) -- the expected fingerprint
expected_fingerprint = hashlib.sha256(pub_key_bytes).digest()
# Compute RFC 7638 JWK Thumbprint -- the expected fingerprint
expected_fingerprint = _jwk_thumbprint_sha256(x_b64)

# Extract the nonce from trace.runtime.nonce (base64url, first 32 bytes = fingerprint)
nonce_b64 = claim.get("trace", {}).get("runtime", {}).get("nonce", "")
Expand Down
22 changes: 16 additions & 6 deletions tests/unit/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@
def _make_nonce_for_key(key: SigningKey) -> str:
"""Build a report_data hex string matching the CRYPTO-001 format.

First 32 bytes: SHA-256(public_key_bytes) -- verifiable key fingerprint.
First 32 bytes: RFC 7638 JWK Thumbprint (SHA-256 of sorted OKP members) -- verifiable key fingerprint.
Next 32 bytes: random salt -- session uniqueness (CRYPTO-002).
"""
fingerprint = hashlib.sha256(key.public_key_bytes).digest()
x_b64 = base64.urlsafe_b64encode(key.public_key_bytes).rstrip(b"=").decode()
jwk_json = json.dumps(
{"crv": "Ed25519", "kty": "OKP", "x": x_b64},
separators=(",", ":"),
sort_keys=True,
).encode()
fingerprint = hashlib.sha256(jwk_json).digest()
salt = secrets.token_bytes(32)
return (fingerprint + salt).hex()

Expand Down Expand Up @@ -233,9 +239,7 @@ def test_tee_key_binding_happy_path():
"""CRYPTO-001 -- valid key with correct fingerprint in nonce passes binding check."""
key = SigningKey()
chain = AuditChain("test-session")
fingerprint = hashlib.sha256(key.public_key_bytes).digest()
salt = secrets.token_bytes(32)
report_data = (fingerprint + salt).hex()
report_data = _make_nonce_for_key(key)

claim = generate_trace_claim(
session_id="test-session",
Expand Down Expand Up @@ -293,7 +297,13 @@ def test_tee_key_binding_attack_path_mismatched_fingerprint():
attacker_key = SigningKey()

chain = AuditChain("test-session")
gateway_fingerprint = hashlib.sha256(gateway_key.public_key_bytes).digest()
_gw_x_b64 = base64.urlsafe_b64encode(gateway_key.public_key_bytes).rstrip(b"=").decode()
_gw_jwk_json = json.dumps(
{"crv": "Ed25519", "kty": "OKP", "x": _gw_x_b64},
separators=(",", ":"),
sort_keys=True,
).encode()
gateway_fingerprint = hashlib.sha256(_gw_jwk_json).digest()
salt = secrets.token_bytes(32)
report_data = (gateway_fingerprint + salt).hex()

Expand Down