From f7a8ab554e67bcf06771b570956d890d900cc0b9 Mon Sep 17 00:00:00 2001 From: Enrico Piovesan Date: Thu, 16 Apr 2026 09:12:17 -0500 Subject: [PATCH 1/3] Implement manual federation sync and peer status surface --- .github/workflows/codeql.yml | 48 + .../traverse-cli/src/federation_operator.rs | 429 ++++ crates/traverse-cli/src/main.rs | 45 +- crates/traverse-registry/src/federation.rs | 2163 +++++++++++++++++ crates/traverse-registry/src/lib.rs | 2 + 5 files changed, 2686 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/codeql.yml create mode 100644 crates/traverse-cli/src/federation_operator.rs create mode 100644 crates/traverse-registry/src/federation.rs diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 00000000..74315aad --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,48 @@ +name: CodeQL + +on: + push: + branches: + - main + pull_request: + schedule: + - cron: "23 6 * * 1" + +jobs: + analyze: + name: Analyze (${{ matrix.language }}) + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + include: + - language: actions + build-mode: none + - language: javascript-typescript + build-mode: none + - language: rust + build-mode: none + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Initialize CodeQL + uses: github/codeql-action/init@v3 + with: + languages: ${{ matrix.language }} + build-mode: ${{ matrix.build-mode }} + + - name: Set up Rust + if: matrix.language == 'rust' + uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.94.0 + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v3 diff --git a/crates/traverse-cli/src/federation_operator.rs b/crates/traverse-cli/src/federation_operator.rs new file mode 100644 index 00000000..5aeeb578 --- /dev/null +++ b/crates/traverse-cli/src/federation_operator.rs @@ -0,0 +1,429 @@ +use serde::Deserialize; +use std::fs; +use std::path::{Path, PathBuf}; +use traverse_registry::{ + FederationFailure, FederationPeer, FederationRegistry, FederationStatusSummary, + FederationSyncOutcome, FederationSyncStatus, FederationTrustState, RegistryScope, TrustRecord, + export_peer_state, +}; + +#[derive(Debug, Clone, Deserialize)] +pub struct FederationOperatorManifest { + pub peer: FederationPeerManifest, + pub trust: TrustRecordManifest, + pub bundle_manifest_path: PathBuf, + pub started_at: String, + pub finished_at: String, + pub evidence_ref: String, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct FederationPeerManifest { + pub peer_id: String, + pub display_name: String, + pub trust_state: FederationTrustStateManifest, + pub identity_fingerprint: String, + pub sync_enabled: bool, + pub last_sync_at: Option, + pub last_sync_status: FederationSyncStatusManifest, + pub visible_registry_scopes: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct TrustRecordManifest { + pub peer_id: String, + pub trust_model: String, + pub allowed_scopes: Vec, + pub approved_spec_refs: Vec, + pub approved_at: String, + pub revoked_at: Option, +} + +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub enum FederationTrustStateManifest { + Trusted, + Pending, + Blocked, + Revoked, +} + +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub enum FederationSyncStatusManifest { + Unknown, + Success, + Partial, + Failed, +} + +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub enum RegistryScopeManifest { + Public, + Private, +} + +#[derive(Debug)] +struct LoadedFederationContext { + manifest: FederationOperatorManifest, + peer: FederationPeer, + trust: TrustRecord, + federation: FederationRegistry, + registered_bundle: super::RegisteredBundle, +} + +pub fn render_federation_peers(manifest_path: &Path) -> Result { + let context = load_context(manifest_path)?; + Ok(render_peer_listing( + &context.federation.status_summary(), + &context.federation, + )) +} + +pub fn render_federation_sync(manifest_path: &Path) -> Result { + let mut context = load_context(manifest_path)?; + let outcome = sync_context(&mut context)?; + Ok(render_sync_report( + &context.federation.status_summary(), + &context.federation, + &outcome, + )) +} + +pub fn render_federation_status(manifest_path: &Path) -> Result { + let mut context = load_context(manifest_path)?; + let outcome = sync_context(&mut context)?; + Ok(format!( + "{}\n{}", + render_sync_report( + &context.federation.status_summary(), + &context.federation, + &outcome + ), + render_peer_listing(&context.federation.status_summary(), &context.federation) + )) +} + +fn load_context(manifest_path: &Path) -> Result { + let manifest = load_manifest(manifest_path)?; + let bundle_manifest_path = resolve_relative_path(manifest_path, &manifest.bundle_manifest_path); + let registered_bundle = super::load_registered_bundle(&bundle_manifest_path)?; + let peer = manifest.peer.clone().into_peer(); + let trust = manifest.trust.clone().into_trust(); + let mut federation = FederationRegistry::new(); + federation + .register_peer(peer.clone(), trust.clone()) + .map_err(render_federation_failure)?; + + Ok(LoadedFederationContext { + manifest, + peer, + trust, + federation, + registered_bundle, + }) +} + +fn sync_context(context: &mut LoadedFederationContext) -> Result { + let export = export_peer_state( + context.peer.clone(), + context.trust.clone(), + &context.registered_bundle.capability_registry, + &context.registered_bundle.event_registry, + &context.registered_bundle.workflow_registry, + ); + context + .federation + .sync_peer( + export, + &context.registered_bundle.capability_registry, + &context.registered_bundle.event_registry, + &context.registered_bundle.workflow_registry, + &context.manifest.started_at, + &context.manifest.finished_at, + &context.manifest.evidence_ref, + ) + .map_err(render_federation_failure) +} + +fn render_peer_listing( + summary: &FederationStatusSummary, + federation: &FederationRegistry, +) -> String { + let mut lines = vec![ + format!("peer_count: {}", summary.peer_count), + format!("trusted_peer_count: {}", summary.trusted_peer_count), + format!("last_sync_outcome: {:?}", summary.last_sync_outcome).to_lowercase(), + format!( + "sync_age: {}", + summary + .sync_age + .clone() + .unwrap_or_else(|| "none".to_string()) + ), + format!("conflict_count: {}", summary.conflict_count), + format!("blocked_entries: {}", summary.blocked_entries), + format!("route_failures: {}", summary.route_failures), + ]; + + for peer in federation.list_peers() { + lines.push(format!("peer_id: {}", peer.peer_id)); + lines.push(format!("display_name: {}", peer.display_name)); + lines.push(format!("trust_state: {:?}", peer.trust_state).to_lowercase()); + lines.push(format!("sync_enabled: {}", peer.sync_enabled)); + lines.push(format!( + "last_sync_at: {}", + peer.last_sync_at + .clone() + .unwrap_or_else(|| "none".to_string()) + )); + lines.push(format!("last_sync_status: {:?}", peer.last_sync_status).to_lowercase()); + lines.push(format!( + "visible_registry_scopes: {}", + peer.visible_registry_scopes + .iter() + .map(|scope| format!("{scope:?}").to_lowercase()) + .collect::>() + .join(", ") + )); + } + + lines.join("\n") +} + +fn render_sync_report( + summary: &FederationStatusSummary, + federation: &FederationRegistry, + outcome: &FederationSyncOutcome, +) -> String { + let session = &outcome.session; + let mut lines = vec![ + format!("session_id: {}", session.session_id), + format!("peer_id: {}", session.peer_id), + format!("sync_status: {:?}", session.status).to_lowercase(), + format!( + "registry_types: {}", + session + .registry_types + .iter() + .map(|kind| format!("{kind:?}").to_lowercase()) + .collect::>() + .join(", ") + ), + format!("validated_entries: {}", session.validated_entries), + format!("rejected_entries: {}", session.rejected_entries), + format!("conflict_count: {}", session.conflict_count), + format!("evidence_ref: {}", session.evidence_ref), + format!( + "finished_at: {}", + session + .finished_at + .clone() + .unwrap_or_else(|| "none".to_string()) + ), + format!("peer_count: {}", summary.peer_count), + format!("trusted_peer_count: {}", summary.trusted_peer_count), + ]; + + for conflict in &outcome.conflicts { + lines.push(format!( + "conflict: {} {} {} {} {:?}", + conflict.conflict_id, + format!("{:?}", conflict.registry_type).to_lowercase(), + conflict.entry_key, + conflict.conflict_reason, + conflict.resolution_state + )); + } + + lines.push(format!( + "registered_peers: {}", + federation + .list_peers() + .iter() + .map(|peer| peer.peer_id.clone()) + .collect::>() + .join(", ") + )); + + lines.join("\n") +} + +fn load_manifest(manifest_path: &Path) -> Result { + let contents = fs::read_to_string(manifest_path).map_err(|error| { + format!( + "failed to read federation operator manifest {}: {error}", + manifest_path.display() + ) + })?; + serde_json::from_str::(&contents).map_err(|error| { + format!( + "failed to parse federation operator manifest {}: {error}", + manifest_path.display() + ) + }) +} + +fn resolve_relative_path(base_path: &Path, path: &Path) -> PathBuf { + if path.is_absolute() { + path.to_path_buf() + } else { + base_path + .parent() + .unwrap_or_else(|| Path::new(".")) + .join(path) + } +} + +fn render_federation_failure(failure: FederationFailure) -> String { + failure + .errors + .into_iter() + .map(|error| format!("{:?} {}: {}", error.severity, error.target, error.message)) + .collect::>() + .join("\n") +} + +impl FederationPeerManifest { + fn into_peer(self) -> FederationPeer { + FederationPeer { + peer_id: self.peer_id, + display_name: self.display_name, + trust_state: self.trust_state.into(), + identity_fingerprint: self.identity_fingerprint, + sync_enabled: self.sync_enabled, + last_sync_at: self.last_sync_at, + last_sync_status: self.last_sync_status.into(), + visible_registry_scopes: self + .visible_registry_scopes + .into_iter() + .map(Into::into) + .collect(), + } + } +} + +impl TrustRecordManifest { + fn into_trust(self) -> TrustRecord { + TrustRecord { + peer_id: self.peer_id, + trust_model: self.trust_model, + allowed_scopes: self.allowed_scopes.into_iter().map(Into::into).collect(), + approved_spec_refs: self.approved_spec_refs, + approved_at: self.approved_at, + revoked_at: self.revoked_at, + } + } +} + +impl From for RegistryScope { + fn from(value: RegistryScopeManifest) -> Self { + match value { + RegistryScopeManifest::Public => RegistryScope::Public, + RegistryScopeManifest::Private => RegistryScope::Private, + } + } +} + +impl From for FederationTrustState { + fn from(value: FederationTrustStateManifest) -> Self { + match value { + FederationTrustStateManifest::Trusted => FederationTrustState::Trusted, + FederationTrustStateManifest::Pending => FederationTrustState::Pending, + FederationTrustStateManifest::Blocked => FederationTrustState::Blocked, + FederationTrustStateManifest::Revoked => FederationTrustState::Revoked, + } + } +} + +impl From for FederationSyncStatus { + fn from(value: FederationSyncStatusManifest) -> Self { + match value { + FederationSyncStatusManifest::Unknown => FederationSyncStatus::Unknown, + FederationSyncStatusManifest::Success => FederationSyncStatus::Success, + FederationSyncStatusManifest::Partial => FederationSyncStatus::Partial, + FederationSyncStatusManifest::Failed => FederationSyncStatus::Failed, + } + } +} + +#[cfg(test)] +mod tests { + #![allow(clippy::expect_used)] + + use super::{render_federation_peers, render_federation_status, render_federation_sync}; + use std::fs; + use std::path::PathBuf; + use std::time::{SystemTime, UNIX_EPOCH}; + + #[test] + fn federation_peers_and_status_renders_peer_listing_and_sync_summary() { + let temp_dir = unique_temp_dir(); + let manifest_path = temp_dir.join("federation-operator.json"); + let bundle_manifest_path = canonical_bundle_manifest_path(); + + fs::write( + &manifest_path, + format!( + r#"{{ + "peer": {{ + "peer_id": "peer-a", + "display_name": "Peer A", + "trust_state": "Trusted", + "identity_fingerprint": "fingerprint:peer-a", + "sync_enabled": true, + "last_sync_at": null, + "last_sync_status": "Unknown", + "visible_registry_scopes": ["Public"] + }}, + "trust": {{ + "peer_id": "peer-a", + "trust_model": "allowlist", + "allowed_scopes": ["Public"], + "approved_spec_refs": ["026-federation-registry-routing"], + "approved_at": "2026-04-10T00:00:00Z", + "revoked_at": null + }}, + "bundle_manifest_path": "{}", + "started_at": "2026-04-10T00:00:01Z", + "finished_at": "2026-04-10T00:00:02Z", + "evidence_ref": "evidence:federation-sync:peer-a" +}}"#, + bundle_manifest_path.display() + ), + ) + .expect("manifest should write"); + + let peers = render_federation_peers(&manifest_path).expect("peers should render"); + assert!(peers.contains("peer_count: 1")); + assert!(peers.contains("peer_id: peer-a")); + assert!(peers.contains("last_sync_status: unknown")); + + let sync = render_federation_sync(&manifest_path).expect("sync should render"); + assert!(sync.contains("session_id: sync_peer-a_1")); + assert!(sync.contains("sync_status: success")); + assert!(sync.contains("evidence_ref: evidence:federation-sync:peer-a")); + + let status = render_federation_status(&manifest_path).expect("status should render"); + assert!(status.contains("peer_count: 1")); + assert!(status.contains("last_sync_status: success")); + assert!(status.contains("registered_peers: peer-a")); + } + + fn canonical_bundle_manifest_path() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("../../examples/expedition/registry-bundle/manifest.json") + } + + fn unique_temp_dir() -> PathBuf { + let mut path = std::env::temp_dir(); + let nonce = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("clock should be monotonic") + .as_nanos(); + path.push(format!("cogolo-federation-operator-{nonce}")); + fs::create_dir_all(&path).expect("temp dir should create"); + path + } +} diff --git a/crates/traverse-cli/src/main.rs b/crates/traverse-cli/src/main.rs index ac7a0bc3..b50ef6dc 100644 --- a/crates/traverse-cli/src/main.rs +++ b/crates/traverse-cli/src/main.rs @@ -1,8 +1,12 @@ mod agent_packages; mod browser_adapter; +mod federation_operator; use agent_packages::load_agent_package; use browser_adapter::serve_local_browser_adapter; +use federation_operator::{ + render_federation_peers, render_federation_status, render_federation_sync, +}; use serde_json::Value; use std::env; use std::fs; @@ -43,6 +47,15 @@ enum Command { manifest_path: PathBuf, request_path: PathBuf, }, + FederationPeers { + manifest_path: PathBuf, + }, + FederationSync { + manifest_path: PathBuf, + }, + FederationStatus { + manifest_path: PathBuf, + }, ExpeditionExecute { request_path: PathBuf, trace_output_path: Option, @@ -96,6 +109,9 @@ fn run_command(command: Command) -> Result { manifest_path, request_path, } => execute_agent(&manifest_path, &request_path), + Command::FederationPeers { manifest_path } => render_federation_peers(&manifest_path), + Command::FederationSync { manifest_path } => render_federation_sync(&manifest_path), + Command::FederationStatus { manifest_path } => render_federation_status(&manifest_path), Command::ExpeditionExecute { request_path, trace_output_path, @@ -112,6 +128,7 @@ fn parse_command(args: &[String]) -> Result { args.get(2).map(String::as_str), ) { (Some("browser-adapter"), Some("serve")) => parse_browser_adapter_command(args), + (Some("federation"), Some(_)) => parse_federation_command(args), (Some("agent"), Some("execute")) => parse_agent_execute_command(args), (Some("expedition"), Some("execute")) => parse_expedition_execute_command(args), _ => parse_fixed_arity_command(args), @@ -145,6 +162,15 @@ fn parse_fixed_arity_command(args: &[String]) -> Result { ("agent", "inspect") => Ok(Command::AgentInspect { manifest_path: PathBuf::from(&args[3]), }), + ("federation", "peers") => Ok(Command::FederationPeers { + manifest_path: PathBuf::from(&args[3]), + }), + ("federation", "sync") => Ok(Command::FederationSync { + manifest_path: PathBuf::from(&args[3]), + }), + ("federation", "status") => Ok(Command::FederationStatus { + manifest_path: PathBuf::from(&args[3]), + }), ("event", "inspect") => Ok(Command::Event { contract_path: PathBuf::from(&args[3]), }), @@ -168,6 +194,21 @@ fn parse_agent_execute_command(args: &[String]) -> Result { } } +fn parse_federation_command(args: &[String]) -> Result { + match args { + [_, _, _, manifest_path] if args[2] == "peers" => Ok(Command::FederationPeers { + manifest_path: PathBuf::from(manifest_path), + }), + [_, _, _, manifest_path] if args[2] == "sync" => Ok(Command::FederationSync { + manifest_path: PathBuf::from(manifest_path), + }), + [_, _, _, manifest_path] if args[2] == "status" => Ok(Command::FederationStatus { + manifest_path: PathBuf::from(manifest_path), + }), + _ => Err(usage()), + } +} + fn parse_expedition_execute_command(args: &[String]) -> Result { match args { [_, _, _, request_path] => Ok(Command::ExpeditionExecute { @@ -607,7 +648,7 @@ fn render_trace_summary(trace_path: &Path, trace: &RuntimeTrace) -> String { } fn usage() -> String { - "usage: traverse-cli [request-path] [--trace-out ] | traverse-cli browser-adapter serve [--bind
]".to_string() + "usage: traverse-cli [request-path] [--trace-out ] | traverse-cli browser-adapter serve [--bind
]".to_string() } fn write_trace_artifact(path: &Path, trace: &RuntimeTrace) -> Result<(), String> { @@ -649,6 +690,7 @@ fn debug_enum_to_snake_case(value: &str) -> String { struct RegisteredBundle { bundle: RegistryBundle, capability_registry: CapabilityRegistry, + event_registry: EventRegistry, workflow_registry: WorkflowRegistry, capability_records: Vec, event_records: Vec, @@ -788,6 +830,7 @@ fn load_registered_bundle(manifest_path: &Path) -> Result, + pub last_sync_status: FederationSyncStatus, + pub visible_registry_scopes: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TrustRecord { + pub peer_id: String, + pub trust_model: String, + pub allowed_scopes: Vec, + pub approved_spec_refs: Vec, + pub approved_at: String, + pub revoked_at: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FederationPeerExport { + pub peer: FederationPeer, + pub trust: TrustRecord, + pub capabilities: Vec, + pub events: Vec, + pub workflows: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FederationSyncSession { + pub session_id: String, + pub peer_id: String, + pub started_at: String, + pub finished_at: Option, + pub status: FederationSyncStatus, + pub registry_types: Vec, + pub validated_entries: usize, + pub rejected_entries: usize, + pub conflict_count: usize, + pub evidence_ref: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PeerRegistrySnapshot { + pub peer_id: String, + pub registry_type: FederationRegistryKind, + pub entry_id: String, + pub version: String, + pub scope: RegistryScope, + pub approval_state: FederationApprovalState, + pub contract_ref: String, + pub provenance_ref: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CrossPeerTraceProvenance { + pub trace_id: String, + pub origin_peer_id: String, + pub owning_peer_id: String, + pub route_reason: String, + pub sync_session_ref: Option, + pub response_status: FederationInvocationStatus, + pub evidence_ref: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FederatedInvocation { + pub invocation_id: String, + pub origin_peer_id: String, + pub target_peer_id: String, + pub capability_id: String, + pub request_ref: String, + pub status: FederationInvocationStatus, + pub response_ref: Option, + pub trace_provenance: CrossPeerTraceProvenance, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ConflictRecord { + pub conflict_id: String, + pub peer_ids: Vec, + pub registry_type: FederationRegistryKind, + pub entry_key: String, + pub conflict_reason: String, + pub resolution_state: FederationConflictResolutionState, + pub audit_ref: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FederationStatusSummary { + pub peer_count: usize, + pub trusted_peer_count: usize, + pub last_sync_outcome: FederationSyncStatus, + pub sync_age: Option, + pub conflict_count: usize, + pub blocked_entries: usize, + pub route_failures: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FederationSyncOutcome { + pub session: FederationSyncSession, + pub accepted_snapshots: Vec, + pub conflicts: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FederationErrorCode { + MissingRequiredField, + DuplicatePeer, + InvalidTrust, + PeerNotFound, + EntryValidationFailed, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FederationError { + pub code: FederationErrorCode, + pub target: String, + pub message: String, + pub severity: ErrorSeverity, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FederationFailure { + pub errors: Vec, +} + +#[derive(Debug, Default)] +pub struct FederationRegistry { + peers: BTreeMap, + trust_records: BTreeMap, + snapshots: BTreeMap<(String, FederationRegistryKind, String, String), PeerRegistrySnapshot>, + sync_sessions: Vec, + invocations: Vec, + conflicts: Vec, +} + +impl FederationRegistry { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + pub fn register_peer( + &mut self, + peer: FederationPeer, + trust: TrustRecord, + ) -> Result<(), FederationFailure> { + let mut errors = Vec::new(); + if peer.peer_id.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.peer.peer_id", + "peer_id must not be empty", + )); + } + if peer.display_name.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.peer.display_name", + "display_name must not be empty", + )); + } + if peer.identity_fingerprint.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.peer.identity_fingerprint", + "identity_fingerprint must not be empty", + )); + } + if peer.peer_id != trust.peer_id { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.trust.peer_id", + "trust record must reference the same peer_id as the peer", + )); + } + if !peer.sync_enabled { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.peer.sync_enabled", + "sync_enabled must be true for a trusted federation peer", + )); + } + if !matches!(peer.trust_state, FederationTrustState::Trusted) { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.peer.trust_state", + "peer trust_state must be trusted before federation registration", + )); + } + if trust.allowed_scopes.is_empty() { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.trust.allowed_scopes", + "allowed_scopes must not be empty", + )); + } + if trust.approved_spec_refs.is_empty() { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.trust.approved_spec_refs", + "approved_spec_refs must not be empty", + )); + } + if !errors.is_empty() { + return Err(FederationFailure { errors }); + } + + match self.peers.get(&peer.peer_id) { + Some(existing) + if existing == &peer && self.trust_records.get(&peer.peer_id) == Some(&trust) => + { + Ok(()) + } + Some(_) => Err(FederationFailure { + errors: vec![federation_error( + FederationErrorCode::DuplicatePeer, + "$.peer.peer_id", + "a different federation peer is already registered with this peer_id", + )], + }), + None => { + self.trust_records.insert(peer.peer_id.clone(), trust); + self.peers.insert(peer.peer_id.clone(), peer); + Ok(()) + } + } + } + + #[must_use] + pub fn list_peers(&self) -> Vec { + let mut peers = self.peers.values().cloned().collect::>(); + peers.sort_by(|left, right| left.peer_id.cmp(&right.peer_id)); + peers + } + + #[must_use] + pub fn conflicts(&self) -> &[ConflictRecord] { + &self.conflicts + } + + #[must_use] + pub fn sync_sessions(&self) -> &[FederationSyncSession] { + &self.sync_sessions + } + + #[must_use] + pub fn invocations(&self) -> &[FederatedInvocation] { + &self.invocations + } + + #[must_use] + pub fn status_summary(&self) -> FederationStatusSummary { + let trusted_peer_count = self + .peers + .values() + .filter(|peer| peer.trust_state == FederationTrustState::Trusted) + .count(); + let last_session = self.sync_sessions.last(); + FederationStatusSummary { + peer_count: self.peers.len(), + trusted_peer_count, + last_sync_outcome: last_session + .map(|session| session.status) + .unwrap_or(FederationSyncStatus::Unknown), + sync_age: last_session.and_then(|session| session.finished_at.clone()), + conflict_count: self.conflicts.len(), + blocked_entries: self + .sync_sessions + .iter() + .map(|session| session.rejected_entries) + .sum(), + route_failures: self + .invocations + .iter() + .filter(|invocation| is_route_failure(invocation.status)) + .count(), + } + } + + pub fn sync_peer( + &mut self, + export: FederationPeerExport, + capabilities: &CapabilityRegistry, + events: &EventRegistry, + workflows: &WorkflowRegistry, + started_at: &str, + finished_at: &str, + evidence_ref: &str, + ) -> Result { + let mut errors = Vec::new(); + if started_at.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.started_at", + "started_at must not be empty", + )); + } + if finished_at.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.finished_at", + "finished_at must not be empty", + )); + } + if evidence_ref.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.evidence_ref", + "evidence_ref must not be empty", + )); + } + if export.peer.peer_id != export.trust.peer_id { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.trust.peer_id", + "export trust record must match the exporting peer id", + )); + } + + let Some(registered_peer) = self.peers.get(&export.peer.peer_id) else { + errors.push(federation_error( + FederationErrorCode::PeerNotFound, + "$.peer.peer_id", + "peer must be registered before it can be synced", + )); + return Err(FederationFailure { errors }); + }; + let Some(registered_trust) = self.trust_records.get(&export.peer.peer_id) else { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.trust.peer_id", + "peer is missing its approved trust record", + )); + return Err(FederationFailure { errors }); + }; + + if registered_peer != &export.peer || registered_trust != &export.trust { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.peer", + "exported peer metadata must match the registered trusted peer", + )); + } + if !registered_peer.sync_enabled { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.peer.sync_enabled", + "sync is disabled for this peer", + )); + } + if registered_peer.trust_state != FederationTrustState::Trusted { + errors.push(federation_error( + FederationErrorCode::InvalidTrust, + "$.peer.trust_state", + "only trusted peers can participate in federation sync", + )); + } + if !errors.is_empty() { + return Err(FederationFailure { errors }); + } + + let mut accepted_snapshots = Vec::new(); + let mut conflict_records = Vec::new(); + + for capability in &export.capabilities { + if let Some(snapshot) = validate_capability_snapshot( + &export.peer, + &export.trust, + capabilities, + capability, + evidence_ref, + &mut conflict_records, + ) { + accepted_snapshots.push(snapshot); + } + } + for event in &export.events { + if let Some(snapshot) = validate_event_snapshot( + &export.peer, + &export.trust, + events, + event, + evidence_ref, + &mut conflict_records, + ) { + accepted_snapshots.push(snapshot); + } + } + for workflow in &export.workflows { + if let Some(snapshot) = validate_workflow_snapshot( + &export.peer, + &export.trust, + workflows, + workflow, + evidence_ref, + &mut conflict_records, + ) { + accepted_snapshots.push(snapshot); + } + } + + for snapshot in &accepted_snapshots { + let key = ( + snapshot.peer_id.clone(), + snapshot.registry_type, + snapshot.entry_id.clone(), + snapshot.version.clone(), + ); + self.snapshots.insert(key, snapshot.clone()); + } + self.conflicts.extend(conflict_records.clone()); + + let status = if accepted_snapshots.is_empty() && conflict_records.is_empty() { + FederationSyncStatus::Failed + } else if conflict_records.is_empty() { + FederationSyncStatus::Success + } else { + FederationSyncStatus::Partial + }; + + let session = FederationSyncSession { + session_id: format!( + "sync_{}_{}", + export.peer.peer_id, + self.sync_sessions.len() + 1 + ), + peer_id: export.peer.peer_id.clone(), + started_at: started_at.to_string(), + finished_at: Some(finished_at.to_string()), + status, + registry_types: synced_registry_types(&accepted_snapshots), + validated_entries: accepted_snapshots.len(), + rejected_entries: conflict_records.len(), + conflict_count: conflict_records.len(), + evidence_ref: evidence_ref.to_string(), + }; + + if let Some(peer) = self.peers.get_mut(&export.peer.peer_id) { + peer.last_sync_at = Some(finished_at.to_string()); + peer.last_sync_status = status; + } + + self.sync_sessions.push(session.clone()); + + Ok(FederationSyncOutcome { + session, + accepted_snapshots, + conflicts: conflict_records, + }) + } + + pub fn route_capability_invocation( + &mut self, + origin_peer_id: &str, + capability_id: &str, + version: &str, + request_ref: &str, + available_peer_ids: &BTreeSet, + routed_at: &str, + evidence_ref: &str, + ) -> Result { + let mut errors = Vec::new(); + if origin_peer_id.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.origin_peer_id", + "origin_peer_id must not be empty", + )); + } + if capability_id.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.capability_id", + "capability_id must not be empty", + )); + } + if version.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.version", + "version must not be empty", + )); + } + if request_ref.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.request_ref", + "request_ref must not be empty", + )); + } + if routed_at.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.routed_at", + "routed_at must not be empty", + )); + } + if evidence_ref.trim().is_empty() { + errors.push(federation_error( + FederationErrorCode::MissingRequiredField, + "$.evidence_ref", + "evidence_ref must not be empty", + )); + } + if !self.peers.contains_key(origin_peer_id) { + errors.push(federation_error( + FederationErrorCode::PeerNotFound, + "$.origin_peer_id", + "origin peer must be registered before routing", + )); + } + if !errors.is_empty() { + return Err(FederationFailure { errors }); + } + + let origin_peer = self.peers.get(origin_peer_id).expect("validated above"); + let trust = self + .trust_records + .get(origin_peer_id) + .expect("validated above"); + + let candidate = self + .snapshots + .values() + .filter(|snapshot| snapshot.registry_type == FederationRegistryKind::Capability) + .filter(|snapshot| snapshot.entry_id == capability_id && snapshot.version == version) + .filter(|snapshot| scope_is_visible(snapshot.scope, trust, origin_peer)) + .min_by(|left, right| left.peer_id.cmp(&right.peer_id)) + .cloned() + .map(|snapshot| (snapshot.peer_id.clone(), snapshot)); + + let Some((target_peer_id, target_snapshot)) = candidate else { + return Err(FederationFailure { + errors: vec![federation_error( + FederationErrorCode::EntryValidationFailed, + "$.capability_id", + "no synchronized owning peer was found for the requested capability", + )], + }); + }; + + let available = available_peer_ids.contains(&target_peer_id); + let sync_session_ref = self + .sync_sessions + .iter() + .rev() + .find(|session| session.peer_id == target_peer_id) + .map(|session| session.evidence_ref.clone()); + let trace_id = format!("trace_{}_{}_{}", origin_peer_id, capability_id, version); + let invocation_id = format!( + "invocation_{}_{}_{}", + origin_peer_id, capability_id, version + ); + let (status, response_ref, route_reason) = if available { + ( + FederationInvocationStatus::Success, + Some(format!( + "response://{}/{}/{}", + target_peer_id, capability_id, version + )), + format!( + "routed to owning peer {} for synchronized capability snapshot", + target_peer_id + ), + ) + } else { + ( + FederationInvocationStatus::RetryableFailure, + None, + format!( + "owning peer {} is not currently reachable for invocation", + target_peer_id + ), + ) + }; + + let invocation = FederatedInvocation { + invocation_id, + origin_peer_id: origin_peer_id.to_string(), + target_peer_id: target_peer_id.clone(), + capability_id: capability_id.to_string(), + request_ref: request_ref.to_string(), + status, + response_ref, + trace_provenance: CrossPeerTraceProvenance { + trace_id, + origin_peer_id: origin_peer_id.to_string(), + owning_peer_id: target_snapshot.peer_id, + route_reason, + sync_session_ref, + response_status: status, + evidence_ref: evidence_ref.to_string(), + }, + }; + self.invocations.push(invocation.clone()); + Ok(invocation) + } +} + +pub fn export_peer_state( + peer: FederationPeer, + trust: TrustRecord, + capabilities: &CapabilityRegistry, + events: &EventRegistry, + workflows: &WorkflowRegistry, +) -> FederationPeerExport { + FederationPeerExport { + peer, + trust, + capabilities: capabilities.graph_entries(), + events: events.graph_entries(), + workflows: workflows.graph_entries(), + } +} + +fn validate_capability_snapshot( + peer: &FederationPeer, + trust: &TrustRecord, + capabilities: &CapabilityRegistry, + export: &ResolvedCapability, + evidence_ref: &str, + conflicts: &mut Vec, +) -> Option { + if !scope_is_allowed(export.record.scope, trust, peer) { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Capability, + &export.record.id, + &export.record.version, + "peer trust does not authorize the exported scope", + evidence_ref, + )); + return None; + } + + let lookup_scope = lookup_scope_for(export.record.scope); + let Some(local) = + capabilities.find_exact(lookup_scope, &export.record.id, &export.record.version) + else { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Capability, + &export.record.id, + &export.record.version, + "local approved registry is missing the exported capability", + evidence_ref, + )); + return None; + }; + + if local != *export { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Capability, + &export.record.id, + &export.record.version, + "local capability record differs from the exported peer record", + evidence_ref, + )); + return None; + } + + Some(build_snapshot( + peer, + FederationRegistryKind::Capability, + &export.record.id, + &export.record.version, + export.record.scope, + export.record.lifecycle.clone(), + &export.record.contract_path, + &format!( + "{:?}:{}:{}", + export.record.provenance.source, + export.record.provenance.author, + export.record.provenance.created_at + ), + )) +} + +fn validate_event_snapshot( + peer: &FederationPeer, + trust: &TrustRecord, + events: &EventRegistry, + export: &ResolvedEvent, + evidence_ref: &str, + conflicts: &mut Vec, +) -> Option { + if !scope_is_allowed(export.record.scope, trust, peer) { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Event, + &export.record.id, + &export.record.version, + "peer trust does not authorize the exported scope", + evidence_ref, + )); + return None; + } + + let lookup_scope = lookup_scope_for(export.record.scope); + let Some(local) = events.find_exact(lookup_scope, &export.record.id, &export.record.version) + else { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Event, + &export.record.id, + &export.record.version, + "local approved registry is missing the exported event", + evidence_ref, + )); + return None; + }; + + if local != *export { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Event, + &export.record.id, + &export.record.version, + "local event record differs from the exported peer record", + evidence_ref, + )); + return None; + } + + Some(build_snapshot( + peer, + FederationRegistryKind::Event, + &export.record.id, + &export.record.version, + export.record.scope, + export.record.lifecycle.clone(), + &export.record.contract_path, + &format!( + "{:?}:{}:{}", + export.record.provenance.source, + export.record.provenance.author, + export.record.provenance.created_at + ), + )) +} + +fn validate_workflow_snapshot( + peer: &FederationPeer, + trust: &TrustRecord, + workflows: &WorkflowRegistry, + export: &ResolvedWorkflow, + evidence_ref: &str, + conflicts: &mut Vec, +) -> Option { + if !scope_is_allowed(export.record.scope, trust, peer) { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Workflow, + &export.record.id, + &export.record.version, + "peer trust does not authorize the exported scope", + evidence_ref, + )); + return None; + } + + let lookup_scope = lookup_scope_for(export.record.scope); + let Some(local) = workflows.find_exact(lookup_scope, &export.record.id, &export.record.version) + else { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Workflow, + &export.record.id, + &export.record.version, + "local approved registry is missing the exported workflow", + evidence_ref, + )); + return None; + }; + + if local != *export { + conflicts.push(build_conflict_record( + peer.peer_id.as_str(), + FederationRegistryKind::Workflow, + &export.record.id, + &export.record.version, + "local workflow record differs from the exported peer record", + evidence_ref, + )); + return None; + } + + Some(build_snapshot( + peer, + FederationRegistryKind::Workflow, + &export.record.id, + &export.record.version, + export.record.scope, + export.record.lifecycle.clone(), + &export.record.workflow_path, + &format!( + "{}:{}:{}", + export.record.governing_spec, + export.record.validator_version, + export.record.registered_at + ), + )) +} + +fn build_snapshot( + peer: &FederationPeer, + registry_type: FederationRegistryKind, + entry_id: &str, + version: &str, + scope: RegistryScope, + lifecycle: Lifecycle, + contract_ref: &str, + provenance_ref: &str, +) -> PeerRegistrySnapshot { + PeerRegistrySnapshot { + peer_id: peer.peer_id.clone(), + registry_type, + entry_id: entry_id.to_string(), + version: version.to_string(), + scope, + approval_state: approval_state_from_lifecycle(&lifecycle), + contract_ref: contract_ref.to_string(), + provenance_ref: provenance_ref.to_string(), + } +} + +fn build_conflict_record( + peer_id: &str, + registry_type: FederationRegistryKind, + entry_id: &str, + version: &str, + reason: &str, + audit_ref: &str, +) -> ConflictRecord { + ConflictRecord { + conflict_id: format!("conflict_{}_{}_{}", peer_id, entry_id, version), + peer_ids: vec![peer_id.to_string()], + registry_type, + entry_key: format!("{registry_type:?}:{entry_id}@{version}"), + conflict_reason: reason.to_string(), + resolution_state: FederationConflictResolutionState::Open, + audit_ref: audit_ref.to_string(), + } +} + +fn approval_state_from_lifecycle(lifecycle: &Lifecycle) -> FederationApprovalState { + match lifecycle { + Lifecycle::Draft => FederationApprovalState::Draft, + Lifecycle::Active => FederationApprovalState::Approved, + Lifecycle::Deprecated => FederationApprovalState::Deprecated, + Lifecycle::Retired | Lifecycle::Archived => FederationApprovalState::Rejected, + } +} + +fn is_route_failure(status: FederationInvocationStatus) -> bool { + matches!( + status, + FederationInvocationStatus::Failure | FederationInvocationStatus::RetryableFailure + ) +} + +fn scope_is_allowed(scope: RegistryScope, trust: &TrustRecord, peer: &FederationPeer) -> bool { + trust.allowed_scopes.contains(&scope) && peer.visible_registry_scopes.contains(&scope) +} + +fn scope_is_visible(scope: RegistryScope, trust: &TrustRecord, peer: &FederationPeer) -> bool { + scope_is_allowed(scope, trust, peer) +} + +fn lookup_scope_for(scope: RegistryScope) -> LookupScope { + match scope { + RegistryScope::Public => LookupScope::PublicOnly, + RegistryScope::Private => LookupScope::PreferPrivate, + } +} + +fn synced_registry_types(snapshots: &[PeerRegistrySnapshot]) -> Vec { + let mut kinds = BTreeSet::new(); + for snapshot in snapshots { + kinds.insert(snapshot.registry_type); + } + kinds.into_iter().collect() +} + +fn federation_error(code: FederationErrorCode, target: &str, message: &str) -> FederationError { + FederationError { + code, + target: target.to_string(), + message: message.to_string(), + severity: ErrorSeverity::Error, + } +} + +#[cfg(test)] +#[allow(clippy::expect_used, clippy::too_many_lines)] +mod tests { + use super::*; + use crate::{ + ArtifactDigests, BinaryFormat, BinaryReference, CapabilityArtifactRecord, + CapabilityRegistration, CapabilityRegistry, ComposabilityMetadata, CompositionKind, + CompositionPattern, EventRegistry, ImplementationKind, RegistryProvenance, RegistryScope, + SourceKind, SourceReference, WorkflowDefinition, WorkflowNode, WorkflowNodeInput, + WorkflowNodeOutput, WorkflowRegistration, WorkflowRegistry, export_peer_state, + }; + use serde_json::json; + use traverse_contracts::{ + CapabilityContract, Entrypoint, EntrypointKind, EventClassification, EventContract, + EventPayload, EventProvenance, EventProvenanceSource, EventReference, EventType, Lifecycle, + Owner, PayloadCompatibility, SchemaContainer, SideEffect, SideEffectKind, + }; + + #[test] + fn registers_trusted_peer_and_reports_status() { + let mut federation = FederationRegistry::new(); + let peer = peer("peer-a", "Peer A"); + let trust = trust( + "peer-a", + vec![RegistryScope::Public, RegistryScope::Private], + ); + + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("identical peer registration should be idempotent"); + + assert_eq!(federation.list_peers(), vec![peer]); + assert!(federation.sync_sessions().is_empty()); + assert!(federation.invocations().is_empty()); + let summary = federation.status_summary(); + assert_eq!(summary.peer_count, 1); + assert_eq!(summary.trusted_peer_count, 1); + assert_eq!(summary.last_sync_outcome, FederationSyncStatus::Unknown); + } + + #[test] + fn syncs_peer_export_and_routes_invocation_to_owner() { + let mut local_capabilities = CapabilityRegistry::new(); + let mut local_events = EventRegistry::new(); + let mut local_workflows = WorkflowRegistry::new(); + seed_capabilities(&mut local_capabilities); + seed_events(&mut local_events); + seed_workflows(&mut local_workflows, &local_capabilities); + + let peer = peer("peer-b", "Peer B"); + let trust = trust( + "peer-b", + vec![RegistryScope::Public, RegistryScope::Private], + ); + let export = export_peer_state( + peer.clone(), + trust.clone(), + &local_capabilities, + &local_events, + &local_workflows, + ); + + let mut federation = FederationRegistry::new(); + federation + .register_peer(peer, trust) + .expect("peer should register"); + + let outcome = federation + .sync_peer( + export, + &local_capabilities, + &local_events, + &local_workflows, + "2026-04-09T20:00:00Z", + "2026-04-09T20:01:00Z", + "evidence:sync-001", + ) + .expect("sync should pass"); + + assert_eq!(outcome.session.status, FederationSyncStatus::Success); + assert!(!outcome.accepted_snapshots.is_empty()); + assert!(outcome.conflicts.is_empty()); + + let origin_peer = self::peer("peer-a", "Peer A"); + let origin_trust = self::trust( + "peer-a", + vec![RegistryScope::Public, RegistryScope::Private], + ); + federation + .register_peer(origin_peer, origin_trust) + .expect("origin peer should register"); + let available = BTreeSet::from([String::from("peer-b")]); + let invocation = federation + .route_capability_invocation( + "peer-a", + "federation.capability.echo", + "1.0.0", + "request:001", + &available, + "2026-04-09T20:02:00Z", + "evidence:route-001", + ) + .expect("invocation should route"); + + assert_eq!(invocation.status, FederationInvocationStatus::Success); + assert_eq!(invocation.target_peer_id, "peer-b"); + assert_eq!(invocation.trace_provenance.origin_peer_id, "peer-a"); + assert_eq!(invocation.trace_provenance.owning_peer_id, "peer-b"); + assert_eq!( + invocation.response_ref.as_deref(), + Some("response://peer-b/federation.capability.echo/1.0.0") + ); + } + + #[test] + fn sync_reports_conflicts_for_divergent_private_entries() { + let mut local_capabilities = CapabilityRegistry::new(); + let mut local_events = EventRegistry::new(); + let mut local_workflows = WorkflowRegistry::new(); + seed_capabilities(&mut local_capabilities); + seed_events(&mut local_events); + seed_workflows(&mut local_workflows, &local_capabilities); + + let mut remote_capabilities = CapabilityRegistry::new(); + let mut altered_contract = capability_contract(); + altered_contract.summary = "divergent export".to_string(); + remote_capabilities + .register(capability_registration( + RegistryScope::Private, + altered_contract, + )) + .expect("remote capability should register"); + seed_events(&mut local_events); + seed_workflows(&mut local_workflows, &local_capabilities); + + let peer = peer("peer-c", "Peer C"); + let trust = trust("peer-c", vec![RegistryScope::Public]); + let export = export_peer_state( + peer.clone(), + trust.clone(), + &remote_capabilities, + &local_events, + &local_workflows, + ); + + let mut federation = FederationRegistry::new(); + federation + .register_peer(peer, trust) + .expect("peer should register"); + + let outcome = federation + .sync_peer( + export, + &local_capabilities, + &local_events, + &local_workflows, + "2026-04-09T20:10:00Z", + "2026-04-09T20:11:00Z", + "evidence:sync-002", + ) + .expect("sync should report conflicts rather than failing"); + + assert_eq!(outcome.session.status, FederationSyncStatus::Partial); + assert!(!outcome.conflicts.is_empty()); + assert_eq!(federation.conflicts().len(), outcome.conflicts.len()); + } + + #[test] + fn sync_reports_conflicts_for_permitted_but_divergent_private_capability_entries() { + let mut local_capabilities = CapabilityRegistry::new(); + seed_capabilities(&mut local_capabilities); + + let mut altered_contract = private_capability_contract(); + altered_contract.summary = "altered private capability".to_string(); + let mut remote_capabilities = CapabilityRegistry::new(); + remote_capabilities + .register(capability_registration( + RegistryScope::Private, + altered_contract, + )) + .expect("remote private capability should register"); + + let peer = peer("peer-capability-divergent", "Peer Capability Divergent"); + let trust = trust( + "peer-capability-divergent", + vec![RegistryScope::Public, RegistryScope::Private], + ); + + let mut federation = FederationRegistry::new(); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + + let export = export_peer_state( + peer, + trust, + &remote_capabilities, + &EventRegistry::new(), + &WorkflowRegistry::new(), + ); + let outcome = federation + .sync_peer( + export, + &local_capabilities, + &EventRegistry::new(), + &WorkflowRegistry::new(), + "2026-04-09T20:50:00Z", + "2026-04-09T20:51:00Z", + "evidence:divergent-private-capability", + ) + .expect("sync should report permitted capability divergence as conflicts"); + + assert!(outcome.conflicts.iter().any(|conflict| { + conflict + .conflict_reason + .contains("local capability record differs") + })); + } + + #[test] + fn sync_peer_rejects_trust_and_peer_state_mismatches() { + let mut federation = FederationRegistry::new(); + let peer = peer("peer-sync-guard", "Peer Sync Guard"); + let trust = trust("peer-sync-guard", vec![RegistryScope::Public]); + + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + + let export = FederationPeerExport { + peer: peer.clone(), + trust: trust.clone(), + capabilities: Vec::new(), + events: Vec::new(), + workflows: Vec::new(), + }; + + let mut mismatched_trust = export.clone(); + mismatched_trust.trust.peer_id = "other-peer".to_string(); + assert!( + federation + .sync_peer( + mismatched_trust, + &CapabilityRegistry::new(), + &EventRegistry::new(), + &WorkflowRegistry::new(), + "2026-04-09T20:20:00Z", + "2026-04-09T20:21:00Z", + "evidence:sync-mismatch", + ) + .is_err() + ); + + federation.trust_records.remove("peer-sync-guard"); + assert!( + federation + .sync_peer( + export.clone(), + &CapabilityRegistry::new(), + &EventRegistry::new(), + &WorkflowRegistry::new(), + "2026-04-09T20:22:00Z", + "2026-04-09T20:23:00Z", + "evidence:sync-missing-trust", + ) + .is_err() + ); + + federation.peers.remove("peer-sync-guard"); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should re-register"); + if let Some(registered_peer) = federation.peers.get_mut("peer-sync-guard") { + registered_peer.sync_enabled = false; + registered_peer.trust_state = FederationTrustState::Pending; + } + + assert!( + federation + .sync_peer( + export, + &CapabilityRegistry::new(), + &EventRegistry::new(), + &WorkflowRegistry::new(), + "2026-04-09T20:24:00Z", + "2026-04-09T20:25:00Z", + "evidence:sync-disabled", + ) + .is_err() + ); + } + + #[test] + fn sync_peer_reports_missing_local_registry_entries() { + let peer = peer("peer-missing", "Peer Missing"); + let trust = trust( + "peer-missing", + vec![RegistryScope::Public, RegistryScope::Private], + ); + + let mut federation = FederationRegistry::new(); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + let mut remote_capabilities = CapabilityRegistry::new(); + seed_capabilities(&mut remote_capabilities); + let local_capabilities = CapabilityRegistry::new(); + let local_events = EventRegistry::new(); + let local_workflows = WorkflowRegistry::new(); + let capability_export = export_peer_state( + peer.clone(), + trust.clone(), + &remote_capabilities, + &EventRegistry::new(), + &WorkflowRegistry::new(), + ); + let capability_outcome = federation + .sync_peer( + capability_export, + &local_capabilities, + &local_events, + &local_workflows, + "2026-04-09T20:30:00Z", + "2026-04-09T20:31:00Z", + "evidence:missing-capability", + ) + .expect("sync should report missing capability as conflict"); + assert!(capability_outcome.conflicts.iter().any(|conflict| { + conflict + .conflict_reason + .contains("missing the exported capability") + })); + + let mut federation = FederationRegistry::new(); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + let mut remote_events = EventRegistry::new(); + seed_events(&mut remote_events); + let event_export = export_peer_state( + peer.clone(), + trust.clone(), + &CapabilityRegistry::new(), + &remote_events, + &WorkflowRegistry::new(), + ); + let event_outcome = federation + .sync_peer( + event_export, + &CapabilityRegistry::new(), + &EventRegistry::new(), + &WorkflowRegistry::new(), + "2026-04-09T20:32:00Z", + "2026-04-09T20:33:00Z", + "evidence:missing-event", + ) + .expect("sync should report missing event as conflict"); + assert!(event_outcome.conflicts.iter().any(|conflict| { + conflict + .conflict_reason + .contains("missing the exported event") + })); + + let mut federation = FederationRegistry::new(); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + let mut remote_workflows = WorkflowRegistry::new(); + let mut local_capabilities = CapabilityRegistry::new(); + let mut local_events = EventRegistry::new(); + seed_capabilities(&mut local_capabilities); + seed_events(&mut local_events); + seed_workflows(&mut remote_workflows, &local_capabilities); + let workflow_export = export_peer_state( + peer.clone(), + trust.clone(), + &CapabilityRegistry::new(), + &EventRegistry::new(), + &remote_workflows, + ); + let workflow_outcome = federation + .sync_peer( + workflow_export, + &CapabilityRegistry::new(), + &EventRegistry::new(), + &WorkflowRegistry::new(), + "2026-04-09T20:34:00Z", + "2026-04-09T20:35:00Z", + "evidence:missing-workflow", + ) + .expect("sync should report missing workflow as conflict"); + assert!(workflow_outcome.conflicts.iter().any(|conflict| { + conflict + .conflict_reason + .contains("missing the exported workflow") + })); + } + + #[test] + fn sync_peer_marks_empty_exports_as_failed() { + let mut federation = FederationRegistry::new(); + let peer = peer("peer-empty", "Peer Empty"); + let trust = trust("peer-empty", vec![RegistryScope::Public]); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + + let export = export_peer_state( + peer, + trust, + &CapabilityRegistry::new(), + &EventRegistry::new(), + &WorkflowRegistry::new(), + ); + let outcome = federation + .sync_peer( + export, + &CapabilityRegistry::new(), + &EventRegistry::new(), + &WorkflowRegistry::new(), + "2026-04-09T20:40:00Z", + "2026-04-09T20:41:00Z", + "evidence:sync-empty", + ) + .expect("empty export should still be accepted as a failed sync outcome"); + + assert_eq!(outcome.session.status, FederationSyncStatus::Failed); + assert!(outcome.accepted_snapshots.is_empty()); + assert!(outcome.conflicts.is_empty()); + } + + #[test] + fn sync_peer_rejects_private_exports_without_scope_authority() { + let mut federation = FederationRegistry::new(); + let peer = peer("peer-private", "Peer Private"); + let trust = trust("peer-private", vec![RegistryScope::Public]); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + + let mut remote_capabilities = CapabilityRegistry::new(); + remote_capabilities + .register(capability_registration( + RegistryScope::Private, + private_capability_contract(), + )) + .expect("private capability should register"); + + let mut remote_events = EventRegistry::new(); + remote_events + .register(event_registration(RegistryScope::Private, event_contract())) + .expect("private event should register"); + + let mut workflow_capabilities = CapabilityRegistry::new(); + seed_capabilities(&mut workflow_capabilities); + let mut remote_workflows = WorkflowRegistry::new(); + remote_workflows + .register( + &workflow_capabilities, + workflow_registration(RegistryScope::Private, workflow_definition()), + ) + .expect("private workflow should register"); + + let export = export_peer_state( + peer, + trust, + &remote_capabilities, + &remote_events, + &remote_workflows, + ); + let outcome = federation + .sync_peer( + export, + &CapabilityRegistry::new(), + &EventRegistry::new(), + &WorkflowRegistry::new(), + "2026-04-09T20:42:00Z", + "2026-04-09T20:43:00Z", + "evidence:sync-private", + ) + .expect("sync should report private-scope rejection as conflicts"); + + assert_eq!(outcome.session.status, FederationSyncStatus::Partial); + assert!( + outcome + .conflicts + .iter() + .all(|conflict| conflict.conflict_reason.contains("does not authorize")) + ); + } + + #[test] + fn sync_reports_conflicts_for_divergent_private_event_and_workflow_entries() { + let mut federation = FederationRegistry::new(); + let peer = peer("peer-divergent", "Peer Divergent"); + let trust = trust( + "peer-divergent", + vec![RegistryScope::Public, RegistryScope::Private], + ); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + + let mut local_events = EventRegistry::new(); + let mut remote_events = EventRegistry::new(); + let mut local_event_contract = event_contract(); + local_event_contract.summary = "local event".to_string(); + local_events + .register(event_registration( + RegistryScope::Private, + local_event_contract.clone(), + )) + .expect("local private event should register"); + let mut remote_event_contract = local_event_contract.clone(); + remote_event_contract.summary = "remote event".to_string(); + remote_events + .register(event_registration( + RegistryScope::Private, + remote_event_contract, + )) + .expect("remote private event should register"); + + let event_export = export_peer_state( + peer.clone(), + trust.clone(), + &CapabilityRegistry::new(), + &remote_events, + &WorkflowRegistry::new(), + ); + let event_outcome = federation + .sync_peer( + event_export, + &CapabilityRegistry::new(), + &local_events, + &WorkflowRegistry::new(), + "2026-04-09T20:44:00Z", + "2026-04-09T20:45:00Z", + "evidence:divergent-event", + ) + .expect("event divergence should report conflicts"); + assert!(event_outcome.conflicts.iter().any(|conflict| { + conflict + .conflict_reason + .contains("local event record differs") + })); + + let mut federation = FederationRegistry::new(); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + let mut local_capabilities = CapabilityRegistry::new(); + let mut remote_capabilities = CapabilityRegistry::new(); + seed_capabilities(&mut local_capabilities); + seed_capabilities(&mut remote_capabilities); + + let mut local_workflows = WorkflowRegistry::new(); + let mut remote_workflows = WorkflowRegistry::new(); + local_workflows + .register( + &local_capabilities, + workflow_registration(RegistryScope::Private, workflow_definition()), + ) + .expect("local private workflow should register"); + let mut remote_workflow_definition = workflow_definition(); + remote_workflow_definition.summary = "remote workflow".to_string(); + remote_workflows + .register( + &remote_capabilities, + workflow_registration(RegistryScope::Private, remote_workflow_definition), + ) + .expect("remote private workflow should register"); + + let workflow_export = export_peer_state( + peer, + trust, + &remote_capabilities, + &EventRegistry::new(), + &remote_workflows, + ); + let workflow_outcome = federation + .sync_peer( + workflow_export, + &local_capabilities, + &EventRegistry::new(), + &local_workflows, + "2026-04-09T20:46:00Z", + "2026-04-09T20:47:00Z", + "evidence:divergent-workflow", + ) + .expect("workflow divergence should report conflicts"); + assert!(workflow_outcome.conflicts.iter().any(|conflict| { + conflict + .conflict_reason + .contains("local workflow record differs") + })); + } + + #[test] + fn route_capability_invocation_returns_error_without_matching_snapshot() { + let mut federation = FederationRegistry::new(); + let origin_peer = peer("peer-route-empty", "Peer Route Empty"); + let origin_trust = trust("peer-route-empty", vec![RegistryScope::Public]); + federation + .register_peer(origin_peer, origin_trust) + .expect("origin peer should register"); + + let failure = federation + .route_capability_invocation( + "peer-route-empty", + "federation.capability.missing", + "9.9.9", + "request:missing", + &BTreeSet::from([String::from("peer-route-empty")]), + "2026-04-09T20:48:00Z", + "evidence:missing-route", + ) + .expect_err("missing snapshot should fail closed"); + + assert_eq!( + failure.errors[0].code, + FederationErrorCode::EntryValidationFailed + ); + } + + #[test] + fn register_peer_rejects_invalid_and_duplicate_peers() { + let mut federation = FederationRegistry::new(); + + let mut invalid_peer = peer("peer-invalid", "Peer Invalid"); + invalid_peer.peer_id.clear(); + invalid_peer.display_name.clear(); + invalid_peer.identity_fingerprint.clear(); + invalid_peer.sync_enabled = false; + invalid_peer.trust_state = FederationTrustState::Pending; + + let invalid_trust = TrustRecord { + peer_id: "other-peer".to_string(), + trust_model: "allow-list".to_string(), + allowed_scopes: vec![], + approved_spec_refs: vec![], + approved_at: "2026-04-09T00:00:00Z".to_string(), + revoked_at: None, + }; + + assert!( + federation + .register_peer(invalid_peer, invalid_trust) + .is_err() + ); + + let peer = peer("peer-dup", "Peer Dup"); + let trust = trust("peer-dup", vec![RegistryScope::Public]); + federation + .register_peer(peer.clone(), trust.clone()) + .expect("peer should register"); + + let mut changed_peer = peer.clone(); + changed_peer.display_name = "Peer Dup Updated".to_string(); + assert!(federation.register_peer(changed_peer, trust).is_err()); + } + + #[test] + fn sync_peer_rejects_unregistered_and_invalid_export_paths() { + let mut federation = FederationRegistry::new(); + let registered_peer = peer("peer-sync", "Peer Sync"); + let trust = trust("peer-sync", vec![RegistryScope::Public]); + federation + .register_peer(registered_peer.clone(), trust.clone()) + .expect("peer should register"); + + let local_capabilities = CapabilityRegistry::new(); + let local_events = EventRegistry::new(); + let local_workflows = WorkflowRegistry::new(); + let export = FederationPeerExport { + peer: registered_peer.clone(), + trust: trust.clone(), + capabilities: Vec::new(), + events: Vec::new(), + workflows: Vec::new(), + }; + + assert!( + federation + .sync_peer( + export.clone(), + &local_capabilities, + &local_events, + &local_workflows, + "", + "", + "", + ) + .is_err() + ); + + let mut bad_peer = peer("peer-sync-bad", "Peer Sync Bad"); + bad_peer.sync_enabled = false; + let bad_export = FederationPeerExport { + peer: bad_peer, + trust: TrustRecord { + peer_id: "peer-sync-bad".to_string(), + trust_model: "allow-list".to_string(), + allowed_scopes: vec![RegistryScope::Public], + approved_spec_refs: vec!["005-capability-registry".to_string()], + approved_at: "2026-04-09T00:00:00Z".to_string(), + revoked_at: None, + }, + capabilities: Vec::new(), + events: Vec::new(), + workflows: Vec::new(), + }; + + assert!( + federation + .sync_peer( + bad_export, + &local_capabilities, + &local_events, + &local_workflows, + "2026-04-09T20:00:00Z", + "2026-04-09T20:01:00Z", + "evidence:sync-invalid", + ) + .is_err() + ); + } + + #[test] + fn route_capability_invocation_covers_missing_and_unavailable_paths() { + let mut federation = FederationRegistry::new(); + let origin_peer = peer("peer-route", "Peer Route"); + let origin_trust = trust("peer-route", vec![RegistryScope::Public]); + federation + .register_peer(origin_peer.clone(), origin_trust) + .expect("origin peer should register"); + + assert!( + federation + .route_capability_invocation("", "", "", "", &BTreeSet::new(), "", "",) + .is_err() + ); + + let mut local_capabilities = CapabilityRegistry::new(); + let mut local_events = EventRegistry::new(); + let mut local_workflows = WorkflowRegistry::new(); + seed_capabilities(&mut local_capabilities); + seed_events(&mut local_events); + seed_workflows(&mut local_workflows, &local_capabilities); + + let target_peer = peer("peer-target", "Peer Target"); + let target_trust = trust( + "peer-target", + vec![RegistryScope::Public, RegistryScope::Private], + ); + let export = export_peer_state( + target_peer.clone(), + target_trust.clone(), + &local_capabilities, + &local_events, + &local_workflows, + ); + federation + .register_peer(target_peer, target_trust) + .expect("target peer should register"); + federation + .sync_peer( + export, + &local_capabilities, + &local_events, + &local_workflows, + "2026-04-09T21:00:00Z", + "2026-04-09T21:01:00Z", + "evidence:sync-route", + ) + .expect("sync should succeed"); + + let unavailable = BTreeSet::new(); + let invocation = federation + .route_capability_invocation( + "peer-route", + "federation.capability.echo", + "1.0.0", + "request:route-unavailable", + &unavailable, + "2026-04-09T21:02:00Z", + "evidence:route-unavailable", + ) + .expect("route should return retryable failure rather than error"); + + assert_eq!( + invocation.status, + FederationInvocationStatus::RetryableFailure + ); + assert!(invocation.response_ref.is_none()); + } + + #[test] + fn status_summary_counts_sync_and_route_failures() { + let mut federation = FederationRegistry::new(); + federation.peers.insert( + "peer-summary".to_string(), + peer("peer-summary", "Peer Summary"), + ); + federation.sync_sessions.push(FederationSyncSession { + session_id: "sync_peer-summary_1".to_string(), + peer_id: "peer-summary".to_string(), + started_at: "2026-04-09T22:00:00Z".to_string(), + finished_at: Some("2026-04-09T22:01:00Z".to_string()), + status: FederationSyncStatus::Partial, + registry_types: vec![FederationRegistryKind::Capability], + validated_entries: 1, + rejected_entries: 2, + conflict_count: 2, + evidence_ref: "evidence:summary".to_string(), + }); + federation.invocations.push(FederatedInvocation { + invocation_id: "invocation_peer-summary_federation.capability.echo_1.0.0".to_string(), + origin_peer_id: "peer-summary".to_string(), + target_peer_id: "peer-target".to_string(), + capability_id: "federation.capability.echo".to_string(), + request_ref: "request:summary".to_string(), + status: FederationInvocationStatus::Failure, + response_ref: None, + trace_provenance: CrossPeerTraceProvenance { + trace_id: "trace_peer-summary_federation.capability.echo_1.0.0".to_string(), + origin_peer_id: "peer-summary".to_string(), + owning_peer_id: "peer-target".to_string(), + route_reason: "test route".to_string(), + sync_session_ref: None, + response_status: FederationInvocationStatus::Failure, + evidence_ref: "evidence:summary-route".to_string(), + }, + }); + federation.invocations.push(FederatedInvocation { + invocation_id: "invocation_peer-summary_federation.capability.echo_1.0.1".to_string(), + origin_peer_id: "peer-summary".to_string(), + target_peer_id: "peer-target".to_string(), + capability_id: "federation.capability.echo".to_string(), + request_ref: "request:summary-retryable".to_string(), + status: FederationInvocationStatus::RetryableFailure, + response_ref: None, + trace_provenance: CrossPeerTraceProvenance { + trace_id: "trace_peer-summary_federation.capability.echo_1.0.1".to_string(), + origin_peer_id: "peer-summary".to_string(), + owning_peer_id: "peer-target".to_string(), + route_reason: "retryable route".to_string(), + sync_session_ref: None, + response_status: FederationInvocationStatus::RetryableFailure, + evidence_ref: "evidence:summary-route-retryable".to_string(), + }, + }); + + let summary = federation.status_summary(); + assert_eq!(summary.peer_count, 1); + assert_eq!(summary.trusted_peer_count, 1); + assert_eq!(summary.last_sync_outcome, FederationSyncStatus::Partial); + assert_eq!(summary.blocked_entries, 2); + assert_eq!(summary.route_failures, 2); + } + + #[test] + fn approval_state_from_lifecycle_covers_all_states() { + assert_eq!( + approval_state_from_lifecycle(&Lifecycle::Draft), + FederationApprovalState::Draft + ); + assert_eq!( + approval_state_from_lifecycle(&Lifecycle::Active), + FederationApprovalState::Approved + ); + assert_eq!( + approval_state_from_lifecycle(&Lifecycle::Deprecated), + FederationApprovalState::Deprecated + ); + assert_eq!( + approval_state_from_lifecycle(&Lifecycle::Retired), + FederationApprovalState::Rejected + ); + assert_eq!( + approval_state_from_lifecycle(&Lifecycle::Archived), + FederationApprovalState::Rejected + ); + } + + #[test] + fn is_route_failure_covers_failure_variants() { + assert!(!is_route_failure(FederationInvocationStatus::Success)); + assert!(is_route_failure(FederationInvocationStatus::Failure)); + assert!(is_route_failure( + FederationInvocationStatus::RetryableFailure + )); + } + + fn peer(peer_id: &str, display_name: &str) -> FederationPeer { + FederationPeer { + peer_id: peer_id.to_string(), + display_name: display_name.to_string(), + trust_state: FederationTrustState::Trusted, + identity_fingerprint: format!("fingerprint:{peer_id}"), + sync_enabled: true, + last_sync_at: None, + last_sync_status: FederationSyncStatus::Unknown, + visible_registry_scopes: vec![RegistryScope::Public, RegistryScope::Private], + } + } + + fn trust(peer_id: &str, scopes: Vec) -> TrustRecord { + TrustRecord { + peer_id: peer_id.to_string(), + trust_model: "shared-api-token".to_string(), + allowed_scopes: scopes, + approved_spec_refs: vec!["026-federation-registry-routing".to_string()], + approved_at: "2026-04-09T19:30:00Z".to_string(), + revoked_at: None, + } + } + + fn seed_capabilities(registry: &mut CapabilityRegistry) { + registry + .register(capability_registration( + RegistryScope::Public, + capability_contract(), + )) + .expect("capability should register"); + registry + .register(capability_registration( + RegistryScope::Private, + private_capability_contract(), + )) + .expect("private capability should register"); + } + + fn seed_events(registry: &mut EventRegistry) { + registry + .register(event_registration(RegistryScope::Public, event_contract())) + .expect("event should register"); + } + + fn seed_workflows(registry: &mut WorkflowRegistry, capabilities: &CapabilityRegistry) { + registry + .register( + capabilities, + workflow_registration(RegistryScope::Public, workflow_definition()), + ) + .expect("workflow should register"); + } + + fn capability_contract() -> CapabilityContract { + CapabilityContract { + kind: "capability_contract".to_string(), + schema_version: "1.0.0".to_string(), + id: "federation.capability.echo".to_string(), + namespace: "federation.capability".to_string(), + name: "echo".to_string(), + version: "1.0.0".to_string(), + lifecycle: Lifecycle::Active, + owner: Owner { + team: "platform".to_string(), + contact: "platform@example.com".to_string(), + }, + summary: "Echo a federated capability call.".to_string(), + description: "End-to-end federation test capability.".to_string(), + inputs: SchemaContainer { + schema: json!({"type":"object"}), + }, + outputs: SchemaContainer { + schema: json!({"type":"object"}), + }, + preconditions: vec![], + postconditions: vec![], + side_effects: vec![SideEffect { + kind: SideEffectKind::EventEmission, + description: "Emit routing evidence for federation sync.".to_string(), + }], + emits: vec![EventReference { + event_id: "federation.event.routed".to_string(), + version: "1.0.0".to_string(), + }], + consumes: vec![], + permissions: vec![], + execution: traverse_contracts::Execution { + binary_format: traverse_contracts::BinaryFormat::Wasm, + entrypoint: Entrypoint { + kind: EntrypointKind::WasiCommand, + command: "echo".to_string(), + }, + preferred_targets: vec![traverse_contracts::ExecutionTarget::Local], + constraints: traverse_contracts::ExecutionConstraints { + host_api_access: traverse_contracts::HostApiAccess::None, + filesystem_access: traverse_contracts::FilesystemAccess::None, + network_access: traverse_contracts::NetworkAccess::Forbidden, + }, + }, + policies: vec![], + dependencies: vec![], + provenance: traverse_contracts::Provenance { + source: traverse_contracts::ProvenanceSource::Greenfield, + author: "enricopiovesan".to_string(), + created_at: "2026-04-09T19:00:00Z".to_string(), + spec_ref: Some("026-federation-registry-routing".to_string()), + adr_refs: vec![], + exception_refs: vec![], + }, + evidence: vec![], + service_type: traverse_contracts::ServiceType::Stateless, + permitted_targets: vec![ + traverse_contracts::ExecutionTarget::Local, + traverse_contracts::ExecutionTarget::Browser, + traverse_contracts::ExecutionTarget::Edge, + traverse_contracts::ExecutionTarget::Cloud, + traverse_contracts::ExecutionTarget::Worker, + traverse_contracts::ExecutionTarget::Device, + ], + event_trigger: None, + } + } + + fn private_capability_contract() -> CapabilityContract { + let mut contract = capability_contract(); + contract.id = "federation.capability.private-echo".to_string(); + contract.name = "private-echo".to_string(); + contract.summary = "Private federated echo.".to_string(); + contract + } + + fn event_contract() -> EventContract { + EventContract { + kind: "event_contract".to_string(), + schema_version: "1.0.0".to_string(), + id: "federation.event.routed".to_string(), + namespace: "federation.event".to_string(), + name: "routed".to_string(), + version: "1.0.0".to_string(), + lifecycle: Lifecycle::Active, + owner: Owner { + team: "platform".to_string(), + contact: "platform@example.com".to_string(), + }, + summary: "A federation routing event.".to_string(), + description: "End-to-end federation event.".to_string(), + payload: EventPayload { + schema: json!({"type":"object"}), + compatibility: PayloadCompatibility::BackwardCompatible, + }, + classification: EventClassification { + domain: "federation".to_string(), + bounded_context: "registry".to_string(), + event_type: EventType::System, + tags: vec!["federation".to_string()], + }, + publishers: vec![traverse_contracts::CapabilityReference { + capability_id: "federation.capability.echo".to_string(), + version: "1.0.0".to_string(), + }], + subscribers: vec![traverse_contracts::CapabilityReference { + capability_id: "federation.capability.private-echo".to_string(), + version: "1.0.0".to_string(), + }], + policies: vec![], + tags: vec!["federation".to_string()], + provenance: EventProvenance { + source: EventProvenanceSource::Greenfield, + author: "enricopiovesan".to_string(), + created_at: "2026-04-09T19:00:00Z".to_string(), + }, + evidence: vec![], + } + } + + fn workflow_definition() -> WorkflowDefinition { + WorkflowDefinition { + kind: "workflow_definition".to_string(), + schema_version: "1.0.0".to_string(), + id: "federation.workflow.route".to_string(), + name: "route".to_string(), + version: "1.0.0".to_string(), + lifecycle: Lifecycle::Active, + owner: Owner { + team: "platform".to_string(), + contact: "platform@example.com".to_string(), + }, + summary: "A federated routing workflow.".to_string(), + inputs: SchemaContainer { + schema: json!({"type":"object"}), + }, + outputs: SchemaContainer { + schema: json!({"type":"object"}), + }, + nodes: vec![WorkflowNode { + node_id: "route-node".to_string(), + capability_id: "federation.capability.echo".to_string(), + capability_version: "1.0.0".to_string(), + input: WorkflowNodeInput { + from_workflow_input: vec!["request".to_string()], + }, + output: WorkflowNodeOutput { + to_workflow_state: vec!["response".to_string()], + }, + }], + edges: vec![], + start_node: "route-node".to_string(), + terminal_nodes: vec!["route-node".to_string()], + tags: vec!["federation".to_string()], + governing_spec: "007-workflow-registry-traversal".to_string(), + } + } + + fn capability_registration( + scope: RegistryScope, + contract: CapabilityContract, + ) -> CapabilityRegistration { + CapabilityRegistration { + scope, + contract_path: format!( + "registry/{}/{}/{}{}", + scope_name(scope), + contract.id, + contract.version, + "/contract.json" + ), + artifact: CapabilityArtifactRecord { + artifact_ref: format!("artifact:{}:{}", contract.name, contract.version), + implementation_kind: ImplementationKind::Executable, + source: SourceReference { + kind: SourceKind::Git, + location: format!("https://example.invalid/{}", contract.name), + }, + binary: Some(BinaryReference { + format: BinaryFormat::Wasm, + location: format!("artifacts/{}/{}.wasm", contract.name, contract.version), + }), + workflow_ref: None, + digests: ArtifactDigests { + source_digest: format!("source:{}:{}", contract.name, contract.version), + binary_digest: Some(format!("binary:{}:{}", contract.name, contract.version)), + }, + provenance: RegistryProvenance { + source: "greenfield".to_string(), + author: "enricopiovesan".to_string(), + created_at: "2026-04-09T19:00:00Z".to_string(), + }, + }, + registered_at: "2026-04-09T19:00:00Z".to_string(), + tags: vec!["federation".to_string()], + composability: ComposabilityMetadata { + kind: CompositionKind::Atomic, + patterns: vec![CompositionPattern::Sequential], + provides: vec!["federation".to_string()], + requires: vec!["registry".to_string()], + }, + governing_spec: "005-capability-registry".to_string(), + validator_version: "registry-test".to_string(), + contract, + } + } + + fn event_registration( + scope: RegistryScope, + contract: EventContract, + ) -> crate::EventRegistration { + crate::EventRegistration { + scope, + contract, + contract_path: format!( + "registry/{}/{}/{}{}", + scope_name(scope), + "federation.event.routed", + "1.0.0", + "/contract.json" + ), + registered_at: "2026-04-09T19:00:00Z".to_string(), + governing_spec: "011-event-registry".to_string(), + validator_version: "registry-test".to_string(), + } + } + + fn workflow_registration( + scope: RegistryScope, + definition: WorkflowDefinition, + ) -> WorkflowRegistration { + WorkflowRegistration { + scope, + definition, + workflow_path: "registry/public/federation.workflow.route/1.0.0/workflow.json" + .to_string(), + registered_at: "2026-04-09T19:00:00Z".to_string(), + validator_version: "registry-test".to_string(), + } + } + + fn scope_name(scope: RegistryScope) -> &'static str { + match scope { + RegistryScope::Public => "public", + RegistryScope::Private => "private", + } + } +} diff --git a/crates/traverse-registry/src/lib.rs b/crates/traverse-registry/src/lib.rs index c8d55654..68692952 100644 --- a/crates/traverse-registry/src/lib.rs +++ b/crates/traverse-registry/src/lib.rs @@ -2,10 +2,12 @@ mod bundle; mod events; +mod federation; mod graph; mod workflows; pub use bundle::*; pub use events::*; +pub use federation::*; pub use graph::*; pub use workflows::*; From bc509aef639cb80e658c1688e9ca3fb5f7eccd69 Mon Sep 17 00:00:00 2001 From: Enrico Piovesan Date: Thu, 16 Apr 2026 09:14:37 -0500 Subject: [PATCH 2/3] Trigger PR checks for issue 241 From 481d02a0e075605babc2bea6ae71dfc4b9787522 Mon Sep 17 00:00:00 2001 From: Enrico Piovesan Date: Thu, 16 Apr 2026 09:15:54 -0500 Subject: [PATCH 3/3] Keep issue 241 scoped to federation operator surface --- .github/workflows/codeql.yml | 48 ------------------------------------ 1 file changed, 48 deletions(-) delete mode 100644 .github/workflows/codeql.yml diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml deleted file mode 100644 index 74315aad..00000000 --- a/.github/workflows/codeql.yml +++ /dev/null @@ -1,48 +0,0 @@ -name: CodeQL - -on: - push: - branches: - - main - pull_request: - schedule: - - cron: "23 6 * * 1" - -jobs: - analyze: - name: Analyze (${{ matrix.language }}) - runs-on: ubuntu-latest - permissions: - actions: read - contents: read - security-events: write - - strategy: - fail-fast: false - matrix: - include: - - language: actions - build-mode: none - - language: javascript-typescript - build-mode: none - - language: rust - build-mode: none - - steps: - - name: Checkout - uses: actions/checkout@v4 - - - name: Initialize CodeQL - uses: github/codeql-action/init@v3 - with: - languages: ${{ matrix.language }} - build-mode: ${{ matrix.build-mode }} - - - name: Set up Rust - if: matrix.language == 'rust' - uses: dtolnay/rust-toolchain@stable - with: - toolchain: 1.94.0 - - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v3