diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b3f543..b597280 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `tool_transcript.entries`: privacy-preserving per-call view in the TRACE Claim (one entry per tool call with `tool_name`, `data_class` from the catalog, and the policy `decision`), derived from the audit chain so no raw parameters or response bodies are exposed. `tool_transcript.hash` continues to bind the full transcript to the audit-chain tip. Adds `transcript_entries_hash()` for offline recomputation. (#126) + ## [0.2.0] - 2026-06-12 ### Added diff --git a/audit.db b/audit.db new file mode 100644 index 0000000..dfeedd9 Binary files /dev/null and b/audit.db differ diff --git a/schemas/trace-claim.schema.json b/schemas/trace-claim.schema.json index 442e1dc..6e90e04 100644 --- a/schemas/trace-claim.schema.json +++ b/schemas/trace-claim.schema.json @@ -84,7 +84,24 @@ "pattern": "^sha(256|384):[0-9a-f]+" }, "call_count": { "type": "integer", "minimum": 0 }, - "transcript_uri": { "type": "string" } + "transcript_uri": { "type": "string" }, + "entries": { + "type": "array", + "description": "Privacy-preserving per-call view (#126): tool name, data class, and policy decision, derived from the audit chain. No raw parameters or response bodies.", + "items": { + "type": "object", + "additionalProperties": false, + "required": ["tool_name", "data_class", "decision"], + "properties": { + "tool_name": { "type": "string" }, + "data_class": { "type": "string" }, + "decision": { + "type": "string", + "enum": ["allow", "deny", "redact", "advisory_deny", "fault", "n/a"] + } + } + } + } } }, "cnf": { diff --git a/src/cmcp_runtime/audit/trace_claim.py b/src/cmcp_runtime/audit/trace_claim.py index ccf6b5e..6a65502 100644 --- a/src/cmcp_runtime/audit/trace_claim.py +++ b/src/cmcp_runtime/audit/trace_claim.py @@ -3,13 +3,14 @@ from __future__ import annotations import base64 +import hashlib import importlib.metadata import json from dataclasses import dataclass from datetime import UTC, datetime from typing import Annotated, Any, Literal -from agentrust_trace.models import JWK, ConfirmationKey, PolicyInfo, RuntimeInfo, ToolTranscript +from agentrust_trace.models import JWK, ConfirmationKey, PolicyInfo, RuntimeInfo from pydantic import BaseModel, ConfigDict, Field try: @@ -117,6 +118,35 @@ class CatalogSummary(BaseModel): drift_detected: bool = False +class ToolTranscriptEntry(BaseModel): + """One privacy-preserving entry in the bound tool transcript (issue #126). + + Derived from the audit chain, never from raw tool-call parameters or response + bodies: it carries only the tool name, the data class the call touched, and the + policy decision. No request/response payloads, so the transcript leaks no PII. + """ + + model_config = ConfigDict(extra="forbid") + + tool_name: str + data_class: str + decision: Literal["allow", "deny", "redact", "advisory_deny", "fault", "n/a"] + + +class ToolTranscriptOut(BaseModel): + """cmcp tool_transcript: the canonical TRACE fields plus the privacy-preserving + entries array (issue #126). ``hash`` binds the full transcript to the audit chain + tip; ``entries`` lets a regulator read the per-call decision trail offline. + """ + + model_config = ConfigDict(extra="forbid") + + hash: Annotated[str, Field(pattern=r"^sha(256:[0-9a-f]{64}|384:[0-9a-f]{96})$")] + call_count: Annotated[int, Field(ge=0)] | None = None + transcript_uri: str | None = None + entries: list[ToolTranscriptEntry] | None = None + + class GatewayTrace(BaseModel): """Phase 1 TRACE fields applicable to the cmcp runtime context.""" @@ -128,7 +158,7 @@ class GatewayTrace(BaseModel): runtime: RuntimeInfo policy: PolicyInfo data_class: str - tool_transcript: ToolTranscript | None = None + tool_transcript: ToolTranscriptOut | None = None cnf: ConfirmationKey @@ -248,6 +278,22 @@ def _build_policy(bundle: PolicyBundleInfo) -> PolicyInfo: ) +def canonical_entries(entries: list[ToolTranscriptEntry]) -> bytes: + """Canonical JSON of the transcript entries array, for offline hash verification.""" + body = [e.model_dump() for e in entries] + return json.dumps(body, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode() + + +def transcript_entries_hash(entries: list[ToolTranscriptEntry]) -> str: + """SHA-256 over the canonical entries array, as a ``sha256:`` digest string. + + A verifier recomputes this from the entries and checks it against the digest the + profile carries. Distinct from ``tool_transcript.hash`` (the audit-chain tip): this + one binds the human-readable per-call view, the chain tip binds the full transcript. + """ + return "sha256:" + hashlib.sha256(canonical_entries(entries)).hexdigest() + + def _build_cnf(signing_key: Any) -> ConfirmationKey: pub_hex: str = signing_key.public_key_hex x = base64.urlsafe_b64encode(bytes.fromhex(pub_hex)).rstrip(b"=").decode() @@ -269,6 +315,7 @@ def generate_trace_claim( audit_chain_root: str, audit_chain_tip: str, audit_chain_length: int, + transcript_entries: list[ToolTranscriptEntry] | None = None, attestation_stale: bool = False, catalog_exceptions: list[dict[str, str]] | None = None, call_log_summary: CallLogSummary | None = None, @@ -295,9 +342,10 @@ def generate_trace_claim( runtime=_build_runtime(attestation_report), policy=_build_policy(policy_bundle), data_class=call_summary.session_max_sensitivity, - tool_transcript=ToolTranscript( + tool_transcript=ToolTranscriptOut( hash=tool_transcript_hash, call_count=call_summary.tool_calls_total, + entries=transcript_entries, ), cnf=_build_cnf(signing_key), ) diff --git a/src/cmcp_runtime/session/manager.py b/src/cmcp_runtime/session/manager.py index dec07a8..a9b9313 100644 --- a/src/cmcp_runtime/session/manager.py +++ b/src/cmcp_runtime/session/manager.py @@ -20,6 +20,7 @@ CallSummary, PolicyBundleInfo, ToolCatalogInfo, + ToolTranscriptEntry, generate_trace_claim, ) from cmcp_runtime.session.call_log import CallLog, SessionCallLog @@ -178,6 +179,26 @@ def close_session( {e.tool_name for e in tool_calls if e.tool_name is not None} ) + # Privacy-preserving tool transcript (#126): one entry per tool call carrying + # the tool name, the data class from the catalog, and the policy decision. + # No request/response payloads, so the transcript leaks no PII. The entries + # are derived from the same audit chain whose tip is tool_transcript.hash. + transcript_entries: list[ToolTranscriptEntry] = [] + for e in tool_calls: + if e.tool_name is None: + continue + catalog_entry = catalog.entries.get(e.tool_name) + data_class = ( + catalog_entry.sensitivity_level if catalog_entry is not None else "unknown" + ) + transcript_entries.append( + ToolTranscriptEntry( + tool_name=e.tool_name, + data_class=data_class, + decision=e.policy_decision or "n/a", + ) + ) + # Build call graph summary: prefer SessionCallLog (richer, with adjacency # tracking) and fall back to deriving domains from the audit chain entries. if session_call_log is not None: @@ -232,6 +253,7 @@ def close_session( audit_chain_root=chain.chain_root, audit_chain_tip=chain.chain_tip, audit_chain_length=chain.length, + transcript_entries=transcript_entries, attestation_stale=attestation_stale, catalog_exceptions=catalog_exceptions, call_log_summary=call_log_summary, diff --git a/tests/unit/test_trace_claim.py b/tests/unit/test_trace_claim.py index 7f32fd3..3597af0 100644 --- a/tests/unit/test_trace_claim.py +++ b/tests/unit/test_trace_claim.py @@ -366,3 +366,97 @@ def test_build_runtime_all_known_providers_accepted(): attestation_validity_seconds=86400, ) _build_runtime(report) # must not raise + + +# ── tool_transcript entries (#126) ────────────────────────────────────────────── + + +def _three_entries() -> list: + from cmcp_runtime.audit.trace_claim import ToolTranscriptEntry + + return [ + ToolTranscriptEntry(tool_name="document_reader", data_class="confidential", decision="allow"), + ToolTranscriptEntry(tool_name="credit_score_lookup", data_class="confidential", decision="allow"), + ToolTranscriptEntry(tool_name="risk_report_writer", data_class="internal", decision="advisory_deny"), + ] + + +def _claim_with_entries() -> RuntimeClaim: + chain = AuditChain("sess-126") + return generate_trace_claim( + session_id="sess-126", + signing_key=SigningKey(), + attestation_report=_make_report(), + policy_bundle=PolicyBundleInfo( + hash="sha256:" + "0" * 64, enforcement_mode="enforcing", policy_version="1.0.0" + ), + tool_catalog=ToolCatalogInfo(hash="sha256:" + "1" * 64), + call_summary=_make_call_summary(), + audit_chain_root=chain.chain_root, + audit_chain_tip=chain.chain_tip, + audit_chain_length=chain.length, + transcript_entries=_three_entries(), + do_sign=False, + ) + + +def test_transcript_entries_present_and_ordered(): + claim = _claim_with_entries() + entries = claim.trace.tool_transcript.entries + assert entries is not None + assert [e.tool_name for e in entries] == [ + "document_reader", + "credit_score_lookup", + "risk_report_writer", + ] + assert [e.decision for e in entries] == ["allow", "allow", "advisory_deny"] + + +def test_transcript_hash_is_audit_chain_tip(): + """Acceptance #1: tool_transcript.hash binds to the audit chain tip.""" + chain = AuditChain("sess-126") + claim = generate_trace_claim( + session_id="sess-126", + signing_key=SigningKey(), + attestation_report=_make_report(), + policy_bundle=PolicyBundleInfo( + hash="sha256:" + "0" * 64, enforcement_mode="enforcing", policy_version="1.0.0" + ), + tool_catalog=ToolCatalogInfo(hash="sha256:" + "1" * 64), + call_summary=_make_call_summary(), + audit_chain_root=chain.chain_root, + audit_chain_tip=chain.chain_tip, + audit_chain_length=chain.length, + transcript_entries=_three_entries(), + do_sign=False, + ) + assert claim.trace.tool_transcript.hash == f"sha256:{chain.chain_tip}" + assert claim.gateway.audit_chain.tip == chain.chain_tip + + +def test_transcript_entries_carry_no_payloads(): + """Privacy: serialized entries expose only tool_name, data_class, decision.""" + claim = _claim_with_entries() + dumped = claim.model_dump(exclude_none=True) + for entry in dumped["trace"]["tool_transcript"]["entries"]: + assert set(entry.keys()) == {"tool_name", "data_class", "decision"} + + +def test_transcript_entries_hash_roundtrip(): + """A verifier can recompute the entries digest offline.""" + from cmcp_runtime.audit.trace_claim import transcript_entries_hash + + entries = _three_entries() + h = transcript_entries_hash(entries) + assert h.startswith("sha256:") + assert transcript_entries_hash(entries) == h + # A different decision changes the digest (tamper-evident). + entries[2].decision = "allow" + assert transcript_entries_hash(entries) != h + + +def test_transcript_entries_optional(): + """call_count is still set when no entries are supplied (backward compatible).""" + claim = _make_claim() + assert claim.trace.tool_transcript.entries is None + assert claim.trace.tool_transcript.call_count == 2