Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `tool_transcript.entries`: privacy-preserving per-call view in the TRACE Claim (one entry per tool call with `tool_name`, `data_class` from the catalog, and the policy `decision`), derived from the audit chain so no raw parameters or response bodies are exposed. `tool_transcript.hash` continues to bind the full transcript to the audit-chain tip. Adds `transcript_entries_hash()` for offline recomputation. (#126)

## [0.2.0] - 2026-06-12

### Added
Expand Down
Binary file added audit.db
Binary file not shown.
19 changes: 18 additions & 1 deletion schemas/trace-claim.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,24 @@
"pattern": "^sha(256|384):[0-9a-f]+"
},
"call_count": { "type": "integer", "minimum": 0 },
"transcript_uri": { "type": "string" }
"transcript_uri": { "type": "string" },
"entries": {
"type": "array",
"description": "Privacy-preserving per-call view (#126): tool name, data class, and policy decision, derived from the audit chain. No raw parameters or response bodies.",
"items": {
"type": "object",
"additionalProperties": false,
"required": ["tool_name", "data_class", "decision"],
"properties": {
"tool_name": { "type": "string" },
"data_class": { "type": "string" },
"decision": {
"type": "string",
"enum": ["allow", "deny", "redact", "advisory_deny", "fault", "n/a"]
}
}
}
}
}
},
"cnf": {
Expand Down
54 changes: 51 additions & 3 deletions src/cmcp_runtime/audit/trace_claim.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
from __future__ import annotations

import base64
import hashlib
import importlib.metadata
import json
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Annotated, Any, Literal

from agentrust_trace.models import JWK, ConfirmationKey, PolicyInfo, RuntimeInfo, ToolTranscript
from agentrust_trace.models import JWK, ConfirmationKey, PolicyInfo, RuntimeInfo
from pydantic import BaseModel, ConfigDict, Field

try:
Expand Down Expand Up @@ -117,6 +118,35 @@ class CatalogSummary(BaseModel):
drift_detected: bool = False


class ToolTranscriptEntry(BaseModel):
"""One privacy-preserving entry in the bound tool transcript (issue #126).

Derived from the audit chain, never from raw tool-call parameters or response
bodies: it carries only the tool name, the data class the call touched, and the
policy decision. No request/response payloads, so the transcript leaks no PII.
"""

model_config = ConfigDict(extra="forbid")

tool_name: str
data_class: str
decision: Literal["allow", "deny", "redact", "advisory_deny", "fault", "n/a"]


class ToolTranscriptOut(BaseModel):
"""cmcp tool_transcript: the canonical TRACE fields plus the privacy-preserving
entries array (issue #126). ``hash`` binds the full transcript to the audit chain
tip; ``entries`` lets a regulator read the per-call decision trail offline.
"""

model_config = ConfigDict(extra="forbid")

hash: Annotated[str, Field(pattern=r"^sha(256:[0-9a-f]{64}|384:[0-9a-f]{96})$")]
call_count: Annotated[int, Field(ge=0)] | None = None
transcript_uri: str | None = None
entries: list[ToolTranscriptEntry] | None = None


class GatewayTrace(BaseModel):
"""Phase 1 TRACE fields applicable to the cmcp runtime context."""

Expand All @@ -128,7 +158,7 @@ class GatewayTrace(BaseModel):
runtime: RuntimeInfo
policy: PolicyInfo
data_class: str
tool_transcript: ToolTranscript | None = None
tool_transcript: ToolTranscriptOut | None = None
cnf: ConfirmationKey


Expand Down Expand Up @@ -248,6 +278,22 @@ def _build_policy(bundle: PolicyBundleInfo) -> PolicyInfo:
)


def canonical_entries(entries: list[ToolTranscriptEntry]) -> bytes:
"""Canonical JSON of the transcript entries array, for offline hash verification."""
body = [e.model_dump() for e in entries]
return json.dumps(body, sort_keys=True, separators=(",", ":"), ensure_ascii=True).encode()


def transcript_entries_hash(entries: list[ToolTranscriptEntry]) -> str:
"""SHA-256 over the canonical entries array, as a ``sha256:`` digest string.

A verifier recomputes this from the entries and checks it against the digest the
profile carries. Distinct from ``tool_transcript.hash`` (the audit-chain tip): this
one binds the human-readable per-call view, the chain tip binds the full transcript.
"""
return "sha256:" + hashlib.sha256(canonical_entries(entries)).hexdigest()


