From c13f636758db1e44960b6cdd6baf23a217d79021 Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Tue, 17 Mar 2026 15:12:09 +0000 Subject: [PATCH 1/7] feat: do not replay unnecessary actions on sync --- crates/aggregator/src/committee_finalizer.rs | 22 ++- crates/evm/src/enclave_sol_writer.rs | 4 +- crates/multithread/src/multithread.rs | 3 +- crates/sortition/src/sortition.rs | 18 +- crates/sync/src/sync.rs | 185 +++++++++++++++++++ 5 files changed, 224 insertions(+), 8 deletions(-) diff --git a/crates/aggregator/src/committee_finalizer.rs b/crates/aggregator/src/committee_finalizer.rs index dcfe621f44..cbbabc5255 100644 --- a/crates/aggregator/src/committee_finalizer.rs +++ b/crates/aggregator/src/committee_finalizer.rs @@ -6,8 +6,9 @@ use actix::prelude::*; use e3_events::{ - prelude::*, trap, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, E3Failed, E3Stage, - E3StageChanged, EType, EnclaveEvent, EnclaveEventData, EventType, Shutdown, TypedEvent, + prelude::*, run_once, trap, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, + E3Failed, E3Stage, E3StageChanged, EType, EffectsEnabled, EnclaveEvent, EnclaveEventData, + EventType, Shutdown, TypedEvent, }; use e3_utils::{NotifySync, MAILBOX_LIMIT}; use std::collections::HashMap; @@ -32,9 +33,9 @@ impl CommitteeFinalizer { pub fn attach(bus: &BusHandle) -> Addr { let addr = CommitteeFinalizer::new(bus).start(); + // Subscribe to state-building / cleanup events immediately bus.subscribe_all( &[ - EventType::CommitteeRequested, EventType::Shutdown, EventType::E3Failed, EventType::E3StageChanged, @@ -42,6 +43,21 @@ impl CommitteeFinalizer { addr.clone().recipient(), ); + // Gate CommitteeRequested behind EffectsEnabled — finalization should not + // be scheduled during historical event replay. + bus.subscribe( + EventType::EffectsEnabled, + run_once::({ + let bus = bus.clone(); + let addr = addr.clone(); + move |_| { + bus.subscribe(EventType::CommitteeRequested, addr.into()); + Ok(()) + } + }) + .recipient(), + ); + addr } } diff --git a/crates/evm/src/enclave_sol_writer.rs b/crates/evm/src/enclave_sol_writer.rs index c471c41630..16fc786fc7 100644 --- a/crates/evm/src/enclave_sol_writer.rs +++ b/crates/evm/src/enclave_sol_writer.rs @@ -24,8 +24,8 @@ use e3_events::EventType; use e3_events::Shutdown; use e3_events::{prelude::*, EffectsEnabled}; use e3_events::{run_once, EnclaveEvent}; -use e3_events::{E3Stage, E3StageChanged}; -use e3_events::{E3id, EType, PlaintextAggregated}; +use e3_events::{E3Stage, E3StageChanged, E3id}; +use e3_events::{EType, PlaintextAggregated}; use e3_utils::NotifySync; use e3_utils::MAILBOX_LIMIT; use tracing::info; diff --git a/crates/multithread/src/multithread.rs b/crates/multithread/src/multithread.rs index a40104fd76..2b87019c00 100644 --- a/crates/multithread/src/multithread.rs +++ b/crates/multithread/src/multithread.rs @@ -136,6 +136,8 @@ impl Multithread { ) -> Addr { let addr = Self::new(bus.clone(), rng.clone(), cipher.clone(), task_pool, report).start(); + // Gate ComputeRequest behind EffectsEnabled — proof generation should + // not trigger during historical event replay. bus.subscribe( EventType::EffectsEnabled, run_once::({ @@ -184,7 +186,6 @@ impl Actor for Multithread { impl Handler for Multithread { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - info!("Multithread received EnclaveEvent!"); let (data, ec) = msg.into_components(); match data { EnclaveEventData::ComputeRequest(data) => ctx.notify(TypedEvent::new(data, ec)), diff --git a/crates/sortition/src/sortition.rs b/crates/sortition/src/sortition.rs index ee6e37ba09..d46805e6bc 100644 --- a/crates/sortition/src/sortition.rs +++ b/crates/sortition/src/sortition.rs @@ -278,10 +278,9 @@ impl Sortition { }) .start(); - // Subscribe to all relevant events + // Subscribe to state-building events immediately (needed during EventStore replay) bus.subscribe_all( &[ - EventType::E3Requested, EventType::CiphernodeAdded, EventType::CiphernodeRemoved, EventType::TicketBalanceUpdated, @@ -297,6 +296,21 @@ impl Sortition { addr.clone().into(), ); + // Gate E3Requested behind EffectsEnabled — sortition should not trigger + // ticket generation during historical event replay. + bus.subscribe( + EventType::EffectsEnabled, + e3_events::run_once::({ + let bus = bus.clone(); + let addr = addr.clone(); + move |_| { + bus.subscribe(EventType::E3Requested, addr.into()); + Ok(()) + } + }) + .recipient(), + ); + info!("Sortition actor started"); Ok(addr) } diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index 4fe10ec225..59f8c190e6 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -493,4 +493,189 @@ mod tests { assert_eq!(result.len(), 3); } + + /// Verify that `run_once::` correctly gates event subscriptions. + /// + /// Simulates the sync flow: + /// 1. An event is published BEFORE EffectsEnabled (should be dropped — nobody listening) + /// 2. EffectsEnabled is published (triggers subscription) + /// 3. The same event is published AFTER EffectsEnabled (should be received) + /// + /// This is the pattern used by Sortition (E3Requested), CommitteeFinalizer + /// (CommitteeRequested), Multithread (ComputeRequest), and the sol writers. + #[actix::test] + async fn effects_enabled_gates_event_subscriptions() -> anyhow::Result<()> { + use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }; + + let system = EventSystem::new().with_fresh_bus(); + let bus = system.handle()?.enable("test-effects-gating"); + + let receive_count = Arc::new(AtomicU32::new(0)); + + // Set up a gated subscription: only subscribe to TestEvent after EffectsEnabled + let counter = receive_count.clone(); + let runner = e3_events::run_once::({ + let bus = bus.clone(); + move |_| { + // Create a simple actor that counts received TestEvents + use actix::{Actor, Context, Handler}; + + struct Counter(Arc); + impl Actor for Counter { + type Context = Context; + } + impl Handler for Counter { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + if matches!(msg.get_data(), EnclaveEventData::TestEvent(_)) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + } + + let addr = Counter(counter).start(); + bus.subscribe(EventType::TestEvent, addr.recipient()); + Ok(()) + } + }); + bus.subscribe(EventType::EffectsEnabled, runner.recipient()); + + // 1. Publish a TestEvent BEFORE EffectsEnabled — should NOT be received + bus.event_bus().try_send( + EnclaveEvent::::test_event("before-effects") + .id(1) + .seq(1) + .build(), + )?; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + receive_count.load(Ordering::SeqCst), + 0, + "Event before EffectsEnabled should not be received" + ); + + // 2. Publish EffectsEnabled — triggers the subscription + bus.publish_without_context(EffectsEnabled::new())?; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // 3. Publish a TestEvent AFTER EffectsEnabled — should be received + bus.event_bus().try_send( + EnclaveEvent::::test_event("after-effects") + .id(2) + .seq(2) + .build(), + )?; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + receive_count.load(Ordering::SeqCst), + 1, + "Event after EffectsEnabled should be received exactly once" + ); + + Ok(()) + } + + /// Verify that ungated (immediate) subscriptions receive events both + /// before and after EffectsEnabled. + /// + /// This mirrors how Sortition subscribes to state-building events + /// (CiphernodeAdded, E3Failed, etc.) immediately, while gating + /// E3Requested behind EffectsEnabled. The immediate subscriptions + /// must work during EventStore replay (before EffectsEnabled). + #[actix::test] + async fn immediate_subscriptions_receive_before_effects_enabled() -> anyhow::Result<()> { + use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }; + + let system = EventSystem::new().with_fresh_bus(); + let bus = system.handle()?.enable("test-immediate-sub"); + + let immediate_count = Arc::new(AtomicU32::new(0)); + let gated_count = Arc::new(AtomicU32::new(0)); + + // Helper actor that counts TestEvents + use actix::{Actor, Context, Handler}; + + struct Counter(Arc); + impl Actor for Counter { + type Context = Context; + } + impl Handler for Counter { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + if matches!(msg.get_data(), EnclaveEventData::TestEvent(_)) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + } + + // Immediate subscription — receives all events, including before EffectsEnabled + let immediate_actor = Counter(immediate_count.clone()).start(); + bus.subscribe(EventType::TestEvent, immediate_actor.recipient()); + + // Gated subscription — only receives after EffectsEnabled + let gated_counter = gated_count.clone(); + let runner = e3_events::run_once::({ + let bus = bus.clone(); + move |_| { + let addr = Counter(gated_counter).start(); + bus.subscribe(EventType::TestEvent, addr.recipient()); + Ok(()) + } + }); + bus.subscribe(EventType::EffectsEnabled, runner.recipient()); + + // 1. Publish event BEFORE EffectsEnabled + bus.event_bus().try_send( + EnclaveEvent::::test_event("during-replay") + .id(1) + .seq(1) + .build(), + )?; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + immediate_count.load(Ordering::SeqCst), + 1, + "Immediate subscription should receive events before EffectsEnabled" + ); + assert_eq!( + gated_count.load(Ordering::SeqCst), + 0, + "Gated subscription should NOT receive events before EffectsEnabled" + ); + + // 2. Publish EffectsEnabled + bus.publish_without_context(EffectsEnabled::new())?; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // 3. Publish event AFTER EffectsEnabled + bus.event_bus().try_send( + EnclaveEvent::::test_event("after-effects") + .id(2) + .seq(2) + .build(), + )?; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + immediate_count.load(Ordering::SeqCst), + 2, + "Immediate subscription should receive events after EffectsEnabled too" + ); + assert_eq!( + gated_count.load(Ordering::SeqCst), + 1, + "Gated subscription should receive events after EffectsEnabled" + ); + + Ok(()) + } } From 5c4f70a74ebe79b5d69a8a01438a1361423abb36 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Mar 2026 11:42:51 +0500 Subject: [PATCH 2/7] fix: add loopback check and event gating --- crates/multithread/src/multithread.rs | 24 ++++- crates/net/src/net_interface.rs | 125 ++++++++++++++++++++------ 2 files changed, 119 insertions(+), 30 deletions(-) diff --git a/crates/multithread/src/multithread.rs b/crates/multithread/src/multithread.rs index 2b87019c00..a29f26f893 100644 --- a/crates/multithread/src/multithread.rs +++ b/crates/multithread/src/multithread.rs @@ -167,7 +167,29 @@ impl Multithread { let actor = Self::new(bus.clone(), rng.clone(), cipher.clone(), task_pool, report) .with_zk_prover(zk_prover); let addr = actor.start(); - bus.subscribe(EventType::ComputeRequest, addr.clone().recipient()); + bus.subscribe_all( + &[ + EventType::E3Failed, + EventType::E3StageChanged, + EventType::E3RequestComplete, + ], + addr.clone().into(), + ); + + bus.subscribe( + EventType::EffectsEnabled, + run_once::({ + let bus = bus.clone(); + let addr = addr.clone(); + move |_| { + bus.subscribe(EventType::ComputeRequest, addr.clone().recipient()); + info!("Multithread actor with ZK listening for events."); + Ok(()) + } + }) + .recipient(), + ); + addr } diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 38990f8dd0..afbdfd6fc0 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -10,6 +10,14 @@ use crate::{ events::{IncomingResponse, OutgoingRequest, ProtocolResponse}, net_interface_handle::NetInterfaceHandle, }; +use crate::{ + dialer::dial_peers, + events::{ + GossipData, IncomingRequest, NetCommand, NetEvent, OutgoingRequestFailed, + OutgoingRequestSucceeded, PeerTarget, PutOrStoreError, + }, + ContentHash, +}; use anyhow::{bail, Context, Result}; use e3_events::CorrelationId; use e3_utils::ArcBytes; @@ -25,12 +33,13 @@ use libp2p::{ Behaviour as KademliaBehaviour, Config as KademliaConfig, GetRecordOk, QueryResult, Quorum, Record, RecordKey, }, + multiaddr::Protocol, request_response::{ self, cbor, Event as RequestResponseEvent, Message as RequestResponseMessage, ProtocolSupport, }, swarm::{dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmEvent}, - PeerId, StreamProtocol, Swarm, + Multiaddr, PeerId, StreamProtocol, Swarm, }; use rand::prelude::IteratorRandom; use std::{ @@ -49,19 +58,22 @@ const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/enclave/kad/1.0.0"); const MAX_KADEMLIA_PAYLOAD_MB: usize = 10; const DHT_MAX_RECORDS: usize = 4096; const MAX_GOSSIP_MSG_SIZE_KB: usize = 700; -/// Number of consecutive dial failures before a peer is evicted from the routing table. -/// Set high enough to survive transient network issues during DKG, but low enough to -/// eventually clean up genuinely unreachable peers. -const MAX_CONSECUTIVE_DIAL_FAILURES: u32 = 5; - -use crate::{ - dialer::dial_peers, - events::{ - GossipData, IncomingRequest, NetCommand, NetEvent, OutgoingRequestFailed, - OutgoingRequestSucceeded, PeerTarget, PutOrStoreError, - }, - ContentHash, -}; +const MAX_CONSECUTIVE_DIAL_FAILURES: u32 = 3; +const EVENT_CHANNEL_SIZE: usize = 1000; +const CMD_CHANNEL_SIZE: usize = 1000; +const PEER_FAILURE_TTL: Duration = Duration::from_secs(300); + +/// Returns true if the multiaddr contains a loopback IP (127.0.0.0/8 or ::1). +/// Loopback addresses are only meaningful on the local machine and must not be +/// added to the Kademlia routing table, otherwise they get propagated to remote +/// peers via FIND_NODE responses, causing those peers to dial themselves. +fn is_loopback_addr(addr: &Multiaddr) -> bool { + addr.iter().any(|p| match p { + Protocol::Ip4(ip) => ip.is_loopback(), + Protocol::Ip6(ip) => ip.is_loopback(), + _ => false, + }) +} #[derive(NetworkBehaviour)] pub struct NodeBehaviour { @@ -99,8 +111,8 @@ impl Libp2pNetInterface { udp_port: Option, topic: &str, ) -> Result { - let (event_tx, _) = broadcast::channel(1000); // TODO : tune this param - let (cmd_tx, cmd_rx) = mpsc::channel(1000); // TODO : tune this param + let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE); + let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_SIZE); let swarm = libp2p::SwarmBuilder::with_existing_identity(id.into_keypair()) .with_tokio() @@ -183,6 +195,10 @@ impl Libp2pNetInterface { Ok(_) => (), Err(e) => error!("Error processing NetEvent: {e}") } + let queued = event_tx.len(); + if queued > EVENT_CHANNEL_SIZE * 3 / 4 { + warn!("Event broadcast channel backpressure: {queued}/{EVENT_CHANNEL_SIZE} queued"); + } } } @@ -292,17 +308,19 @@ async fn process_swarm_event( num_established, .. } => { - // Only log on first connection to this peer to avoid spam - if num_established.get() == 1 { - info!("Connected to {peer_id}"); - } // Reset failure count on successful connection peer_failures.reset(&peer_id); + if num_established.get() == 1 { + let total = swarm.connected_peers().count(); + info!("Peer connected: {peer_id} (total: {total})"); + } let remote_addr = endpoint.get_remote_address().clone(); - swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, remote_addr.clone()); + if !is_loopback_addr(&remote_addr) { + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, remote_addr.clone()); + } // Trigger Kademlia bootstrap to discover peers beyond direct connections if num_established.get() == 1 { @@ -554,8 +572,47 @@ async fn process_swarm_event( debug!("Response sent to peer={}, id={}", peer, request_id); } + SwarmEvent::Behaviour(NodeBehaviourEvent::Identify( + libp2p::identify::Event::Received { peer_id, info, .. }, + )) => { + debug!("Identify received from {peer_id}: {:?}", info.observed_addr); + for addr in &info.listen_addrs { + if !is_loopback_addr(addr) { + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, addr.clone()); + } + } + if !is_loopback_addr(&info.observed_addr) { + swarm.add_external_address(info.observed_addr); + } + } + + SwarmEvent::ConnectionClosed { + peer_id, + num_established, + cause, + .. + } => { + if num_established == 0 { + let total = swarm.connected_peers().count(); + info!("Peer disconnected: {peer_id} (total: {total}, cause: {cause:?})"); + } + } + + SwarmEvent::ListenerClosed { + addresses, reason, .. + } => { + warn!("Listener closed on {addresses:?}: {reason:?}"); + } + + SwarmEvent::ListenerError { error, .. } => { + error!("Listener error: {error}"); + } + unknown => { - trace!("Unknown event: {:?}", unknown); + debug!("Unhandled swarm event: {:?}", unknown); } }; Ok(()) @@ -867,8 +924,9 @@ fn handle_response(swarm: &mut Swarm, responder: DirectResponder) } /// Tracks consecutive connection failures per peer to detect and evict stale peers. +/// Entries are automatically cleaned up after PEER_FAILURE_TTL to prevent unbounded growth. struct PeerFailureTracker { - failures: HashMap, + failures: HashMap, } impl PeerFailureTracker { @@ -880,15 +938,24 @@ impl PeerFailureTracker { /// Record a failure for the given peer and return the new consecutive failure count. fn record_failure(&mut self, peer_id: &PeerId) -> u32 { - let count = self.failures.entry(*peer_id).or_insert(0); - *count += 1; - *count + self.cleanup_stale(); + let entry = self.failures.entry(*peer_id).or_insert((0, Instant::now())); + entry.0 += 1; + entry.1 = Instant::now(); + entry.0 } /// Reset the failure count for a peer (e.g. on successful connection or after eviction). fn reset(&mut self, peer_id: &PeerId) { self.failures.remove(peer_id); } + + /// Remove entries older than PEER_FAILURE_TTL to prevent unbounded growth + fn cleanup_stale(&mut self) { + let now = Instant::now(); + self.failures + .retain(|_, (_, last_seen)| now.duration_since(*last_seen) < PEER_FAILURE_TTL); + } } #[cfg(test)] From 28d47a3df294350abf798896b40ffc2e03193fef Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Mar 2026 11:45:03 +0500 Subject: [PATCH 3/7] fix: increase dial failures --- crates/net/src/net_interface.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index afbdfd6fc0..24ac1f5813 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -58,7 +58,7 @@ const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/enclave/kad/1.0.0"); const MAX_KADEMLIA_PAYLOAD_MB: usize = 10; const DHT_MAX_RECORDS: usize = 4096; const MAX_GOSSIP_MSG_SIZE_KB: usize = 700; -const MAX_CONSECUTIVE_DIAL_FAILURES: u32 = 3; +const MAX_CONSECUTIVE_DIAL_FAILURES: u32 = 5; const EVENT_CHANNEL_SIZE: usize = 1000; const CMD_CHANNEL_SIZE: usize = 1000; const PEER_FAILURE_TTL: Duration = Duration::from_secs(300); From be2b69d2f306de20897f5d220f0498f6e1271d53 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Mar 2026 12:01:40 +0500 Subject: [PATCH 4/7] fix: remove dup instants --- crates/net/src/net_interface.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 24ac1f5813..65e32b98d1 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -146,6 +146,8 @@ impl Libp2pNetInterface { let cmd_rx = &mut self.cmd_rx; let mut correlator = Correlator::new(); let mut peer_failures = PeerFailureTracker::new(); + // This is to make sure we dont spam warnings in the logs + let mut last_backpressure_warn = Instant::now(); // Subscribe to topic self.swarm @@ -196,8 +198,11 @@ impl Libp2pNetInterface { Err(e) => error!("Error processing NetEvent: {e}") } let queued = event_tx.len(); - if queued > EVENT_CHANNEL_SIZE * 3 / 4 { + if queued > EVENT_CHANNEL_SIZE * 3 / 4 + && last_backpressure_warn.elapsed() > Duration::from_secs(10) + { warn!("Event broadcast channel backpressure: {queued}/{EVENT_CHANNEL_SIZE} queued"); + last_backpressure_warn = Instant::now(); } } @@ -939,9 +944,10 @@ impl PeerFailureTracker { /// Record a failure for the given peer and return the new consecutive failure count. fn record_failure(&mut self, peer_id: &PeerId) -> u32 { self.cleanup_stale(); - let entry = self.failures.entry(*peer_id).or_insert((0, Instant::now())); + let now = Instant::now(); + let entry = self.failures.entry(*peer_id).or_insert((0, now)); entry.0 += 1; - entry.1 = Instant::now(); + entry.1 = now; entry.0 } From 62d290373363011b69d92e2c46c72f63ba94460f Mon Sep 17 00:00:00 2001 From: ctrlc03 <93448202+ctrlc03@users.noreply.github.com> Date: Wed, 18 Mar 2026 08:57:01 +0000 Subject: [PATCH 5/7] fix: guard loopback addresses in Kademlia and make net sync resilient to restart --- crates/net/src/net_interface.rs | 38 ++++++++++++++++++---- crates/net/src/net_sync_manager.rs | 51 +++++++++++++++++++++++++++--- 2 files changed, 78 insertions(+), 11 deletions(-) diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 65e32b98d1..0e92e5c184 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -75,6 +75,26 @@ fn is_loopback_addr(addr: &Multiaddr) -> bool { }) } +/// Returns true only when we should filter loopback addresses from Kademlia. +/// This is the case when the node has at least one non-loopback listener, +/// meaning it's in a production-like environment where propagating loopback +/// addresses to remote peers would cause them to dial themselves. +/// In localhost test environments (all listeners on 127.0.0.1) we allow +/// loopback so that peers can discover each other. +fn should_filter_loopback(swarm: &Swarm) -> bool { + swarm + .listeners() + .any(|addr| !is_loopback_addr(addr) && !is_unspecified_addr(addr)) +} + +fn is_unspecified_addr(addr: &Multiaddr) -> bool { + addr.iter().any(|p| match p { + Protocol::Ip4(ip) => ip.is_unspecified(), + Protocol::Ip6(ip) => ip.is_unspecified(), + _ => false, + }) +} + #[derive(NetworkBehaviour)] pub struct NodeBehaviour { gossipsub: gossipsub::Behaviour, @@ -320,7 +340,7 @@ async fn process_swarm_event( info!("Peer connected: {peer_id} (total: {total})"); } let remote_addr = endpoint.get_remote_address().clone(); - if !is_loopback_addr(&remote_addr) { + if !(should_filter_loopback(swarm) && is_loopback_addr(&remote_addr)) { swarm .behaviour_mut() .kademlia @@ -355,11 +375,16 @@ async fn process_swarm_event( "Peer ID mismatch at {remote_addr}: expected {failed_peer}, got {obtained} — \ replacing stale routing entry" ); + let local_peer = *swarm.local_peer_id(); swarm.behaviour_mut().kademlia.remove_peer(failed_peer); - swarm - .behaviour_mut() - .kademlia - .add_address(&obtained, remote_addr); + if obtained != local_peer + && !(should_filter_loopback(swarm) && is_loopback_addr(&remote_addr)) + { + swarm + .behaviour_mut() + .kademlia + .add_address(&obtained, remote_addr); + } peer_failures.reset(failed_peer); // Trigger Kademlia bootstrap to discover peers beyond direct connections. @@ -581,8 +606,9 @@ async fn process_swarm_event( libp2p::identify::Event::Received { peer_id, info, .. }, )) => { debug!("Identify received from {peer_id}: {:?}", info.observed_addr); + let filter = should_filter_loopback(swarm); for addr in &info.listen_addrs { - if !is_loopback_addr(addr) { + if !(filter && is_loopback_addr(addr)) { swarm .behaviour_mut() .kademlia diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index 483b13507e..9d9a810daa 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -16,7 +16,7 @@ use e3_utils::MAILBOX_LIMIT; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; use tokio::sync::{broadcast, mpsc}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; use crate::{ direct_requester::DirectRequester, @@ -293,15 +293,56 @@ async fn handle_sync_request_event( let mut all_events: Vec> = Vec::new(); let mut latest_timestamp: u128 = 0; + // Retry net sync with delays long enough to survive QUIC reconnection after a + // hard restart. When a node restarts, peers may still hold stale QUIC connections + // (~10s idle timeout). We retry so Kademlia has time to re-establish connections. + const NET_SYNC_RETRIES: u32 = 3; + const NET_SYNC_RETRY_DELAY: Duration = Duration::from_secs(5); + for (aggregate_id, since) in event.since.iter() { info!( "Requesting batched events for aggregate_id={} since={}", aggregate_id, since ); - let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()).build(); - let events: Vec> = - fetch_all_batched_events(requester, PeerTarget::Random, *aggregate_id, *since, 100) - .await?; + + let mut events: Vec> = Vec::new(); + let mut last_err = None; + + for attempt in 1..=NET_SYNC_RETRIES { + let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()).build(); + match fetch_all_batched_events( + requester, + PeerTarget::Random, + *aggregate_id, + *since, + 100, + ) + .await + { + Ok(fetched) => { + events = fetched; + last_err = None; + break; + } + Err(e) => { + warn!( + "Net sync attempt {attempt}/{NET_SYNC_RETRIES} failed for \ + aggregate_id={aggregate_id}: {e}" + ); + last_err = Some(e); + if attempt < NET_SYNC_RETRIES { + tokio::time::sleep(NET_SYNC_RETRY_DELAY).await; + } + } + } + } + + if let Some(e) = last_err { + warn!( + "All net sync attempts failed for aggregate_id={aggregate_id}: {e}. \ + Proceeding without net events for this aggregate." + ); + } info!( "Received {} events for aggregate_id={}", From e02611528a2788a1a59a11c1e689b6623cc0d13e Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Mar 2026 14:25:37 +0500 Subject: [PATCH 6/7] fix: check connection exists before ready --- crates/net/src/dialer.rs | 8 ++- crates/net/src/events.rs | 12 ++-- crates/net/src/net_interface.rs | 14 +++-- crates/net/src/net_interface_handle.rs | 5 +- crates/net/src/net_sync_manager.rs | 83 ++++++++++++++++++++++---- 5 files changed, 98 insertions(+), 24 deletions(-) diff --git a/crates/net/src/dialer.rs b/crates/net/src/dialer.rs index d28ae911ec..eb8cb38aa0 100644 --- a/crates/net/src/dialer.rs +++ b/crates/net/src/dialer.rs @@ -53,18 +53,22 @@ fn trace_error(r: Result<()>) { /// * `cmd_tx` - Sender for network peer commands /// * `event_tx` - Broadcast sender for peer events /// * `peers` - List of peer addresses to connect to +/// +/// # Returns +/// The number of peers that were successfully connected to. pub async fn dial_peers( cmd_tx: &mpsc::Sender, event_tx: &broadcast::Sender, peers: &Vec, -) -> Result<()> { +) -> Result { let futures: Vec<_> = peers .iter() .map(|addr| dial_multiaddr(cmd_tx, event_tx, addr)) .collect(); let results = join_all(futures).await; + let connected = results.iter().filter(|r| r.is_ok()).count(); results.into_iter().for_each(trace_error); - Ok(()) + Ok(connected) } /// Attempt a connection with retries to a multiaddr. diff --git a/crates/net/src/events.rs b/crates/net/src/events.rs index 4083c2b0d4..3a41ed2510 100644 --- a/crates/net/src/events.rs +++ b/crates/net/src/events.rs @@ -148,10 +148,6 @@ pub struct OutgoingRequestFailed { pub error: String, } -#[derive(Message, Debug, Clone)] -#[rtype("()")] -pub struct AllPeersDialed; - /// Libp2pNetInterface Commands are sent to the network peer over a mspc channel #[derive(Debug, Clone)] pub enum NetCommand { @@ -259,7 +255,13 @@ pub enum NetEvent { /// Received response from a peer in response to an outgoing request OutgoingRequestSucceeded(OutgoingRequestSucceeded), OutgoingRequestFailed(OutgoingRequestFailed), - AllPeersDialed, + /// All configured peers have been dialed (not all necessarily connected). + AllPeersDialed { + /// Number of peers that successfully connected. + connected: usize, + /// Total number of peers that were dialed. + total: usize, + }, } #[derive(Clone, Debug)] diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 0e92e5c184..90a85e4f3f 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -190,8 +190,9 @@ impl Libp2pNetInterface { let cmd_tx = cmd_tx.clone(); let peers = self.peers.clone(); async move { - dial_peers(&cmd_tx, &event_tx, &peers).await?; - event_tx.send(NetEvent::AllPeersDialed)?; + let total = peers.len(); + let connected = dial_peers(&cmd_tx, &event_tx, &peers).await?; + event_tx.send(NetEvent::AllPeersDialed { connected, total })?; return anyhow::Ok(()); } }); @@ -420,11 +421,14 @@ async fn process_swarm_event( SwarmEvent::IncomingConnectionError { error, .. } => { let error_str = format!("{:#}", anyhow::Error::from(error)); - // Downgrade self dial attempts to debug - if error_str.contains("Local peer ID") { + // Downgrade benign handshake failures to debug: + // - "Local peer ID": self-dial attempt + // - "aborted by peer": simultaneous connection dedup (both sides dialed, + // libp2p keeps one connection and the other side aborts the handshake) + if error_str.contains("Local peer ID") || error_str.contains("aborted by peer") { debug!("{}", error_str); } else { - warn!("{}", error_str); + warn!("Incoming connection error: {}", error_str); } } diff --git a/crates/net/src/net_interface_handle.rs b/crates/net/src/net_interface_handle.rs index f90b44ea4c..6acda2f293 100644 --- a/crates/net/src/net_interface_handle.rs +++ b/crates/net/src/net_interface_handle.rs @@ -73,7 +73,10 @@ pub fn create_channel_bridge() -> (NetInterfaceHandle, NetChannelBridge) { tokio::spawn(async move { let _rx_guard = keep_alive; sleep(Duration::from_millis(100)).await; - let _ = startup_event_tx.send(NetEvent::AllPeersDialed); + let _ = startup_event_tx.send(NetEvent::AllPeersDialed { + connected: 0, + total: 0, + }); while let Some(cmd) = m_cmd_rx.recv().await { let _ = tx.send(cmd); } diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index 9d9a810daa..e403b51ceb 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -16,7 +16,7 @@ use e3_utils::MAILBOX_LIMIT; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; use tokio::sync::{broadcast, mpsc}; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use crate::{ direct_requester::DirectRequester, @@ -61,7 +61,9 @@ pub struct NetSyncManager { rx: Arc>, eventstore: Recipient>, requests: HashMap, - peers_ready: bool, + all_peers_dialed: bool, + has_connections: bool, + net_ready_published: bool, } impl NetSyncManager { @@ -77,10 +79,21 @@ impl NetSyncManager { rx: Arc::clone(rx), eventstore, requests: HashMap::new(), - peers_ready: false, + all_peers_dialed: false, + has_connections: false, + net_ready_published: false, } } + fn publish_net_ready(&mut self) -> Result<()> { + if !self.net_ready_published { + self.net_ready_published = true; + info!("NetSyncManager: publishing NetReady"); + self.bus.publish_without_context(NetReady::new())?; + } + Ok(()) + } + pub fn setup( bus: &BusHandle, tx: &mpsc::Sender, @@ -102,7 +115,10 @@ impl NetSyncManager { match event { // Someone is asking for our sync NetEvent::IncomingRequest(value) => addr.do_send(value), - NetEvent::AllPeersDialed => addr.do_send(AllPeersDialed), + NetEvent::AllPeersDialed { connected, total } => { + addr.do_send(AllPeersDialed { connected, total }) + } + NetEvent::ConnectionEstablished { .. } => addr.do_send(PeerConnected), _ => (), } } @@ -150,7 +166,7 @@ impl Handler> for NetSyncManager { self.rx.clone(), msg, ctx.address(), - !self.peers_ready, + !self.all_peers_dialed, ), ) } @@ -249,19 +265,64 @@ impl Handler for NetSyncManager { impl Handler for NetSyncManager { type Result = (); - fn handle(&mut self, _: AllPeersDialed, _: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: AllPeersDialed, ctx: &mut Self::Context) -> Self::Result { trap(EType::Sync, &self.bus.clone(), || { - info!("NetSyncManager: AllPeersDialed"); - self.peers_ready = true; - self.bus.publish_without_context(NetReady::new())?; + info!( + "NetSyncManager: AllPeersDialed (connected={}, total={})", + msg.connected, msg.total + ); + self.all_peers_dialed = true; + if msg.connected > 0 { + self.has_connections = true; + } + if msg.total == 0 || self.has_connections { + // No peers configured or connections already established + self.publish_net_ready()?; + } else { + // All dials failed — wait for a ConnectionEstablished event. + // Fall back to a 60-second timeout so we don't hang forever. + info!( + "All peer dials failed, waiting for connections before publishing NetReady..." + ); + let bus = self.bus.clone(); + ctx.run_later(Duration::from_secs(60), move |this, _| { + if !this.net_ready_published { + warn!("No peer connections established within 60s timeout, publishing NetReady anyway"); + this.net_ready_published = true; + if let Err(e) = bus.publish_without_context(NetReady::new()) { + error!("Failed to publish NetReady: {e}"); + } + } + }); + } Ok(()) }) } } +impl Handler for NetSyncManager { + type Result = (); + fn handle(&mut self, _: PeerConnected, _: &mut Self::Context) -> Self::Result { + if !self.has_connections { + info!("NetSyncManager: first peer connected"); + self.has_connections = true; + if self.all_peers_dialed { + trap(EType::Sync, &self.bus.clone(), || self.publish_net_ready()); + } + } + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct AllPeersDialed { + connected: usize, + total: usize, +} + #[derive(Message)] #[rtype(result = "()")] -struct AllPeersDialed; +struct PeerConnected; async fn handle_sync_request_event( net_cmds: mpsc::Sender, @@ -277,7 +338,7 @@ async fn handle_sync_request_event( await_event( &net_events, |e| { - if matches!(e, &NetEvent::AllPeersDialed) { + if matches!(e, &NetEvent::AllPeersDialed { .. }) { info!("AllPeersDialed matched!"); Some(e.clone()) } else { From c4f7c63cc32890939929e0bec0d52f2e03249d62 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Mar 2026 14:30:08 +0500 Subject: [PATCH 7/7] fix: revert retry loop --- crates/net/src/dialer.rs | 8 +++- crates/net/src/net_sync_manager.rs | 60 +++++++----------------------- 2 files changed, 19 insertions(+), 49 deletions(-) diff --git a/crates/net/src/dialer.rs b/crates/net/src/dialer.rs index eb8cb38aa0..8f02580ad4 100644 --- a/crates/net/src/dialer.rs +++ b/crates/net/src/dialer.rs @@ -24,6 +24,10 @@ use e3_utils::{retry_with_backoff, to_retry, OnceTake, RetryError}; const DIAL_DELAY: u64 = 3000; const DIAL_RETRIES: u32 = 10; +/// Per-connection timeout: if no swarm events arrive within this window during +/// a single dial attempt, we treat it as timed out and retry. +const DIAL_EVENT_TIMEOUT: Duration = Duration::from_secs(60); + /// Dial a single Multiaddr with retries and return an error should those retries not work async fn dial_multiaddr( cmd_tx: &mpsc::Sender, @@ -144,8 +148,8 @@ async fn wait_for_connection( _ => (), } } - _ = sleep(Duration::from_secs(60)) => { - warn!("Connection attempt timed out after 60 seconds of no events"); + _ = sleep(DIAL_EVENT_TIMEOUT) => { + warn!("Connection attempt timed out after {:?} of no events", DIAL_EVENT_TIMEOUT); return Err(RetryError::Retry(std::io::Error::new( std::io::ErrorKind::TimedOut, "Connection attempt timed out", diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index e403b51ceb..c6b99c8c50 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -25,6 +25,13 @@ use crate::{ net_event_batch::{fetch_all_batched_events, BatchCursor, EventBatch, FetchEventsSince}, }; +/// Maximum time to wait for a `ConnectionEstablished` event after all dials +/// failed before publishing `NetReady` anyway. +const NET_READY_CONNECT_TIMEOUT: Duration = Duration::from_secs(60); + +/// Maximum time to wait for the `AllPeersDialed` event before giving up. +const ALL_PEERS_DIALED_TIMEOUT: Duration = Duration::from_secs(30); + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SyncResponseValue { pub events: Vec>, @@ -285,7 +292,7 @@ impl Handler for NetSyncManager { "All peer dials failed, waiting for connections before publishing NetReady..." ); let bus = self.bus.clone(); - ctx.run_later(Duration::from_secs(60), move |this, _| { + ctx.run_later(NET_READY_CONNECT_TIMEOUT, move |this, _| { if !this.net_ready_published { warn!("No peer connections established within 60s timeout, publishing NetReady anyway"); this.net_ready_published = true; @@ -345,7 +352,7 @@ async fn handle_sync_request_event( None } }, - Duration::from_secs(30), + ALL_PEERS_DIALED_TIMEOUT, ) .await?; } @@ -354,56 +361,15 @@ async fn handle_sync_request_event( let mut all_events: Vec> = Vec::new(); let mut latest_timestamp: u128 = 0; - // Retry net sync with delays long enough to survive QUIC reconnection after a - // hard restart. When a node restarts, peers may still hold stale QUIC connections - // (~10s idle timeout). We retry so Kademlia has time to re-establish connections. - const NET_SYNC_RETRIES: u32 = 3; - const NET_SYNC_RETRY_DELAY: Duration = Duration::from_secs(5); - for (aggregate_id, since) in event.since.iter() { info!( "Requesting batched events for aggregate_id={} since={}", aggregate_id, since ); - - let mut events: Vec> = Vec::new(); - let mut last_err = None; - - for attempt in 1..=NET_SYNC_RETRIES { - let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()).build(); - match fetch_all_batched_events( - requester, - PeerTarget::Random, - *aggregate_id, - *since, - 100, - ) - .await - { - Ok(fetched) => { - events = fetched; - last_err = None; - break; - } - Err(e) => { - warn!( - "Net sync attempt {attempt}/{NET_SYNC_RETRIES} failed for \ - aggregate_id={aggregate_id}: {e}" - ); - last_err = Some(e); - if attempt < NET_SYNC_RETRIES { - tokio::time::sleep(NET_SYNC_RETRY_DELAY).await; - } - } - } - } - - if let Some(e) = last_err { - warn!( - "All net sync attempts failed for aggregate_id={aggregate_id}: {e}. \ - Proceeding without net events for this aggregate." - ); - } + let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()).build(); + let events: Vec> = + fetch_all_batched_events(requester, PeerTarget::Random, *aggregate_id, *since, 100) + .await?; info!( "Received {} events for aggregate_id={}",