From c6099e36959e190d4b8ccb58814819684942353c Mon Sep 17 00:00:00 2001 From: Vaithee Baskaran Date: Fri, 17 Apr 2026 12:44:20 -0700 Subject: [PATCH 1/3] feat: add dual-schema support for 2026-01-11 and 2026-01-23 The UCP spec moved from 2026-01-11 to 2026-01-23, changing services from single UcpService objects to arrays of transport objects, and capabilities from flat lists to dicts keyed by name. The conformance tests only supported the old format. integration_test_utils.py: - shopping_service_endpoint now handles both formats - 2026-01-23: iterates transport list to find REST binding - 2026-01-11: reads single UcpService.rest.endpoint directly protocol_test.py: - test_discovery: accepts either version, parses both capability formats - test_discovery_urls: extracts URLs from both service structures - test_version_negotiation: discovers endpoint from both formats - Payment handler assertions made optional (not all profiles declare them) - Removed buyer_consent from required capabilities set Addresses Universal-Commerce-Protocol/ucp#142 --- integration_test_utils.py | 32 ++++++- protocol_test.py | 174 ++++++++++++++++++++++++-------------- 2 files changed, 141 insertions(+), 65 deletions(-) diff --git a/integration_test_utils.py b/integration_test_utils.py index 92a15ea..ad33a89 100644 --- a/integration_test_utils.py +++ b/integration_test_utils.py @@ -392,10 +392,36 @@ def shopping_service_endpoint(self) -> str: discovery_resp = self.client.get("/.well-known/ucp") self.assert_response_status(discovery_resp, 200) profile = UcpDiscoveryProfile(**discovery_resp.json()) - shopping_service = profile.ucp.services.root.get("dev.ucp.shopping") - if not shopping_service or not shopping_service.rest: + shopping = profile.ucp.services.root.get("dev.ucp.shopping") + if not shopping: raise RuntimeError("Shopping service not found in discovery profile") - self._shopping_service_endpoint = str(shopping_service.rest.endpoint) + # 2026-01-23: services are a list of transports; find REST + # 2026-01-11: services are a single UcpService object + if isinstance(shopping, list): + rest_service = next( + (s for s in shopping if s.rest is not None), + None, + ) + if rest_service is None: + raise RuntimeError( + "No REST transport found in shopping service. " + "Available transports: " + + ", ".join( + getattr(s, "transport", "unknown") or "unknown" + for s in shopping + ) + ) + self._shopping_service_endpoint = str( + rest_service.rest.endpoint + ) + else: + if not shopping.rest: + raise RuntimeError( + "Shopping service has no REST transport binding" + ) + self._shopping_service_endpoint = str( + shopping.rest.endpoint + ) return self._shopping_service_endpoint def get_shopping_url(self, path: str) -> str: diff --git a/protocol_test.py b/protocol_test.py index 98c91b1..532c158 100644 --- a/protocol_test.py +++ b/protocol_test.py @@ -46,28 +46,52 @@ def _extract_document_urls( """ urls = set() - # 1. Services - for service_name, service in profile.ucp.services.root.items(): - base_path = f"ucp.services['{service_name}']" - if service.spec: - urls.add((f"{base_path}.spec", str(service.spec))) - if service.rest and service.rest.schema_: - urls.add((f"{base_path}.rest.schema", str(service.rest.schema_))) - if service.mcp and service.mcp.schema_: - urls.add((f"{base_path}.mcp.schema", str(service.mcp.schema_))) - if service.embedded and service.embedded.schema_: - urls.add( - (f"{base_path}.embedded.schema", str(service.embedded.schema_)) - ) - - # 2. Capabilities - for i, cap in enumerate(profile.ucp.capabilities): - cap_name = cap.name or f"index_{i}" - base_path = f"ucp.capabilities['{cap_name}']" - if cap.spec: - urls.add((f"{base_path}.spec", str(cap.spec))) - if cap.schema_: - urls.add((f"{base_path}.schema", str(cap.schema_))) + # 1. Services (2026-01-23: list of transports; 2026-01-11: single object) + for service_name, service_val in profile.ucp.services.root.items(): + services = ( + service_val if isinstance(service_val, list) else [service_val] + ) + for idx, service in enumerate(services): + base_path = f"ucp.services['{service_name}'][{idx}]" + if service.spec: + urls.add((f"{base_path}.spec", str(service.spec))) + # Check both inline schema and nested transport schema + if service.schema_: + urls.add((f"{base_path}.schema", str(service.schema_))) + if service.rest and service.rest.schema_: + urls.add( + (f"{base_path}.rest.schema", str(service.rest.schema_)) + ) + if service.mcp and service.mcp.schema_: + urls.add( + (f"{base_path}.mcp.schema", str(service.mcp.schema_)) + ) + if service.embedded and service.embedded.schema_: + urls.add( + ( + f"{base_path}.embedded.schema", + str(service.embedded.schema_), + ) + ) + + # 2. Capabilities (2026-01-23: dict; 2026-01-11: list) + caps = profile.ucp.capabilities.root + if isinstance(caps, dict): + for cap_name, cap_list in caps.items(): + for i, cap in enumerate(cap_list): + base_path = f"ucp.capabilities['{cap_name}'][{i}]" + if cap.spec: + urls.add((f"{base_path}.spec", str(cap.spec))) + if cap.schema_: + urls.add((f"{base_path}.schema", str(cap.schema_))) + else: + for i, cap in enumerate(caps): + cap_name = cap.name or f"index_{i}" + base_path = f"ucp.capabilities['{cap_name}']" + if cap.spec: + urls.add((f"{base_path}.spec", str(cap.spec))) + if cap.schema_: + urls.add((f"{base_path}.schema", str(cap.schema_))) # 3. Payment Handlers if profile.payment and profile.payment.handlers: @@ -149,20 +173,23 @@ def test_discovery(self): # Validate schema using SDK model profile = UcpDiscoveryProfile(**data) - self.assertEqual( + self.assertIn( profile.ucp.version.root, - "2026-01-11", + ["2026-01-11", "2026-01-23"], msg="Unexpected UCP version in discovery doc", ) - # Verify Capabilities - capabilities = {c.name for c in profile.ucp.capabilities} + # Verify Capabilities (2026-01-23: dict; 2026-01-11: list) + caps = profile.ucp.capabilities.root + if isinstance(caps, dict): + capabilities = set(caps.keys()) + else: + capabilities = {c.name for c in caps} expected_capabilities = { "dev.ucp.shopping.checkout", "dev.ucp.shopping.order", "dev.ucp.shopping.discount", "dev.ucp.shopping.fulfillment", - "dev.ucp.shopping.buyer_consent", } missing_caps = expected_capabilities - capabilities self.assertFalse( @@ -170,30 +197,41 @@ def test_discovery(self): f"Missing expected capabilities in discovery: {missing_caps}", ) - # Verify Payment Handlers - handlers = {h.id for h in profile.payment.handlers} - expected_handlers = {"google_pay", "mock_payment_handler", "shop_pay"} - missing_handlers = expected_handlers - handlers - self.assertFalse( - missing_handlers, - f"Missing expected payment handlers: {missing_handlers}", - ) - - # Specific check for Shop Pay config - shop_pay = next( - (h for h in profile.payment.handlers if h.id == "shop_pay"), - None, - ) - self.assertIsNotNone(shop_pay, "Shop Pay handler not found") - self.assertEqual(shop_pay.name, "com.shopify.shop_pay") - self.assertIn("shop_id", shop_pay.config) + # Verify Payment Handlers (optional - not all profiles include them) + if profile.payment and profile.payment.handlers: + handlers = {h.id for h in profile.payment.handlers} + expected_handlers = {"google_pay", "mock_payment_handler", "shop_pay"} + missing_handlers = expected_handlers - handlers + self.assertFalse( + missing_handlers, + f"Missing expected payment handlers: {missing_handlers}", + ) + + # Specific check for Shop Pay config + shop_pay = next( + (h for h in profile.payment.handlers if h.id == "shop_pay"), + None, + ) + if shop_pay: + self.assertEqual(shop_pay.name, "com.shopify.shop_pay") + self.assertIn("shop_id", shop_pay.config) # Verify shopping capability self.assertIn("dev.ucp.shopping", profile.ucp.services.root) - shopping_service = profile.ucp.services.root["dev.ucp.shopping"] - self.assertEqual(shopping_service.version.root, "2026-01-11") - self.assertIsNotNone(shopping_service.rest) - self.assertIsNotNone(shopping_service.rest.endpoint) + shopping_val = profile.ucp.services.root["dev.ucp.shopping"] + # 2026-01-23: list of transports; 2026-01-11: single object + if isinstance(shopping_val, list): + self.assertTrue(len(shopping_val) > 0, "Empty shopping services") + # Find REST transport if present + rest_service = next( + (s for s in shopping_val if s.rest is not None), None + ) + if rest_service: + self.assertIsNotNone(rest_service.rest.endpoint) + else: + self.assertEqual(shopping_val.version.root, "2026-01-11") + self.assertIsNotNone(shopping_val.rest) + self.assertIsNotNone(shopping_val.rest.endpoint) def test_version_negotiation(self): """Test protocol version negotiation via headers. @@ -208,20 +246,32 @@ def test_version_negotiation(self): discovery_resp = self.client.get("/.well-known/ucp") self.assert_response_status(discovery_resp, 200) profile = UcpDiscoveryProfile(**discovery_resp.json()) - shopping_service = profile.ucp.services.root["dev.ucp.shopping"] - self.assertIsNotNone( - shopping_service, "Shopping service not found in discovery" - ) - self.assertIsNotNone( - shopping_service.rest, "REST config not found for shopping service" - ) - self.assertIsNotNone( - shopping_service.rest.endpoint, - "Endpoint not found for shopping service", - ) - checkout_sessions_url = ( - f"{str(shopping_service.rest.endpoint).rstrip('/')}/checkout-sessions" - ) + shopping_val = profile.ucp.services.root["dev.ucp.shopping"] + # 2026-01-23: list of transports; 2026-01-11: single object + if isinstance(shopping_val, list): + rest_service = next( + (s for s in shopping_val if s.rest is not None), None + ) + if rest_service is None: + self.skipTest( + "No REST transport in shopping service - " + "version negotiation test requires REST" + ) + endpoint = str(rest_service.rest.endpoint) + else: + self.assertIsNotNone( + shopping_val, "Shopping service not found in discovery" + ) + self.assertIsNotNone( + shopping_val.rest, + "REST config not found for shopping service", + ) + self.assertIsNotNone( + shopping_val.rest.endpoint, + "Endpoint not found for shopping service", + ) + endpoint = str(shopping_val.rest.endpoint) + checkout_sessions_url = f"{endpoint.rstrip('/')}/checkout-sessions" create_payload = self.create_checkout_payload() From c3e88e692e65f0de36486a01001b92b90b0534f2 Mon Sep 17 00:00:00 2001 From: Vaithee Baskaran Date: Fri, 17 Apr 2026 12:44:30 -0700 Subject: [PATCH 2/3] feat: add ucp_validate.py CLI readiness validator Standalone CLI tool that validates merchant UCP endpoint readiness by fetching /.well-known/ucp and running structural checks: - Discovery profile reachability and JSON/schema validation - Schema version detection (2026-01-11 / 2026-01-23) with consistency - Required capability completeness (checkout, order, discount, fulfillment) - Payment handler declaration and config validation - Spec/schema URL reachability - Signing key JWK format validation - Optional checkout smoke test (--smoke) Output modes: colored terminal (default) and JSON (--json). Exit code 0 for READY, 1 for NOT_READY. Zero new dependencies - reuses httpx and ucp-sdk. Addresses Universal-Commerce-Protocol/ucp#142 --- ucp_validate.py | 1116 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1116 insertions(+) create mode 100644 ucp_validate.py diff --git a/ucp_validate.py b/ucp_validate.py new file mode 100644 index 0000000..a3efdb4 --- /dev/null +++ b/ucp_validate.py @@ -0,0 +1,1116 @@ +#!/usr/bin/env python3 +# ruff: noqa: T201 +# Copyright 2026 UCP Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UCP Readiness Validator - validates merchant UCP endpoint readiness. + +A lightweight CLI tool that fetches a merchant's /.well-known/ucp +discovery profile and validates it against the UCP specification. + +Usage: + uv run ucp_validate.py https://merchant.example.com + uv run ucp_validate.py http://localhost:8182 --smoke + uv run ucp_validate.py https://merchant.example.com --json + uv run ucp_validate.py https://merchant.example.com --skip-urls +""" + +from __future__ import annotations + +import argparse +import json +import sys +import time +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone + +import httpx +from pydantic import ValidationError +from ucp_sdk.models.discovery.profile_schema import UcpDiscoveryProfile + + +# -- Constants --------------------------------------------------------------- + +CURRENT_SPEC_VERSION = "2026-01-23" +KNOWN_VERSIONS = {"2026-01-11", "2026-01-23"} + +REQUIRED_CAPABILITIES = { + "dev.ucp.shopping.checkout", + "dev.ucp.shopping.order", + "dev.ucp.shopping.discount", + "dev.ucp.shopping.fulfillment", +} + +OPTIONAL_CAPABILITIES = { + "dev.ucp.shopping.buyer_consent", + "dev.ucp.shopping.binding", + "dev.ucp.shopping.card_credential", + "dev.ucp.shopping.ap2_mandate", +} + +REQUIRED_JWK_FIELDS = {"kid", "kty"} +EC_JWK_FIELDS = {"crv", "x", "y"} +RSA_JWK_FIELDS = {"n", "e"} + + +# -- Data model -------------------------------------------------------------- + + +@dataclass +class CheckResult: + """Result of a single validation check.""" + + name: str + status: str # pass, warn, fail, skip, info + detail: str = "" + section: str = "" + + +@dataclass +class ValidationReport: + """Complete validation report.""" + + endpoint: str + timestamp: str + version: str | None = None + overall_status: str = "unknown" + checks: list[CheckResult] = field(default_factory=list) + + def add(self, check: CheckResult) -> None: + """Append a check result to the report.""" + self.checks.append(check) + + @property + def summary(self) -> dict[str, int]: + """Return counts of each check status.""" + counts: dict[str, int] = { + "pass": 0, + "warn": 0, + "fail": 0, + "skip": 0, + "info": 0, + } + for c in self.checks: + counts[c.status] = counts.get(c.status, 0) + 1 + return counts + + def compute_status(self) -> str: + """Compute overall READY/NOT_READY status.""" + s = self.summary + if s["fail"] > 0: + return "NOT_READY" + return "READY" + + +# -- ANSI colors (degrade gracefully on Windows) ---------------------------- + +try: + import os + + _no_color = os.environ.get("NO_COLOR") is not None +except Exception: + _no_color = True + +if sys.platform == "win32" and not _no_color: + import contextlib + + with contextlib.suppress(Exception): + os.system("") # enable ANSI on Windows + + +def _c(code: str, text: str) -> str: + if _no_color: + return text + return f"\033[{code}m{text}\033[0m" + + +def green(t: str) -> str: + """Return green-colored text.""" + return _c("32", t) + + +def yellow(t: str) -> str: + """Return yellow-colored text.""" + return _c("33", t) + + +def red(t: str) -> str: + """Return red-colored text.""" + return _c("31", t) + + +def cyan(t: str) -> str: + """Return cyan-colored text.""" + return _c("36", t) + + +def bold(t: str) -> str: + """Return bold text.""" + return _c("1", t) + + +def dim(t: str) -> str: + """Return dim text.""" + return _c("2", t) + + +STATUS_BADGE = { + "pass": green("[PASS]"), + "warn": yellow("[WARN]"), + "fail": red("[FAIL]"), + "skip": dim("[SKIP]"), + "info": cyan("[INFO]"), +} + + +# -- Validators -------------------------------------------------------------- + + +def validate_discovery( + base_url: str, + timeout: float, + report: ValidationReport, + verbose: bool = False, +) -> UcpDiscoveryProfile | None: + """Fetch and parse /.well-known/ucp.""" + well_known = f"{base_url.rstrip('/')}/.well-known/ucp" + + # Check reachability + try: + t0 = time.monotonic() + with httpx.Client(timeout=timeout, follow_redirects=True) as client: + resp = client.get(well_known) + elapsed_ms = int((time.monotonic() - t0) * 1000) + except httpx.ConnectError as e: + report.add( + CheckResult( + "discovery.reachable", + "fail", + f"Connection refused: {e}", + "Discovery", + ) + ) + return None + except httpx.TimeoutException: + report.add( + CheckResult( + "discovery.reachable", + "fail", + f"Timeout after {timeout}s", + "Discovery", + ) + ) + return None + except Exception as e: + report.add( + CheckResult( + "discovery.reachable", + "fail", + f"HTTP error: {e}", + "Discovery", + ) + ) + return None + + if resp.status_code != 200: + report.add( + CheckResult( + "discovery.reachable", + "fail", + f"HTTP {resp.status_code} (expected 200)", + "Discovery", + ) + ) + return None + + report.add( + CheckResult( + "discovery.reachable", + "pass", + f"200 OK, {elapsed_ms}ms", + "Discovery", + ) + ) + + # Parse JSON + try: + data = resp.json() + except (json.JSONDecodeError, ValueError) as e: + report.add( + CheckResult( + "discovery.valid_json", + "fail", + f"Invalid JSON: {e}", + "Discovery", + ) + ) + return None + + report.add( + CheckResult( + "discovery.valid_json", + "pass", + "Valid JSON response", + "Discovery", + ) + ) + + # Parse as UcpDiscoveryProfile + try: + profile = UcpDiscoveryProfile(**data) + except ValidationError as e: + detail = str(e) + if not verbose: + # Truncate for readability + lines = detail.split("\n") + if len(lines) > 5: + detail = "\n".join(lines[:5]) + f"\n... ({len(lines)-5} more)" + report.add( + CheckResult( + "discovery.schema", + "fail", + f"Schema validation failed:\n{detail}", + "Discovery", + ) + ) + return None + + report.add( + CheckResult( + "discovery.schema", + "pass", + "Parses as UcpDiscoveryProfile", + "Discovery", + ) + ) + + return profile + + +def validate_version( + profile: UcpDiscoveryProfile, + report: ValidationReport, +) -> None: + """Check schema version and structural consistency.""" + version = profile.ucp.version.root + report.version = version + + if version not in KNOWN_VERSIONS: + report.add( + CheckResult( + "version.known", + "warn", + f"Unknown version: {version} " + f"(known: {', '.join(sorted(KNOWN_VERSIONS))})", + "Schema Version", + ) + ) + elif version == CURRENT_SPEC_VERSION: + report.add( + CheckResult( + "version.known", + "pass", + f"Version: {version} (current)", + "Schema Version", + ) + ) + else: + report.add( + CheckResult( + "version.known", + "warn", + f"Version: {version} (not current; " + f"latest is {CURRENT_SPEC_VERSION})", + "Schema Version", + ) + ) + + # Check structural consistency + services_val = profile.ucp.services.root + caps_val = profile.ucp.capabilities.root + + # Detect service structure + sample_svc = ( + next(iter(services_val.values()), None) + if services_val + else None + ) + svc_format = ( + "list" if isinstance(sample_svc, list) else "object" + ) + + # Detect capabilities structure + cap_format = ( + "dict" if isinstance(caps_val, dict) else "list" + ) + + if version == "2026-01-23": + expected_svc = "list" + expected_cap = "dict" + else: + expected_svc = "object" + expected_cap = "list" + + if svc_format == expected_svc: + report.add( + CheckResult( + "version.services_structure", + "pass", + f"Services: {svc_format} format (matches {version})", + "Schema Version", + ) + ) + else: + report.add( + CheckResult( + "version.services_structure", + "warn", + f"Services: {svc_format} format " + f"(expected {expected_svc} for {version})", + "Schema Version", + ) + ) + + if cap_format == expected_cap: + report.add( + CheckResult( + "version.capabilities_structure", + "pass", + f"Capabilities: {cap_format} format (matches {version})", + "Schema Version", + ) + ) + else: + report.add( + CheckResult( + "version.capabilities_structure", + "warn", + f"Capabilities: {cap_format} format " + f"(expected {expected_cap} for {version})", + "Schema Version", + ) + ) + + +def _extract_capability_names( + profile: UcpDiscoveryProfile, +) -> set[str]: + """Extract capability names from either schema format.""" + caps = profile.ucp.capabilities.root + if isinstance(caps, dict): + return set(caps.keys()) + else: + return {c.name for c in caps if c.name} + + +def validate_capabilities( + profile: UcpDiscoveryProfile, + report: ValidationReport, +) -> None: + """Check required and optional capabilities.""" + present = _extract_capability_names(profile) + found_required = 0 + + for cap_name in sorted(REQUIRED_CAPABILITIES): + if cap_name in present: + report.add( + CheckResult( + f"capabilities.{cap_name}", + "pass", + cap_name, + "Capabilities", + ) + ) + found_required += 1 + else: + report.add( + CheckResult( + f"capabilities.{cap_name}", + "fail", + f"{cap_name} (required, missing)", + "Capabilities", + ) + ) + + for cap_name in sorted(OPTIONAL_CAPABILITIES): + if cap_name in present: + report.add( + CheckResult( + f"capabilities.{cap_name}", + "info", + f"{cap_name} (optional, present)", + "Capabilities", + ) + ) + + # Report extra (unknown) capabilities as info + known = REQUIRED_CAPABILITIES | OPTIONAL_CAPABILITIES + extras = present - known + for cap_name in sorted(extras): + report.add( + CheckResult( + f"capabilities.{cap_name}", + "info", + f"{cap_name} (custom)", + "Capabilities", + ) + ) + + +def validate_handlers( + profile: UcpDiscoveryProfile, + report: ValidationReport, +) -> None: + """Check payment handler declarations.""" + if not profile.payment or not profile.payment.handlers: + report.add( + CheckResult( + "handlers.present", + "warn", + "No payment handlers declared", + "Payment Handlers", + ) + ) + return + + handlers = profile.payment.handlers + report.add( + CheckResult( + "handlers.present", + "pass", + f"{len(handlers)} handler(s) declared", + "Payment Handlers", + ) + ) + + for h in handlers: + handler_id = h.id if hasattr(h, "id") and h.id else "unknown" + handler_name = h.name if hasattr(h, "name") and h.name else None + + label = handler_id + if handler_name: + label = f"{handler_id} ({handler_name})" + + if hasattr(h, "config") and h.config: + report.add( + CheckResult( + f"handlers.{handler_id}.config", + "pass", + f"{label} - config present " + f"({', '.join(h.config.keys())})", + "Payment Handlers", + ) + ) + else: + report.add( + CheckResult( + f"handlers.{handler_id}.config", + "info", + f"{label} - no config (may be OK)", + "Payment Handlers", + ) + ) + + +def _extract_urls(profile: UcpDiscoveryProfile) -> list[tuple[str, str]]: + """Extract all spec/schema URLs from the discovery profile.""" + urls: list[tuple[str, str]] = [] + + # Services + for svc_name, svc_val in profile.ucp.services.root.items(): + services = svc_val if isinstance(svc_val, list) else [svc_val] + for idx, svc in enumerate(services): + prefix = f"services['{svc_name}'][{idx}]" + if svc.spec: + urls.append((f"{prefix}.spec", str(svc.spec))) + if svc.schema_: + urls.append((f"{prefix}.schema", str(svc.schema_))) + if svc.rest and svc.rest.schema_: + urls.append( + (f"{prefix}.rest.schema", str(svc.rest.schema_)) + ) + if svc.mcp and svc.mcp.schema_: + urls.append( + (f"{prefix}.mcp.schema", str(svc.mcp.schema_)) + ) + if svc.embedded and svc.embedded.schema_: + urls.append( + (f"{prefix}.embedded.schema", str(svc.embedded.schema_)) + ) + + # Capabilities + caps = profile.ucp.capabilities.root + if isinstance(caps, dict): + for cap_name, cap_list in caps.items(): + for i, cap in enumerate(cap_list): + prefix = f"capabilities['{cap_name}'][{i}]" + if cap.spec: + urls.append((f"{prefix}.spec", str(cap.spec))) + if cap.schema_: + urls.append((f"{prefix}.schema", str(cap.schema_))) + else: + for i, cap in enumerate(caps): + name = cap.name or f"index_{i}" + prefix = f"capabilities['{name}']" + if cap.spec: + urls.append((f"{prefix}.spec", str(cap.spec))) + if cap.schema_: + urls.append((f"{prefix}.schema", str(cap.schema_))) + + # Payment handler schemas + if profile.payment and profile.payment.handlers: + for h in profile.payment.handlers: + hid = h.id if hasattr(h, "id") else "unknown" + if hasattr(h, "config_schema") and h.config_schema: + urls.append( + (f"handlers['{hid}'].config_schema", str(h.config_schema)) + ) + if hasattr(h, "instrument_schemas") and h.instrument_schemas: + for j, ischema in enumerate(h.instrument_schemas): + urls.append( + ( + f"handlers['{hid}'].instrument_schemas[{j}]", + str(ischema), + ) + ) + + return urls + + +def validate_urls( + profile: UcpDiscoveryProfile, + report: ValidationReport, + timeout: float, + verbose: bool = False, +) -> None: + """Check reachability of all spec/schema URLs.""" + urls = _extract_urls(profile) + if not urls: + report.add( + CheckResult( + "urls.present", + "info", + "No spec/schema URLs found in profile", + "Spec/Schema URLs", + ) + ) + return + + reachable = 0 + failures: list[str] = [] + + with httpx.Client(timeout=timeout, follow_redirects=True) as client: + for label, url in urls: + try: + resp = client.get(url) + if resp.status_code == 200: + reachable += 1 + else: + failures.append( + f"[{label}] {url} returned {resp.status_code}" + ) + except Exception as e: + failures.append(f"[{label}] {url} error: {e}") + + total = len(urls) + if failures: + detail = f"{reachable}/{total} reachable" + if verbose: + detail += "\n " + "\n ".join(failures) + else: + detail += f" ({len(failures)} failed; use --verbose for details)" + report.add( + CheckResult( + "urls.reachable", + "warn", + detail, + "Spec/Schema URLs", + ) + ) + else: + report.add( + CheckResult( + "urls.reachable", + "pass", + f"{total}/{total} reachable", + "Spec/Schema URLs", + ) + ) + + +def validate_signing_keys( + profile: UcpDiscoveryProfile, + report: ValidationReport, +) -> None: + """Validate signing keys if present.""" + if not profile.signing_keys: + report.add( + CheckResult( + "signing_keys.present", + "info", + "No signing keys declared (optional)", + "Signing Keys", + ) + ) + return + + keys = profile.signing_keys + report.add( + CheckResult( + "signing_keys.present", + "pass", + f"{len(keys)} key(s) declared", + "Signing Keys", + ) + ) + + for key in keys: + kid = key.kid + kty = key.kty + + if not kid or not kty: + report.add( + CheckResult( + f"signing_keys.{kid or 'unknown'}.format", + "fail", + "Missing required fields: kid, kty", + "Signing Keys", + ) + ) + continue + + if kty == "EC": + missing = [] + if not key.crv: + missing.append("crv") + if not key.x: + missing.append("x") + if not key.y: + missing.append("y") + if missing: + report.add( + CheckResult( + f"signing_keys.{kid}.format", + "fail", + f"EC key missing: {', '.join(missing)}", + "Signing Keys", + ) + ) + else: + report.add( + CheckResult( + f"signing_keys.{kid}.format", + "pass", + f"Key '{kid}' - valid EC {key.crv} JWK", + "Signing Keys", + ) + ) + elif kty == "RSA": + missing = [] + if not key.n: + missing.append("n") + if not key.e: + missing.append("e") + if missing: + report.add( + CheckResult( + f"signing_keys.{kid}.format", + "fail", + f"RSA key missing: {', '.join(missing)}", + "Signing Keys", + ) + ) + else: + report.add( + CheckResult( + f"signing_keys.{kid}.format", + "pass", + f"Key '{kid}' - valid RSA JWK", + "Signing Keys", + ) + ) + else: + report.add( + CheckResult( + f"signing_keys.{kid}.format", + "info", + f"Key '{kid}' - type '{kty}' " + f"(not EC or RSA; skipping field validation)", + "Signing Keys", + ) + ) + + +def _find_rest_endpoint(profile: UcpDiscoveryProfile) -> str | None: + """Find the REST shopping endpoint from the discovery profile.""" + shopping = profile.ucp.services.root.get("dev.ucp.shopping") + if not shopping: + return None + + if isinstance(shopping, list): + for svc in shopping: + if svc.rest and svc.rest.endpoint: + return str(svc.rest.endpoint) + if svc.endpoint and ( + svc.transport == "rest" or svc.transport is None + ): + return str(svc.endpoint) + else: + if shopping.rest and shopping.rest.endpoint: + return str(shopping.rest.endpoint) + if shopping.endpoint: + return str(shopping.endpoint) + return None + + +def validate_smoke( + profile: UcpDiscoveryProfile, + report: ValidationReport, + timeout: float, + verbose: bool = False, +) -> None: + """Run a lightweight smoke test against the checkout endpoint.""" + endpoint = _find_rest_endpoint(profile) + if not endpoint: + report.add( + CheckResult( + "smoke.endpoint", + "fail", + "Cannot find REST shopping endpoint in profile", + "Smoke Test", + ) + ) + return + + checkout_url = f"{endpoint.rstrip('/')}/checkout-sessions" + version = profile.ucp.version.root + + # Minimal checkout create payload + payload = { + "id": str(uuid.uuid4()), + "currency": "USD", + "line_items": [ + { + "quantity": 1, + "item": { + "id": "smoke-test-item", + "title": "Smoke Test Item", + }, + } + ], + } + + headers = { + "Content-Type": "application/json", + "UCP-Agent": f'profile="urn:ucp:validator:smoke-test";' + f' version="{version}"', + "idempotency-key": str(uuid.uuid4()), + "request-id": str(uuid.uuid4()), + "request-signature": "test", + } + + try: + t0 = time.monotonic() + with httpx.Client(timeout=timeout, follow_redirects=True) as client: + resp = client.post(checkout_url, json=payload, headers=headers) + elapsed_ms = int((time.monotonic() - t0) * 1000) + except Exception as e: + report.add( + CheckResult( + "smoke.checkout", + "fail", + f"POST {checkout_url} error: {e}", + "Smoke Test", + ) + ) + return + + # We expect either: + # 201/200 = checkout created (ideal) + # 400 = item not found (expected - smoke test uses fake item) + # 4xx = some other validation error (still proves endpoint is alive) + # + # 5xx or connection errors = endpoint problem + + if resp.status_code in (200, 201): + report.add( + CheckResult( + "smoke.checkout", + "pass", + f"Checkout created successfully " + f"({resp.status_code}, {elapsed_ms}ms)", + "Smoke Test", + ) + ) + # Try to cancel it to clean up + try: + checkout_data = resp.json() + checkout_id = checkout_data.get("id") + if checkout_id: + cancel_url = f"{checkout_url}/{checkout_id}/cancel" + cancel_headers = { + **headers, + "idempotency-key": str(uuid.uuid4()), + "request-id": str(uuid.uuid4()), + } + client2 = httpx.Client(timeout=timeout) + client2.post(cancel_url, headers=cancel_headers) + client2.close() + except Exception: + pass + elif resp.status_code == 400: + report.add( + CheckResult( + "smoke.checkout", + "pass", + f"Endpoint alive - returned 400 for test item " + f"({elapsed_ms}ms)", + "Smoke Test", + ) + ) + if verbose: + try: + detail = resp.json() + report.add( + CheckResult( + "smoke.checkout.detail", + "info", + f"Response: {json.dumps(detail, indent=2)}", + "Smoke Test", + ) + ) + except Exception: + pass + elif 400 <= resp.status_code < 500: + report.add( + CheckResult( + "smoke.checkout", + "warn", + f"Endpoint returned {resp.status_code} ({elapsed_ms}ms)" + f" - alive but may need auth/config", + "Smoke Test", + ) + ) + else: + report.add( + CheckResult( + "smoke.checkout", + "fail", + f"Endpoint returned {resp.status_code} ({elapsed_ms}ms)" + f" - server error", + "Smoke Test", + ) + ) + + +# -- Output ------------------------------------------------------------------ + + +def print_terminal_report(report: ValidationReport) -> None: + """Print colored terminal report.""" + print() + print(bold("UCP Readiness Report")) + print(bold("=" * 50)) + print(f" Endpoint: {report.endpoint}") + print(f" Timestamp: {report.timestamp}") + if report.version: + print(f" Version: {report.version}") + print() + + # Group by section + sections: dict[str, list[CheckResult]] = {} + for check in report.checks: + section = check.section or "Other" + sections.setdefault(section, []).append(check) + + for section_name, checks in sections.items(): + print(bold(f"{section_name}")) + for check in checks: + badge = STATUS_BADGE.get(check.status, f"[{check.status.upper()}]") + detail = check.detail + # Indent multi-line details + if "\n" in detail: + lines = detail.split("\n") + detail = lines[0] + "\n" + "\n".join( + " " + line for line in lines[1:] + ) + print(f" {badge} {detail}") + print() + + # Summary + s = report.summary + report.overall_status = report.compute_status() + summary_parts = [] + if s["pass"]: + summary_parts.append(green(f"{s['pass']} PASS")) + if s["warn"]: + summary_parts.append(yellow(f"{s['warn']} WARN")) + if s["fail"]: + summary_parts.append(red(f"{s['fail']} FAIL")) + if s["skip"]: + summary_parts.append(dim(f"{s['skip']} SKIP")) + if s["info"]: + summary_parts.append(cyan(f"{s['info']} INFO")) + + print(bold("Summary: ") + " | ".join(summary_parts)) + + status_str = report.overall_status + if status_str == "READY": + print(bold("Status: ") + green("READY")) + else: + print(bold("Status: ") + red("NOT_READY")) + print() + + +def print_json_report(report: ValidationReport) -> None: + """Print structured JSON report.""" + report.overall_status = report.compute_status() + output = { + "endpoint": report.endpoint, + "timestamp": report.timestamp, + "version": report.version, + "status": report.overall_status.lower(), + "summary": report.summary, + "checks": [ + { + "name": c.name, + "status": c.status, + "detail": c.detail, + "section": c.section, + } + for c in report.checks + ], + } + print(json.dumps(output, indent=2)) + + +# -- Main -------------------------------------------------------------------- + + +def main() -> int: + """CLI entry point for UCP readiness validation.""" + parser = argparse.ArgumentParser( + description="UCP Readiness Validator - validate merchant " + "UCP endpoint readiness", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "Examples:\n" + " uv run ucp_validate.py https://merchant.example.com\n" + " uv run ucp_validate.py http://localhost:8182 --smoke\n" + " uv run ucp_validate.py https://target.com --json\n" + " uv run ucp_validate.py http://localhost:8182 " + "--skip-urls --verbose\n" + ), + ) + parser.add_argument("url", help="Merchant base URL") + parser.add_argument( + "--smoke", + action="store_true", + help="Run live checkout smoke test", + ) + parser.add_argument( + "--json", + action="store_true", + dest="json_output", + help="Output as JSON", + ) + parser.add_argument( + "--timeout", + type=float, + default=10.0, + help="HTTP request timeout in seconds (default: 10)", + ) + parser.add_argument( + "--skip-urls", + action="store_true", + help="Skip URL reachability checks", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Show detailed error info", + ) + + args = parser.parse_args() + base_url = args.url.rstrip("/") + + report = ValidationReport( + endpoint=base_url, + timestamp=datetime.now(timezone.utc).isoformat( + timespec="seconds" + ), + ) + + # 1. Discovery + profile = validate_discovery( + base_url, args.timeout, report, verbose=args.verbose + ) + + if profile: + # 2. Version + validate_version(profile, report) + + # 3. Capabilities + validate_capabilities(profile, report) + + # 4. Payment handlers + validate_handlers(profile, report) + + # 5. URL reachability + if args.skip_urls: + report.add( + CheckResult( + "urls.reachable", + "skip", + "Skipped (--skip-urls)", + "Spec/Schema URLs", + ) + ) + else: + validate_urls( + profile, report, args.timeout, verbose=args.verbose + ) + + # 6. Signing keys + validate_signing_keys(profile, report) + + # 7. Smoke test + if args.smoke: + validate_smoke( + profile, report, args.timeout, verbose=args.verbose + ) + else: + report.add( + CheckResult( + "smoke.checkout", + "skip", + "Use --smoke to enable", + "Smoke Test", + ) + ) + + # Output + if args.json_output: + print_json_report(report) + else: + print_terminal_report(report) + + # Exit code + report.overall_status = report.compute_status() + return 0 if report.overall_status == "READY" else 1 + + +if __name__ == "__main__": + sys.exit(main()) From de49d1977bcacba97a31a6ff97290d49a4439cc5 Mon Sep 17 00:00:00 2001 From: Vaithee Baskaran Date: Fri, 17 Apr 2026 13:18:57 -0700 Subject: [PATCH 3/3] fix: validate payment handler structure from profile instead of hardcoded list The test_discovery test had a hardcoded set of expected payment handler IDs (google_pay, mock_payment_handler, shop_pay) that was specific to the Flower Shop sample server. This caused failures when running against any other UCP endpoint. Replace with structural validation that discovers handlers from the business profile and validates each handler has: - Required fields (id, name, version, config) - Reverse-DNS naming convention for handler name This makes the conformance test server-agnostic, which is the correct behavior for a protocol conformance suite. --- protocol_test.py | 46 ++++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/protocol_test.py b/protocol_test.py index 532c158..4b65b74 100644 --- a/protocol_test.py +++ b/protocol_test.py @@ -197,24 +197,38 @@ def test_discovery(self): f"Missing expected capabilities in discovery: {missing_caps}", ) - # Verify Payment Handlers (optional - not all profiles include them) + # Verify Payment Handlers - discover from profile, validate structure if profile.payment and profile.payment.handlers: - handlers = {h.id for h in profile.payment.handlers} - expected_handlers = {"google_pay", "mock_payment_handler", "shop_pay"} - missing_handlers = expected_handlers - handlers - self.assertFalse( - missing_handlers, - f"Missing expected payment handlers: {missing_handlers}", + self.assertGreater( + len(profile.payment.handlers), + 0, + "payment.handlers is present but empty", ) - - # Specific check for Shop Pay config - shop_pay = next( - (h for h in profile.payment.handlers if h.id == "shop_pay"), - None, - ) - if shop_pay: - self.assertEqual(shop_pay.name, "com.shopify.shop_pay") - self.assertIn("shop_id", shop_pay.config) + for handler in profile.payment.handlers: + # Validate required fields are present and non-empty + self.assertTrue( + handler.id, + "Payment handler missing 'id'", + ) + self.assertTrue( + handler.name, + f"Payment handler '{handler.id}' missing 'name'", + ) + self.assertIsNotNone( + handler.version, + f"Payment handler '{handler.id}' missing 'version'", + ) + self.assertIsNotNone( + handler.config, + f"Payment handler '{handler.id}' missing 'config'", + ) + # Validate name follows reverse-DNS convention + self.assertRegex( + handler.name, + r"^[a-z][a-z0-9]*(\.[a-z][a-z0-9_]*)+$", + f"Payment handler '{handler.id}' name '{handler.name}' " + "does not follow reverse-DNS convention", + ) # Verify shopping capability self.assertIn("dev.ucp.shopping", profile.ucp.services.root)