diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 90a85e4f3f..54f378c0e4 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -185,6 +185,14 @@ impl Libp2pNetInterface { self.swarm.listen_on(addr.parse()?)?; trace!("Peers to dial: {:?}", self.peers); + if self.peers.is_empty() { + info!("Found 0 peers to dial"); + } else { + info!("Found {} peer(s) to dial:", self.peers.len()); + for peer in &self.peers { + info!(" -> {}", peer); + } + } tokio::spawn({ let event_tx = event_tx.clone(); let cmd_tx = cmd_tx.clone(); diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index c6b99c8c50..01bd433a36 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -360,6 +360,7 @@ async fn handle_sync_request_event( let mut all_events: Vec> = Vec::new(); let mut latest_timestamp: u128 = 0; + let mut failed_aggregates: Vec = Vec::new(); for (aggregate_id, since) in event.since.iter() { info!( @@ -367,25 +368,46 @@ async fn handle_sync_request_event( aggregate_id, since ); let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()).build(); - let events: Vec> = - fetch_all_batched_events(requester, PeerTarget::Random, *aggregate_id, *since, 100) - .await?; - - info!( - "Received {} events for aggregate_id={}", - events.len(), - aggregate_id - ); - - for enclave_event in events { - let ts = enclave_event.ts(); - if ts > latest_timestamp { - latest_timestamp = ts; + match fetch_all_batched_events::>( + requester, + PeerTarget::Random, + *aggregate_id, + *since, + 100, + ) + .await + { + Ok(events) => { + info!( + "Received {} events for aggregate_id={}", + events.len(), + aggregate_id + ); + for enclave_event in events { + let ts = enclave_event.ts(); + if ts > latest_timestamp { + latest_timestamp = ts; + } + all_events.push(enclave_event); + } + } + Err(e) => { + warn!( + "Failed to fetch events for aggregate_id={}: {e}. 
Continuing with available events.", + aggregate_id + ); + failed_aggregates.push(*aggregate_id); } - all_events.push(enclave_event); } } + if !failed_aggregates.is_empty() { + bail!( + "failed to fetch historical net events for aggregates: {:?}", + failed_aggregates + ); + } + info!( "Sync complete: collected {} events across {} aggregates, latest_timestamp={}", all_events.len(), diff --git a/crates/sortition/src/ciphernode_selector.rs b/crates/sortition/src/ciphernode_selector.rs index dd3eed23a8..ac7e691b75 100644 --- a/crates/sortition/src/ciphernode_selector.rs +++ b/crates/sortition/src/ciphernode_selector.rs @@ -21,6 +21,18 @@ use e3_utils::MAILBOX_LIMIT; use std::collections::HashMap; use tracing::info; +/// Build an `E3Meta` from an `E3Requested` event's fields. +fn e3_meta_from(req: &E3Requested) -> E3Meta { + E3Meta { + seed: req.seed, + threshold_n: req.threshold_n, + threshold_m: req.threshold_m, + params: req.params.clone(), + esi_per_ct: req.esi_per_ct, + error_size: req.error_size.clone(), + } +} + /// CiphernodeSelector is an actor that determines if a ciphernode is part of a committee and if so /// emits a TicketGenerated event (score sortition) to the event bus pub struct CiphernodeSelector { @@ -57,6 +69,7 @@ impl CiphernodeSelector { let e3_cache = selector_store.load_or_default(HashMap::new()).await?; let addr = CiphernodeSelector::new(bus, e3_cache, address).start(); + bus.subscribe(EventType::E3Requested, addr.clone().recipient()); bus.subscribe(EventType::E3RequestComplete, addr.clone().recipient()); bus.subscribe(EventType::CommitteeFinalized, addr.clone().recipient()); bus.subscribe(EventType::Shutdown, addr.clone().recipient()); @@ -71,6 +84,7 @@ impl Handler for CiphernodeSelector { fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { let (msg, ec) = msg.into_components(); match msg { + EnclaveEventData::E3Requested(data) => self.notify_sync(ctx, TypedEvent::new(data, ec)), 
EnclaveEventData::E3RequestComplete(data) => { self.notify_sync(ctx, TypedEvent::new(data, ec)) } @@ -83,6 +97,31 @@ impl Handler for CiphernodeSelector { } } +/// Handles `E3Requested` events received directly from the EventBus. +/// +/// This handler populates `e3_cache` during sync replay, when `Sortition` gates its +/// `E3Requested` subscription behind `EffectsEnabled` and therefore does NOT forward +/// `WithSortitionTicket` messages to us. Without this handler the cache would be empty +/// when `CommitteeFinalized` arrives during replay, causing a missing-meta error. +/// +/// During live operation both this handler AND the `WithSortitionTicket` handler fire for +/// the same E3. `or_insert_with` only fills a vacant entry, so this handler never clobbers +/// existing meta; the `WithSortitionTicket` handler overwrites with identical data via `insert`. +impl Handler> for CiphernodeSelector { + type Result = (); + + fn handle(&mut self, msg: TypedEvent, _: &mut Self::Context) -> Self::Result { + trap(EType::Sortition, &self.bus.with_ec(msg.get_ctx()), || { + self.e3_cache.try_mutate(msg.get_ctx(), |mut cache| { + cache + .entry(msg.e3_id.clone()) + .or_insert_with(|| e3_meta_from(&msg)); + Ok(cache) + }) + }) + } +} + impl Handler>> for CiphernodeSelector { type Result = (); @@ -97,17 +136,7 @@ impl Handler>> for CiphernodeSelecto "Mutating e3_cache: appending data: {:?}", data.e3_id.clone() ); - cache.insert( - data.e3_id.clone(), - E3Meta { - seed: data.seed, - threshold_n: data.threshold_n, - threshold_m: data.threshold_m, - params: data.params.clone(), - esi_per_ct: data.esi_per_ct, - error_size: data.error_size.clone(), - }, - ); + cache.insert(data.e3_id.clone(), e3_meta_from(&data)); Ok(cache) })?; diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index 59f8c190e6..e976039a60 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -29,6 +29,7 @@ fn is_infrastructure_event(event: &EnclaveEvent) -> bool { EnclaveEventData::SyncEnded(_) | 
EnclaveEventData::EffectsEnabled(_) | EnclaveEventData::HistoricalEvmSyncStart(_) + | EnclaveEventData::HistoricalNetSyncStart(_) ) }