From 62600b504fe2cedf74bc8de40f480c3a16b6ef8a Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Mar 2026 17:04:26 +0500 Subject: [PATCH 1/4] fix: e3meta not available during sync replay --- crates/sortition/src/ciphernode_selector.rs | 22 +++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/crates/sortition/src/ciphernode_selector.rs b/crates/sortition/src/ciphernode_selector.rs index dd3eed23a8..94ad9a5f8f 100644 --- a/crates/sortition/src/ciphernode_selector.rs +++ b/crates/sortition/src/ciphernode_selector.rs @@ -57,6 +57,7 @@ impl CiphernodeSelector { let e3_cache = selector_store.load_or_default(HashMap::new()).await?; let addr = CiphernodeSelector::new(bus, e3_cache, address).start(); + bus.subscribe(EventType::E3Requested, addr.clone().recipient()); bus.subscribe(EventType::E3RequestComplete, addr.clone().recipient()); bus.subscribe(EventType::CommitteeFinalized, addr.clone().recipient()); bus.subscribe(EventType::Shutdown, addr.clone().recipient()); @@ -71,6 +72,7 @@ impl Handler for CiphernodeSelector { fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { let (msg, ec) = msg.into_components(); match msg { + EnclaveEventData::E3Requested(data) => self.notify_sync(ctx, TypedEvent::new(data, ec)), EnclaveEventData::E3RequestComplete(data) => { self.notify_sync(ctx, TypedEvent::new(data, ec)) } @@ -83,6 +85,26 @@ impl Handler for CiphernodeSelector { } } +impl Handler> for CiphernodeSelector { + type Result = (); + + fn handle(&mut self, msg: TypedEvent, _: &mut Self::Context) -> Self::Result { + trap(EType::Sortition, &self.bus.with_ec(msg.get_ctx()), || { + self.e3_cache.try_mutate(msg.get_ctx(), |mut cache| { + cache.entry(msg.e3_id.clone()).or_insert(E3Meta { + seed: msg.seed, + threshold_n: msg.threshold_n, + threshold_m: msg.threshold_m, + params: msg.params.clone(), + esi_per_ct: msg.esi_per_ct, + error_size: msg.error_size.clone(), + }); + Ok(cache) + }) + }) + } +} + impl Handler>> for 
CiphernodeSelector { type Result = (); From c2bded641d64b07439c19cdfa394c23744899d0e Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Mar 2026 17:37:59 +0500 Subject: [PATCH 2/4] fix: sync hang on agg start --- crates/net/src/net_sync_manager.rs | 43 +++++++++++++++++++----------- crates/sync/src/sync.rs | 1 + 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index c6b99c8c50..8a7661f089 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -367,22 +367,35 @@ async fn handle_sync_request_event( aggregate_id, since ); let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()).build(); - let events: Vec> = - fetch_all_batched_events(requester, PeerTarget::Random, *aggregate_id, *since, 100) - .await?; - - info!( - "Received {} events for aggregate_id={}", - events.len(), - aggregate_id - ); - - for enclave_event in events { - let ts = enclave_event.ts(); - if ts > latest_timestamp { - latest_timestamp = ts; + match fetch_all_batched_events::>( + requester, + PeerTarget::Random, + *aggregate_id, + *since, + 100, + ) + .await + { + Ok(events) => { + info!( + "Received {} events for aggregate_id={}", + events.len(), + aggregate_id + ); + for enclave_event in events { + let ts = enclave_event.ts(); + if ts > latest_timestamp { + latest_timestamp = ts; + } + all_events.push(enclave_event); + } + } + Err(e) => { + warn!( + "Failed to fetch events for aggregate_id={}: {e}. 
Continuing with available events.", + aggregate_id + ); } - all_events.push(enclave_event); } } diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index 59f8c190e6..e976039a60 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -29,6 +29,7 @@ fn is_infrastructure_event(event: &EnclaveEvent) -> bool { EnclaveEventData::SyncEnded(_) | EnclaveEventData::EffectsEnabled(_) | EnclaveEventData::HistoricalEvmSyncStart(_) + | EnclaveEventData::HistoricalNetSyncStart(_) ) } From 7c374e37be9748bca37a97b58379b5bd1ea3147a Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Thu, 19 Mar 2026 15:22:50 +0500 Subject: [PATCH 3/4] fix: print reddial nodes --- crates/net/src/net_interface.rs | 8 ++++ crates/sortition/src/ciphernode_selector.rs | 45 ++++++++++++--------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 90a85e4f3f..54f378c0e4 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -185,6 +185,14 @@ impl Libp2pNetInterface { self.swarm.listen_on(addr.parse()?)?; trace!("Peers to dial: {:?}", self.peers); + if self.peers.is_empty() { + info!("Found 0 peers to dial"); + } else { + info!("Found {} peer(s) to dial:", self.peers.len()); + for peer in &self.peers { + info!(" -> {}", peer); + } + } tokio::spawn({ let event_tx = event_tx.clone(); let cmd_tx = cmd_tx.clone(); diff --git a/crates/sortition/src/ciphernode_selector.rs b/crates/sortition/src/ciphernode_selector.rs index 94ad9a5f8f..ac7e691b75 100644 --- a/crates/sortition/src/ciphernode_selector.rs +++ b/crates/sortition/src/ciphernode_selector.rs @@ -21,6 +21,18 @@ use e3_utils::MAILBOX_LIMIT; use std::collections::HashMap; use tracing::info; +/// Build an `E3Meta` from an `E3Requested` event's fields. 
+fn e3_meta_from(req: &E3Requested) -> E3Meta { + E3Meta { + seed: req.seed, + threshold_n: req.threshold_n, + threshold_m: req.threshold_m, + params: req.params.clone(), + esi_per_ct: req.esi_per_ct, + error_size: req.error_size.clone(), + } +} + /// CiphernodeSelector is an actor that determines if a ciphernode is part of a committee and if so /// emits a TicketGenerated event (score sortition) to the event bus pub struct CiphernodeSelector { @@ -85,20 +97,25 @@ impl Handler for CiphernodeSelector { } } +/// Handles `E3Requested` events received directly from the EventBus. +/// +/// This handler populates `e3_cache` during sync replay, when `Sortition` gates its +/// `E3Requested` subscription behind `EffectsEnabled` and therefore does NOT forward +/// `WithSortitionTicket` messages to us. Without this handler the cache would be empty +/// when `CommitteeFinalized` arrives during replay, causing a missing-meta error. +/// +/// During live operation both this handler AND the `WithSortitionTicket` handler fire for +/// the same E3. `or_insert_with` only fills a missing entry and never replaces an existing one; +/// the `WithSortitionTicket` handler unconditionally `insert`s, overwriting with identical data.
impl Handler> for CiphernodeSelector { type Result = (); fn handle(&mut self, msg: TypedEvent, _: &mut Self::Context) -> Self::Result { trap(EType::Sortition, &self.bus.with_ec(msg.get_ctx()), || { self.e3_cache.try_mutate(msg.get_ctx(), |mut cache| { - cache.entry(msg.e3_id.clone()).or_insert(E3Meta { - seed: msg.seed, - threshold_n: msg.threshold_n, - threshold_m: msg.threshold_m, - params: msg.params.clone(), - esi_per_ct: msg.esi_per_ct, - error_size: msg.error_size.clone(), - }); + cache + .entry(msg.e3_id.clone()) + .or_insert_with(|| e3_meta_from(&msg)); Ok(cache) }) }) @@ -119,17 +136,7 @@ impl Handler>> for CiphernodeSelecto "Mutating e3_cache: appending data: {:?}", data.e3_id.clone() ); - cache.insert( - data.e3_id.clone(), - E3Meta { - seed: data.seed, - threshold_n: data.threshold_n, - threshold_m: data.threshold_m, - params: data.params.clone(), - esi_per_ct: data.esi_per_ct, - error_size: data.error_size.clone(), - }, - ); + cache.insert(data.e3_id.clone(), e3_meta_from(&data)); Ok(cache) })?; From f6c2382dc7812e7025fde33c1f0f1bfd18a7dacd Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Thu, 19 Mar 2026 15:25:45 +0500 Subject: [PATCH 4/4] fix: review --- crates/net/src/net_sync_manager.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index 8a7661f089..01bd433a36 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -360,6 +360,7 @@ async fn handle_sync_request_event( let mut all_events: Vec> = Vec::new(); let mut latest_timestamp: u128 = 0; + let mut failed_aggregates: Vec = Vec::new(); for (aggregate_id, since) in event.since.iter() { info!( @@ -395,10 +396,18 @@ async fn handle_sync_request_event( "Failed to fetch events for aggregate_id={}: {e}. 
Continuing with available events.", aggregate_id ); + failed_aggregates.push(*aggregate_id); } } } + if !failed_aggregates.is_empty() { + bail!( + "failed to fetch historical net events for aggregates: {:?}", + failed_aggregates + ); + } + info!( "Sync complete: collected {} events across {} aggregates, latest_timestamp={}", all_events.len(),