diff --git a/LIMITATIONS.md b/LIMITATIONS.md index a897115..708100c 100644 --- a/LIMITATIONS.md +++ b/LIMITATIONS.md @@ -24,6 +24,9 @@ The audit chain records a `response_payload_hash` for each tool call and an `evi Tool servers do not sign their individual responses. A `tls-pinned` entry proves the response came from a server holding the catalog-pinned certificate but does not prevent the server itself from later denying it produced a specific response. For strong non-repudiation, configure non-placeholder TLS fingerprints for all upstream servers so all evidence is `tls-pinned`, and treat the TEE attestation as the binding authority for what the gateway recorded. +**Agent Manifest identity binding (issue #302)** +When configured, cMCP verifies a signed Agent Manifest against a trusted issuer key, checks that the authenticated agent subject equals `manifest.agent_id`, and requires the manifest's policy and catalog hashes to match the loaded runtime hashes. The Trust Record carries this as `gateway.agent_identity`. This proves that the session was bound to the reviewed manifest identity and approved hashes. It does not prove that the agent behaved correctly, that the model output was safe, or that the same logical agent persisted across restarts. The first implementation takes the authenticated subject from configuration; production deployments should source it from the agent SVID/mTLS identity path. Trust in the manifest issuer key remains an out-of-band PKI concern. + **LLM inference and model output** cMCP intercepts tool calls at the MCP protocol boundary. It does not observe or modify LLM inference, the contents of the agent's context window, or model outputs that do not produce a tool call. A model could hallucinate a response, leak sensitive context in a chat reply, or receive a poisoned tool response that influences subsequent reasoning -- none of these are visible to the gateway. cMCP controls the tool boundary, not the model boundary. diff --git a/docs/configuration.md b/docs/configuration.md index 91c65fd..67a93e9 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -35,6 +35,16 @@ attestation: # AMD measurement register hex for SEV-SNP). # expected_measurement: "sha256:..." +agent_manifest: + # Optional signed Agent Manifest binding (#302). When set, the runtime + # verifies the manifest issuer signature, checks that the authenticated + # agent subject matches manifest.agent_id, and requires the manifest's + # policy/catalog hashes to match the loaded runtime hashes before sessions + # can be created. + path: ./agent-manifest.json + trust_anchor_path: ./manifest-public-key.json + authenticated_subject: spiffe://factory.example/agent/material-movement/dev + # Path to the directory containing .cedar policy files and manifest.json. # Must not contain '..' components. Relative paths are resolved from the # working directory at startup. @@ -72,6 +82,16 @@ policy_reload_interval_seconds: 0 | `staleness_policy` | string | `fail_closed` | Action when attestation validity expires. Valid values: `fail_closed` (terminate sessions), `warn_only` (allow sessions, mark claims as stale). | | `expected_measurement` | string | none | Optional. Expected TEE measurement value. If set, the runtime verifies the hardware measurement matches this string at startup and exits with a non-zero status if it does not. | +### agent_manifest + +All fields are optional as a group. If `path` is set, `trust_anchor_path` must also be set. When the block is configured, cMCP fails closed on manifest signature failure, subject mismatch, policy hash drift, or catalog hash drift. + +| Field | Type | Default | Description | +|-------|------|---------|-------------| +| `path` | string | none | Path to the signed Agent Manifest JSON document. Path traversal (`..` components) is rejected. | +| `trust_anchor_path` | string | none | Path to a JSON trust anchor containing the issuer Ed25519 public key, either as `{ "key_id": "...", "public_key_base64url": "..." }` or `{ "keys": [...] }`. | +| `authenticated_subject` | string | none | SPIFFE URI for the authenticated agent subject. This must equal `manifest.agent_id`. In production this should come from the agent SVID/mTLS identity; the config field is the current runtime input for that subject. | + ### top-level fields | Field | Type | Default | Description | @@ -91,6 +111,7 @@ Environment variables control secrets and mode flags that must not appear in con | `CMCP_DEV_MODE=1` | Enables software-only attestation. No hardware TEE required. TRACE Claims will show `partially_verified` status. Required when `provider` is `software-only`. | `attestation.provider` (forces software-only) | | `CMCP_BEARER_TOKEN` | Optional bearer token for runtime HTTP auth. If set, all requests to the runtime must include `Authorization: Bearer `. If unset, no bearer auth is enforced. | none | | `OPAQUE_ATTESTATION_URL` | Enables the Opaque Managed Runtime provider. Must be set to the Opaque attestation service URL. Required when `provider` is `opaque` or `auto` on Opaque infrastructure. | enables `opaque` provider detection | +| `CMCP_POLICY_HASH` | SHA-256 hash of the approved policy bundle. Required in non-dev mode and checked by startup before Agent Manifest binding. The gateway fails closed at startup if this is unset and `CMCP_DEV_MODE` is not `1`. Format: `sha256:`. | none (startup policy integrity check) | | `CMCP_CATALOG_HASH` | SHA-256 hash of the approved `catalog.json`. Required in non-dev mode. The gateway fails closed at startup if this is unset and `CMCP_DEV_MODE` is not `1`. Format: `sha256:`. | none (additional startup check) | ## Enforcement modes @@ -129,6 +150,7 @@ catalog_path: ./catalog.json - Set `attestation.enforcement_mode` to `enforcing`. Advisory mode provides no blocking protection against policy violations. - Set `CMCP_CATALOG_HASH` to the SHA-256 of the approved `catalog.json`. The gateway fails closed at startup if this is unset in non-dev mode, but setting it explicitly pins the approved catalog hash and prevents silent substitution. +- Configure `agent_manifest.path`, `agent_manifest.trust_anchor_path`, and `agent_manifest.authenticated_subject` for agents with signed manifests. The runtime will refuse to start if the signed manifest does not bind the authenticated agent subject to the loaded policy bundle and catalog hashes. - Set `attestation.expected_measurement` to the expected TEE measurement for your deployment. Without this, a different binary could be deployed and would still produce valid attestation reports. - Use a real TEE provider (`tpm`, `sev-snp`, `tdx`, or `opaque`), not `software-only`. Software-only mode does not provide a hardware root of trust and leaves threat classes T1 through T4 open. - Rotate the TEE signing key by performing a full enclave restart on a regular schedule. The signing key is hardware-sealed per enclave instance; rotation requires restart. diff --git a/docs/quickstart.md b/docs/quickstart.md index d8b1dc3..89b9f0c 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -252,7 +252,7 @@ The response is a signed `GatewayClaim`. It looks like: "trace": { "eat_profile": "tag:agentrust.io,2026:trace-v0.1", "iat": 1749081600, - "subject": "spiffe://cmcp.gateway/session/demo-session-001", + "subject": "spiffe://cmcp.gateway/tee/", "runtime": { "platform": "tpm2", "measurement": "sha256:0000000000000000000000000000000000000000000000000000000000000000", diff --git a/docs/spec/error-codes.md b/docs/spec/error-codes.md index fd71100..ff4bddb 100644 --- a/docs/spec/error-codes.md +++ b/docs/spec/error-codes.md @@ -10,6 +10,7 @@ This is the normative registry for all error codes used across the cMCP Runtime. | `ATTESTATION_PROVIDER_UNSUPPORTED` | 500 | FATAL | No supported TEE provider detected and `CMCP_DEV_MODE` is not set | [attestation.md §1.1](attestation.md) | | `POLICY_HASH_MISMATCH` | 500 | FATAL | Measured policy bundle hash does not match deployment manifest | [failure-modes.md FM-4](failure-modes.md) | | `CATALOG_HASH_MISMATCH` | 500 | FATAL | Measured catalog hash does not match deployment manifest | [attestation.md §5](attestation.md) | +| `AGENT_MANIFEST_BINDING_FAILED` | 500 | FATAL | Signed Agent Manifest signature, authenticated subject, policy hash, or catalog hash did not match the runtime session inputs | [session-policy.md](session-policy.md) | | `TOOL_NOT_IN_CATALOG` | 403 | WARN | Agent requested a tool not present in the attested catalog | [cedar-policy.md](cedar-policy.md) | | `POLICY_DENY` | 403 | INFO | Cedar policy evaluation returned deny for this call | [cedar-policy.md](cedar-policy.md) | | `CATALOG_TOOL_NAME_COLLISION` | 500 | FATAL | Two catalog entries register the same `tool_name` | [tool-identity.md](tool-identity.md) | @@ -35,6 +36,7 @@ The following error codes are defined and documented in [verification-library.md | `PUBLIC_KEY_NOT_BOUND` | | `POLICY_HASH_MISMATCH` | | `CATALOG_HASH_MISMATCH` | +| `AGENT_MANIFEST_MISMATCH` | | `ATTESTATION_STALE` | | `CHAIN_BROKEN` | | `CLAIM_MALFORMED` | diff --git a/docs/spec/session-policy.md b/docs/spec/session-policy.md index 2f74a9d..3979b1f 100644 --- a/docs/spec/session-policy.md +++ b/docs/spec/session-policy.md @@ -143,6 +143,44 @@ In practice: session `max_duration_seconds` is set to `min(configured_session_ma This means a long-running agent that handles high-sensitivity data early in its session will face increasingly tight call restrictions as the session progresses - both because `max_sensitivity` is monotonically increasing and because the TRACE token TTL is monotonically decreasing. Deployments should size TRACE token lifetimes to match expected task durations. +## Agent Manifest Identity Binding + +When `agent_manifest` is configured, session creation is bound to a signed Agent Manifest. The runtime loads the manifest and issuer trust anchor, delegates Agent Manifest signature and artifact verification to the `agent-manifest` SDK `verify_manifest()` API, and extracts: + +- `manifest_id` +- `agent_id` +- `artifacts.policy_bundle.hash` +- `artifacts.tool_manifest.catalog_hash` + +The authenticated agent subject for the session MUST be a SPIFFE URI and MUST equal `manifest.agent_id`. In the current HTTP runtime this subject is supplied by `agent_manifest.authenticated_subject`; production deployments SHOULD wire this value from the inbound agent SVID/mTLS identity. The runtime records the provenance of this assertion in `gateway.agent_identity.subject_source`: `config` for configured input, `svid` for transport/mTLS SVID, or `manifest-dev` for the development-mode fallback that uses `manifest.agent_id`. If the subject, manifest signature, manifest expiry, policy hash, or catalog hash does not match, the runtime fails closed before serving the session. + +The Agent Manifest signing pre-image uses the manifest `signed_fields` subset. For HITL workflows, `hitl_record.approvals` is normalized to an empty array before signing so approvals can be attached after the manifest is issued without invalidating the manifest signature. + +This binding answers "who acted" for the session. It does not replace `trace.subject`, which identifies the cMCP gateway's TEE-backed issuing identity. The session identifier remains in `gateway.session_id`. Instead, the TRACE Trust Record carries `gateway.agent_identity` alongside the gateway subject: + +```json +{ + "trace": { + "subject": "spiffe://cmcp.gateway/tee/" + }, + "gateway": { + "session_id": "", + "agent_identity": { + "manifest_id": "", + "agent_id": "spiffe://factory.example/agent/material-movement/dev", + "authenticated_subject": "spiffe://factory.example/agent/material-movement/dev", + "subject_source": "config", + "issuer": "spiffe://factory.example/signing-authority/development", + "issuer_key_id": "", + "policy_bundle_hash": "sha256:", + "tool_catalog_hash": "sha256:" + } + } +} +``` + +Offline verifiers SHOULD cross-check `gateway.agent_identity` against the signed manifest and trusted issuer key. This keeps the runtime boundary check and the evidence artifact self-checking. + ## TRACE Claim Fields from Session State The following fields from session state are included in the TRACE attestation record for the session (written at session close): @@ -151,3 +189,4 @@ The following fields from session state are included in the TRACE attestation re |-------|------|-------------| | `session_max_sensitivity` | string | The highest `max_sensitivity` value reached during the session. | | `session_reset_count` | integer | Number of times `POST /session/reset` was called during the session lifetime. Normally `0`; a non-zero value warrants review. | +| `agent_identity` | object | Optional Agent Manifest binding: manifest ID, bound agent ID, authenticated subject, subject source, issuer key ID, policy hash, and catalog hash. Present only when `agent_manifest` is configured and verified. | diff --git a/docs/spec/verification-library.md b/docs/spec/verification-library.md index 52b4c6a..a35127d 100644 --- a/docs/spec/verification-library.md +++ b/docs/spec/verification-library.md @@ -46,6 +46,10 @@ def verify_trace_claim( claim_json: dict, approved: ApprovedHashes, max_attestation_age_seconds: int = 86400, + *, + trusted_public_key_hex: Optional[str] = None, + agent_manifest: Optional[dict] = None, + trusted_agent_manifest_keys: Optional[dict[str, bytes]] = None, ) -> VerificationResult: """ Verify a TRACE Claim without trusting the operator. @@ -55,15 +59,24 @@ def verify_trace_claim( 2. Verify signature over canonical claim body using tee_public_key 3. Check policy_bundle.hash against approved.policy_bundle_hash 4. Check tool_catalog.hash against approved.tool_catalog_hash - 5. Check attestation freshness (timestamp within max_attestation_age_seconds) - 6. Verify audit chain continuity (audit_chain_root, audit_chain_tip) + 5. If agent_manifest and trusted_agent_manifest_keys are provided, verify + the Agent Manifest issuer signature with agent-manifest SDK + verify_manifest() and cross-check gateway.agent_identity: + manifest_id, agent_id/authenticated_subject, subject_source, policy hash, + catalog hash, and manifest expiry. + 6. Check attestation freshness (timestamp within max_attestation_age_seconds) + 7. Verify audit chain continuity (audit_chain_root, audit_chain_tip) Returns VerificationResult with status and details. """ - ... -``` - -## Per-Provider Verification Steps + ... +``` + +`trusted_agent_manifest_keys` keeps cMCP's runtime-facing shape as raw Ed25519 +public key bytes keyed by issuer `key_id`; the verifier base64url-encodes those +keys when calling the Agent Manifest SDK. + +## Per-Provider Verification Steps ### TPM Verification @@ -116,6 +129,7 @@ VerificationError enum: - PUBLIC_KEY_NOT_BOUND: tee_public_key is not bound to the attestation_report (measurement mismatch or quote verification failed) - POLICY_HASH_MISMATCH: policy_bundle.hash != approved.policy_bundle_hash - CATALOG_HASH_MISMATCH: tool_catalog.hash != approved.tool_catalog_hash +- AGENT_MANIFEST_MISMATCH: gateway.agent_identity does not match the signed Agent Manifest, the manifest signature is invalid, or trusted issuer keys were not supplied for a requested manifest check - ATTESTATION_STALE: attestation_generated_at is older than max_attestation_age_seconds - CHAIN_BROKEN: audit_chain_root -> audit_chain_tip traversal fails (missing entries or hash mismatch) - CLAIM_MALFORMED: claim_json fails JSON Schema validation against the TRACE Claim schema diff --git a/pyproject.toml b/pyproject.toml index a2858e3..90ed6d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,8 @@ classifiers = [ requires-python = ">=3.11" dependencies = [ "agentrust-trace>=0.1", + # Replace with a version constraint after the next SDK release exports verify_manifest. + "agent-manifest @ git+https://github.com/agentrust-io/agent-manifest.git@1297c223d68fdaf95ac9438d9de844597281a3c2#subdirectory=python", "cryptography>=42.0", "pyyaml>=6.0", "httpx>=0.27", @@ -62,6 +64,9 @@ Documentation = "https://github.com/agentrust-io/cmcp/tree/main/docs" [tool.hatch.build.targets.wheel] packages = ["src/cmcp_runtime", "src/cmcp_verify"] +[tool.hatch.metadata] +allow-direct-references = true + [tool.pytest.ini_options] testpaths = ["tests"] asyncio_mode = "auto" diff --git a/schemas/trace-claim.schema.json b/schemas/trace-claim.schema.json index 442e1dc..f32bb4c 100644 --- a/schemas/trace-claim.schema.json +++ b/schemas/trace-claim.schema.json @@ -170,6 +170,50 @@ "drift_detected": { "type": "boolean" } } }, + "agent_identity": { + "type": "object", + "additionalProperties": false, + "required": [ + "manifest_id", + "agent_id", + "authenticated_subject", + "subject_source", + "issuer", + "issuer_key_id", + "policy_bundle_hash", + "tool_catalog_hash" + ], + "description": "Optional Agent Manifest binding carried by issue #302. The runtime emits this only after the signed manifest, authenticated agent subject, policy hash, and catalog hash have been checked at session creation.", + "properties": { + "manifest_id": { "type": "string" }, + "agent_id": { + "type": "string", + "pattern": "^spiffe://" + }, + "authenticated_subject": { + "type": "string", + "pattern": "^spiffe://" + }, + "subject_source": { + "type": "string", + "enum": ["config", "svid", "manifest-dev"], + "description": "Source of the authenticated_subject assertion. config means configured runtime input, svid means transport/mTLS SVID, and manifest-dev means dev-mode fallback from manifest.agent_id." + }, + "issuer": { + "type": "string", + "pattern": "^spiffe://" + }, + "issuer_key_id": { "type": "string" }, + "policy_bundle_hash": { + "type": "string", + "pattern": "^sha(256|384):[0-9a-f]+" + }, + "tool_catalog_hash": { + "type": "string", + "pattern": "^sha(256|384):[0-9a-f]+" + } + } + }, "attestation_generated_at": { "type": "string", "format": "date-time" diff --git a/src/cmcp_runtime/agent_manifest.py b/src/cmcp_runtime/agent_manifest.py new file mode 100644 index 0000000..0d29be7 --- /dev/null +++ b/src/cmcp_runtime/agent_manifest.py @@ -0,0 +1,280 @@ +"""Agent Manifest binding helpers for session identity evidence.""" + +from __future__ import annotations + +import base64 +import hashlib +import json +import re +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Any, Literal, cast + +import agent_manifest as agent_manifest_sdk + +from cmcp_runtime.errors import ConfigError + +SIGNED_FIELDS: tuple[str, ...] = tuple(agent_manifest_sdk.SIGNED_FIELDS) + +_B64URL_RE = re.compile(r"^[A-Za-z0-9\-_]*$") +_HASH_RE = re.compile(r"^(sha256:[0-9a-f]{64}|sha384:[0-9a-f]{96})$") +_SUBJECT_SOURCES: frozenset[str] = frozenset({"config", "svid", "manifest-dev"}) + +SubjectSource = Literal["config", "svid", "manifest-dev"] + + +@dataclass(frozen=True) +class AgentManifestBinding: + """Session-ready Agent Manifest identity binding.""" + + manifest_id: str + agent_id: str + authenticated_subject: str + subject_source: SubjectSource + issuer: str + issuer_key_id: str + policy_bundle_hash: str + tool_catalog_hash: str + + +def _b64url_decode(value: str) -> bytes: + if not _B64URL_RE.match(value): + raise ConfigError("Agent Manifest signature/key must use base64url encoding") + padding = 4 - (len(value) % 4) + return base64.urlsafe_b64decode(value + ("=" * padding if padding != 4 else "")) + + +def _b64url_encode(value: bytes) -> str: + return base64.urlsafe_b64encode(value).rstrip(b"=").decode() + + +def _key_id(public_key: bytes) -> str: + return hashlib.sha256(public_key).hexdigest() + + +def signing_pre_image(manifest: dict[str, Any]) -> bytes: + """Return the Agent Manifest SDK signing pre-image.""" + return agent_manifest_sdk.signing_pre_image(manifest) + + +def load_agent_manifest(path: str) -> dict[str, Any]: + try: + with Path(path).open() as f: + manifest = json.load(f) + except (OSError, json.JSONDecodeError) as exc: + raise ConfigError(f"Cannot read Agent Manifest: {exc}") from exc + if not isinstance(manifest, dict): + raise ConfigError("Agent Manifest must be a JSON object") + return manifest + + +def load_agent_manifest_trust_anchor(path: str) -> dict[str, bytes]: + """Load issuer public keys from a JSON trust anchor file.""" + try: + with Path(path).open() as f: + raw = json.load(f) + except (OSError, json.JSONDecodeError) as exc: + raise ConfigError(f"Cannot read Agent Manifest trust anchor: {exc}") from exc + + anchors: dict[str, bytes] = {} + if isinstance(raw, dict) and "public_key_base64url" in raw: + public_key = _b64url_decode(str(raw["public_key_base64url"])) + key_id = str(raw.get("key_id") or _key_id(public_key)) + anchors[key_id] = public_key + return anchors + if isinstance(raw, dict) and "keys" in raw and isinstance(raw["keys"], list): + for item in raw["keys"]: + if not isinstance(item, dict): + raise ConfigError("Agent Manifest trust anchor keys must be objects") + public_key = _b64url_decode(str(item.get("public_key_base64url", ""))) + key_id = str(item.get("key_id") or _key_id(public_key)) + anchors[key_id] = public_key + return anchors + raise ConfigError( + "Agent Manifest trust anchor must contain public_key_base64url or keys[]" + ) + + +def _trusted_keys_for_sdk(trusted_keys: dict[str, bytes]) -> dict[str, str]: + sdk_keys: dict[str, str] = {} + for key_id, public_key in trusted_keys.items(): + if len(public_key) != 32: + raise ConfigError("Agent Manifest trust anchor contains an invalid Ed25519 key") + if _key_id(public_key) != key_id: + raise ConfigError("Agent Manifest trust anchor key_id does not match public key") + sdk_keys[key_id] = _b64url_encode(public_key) + return sdk_keys + + +def _result_value(value: Any) -> str: + return str(getattr(value, "value", value)) + + +def _raise_for_sdk_result(result: Any, *, require_runtime_artifacts: bool) -> None: + if result.result == agent_manifest_sdk.OverallResult.VALID: + if result.signature_verified is not True: + raise ConfigError("Agent Manifest signature verification failed") + if require_runtime_artifacts: + fields = result.fields_verified + if fields.policy_bundle != agent_manifest_sdk.FieldResult.MATCH: + raise ConfigError( + "Agent Manifest policy bundle hash does not match runtime policy" + ) + if fields.tool_manifest != agent_manifest_sdk.FieldResult.MATCH: + raise ConfigError( + "Agent Manifest tool catalog hash does not match runtime catalog" + ) + return + + mismatch_fields = {str(detail.field) for detail in result.mismatch_details} + if "policy_bundle" in mismatch_fields: + raise ConfigError("Agent Manifest policy bundle hash does not match runtime policy") + if "tool_manifest" in mismatch_fields: + raise ConfigError("Agent Manifest tool catalog hash does not match runtime catalog") + if "signature" in mismatch_fields: + raise ConfigError("Agent Manifest signature verification failed") + if result.result == agent_manifest_sdk.OverallResult.EXPIRED: + raise ConfigError("Agent Manifest has expired") + if result.result == agent_manifest_sdk.OverallResult.SIGNATURE_MISSING: + raise ConfigError("Agent Manifest signature block is missing") + if result.result == agent_manifest_sdk.OverallResult.UNVERIFIABLE: + raise ConfigError("Agent Manifest signature verification failed") + if result.result == agent_manifest_sdk.OverallResult.INCOMPATIBLE_VERSION: + raise ConfigError("Agent Manifest version is not supported by the SDK verifier") + if result.result == agent_manifest_sdk.OverallResult.INCOMPLETE: + raise ConfigError("Agent Manifest SDK verification is incomplete") + raise ConfigError(f"Agent Manifest SDK verification failed: {_result_value(result.result)}") + + +def _verify_with_sdk( + manifest: dict[str, Any], + trusted_keys: dict[str, bytes], + *, + policy_bundle_hash: str | None = None, + tool_catalog_hash: str | None = None, + require_runtime_artifacts: bool = False, +) -> None: + result = agent_manifest_sdk.verify_manifest( + manifest, + agent_manifest_sdk.VerificationContext( + policy_bundle_hash=policy_bundle_hash, + tool_catalog_hash=tool_catalog_hash, + trusted_keys=_trusted_keys_for_sdk(trusted_keys), + ), + agent_manifest_sdk.RevocationStore(), + ) + _raise_for_sdk_result(result, require_runtime_artifacts=require_runtime_artifacts) + + +def verify_agent_manifest_signature( + manifest: dict[str, Any], + trusted_keys: dict[str, bytes], +) -> None: + _verify_with_sdk(manifest, trusted_keys) + + +def _require_hash(value: Any, field: str) -> str: + if not isinstance(value, str) or not _HASH_RE.match(value): + raise ConfigError(f"Agent Manifest {field} must be a sha-prefixed hash") + return value + + +def _parse_manifest_time(value: Any, field: str) -> datetime: + if not isinstance(value, str) or not value: + raise ConfigError(f"Agent Manifest {field} is missing") + try: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError as exc: + raise ConfigError(f"Agent Manifest {field} must be an RFC3339 timestamp") from exc + if parsed.tzinfo is None: + raise ConfigError(f"Agent Manifest {field} must include a timezone") + return parsed.astimezone(UTC) + + +def _manifest_binding_fields(manifest: dict[str, Any]) -> tuple[str, str, str, str, str, str]: + manifest_id = manifest.get("manifest_id") + agent_id = manifest.get("agent_id") + issuer = manifest.get("issuer") + if not isinstance(manifest_id, str) or not manifest_id: + raise ConfigError("Agent Manifest manifest_id is missing") + if not isinstance(agent_id, str) or not agent_id.startswith("spiffe://"): + raise ConfigError("Agent Manifest agent_id must be a SPIFFE URI") + if not isinstance(issuer, str) or not issuer.startswith("spiffe://"): + raise ConfigError("Agent Manifest issuer must be a SPIFFE URI") + artifacts = manifest.get("artifacts") + if not isinstance(artifacts, dict): + raise ConfigError("Agent Manifest artifacts object is missing") + policy = artifacts.get("policy_bundle") + tools = artifacts.get("tool_manifest") + if not isinstance(policy, dict) or not isinstance(tools, dict): + raise ConfigError("Agent Manifest must bind policy_bundle and tool_manifest") + policy_hash = _require_hash(policy.get("hash"), "artifacts.policy_bundle.hash") + catalog_hash = _require_hash( + tools.get("catalog_hash"), "artifacts.tool_manifest.catalog_hash" + ) + signature = manifest.get("signature") + key_id = str(signature.get("key_id") or "") if isinstance(signature, dict) else "" + return manifest_id, agent_id, issuer, key_id, policy_hash, catalog_hash + + +def verify_agent_manifest_binding( + manifest: dict[str, Any], + trusted_keys: dict[str, bytes], + *, + authenticated_subject: str | None, + policy_bundle_hash: str, + tool_catalog_hash: str, + authenticated_subject_source: str | None = None, + allow_dev_subject_from_manifest: bool = False, + now: datetime | None = None, +) -> AgentManifestBinding: + """Verify manifest signature and bind it to runtime session inputs.""" + manifest_id, agent_id, issuer, key_id, manifest_policy, manifest_catalog = ( + _manifest_binding_fields(manifest) + ) + + _parse_manifest_time(manifest.get("issued_at"), "issued_at") + expires_at = _parse_manifest_time(manifest.get("expires_at"), "expires_at") + current_time = now or datetime.now(UTC) + if current_time.tzinfo is None: + current_time = current_time.replace(tzinfo=UTC) + if expires_at <= current_time.astimezone(UTC): + raise ConfigError("Agent Manifest has expired") + _verify_with_sdk( + manifest, + trusted_keys, + policy_bundle_hash=policy_bundle_hash, + tool_catalog_hash=tool_catalog_hash, + require_runtime_artifacts=True, + ) + + subject = authenticated_subject + subject_source = authenticated_subject_source + if subject is None and allow_dev_subject_from_manifest: + subject = agent_id + subject_source = "manifest-dev" + if subject_source is None: + subject_source = "config" + if subject_source not in _SUBJECT_SOURCES: + raise ConfigError("Agent Manifest subject_source is not supported") + if subject_source == "manifest-dev" and not allow_dev_subject_from_manifest: + raise ConfigError("Agent Manifest manifest-dev subject_source requires dev mode") + subject_source = cast(SubjectSource, subject_source) + if not isinstance(subject, str) or not subject.startswith("spiffe://"): + raise ConfigError("Authenticated agent subject must be a SPIFFE URI") + if subject != agent_id: + raise ConfigError( + "Agent Manifest agent_id does not match authenticated session subject" + ) + + return AgentManifestBinding( + manifest_id=manifest_id, + agent_id=agent_id, + authenticated_subject=subject, + subject_source=subject_source, + issuer=issuer, + issuer_key_id=key_id, + policy_bundle_hash=manifest_policy, + tool_catalog_hash=manifest_catalog, + ) diff --git a/src/cmcp_runtime/audit/trace_claim.py b/src/cmcp_runtime/audit/trace_claim.py index ccf6b5e..e59c8b6 100644 --- a/src/cmcp_runtime/audit/trace_claim.py +++ b/src/cmcp_runtime/audit/trace_claim.py @@ -12,6 +12,8 @@ from agentrust_trace.models import JWK, ConfirmationKey, PolicyInfo, RuntimeInfo, ToolTranscript from pydantic import BaseModel, ConfigDict, Field +AgentSubjectSource = Literal["config", "svid", "manifest-dev"] + try: _RUNTIME_VERSION: str = importlib.metadata.version("cmcp-runtime") # was cmcp-gateway except importlib.metadata.PackageNotFoundError: @@ -78,6 +80,18 @@ class AttestationReportInfo: raw_evidence: str | None = None +@dataclass +class AgentIdentityInfo: + manifest_id: str + agent_id: str + authenticated_subject: str + subject_source: AgentSubjectSource + issuer: str + issuer_key_id: str + policy_bundle_hash: str + tool_catalog_hash: str + + # ── Pydantic output models ───────────────────────────────────────────────────── @@ -117,6 +131,19 @@ class CatalogSummary(BaseModel): drift_detected: bool = False +class AgentIdentityOut(BaseModel): + model_config = ConfigDict(extra="forbid") + + manifest_id: str + agent_id: Annotated[str, Field(pattern=r"^spiffe://")] + authenticated_subject: Annotated[str, Field(pattern=r"^spiffe://")] + subject_source: AgentSubjectSource + issuer: Annotated[str, Field(pattern=r"^spiffe://")] + issuer_key_id: str + policy_bundle_hash: str + tool_catalog_hash: str + + class GatewayTrace(BaseModel): """Phase 1 TRACE fields applicable to the cmcp runtime context.""" @@ -159,6 +186,7 @@ class GatewayAddenda(BaseModel): attestation_stale: bool catalog_exceptions: list[dict[str, str]] = Field(default_factory=list) call_log_summary: CallLogSummary | None = None + agent_identity: AgentIdentityOut | None = None class RuntimeClaim(BaseModel): @@ -255,6 +283,10 @@ def _build_cnf(signing_key: Any) -> ConfirmationKey: return ConfirmationKey(jwk=JWK(kty="OKP", crv="Ed25519", x=x, kid=kid)) +def _build_subject(signing_key: Any) -> str: + return f"spiffe://cmcp.gateway/tee/{signing_key.public_key_hex[:16]}" + + # ── Public API ───────────────────────────────────────────────────────────────── @@ -272,6 +304,7 @@ def generate_trace_claim( attestation_stale: bool = False, catalog_exceptions: list[dict[str, str]] | None = None, call_log_summary: CallLogSummary | None = None, + agent_identity: AgentIdentityInfo | None = None, sequence_number: int = 1, prev_claim_hash: str | None = None, do_sign: bool = True, @@ -291,7 +324,7 @@ def generate_trace_claim( trace = GatewayTrace( eat_profile="tag:agentrust.io,2026:trace-v0.1", iat=int(datetime.now(tz=UTC).timestamp()), - subject=f"spiffe://cmcp.gateway/session/{session_id}", + subject=_build_subject(signing_key), runtime=_build_runtime(attestation_report), policy=_build_policy(policy_bundle), data_class=call_summary.session_max_sensitivity, @@ -334,6 +367,20 @@ def generate_trace_claim( attestation_stale=attestation_stale, catalog_exceptions=catalog_exceptions or [], call_log_summary=call_log_summary, + agent_identity=( + AgentIdentityOut( + manifest_id=agent_identity.manifest_id, + agent_id=agent_identity.agent_id, + authenticated_subject=agent_identity.authenticated_subject, + subject_source=agent_identity.subject_source, + issuer=agent_identity.issuer, + issuer_key_id=agent_identity.issuer_key_id, + policy_bundle_hash=agent_identity.policy_bundle_hash, + tool_catalog_hash=agent_identity.tool_catalog_hash, + ) + if agent_identity is not None + else None + ), ) claim = RuntimeClaim(trace=trace, gateway=gateway) diff --git a/src/cmcp_runtime/cli.py b/src/cmcp_runtime/cli.py index 2ddda46..62e6566 100644 --- a/src/cmcp_runtime/cli.py +++ b/src/cmcp_runtime/cli.py @@ -106,6 +106,10 @@ def start(config: str, enforcement: str | None) -> None: help="Out-of-band pinned Ed25519 public key (hex) to cross-check trace.cnf.jwk.") @click.option("--audit-bundle", default=None, type=click.Path(exists=True), help="Also verify an exported audit bundle (GET /audit/export) against the claim.") +@click.option("--agent-manifest", default=None, type=click.Path(exists=True), + help="Signed Agent Manifest to cross-check against the Trust Record.") +@click.option("--agent-manifest-trust-anchor", default=None, type=click.Path(exists=True), + help="JSON issuer public key trust anchor for --agent-manifest.") def verify( claim_file: str, policy_hash: str | None, @@ -113,6 +117,8 @@ def verify( max_age: int, trusted_key: str | None, audit_bundle: str | None, + agent_manifest: str | None, + agent_manifest_trust_anchor: str | None, ) -> None: """Verify a signed TRACE Claim (and optionally its audit bundle). @@ -123,6 +129,10 @@ def verify( """ import json as _json + from cmcp_runtime.agent_manifest import ( + load_agent_manifest, + load_agent_manifest_trust_anchor, + ) from cmcp_verify import ApprovedHashes, verify_audit_bundle, verify_trace_claim with open(claim_file) as f: @@ -139,11 +149,26 @@ def verify( or claim.get("gateway", {}).get("catalog", {}).get("hash", ""), ) + manifest_json = load_agent_manifest(agent_manifest) if agent_manifest is not None else None + manifest_keys = ( + load_agent_manifest_trust_anchor(agent_manifest_trust_anchor) + if agent_manifest_trust_anchor is not None + else None + ) + if agent_manifest is not None and agent_manifest_trust_anchor is None: + click.echo( + "[cmcp verify] agent_manifest FAIL pass --agent-manifest-trust-anchor", + err=True, + ) + raise SystemExit(1) + result = verify_trace_claim( claim, approved, max_attestation_age_seconds=max_age, trusted_public_key_hex=trusted_key, + agent_manifest=manifest_json, + trusted_agent_manifest_keys=manifest_keys, ) def _line(name: str, ok: bool, note: str = "") -> None: diff --git a/src/cmcp_runtime/config.py b/src/cmcp_runtime/config.py index edbc636..d403d61 100644 --- a/src/cmcp_runtime/config.py +++ b/src/cmcp_runtime/config.py @@ -47,9 +47,17 @@ class AttestationConfig: expected_measurement: str | None = None +@dataclass +class AgentManifestConfig: + path: str | None = None + trust_anchor_path: str | None = None + authenticated_subject: str | None = None + + @dataclass class Config: attestation: AttestationConfig = field(default_factory=AttestationConfig) + agent_manifest: AgentManifestConfig = field(default_factory=AgentManifestConfig) policy_bundle_path: str = "policy/" catalog_path: str = "catalog.json" listen_addr: str = "0.0.0.0:8443" @@ -60,8 +68,24 @@ class Config: bearer_token: str | None = None -_KNOWN_TOP_KEYS = {"attestation", "policy_bundle_path", "catalog_path", "listen_addr", "max_response_size_bytes", "policy_reload_interval_seconds", "audit_db_path"} -_KNOWN_ATTEST_KEYS = {"provider", "enforcement_mode", "validity_seconds", "staleness_policy", "expected_measurement"} +_KNOWN_TOP_KEYS = { + "attestation", + "agent_manifest", + "policy_bundle_path", + "catalog_path", + "listen_addr", + "max_response_size_bytes", + "policy_reload_interval_seconds", + "audit_db_path", +} +_KNOWN_ATTEST_KEYS = { + "provider", + "enforcement_mode", + "validity_seconds", + "staleness_policy", + "expected_measurement", +} +_KNOWN_AGENT_MANIFEST_KEYS = {"path", "trust_anchor_path", "authenticated_subject"} def _check_no_traversal(field_name: str, path_str: str) -> None: @@ -108,6 +132,19 @@ def load_config(path: str) -> Config: f"Unknown attestation key '{key}'. Valid keys: {sorted(_KNOWN_ATTEST_KEYS)}" ) + manifest_raw = raw.get("agent_manifest", {}) + if manifest_raw is None: + manifest_raw = {} + if not isinstance(manifest_raw, dict): + raise ConfigError("'agent_manifest' must be a mapping") + + for key in manifest_raw: + if key not in _KNOWN_AGENT_MANIFEST_KEYS: + raise ConfigError( + "Unknown agent_manifest key " + f"'{key}'. Valid keys: {sorted(_KNOWN_AGENT_MANIFEST_KEYS)}" + ) + try: provider = TEEProvider(attest_raw.get("provider", "auto")) except ValueError as err: @@ -152,6 +189,26 @@ def load_config(path: str) -> Config: _check_no_traversal("catalog_path", catalog_path) _check_no_traversal("audit_db_path", audit_db_path) + agent_manifest_path = manifest_raw.get("path") + trust_anchor_path = manifest_raw.get("trust_anchor_path") + authenticated_subject = manifest_raw.get("authenticated_subject") + if agent_manifest_path is not None and not isinstance(agent_manifest_path, str): + raise ConfigError("agent_manifest.path must be a string") + if trust_anchor_path is not None and not isinstance(trust_anchor_path, str): + raise ConfigError("agent_manifest.trust_anchor_path must be a string") + if authenticated_subject is not None and not isinstance(authenticated_subject, str): + raise ConfigError("agent_manifest.authenticated_subject must be a string") + if authenticated_subject is not None and not authenticated_subject.startswith("spiffe://"): + raise ConfigError("agent_manifest.authenticated_subject must be a SPIFFE URI") + if bool(agent_manifest_path) != bool(trust_anchor_path): + raise ConfigError( + "agent_manifest.path and agent_manifest.trust_anchor_path must be set together" + ) + if agent_manifest_path is not None: + _check_no_traversal("agent_manifest.path", agent_manifest_path) + if trust_anchor_path is not None: + _check_no_traversal("agent_manifest.trust_anchor_path", trust_anchor_path) + return Config( attestation=AttestationConfig( provider=provider, @@ -160,6 +217,11 @@ def load_config(path: str) -> Config: staleness_policy=staleness_policy, expected_measurement=expected_measurement, ), + agent_manifest=AgentManifestConfig( + path=agent_manifest_path, + trust_anchor_path=trust_anchor_path, + authenticated_subject=authenticated_subject, + ), policy_bundle_path=policy_bundle_path, catalog_path=catalog_path, listen_addr=raw.get("listen_addr", "0.0.0.0:8443"), diff --git a/src/cmcp_runtime/session/manager.py b/src/cmcp_runtime/session/manager.py index dec07a8..acab6b9 100644 --- a/src/cmcp_runtime/session/manager.py +++ b/src/cmcp_runtime/session/manager.py @@ -12,8 +12,10 @@ from typing import Any from uuid import uuid4 +from cmcp_runtime.agent_manifest import AgentManifestBinding from cmcp_runtime.audit.chain import AuditChain from cmcp_runtime.audit.trace_claim import ( + AgentIdentityInfo, AttestationReportInfo, CallGraphSummary, CallLogSummary, @@ -218,6 +220,22 @@ def close_session( suspicious_sequences_detected=state.suspicious_sequences, ) + agent_identity: AgentIdentityInfo | None = None + binding = getattr(ctx, "agent_manifest", None) + if not isinstance(binding, AgentManifestBinding): + binding = None + if binding is not None: + agent_identity = AgentIdentityInfo( + manifest_id=binding.manifest_id, + agent_id=binding.agent_id, + authenticated_subject=binding.authenticated_subject, + subject_source=binding.subject_source, + issuer=binding.issuer, + issuer_key_id=binding.issuer_key_id, + policy_bundle_hash=binding.policy_bundle_hash, + tool_catalog_hash=binding.tool_catalog_hash, + ) + # AUDIT-005: increment the module-level counter to get a monotonic sequence number. global _CLAIM_SEQUENCE _CLAIM_SEQUENCE += 1 @@ -235,6 +253,7 @@ def close_session( attestation_stale=attestation_stale, catalog_exceptions=catalog_exceptions, call_log_summary=call_log_summary, + agent_identity=agent_identity, sequence_number=_CLAIM_SEQUENCE, prev_claim_hash=self._last_claim_hash, do_sign=True, diff --git a/src/cmcp_runtime/startup.py b/src/cmcp_runtime/startup.py index d4e21c5..7003ae0 100644 --- a/src/cmcp_runtime/startup.py +++ b/src/cmcp_runtime/startup.py @@ -12,6 +12,12 @@ from dataclasses import dataclass from typing import Any +from cmcp_runtime.agent_manifest import ( + AgentManifestBinding, + load_agent_manifest, + load_agent_manifest_trust_anchor, + verify_agent_manifest_binding, +) from cmcp_runtime.audit.keys import SigningKey from cmcp_runtime.audit.store import SqliteAuditStore from cmcp_runtime.catalog.loader import ToolCatalog, load_catalog @@ -56,6 +62,7 @@ class RuntimeContext: audit_store: SqliteAuditStore | None = None spiffe: SpiffeClientResult | None = None nras_appraisal: AppraisalResult | None = None + agent_manifest: AgentManifestBinding | None = None def _jwk_thumbprint_sha256(x_b64url: str) -> bytes: @@ -249,7 +256,35 @@ def run_startup(config_path: str) -> RuntimeContext: catalog.catalog_hash, ) - # Step 5b: SPIFFE/SPIRE SVID fetch (non-fatal - falls back to self-signed TLS) + # Step 5b: optional Agent Manifest binding (#302). When configured, this is + # fail-closed: signature, subject, policy hash, and catalog hash must agree + # before any session can be created. + agent_manifest: AgentManifestBinding | None = None + if config.agent_manifest.path is not None and config.agent_manifest.trust_anchor_path is not None: + try: + manifest = load_agent_manifest(config.agent_manifest.path) + trusted_keys = load_agent_manifest_trust_anchor( + config.agent_manifest.trust_anchor_path + ) + agent_manifest = verify_agent_manifest_binding( + manifest, + trusted_keys, + authenticated_subject=config.agent_manifest.authenticated_subject, + policy_bundle_hash=policy_bundle.bundle_hash, + tool_catalog_hash=catalog.catalog_hash, + allow_dev_subject_from_manifest=config.dev_mode, + ) + except ConfigError as exc: + _fatal("AGENT_MANIFEST_BINDING_FAILED", str(exc), action="startup_aborted") + sys.exit(1) + + logger.info( + "Agent Manifest bound: manifest_id=%s agent_id=%s", + agent_manifest.manifest_id, + agent_manifest.agent_id, + ) + + # Step 5c: SPIFFE/SPIRE SVID fetch (non-fatal - falls back to self-signed TLS) # SVID issuance is conditioned on TEE attestation succeeding (handled by the # SPIRE node attestation plugin on the SPIRE server side). spiffe_result = fetch_svid() @@ -264,11 +299,11 @@ def run_startup(config_path: str) -> RuntimeContext: spiffe_result.failure_reason, ) - # Step 5c: NRAS post-attestation appraisal (non-fatal, Phase 2 / v0.2 -- issue #125). + # Step 5d: NRAS post-attestation appraisal (non-fatal, Phase 2 / v0.2 -- issue #125). # CMCP_NRAS_API_KEY missing -> skip with warning; any NRAS error -> skip with warning. nras_appraisal = try_appraise(attestation_report) - # Step 5d: open durable audit store and warn on orphaned sessions (AUDIT-001). + # Step 5e: open durable audit store and warn on orphaned sessions (AUDIT-001). try: from pathlib import Path as _Path audit_store = SqliteAuditStore(_Path(config.audit_db_path)) @@ -298,4 +333,5 @@ def run_startup(config_path: str) -> RuntimeContext: audit_store=audit_store, spiffe=spiffe_result, nras_appraisal=nras_appraisal, + agent_manifest=agent_manifest, ) diff --git a/src/cmcp_verify/verify.py b/src/cmcp_verify/verify.py index 77dd2ab..6ab8ea8 100644 --- a/src/cmcp_verify/verify.py +++ b/src/cmcp_verify/verify.py @@ -21,7 +21,9 @@ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey from pydantic import ValidationError +from cmcp_runtime.agent_manifest import verify_agent_manifest_binding from cmcp_runtime.audit.trace_claim import RuntimeClaim +from cmcp_runtime.errors import ConfigError logger = logging.getLogger(__name__) @@ -81,6 +83,7 @@ class VerificationError(StrEnum): CHAIN_BROKEN = "CHAIN_BROKEN" CLAIM_MALFORMED = "CLAIM_MALFORMED" HARDWARE_ATTESTATION_FAILED = "HARDWARE_ATTESTATION_FAILED" + AGENT_MANIFEST_MISMATCH = "AGENT_MANIFEST_MISMATCH" @dataclass @@ -350,6 +353,8 @@ def verify_trace_claim( max_attestation_age_seconds: int = 86400, *, trusted_public_key_hex: str | None = None, + agent_manifest: dict[str, Any] | None = None, + trusted_agent_manifest_keys: dict[str, bytes] | None = None, ) -> VerificationResult: """ Verify a TRACE Claim without trusting the operator. @@ -361,9 +366,11 @@ def verify_trace_claim( 2c. Optional out-of-band trusted_public_key_hex cross-check 3. trace.policy.bundle_hash check against approved.policy_bundle_hash 4. gateway.catalog.hash check against approved.tool_catalog_hash - 5. Attestation freshness check - 6. Audit chain consistency check - 7. Platform-specific attestation verification (dispatched per-platform) + 5. Optional Agent Manifest binding check when agent_manifest and trusted + issuer keys are provided. + 6. Attestation freshness check + 7. Audit chain consistency check + 8. Platform-specific attestation verification (dispatched per-platform) Returns VerificationResult with status and details. @@ -464,7 +471,59 @@ def verify_trace_claim( if failure is None: failure = VerificationError.CATALOG_HASH_MISMATCH - # Step 5: Attestation freshness + # Step 5: Optional Agent Manifest binding cross-check (#302). + if agent_manifest is not None: + agent_identity = claim_json.get("gateway", {}).get("agent_identity") + if not isinstance(agent_identity, dict): + unverified.append("agent_manifest.binding") + failure = failure or VerificationError.AGENT_MANIFEST_MISMATCH + details["agent_manifest"] = "claim has no gateway.agent_identity binding" + elif not trusted_agent_manifest_keys: + unverified.append("agent_manifest.binding") + failure = failure or VerificationError.AGENT_MANIFEST_MISMATCH + details["agent_manifest"] = "no trusted Agent Manifest issuer keys provided" + else: + try: + binding = verify_agent_manifest_binding( + agent_manifest, + trusted_agent_manifest_keys, + authenticated_subject=agent_identity.get("authenticated_subject"), + authenticated_subject_source=agent_identity.get("subject_source"), + policy_bundle_hash=claimed_policy, + tool_catalog_hash=claimed_catalog, + allow_dev_subject_from_manifest=( + agent_identity.get("subject_source") == "manifest-dev" + ), + ) + expected_identity = { + "manifest_id": binding.manifest_id, + "agent_id": binding.agent_id, + "authenticated_subject": binding.authenticated_subject, + "subject_source": binding.subject_source, + "issuer": binding.issuer, + "issuer_key_id": binding.issuer_key_id, + "policy_bundle_hash": binding.policy_bundle_hash, + "tool_catalog_hash": binding.tool_catalog_hash, + } + mismatched = [ + key + for key, expected in expected_identity.items() + if agent_identity.get(key) != expected + ] + if mismatched: + unverified.append("agent_manifest.binding") + failure = failure or VerificationError.AGENT_MANIFEST_MISMATCH + details["agent_manifest"] = ( + "gateway.agent_identity mismatch: " + ", ".join(mismatched) + ) + else: + verified.append("agent_manifest.binding") + except ConfigError as exc: + unverified.append("agent_manifest.binding") + failure = failure or VerificationError.AGENT_MANIFEST_MISMATCH + details["agent_manifest"] = str(exc) + + # Step 6: Attestation freshness age, is_fresh = _check_attestation_freshness(claim_json, max_attestation_age_seconds) if is_fresh: verified.append("attestation_freshness") @@ -474,7 +533,7 @@ def verify_trace_claim( failure = VerificationError.ATTESTATION_STALE details["attestation_age_seconds"] = str(age) - # Step 6: Audit chain consistency + # Step 7: Audit chain consistency chain_ok, chain_err = _check_audit_chain(claim_json) if chain_ok: verified.append("audit_chain") @@ -485,7 +544,7 @@ def verify_trace_claim( if chain_err: details["chain_error"] = chain_err - # Step 7: Platform-specific attestation + # Step 8: Platform-specific attestation platform = _runtime.get("platform", "") if _is_sw_only: diff --git a/tests/unit/test_agent_manifest.py b/tests/unit/test_agent_manifest.py new file mode 100644 index 0000000..b04074f --- /dev/null +++ b/tests/unit/test_agent_manifest.py @@ -0,0 +1,253 @@ +"""Tests for Agent Manifest identity binding (#302).""" + +from __future__ import annotations + +import base64 +import hashlib +import json +from datetime import UTC, datetime +from pathlib import Path + +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat + +from cmcp_runtime import agent_manifest as cmcp_agent_manifest +from cmcp_runtime.agent_manifest import ( + SIGNED_FIELDS, + load_agent_manifest_trust_anchor, + signing_pre_image, + verify_agent_manifest_binding, +) +from cmcp_runtime.errors import ConfigError + +POLICY_HASH = "sha256:" + "a" * 64 +CATALOG_HASH = "sha256:" + "b" * 64 +AGENT_ID = "spiffe://factory.example/agent/material-movement/dev" + + +def _b64url(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b"=").decode() + + +def _keypair() -> tuple[Ed25519PrivateKey, bytes, str]: + priv = Ed25519PrivateKey.generate() + pub = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + return priv, pub, hashlib.sha256(pub).hexdigest() + + +def _signed_manifest( + priv: Ed25519PrivateKey, + key_id: str, + *, + agent_id: str = AGENT_ID, + policy_hash: str = POLICY_HASH, + catalog_hash: str = CATALOG_HASH, + expires_at: str = "2099-09-10T00:00:00Z", +) -> dict: + manifest = { + "@context": "https://agentmanifest.agentrust.io/v0.1/context.json", + "@type": "AgentManifest", + "manifest_id": "0197739a-8c00-7000-8000-000000000001", + "agent_id": agent_id, + "version": "0.1", + "issued_at": "2026-06-12T00:00:00Z", + "expires_at": expires_at, + "issuer": "spiffe://factory.example/signing-authority/development", + "crypto_profile": "standard", + "artifacts": { + "policy_bundle": { + "hash": policy_hash, + "policy_language": "cedar", + "version": "0.1.0", + "enforcement_mode": "enforce", + }, + "tool_manifest": { + "catalog_hash": catalog_hash, + "tools": [], + "allow_dynamic_registration": False, + "rug_pull_policy": "deny-and-alert", + }, + }, + "delegation_chain": [], + } + manifest["signature"] = { + "algorithm": "Ed25519", + "key_id": key_id, + "key_type": "software", + "signed_at": "2026-06-12T00:00:00Z", + "signed_fields": list(SIGNED_FIELDS), + "signature_value": _b64url(priv.sign(signing_pre_image(manifest))), + } + return manifest + + +def test_valid_manifest_binds_subject_policy_and_catalog() -> None: + priv, pub, key_id = _keypair() + manifest = _signed_manifest(priv, key_id) + binding = verify_agent_manifest_binding( + manifest, + {key_id: pub}, + authenticated_subject=AGENT_ID, + policy_bundle_hash=POLICY_HASH, + tool_catalog_hash=CATALOG_HASH, + ) + assert binding.manifest_id == manifest["manifest_id"] + assert binding.agent_id == AGENT_ID + assert binding.subject_source == "config" + assert binding.issuer_key_id == key_id + + +def test_binding_verification_delegates_to_sdk_with_encoded_keys(monkeypatch) -> None: + priv, pub, key_id = _keypair() + manifest = _signed_manifest(priv, key_id) + captured = {} + + def fake_verify_manifest(manifest_arg, context, revocation_store): + captured["manifest"] = manifest_arg + captured["trusted_keys"] = context.trusted_keys + captured["policy_bundle_hash"] = context.policy_bundle_hash + captured["tool_catalog_hash"] = context.tool_catalog_hash + assert isinstance(revocation_store, cmcp_agent_manifest.agent_manifest_sdk.RevocationStore) + return cmcp_agent_manifest.agent_manifest_sdk.VerificationResult( + manifest_id=manifest_arg["manifest_id"], + result=cmcp_agent_manifest.agent_manifest_sdk.OverallResult.VALID, + signature_verified=True, + fields_verified=cmcp_agent_manifest.agent_manifest_sdk.FieldsVerified( + policy_bundle=cmcp_agent_manifest.agent_manifest_sdk.FieldResult.MATCH, + tool_manifest=cmcp_agent_manifest.agent_manifest_sdk.FieldResult.MATCH, + ), + ) + + monkeypatch.setattr( + cmcp_agent_manifest.agent_manifest_sdk, + "verify_manifest", + fake_verify_manifest, + ) + + binding = verify_agent_manifest_binding( + manifest, + {key_id: pub}, + authenticated_subject=AGENT_ID, + policy_bundle_hash=POLICY_HASH, + tool_catalog_hash=CATALOG_HASH, + ) + + assert binding.manifest_id == manifest["manifest_id"] + assert captured == { + "manifest": manifest, + "trusted_keys": {key_id: _b64url(pub)}, + "policy_bundle_hash": POLICY_HASH, + "tool_catalog_hash": CATALOG_HASH, + } + + +def test_dev_subject_fallback_is_marked_as_manifest_dev() -> None: + priv, pub, key_id = _keypair() + manifest = _signed_manifest(priv, key_id) + binding = verify_agent_manifest_binding( + manifest, + {key_id: pub}, + authenticated_subject=None, + policy_bundle_hash=POLICY_HASH, + tool_catalog_hash=CATALOG_HASH, + allow_dev_subject_from_manifest=True, + ) + assert binding.authenticated_subject == AGENT_ID + assert binding.subject_source == "manifest-dev" + + +def test_subject_mismatch_fails_closed() -> None: + priv, pub, key_id = _keypair() + manifest = _signed_manifest(priv, key_id) + with pytest.raises(ConfigError, match="authenticated session subject"): + verify_agent_manifest_binding( + manifest, + {key_id: pub}, + authenticated_subject="spiffe://factory.example/agent/other/dev", + policy_bundle_hash=POLICY_HASH, + tool_catalog_hash=CATALOG_HASH, + ) + + +def test_tampered_manifest_signature_fails_closed() -> None: + priv, pub, key_id = _keypair() + manifest = _signed_manifest(priv, key_id) + manifest["agent_id"] = "spiffe://factory.example/agent/other/dev" + with pytest.raises(ConfigError, match="signature verification failed"): + verify_agent_manifest_binding( + manifest, + {key_id: pub}, + authenticated_subject=AGENT_ID, + policy_bundle_hash=POLICY_HASH, + tool_catalog_hash=CATALOG_HASH, + ) + + +def test_policy_hash_drift_fails_closed() -> None: + priv, pub, key_id = _keypair() + manifest = _signed_manifest(priv, key_id) + with pytest.raises(ConfigError, match="policy bundle hash"): + verify_agent_manifest_binding( + manifest, + {key_id: pub}, + authenticated_subject=AGENT_ID, + policy_bundle_hash="sha256:" + "0" * 64, + tool_catalog_hash=CATALOG_HASH, + ) + + +def test_catalog_hash_drift_fails_closed() -> None: + priv, pub, key_id = _keypair() + manifest = _signed_manifest(priv, key_id) + with pytest.raises(ConfigError, match="tool catalog hash"): + verify_agent_manifest_binding( + manifest, + {key_id: pub}, + authenticated_subject=AGENT_ID, + policy_bundle_hash=POLICY_HASH, + tool_catalog_hash="sha256:" + "0" * 64, + ) + + +def test_expired_manifest_fails_closed() -> None: + priv, pub, key_id = _keypair() + manifest = _signed_manifest(priv, key_id, expires_at="2026-06-16T00:00:00Z") + with pytest.raises(ConfigError, match="expired"): + verify_agent_manifest_binding( + manifest, + {key_id: pub}, + authenticated_subject=AGENT_ID, + policy_bundle_hash=POLICY_HASH, + tool_catalog_hash=CATALOG_HASH, + now=datetime(2026, 6, 17, tzinfo=UTC), + ) + + +def test_trust_anchor_loader_accepts_single_public_key(tmp_path: Path) -> None: + _, pub, key_id = _keypair() + path = tmp_path / "manifest-public-key.json" + path.write_text( + json.dumps({ + "algorithm": "Ed25519", + "key_id": key_id, + "public_key_base64url": _b64url(pub), + }) + ) + assert load_agent_manifest_trust_anchor(str(path)) == {key_id: pub} + + +def test_signing_pre_image_delegates_to_agent_manifest_sdk(monkeypatch) -> None: + manifest = {"manifest_id": "0197739a-8c00-7000-8000-000000000001"} + + def fake_signing_pre_image(manifest_arg): + assert manifest_arg is manifest + return b"sdk-pre-image" + + monkeypatch.setattr( + cmcp_agent_manifest.agent_manifest_sdk, + "signing_pre_image", + fake_signing_pre_image, + ) + + assert cmcp_agent_manifest.signing_pre_image(manifest) == b"sdk-pre-image" diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index be804af..5796998 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -50,6 +50,22 @@ def test_load_full_config(config_file): assert cfg.max_response_size_bytes == 1048576 +def test_load_agent_manifest_config(config_file): + path = config_file(""" + agent_manifest: + path: /etc/cmcp/agent-manifest.json + trust_anchor_path: /etc/cmcp/manifest-public-key.json + authenticated_subject: spiffe://factory.example/agent/material-movement/dev + """) + cfg = load_config(path) + assert cfg.agent_manifest.path == "/etc/cmcp/agent-manifest.json" + assert cfg.agent_manifest.trust_anchor_path == "/etc/cmcp/manifest-public-key.json" + assert ( + cfg.agent_manifest.authenticated_subject + == "spiffe://factory.example/agent/material-movement/dev" + ) + + def test_invalid_provider(config_file): path = config_file("attestation:\n provider: quantum\n") with pytest.raises(ConfigError, match="provider"): @@ -75,6 +91,29 @@ def test_unknown_key_raises(config_file): load_config(path) +def test_unknown_agent_manifest_key_raises(config_file): + path = config_file("agent_manifest:\n surprise: value\n") + with pytest.raises(ConfigError, match="surprise"): + load_config(path) + + +def test_agent_manifest_path_requires_trust_anchor(config_file): + path = config_file("agent_manifest:\n path: /etc/cmcp/agent-manifest.json\n") + with pytest.raises(ConfigError, match="set together"): + load_config(path) + + +def test_agent_manifest_subject_must_be_spiffe(config_file): + path = config_file(""" + agent_manifest: + path: /etc/cmcp/agent-manifest.json + trust_anchor_path: /etc/cmcp/manifest-public-key.json + authenticated_subject: not-a-spiffe-uri + """) + with pytest.raises(ConfigError, match="SPIFFE"): + load_config(path) + + def test_empty_config_uses_defaults(config_file): path = config_file("") cfg = load_config(path) diff --git a/tests/unit/test_session_manager.py b/tests/unit/test_session_manager.py index c5638fd..9c008ba 100644 --- a/tests/unit/test_session_manager.py +++ b/tests/unit/test_session_manager.py @@ -10,6 +10,7 @@ import pytest +from cmcp_runtime.agent_manifest import AgentManifestBinding from cmcp_runtime.audit.chain import AuditChain from cmcp_runtime.audit.keys import SigningKey from cmcp_runtime.session.manager import SessionManager @@ -63,6 +64,7 @@ def _make_ctx(*, stale_attestation: bool = False) -> MagicMock: tee_provider = MagicMock() tee_provider.get_attestation_report.return_value = MagicMock() ctx.tee_provider = tee_provider + ctx.agent_manifest = None return ctx @@ -95,6 +97,23 @@ def test_create_session_chain_has_session_start() -> None: assert chain.entries[0].entry_type == "session_start" +def test_create_session_carries_agent_manifest_binding() -> None: + ctx = _make_ctx() + ctx.agent_manifest = AgentManifestBinding( + manifest_id="0197739a-8c00-7000-8000-000000000001", + agent_id="spiffe://factory.example/agent/material-movement/dev", + authenticated_subject="spiffe://factory.example/agent/material-movement/dev", + subject_source="config", + issuer="spiffe://factory.example/signing-authority/development", + issuer_key_id="a" * 64, + policy_bundle_hash="sha256:" + "a" * 64, + tool_catalog_hash="sha256:" + "b" * 64, + ) + mgr = SessionManager(ctx) + state, _ = mgr.create_session() + assert state.session_id + + # ── close_session ───────────────────────────────────────────────────────────── @@ -138,6 +157,26 @@ def test_close_session_claim_session_id_matches() -> None: assert claim["gateway"]["session_id"] == state.session_id +def test_close_session_claim_includes_agent_identity_binding() -> None: + ctx = _make_ctx() + ctx.agent_manifest = AgentManifestBinding( + manifest_id="0197739a-8c00-7000-8000-000000000001", + agent_id="spiffe://factory.example/agent/material-movement/dev", + authenticated_subject="spiffe://factory.example/agent/material-movement/dev", + subject_source="config", + issuer="spiffe://factory.example/signing-authority/development", + issuer_key_id="a" * 64, + policy_bundle_hash="sha256:" + "a" * 64, + tool_catalog_hash="sha256:" + "b" * 64, + ) + mgr = SessionManager(ctx) + state, chain = mgr.create_session() + claim = mgr.close_session(state.session_id, state, chain) + assert claim["gateway"]["agent_identity"]["manifest_id"] == ctx.agent_manifest.manifest_id + assert claim["gateway"]["agent_identity"]["agent_id"] == ctx.agent_manifest.agent_id + assert claim["gateway"]["agent_identity"]["subject_source"] == "config" + + def test_close_session_attestation_stale_flag_false_when_fresh() -> None: mgr = SessionManager(_make_ctx(stale_attestation=False)) state, chain = mgr.create_session() diff --git a/tests/unit/test_startup.py b/tests/unit/test_startup.py index fec03b7..f05469c 100644 --- a/tests/unit/test_startup.py +++ b/tests/unit/test_startup.py @@ -2,13 +2,20 @@ from __future__ import annotations +import base64 +import hashlib import json import os from pathlib import Path from unittest.mock import MagicMock, patch import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat +from cmcp_runtime.agent_manifest import SIGNED_FIELDS, signing_pre_image +from cmcp_runtime.catalog.loader import load_catalog +from cmcp_runtime.policy.bundle import load_policy_bundle from cmcp_runtime.startup import RuntimeContext, run_startup MANIFEST = { @@ -38,6 +45,59 @@ "approved_by": "test", } +AGENT_ID = "spiffe://factory.example/agent/material-movement/dev" + + +def _b64url(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b"=").decode() + + +def _write_agent_manifest_files( + tmp_path: Path, + *, + policy_hash: str, + catalog_hash: str, +) -> tuple[Path, Path]: + priv = Ed25519PrivateKey.generate() + pub = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + key_id = hashlib.sha256(pub).hexdigest() + manifest = { + "@context": "https://agentmanifest.agentrust.io/v0.1/context.json", + "@type": "AgentManifest", + "manifest_id": "0197739a-8c00-7000-8000-000000000001", + "agent_id": AGENT_ID, + "version": "0.1", + "issued_at": "2026-06-12T00:00:00Z", + "expires_at": "2099-09-10T00:00:00Z", + "issuer": "spiffe://factory.example/signing-authority/development", + "crypto_profile": "standard", + "artifacts": { + "policy_bundle": {"hash": policy_hash, "policy_language": "cedar"}, + "tool_manifest": {"catalog_hash": catalog_hash, "tools": []}, + }, + "delegation_chain": [], + } + manifest["signature"] = { + "algorithm": "Ed25519", + "key_id": key_id, + "key_type": "software", + "signed_at": "2026-06-12T00:00:00Z", + "signed_fields": list(SIGNED_FIELDS), + "signature_value": _b64url(priv.sign(signing_pre_image(manifest))), + } + + manifest_path = tmp_path / "agent-manifest.json" + key_path = tmp_path / "manifest-public-key.json" + manifest_path.write_text(json.dumps(manifest)) + key_path.write_text( + json.dumps({ + "algorithm": "Ed25519", + "key_id": key_id, + "public_key_base64url": _b64url(pub), + }) + ) + return manifest_path, key_path + @pytest.fixture def complete_setup(tmp_path: Path, monkeypatch): @@ -81,6 +141,52 @@ def test_startup_returns_gateway_context_with_all_fields(complete_setup): assert ctx.attestation_report.provider == "software-only" +def test_startup_binds_configured_agent_manifest(complete_setup): + config_path = Path(complete_setup) + tmp_path = config_path.parent + policy_hash = load_policy_bundle(str(tmp_path / "policy")).bundle_hash + catalog_hash = load_catalog(str(tmp_path / "catalog.json")).catalog_hash + manifest_path, key_path = _write_agent_manifest_files( + tmp_path, + policy_hash=policy_hash, + catalog_hash=catalog_hash, + ) + config_path.write_text( + config_path.read_text() + + "\nagent_manifest:\n" + + f" path: {manifest_path}\n" + + f" trust_anchor_path: {key_path}\n" + + f" authenticated_subject: {AGENT_ID}\n" + ) + + ctx = run_startup(str(config_path)) + assert ctx.agent_manifest is not None + assert ctx.agent_manifest.agent_id == AGENT_ID + + +def test_startup_fails_on_agent_manifest_subject_mismatch(complete_setup): + config_path = Path(complete_setup) + tmp_path = config_path.parent + policy_hash = load_policy_bundle(str(tmp_path / "policy")).bundle_hash + catalog_hash = load_catalog(str(tmp_path / "catalog.json")).catalog_hash + manifest_path, key_path = _write_agent_manifest_files( + tmp_path, + policy_hash=policy_hash, + catalog_hash=catalog_hash, + ) + config_path.write_text( + config_path.read_text() + + "\nagent_manifest:\n" + + f" path: {manifest_path}\n" + + f" trust_anchor_path: {key_path}\n" + + " authenticated_subject: spiffe://factory.example/agent/other/dev\n" + ) + + with pytest.raises(SystemExit) as exc_info: + run_startup(str(config_path)) + assert exc_info.value.code == 1 + + def test_startup_fails_on_missing_config(tmp_path): """Conformance: startup exits on invalid config.""" with pytest.raises(SystemExit) as exc_info: diff --git a/tests/unit/test_trace_claim.py b/tests/unit/test_trace_claim.py index 7f32fd3..2222bad 100644 --- a/tests/unit/test_trace_claim.py +++ b/tests/unit/test_trace_claim.py @@ -8,6 +8,7 @@ from cmcp_runtime.audit.chain import AuditChain from cmcp_runtime.audit.keys import SigningKey from cmcp_runtime.audit.trace_claim import ( + AgentIdentityInfo, AttestationReportInfo, CallGraphSummary, CallSummary, @@ -180,6 +181,12 @@ def test_generate_claim_session_id(): assert claim.gateway.session_id == "sess-001" +def test_generate_claim_subject_is_gateway_identity(): + claim = _make_claim() + assert claim.trace.subject.startswith("spiffe://cmcp.gateway/tee/") + assert claim.gateway.session_id == "sess-001" + + def test_generate_claim_cnf_jwk(): key = SigningKey() claim = _make_claim(signing_key=key) @@ -280,6 +287,43 @@ def test_generate_claim_software_only_platform(): assert claim.trace.runtime.firmware_version == "software-only-dev-mode" +def test_generate_claim_agent_identity_binding(): + key = SigningKey() + chain = AuditChain("sess-001") + claim = generate_trace_claim( + session_id="sess-001", + signing_key=key, + attestation_report=_make_report(), + policy_bundle=PolicyBundleInfo( + hash="sha256:" + "0" * 64, + enforcement_mode="enforcing", + policy_version="1.0.0", + ), + tool_catalog=ToolCatalogInfo(hash="sha256:" + "1" * 64), + call_summary=_make_call_summary(), + audit_chain_root=chain.chain_root, + audit_chain_tip=chain.chain_tip, + audit_chain_length=chain.length, + agent_identity=AgentIdentityInfo( + manifest_id="0197739a-8c00-7000-8000-000000000001", + agent_id="spiffe://factory.example/agent/material-movement/dev", + authenticated_subject="spiffe://factory.example/agent/material-movement/dev", + subject_source="config", + issuer="spiffe://factory.example/signing-authority/development", + issuer_key_id="a" * 64, + policy_bundle_hash="sha256:" + "0" * 64, + tool_catalog_hash="sha256:" + "1" * 64, + ), + do_sign=False, + ) + assert claim.gateway.agent_identity is not None + assert ( + claim.gateway.agent_identity.agent_id + == "spiffe://factory.example/agent/material-movement/dev" + ) + assert claim.gateway.agent_identity.subject_source == "config" + + # ── RuntimeClaim Pydantic validation ───────────────────────────────────────── diff --git a/tests/unit/test_verify.py b/tests/unit/test_verify.py index c8e088b..c322192 100644 --- a/tests/unit/test_verify.py +++ b/tests/unit/test_verify.py @@ -8,9 +8,14 @@ import secrets from datetime import UTC, datetime, timedelta +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat + +from cmcp_runtime.agent_manifest import SIGNED_FIELDS, signing_pre_image from cmcp_runtime.audit.chain import AuditChain from cmcp_runtime.audit.keys import SigningKey from cmcp_runtime.audit.trace_claim import ( + AgentIdentityInfo, AttestationReportInfo, CallGraphSummary, CallSummary, @@ -28,6 +33,8 @@ POLICY_HASH = "sha256:" + "a" * 64 CATALOG_HASH = "sha256:" + "b" * 64 +AGENT_ID = "spiffe://factory.example/agent/material-movement/dev" +MANIFEST_ID = "0197739a-8c00-7000-8000-000000000001" def _make_nonce_for_key(key: SigningKey) -> str: @@ -47,7 +54,12 @@ def _make_nonce_for_key(key: SigningKey) -> str: return (fingerprint + salt).hex() -def _make_signed_claim(policy_hash=POLICY_HASH, catalog_hash=CATALOG_HASH, provider="software-only"): +def _make_signed_claim( + policy_hash=POLICY_HASH, + catalog_hash=CATALOG_HASH, + provider="software-only", + agent_identity: AgentIdentityInfo | None = None, +): key = SigningKey() chain = AuditChain("test-session") measurement = "DEVELOPMENT_ONLY" if provider == "software-only" else "ab" * 32 @@ -85,11 +97,63 @@ def _make_signed_claim(policy_hash=POLICY_HASH, catalog_hash=CATALOG_HASH, provi audit_chain_root=chain.chain_root, audit_chain_tip=chain.chain_tip, audit_chain_length=chain.length, + agent_identity=agent_identity, do_sign=True, ) return _to_dict(claim), key +def _manifest_keypair() -> tuple[Ed25519PrivateKey, bytes, str]: + priv = Ed25519PrivateKey.generate() + pub = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + return priv, pub, hashlib.sha256(pub).hexdigest() + + +def _b64url(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b"=").decode() + + +def _signed_manifest(priv: Ed25519PrivateKey, key_id: str) -> dict: + manifest = { + "@context": "https://agentmanifest.agentrust.io/v0.1/context.json", + "@type": "AgentManifest", + "manifest_id": MANIFEST_ID, + "agent_id": AGENT_ID, + "version": "0.1", + "issued_at": "2026-06-12T00:00:00Z", + "expires_at": "2099-09-10T00:00:00Z", + "issuer": "spiffe://factory.example/signing-authority/development", + "crypto_profile": "standard", + "artifacts": { + "policy_bundle": {"hash": POLICY_HASH, "policy_language": "cedar"}, + "tool_manifest": {"catalog_hash": CATALOG_HASH, "tools": []}, + }, + "delegation_chain": [], + } + manifest["signature"] = { + "algorithm": "Ed25519", + "key_id": key_id, + "key_type": "software", + "signed_at": "2026-06-12T00:00:00Z", + "signed_fields": list(SIGNED_FIELDS), + "signature_value": _b64url(priv.sign(signing_pre_image(manifest))), + } + return manifest + + +def _agent_identity(*, agent_id: str = AGENT_ID) -> AgentIdentityInfo: + return AgentIdentityInfo( + manifest_id=MANIFEST_ID, + agent_id=agent_id, + authenticated_subject=AGENT_ID, + subject_source="config", + issuer="spiffe://factory.example/signing-authority/development", + issuer_key_id="", + policy_bundle_hash=POLICY_HASH, + tool_catalog_hash=CATALOG_HASH, + ) + + def _approved(): return ApprovedHashes(policy_bundle_hash=POLICY_HASH, tool_catalog_hash=CATALOG_HASH) @@ -159,6 +223,37 @@ def test_mismatched_catalog_hash_fails(): assert "tool_catalog.hash" in result.unverified_fields +def test_agent_manifest_binding_is_verified(): + priv, pub, key_id = _manifest_keypair() + manifest = _signed_manifest(priv, key_id) + identity = _agent_identity() + identity.issuer_key_id = key_id + claim_dict, _ = _make_signed_claim(agent_identity=identity) + result = verify_trace_claim( + claim_dict, + _approved(), + agent_manifest=manifest, + trusted_agent_manifest_keys={key_id: pub}, + ) + assert "agent_manifest.binding" in result.verified_fields + + +def test_agent_manifest_binding_mismatch_fails(): + priv, pub, key_id = _manifest_keypair() + manifest = _signed_manifest(priv, key_id) + identity = _agent_identity(agent_id="spiffe://factory.example/agent/other/dev") + identity.issuer_key_id = key_id + claim_dict, _ = _make_signed_claim(agent_identity=identity) + result = verify_trace_claim( + claim_dict, + _approved(), + agent_manifest=manifest, + trusted_agent_manifest_keys={key_id: pub}, + ) + assert "agent_manifest.binding" in result.unverified_fields + assert result.failure_reason == VerificationError.AGENT_MANIFEST_MISMATCH + + # -- Attestation freshness ----------------------------------------------------