diff --git a/README.md b/README.md index a9f6d5f..251eb6b 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,68 @@ async with CapiscioMCPClient( print(result) ``` +## MCPServerIdentity.connect() — "Let's Encrypt" Style Setup + +Register your MCP server and get a badge with a single call: + +```python +from capiscio_mcp import MCPServerIdentity + +identity = await MCPServerIdentity.connect( + server_id="550e8400-...", # From the dashboard + api_key="sk_live_...", +) + +print(identity.did) # did:web:registry.capisc.io:servers:550e8400-... +print(identity.badge) # Current badge JWS (auto-issued) +``` + +### Using Environment Variables + +```python +identity = await MCPServerIdentity.from_env() +``` + +| Variable | Required | Description | +|----------|----------|-------------| +| `CAPISCIO_SERVER_ID` | Yes | Server UUID from dashboard | +| `CAPISCIO_API_KEY` | Yes | Registry API key | +| `CAPISCIO_SERVER_URL` | No | Registry URL (default: production) | +| `CAPISCIO_SERVER_DOMAIN` | No | Domain for badge issuance | +| `CAPISCIO_SERVER_PRIVATE_KEY_PEM` | No | PEM-encoded Ed25519 private key for ephemeral environments | + +### Deploying to Containers / Serverless + +In ephemeral environments (Docker, Lambda, Cloud Run) the local `~/.capiscio/` directory +doesn't survive restarts. On first run the SDK generates a keypair and logs a capture hint: + +``` +╔══════════════════════════════════════════════════════════╗ +║ New server identity generated — save key for persistence ║ +╚══════════════════════════════════════════════════════════╝ + + Add to your secrets manager / .env: + + CAPISCIO_SERVER_PRIVATE_KEY_PEM='-----BEGIN PRIVATE KEY-----\nMC4C...\n-----END PRIVATE KEY-----\n' +``` + +Copy that value into your secrets manager and set it as an environment variable. +On subsequent starts the SDK will recover the same DID without generating a new identity. + +**Key resolution priority:** env var → local file → generate new. + +```yaml +# docker-compose.yml +services: + mcp-server: + environment: + CAPISCIO_SERVER_ID: "550e8400-..." + CAPISCIO_API_KEY: "sk_live_..." + CAPISCIO_SERVER_PRIVATE_KEY_PEM: "${MCP_SERVER_KEY}" # from secrets +``` + +See the [Deployment Guide](https://docs.capisc.io/mcp-guard/guides/deployment/) for full examples. + ## Core Connection Modes MCP Guard connects to capiscio-core for cryptographic operations: @@ -299,6 +361,11 @@ config = VerifyConfig( | Variable | Description | Default | |----------|-------------|---------| +| `CAPISCIO_SERVER_ID` | Server UUID (for `MCPServerIdentity`) | — | +| `CAPISCIO_API_KEY` | Registry API key (for `MCPServerIdentity`) | — | +| `CAPISCIO_SERVER_URL` | Registry server URL | `https://registry.capisc.io` | +| `CAPISCIO_SERVER_DOMAIN` | Domain for badge issuance | (derived from server URL) | +| `CAPISCIO_SERVER_PRIVATE_KEY_PEM` | PEM-encoded Ed25519 private key (ephemeral envs) | — | | `CAPISCIO_CORE_ADDR` | External core address | (embedded mode) | | `CAPISCIO_SERVER_ORIGIN` | Server origin for guard | (auto-detect) | | `CAPISCIO_LOG_LEVEL` | Logging verbosity | `info` | diff --git a/capiscio_mcp/__init__.py b/capiscio_mcp/__init__.py index 352c2e7..98f14f1 100644 --- a/capiscio_mcp/__init__.py +++ b/capiscio_mcp/__init__.py @@ -10,13 +10,30 @@ - Server identity registration for MCP servers - PoP (Proof of Possession) handshake for server key verification - Evidence logging for audit and forensics +- One-line server identity setup via MCPServerIdentity.connect() Installation: pip install capiscio-mcp # Standalone pip install capiscio-mcp[mcp] # With MCP SDK integration pip install capiscio-mcp[crypto] # With PoP signing/verification -Quickstart (Server-side): +Quickstart ("Let's Encrypt" style — recommended): + from capiscio_mcp import MCPServerIdentity + from capiscio_mcp.integrations.mcp import CapiscioMCPServer + + identity = await MCPServerIdentity.connect( + server_id=os.environ["CAPISCIO_SERVER_ID"], + api_key=os.environ["CAPISCIO_API_KEY"], + ) + server = CapiscioMCPServer(identity=identity) + + @server.tool(min_trust_level=2) + async def read_file(path: str) -> str: + ... + + server.run() + +Quickstart (@guard decorator): from capiscio_mcp import guard @guard(min_trust_level=2) @@ -33,7 +50,7 @@ async def read_database(query: str) -> list[dict]: if result.state == ServerState.VERIFIED_PRINCIPAL: print(f"Trusted at level {result.trust_level}") -Quickstart (Server Registration): +Quickstart (Server Registration, manual): from capiscio_mcp import setup_server_identity result = await setup_server_identity( @@ -95,6 +112,8 @@ async def read_database(query: str) -> list[dict]: RegistrationError, KeyGenerationError, ) +from capiscio_mcp.keeper import ServerBadgeKeeper +from capiscio_mcp.connect import MCPServerIdentity from capiscio_mcp._core.version import ( MCP_VERSION, CORE_MIN_VERSION, @@ -154,4 +173,7 @@ async def read_database(query: str) -> list[dict]: "setup_server_identity_sync", "RegistrationError", "KeyGenerationError", + # One-liner identity setup (MCPServerIdentity.connect()) + "MCPServerIdentity", + "ServerBadgeKeeper", ] diff --git a/capiscio_mcp/connect.py b/capiscio_mcp/connect.py new file mode 100644 index 0000000..f41fffa --- /dev/null +++ b/capiscio_mcp/connect.py @@ -0,0 +1,491 @@ +"""MCPServerIdentity — "Let's Encrypt" style MCP server identity setup. + +Mirrors ``CapiscIO.connect()`` from capiscio-sdk-python but for MCP servers. +Keys are persisted in ``~/.capiscio/mcp-servers/{server_id}/``. + +Usage:: + + import asyncio + import os + from capiscio_mcp import MCPServerIdentity + + async def main(): + identity = await MCPServerIdentity.connect( + server_id=os.environ["CAPISCIO_SERVER_ID"], + api_key=os.environ["CAPISCIO_API_KEY"], + ) + print(f"DID: {identity.did}") + print(f"Badge: {identity.badge}") + + # Or with environment variables + identity = await MCPServerIdentity.from_env() + + asyncio.run(main()) +""" + +from __future__ import annotations + +import asyncio +import logging +import os +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Callable, Optional +from urllib.parse import urlparse + +import base58 +import requests +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.serialization import ( + Encoding, + NoEncryption, + PrivateFormat, + PublicFormat, + load_pem_private_key, +) + +from capiscio_mcp.keeper import ServerBadgeKeeper +from capiscio_mcp.registration import ( + RegistrationError, + generate_server_keypair, + register_server_identity, +) + +logger = logging.getLogger(__name__) + +DEFAULT_REGISTRY = "https://registry.capisc.io" +DEFAULT_MCP_KEYS_DIR = Path.home() / ".capiscio" / "mcp-servers" + +# Env var for injecting the private key in ephemeral environments +ENV_SERVER_PRIVATE_KEY = "CAPISCIO_SERVER_PRIVATE_KEY_PEM" + + +# --------------------------------------------------------------------------- +# Key derivation helpers +# --------------------------------------------------------------------------- + + +def _did_from_ed25519_pub_raw(pub_raw: bytes) -> str: + """Derive a did:key from raw Ed25519 public key bytes (32 bytes).""" + # Multicodec prefix 0xed01 identifies Ed25519 public keys + multicodec = b"\xed\x01" + pub_raw + return "did:key:z" + base58.b58encode(multicodec).decode() + + +def _load_private_key_pem(pem_text: str) -> tuple[Ed25519PrivateKey, str, str, str]: + """Load a PEM-encoded Ed25519 private key and derive all identity artefacts. + + Returns: + (private_key, private_key_pem, public_key_pem, did) + """ + key = load_pem_private_key(pem_text.encode(), password=None) + if not isinstance(key, Ed25519PrivateKey): + raise ValueError("Expected an Ed25519 private key") + + priv_pem = key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode() + pub_pem = key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo).decode() + pub_raw = key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) + did = _did_from_ed25519_pub_raw(pub_raw) + return key, priv_pem, pub_pem, did + + +def _log_key_capture_hint(server_id: str, private_key_pem: str) -> None: + """Write a one-time hint to stderr telling the user how to persist key material. + + Uses ``print(..., file=sys.stderr)`` instead of the logger so the private + key never enters log aggregation pipelines. The hint is only emitted on + first-run key generation. + """ + import sys as _sys # local import — only needed for this hint + + escaped_pem = private_key_pem.replace("\n", "\\n") + hint = ( + "\n" + " ╔══════════════════════════════════════════════════════════════╗\n" + " ║ New server identity generated — save key for persistence ║\n" + " ╚══════════════════════════════════════════════════════════════╝\n" + "\n" + " If this server runs in an ephemeral environment (containers,\n" + " serverless, CI) the identity will be lost on restart unless\n" + " you persist the private key.\n" + "\n" + " Add to your secrets manager / .env:\n" + "\n" + f' CAPISCIO_SERVER_PRIVATE_KEY_PEM="{escaped_pem}"\n' + "\n" + " The DID will be re-derived automatically on startup.\n" + ) + print(hint, file=_sys.stderr, flush=True) + + +# --------------------------------------------------------------------------- +# Badge issuance helper +# --------------------------------------------------------------------------- + + +def _derive_domain(url: str) -> str: + """Extract the host (and non-standard port) from a URL for use as badge domain.""" + parsed = urlparse(url) + host = parsed.hostname or "localhost" + port = parsed.port + # Omit standard ports (80/443) from the domain string + if port and port not in (80, 443): + return f"{host}:{port}" + return host + + +def _issue_badge_sync( + server_id: str, + api_key: str, + ca_url: str, + domain: Optional[str] = None, +) -> Optional[str]: + """Call ``POST /v1/sdk/servers/{server_id}/badge`` and return the badge JWS.""" + url = f"{ca_url.rstrip('/')}/v1/sdk/servers/{server_id}/badge" + headers = { + "X-Capiscio-Registry-Key": api_key, + "Content-Type": "application/json", + } + effective_domain = domain or _derive_domain(ca_url) + try: + resp = requests.post(url, headers=headers, json={"domain": effective_domain}, timeout=30) + if resp.status_code in (200, 201): + try: + data = resp.json() + except ValueError as exc: + logger.warning("Badge issuance response was not valid JSON: %s", exc) + return None + # Try multiple common response shapes + nested = data.get("data") or {} + badge = ( + nested.get("badge") + or nested.get("token") + or data.get("badge") + or data.get("token") + ) + if badge: + logger.info("Badge issued for server %s", server_id) + return badge + logger.warning("Badge issue response had no badge field: %s", data) + return None + logger.warning( + "Badge issuance failed for server %s: %d — %s", + server_id, + resp.status_code, + resp.text, + ) + return None + except requests.RequestException as exc: + logger.warning("Badge issuance request failed: %s", exc) + return None + + +async def _issue_badge( + server_id: str, + api_key: str, + ca_url: str, + domain: Optional[str] = None, +) -> Optional[str]: + """Async wrapper for badge issuance.""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + _issue_badge_sync, + server_id, + api_key, + ca_url, + domain, + ) + + +# --------------------------------------------------------------------------- +# MCPServerIdentity dataclass +# --------------------------------------------------------------------------- + + +@dataclass +class MCPServerIdentity: + """Fully-configured MCP server identity returned by :meth:`connect`. + + Attributes: + server_id: MCP server UUID (from the CapiscIO dashboard). + did: Server DID (``did:key:z6Mk...``). + api_key: Registry API key. + server_url: Registry base URL. + keys_dir: Directory containing the server's keys. + badge: Current trust badge JWS (auto-renewed in background when keeper is running). + private_key_pem: PEM-encoded Ed25519 private key for PoP signing. + """ + + server_id: str + did: str + api_key: str + server_url: str + keys_dir: Path + badge: Optional[str] = None + private_key_pem: Optional[str] = None + _keeper: Any = field(default=None, repr=False) + + def get_badge(self) -> Optional[str]: + """Return the current badge, preferring the keeper's latest renewal.""" + if self._keeper is not None: + fresh = self._keeper.get_current_badge() + if fresh: + return fresh + return self.badge + + def close(self) -> None: + """Stop badge auto-renewal and release resources.""" + if self._keeper is not None: + try: + self._keeper.stop() + except Exception: + pass + self._keeper = None + + def __enter__(self) -> "MCPServerIdentity": + return self + + def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> bool: + self.close() + return False + + # ------------------------------------------------------------------ + # Class-level factory methods + # ------------------------------------------------------------------ + + @classmethod + async def connect( + cls, + server_id: str, + api_key: str, + *, + server_url: str = DEFAULT_REGISTRY, + domain: Optional[str] = None, + keys_dir: Optional[Path] = None, + auto_badge: bool = True, + renewal_threshold: int = 30, + on_badge_renew: Optional[Callable[[str], None]] = None, + ) -> "MCPServerIdentity": + """Connect to CapiscIO and get a fully-configured MCP server identity. + + This is the "Let's Encrypt" style one-liner for MCP servers: + + 1. Checks ``~/.capiscio/mcp-servers/{server_id}/`` for existing keys + (idempotent — safe to call multiple times). + 2. Generates an Ed25519 keypair via capiscio-core if none exist. + 3. Registers the DID + public key with the registry. + 4. Issues an initial badge via ``POST /v1/sdk/servers/{server_id}/badge``. + 5. Starts :class:`~capiscio_mcp.keeper.ServerBadgeKeeper` for auto-renewal. + 6. Returns :class:`MCPServerIdentity` with all credentials loaded. + + Args: + server_id: MCP server UUID (from the CapiscIO dashboard). + api_key: Registry API key (``X-Capiscio-Registry-Key``). + server_url: Registry base URL (default: production). + domain: Domain to record in the badge (e.g. ``"tools.example.com"``). + Defaults to the hostname extracted from ``server_url``. + Use ``CAPISCIO_SERVER_DOMAIN`` env var via :meth:`from_env`. + keys_dir: Override for key storage directory. + auto_badge: If ``True``, issue an initial badge and start auto-renewal. + renewal_threshold: Renew badge this many seconds before expiry. + on_badge_renew: Optional callback ``(badge: str) -> None`` on renewal. + + Returns: + :class:`MCPServerIdentity` with ``.did``, ``.badge``, ``.keys_dir``, + ``.get_badge()``, and ``.close()``. + + Example:: + + identity = await MCPServerIdentity.connect( + server_id=os.environ["CAPISCIO_SERVER_ID"], + api_key=os.environ["CAPISCIO_API_KEY"], + ) + print(f"Server DID: {identity.did}") + + # Use in CapiscioMCPServer + server = CapiscioMCPServer(identity=identity) + """ + server_url = server_url.rstrip("/") + + # Step 1: Resolve keys directory + effective_keys_dir = Path(keys_dir) if keys_dir else (DEFAULT_MCP_KEYS_DIR / server_id) + effective_keys_dir.mkdir(parents=True, exist_ok=True) + + private_key_path = effective_keys_dir / "private_key.pem" + pub_key_path = effective_keys_dir / "public_key.pem" + did_file = effective_keys_dir / "did.txt" + + did: Optional[str] = None + private_key_pem: Optional[str] = None + pub_pem: Optional[str] = None + is_new_identity = False + + # ------------------------------------------------------------------ + # Step 2: Resolve private key — env var → local file → generate new + # ------------------------------------------------------------------ + env_pem = os.environ.get(ENV_SERVER_PRIVATE_KEY) + + if env_pem: + # --- Source: environment variable --- + env_pem = env_pem.replace("\\n", "\n") # Handle escaped newlines + _, private_key_pem, pub_pem, did = _load_private_key_pem(env_pem) + logger.info("Loaded server identity from %s: %s", ENV_SERVER_PRIVATE_KEY, did) + + # Persist to disk so subsequent restarts can use local file + private_key_path.write_text(private_key_pem) + os.chmod(private_key_path, 0o600) + pub_key_path.write_text(pub_pem) + did_file.write_text(did) + + elif private_key_path.exists(): + # --- Source: local file --- + raw_pem = private_key_path.read_text() + _, private_key_pem, pub_pem, did = _load_private_key_pem(raw_pem) + logger.info("Recovered server identity from local keys: %s", did) + + # Ensure public key & DID files are consistent + pub_key_path.write_text(pub_pem) + did_file.write_text(did) + + else: + # --- Source: generate new keypair --- + is_new_identity = True + logger.info("Generating Ed25519 keypair for MCP server %s...", server_id) + keys = await generate_server_keypair(output_dir=str(effective_keys_dir)) + did = keys["did_key"] + private_key_pem = keys["private_key_pem"] + pub_pem = keys.get("public_key_pem", "") + + # Persist DID for future recovery + did_file.write_text(did) + + # Persist public key for re-registration on recovery + if pub_pem: + pub_key_path.write_text(pub_pem) + + # Ensure private key is at the canonical path + if "private_key_path" in keys: + existing = Path(keys["private_key_path"]) + if existing.resolve() != private_key_path.resolve() and existing.exists(): + private_key_path.write_bytes(existing.read_bytes()) + os.chmod(private_key_path, 0o600) + elif not private_key_path.exists(): + private_key_path.write_text(private_key_pem) + os.chmod(private_key_path, 0o600) + + # ------------------------------------------------------------------ + # Step 3: Register DID with registry (idempotent — safe to repeat) + # ------------------------------------------------------------------ + assert did is not None # narrowing for type checkers + effective_pub_pem = pub_pem or "" + try: + await register_server_identity( + server_id=server_id, + api_key=api_key, + did=did, + public_key=effective_pub_pem, + ca_url=server_url, + ) + logger.info("Server identity registered: %s", did) + except RegistrationError as exc: + status_code = getattr(exc, "status_code", None) + if status_code in (None, 409): + # 409 = identity already registered (idempotent), None = network error + logger.debug("Registration returned: %s — continuing", exc) + else: + logger.warning( + "Server identity registration failed (status %s): %s", + status_code, + exc, + ) + raise + + # ------------------------------------------------------------------ + # Step 3.5: First-run capture hint & rotation warning + # ------------------------------------------------------------------ + if is_new_identity: + _log_key_capture_hint(server_id, private_key_pem) + + # Step 5: Issue initial badge and start keeper + badge: Optional[str] = None + keeper: Optional[ServerBadgeKeeper] = None + + if auto_badge: + badge = await _issue_badge(server_id, api_key, server_url, domain=domain) + if badge: + keeper = ServerBadgeKeeper( + server_id=server_id, + api_key=api_key, + initial_badge=badge, + ca_url=server_url, + renewal_threshold=renewal_threshold, + on_renew=on_badge_renew, + ) + keeper.start() + else: + logger.warning( + "Badge issuance failed — server identity set up without badge" + ) + + logger.info("MCPServerIdentity ready for server %s: %s", server_id, did) + return cls( + server_id=server_id, + did=did, # type: ignore[arg-type] + api_key=api_key, + server_url=server_url, + keys_dir=effective_keys_dir, + badge=badge, + private_key_pem=private_key_pem, + _keeper=keeper, + ) + + @classmethod + async def from_env(cls, **kwargs: Any) -> "MCPServerIdentity": + """Connect using environment variables. + + Reads: + - ``CAPISCIO_SERVER_ID`` (required) + - ``CAPISCIO_API_KEY`` (required) + - ``CAPISCIO_SERVER_URL`` (optional, default: production) + - ``CAPISCIO_SERVER_DOMAIN`` (optional, default: hostname from SERVER_URL) + - ``CAPISCIO_SERVER_PRIVATE_KEY_PEM`` (optional — PEM-encoded Ed25519 + private key for ephemeral environments; printed on first generation) + + Additional keyword arguments are forwarded to :meth:`connect`. + + Raises: + ValueError: If ``CAPISCIO_SERVER_ID`` or ``CAPISCIO_API_KEY`` is unset. + + Example:: + + # .env + # CAPISCIO_SERVER_ID=550e8400-e29b-41d4-a716-446655440000 + # CAPISCIO_API_KEY=sk_live_... + + identity = await MCPServerIdentity.from_env() + """ + server_id = os.environ.get("CAPISCIO_SERVER_ID") + if not server_id: + raise ValueError( + "CAPISCIO_SERVER_ID environment variable is required. " + "Create an MCP server at https://app.capisc.io" + ) + + api_key = os.environ.get("CAPISCIO_API_KEY") + if not api_key: + raise ValueError( + "CAPISCIO_API_KEY environment variable is required. " + "Get your API key at https://app.capisc.io" + ) + + server_url = os.environ.get("CAPISCIO_SERVER_URL", DEFAULT_REGISTRY) + domain = os.environ.get("CAPISCIO_SERVER_DOMAIN") + + return await cls.connect( + server_id=server_id, + api_key=api_key, + server_url=server_url, + domain=domain or None, + **kwargs, + ) diff --git a/capiscio_mcp/integrations/mcp.py b/capiscio_mcp/integrations/mcp.py index d76b2bc..66669d7 100644 --- a/capiscio_mcp/integrations/mcp.py +++ b/capiscio_mcp/integrations/mcp.py @@ -1,51 +1,61 @@ """ -MCP SDK Integration — requires `pip install capiscio-mcp[mcp]` +MCP SDK Integration — requires ``pip install capiscio-mcp[mcp]`` -Provides two separate integration classes: -1. Server-side: CapiscioMCPServer (guard tools, disclose identity, PoP signing) -2. Client-side: CapiscioMCPClient (verify server identity, PoP verification) +Provides two integration classes: + +1. **Server-side**: :class:`CapiscioMCPServer` — guard tools, disclose identity + via ``_meta`` in the initialize response, and sign PoP challenges. +2. **Client-side**: :class:`CapiscioMCPClient` — verify server identity extracted + from ``_meta`` on connection, enforce ``min_trust_level``. + +Usage (Server):: -Usage (Server): from capiscio_mcp.integrations.mcp import CapiscioMCPServer + from capiscio_mcp import MCPServerIdentity - server = CapiscioMCPServer( - name="filesystem", - did="did:web:mcp.example.com:servers:filesystem", - badge="eyJhbGc...", - private_key_path="/path/to/key.pem", # For PoP signing + identity = await MCPServerIdentity.connect( + server_id=os.environ["CAPISCIO_SERVER_ID"], + api_key=os.environ["CAPISCIO_API_KEY"], ) + server = CapiscioMCPServer(identity=identity) + @server.tool(min_trust_level=2) async def read_file(path: str) -> str: with open(path) as f: return f.read() - # Run the server server.run() -Usage (Client): +Usage (Client):: + from capiscio_mcp.integrations.mcp import CapiscioMCPClient async with CapiscioMCPClient( - server_url="https://mcp.example.com", - min_trust_level=2, - require_pop=True, # Require PoP verification + command="python server/main.py", + badge="eyJhbGc...", + min_trust_level=1, + fail_on_unverified=True, ) as client: - result = await client.call_tool("read_file", {"path": "/data/file.txt"}) + result = await client.call_tool("list_files", {"directory": "/tmp"}) """ from __future__ import annotations +import contextvars import logging import os from dataclasses import dataclass, field from functools import wraps from typing import Any, Callable, Coroutine, Dict, List, Optional, TypeVar, Union -# Check if MCP SDK (FastMCP) is available +# -------------------------------------------------------------------------- +# MCP server SDK (FastMCP) +# -------------------------------------------------------------------------- try: from mcp.server.fastmcp import FastMCP from mcp.types import Tool, TextContent + MCP_AVAILABLE = True except ImportError: FastMCP = None # type: ignore @@ -53,9 +63,13 @@ async def read_file(path: str) -> str: TextContent = None # type: ignore MCP_AVAILABLE = False +# -------------------------------------------------------------------------- +# MCP client SDK +# -------------------------------------------------------------------------- try: from mcp.client.session import ClientSession as McpClientSession from mcp.client.stdio import stdio_client, StdioServerParameters + MCP_CLIENT_AVAILABLE = True except ImportError: McpClientSession = None # type: ignore @@ -63,19 +77,22 @@ async def read_file(path: str) -> str: StdioServerParameters = None # type: ignore MCP_CLIENT_AVAILABLE = False -# Check if cryptography is available for PoP +# -------------------------------------------------------------------------- +# Cryptography (PoP) +# -------------------------------------------------------------------------- try: from cryptography.hazmat.primitives.asymmetric.ed25519 import ( Ed25519PrivateKey, Ed25519PublicKey, ) + CRYPTO_AVAILABLE = True except ImportError: Ed25519PrivateKey = None # type: ignore Ed25519PublicKey = None # type: ignore CRYPTO_AVAILABLE = False -from capiscio_mcp.types import ServerState, CallerCredential +from capiscio_mcp.types import ServerState, ServerErrorCode, CallerCredential from capiscio_mcp.server import ( verify_server, VerifyConfig, @@ -103,9 +120,146 @@ async def read_file(path: str) -> str: T = TypeVar("T") +# --------------------------------------------------------------------------- +# _meta injection machinery +# --------------------------------------------------------------------------- + +# Per-run contextvar carrying the _meta dict to inject into InitializeResult. +# Set by CapiscioMCPServer.run() before starting the server; cleared on exit. +_capiscio_meta_ctx: contextvars.ContextVar[Optional[Dict[str, Any]]] = contextvars.ContextVar( + "_capiscio_meta_ctx", default=None +) + + +def _install_credential_extraction(fastmcp_instance: "FastMCP") -> None: + """Wrap the registered ``CallToolRequest`` handler to extract caller credentials from ``_meta``. + + For stdio (non-HTTP) transport, the caller's CapiscIO badge must travel in the + JSON-RPC ``_meta`` of each tool call request under the key + ``capiscio_caller_badge`` (API key: ``capiscio_caller_api_key``). + + This is the stdio equivalent of the ``X-Capiscio-Badge`` HTTP header (RFC-002 §9.1). + The server-side ``@guard`` decorator reads the credential from the + ``_current_credential`` contextvar, which this wrapper sets before dispatching. + + For HTTP-based transports, callers MUST send the badge in the + ``X-Capiscio-Badge`` header (takes precedence over ``Authorization: Bearer``). + """ + try: + from mcp import types as _mcp_types + from capiscio_mcp.types import CallerCredential + from capiscio_mcp.guard import set_credential, _current_credential as _cred_ctx + except ImportError: + return + + mcp_server = getattr(fastmcp_instance, "_mcp_server", None) + if mcp_server is None: + return + + original_handler = mcp_server.request_handlers.get(_mcp_types.CallToolRequest) + if original_handler is None: + return + + async def _handler_with_credential(req: _mcp_types.CallToolRequest) -> Any: + # Extract caller credentials from _meta (RFC-008 §Appendix-B / _meta.capiscio convention) + badge: Optional[str] = None + api_key: Optional[str] = None + meta = getattr(req.params, "meta", None) + if meta is not None: + extra: Dict[str, Any] = getattr(meta, "model_extra", None) or {} + badge = extra.get("capiscio_caller_badge") + api_key = extra.get("capiscio_caller_api_key") + + if badge or api_key: + cred = CallerCredential(badge_jws=badge, api_key=api_key) + token = set_credential(cred) + try: + return await original_handler(req) + finally: + _cred_ctx.reset(token) + return await original_handler(req) + + mcp_server.request_handlers[_mcp_types.CallToolRequest] = _handler_with_credential + logger.debug("CapiscIO: installed caller credential extraction from _meta on CallToolRequest") + + +def _patch_server_session_once() -> bool: + """Idempotently patch ``ServerSession._received_request`` to inject ``_meta``. + + The patch wraps ``responder.respond`` for ``InitializeRequest`` messages so + that the ``meta`` field on the returned ``InitializeResult`` is set from the + ``_capiscio_meta_ctx`` contextvar. All other logic (protocol-version + negotiation, state transitions) continues to run in the original method. + + Returns: + ``True`` if the patch is now in place, ``False`` if the MCP SDK is not + installed. + """ + try: + from mcp.server.session import ServerSession + from mcp import types as _mcp_types + except ImportError: + return False + + if getattr(ServerSession, "_capiscio_meta_patched", False): + return True + + _original_received_request = ServerSession._received_request + + @wraps(_original_received_request) + async def _patched_received_request( + session_self: Any, + responder: Any, + ) -> None: + meta = _capiscio_meta_ctx.get() + + # Fast path: no meta to inject — delegate straight to original + if meta is None: + await _original_received_request(session_self, responder) + return + + # Only intercept InitializeRequest messages + try: + req_root = responder.request.root + except AttributeError: + await _original_received_request(session_self, responder) + return + + if not isinstance(req_root, _mcp_types.InitializeRequest): + await _original_received_request(session_self, responder) + return + + # Wrap responder.respond to inject _meta into the InitializeResult + _original_respond = responder.respond + + async def _respond_with_meta(result: Any) -> None: + try: + # ServerResult is a RootModel; result.root is InitializeResult + inner = getattr(result, "root", result) + if isinstance(inner, _mcp_types.InitializeResult): + inner.meta = meta + except Exception as exc: + logger.debug("_meta injection: failed to set meta on result: %s", exc) + await _original_respond(result) + + responder.respond = _respond_with_meta + try: + await _original_received_request(session_self, responder) + finally: + responder.respond = _original_respond + + ServerSession._received_request = _patched_received_request # type: ignore[method-assign] + ServerSession._capiscio_meta_patched = True # type: ignore[attr-defined] + logger.debug("CapiscIO: patched ServerSession._received_request for _meta injection") + return True + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + def _require_mcp_server() -> None: - """Raise ImportError if MCP server SDK (FastMCP) is not available.""" if not MCP_AVAILABLE: raise ImportError( "MCP SDK integration requires the 'mcp' package. " @@ -114,7 +268,6 @@ def _require_mcp_server() -> None: def _require_mcp_client() -> None: - """Raise ImportError if MCP client SDK is not available.""" if not MCP_CLIENT_AVAILABLE: raise ImportError( "MCP client integration requires the 'mcp' package. " @@ -122,43 +275,59 @@ def _require_mcp_client() -> None: ) +# --------------------------------------------------------------------------- +# CapiscioMCPServer +# --------------------------------------------------------------------------- + + class CapiscioMCPServer: - """ - MCP Server with CapiscIO identity disclosure, PoP signing, and tool guarding. - + """MCP Server with CapiscIO identity disclosure, PoP signing, and tool guarding. + This class wraps FastMCP to: - 1. Automatically inject identity into initialize response _meta - 2. Sign PoP challenges to prove key ownership (RFC-007) - 3. Guard registered tools with @guard decorator for trust enforcement - - Attributes: - name: Server name - did: Server DID (did:web:... or did:key:...) - badge: Server trust badge JWS (optional but recommended) - default_min_trust_level: Default minimum trust level for tools - pop_enabled: Whether PoP signing is available - - Example: - server = CapiscioMCPServer( - name="filesystem", - did="did:web:mcp.example.com:servers:filesystem", - badge=os.environ.get("SERVER_BADGE"), - private_key_path="/path/to/server.key.pem", + + 1. Automatically inject server identity (DID, badge) into the ``_meta`` + field of MCP ``initialize`` responses (RFC-007 §6.2). + 2. Sign PoP challenges from clients to prove key ownership (RFC-007). + 3. Guard registered tools with trust-level requirements (RFC-006). + + The ``_meta`` injection works by patching + ``mcp.server.session.ServerSession._received_request`` **once** (globally, + idempotently) at server start-time. A per-invocation contextvar carries the + meta dict so that only this server's responses are affected. + + Example:: + + from capiscio_mcp import MCPServerIdentity + from capiscio_mcp.integrations.mcp import CapiscioMCPServer + + identity = await MCPServerIdentity.connect( + server_id=os.environ["CAPISCIO_SERVER_ID"], + api_key=os.environ["CAPISCIO_API_KEY"], ) - + + server = CapiscioMCPServer(identity=identity) + @server.tool(min_trust_level=2) async def read_file(path: str) -> str: with open(path) as f: return f.read() - - # Run the server + server.run() + + You can also supply credentials directly (without ``MCPServerIdentity``):: + + server = CapiscioMCPServer( + name="filesystem", + did="did:web:mcp.example.com:servers:filesystem", + badge=os.environ.get("SERVER_BADGE"), + private_key_path="/path/to/server.key.pem", + ) """ - + def __init__( self, - name: str, - did: str, + name: Optional[str] = None, + did: Optional[str] = None, badge: Optional[str] = None, default_min_trust_level: int = 0, version: str = "1.0.0", @@ -166,117 +335,131 @@ def __init__( private_key_path: Optional[str] = None, private_key_pem: Optional[Union[str, bytes]] = None, key_id: Optional[str] = None, - ): - """ - Initialize CapiscIO MCP Server. - + # Convenience: pass MCPServerIdentity directly + identity: Optional[Any] = None, + ) -> None: + """Initialize ``CapiscioMCPServer``. + Args: - name: Server name (shown to clients) - did: Server DID for identity disclosure - badge: Server badge JWS for identity verification - default_min_trust_level: Default minimum trust level for tools - version: Server version string - private_key: Ed25519 private key for PoP signing (optional) - private_key_path: Path to PEM file containing private key (optional) - private_key_pem: PEM-encoded private key string/bytes (optional) - key_id: Key ID for JWS header (defaults to DID#keys-1) + name: Server name shown to clients. + did: Server DID for identity disclosure. + badge: Server badge JWS for client verification. + default_min_trust_level: Default minimum trust level for tools. + version: Server version string. + private_key: ``Ed25519PrivateKey`` object for PoP signing (optional). + private_key_path: Path to PEM file containing private key (optional). + private_key_pem: PEM-encoded private key string/bytes (optional). + key_id: Key ID for JWS header (defaults to ``{did}#keys-1``). + identity: :class:`~capiscio_mcp.connect.MCPServerIdentity` instance. + When provided, ``name``, ``did``, ``badge``, and + ``private_key_pem`` are derived from it automatically. """ _require_mcp_server() - - self.name = name + + # Accept MCPServerIdentity as a convenience shortcut + if identity is not None: + name = name or getattr(identity, "server_id", "mcp-server") + if did is None: + did = getattr(identity, "did", None) + if badge is None: + badge = identity.get_badge() if callable(getattr(identity, "get_badge", None)) else getattr(identity, "badge", None) + if private_key_pem is None and private_key is None and private_key_path is None: + private_key_pem = getattr(identity, "private_key_pem", None) + + if not did: + raise ValueError("'did' is required (or provide an 'identity' with a DID)") + + self.name = name or "mcp-server" self.did = did self.badge = badge self.default_min_trust_level = default_min_trust_level self.version = version - + # Load private key for PoP signing self._private_key: Optional["Ed25519PrivateKey"] = None self._key_id = key_id or f"{did}#keys-1" - + if private_key is not None: self._private_key = private_key elif private_key_path is not None: self._load_private_key_from_file(private_key_path) elif private_key_pem is not None: self._private_key = load_private_key_from_pem(private_key_pem) - + # Create underlying FastMCP server - self._server = FastMCP(name) + self._server = FastMCP(self.name) self._tools: Dict[str, Callable] = {} self._tool_configs: Dict[str, GuardConfig] = {} - - self._setup_identity_injection() - + + # Attempt to install the session patch so _meta can be injected at run-time + if MCP_AVAILABLE: + _patch_server_session_once() + # Install handler wrapper that extracts caller credentials from _meta + # for stdio transport (where HTTP headers are not available). + _install_credential_extraction(self._server) + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + def _load_private_key_from_file(self, path: str) -> None: - """Load private key from PEM file.""" if not CRYPTO_AVAILABLE: logger.warning( "PoP signing requires 'cryptography' package. " "Install with: pip install capiscio-mcp[crypto]" ) return - try: - with open(path, "rb") as f: - pem_data = f.read() + with open(path, "rb") as fh: + pem_data = fh.read() self._private_key = load_private_key_from_pem(pem_data) - logger.debug(f"Loaded private key from {path}") - except Exception as e: - logger.warning(f"Failed to load private key from {path}: {e}") - + logger.debug("Loaded private key from %s", path) + except Exception as exc: + logger.warning("Failed to load private key from %s: %s", path, exc) + + # ------------------------------------------------------------------ + # Public properties + # ------------------------------------------------------------------ + @property def pop_enabled(self) -> bool: - """Check if PoP signing is available.""" + """Whether PoP signing is available (private key loaded).""" return self._private_key is not None - - def _setup_identity_injection(self) -> None: - """ - Set up identity injection into initialize response. - - Per RFC-007 §6.2, server identity is disclosed via _meta in - the initialize response. - """ - # The MCP SDK provides hooks for customizing responses - # This implementation depends on the specific MCP SDK version - # For now, we'll store the identity info to be included - self._identity_meta = { - "capiscio_server_did": self.did, - } - if self.badge: - self._identity_meta["capiscio_server_badge"] = self.badge - + + @property + def server(self) -> "FastMCP": + """The underlying :class:`~mcp.server.fastmcp.FastMCP` instance.""" + return self._server + + @property + def identity_meta(self) -> Dict[str, str]: + """The identity ``_meta`` dict (DID + badge) used in initialize responses.""" + return self._identity_meta.copy() + + # ------------------------------------------------------------------ + # Identity meta builder + # ------------------------------------------------------------------ + def create_initialize_response_meta( self, request_meta: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: - """ - Create the _meta object for initialize response. - - This method should be called when building the initialize response. - It includes: - 1. Server identity (DID, badge) - 2. PoP response (if client sent PoP request and we have a private key) - + """Build the ``_meta`` dict for an ``initialize`` response (RFC-007 §6.2). + + Includes server identity (DID, badge) and, if the client sent a PoP nonce + and a private key is loaded, a PoP signature. + Args: - request_meta: The _meta from the initialize request (for PoP) - + request_meta: The ``_meta`` from the ``initialize`` *request* (for PoP). + Returns: - Dict to include as _meta in initialize response - - Example: - # In your initialize handler - def handle_initialize(request): - response_meta = server.create_initialize_response_meta( - request_meta=request.params.get("_meta") - ) - return InitializeResult( - capabilities=..., - _meta=response_meta, - ) + Dict to be included as ``_meta`` in the ``InitializeResult``. """ - meta = self._identity_meta.copy() - - # Handle PoP if client sent nonce and we have a key + meta: Dict[str, Any] = {"capiscio_server_did": self.did} + if self.badge: + meta["capiscio_server_badge"] = self.badge + + # Attach PoP signature if client sent a nonce and we have a key if self._private_key is not None and request_meta is not None: pop_request = PoPRequest.from_meta(request_meta) if pop_request is not None: @@ -288,11 +471,23 @@ def handle_initialize(request): ) meta.update(pop_response.to_meta()) logger.debug("Added PoP signature to initialize response") - except Exception as e: - logger.warning(f"Failed to create PoP response: {e}") - + except Exception as exc: + logger.warning("Failed to create PoP response: %s", exc) + return meta - + + # Internal alias used by _setup_identity_injection (legacy compat) + @property + def _identity_meta(self) -> Dict[str, Any]: + meta: Dict[str, Any] = {"capiscio_server_did": self.did} + if self.badge: + meta["capiscio_server_badge"] = self.badge + return meta + + # ------------------------------------------------------------------ + # Tool registration + # ------------------------------------------------------------------ + def tool( self, name: Optional[str] = None, @@ -300,128 +495,112 @@ def tool( min_trust_level: Optional[int] = None, config: Optional[GuardConfig] = None, ) -> Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]: - """ - Register a tool with CapiscIO guard. - - This decorator: - 1. Registers the function as an MCP tool via FastMCP - 2. Wraps it with @guard for access control based on caller trust level - + """Register a tool with CapiscIO trust-level guarding. + + Wraps the function with :func:`~capiscio_mcp.guard.guard` for access + control based on caller trust level, then registers it with FastMCP. + Args: - name: Tool name (default: function name) - description: Tool description - min_trust_level: Minimum trust level (overrides default) - config: Full guard configuration - - Returns: - Decorator function - - Example: + name: Tool name (defaults to function name). + description: Tool description (defaults to docstring). + min_trust_level: Minimum trust level (overrides server default). + config: Full :class:`~capiscio_mcp.guard.GuardConfig`. + + Example:: + @server.tool(min_trust_level=2) async def execute_query(sql: str) -> list[dict]: ... """ + def decorator( func: Callable[..., Coroutine[Any, Any, T]] ) -> Callable[..., Coroutine[Any, Any, T]]: tool_name = name or func.__name__ tool_description = description or func.__doc__ or f"Tool: {tool_name}" - - # Build effective config + effective_config = config or GuardConfig() if min_trust_level is not None: effective_config.min_trust_level = min_trust_level elif effective_config.min_trust_level == 0: effective_config.min_trust_level = self.default_min_trust_level - - # Apply guard decorator + guarded_func = guard(config=effective_config, tool_name=tool_name)(func) - - # Store for reference + self._tools[tool_name] = guarded_func self._tool_configs[tool_name] = effective_config - - # Register with FastMCP server using its @tool decorator - # FastMCP will handle the MCP protocol details + self._server.tool(name=tool_name, description=tool_description)(guarded_func) - - logger.debug(f"Registered tool '{tool_name}' with trust level {effective_config.min_trust_level}") - + + logger.debug( + "Registered tool '%s' with trust level %d", + tool_name, + effective_config.min_trust_level, + ) return guarded_func - + return decorator - - @property - def server(self) -> "FastMCP": - """Access the underlying FastMCP server.""" - return self._server - - @property - def identity_meta(self) -> Dict[str, str]: - """Get the identity metadata for initialize response.""" - return self._identity_meta.copy() - + + # ------------------------------------------------------------------ + # Run + # ------------------------------------------------------------------ + def run(self, transport: str = "stdio") -> None: - """ - Run the server with the specified transport. - + """Run the server with ``_meta`` identity injection enabled. + + Sets ``_capiscio_meta_ctx`` so that the patched ``ServerSession`` + injects the CapiscIO identity into every ``initialize`` response for + the lifetime of this call. + Args: - transport: Transport type - "stdio" (default) or "streamable-http" - - Example: - server.run() # stdio transport - server.run(transport="streamable-http") # HTTP transport + transport: ``"stdio"`` (default) or ``"streamable-http"``. """ - self._server.run(transport=transport) - + meta = self.create_initialize_response_meta() + token = _capiscio_meta_ctx.set(meta) + try: + self._server.run(transport=transport) + finally: + _capiscio_meta_ctx.reset(token) + def run_stdio(self) -> None: - """Run the server over stdio transport. - - Deprecated: Use run() instead. - """ - # For backwards compatibility, delegate to run() - self._server.run(transport="stdio") - + """Run over stdio transport (deprecated — use :meth:`run` instead).""" + self.run(transport="stdio") + def run_sse(self, port: int = 8080) -> None: - """Run the server over SSE transport. - - Deprecated: Use run(transport="sse") instead. SSE is deprecated in favor of streamable-http. - """ - logger.warning("SSE transport is deprecated, use streamable-http instead") - self._server.run(transport="sse") + """Run over SSE transport (deprecated — use ``run(transport='streamable-http')``).""" + logger.warning("SSE transport is deprecated; use streamable-http instead") + self.run(transport="sse") + + +# --------------------------------------------------------------------------- +# CapiscioMCPClient +# --------------------------------------------------------------------------- class CapiscioMCPClient: - """ - MCP Client with automatic server identity and PoP verification. - - This class wraps MCP client functionality to: - 1. Generate PoP request (nonce) for initialize request - 2. Verify server identity and PoP response on connection - 3. Enforce trust level requirements - 4. Include caller credentials in tool requests - - Attributes: - server_url: URL of the MCP server - min_trust_level: Minimum required trust level - fail_on_unverified: If True, raise on unverified servers - require_pop: If True, require PoP verification (did:key servers) - pop_verified: Whether PoP verification succeeded - - Example: + """MCP Client with automatic server identity and PoP verification. + + On connection this client: + + 1. Calls ``session.initialize()`` to get the MCP ``InitializeResult``. + 2. Extracts ``capiscio_server_did`` and ``capiscio_server_badge`` from + ``result.meta`` (the ``_meta`` dict in the JSON response). + 3. Calls :func:`~capiscio_mcp.server.verify_server` with those values. + 4. Enforces ``min_trust_level`` and ``fail_on_unverified`` constraints. + + Example:: + async with CapiscioMCPClient( - server_url="https://mcp.example.com", - min_trust_level=2, - require_pop=True, - badge="eyJhbGc...", # Your client badge + command="python server/main.py", + badge=agent.badge, + min_trust_level=1, + fail_on_unverified=True, ) as client: - # Server identity and PoP already verified - print(f"Trusted at level {client.server_trust_level}") - print(f"PoP verified: {client.pop_verified}") - - result = await client.call_tool("read_file", {"path": "/data/file.txt"}) - - For stdio transport (subprocess server): + print(f"Server verified at level {client.server_trust_level}") + files = await client.call_tool("list_files", {"directory": "/tmp"}) + + For stdio transport (subprocess server):: + async with CapiscioMCPClient( command="python", args=["my_mcp_server.py"], @@ -429,7 +608,7 @@ class CapiscioMCPClient: ) as client: result = await client.call_tool("my_tool", {"arg": "value"}) """ - + def __init__( self, server_url: Optional[str] = None, @@ -441,33 +620,37 @@ def __init__( verify_config: Optional[VerifyConfig] = None, badge: Optional[str] = None, api_key: Optional[str] = None, - ): - """ - Initialize CapiscIO MCP Client. - + env: Optional[Dict[str, str]] = None, + ) -> None: + """Initialize ``CapiscioMCPClient``. + Args: - server_url: URL of the MCP server (for HTTP transport) - command: Command to run server (for stdio transport) - args: Arguments for command (for stdio transport) - min_trust_level: Minimum required server trust level - fail_on_unverified: If True, raise when server doesn't disclose identity - require_pop: If True, require PoP verification for did:key servers - verify_config: Full verification configuration - badge: Client badge for authentication (recommended) - api_key: Client API key for authentication (alternative) - + server_url: MCP server URL (HTTP transport — not yet implemented). + command: Command to launch server process (stdio transport). + args: Arguments for the server command. + min_trust_level: Minimum required server trust level. + fail_on_unverified: If ``True``, raise when server doesn't disclose identity. + require_pop: If ``True``, require PoP verification for ``did:key`` servers. + verify_config: Full :class:`~capiscio_mcp.server.VerifyConfig`. + badge: Client badge JWS for authentication. + api_key: Client API key for authentication (alternative to badge). + env: Additional environment variables to pass to the server subprocess + (stdio transport only). All ``CAPISCIO_*`` variables from the + current process are forwarded automatically; use this to override + or extend them. + Raises: - ValueError: If neither server_url nor command is provided + ValueError: If neither ``server_url`` nor ``command`` is provided. + ImportError: If the ``mcp`` package is not installed. """ _require_mcp_client() - - # Ensure at least one transport method is configured + if server_url is None and command is None: raise ValueError( "Either server_url or command must be provided to CapiscioMCPClient " "to select an HTTP or stdio transport." ) - + self.server_url = server_url self.command = command self.args = args or [] @@ -475,97 +658,86 @@ def __init__( self.fail_on_unverified = fail_on_unverified self.require_pop = require_pop self.verify_config = verify_config or VerifyConfig(min_trust_level=min_trust_level) - - # Client credentials + self._extra_env = env or {} + self._credential = CallerCredential( badge_jws=badge, api_key=api_key, ) - + self._session: Optional[McpClientSession] = None self._context_manager: Optional[Any] = None self._verify_result: Optional[VerifyResult] = None - + # PoP state self._pop_request: Optional[PoPRequest] = None self._pop_response: Optional[PoPResponse] = None self._pop_verified: bool = False - + + # ------------------------------------------------------------------ + # Initialize request _meta (PoP nonce) + # ------------------------------------------------------------------ + def create_initialize_request_meta(self) -> Dict[str, Any]: - """ - Create the _meta object for initialize request. - - This should be called when building the initialize request. - It generates a PoP nonce to be signed by the server. - + """Create the ``_meta`` dict for the ``initialize`` request (PoP nonce). + Returns: - Dict to include as _meta in initialize request - - Example: - # In your client code - meta = client.create_initialize_request_meta() - result = await session.initialize( - client_info=ClientInfo(...), - _meta=meta, - ) + Dict to include as ``_meta`` in the ``initialize`` request. """ self._pop_request = generate_pop_request() return self._pop_request.to_meta() - + + # ------------------------------------------------------------------ + # Server verification + # ------------------------------------------------------------------ + def verify_initialize_response( self, response_meta: Optional[Dict[str, Any]], server_public_key: Optional["Ed25519PublicKey"] = None, ) -> bool: - """ - Verify the initialize response including PoP. - - This should be called after receiving the initialize response. - It extracts the PoP signature and verifies it. - + """Verify the ``initialize`` response including optional PoP. + Args: - response_meta: The _meta from initialize response - server_public_key: Server's public key for PoP verification - (if None, will try to extract from did:key) - + response_meta: The ``_meta`` dict from the ``InitializeResult``. + server_public_key: Server public key for PoP (auto-extracted from + ``did:key`` if not provided). + Returns: - True if PoP verification succeeded, False otherwise - + ``True`` if PoP verification succeeded, ``False`` otherwise. + Raises: - PoPSignatureError: If PoP verification fails and require_pop=True + :class:`~capiscio_mcp.pop.PoPSignatureError`: If PoP is required and fails. """ if response_meta is None: logger.debug("No _meta in initialize response") return False - - # Extract PoP response + self._pop_response = PoPResponse.from_meta(response_meta) if self._pop_response is None: logger.debug("No PoP response in initialize response") return False - + if self._pop_request is None: logger.warning("PoP response received but no request was sent") return False - - # Get public key for verification + if server_public_key is None: - # Try to extract from server DID server_did = response_meta.get("capiscio_server_did") if server_did and server_did.startswith("did:key:"): try: server_public_key = extract_public_key_from_did_key(server_did) - except Exception as e: - logger.warning(f"Failed to extract public key from DID: {e}") + except Exception as exc: + logger.warning("Failed to extract public key from DID: %s", exc) if self.require_pop: - raise PoPSignatureError(f"Cannot extract public key from {server_did}") + raise PoPSignatureError( + f"Cannot extract public key from {server_did}" + ) return False else: - # For did:web, we'd need to fetch DID document - logger.debug(f"Cannot verify PoP for non-did:key: {server_did}") + logger.debug("Cannot verify PoP for non-did:key DID: %s", server_did) return False - - # Verify PoP + try: verify_pop_response( request=self._pop_request, @@ -575,51 +747,128 @@ def verify_initialize_response( self._pop_verified = True logger.info("PoP verification succeeded") return True - except PoPError as e: - logger.warning(f"PoP verification failed: {e}") + except PoPError as exc: + logger.warning("PoP verification failed: %s", exc) if self.require_pop: raise return False - + + async def _verify_server_from_meta(self, meta: Optional[Dict[str, Any]]) -> None: + """Extract server identity from ``_meta`` and call :func:`verify_server`. + + Sets ``self._verify_result`` and enforces ``min_trust_level`` / + ``fail_on_unverified`` constraints. + + Args: + meta: The ``_meta`` dict from ``InitializeResult.meta``. + + Raises: + :class:`~capiscio_mcp.errors.ServerVerifyError`: If constraints are violated. + """ + if not meta or not isinstance(meta, dict): + if self.fail_on_unverified: + raise ServerVerifyError( + error_code=ServerErrorCode.DID_INVALID, + detail=( + "Server did not disclose identity (_meta missing)" + + (f" but min_trust_level={self.min_trust_level} is required" + if self.min_trust_level > 0 else "") + ), + ) + logger.debug("Server did not disclose identity (_meta absent or non-dict)") + return + + server_did = meta.get("capiscio_server_did") + server_badge = meta.get("capiscio_server_badge") + + if not server_did: + if self.fail_on_unverified: + raise ServerVerifyError( + error_code=ServerErrorCode.DID_INVALID, + detail="Server _meta does not contain capiscio_server_did", + ) + logger.debug("Server _meta has no capiscio_server_did") + return + + logger.info("Verifying server identity: DID=%s", server_did) + self._verify_result = await verify_server( + server_did=server_did, + server_badge=server_badge, + config=self.verify_config, + ) + + state = self._verify_result.state + trust_level = self._verify_result.trust_level or 0 + + logger.info( + "Server verification result: state=%s trust_level=%d", + state, + trust_level, + ) + + if self.fail_on_unverified and state == ServerState.UNVERIFIED_ORIGIN: + raise ServerVerifyError( + error_code=ServerErrorCode.DID_INVALID, + detail=f"Server identity could not be verified (state={state.value})", + state=state, + server_did=server_did, + ) + + if trust_level < self.min_trust_level: + raise ServerVerifyError( + error_code=ServerErrorCode.TRUST_INSUFFICIENT, + detail=( + f"Server trust level {trust_level} is below required " + f"min_trust_level={self.min_trust_level}" + ), + state=state, + server_did=server_did, + ) + + # ------------------------------------------------------------------ + # Context manager / connect / close + # ------------------------------------------------------------------ + @property def pop_verified(self) -> bool: """Whether PoP verification succeeded.""" return self._pop_verified - + async def __aenter__(self) -> "CapiscioMCPClient": - """ - Async context manager entry. - - Connects to server and verifies identity. - """ await self.connect() return self - + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - """Async context manager exit.""" await self.close() - + async def connect(self) -> None: - """ - Connect to MCP server. - + """Connect to the MCP server and verify its identity. + For stdio transport, spawns the server process. - For HTTP transport, connects to the server URL (not yet implemented). - - Note: - Server identity verification (min_trust_level, fail_on_unverified) is - not yet functional due to MCP SDK limitations. The SDK does not currently - expose _meta from the initialize response, which is needed to extract - server DID and badge. Server-side trust enforcement works fully. - + Extracts ``_meta`` from the ``InitializeResult`` and calls + :func:`~capiscio_mcp.server.verify_server` to enforce + ``min_trust_level`` and ``fail_on_unverified`` constraints. + Raises: - NotImplementedError: If HTTP transport is requested (not yet supported) + :class:`~capiscio_mcp.errors.ServerVerifyError`: If server verification fails. + :exc:`NotImplementedError`: If HTTP transport is requested (not yet supported). """ if self.command: - # Stdio transport - spawn server process + # Stdio transport — spawn the server subprocess. + # MCP's stdio_client only passes a small whitelist of env vars + # (HOME, PATH, etc.) to the subprocess by default. Forward all + # CAPISCIO_* variables so the server can authenticate with the + # registry, then apply any caller-specified overrides. + capiscio_env = { + k: v + for k, v in os.environ.items() + if k.startswith("CAPISCIO_") or k == "MCP_SERVER_COMMAND" + } + subprocess_env = {**capiscio_env, **self._extra_env} server_params = StdioServerParameters( command=self.command, args=self.args, + env=subprocess_env if subprocess_env else None, ) self._context_manager = stdio_client(server_params) try: @@ -627,14 +876,20 @@ async def connect(self) -> None: self._session = McpClientSession(read_stream, write_stream) try: await self._session.__aenter__() - # Initialize the session - await self._session.initialize() + # Initialize the session and capture the result + result = await self._session.initialize() + # Extract _meta — InitializeResult.meta is the _meta dict + response_meta: Optional[Dict[str, Any]] = getattr(result, "meta", None) + await self._verify_server_from_meta(response_meta) except Exception: - # Clean up session on failure + if self._session: + try: + await self._session.__aexit__(None, None, None) + except Exception: + pass self._session = None raise except Exception: - # Clean up context manager on failure if self._context_manager: try: await self._context_manager.__aexit__(None, None, None) @@ -643,99 +898,102 @@ async def connect(self) -> None: self._context_manager = None raise else: - # HTTP transport would go here - logger.warning("HTTP transport not yet implemented, use stdio with command/args") + logger.warning("HTTP transport not yet implemented; use stdio with command/args") raise NotImplementedError("HTTP transport not yet implemented") - - # Note: Server identity verification is not yet functional. - # MCP SDK currently doesn't expose _meta from initialize response, - # so we cannot extract server_did and server_badge for verification. - # The min_trust_level and fail_on_unverified parameters are stored - # for future use when MCP SDK adds _meta passthrough support. - - logger.info(f"Connected to MCP server: {self.command or self.server_url}") - + + logger.info("Connected to MCP server: %s", self.command or self.server_url) + async def close(self) -> None: - """Close connection to MCP server.""" + """Close the connection to the MCP server.""" if self._session: await self._session.__aexit__(None, None, None) self._session = None if self._context_manager: await self._context_manager.__aexit__(None, None, None) self._context_manager = None - + + # ------------------------------------------------------------------ + # Properties + # ------------------------------------------------------------------ + @property def server_state(self) -> Optional[ServerState]: """Server verification state after connection.""" return self._verify_result.state if self._verify_result else None - + @property def server_trust_level(self) -> Optional[int]: """Server trust level if verified.""" return self._verify_result.trust_level if self._verify_result else None - + @property def server_did(self) -> Optional[str]: """Server DID if disclosed.""" return self._verify_result.server_did if self._verify_result else None - + @property def is_verified(self) -> bool: - """Check if server identity is cryptographically verified.""" + """Whether the server identity is cryptographically verified.""" return ( self._verify_result is not None and self._verify_result.state == ServerState.VERIFIED_PRINCIPAL ) - + + # ------------------------------------------------------------------ + # Tool calls + # ------------------------------------------------------------------ + async def call_tool( self, name: str, arguments: Optional[Dict[str, Any]] = None, ) -> Any: - """ - Call a tool on the connected server. - - Automatically includes client credentials in the request. - + """Call a tool on the connected server, forwarding client credentials. + + For stdio transport the caller's badge (and/or API key) is forwarded in + the JSON-RPC ``_meta`` of the ``tools/call`` request under the keys + ``capiscio_caller_badge`` / ``capiscio_caller_api_key``. The server-side + :func:`_install_credential_extraction` wrapper picks these up and sets + the ``_current_credential`` contextvar before the guarded tool runs. + Args: - name: Tool name - arguments: Tool arguments - + name: Tool name. + arguments: Tool arguments dict. + Returns: - Tool result from the server - + Tool result from the server. + Raises: - RuntimeError: If not connected + :exc:`RuntimeError`: If not connected. """ if self._session is None: raise RuntimeError("Client not connected. Use 'async with' context.") - - # Set credential context for the call - token = set_credential(self._credential) - try: - # Call tool via MCP client session - result = await self._session.call_tool(name, arguments or {}) - return result - finally: - # Reset credential context to avoid leakage between calls/tasks - from capiscio_mcp.guard import _current_credential - _current_credential.reset(token) - + + # Build _meta carrying caller credentials for stdio transport. + meta: Optional[Dict[str, Any]] = None + if self._credential.badge_jws or self._credential.api_key: + meta = {} + if self._credential.badge_jws: + meta["capiscio_caller_badge"] = self._credential.badge_jws + if self._credential.api_key: + meta["capiscio_caller_api_key"] = self._credential.api_key + + return await self._session.call_tool(name, arguments or {}, meta=meta) + async def list_tools(self) -> List[Dict[str, Any]]: - """ - List available tools on the server. - + """List tools available on the connected server. + Returns: - List of tool definitions + List of ``{"name": ..., "description": ...}`` dicts. + + Raises: + :exc:`RuntimeError`: If not connected. """ if self._session is None: raise RuntimeError("Client not connected. Use 'async with' context.") - + result = await self._session.list_tools() return [ - { - "name": tool.name, - "description": tool.description, - } + {"name": tool.name, "description": tool.description} for tool in result.tools ] diff --git a/capiscio_mcp/keeper.py b/capiscio_mcp/keeper.py new file mode 100644 index 0000000..2604d0b --- /dev/null +++ b/capiscio_mcp/keeper.py @@ -0,0 +1,271 @@ +"""ServerBadgeKeeper — Auto-renewal for MCP server badges. + +Mirrors BadgeKeeper from capiscio-sdk-python but uses the MCP server +badge API (POST /v1/sdk/servers/{id}/badge). + +Example: + keeper = ServerBadgeKeeper( + server_id="550e8400-...", + api_key="sk_live_...", + initial_badge="eyJhbGc...", + renewal_threshold=30, + ) + keeper.start() + badge = keeper.get_current_badge() + keeper.stop() +""" + +from __future__ import annotations + +import base64 +import json +import logging +import threading +import time +from typing import Callable, Optional + +import requests + +logger = logging.getLogger(__name__) + +DEFAULT_RENEWAL_THRESHOLD = 30 # seconds before expiry to renew +DEFAULT_CHECK_INTERVAL = 10 # seconds between checks + + +def _decode_jwt_exp(token: str) -> Optional[int]: + """Decode the exp claim from a JWT/JWS compact format without signature verification. + + Args: + token: JWS/JWT compact serialization (header.payload.signature) + + Returns: + Unix timestamp of expiry, or None if not decodable. + """ + try: + parts = token.split(".") + if len(parts) < 2: + return None + payload_b64 = parts[1] + # Pad to a multiple of 4 for base64url decoding + remainder = len(payload_b64) % 4 + if remainder: + payload_b64 += "=" * (4 - remainder) + payload_bytes = base64.urlsafe_b64decode(payload_b64) + payload = json.loads(payload_bytes) + exp = payload.get("exp") + return int(exp) if exp is not None else None + except Exception: + return None + + +class ServerBadgeKeeper: + """Background badge renewal manager for MCP servers. + + Watches the badge ``exp`` claim and automatically renews the badge + before it expires, ensuring the server always has a fresh badge for + identity disclosure in the ``_meta`` of MCP initialize responses. + + Uses ``POST /v1/sdk/servers/{server_id}/badge`` for renewal — the same + endpoint used by :func:`~capiscio_mcp.connect.MCPServerIdentity.connect`. + + Example:: + + keeper = ServerBadgeKeeper( + server_id="550e8400-...", + api_key="sk_live_...", + initial_badge="eyJhbGc...", + ) + with keeper: + badge = keeper.get_current_badge() + """ + + def __init__( + self, + server_id: str, + api_key: str, + initial_badge: str, + ca_url: str = "https://registry.capisc.io", + renewal_threshold: int = DEFAULT_RENEWAL_THRESHOLD, + check_interval: int = DEFAULT_CHECK_INTERVAL, + on_renew: Optional[Callable[[str], None]] = None, + ) -> None: + """Initialize ServerBadgeKeeper. + + Args: + server_id: MCP server UUID (from the CapiscIO dashboard). + api_key: Registry API key (``X-Capiscio-Registry-Key``). + initial_badge: The badge JWS returned by :func:`connect`. + ca_url: Registry base URL. + renewal_threshold: Renew when ``exp - now <= renewal_threshold`` seconds. + check_interval: How often (seconds) the background thread wakes to check. + on_renew: Optional callback called with the new badge string after each renewal. + """ + self.server_id = server_id + self.api_key = api_key + self.ca_url = ca_url.rstrip("/") + self.renewal_threshold = renewal_threshold + self.check_interval = check_interval + self.on_renew = on_renew + + self._current_badge: Optional[str] = initial_badge + self._badge_lock = threading.Lock() + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._running = False + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + def start(self) -> None: + """Start the background renewal thread. + + Raises: + RuntimeError: If the keeper is already running. + """ + if self._running: + raise RuntimeError("ServerBadgeKeeper is already running") + + logger.info( + "Starting ServerBadgeKeeper for server %s (threshold=%ds)", + self.server_id, + self.renewal_threshold, + ) + self._stop_event.clear() + self._running = True + self._thread = threading.Thread( + target=self._run, + name=f"capiscio-badge-keeper-{self.server_id[:8]}", + daemon=True, + ) + self._thread.start() + + def stop(self) -> None: + """Request the background renewal thread to stop and wait up to 5s for it.""" + if not self._running: + return + logger.info("Stopping ServerBadgeKeeper for server %s...", self.server_id) + self._stop_event.set() + if self._thread: + self._thread.join(timeout=5) + if self._thread.is_alive(): + logger.warning( + "ServerBadgeKeeper thread for server %s did not exit within " + "5s; background renewal may still be running", + self.server_id, + ) + else: + logger.info("ServerBadgeKeeper stopped") + self._thread = None + self._running = False + + def get_current_badge(self) -> Optional[str]: + """Return the current (most recently renewed) badge JWS.""" + with self._badge_lock: + return self._current_badge + + def is_running(self) -> bool: + """Whether the background renewal thread is running.""" + return self._running + + # ------------------------------------------------------------------ + # Background thread + # ------------------------------------------------------------------ + + def _run(self) -> None: + """Background thread: periodically check expiry and renew if needed.""" + while not self._stop_event.is_set(): + try: + self._maybe_renew() + except Exception as exc: + logger.warning("ServerBadgeKeeper check failed: %s", exc) + self._stop_event.wait(timeout=self.check_interval) + + logger.debug("ServerBadgeKeeper thread exiting for server %s", self.server_id) + self._running = False + + def _maybe_renew(self) -> None: + """Renew the badge if it is close to expiry.""" + with self._badge_lock: + badge = self._current_badge + + if badge is None: + return + + exp = _decode_jwt_exp(badge) + if exp is None: + logger.debug("Could not decode exp from badge; skipping renewal check") + return + + now = int(time.time()) + seconds_until_expiry = exp - now + logger.debug( + "Badge for server %s expires in %ds (threshold=%ds)", + self.server_id, + seconds_until_expiry, + self.renewal_threshold, + ) + + if seconds_until_expiry <= self.renewal_threshold: + logger.info( + "Badge for server %s expires in %ds — renewing", + self.server_id, + seconds_until_expiry, + ) + self._renew() + + def _renew(self) -> None: + """Call ``POST /v1/sdk/servers/{server_id}/badge`` to get a fresh badge.""" + url = f"{self.ca_url}/v1/sdk/servers/{self.server_id}/badge" + headers = { + "X-Capiscio-Registry-Key": self.api_key, + "Content-Type": "application/json", + } + try: + resp = requests.post(url, headers=headers, json={}, timeout=30) + if resp.status_code in (200, 201): + try: + data = resp.json() + except ValueError as exc: + logger.warning("Badge renewal response was not valid JSON: %s", exc) + return + # Try multiple common response shapes + new_badge = ( + (data.get("data") or {}).get("badge") + or data.get("badge") + or data.get("token") + ) + if new_badge: + with self._badge_lock: + self._current_badge = new_badge + logger.info("Badge renewed for server %s", self.server_id) + if self.on_renew: + try: + self.on_renew(new_badge) + except Exception as exc: + logger.warning("on_renew callback failed: %s", exc) + else: + logger.warning( + "Badge renewal response had no badge field: %s", data + ) + else: + logger.warning( + "Badge renewal failed for server %s: %d — %s", + self.server_id, + resp.status_code, + resp.text, + ) + except requests.RequestException as exc: + logger.warning("Badge renewal request failed: %s", exc) + + # ------------------------------------------------------------------ + # Context manager + # ------------------------------------------------------------------ + + def __enter__(self) -> "ServerBadgeKeeper": + self.start() + return self + + def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> bool: + self.stop() + return False diff --git a/docs/guides/deployment.md b/docs/guides/deployment.md new file mode 100644 index 0000000..0e0b51a --- /dev/null +++ b/docs/guides/deployment.md @@ -0,0 +1,255 @@ +# Deploying MCP Servers + +This guide covers deploying MCP servers with CapiscIO identity to production environments — especially ephemeral ones like Docker, AWS Lambda, and Cloud Run where local storage doesn't persist between restarts. + +--- + +## Identity Persistence Challenge + +When you call `MCPServerIdentity.connect()`, the SDK: + +1. Generates an Ed25519 keypair +2. Registers the public key with the CapiscIO registry +3. Receives a `did:web` DID from the registry (e.g. `did:web:registry.capisc.io:servers:...`) +4. Stores keys under `~/.capiscio/servers/{server_id}/` + +> **Note:** A `did:key` is only used in local dev mode when no registry is involved. In production with an API key, the registry assigns a `did:web` identity. + +In persistent environments (VMs, bare metal) this works automatically — the keys survive restarts. In **ephemeral environments** (containers, serverless, CI runners) the filesystem is recreated on every deploy, which would silently generate a new identity each time. + +--- + +## Key Injection via Environment Variable + +Set `CAPISCIO_SERVER_PRIVATE_KEY_PEM` to inject the server's Ed25519 private key from your secrets manager. The SDK will recover the same DID on every start without generating a new keypair. + +### First-Run Capture + +On the very first run (when no key exists anywhere), the SDK generates a keypair and logs a capture hint to stderr: + +``` +╔══════════════════════════════════════════════════════════════════╗ +║ New server identity generated — save key for persistence ║ +╚══════════════════════════════════════════════════════════════════╝ + + Add to your secrets manager / .env: + + CAPISCIO_SERVER_PRIVATE_KEY_PEM='-----BEGIN PRIVATE KEY-----\nMC4CAQ...xYz\n-----END PRIVATE KEY-----\n' + + The DID will be recovered automatically from the key on startup. +``` + +Copy the entire PEM string (including `\n` line breaks) and store it in your secrets manager. + +### Key Resolution Priority + +The SDK resolves the server identity in this order: + +| Priority | Source | When Used | +|----------|--------|-----------| +| 1 | `CAPISCIO_SERVER_PRIVATE_KEY_PEM` env var | Containers, serverless, CI | +| 2 | Local key file (`~/.capiscio/servers/{id}/private_key.pem`) | Persistent environments | +| 3 | Generate new keypair | First run only | + +!!! warning "DID Changes on New Key Generation" + If neither the env var nor local files are available, the SDK generates a **new** keypair + with a **different** DID. Any badges issued to the old DID will no longer be valid. + Always persist the key in ephemeral environments. + +--- + +## Environment Variables Reference + +All variables used by `MCPServerIdentity.connect()` and `MCPServerIdentity.from_env()`: + +| Variable | Required | Description | +|----------|----------|-------------| +| `CAPISCIO_SERVER_ID` | Yes | Server UUID from the dashboard | +| `CAPISCIO_API_KEY` | Yes | Registry API key | +| `CAPISCIO_SERVER_URL` | No | Registry URL (default: `https://registry.capisc.io`) | +| `CAPISCIO_SERVER_DOMAIN` | No | Domain for badge issuance (default: derived from server URL) | +| `CAPISCIO_SERVER_PRIVATE_KEY_PEM` | No | PEM-encoded Ed25519 private key for identity persistence | + +--- + +## Docker + +### docker-compose.yml + +```yaml +services: + mcp-server: + build: . + environment: + CAPISCIO_SERVER_ID: "550e8400-e29b-41d4-a716-446655440000" + CAPISCIO_API_KEY: "${CAPISCIO_API_KEY}" + CAPISCIO_SERVER_PRIVATE_KEY_PEM: "${MCP_SERVER_KEY}" +``` + +### Dockerfile + +```dockerfile +FROM python:3.12-slim + +WORKDIR /app +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY . . + +# Identity recovered from CAPISCIO_SERVER_PRIVATE_KEY_PEM at runtime +CMD ["python", "server.py"] +``` + +### Server Code + +```python +from capiscio_mcp import MCPServerIdentity +from capiscio_mcp.integrations.mcp import CapiscioMCPServer + +async def main(): + # Reads CAPISCIO_SERVER_ID, CAPISCIO_API_KEY, and + # CAPISCIO_SERVER_PRIVATE_KEY_PEM from environment + identity = await MCPServerIdentity.from_env() + + server = CapiscioMCPServer( + name="my-server", + did=identity.did, + badge=identity.badge, + ) + + @server.tool(min_trust_level=2) + async def my_tool(param: str) -> str: + return f"Result: {param}" + + server.run() +``` + +--- + +## AWS Lambda + +```python +import json +from capiscio_mcp import MCPServerIdentity + +async def handler(event, context): + # Key injected via Lambda environment variables or Secrets Manager + identity = await MCPServerIdentity.from_env() + + return { + "statusCode": 200, + "body": json.dumps({ + "server_did": identity.did, + "badge_valid": identity.badge is not None, + }), + } +``` + +Configure the Lambda environment: + +```bash +aws lambda update-function-configuration \ + --function-name my-mcp-server \ + --environment "Variables={ + CAPISCIO_SERVER_ID=550e8400-..., + CAPISCIO_API_KEY=sk_live_..., + CAPISCIO_SERVER_PRIVATE_KEY_PEM=$(cat private_key.pem | tr '\n' '\\n') + }" +``` + +!!! tip "Use AWS Secrets Manager" + For production, store the PEM in AWS Secrets Manager and reference it + via a Lambda layer or the Secrets Manager SDK rather than inline env vars. + +--- + +## Google Cloud Run + +```yaml +# cloud-run-service.yaml +apiVersion: serving.knative.dev/v1 +kind: Service +metadata: + name: mcp-server +spec: + template: + spec: + containers: + - image: gcr.io/my-project/mcp-server + env: + - name: CAPISCIO_SERVER_ID + value: "550e8400-..." + - name: CAPISCIO_API_KEY + valueFrom: + secretKeyRef: + name: capiscio-api-key + key: latest + - name: CAPISCIO_SERVER_PRIVATE_KEY_PEM + valueFrom: + secretKeyRef: + name: mcp-server-key + key: latest +``` + +--- + +## Kubernetes + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mcp-server +spec: + template: + spec: + containers: + - name: mcp-server + image: my-registry/mcp-server:latest + env: + - name: CAPISCIO_SERVER_ID + value: "550e8400-..." + - name: CAPISCIO_API_KEY + valueFrom: + secretKeyRef: + name: capiscio-secrets + key: api-key + - name: CAPISCIO_SERVER_PRIVATE_KEY_PEM + valueFrom: + secretKeyRef: + name: capiscio-secrets + key: server-private-key +``` + +Create the secret: + +```bash +kubectl create secret generic capiscio-secrets \ + --from-literal=api-key="sk_live_..." \ + --from-file=server-private-key=private_key.pem +``` + +--- + +## Key Rotation + +To rotate the server identity: + +1. Delete (or unset) `CAPISCIO_SERVER_PRIVATE_KEY_PEM` +2. Remove local key files if present (`~/.capiscio/servers/{id}/`) +3. Restart the server — a new keypair and DID will be generated +4. Capture the new key from the log hint +5. Store the new key in your secrets manager + +!!! danger "Badge Invalidation" + Rotating keys changes the server's DID. Any existing badges issued to the + old DID are no longer valid. The server will automatically request a new badge + on the next `connect()` call. + +--- + +## Next Steps + +- [Server Registration](server-registration.md) — Manual step-by-step registration +- [Server-Side (Guarding Tools)](server-side.md) — Protect tools with trust levels +- [MCP SDK Integration](mcp-integration.md) — CapiscioMCPServer wrapper diff --git a/docs/guides/server-registration.md b/docs/guides/server-registration.md index 117ba76..1dc7fa8 100644 --- a/docs/guides/server-registration.md +++ b/docs/guides/server-registration.md @@ -191,10 +191,20 @@ except RegistrationError as e: | Variable | Description | |----------|-------------| +| `CAPISCIO_SERVER_ID` | Server UUID from dashboard | +| `CAPISCIO_API_KEY` | Registry API key | +| `CAPISCIO_SERVER_URL` | Registry URL (default: production) | +| `CAPISCIO_SERVER_DOMAIN` | Domain for badge issuance | +| `CAPISCIO_SERVER_PRIVATE_KEY_PEM` | PEM-encoded Ed25519 private key (ephemeral environments) | | `CAPISCIO_CORE_ADDR` | External core address (default: embedded) | | `CAPISCIO_SERVER_DID` | Pre-configured server DID | | `CAPISCIO_SERVER_PRIVATE_KEY` | Path to private key PEM | +!!! tip "Deploying to containers or serverless?" + See the [Deployment Guide](deployment.md) for Docker, Lambda, Cloud Run, and + Kubernetes examples using `CAPISCIO_SERVER_PRIVATE_KEY_PEM` for ephemeral + identity persistence. + ## Security Best Practices 1. **Never commit private keys** @@ -223,6 +233,7 @@ except RegistrationError as e: ## Next Steps +- [Deployment Guide](deployment.md) - Docker, Lambda, Cloud Run, Kubernetes - [Protect MCP Tools](server-side.md) - Add trust-level requirements - [Client-Side Verification](client-side.md) - Verify servers before connecting - [Evidence Logging](evidence.md) - Audit trail for all tool calls diff --git a/mkdocs.yml b/mkdocs.yml index 6c9f8f2..b20dbad 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -66,6 +66,7 @@ nav: - Quickstart: getting-started/quickstart.md - Guides: - Server Registration: guides/server-registration.md + - Deployment: guides/deployment.md - Server-Side (Guarding Tools): guides/server-side.md - Client-Side (Verifying Servers): guides/client-side.md - Evidence Logging: guides/evidence.md diff --git a/pyproject.toml b/pyproject.toml index adc6892..785c854 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,8 @@ dependencies = [ "protobuf>=4.25.0", "requests>=2.31.0", "platformdirs>=4.0.0", + "cryptography>=42.0.0", + "base58>=2.1.0", ] [project.optional-dependencies] diff --git a/tests/test_connect.py b/tests/test_connect.py new file mode 100644 index 0000000..d940b01 --- /dev/null +++ b/tests/test_connect.py @@ -0,0 +1,423 @@ +"""Tests for capiscio_mcp.connect.MCPServerIdentity.""" + +import os +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from capiscio_mcp.connect import ( + MCPServerIdentity, + _issue_badge_sync, + _load_private_key_pem, + _log_key_capture_hint, + DEFAULT_REGISTRY, + DEFAULT_MCP_KEYS_DIR, + ENV_SERVER_PRIVATE_KEY, +) +from capiscio_mcp.keeper import ServerBadgeKeeper + + +SERVER_ID = "550e8400-e29b-41d4-a716-446655440000" +API_KEY = "sk_test_abc123" +FAKE_DID = "did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK" +FAKE_BADGE = "eyJhbGciOiJFZERTQSJ9.eyJleHAiOjk5OTk5OTk5OTl9.fakesig" +FAKE_PRIV_KEY_PEM = "-----BEGIN PRIVATE KEY-----\nfake\n-----END PRIVATE KEY-----\n" +FAKE_PUB_KEY_PEM = "-----BEGIN PUBLIC KEY-----\nfake\n-----END PUBLIC KEY-----\n" + + +def _make_real_ed25519_pem() -> tuple[str, str, str]: + """Generate a real Ed25519 keypair for tests that need valid crypto.""" + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + from cryptography.hazmat.primitives.serialization import ( + Encoding, NoEncryption, PrivateFormat, PublicFormat, + ) + + key = Ed25519PrivateKey.generate() + priv_pem = key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode() + pub_pem = key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo).decode() + _, _, _, did = _load_private_key_pem(priv_pem) + return priv_pem, pub_pem, did + + +# --------------------------------------------------------------------------- +# _issue_badge_sync +# --------------------------------------------------------------------------- + + +class TestIssueBadgeSync: + def test_returns_badge_from_data_key(self): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"data": {"badge": FAKE_BADGE}} + with patch("capiscio_mcp.connect.requests.post", return_value=mock_resp): + result = _issue_badge_sync(SERVER_ID, API_KEY, DEFAULT_REGISTRY) + assert result == FAKE_BADGE + + def test_returns_badge_from_root_key(self): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"badge": FAKE_BADGE} + with patch("capiscio_mcp.connect.requests.post", return_value=mock_resp): + result = _issue_badge_sync(SERVER_ID, API_KEY, DEFAULT_REGISTRY) + assert result == FAKE_BADGE + + def test_returns_badge_from_token_key(self): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"token": FAKE_BADGE} + with patch("capiscio_mcp.connect.requests.post", return_value=mock_resp): + result = _issue_badge_sync(SERVER_ID, API_KEY, DEFAULT_REGISTRY) + assert result == FAKE_BADGE + + def test_returns_none_on_http_error(self): + mock_resp = MagicMock() + mock_resp.status_code = 500 + mock_resp.text = "Server Error" + with patch("capiscio_mcp.connect.requests.post", return_value=mock_resp): + result = _issue_badge_sync(SERVER_ID, API_KEY, DEFAULT_REGISTRY) + assert result is None + + def test_returns_none_on_network_error(self): + import requests as req + with patch( + "capiscio_mcp.connect.requests.post", + side_effect=req.RequestException("timeout"), + ): + result = _issue_badge_sync(SERVER_ID, API_KEY, DEFAULT_REGISTRY) + assert result is None + + def test_returns_none_when_no_badge_field(self): + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"status": "ok"} + with patch("capiscio_mcp.connect.requests.post", return_value=mock_resp): + result = _issue_badge_sync(SERVER_ID, API_KEY, DEFAULT_REGISTRY) + assert result is None + + +# --------------------------------------------------------------------------- +# MCPServerIdentity dataclass +# --------------------------------------------------------------------------- + + +class TestMCPServerIdentityDataclass: + def _make_identity(self, **kwargs) -> MCPServerIdentity: + defaults = dict( + server_id=SERVER_ID, + did=FAKE_DID, + api_key=API_KEY, + server_url=DEFAULT_REGISTRY, + keys_dir=Path("/tmp/test-keys"), + badge=FAKE_BADGE, + private_key_pem=FAKE_PRIV_KEY_PEM, + ) + defaults.update(kwargs) + return MCPServerIdentity(**defaults) + + def test_get_badge_returns_badge_without_keeper(self): + identity = self._make_identity() + assert identity.get_badge() == FAKE_BADGE + + def test_get_badge_prefers_keeper(self): + keeper = MagicMock(spec=ServerBadgeKeeper) + keeper.get_current_badge.return_value = "keeper-badge" + identity = self._make_identity(_keeper=keeper) + assert identity.get_badge() == "keeper-badge" + + def test_get_badge_falls_back_to_badge_when_keeper_empty(self): + keeper = MagicMock(spec=ServerBadgeKeeper) + keeper.get_current_badge.return_value = None + identity = self._make_identity(_keeper=keeper) + assert identity.get_badge() == FAKE_BADGE + + def test_close_stops_keeper(self): + keeper = MagicMock(spec=ServerBadgeKeeper) + identity = self._make_identity(_keeper=keeper) + identity.close() + keeper.stop.assert_called_once() + assert identity._keeper is None + + def test_close_without_keeper_is_noop(self): + identity = self._make_identity(_keeper=None) + identity.close() # Should not raise + + def test_context_manager(self): + keeper = MagicMock(spec=ServerBadgeKeeper) + identity = self._make_identity(_keeper=keeper) + with identity: + pass + keeper.stop.assert_called_once() + + def test_context_manager_returns_self(self): + identity = self._make_identity() + with identity as ctx: + assert ctx is identity + + +# --------------------------------------------------------------------------- +# MCPServerIdentity.connect() — new keys path +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestMCPServerIdentityConnect: + @pytest.fixture + def tmp_keys_dir(self, tmp_path): + return tmp_path / "mcp-server-keys" + + async def test_connect_generates_keys_when_none_exist(self, tmp_keys_dir): + """connect() should generate keys when the keys_dir is empty.""" + fake_keys = { + "did_key": FAKE_DID, + "public_key_pem": FAKE_PUB_KEY_PEM, + "private_key_pem": FAKE_PRIV_KEY_PEM, + "key_id": "key-1", + } + + with ( + patch("capiscio_mcp.connect.generate_server_keypair", new_callable=AsyncMock, return_value=fake_keys), + patch("capiscio_mcp.connect.register_server_identity", new_callable=AsyncMock), + patch("capiscio_mcp.connect._issue_badge", new_callable=AsyncMock, return_value=FAKE_BADGE), + patch("capiscio_mcp.connect.ServerBadgeKeeper") as MockKeeper, + ): + mock_keeper = MagicMock(spec=ServerBadgeKeeper) + mock_keeper.get_current_badge.return_value = FAKE_BADGE + MockKeeper.return_value = mock_keeper + + identity = await MCPServerIdentity.connect( + server_id=SERVER_ID, + api_key=API_KEY, + server_url="http://localhost:8080", + keys_dir=tmp_keys_dir, + ) + + assert identity.did == FAKE_DID + assert identity.server_id == SERVER_ID + assert identity.api_key == API_KEY + assert identity.badge == FAKE_BADGE + mock_keeper.start.assert_called_once() + + async def test_connect_recovers_existing_keys(self, tmp_keys_dir): + """connect() should not regenerate keys when they already exist.""" + # Write keys to disk + tmp_keys_dir.mkdir(parents=True, exist_ok=True) + (tmp_keys_dir / "private_key.pem").write_text(FAKE_PRIV_KEY_PEM) + (tmp_keys_dir / "did.txt").write_text(FAKE_DID) + (tmp_keys_dir / "public_key.pem").write_text(FAKE_PUB_KEY_PEM) + + with ( + patch("capiscio_mcp.connect.generate_server_keypair", new_callable=AsyncMock) as mock_gen, + patch("capiscio_mcp.connect._load_private_key_pem", return_value=( + None, FAKE_PRIV_KEY_PEM, FAKE_PUB_KEY_PEM, FAKE_DID, + )), + patch("capiscio_mcp.connect.register_server_identity", new_callable=AsyncMock), + patch("capiscio_mcp.connect._issue_badge", new_callable=AsyncMock, return_value=FAKE_BADGE), + patch("capiscio_mcp.connect.ServerBadgeKeeper") as MockKeeper, + ): + mock_keeper = MagicMock(spec=ServerBadgeKeeper) + mock_keeper.get_current_badge.return_value = FAKE_BADGE + MockKeeper.return_value = mock_keeper + + identity = await MCPServerIdentity.connect( + server_id=SERVER_ID, + api_key=API_KEY, + server_url="http://localhost:8080", + keys_dir=tmp_keys_dir, + ) + + # Should NOT have regenerated keys + mock_gen.assert_not_called() + assert identity.did == FAKE_DID + + async def test_connect_no_badge_when_auto_badge_false(self, tmp_keys_dir): + """connect(auto_badge=False) should skip badge issuance.""" + fake_keys = { + "did_key": FAKE_DID, + "public_key_pem": FAKE_PUB_KEY_PEM, + "private_key_pem": FAKE_PRIV_KEY_PEM, + "key_id": "key-1", + } + + with ( + patch("capiscio_mcp.connect.generate_server_keypair", new_callable=AsyncMock, return_value=fake_keys), + patch("capiscio_mcp.connect.register_server_identity", new_callable=AsyncMock), + patch("capiscio_mcp.connect._issue_badge", new_callable=AsyncMock) as mock_issue, + ): + identity = await MCPServerIdentity.connect( + server_id=SERVER_ID, + api_key=API_KEY, + keys_dir=tmp_keys_dir, + auto_badge=False, + ) + + mock_issue.assert_not_called() + assert identity.badge is None + assert identity._keeper is None + + async def test_connect_handles_badge_failure_gracefully(self, tmp_keys_dir): + """connect() should complete even if badge issuance fails.""" + fake_keys = { + "did_key": FAKE_DID, + "public_key_pem": FAKE_PUB_KEY_PEM, + "private_key_pem": FAKE_PRIV_KEY_PEM, + "key_id": "key-1", + } + + with ( + patch("capiscio_mcp.connect.generate_server_keypair", new_callable=AsyncMock, return_value=fake_keys), + patch("capiscio_mcp.connect.register_server_identity", new_callable=AsyncMock), + patch("capiscio_mcp.connect._issue_badge", new_callable=AsyncMock, return_value=None), + ): + identity = await MCPServerIdentity.connect( + server_id=SERVER_ID, + api_key=API_KEY, + keys_dir=tmp_keys_dir, + ) + + assert identity.badge is None + assert identity._keeper is None + assert identity.did == FAKE_DID + + async def test_connect_uses_env_var_private_key(self, tmp_keys_dir): + """connect() should load identity from CAPISCIO_SERVER_PRIVATE_KEY_PEM.""" + real_priv, real_pub, real_did = _make_real_ed25519_pem() + + with ( + patch.dict(os.environ, {ENV_SERVER_PRIVATE_KEY: real_priv}), + patch("capiscio_mcp.connect.generate_server_keypair", new_callable=AsyncMock) as mock_gen, + patch("capiscio_mcp.connect.register_server_identity", new_callable=AsyncMock), + patch("capiscio_mcp.connect._issue_badge", new_callable=AsyncMock, return_value=None), + ): + identity = await MCPServerIdentity.connect( + server_id=SERVER_ID, + api_key=API_KEY, + keys_dir=tmp_keys_dir, + ) + + # Should NOT have generated a new keypair + mock_gen.assert_not_called() + assert identity.did == real_did + # Should have persisted key to disk + assert (tmp_keys_dir / "private_key.pem").exists() + assert (tmp_keys_dir / "public_key.pem").exists() + assert (tmp_keys_dir / "did.txt").read_text() == real_did + + async def test_connect_env_var_takes_precedence_over_local_file(self, tmp_keys_dir): + """Env var key should override a different key on disk.""" + real_priv, real_pub, real_did = _make_real_ed25519_pem() + + # Write a different (fake) key to disk + tmp_keys_dir.mkdir(parents=True, exist_ok=True) + (tmp_keys_dir / "private_key.pem").write_text(FAKE_PRIV_KEY_PEM) + (tmp_keys_dir / "did.txt").write_text(FAKE_DID) + + with ( + patch.dict(os.environ, {ENV_SERVER_PRIVATE_KEY: real_priv}), + patch("capiscio_mcp.connect.generate_server_keypair", new_callable=AsyncMock) as mock_gen, + patch("capiscio_mcp.connect.register_server_identity", new_callable=AsyncMock), + patch("capiscio_mcp.connect._issue_badge", new_callable=AsyncMock, return_value=None), + ): + identity = await MCPServerIdentity.connect( + server_id=SERVER_ID, + api_key=API_KEY, + keys_dir=tmp_keys_dir, + ) + + mock_gen.assert_not_called() + assert identity.did == real_did # env var DID, not FAKE_DID + + async def test_connect_logs_capture_hint_on_new_generation(self, tmp_keys_dir): + """connect() should log a capture hint when generating a new identity.""" + fake_keys = { + "did_key": FAKE_DID, + "public_key_pem": FAKE_PUB_KEY_PEM, + "private_key_pem": FAKE_PRIV_KEY_PEM, + "key_id": "key-1", + } + + with ( + patch("capiscio_mcp.connect.generate_server_keypair", new_callable=AsyncMock, return_value=fake_keys), + patch("capiscio_mcp.connect.register_server_identity", new_callable=AsyncMock), + patch("capiscio_mcp.connect._issue_badge", new_callable=AsyncMock, return_value=None), + patch("capiscio_mcp.connect._log_key_capture_hint") as mock_hint, + ): + await MCPServerIdentity.connect( + server_id=SERVER_ID, + api_key=API_KEY, + keys_dir=tmp_keys_dir, + ) + + mock_hint.assert_called_once_with(SERVER_ID, FAKE_PRIV_KEY_PEM) + + async def test_connect_no_capture_hint_on_recovery(self, tmp_keys_dir): + """connect() should NOT log a capture hint when recovering from local keys.""" + tmp_keys_dir.mkdir(parents=True, exist_ok=True) + (tmp_keys_dir / "private_key.pem").write_text(FAKE_PRIV_KEY_PEM) + + with ( + patch("capiscio_mcp.connect._load_private_key_pem", return_value=( + None, FAKE_PRIV_KEY_PEM, FAKE_PUB_KEY_PEM, FAKE_DID, + )), + patch("capiscio_mcp.connect.register_server_identity", new_callable=AsyncMock), + patch("capiscio_mcp.connect._issue_badge", new_callable=AsyncMock, return_value=None), + patch("capiscio_mcp.connect._log_key_capture_hint") as mock_hint, + ): + await MCPServerIdentity.connect( + server_id=SERVER_ID, + api_key=API_KEY, + keys_dir=tmp_keys_dir, + ) + + mock_hint.assert_not_called() + + +# --------------------------------------------------------------------------- +# MCPServerIdentity.from_env() +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestMCPServerIdentityFromEnv: + async def test_raises_without_server_id(self): + with patch.dict(os.environ, {}, clear=True): + with pytest.raises(ValueError, match="CAPISCIO_SERVER_ID"): + await MCPServerIdentity.from_env() + + async def test_raises_without_api_key(self): + env = {"CAPISCIO_SERVER_ID": SERVER_ID} + with patch.dict(os.environ, env, clear=True): + with pytest.raises(ValueError, match="CAPISCIO_API_KEY"): + await MCPServerIdentity.from_env() + + async def test_calls_connect_with_env_values(self): + env = { + "CAPISCIO_SERVER_ID": SERVER_ID, + "CAPISCIO_API_KEY": API_KEY, + "CAPISCIO_SERVER_URL": "http://localhost:8080", + } + + fake_identity = MCPServerIdentity( + server_id=SERVER_ID, + did=FAKE_DID, + api_key=API_KEY, + server_url="http://localhost:8080", + keys_dir=Path("/tmp"), + ) + + with patch.dict(os.environ, env, clear=True): + with patch.object( + MCPServerIdentity, + "connect", + new_callable=AsyncMock, + return_value=fake_identity, + ) as mock_connect: + result = await MCPServerIdentity.from_env() + + mock_connect.assert_called_once_with( + server_id=SERVER_ID, + api_key=API_KEY, + server_url="http://localhost:8080", + domain=None, + ) + assert result is fake_identity diff --git a/tests/test_integrations.py b/tests/test_integrations.py index a0a8980..c927868 100644 --- a/tests/test_integrations.py +++ b/tests/test_integrations.py @@ -9,9 +9,9 @@ CapiscioMCPClient, MCP_AVAILABLE, MCP_CLIENT_AVAILABLE, + _install_credential_extraction, ) from capiscio_mcp.types import ServerState -from capiscio_mcp.errors import GuardError, ServerVerifyError # Skip tests that require MCP package if not installed requires_mcp = pytest.mark.skipif(not MCP_AVAILABLE, reason="MCP package not installed") @@ -238,6 +238,7 @@ async def test_context_manager_cleanup(self): client = CapiscioMCPClient( command="python", args=["server.py"], + fail_on_unverified=False, ) async with client: @@ -308,3 +309,303 @@ async def test_list_tools_raises_when_not_connected(self): await client.list_tools() assert "not connected" in str(exc_info.value).lower() + + +@requires_mcp +class TestInstallCredentialExtraction: + """Tests for _install_credential_extraction — stdio badge-in-_meta mechanism.""" + + def _make_fastmcp_with_handler(self): + """Return a minimal FastMCP-like mock with a CallToolRequest handler.""" + from mcp import types as _mcp_types + + original_handler_called_with = [] + + async def original_handler(req): + original_handler_called_with.append(req) + return "original_result" + + mock_server = MagicMock() + mock_server.request_handlers = { + _mcp_types.CallToolRequest: original_handler, + } + + mock_fastmcp = MagicMock() + mock_fastmcp._mcp_server = mock_server + return mock_fastmcp, mock_server, original_handler_called_with + + def test_installs_wrapper_over_call_tool_handler(self): + """_install_credential_extraction should replace the CallToolRequest handler.""" + from mcp import types as _mcp_types + + mock_fastmcp, mock_server, _ = self._make_fastmcp_with_handler() + original_handler = mock_server.request_handlers[_mcp_types.CallToolRequest] + + _install_credential_extraction(mock_fastmcp) + + new_handler = mock_server.request_handlers[_mcp_types.CallToolRequest] + assert new_handler is not original_handler + + def test_no_mcp_server_attribute_is_safe(self): + """Should silently return if fastmcp has no _mcp_server attribute.""" + mock_fastmcp = MagicMock(spec=[]) # no _mcp_server + # Should not raise + _install_credential_extraction(mock_fastmcp) + + def test_no_handler_registered_is_safe(self): + """Should silently return if CallToolRequest handler is not registered.""" + from mcp import types as _mcp_types + + mock_server = MagicMock() + mock_server.request_handlers = {} # no CallToolRequest handler + mock_fastmcp = MagicMock() + mock_fastmcp._mcp_server = mock_server + + _install_credential_extraction(mock_fastmcp) + # request_handlers unchanged + assert _mcp_types.CallToolRequest not in mock_server.request_handlers + + @pytest.mark.asyncio + async def test_badge_extracted_from_meta_and_sets_credential(self): + """Wrapper should extract capiscio_caller_badge from _meta and set contextvar.""" + from mcp import types as _mcp_types + from capiscio_mcp.guard import _current_credential + + captured_cred = [] + + async def original_handler(req): + # Capture whatever credential is set when the original handler runs + captured_cred.append(_current_credential.get()) + return "ok" + + mock_server = MagicMock() + mock_server.request_handlers = {_mcp_types.CallToolRequest: original_handler} + mock_fastmcp = MagicMock() + mock_fastmcp._mcp_server = mock_server + + _install_credential_extraction(mock_fastmcp) + wrapper = mock_server.request_handlers[_mcp_types.CallToolRequest] + + # Build a CallToolRequest with badge in _meta + meta = _mcp_types.RequestParams.Meta(capiscio_caller_badge="badge_jws_value") + params = _mcp_types.CallToolRequestParams(name="test_tool", arguments={}, _meta=meta) + req = _mcp_types.CallToolRequest(params=params) + + await wrapper(req) + + assert len(captured_cred) == 1 + cred = captured_cred[0] + assert cred is not None + assert cred.badge_jws == "badge_jws_value" + assert cred.api_key is None + + @pytest.mark.asyncio + async def test_api_key_extracted_from_meta(self): + """Wrapper should extract capiscio_caller_api_key from _meta.""" + from mcp import types as _mcp_types + from capiscio_mcp.guard import _current_credential + + captured_cred = [] + + async def original_handler(req): + captured_cred.append(_current_credential.get()) + return "ok" + + mock_server = MagicMock() + mock_server.request_handlers = {_mcp_types.CallToolRequest: original_handler} + mock_fastmcp = MagicMock() + mock_fastmcp._mcp_server = mock_server + + _install_credential_extraction(mock_fastmcp) + wrapper = mock_server.request_handlers[_mcp_types.CallToolRequest] + + meta = _mcp_types.RequestParams.Meta(capiscio_caller_api_key="sk-test-key") + params = _mcp_types.CallToolRequestParams(name="test_tool", arguments={}, _meta=meta) + req = _mcp_types.CallToolRequest(params=params) + + await wrapper(req) + + assert captured_cred[0].api_key == "sk-test-key" + assert captured_cred[0].badge_jws is None + + @pytest.mark.asyncio + async def test_no_meta_passes_through_without_credential(self): + """Wrapper should call original handler unchanged when _meta has no credentials.""" + from mcp import types as _mcp_types + from capiscio_mcp.guard import _current_credential + + captured_cred = [] + + async def original_handler(req): + captured_cred.append(_current_credential.get()) + return "ok" + + mock_server = MagicMock() + mock_server.request_handlers = {_mcp_types.CallToolRequest: original_handler} + mock_fastmcp = MagicMock() + mock_fastmcp._mcp_server = mock_server + + _install_credential_extraction(mock_fastmcp) + wrapper = mock_server.request_handlers[_mcp_types.CallToolRequest] + + # No _meta on the request + params = _mcp_types.CallToolRequestParams(name="test_tool", arguments={}) + req = _mcp_types.CallToolRequest(params=params) + + await wrapper(req) + + # Original handler should have been called; no credential set + assert len(captured_cred) == 1 + # Default contextvar should be None (no credential) + assert captured_cred[0] is None + + @pytest.mark.asyncio + async def test_credential_contextvar_is_reset_after_call(self): + """Credential contextvar must be reset to its prior value after the handler returns.""" + from mcp import types as _mcp_types + from capiscio_mcp.guard import _current_credential, set_credential + from capiscio_mcp.types import CallerCredential + + async def original_handler(req): + return "ok" + + mock_server = MagicMock() + mock_server.request_handlers = {_mcp_types.CallToolRequest: original_handler} + mock_fastmcp = MagicMock() + mock_fastmcp._mcp_server = mock_server + + _install_credential_extraction(mock_fastmcp) + wrapper = mock_server.request_handlers[_mcp_types.CallToolRequest] + + # Set a pre-existing credential + prior_cred = CallerCredential(badge_jws="prior_badge") + token = set_credential(prior_cred) + try: + meta = _mcp_types.RequestParams.Meta(capiscio_caller_badge="call_badge") + params = _mcp_types.CallToolRequestParams(name="test", arguments={}, _meta=meta) + req = _mcp_types.CallToolRequest(params=params) + await wrapper(req) + + # Credential should be restored to prior value + assert _current_credential.get() is prior_cred + finally: + _current_credential.reset(token) + + def test_server_init_installs_credential_extraction(self): + """CapiscioMCPServer.__init__ should call _install_credential_extraction.""" + with patch("capiscio_mcp.integrations.mcp.MCP_AVAILABLE", True): + with patch("capiscio_mcp.integrations.mcp.FastMCP"): + with patch( + "capiscio_mcp.integrations.mcp._install_credential_extraction" + ) as mock_install: + CapiscioMCPServer( + name="test", + did="did:web:example.com", + ) + mock_install.assert_called_once() + + +@requires_mcp_client +class TestClientCallToolMetaPropagation: + """Tests for CapiscioMCPClient.call_tool forwarding credentials via _meta.""" + + @pytest.mark.asyncio + async def test_call_tool_passes_badge_in_meta(self): + """call_tool should pass badge in session.call_tool meta kwarg.""" + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): + client = CapiscioMCPClient( + command="python", + args=["server.py"], + badge="eyJhbGciOiJFZERTQSJ9.test.sig", + ) + + mock_session = AsyncMock() + mock_session.call_tool = AsyncMock(return_value="tool_result") + client._session = mock_session + + result = await client.call_tool("my_tool", {"arg": "val"}) + + mock_session.call_tool.assert_called_once_with( + "my_tool", + {"arg": "val"}, + meta={"capiscio_caller_badge": "eyJhbGciOiJFZERTQSJ9.test.sig"}, + ) + assert result == "tool_result" + + @pytest.mark.asyncio + async def test_call_tool_passes_api_key_in_meta(self): + """call_tool should pass api_key in session.call_tool meta kwarg.""" + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): + client = CapiscioMCPClient( + command="python", + args=["server.py"], + api_key="sk-live-test", + ) + + mock_session = AsyncMock() + mock_session.call_tool = AsyncMock(return_value="result") + client._session = mock_session + + await client.call_tool("tool", {}) + + _, _, kwargs = mock_session.call_tool.mock_calls[0] + assert kwargs["meta"] == {"capiscio_caller_api_key": "sk-live-test"} + + @pytest.mark.asyncio + async def test_call_tool_passes_both_badge_and_api_key_in_meta(self): + """call_tool should pass both badge and api_key when both are set.""" + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): + client = CapiscioMCPClient( + command="python", + args=["server.py"], + badge="badge_jws", + api_key="sk-test", + ) + + mock_session = AsyncMock() + mock_session.call_tool = AsyncMock(return_value="result") + client._session = mock_session + + await client.call_tool("tool", {}) + + _, _, kwargs = mock_session.call_tool.mock_calls[0] + assert kwargs["meta"] == { + "capiscio_caller_badge": "badge_jws", + "capiscio_caller_api_key": "sk-test", + } + + @pytest.mark.asyncio + async def test_call_tool_passes_none_meta_when_no_credentials(self): + """call_tool should pass meta=None when no credentials are set.""" + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): + client = CapiscioMCPClient( + command="python", + args=["server.py"], + # No badge or api_key + ) + + mock_session = AsyncMock() + mock_session.call_tool = AsyncMock(return_value="result") + client._session = mock_session + + await client.call_tool("tool", {"x": 1}) + + mock_session.call_tool.assert_called_once_with("tool", {"x": 1}, meta=None) + + @pytest.mark.asyncio + async def test_call_tool_empty_arguments_uses_empty_dict(self): + """call_tool should default arguments to {} when None passed.""" + with patch("capiscio_mcp.integrations.mcp.MCP_CLIENT_AVAILABLE", True): + client = CapiscioMCPClient( + command="python", + args=["server.py"], + ) + + mock_session = AsyncMock() + mock_session.call_tool = AsyncMock(return_value="result") + client._session = mock_session + + await client.call_tool("tool") + + call_args = mock_session.call_tool.call_args + assert call_args.args[1] == {} diff --git a/tests/test_keeper.py b/tests/test_keeper.py new file mode 100644 index 0000000..df370f5 --- /dev/null +++ b/tests/test_keeper.py @@ -0,0 +1,223 @@ +"""Tests for capiscio_mcp.keeper.ServerBadgeKeeper.""" + +import base64 +import json +import time +from unittest.mock import MagicMock, patch + +import pytest + +from capiscio_mcp.keeper import ( + ServerBadgeKeeper, + _decode_jwt_exp, +) + + +# --------------------------------------------------------------------------- +# _decode_jwt_exp helpers +# --------------------------------------------------------------------------- + + +def _make_jwt(payload: dict) -> str: + """Build a minimal JWT compact serialization with the given payload.""" + header = base64.urlsafe_b64encode(b'{"alg":"EdDSA"}').rstrip(b"=").decode() + payload_bytes = json.dumps(payload).encode() + payload_b64 = base64.urlsafe_b64encode(payload_bytes).rstrip(b"=").decode() + return f"{header}.{payload_b64}.fakesig" + + +class TestDecodeJwtExp: + def test_valid_exp(self): + token = _make_jwt({"exp": 9999999999, "sub": "did:key:z6Mk"}) + assert _decode_jwt_exp(token) == 9999999999 + + def test_no_exp_returns_none(self): + token = _make_jwt({"sub": "did:key:z6Mk"}) + assert _decode_jwt_exp(token) is None + + def test_malformed_token_returns_none(self): + assert _decode_jwt_exp("not.a.valid.jwt") is None + + def test_empty_string_returns_none(self): + assert _decode_jwt_exp("") is None + + def test_single_part_returns_none(self): + assert _decode_jwt_exp("onlyone") is None + + def test_exp_is_int(self): + token = _make_jwt({"exp": 1700000000}) + result = _decode_jwt_exp(token) + assert isinstance(result, int) + assert result == 1700000000 + + +# --------------------------------------------------------------------------- +# ServerBadgeKeeper unit tests +# --------------------------------------------------------------------------- + + +class TestServerBadgeKeeper: + SERVER_ID = "550e8400-e29b-41d4-a716-446655440000" + API_KEY = "sk_test_abc123" + INITIAL_BADGE = _make_jwt({"exp": 9999999999, "sub": "test"}) + + def _make_keeper(self, **kwargs) -> ServerBadgeKeeper: + defaults = dict( + server_id=self.SERVER_ID, + api_key=self.API_KEY, + initial_badge=self.INITIAL_BADGE, + check_interval=100, # very large so it never fires in tests + ) + defaults.update(kwargs) + return ServerBadgeKeeper(**defaults) + + def test_init_stores_initial_badge(self): + keeper = self._make_keeper() + assert keeper.get_current_badge() == self.INITIAL_BADGE + + def test_init_not_running(self): + keeper = self._make_keeper() + assert keeper.is_running() is False + + def test_start_sets_running(self): + keeper = self._make_keeper() + keeper.start() + try: + assert keeper.is_running() is True + finally: + keeper.stop() + + def test_stop_clears_running(self): + keeper = self._make_keeper() + keeper.start() + keeper.stop() + assert keeper.is_running() is False + + def test_start_twice_raises(self): + keeper = self._make_keeper() + keeper.start() + try: + with pytest.raises(RuntimeError, match="already running"): + keeper.start() + finally: + keeper.stop() + + def test_context_manager(self): + keeper = self._make_keeper() + with keeper: + assert keeper.is_running() is True + assert keeper.is_running() is False + + def test_get_current_badge_returns_initial(self): + keeper = self._make_keeper() + assert keeper.get_current_badge() == self.INITIAL_BADGE + + def test_stop_without_start_is_noop(self): + keeper = self._make_keeper() + keeper.stop() # Should not raise + assert keeper.is_running() is False + + def test_renewal_updates_badge(self): + """When _renew() is called and HTTP returns a new badge, badge is updated.""" + new_badge = _make_jwt({"exp": 9999999999 + 300, "sub": "renewed"}) + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"data": {"badge": new_badge}} + + keeper = self._make_keeper() + with patch("capiscio_mcp.keeper.requests.post", return_value=mock_response): + keeper._renew() + + assert keeper.get_current_badge() == new_badge + + def test_renewal_calls_on_renew_callback(self): + """_renew() should call on_renew when provided.""" + new_badge = _make_jwt({"exp": 9999999999 + 300, "sub": "renewed"}) + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"badge": new_badge} + + received = [] + keeper = self._make_keeper(on_renew=lambda b: received.append(b)) + with patch("capiscio_mcp.keeper.requests.post", return_value=mock_response): + keeper._renew() + + assert received == [new_badge] + + def test_renewal_handles_http_error(self): + """_renew() should not raise on HTTP error — just log.""" + mock_response = MagicMock() + mock_response.status_code = 500 + mock_response.text = "Internal Server Error" + + keeper = self._make_keeper() + original_badge = keeper.get_current_badge() + with patch("capiscio_mcp.keeper.requests.post", return_value=mock_response): + keeper._renew() # Should not raise + + # Badge should be unchanged after failure + assert keeper.get_current_badge() == original_badge + + def test_renewal_handles_network_error(self): + """_renew() should not raise on network error.""" + import requests as req + + keeper = self._make_keeper() + with patch( + "capiscio_mcp.keeper.requests.post", + side_effect=req.RequestException("timeout"), + ): + keeper._renew() # Should not raise + + def test_maybe_renew_skips_when_not_near_expiry(self): + """_maybe_renew() should not call _renew when badge is not close to expiry.""" + far_future = _make_jwt({"exp": int(time.time()) + 9999}) + keeper = self._make_keeper(initial_badge=far_future, renewal_threshold=30) + + with patch.object(keeper, "_renew") as mock_renew: + keeper._maybe_renew() + mock_renew.assert_not_called() + + def test_maybe_renew_triggers_when_close_to_expiry(self): + """_maybe_renew() should call _renew when exp - now <= threshold.""" + expiring_soon = _make_jwt({"exp": int(time.time()) + 5}) + keeper = self._make_keeper(initial_badge=expiring_soon, renewal_threshold=30) + + with patch.object(keeper, "_renew") as mock_renew: + keeper._maybe_renew() + mock_renew.assert_called_once() + + def test_maybe_renew_skips_undecodable_badge(self): + """_maybe_renew() should not crash when badge has no exp.""" + keeper = self._make_keeper(initial_badge="not.a.jwt") + # Should not raise and should not call _renew + with patch.object(keeper, "_renew") as mock_renew: + keeper._maybe_renew() + mock_renew.assert_not_called() + + def test_renewal_response_alternative_shapes(self): + """Keeper handles 'badge' at root level and 'token' key.""" + for response_body in [ + {"badge": "renewed-badge-1"}, + {"token": "renewed-badge-2"}, + ]: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = response_body + + keeper = self._make_keeper() + with patch("capiscio_mcp.keeper.requests.post", return_value=mock_response): + keeper._renew() + + expected = list(response_body.values())[0] + assert keeper.get_current_badge() == expected + + def test_ca_url_trailing_slash_stripped(self): + """ca_url trailing slash should be stripped so URL is clean.""" + keeper = ServerBadgeKeeper( + server_id=self.SERVER_ID, + api_key=self.API_KEY, + initial_badge=self.INITIAL_BADGE, + ca_url="https://registry.capisc.io/", + ) + assert keeper.ca_url == "https://registry.capisc.io"