def _build_cnf(signing_key: Any) -> ConfirmationKey:
pub_hex: str = signing_key.public_key_hex
x = base64.urlsafe_b64encode(bytes.fromhex(pub_hex)).rstrip(b"=").decode()
Expand All @@ -269,6 +315,7 @@ def generate_trace_claim(
audit_chain_root: str,
audit_chain_tip: str,
audit_chain_length: int,
transcript_entries: list[ToolTranscriptEntry] | None = None,
attestation_stale: bool = False,
catalog_exceptions: list[dict[str, str]] | None = None,
call_log_summary: CallLogSummary | None = None,
Expand All @@ -295,9 +342,10 @@ def generate_trace_claim(
runtime=_build_runtime(attestation_report),
policy=_build_policy(policy_bundle),
data_class=call_summary.session_max_sensitivity,
tool_transcript=ToolTranscript(
tool_transcript=ToolTranscriptOut(
hash=tool_transcript_hash,
call_count=call_summary.tool_calls_total,
entries=transcript_entries,
),
cnf=_build_cnf(signing_key),
)
Expand Down
22 changes: 22 additions & 0 deletions src/cmcp_runtime/session/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CallSummary,
PolicyBundleInfo,
ToolCatalogInfo,
ToolTranscriptEntry,
generate_trace_claim,
)
from cmcp_runtime.session.call_log import CallLog, SessionCallLog
Expand Down Expand Up @@ -178,6 +179,26 @@ def close_session(
{e.tool_name for e in tool_calls if e.tool_name is not None}
)

# Privacy-preserving tool transcript (#126): one entry per tool call carrying
# the tool name, the data class from the catalog, and the policy decision.
# No request/response payloads, so the transcript leaks no PII. The entries
# are derived from the same audit chain whose tip is tool_transcript.hash.
transcript_entries: list[ToolTranscriptEntry] = []
for e in tool_calls:
if e.tool_name is None:
continue
catalog_entry = catalog.entries.get(e.tool_name)
data_class = (
catalog_entry.sensitivity_level if catalog_entry is not None else "unknown"
)
transcript_entries.append(
ToolTranscriptEntry(
tool_name=e.tool_name,
data_class=data_class,
decision=e.policy_decision or "n/a",
)
)

# Build call graph summary: prefer SessionCallLog (richer, with adjacency
# tracking) and fall back to deriving domains from the audit chain entries.
if session_call_log is not None:
Expand Down Expand Up @@ -232,6 +253,7 @@ def close_session(
audit_chain_root=chain.chain_root,
audit_chain_tip=chain.chain_tip,
audit_chain_length=chain.length,
transcript_entries=transcript_entries,
attestation_stale=attestation_stale,
catalog_exceptions=catalog_exceptions,
call_log_summary=call_log_summary,
Expand Down
94 changes: 94 additions & 0 deletions tests/unit/test_trace_claim.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,97 @@ def test_build_runtime_all_known_providers_accepted():
attestation_validity_seconds=86400,
)
_build_runtime(report) # must not raise


# ── tool_transcript entries (#126) ──────────────────────────────────────────────


def _three_entries() -> list:
from cmcp_runtime.audit.trace_claim import ToolTranscriptEntry

return [
ToolTranscriptEntry(tool_name="document_reader", data_class="confidential", decision="allow"),
ToolTranscriptEntry(tool_name="credit_score_lookup", data_class="confidential", decision="allow"),
ToolTranscriptEntry(tool_name="risk_report_writer", data_class="internal", decision="advisory_deny"),
]


def _claim_with_entries() -> RuntimeClaim:
chain = AuditChain("sess-126")
return generate_trace_claim(
session_id="sess-126",
signing_key=SigningKey(),
attestation_report=_make_report(),
policy_bundle=PolicyBundleInfo(
hash="sha256:" + "0" * 64, enforcement_mode="enforcing", policy_version="1.0.0"
),
tool_catalog=ToolCatalogInfo(hash="sha256:" + "1" * 64),
call_summary=_make_call_summary(),
audit_chain_root=chain.chain_root,
audit_chain_tip=chain.chain_tip,
audit_chain_length=chain.length,
transcript_entries=_three_entries(),
do_sign=False,
)


def test_transcript_entries_present_and_ordered():
claim = _claim_with_entries()
entries = claim.trace.tool_transcript.entries
assert entries is not None
assert [e.tool_name for e in entries] == [
"document_reader",
"credit_score_lookup",
"risk_report_writer",
]
assert [e.decision for e in entries] == ["allow", "allow", "advisory_deny"]


def test_transcript_hash_is_audit_chain_tip():
"""Acceptance #1: tool_transcript.hash binds to the audit chain tip."""
chain = AuditChain("sess-126")
claim = generate_trace_claim(
session_id="sess-126",
signing_key=SigningKey(),
attestation_report=_make_report(),
policy_bundle=PolicyBundleInfo(
hash="sha256:" + "0" * 64, enforcement_mode="enforcing", policy_version="1.0.0"
),
tool_catalog=ToolCatalogInfo(hash="sha256:" + "1" * 64),
call_summary=_make_call_summary(),
audit_chain_root=chain.chain_root,
audit_chain_tip=chain.chain_tip,
audit_chain_length=chain.length,
transcript_entries=_three_entries(),
do_sign=False,
)
assert claim.trace.tool_transcript.hash == f"sha256:{chain.chain_tip}"
assert claim.gateway.audit_chain.tip == chain.chain_tip


def test_transcript_entries_carry_no_payloads():
"""Privacy: serialized entries expose only tool_name, data_class, decision."""
claim = _claim_with_entries()
dumped = claim.model_dump(exclude_none=True)
for entry in dumped["trace"]["tool_transcript"]["entries"]:
assert set(entry.keys()) == {"tool_name", "data_class", "decision"}


def test_transcript_entries_hash_roundtrip():
"""A verifier can recompute the entries digest offline."""
from cmcp_runtime.audit.trace_claim import transcript_entries_hash

entries = _three_entries()
h = transcript_entries_hash(entries)
assert h.startswith("sha256:")
assert transcript_entries_hash(entries) == h
# A different decision changes the digest (tamper-evident).
entries[2].decision = "allow"
assert transcript_entries_hash(entries) != h


def test_transcript_entries_optional():
"""call_count is still set when no entries are supplied (backward compatible)."""
claim = _make_claim()
assert claim.trace.tool_transcript.entries is None
assert claim.trace.tool_transcript.call_count == 2
Loading