diff --git a/crates/aggregator/src/committee_finalizer.rs b/crates/aggregator/src/committee_finalizer.rs index dcfe621f44..cbbabc5255 100644 --- a/crates/aggregator/src/committee_finalizer.rs +++ b/crates/aggregator/src/committee_finalizer.rs @@ -6,8 +6,9 @@ use actix::prelude::*; use e3_events::{ - prelude::*, trap, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, E3Failed, E3Stage, - E3StageChanged, EType, EnclaveEvent, EnclaveEventData, EventType, Shutdown, TypedEvent, + prelude::*, run_once, trap, BusHandle, CommitteeFinalizeRequested, CommitteeRequested, + E3Failed, E3Stage, E3StageChanged, EType, EffectsEnabled, EnclaveEvent, EnclaveEventData, + EventType, Shutdown, TypedEvent, }; use e3_utils::{NotifySync, MAILBOX_LIMIT}; use std::collections::HashMap; @@ -32,9 +33,9 @@ impl CommitteeFinalizer { pub fn attach(bus: &BusHandle) -> Addr { let addr = CommitteeFinalizer::new(bus).start(); + // Subscribe to state-building / cleanup events immediately bus.subscribe_all( &[ - EventType::CommitteeRequested, EventType::Shutdown, EventType::E3Failed, EventType::E3StageChanged, @@ -42,6 +43,21 @@ impl CommitteeFinalizer { addr.clone().recipient(), ); + // Gate CommitteeRequested behind EffectsEnabled — finalization should not + // be scheduled during historical event replay. + bus.subscribe( + EventType::EffectsEnabled, + run_once::({ + let bus = bus.clone(); + let addr = addr.clone(); + move |_| { + bus.subscribe(EventType::CommitteeRequested, addr.into()); + Ok(()) + } + }) + .recipient(), + ); + addr } } diff --git a/crates/evm/src/enclave_sol_writer.rs b/crates/evm/src/enclave_sol_writer.rs index c471c41630..16fc786fc7 100644 --- a/crates/evm/src/enclave_sol_writer.rs +++ b/crates/evm/src/enclave_sol_writer.rs @@ -24,8 +24,8 @@ use e3_events::EventType; use e3_events::Shutdown; use e3_events::{prelude::*, EffectsEnabled}; use e3_events::{run_once, EnclaveEvent}; -use e3_events::{E3Stage, E3StageChanged}; -use e3_events::{E3id, EType, PlaintextAggregated}; +use e3_events::{E3Stage, E3StageChanged, E3id}; +use e3_events::{EType, PlaintextAggregated}; use e3_utils::NotifySync; use e3_utils::MAILBOX_LIMIT; use tracing::info; diff --git a/crates/multithread/src/multithread.rs b/crates/multithread/src/multithread.rs index a40104fd76..a29f26f893 100644 --- a/crates/multithread/src/multithread.rs +++ b/crates/multithread/src/multithread.rs @@ -136,6 +136,8 @@ impl Multithread { ) -> Addr { let addr = Self::new(bus.clone(), rng.clone(), cipher.clone(), task_pool, report).start(); + // Gate ComputeRequest behind EffectsEnabled — proof generation should + // not trigger during historical event replay. bus.subscribe( EventType::EffectsEnabled, run_once::({ @@ -165,7 +167,29 @@ impl Multithread { let actor = Self::new(bus.clone(), rng.clone(), cipher.clone(), task_pool, report) .with_zk_prover(zk_prover); let addr = actor.start(); - bus.subscribe(EventType::ComputeRequest, addr.clone().recipient()); + bus.subscribe_all( + &[ + EventType::E3Failed, + EventType::E3StageChanged, + EventType::E3RequestComplete, + ], + addr.clone().into(), + ); + + bus.subscribe( + EventType::EffectsEnabled, + run_once::({ + let bus = bus.clone(); + let addr = addr.clone(); + move |_| { + bus.subscribe(EventType::ComputeRequest, addr.clone().recipient()); + info!("Multithread actor with ZK listening for events."); + Ok(()) + } + }) + .recipient(), + ); + addr } @@ -184,7 +208,6 @@ impl Actor for Multithread { impl Handler for Multithread { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - info!("Multithread received EnclaveEvent!"); let (data, ec) = msg.into_components(); match data { EnclaveEventData::ComputeRequest(data) => ctx.notify(TypedEvent::new(data, ec)), diff --git a/crates/net/src/dialer.rs b/crates/net/src/dialer.rs index d28ae911ec..8f02580ad4 100644 --- a/crates/net/src/dialer.rs +++ b/crates/net/src/dialer.rs @@ -24,6 +24,10 @@ use e3_utils::{retry_with_backoff, to_retry, OnceTake, RetryError}; const DIAL_DELAY: u64 = 3000; const DIAL_RETRIES: u32 = 10; +/// Per-connection timeout: if no swarm events arrive within this window during +/// a single dial attempt, we treat it as timed out and retry. +const DIAL_EVENT_TIMEOUT: Duration = Duration::from_secs(60); + /// Dial a single Multiaddr with retries and return an error should those retries not work async fn dial_multiaddr( cmd_tx: &mpsc::Sender, @@ -53,18 +57,22 @@ fn trace_error(r: Result<()>) { /// * `cmd_tx` - Sender for network peer commands /// * `event_tx` - Broadcast sender for peer events /// * `peers` - List of peer addresses to connect to +/// +/// # Returns +/// The number of peers that were successfully connected to. pub async fn dial_peers( cmd_tx: &mpsc::Sender, event_tx: &broadcast::Sender, peers: &Vec, -) -> Result<()> { +) -> Result { let futures: Vec<_> = peers .iter() .map(|addr| dial_multiaddr(cmd_tx, event_tx, addr)) .collect(); let results = join_all(futures).await; + let connected = results.iter().filter(|r| r.is_ok()).count(); results.into_iter().for_each(trace_error); - Ok(()) + Ok(connected) } /// Attempt a connection with retries to a multiaddr. @@ -140,8 +148,8 @@ async fn wait_for_connection( _ => (), } } - _ = sleep(Duration::from_secs(60)) => { - warn!("Connection attempt timed out after 60 seconds of no events"); + _ = sleep(DIAL_EVENT_TIMEOUT) => { + warn!("Connection attempt timed out after {:?} of no events", DIAL_EVENT_TIMEOUT); return Err(RetryError::Retry(std::io::Error::new( std::io::ErrorKind::TimedOut, "Connection attempt timed out", diff --git a/crates/net/src/events.rs b/crates/net/src/events.rs index 4083c2b0d4..3a41ed2510 100644 --- a/crates/net/src/events.rs +++ b/crates/net/src/events.rs @@ -148,10 +148,6 @@ pub struct OutgoingRequestFailed { pub error: String, } -#[derive(Message, Debug, Clone)] -#[rtype("()")] -pub struct AllPeersDialed; - /// Libp2pNetInterface Commands are sent to the network peer over a mspc channel #[derive(Debug, Clone)] pub enum NetCommand { @@ -259,7 +255,13 @@ pub enum NetEvent { /// Received response from a peer in response to an outgoing request OutgoingRequestSucceeded(OutgoingRequestSucceeded), OutgoingRequestFailed(OutgoingRequestFailed), - AllPeersDialed, + /// All configured peers have been dialed (not all necessarily connected). + AllPeersDialed { + /// Number of peers that successfully connected. + connected: usize, + /// Total number of peers that were dialed. + total: usize, + }, } #[derive(Clone, Debug)] diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 38990f8dd0..90a85e4f3f 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -10,6 +10,14 @@ use crate::{ events::{IncomingResponse, OutgoingRequest, ProtocolResponse}, net_interface_handle::NetInterfaceHandle, }; +use crate::{ + dialer::dial_peers, + events::{ + GossipData, IncomingRequest, NetCommand, NetEvent, OutgoingRequestFailed, + OutgoingRequestSucceeded, PeerTarget, PutOrStoreError, + }, + ContentHash, +}; use anyhow::{bail, Context, Result}; use e3_events::CorrelationId; use e3_utils::ArcBytes; @@ -25,12 +33,13 @@ use libp2p::{ Behaviour as KademliaBehaviour, Config as KademliaConfig, GetRecordOk, QueryResult, Quorum, Record, RecordKey, }, + multiaddr::Protocol, request_response::{ self, cbor, Event as RequestResponseEvent, Message as RequestResponseMessage, ProtocolSupport, }, swarm::{dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmEvent}, - PeerId, StreamProtocol, Swarm, + Multiaddr, PeerId, StreamProtocol, Swarm, }; use rand::prelude::IteratorRandom; use std::{ @@ -49,19 +58,42 @@ const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/enclave/kad/1.0.0"); const MAX_KADEMLIA_PAYLOAD_MB: usize = 10; const DHT_MAX_RECORDS: usize = 4096; const MAX_GOSSIP_MSG_SIZE_KB: usize = 700; -/// Number of consecutive dial failures before a peer is evicted from the routing table. -/// Set high enough to survive transient network issues during DKG, but low enough to -/// eventually clean up genuinely unreachable peers. const MAX_CONSECUTIVE_DIAL_FAILURES: u32 = 5; +const EVENT_CHANNEL_SIZE: usize = 1000; +const CMD_CHANNEL_SIZE: usize = 1000; +const PEER_FAILURE_TTL: Duration = Duration::from_secs(300); + +/// Returns true if the multiaddr contains a loopback IP (127.0.0.0/8 or ::1). +/// Loopback addresses are only meaningful on the local machine and must not be +/// added to the Kademlia routing table, otherwise they get propagated to remote +/// peers via FIND_NODE responses, causing those peers to dial themselves. +fn is_loopback_addr(addr: &Multiaddr) -> bool { + addr.iter().any(|p| match p { + Protocol::Ip4(ip) => ip.is_loopback(), + Protocol::Ip6(ip) => ip.is_loopback(), + _ => false, + }) +} -use crate::{ - dialer::dial_peers, - events::{ - GossipData, IncomingRequest, NetCommand, NetEvent, OutgoingRequestFailed, - OutgoingRequestSucceeded, PeerTarget, PutOrStoreError, - }, - ContentHash, -}; +/// Returns true only when we should filter loopback addresses from Kademlia. +/// This is the case when the node has at least one non-loopback listener, +/// meaning it's in a production-like environment where propagating loopback +/// addresses to remote peers would cause them to dial themselves. +/// In localhost test environments (all listeners on 127.0.0.1) we allow +/// loopback so that peers can discover each other. +fn should_filter_loopback(swarm: &Swarm) -> bool { + swarm + .listeners() + .any(|addr| !is_loopback_addr(addr) && !is_unspecified_addr(addr)) +} + +fn is_unspecified_addr(addr: &Multiaddr) -> bool { + addr.iter().any(|p| match p { + Protocol::Ip4(ip) => ip.is_unspecified(), + Protocol::Ip6(ip) => ip.is_unspecified(), + _ => false, + }) +} #[derive(NetworkBehaviour)] pub struct NodeBehaviour { @@ -99,8 +131,8 @@ impl Libp2pNetInterface { udp_port: Option, topic: &str, ) -> Result { - let (event_tx, _) = broadcast::channel(1000); // TODO : tune this param - let (cmd_tx, cmd_rx) = mpsc::channel(1000); // TODO : tune this param + let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_SIZE); + let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_SIZE); let swarm = libp2p::SwarmBuilder::with_existing_identity(id.into_keypair()) .with_tokio() @@ -134,6 +166,8 @@ impl Libp2pNetInterface { let cmd_rx = &mut self.cmd_rx; let mut correlator = Correlator::new(); let mut peer_failures = PeerFailureTracker::new(); + // This is to make sure we dont spam warnings in the logs + let mut last_backpressure_warn = Instant::now(); // Subscribe to topic self.swarm @@ -156,8 +190,9 @@ impl Libp2pNetInterface { let cmd_tx = cmd_tx.clone(); let peers = self.peers.clone(); async move { - dial_peers(&cmd_tx, &event_tx, &peers).await?; - event_tx.send(NetEvent::AllPeersDialed)?; + let total = peers.len(); + let connected = dial_peers(&cmd_tx, &event_tx, &peers).await?; + event_tx.send(NetEvent::AllPeersDialed { connected, total })?; return anyhow::Ok(()); } }); @@ -183,6 +218,13 @@ impl Libp2pNetInterface { Ok(_) => (), Err(e) => error!("Error processing NetEvent: {e}") } + let queued = event_tx.len(); + if queued > EVENT_CHANNEL_SIZE * 3 / 4 + && last_backpressure_warn.elapsed() > Duration::from_secs(10) + { + warn!("Event broadcast channel backpressure: {queued}/{EVENT_CHANNEL_SIZE} queued"); + last_backpressure_warn = Instant::now(); + } } } @@ -292,17 +334,19 @@ async fn process_swarm_event( num_established, .. } => { - // Only log on first connection to this peer to avoid spam - if num_established.get() == 1 { - info!("Connected to {peer_id}"); - } // Reset failure count on successful connection peer_failures.reset(&peer_id); + if num_established.get() == 1 { + let total = swarm.connected_peers().count(); + info!("Peer connected: {peer_id} (total: {total})"); + } let remote_addr = endpoint.get_remote_address().clone(); - swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, remote_addr.clone()); + if !(should_filter_loopback(swarm) && is_loopback_addr(&remote_addr)) { + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, remote_addr.clone()); + } // Trigger Kademlia bootstrap to discover peers beyond direct connections if num_established.get() == 1 { @@ -332,11 +376,16 @@ async fn process_swarm_event( "Peer ID mismatch at {remote_addr}: expected {failed_peer}, got {obtained} — \ replacing stale routing entry" ); + let local_peer = *swarm.local_peer_id(); swarm.behaviour_mut().kademlia.remove_peer(failed_peer); - swarm - .behaviour_mut() - .kademlia - .add_address(&obtained, remote_addr); + if obtained != local_peer + && !(should_filter_loopback(swarm) && is_loopback_addr(&remote_addr)) + { + swarm + .behaviour_mut() + .kademlia + .add_address(&obtained, remote_addr); + } peer_failures.reset(failed_peer); // Trigger Kademlia bootstrap to discover peers beyond direct connections. @@ -372,11 +421,14 @@ async fn process_swarm_event( SwarmEvent::IncomingConnectionError { error, .. } => { let error_str = format!("{:#}", anyhow::Error::from(error)); - // Downgrade self dial attempts to debug - if error_str.contains("Local peer ID") { + // Downgrade benign handshake failures to debug: + // - "Local peer ID": self-dial attempt + // - "aborted by peer": simultaneous connection dedup (both sides dialed, + // libp2p keeps one connection and the other side aborts the handshake) + if error_str.contains("Local peer ID") || error_str.contains("aborted by peer") { debug!("{}", error_str); } else { - warn!("{}", error_str); + warn!("Incoming connection error: {}", error_str); } } @@ -554,8 +606,48 @@ async fn process_swarm_event( debug!("Response sent to peer={}, id={}", peer, request_id); } + SwarmEvent::Behaviour(NodeBehaviourEvent::Identify( + libp2p::identify::Event::Received { peer_id, info, .. }, + )) => { + debug!("Identify received from {peer_id}: {:?}", info.observed_addr); + let filter = should_filter_loopback(swarm); + for addr in &info.listen_addrs { + if !(filter && is_loopback_addr(addr)) { + swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, addr.clone()); + } + } + if !is_loopback_addr(&info.observed_addr) { + swarm.add_external_address(info.observed_addr); + } + } + + SwarmEvent::ConnectionClosed { + peer_id, + num_established, + cause, + .. + } => { + if num_established == 0 { + let total = swarm.connected_peers().count(); + info!("Peer disconnected: {peer_id} (total: {total}, cause: {cause:?})"); + } + } + + SwarmEvent::ListenerClosed { + addresses, reason, .. + } => { + warn!("Listener closed on {addresses:?}: {reason:?}"); + } + + SwarmEvent::ListenerError { error, .. } => { + error!("Listener error: {error}"); + } + unknown => { - trace!("Unknown event: {:?}", unknown); + debug!("Unhandled swarm event: {:?}", unknown); } }; Ok(()) @@ -867,8 +959,9 @@ fn handle_response(swarm: &mut Swarm, responder: DirectResponder) } /// Tracks consecutive connection failures per peer to detect and evict stale peers. +/// Entries are automatically cleaned up after PEER_FAILURE_TTL to prevent unbounded growth. struct PeerFailureTracker { - failures: HashMap, + failures: HashMap, } impl PeerFailureTracker { @@ -880,15 +973,25 @@ impl PeerFailureTracker { /// Record a failure for the given peer and return the new consecutive failure count. fn record_failure(&mut self, peer_id: &PeerId) -> u32 { - let count = self.failures.entry(*peer_id).or_insert(0); - *count += 1; - *count + self.cleanup_stale(); + let now = Instant::now(); + let entry = self.failures.entry(*peer_id).or_insert((0, now)); + entry.0 += 1; + entry.1 = now; + entry.0 } /// Reset the failure count for a peer (e.g. on successful connection or after eviction). fn reset(&mut self, peer_id: &PeerId) { self.failures.remove(peer_id); } + + /// Remove entries older than PEER_FAILURE_TTL to prevent unbounded growth + fn cleanup_stale(&mut self) { + let now = Instant::now(); + self.failures + .retain(|_, (_, last_seen)| now.duration_since(*last_seen) < PEER_FAILURE_TTL); + } } #[cfg(test)] diff --git a/crates/net/src/net_interface_handle.rs b/crates/net/src/net_interface_handle.rs index f90b44ea4c..6acda2f293 100644 --- a/crates/net/src/net_interface_handle.rs +++ b/crates/net/src/net_interface_handle.rs @@ -73,7 +73,10 @@ pub fn create_channel_bridge() -> (NetInterfaceHandle, NetChannelBridge) { tokio::spawn(async move { let _rx_guard = keep_alive; sleep(Duration::from_millis(100)).await; - let _ = startup_event_tx.send(NetEvent::AllPeersDialed); + let _ = startup_event_tx.send(NetEvent::AllPeersDialed { + connected: 0, + total: 0, + }); while let Some(cmd) = m_cmd_rx.recv().await { let _ = tx.send(cmd); } diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index 483b13507e..c6b99c8c50 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -16,7 +16,7 @@ use e3_utils::MAILBOX_LIMIT; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, convert::TryInto, sync::Arc, time::Duration}; use tokio::sync::{broadcast, mpsc}; -use tracing::{debug, info}; +use tracing::{debug, error, info, warn}; use crate::{ direct_requester::DirectRequester, @@ -25,6 +25,13 @@ use crate::{ net_event_batch::{fetch_all_batched_events, BatchCursor, EventBatch, FetchEventsSince}, }; +/// Maximum time to wait for a `ConnectionEstablished` event after all dials +/// failed before publishing `NetReady` anyway. +const NET_READY_CONNECT_TIMEOUT: Duration = Duration::from_secs(60); + +/// Maximum time to wait for the `AllPeersDialed` event before giving up. +const ALL_PEERS_DIALED_TIMEOUT: Duration = Duration::from_secs(30); + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SyncResponseValue { pub events: Vec>, @@ -61,7 +68,9 @@ pub struct NetSyncManager { rx: Arc>, eventstore: Recipient>, requests: HashMap, - peers_ready: bool, + all_peers_dialed: bool, + has_connections: bool, + net_ready_published: bool, } impl NetSyncManager { @@ -77,8 +86,19 @@ impl NetSyncManager { rx: Arc::clone(rx), eventstore, requests: HashMap::new(), - peers_ready: false, + all_peers_dialed: false, + has_connections: false, + net_ready_published: false, + } + } + + fn publish_net_ready(&mut self) -> Result<()> { + if !self.net_ready_published { + self.net_ready_published = true; + info!("NetSyncManager: publishing NetReady"); + self.bus.publish_without_context(NetReady::new())?; } + Ok(()) } pub fn setup( @@ -102,7 +122,10 @@ impl NetSyncManager { match event { // Someone is asking for our sync NetEvent::IncomingRequest(value) => addr.do_send(value), - NetEvent::AllPeersDialed => addr.do_send(AllPeersDialed), + NetEvent::AllPeersDialed { connected, total } => { + addr.do_send(AllPeersDialed { connected, total }) + } + NetEvent::ConnectionEstablished { .. } => addr.do_send(PeerConnected), _ => (), } } @@ -150,7 +173,7 @@ impl Handler> for NetSyncManager { self.rx.clone(), msg, ctx.address(), - !self.peers_ready, + !self.all_peers_dialed, ), ) } @@ -249,19 +272,64 @@ impl Handler for NetSyncManager { impl Handler for NetSyncManager { type Result = (); - fn handle(&mut self, _: AllPeersDialed, _: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: AllPeersDialed, ctx: &mut Self::Context) -> Self::Result { trap(EType::Sync, &self.bus.clone(), || { - info!("NetSyncManager: AllPeersDialed"); - self.peers_ready = true; - self.bus.publish_without_context(NetReady::new())?; + info!( + "NetSyncManager: AllPeersDialed (connected={}, total={})", + msg.connected, msg.total + ); + self.all_peers_dialed = true; + if msg.connected > 0 { + self.has_connections = true; + } + if msg.total == 0 || self.has_connections { + // No peers configured or connections already established + self.publish_net_ready()?; + } else { + // All dials failed — wait for a ConnectionEstablished event. + // Fall back to a 60-second timeout so we don't hang forever. + info!( + "All peer dials failed, waiting for connections before publishing NetReady..." + ); + let bus = self.bus.clone(); + ctx.run_later(NET_READY_CONNECT_TIMEOUT, move |this, _| { + if !this.net_ready_published { + warn!("No peer connections established within 60s timeout, publishing NetReady anyway"); + this.net_ready_published = true; + if let Err(e) = bus.publish_without_context(NetReady::new()) { + error!("Failed to publish NetReady: {e}"); + } + } + }); + } Ok(()) }) } } +impl Handler for NetSyncManager { + type Result = (); + fn handle(&mut self, _: PeerConnected, _: &mut Self::Context) -> Self::Result { + if !self.has_connections { + info!("NetSyncManager: first peer connected"); + self.has_connections = true; + if self.all_peers_dialed { + trap(EType::Sync, &self.bus.clone(), || self.publish_net_ready()); + } + } + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct AllPeersDialed { + connected: usize, + total: usize, +} + #[derive(Message)] #[rtype(result = "()")] -struct AllPeersDialed; +struct PeerConnected; async fn handle_sync_request_event( net_cmds: mpsc::Sender, @@ -277,14 +345,14 @@ async fn handle_sync_request_event( await_event( &net_events, |e| { - if matches!(e, &NetEvent::AllPeersDialed) { + if matches!(e, &NetEvent::AllPeersDialed { .. }) { info!("AllPeersDialed matched!"); Some(e.clone()) } else { None } }, - Duration::from_secs(30), + ALL_PEERS_DIALED_TIMEOUT, ) .await?; } diff --git a/crates/sortition/src/sortition.rs b/crates/sortition/src/sortition.rs index ee6e37ba09..d46805e6bc 100644 --- a/crates/sortition/src/sortition.rs +++ b/crates/sortition/src/sortition.rs @@ -278,10 +278,9 @@ impl Sortition { }) .start(); - // Subscribe to all relevant events + // Subscribe to state-building events immediately (needed during EventStore replay) bus.subscribe_all( &[ - EventType::E3Requested, EventType::CiphernodeAdded, EventType::CiphernodeRemoved, EventType::TicketBalanceUpdated, @@ -297,6 +296,21 @@ impl Sortition { addr.clone().into(), ); + // Gate E3Requested behind EffectsEnabled — sortition should not trigger + // ticket generation during historical event replay. + bus.subscribe( + EventType::EffectsEnabled, + e3_events::run_once::({ + let bus = bus.clone(); + let addr = addr.clone(); + move |_| { + bus.subscribe(EventType::E3Requested, addr.into()); + Ok(()) + } + }) + .recipient(), + ); + info!("Sortition actor started"); Ok(addr) } diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index 4fe10ec225..59f8c190e6 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -493,4 +493,189 @@ mod tests { assert_eq!(result.len(), 3); } + + /// Verify that `run_once::` correctly gates event subscriptions. + /// + /// Simulates the sync flow: + /// 1. An event is published BEFORE EffectsEnabled (should be dropped — nobody listening) + /// 2. EffectsEnabled is published (triggers subscription) + /// 3. The same event is published AFTER EffectsEnabled (should be received) + /// + /// This is the pattern used by Sortition (E3Requested), CommitteeFinalizer + /// (CommitteeRequested), Multithread (ComputeRequest), and the sol writers. + #[actix::test] + async fn effects_enabled_gates_event_subscriptions() -> anyhow::Result<()> { + use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }; + + let system = EventSystem::new().with_fresh_bus(); + let bus = system.handle()?.enable("test-effects-gating"); + + let receive_count = Arc::new(AtomicU32::new(0)); + + // Set up a gated subscription: only subscribe to TestEvent after EffectsEnabled + let counter = receive_count.clone(); + let runner = e3_events::run_once::({ + let bus = bus.clone(); + move |_| { + // Create a simple actor that counts received TestEvents + use actix::{Actor, Context, Handler}; + + struct Counter(Arc); + impl Actor for Counter { + type Context = Context; + } + impl Handler for Counter { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + if matches!(msg.get_data(), EnclaveEventData::TestEvent(_)) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + } + + let addr = Counter(counter).start(); + bus.subscribe(EventType::TestEvent, addr.recipient()); + Ok(()) + } + }); + bus.subscribe(EventType::EffectsEnabled, runner.recipient()); + + // 1. Publish a TestEvent BEFORE EffectsEnabled — should NOT be received + bus.event_bus().try_send( + EnclaveEvent::::test_event("before-effects") + .id(1) + .seq(1) + .build(), + )?; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + receive_count.load(Ordering::SeqCst), + 0, + "Event before EffectsEnabled should not be received" + ); + + // 2. Publish EffectsEnabled — triggers the subscription + bus.publish_without_context(EffectsEnabled::new())?; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // 3. Publish a TestEvent AFTER EffectsEnabled — should be received + bus.event_bus().try_send( + EnclaveEvent::::test_event("after-effects") + .id(2) + .seq(2) + .build(), + )?; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + receive_count.load(Ordering::SeqCst), + 1, + "Event after EffectsEnabled should be received exactly once" + ); + + Ok(()) + } + + /// Verify that ungated (immediate) subscriptions receive events both + /// before and after EffectsEnabled. + /// + /// This mirrors how Sortition subscribes to state-building events + /// (CiphernodeAdded, E3Failed, etc.) immediately, while gating + /// E3Requested behind EffectsEnabled. The immediate subscriptions + /// must work during EventStore replay (before EffectsEnabled). + #[actix::test] + async fn immediate_subscriptions_receive_before_effects_enabled() -> anyhow::Result<()> { + use std::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }; + + let system = EventSystem::new().with_fresh_bus(); + let bus = system.handle()?.enable("test-immediate-sub"); + + let immediate_count = Arc::new(AtomicU32::new(0)); + let gated_count = Arc::new(AtomicU32::new(0)); + + // Helper actor that counts TestEvents + use actix::{Actor, Context, Handler}; + + struct Counter(Arc); + impl Actor for Counter { + type Context = Context; + } + impl Handler for Counter { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + if matches!(msg.get_data(), EnclaveEventData::TestEvent(_)) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + } + + // Immediate subscription — receives all events, including before EffectsEnabled + let immediate_actor = Counter(immediate_count.clone()).start(); + bus.subscribe(EventType::TestEvent, immediate_actor.recipient()); + + // Gated subscription — only receives after EffectsEnabled + let gated_counter = gated_count.clone(); + let runner = e3_events::run_once::({ + let bus = bus.clone(); + move |_| { + let addr = Counter(gated_counter).start(); + bus.subscribe(EventType::TestEvent, addr.recipient()); + Ok(()) + } + }); + bus.subscribe(EventType::EffectsEnabled, runner.recipient()); + + // 1. Publish event BEFORE EffectsEnabled + bus.event_bus().try_send( + EnclaveEvent::::test_event("during-replay") + .id(1) + .seq(1) + .build(), + )?; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + immediate_count.load(Ordering::SeqCst), + 1, + "Immediate subscription should receive events before EffectsEnabled" + ); + assert_eq!( + gated_count.load(Ordering::SeqCst), + 0, + "Gated subscription should NOT receive events before EffectsEnabled" + ); + + // 2. Publish EffectsEnabled + bus.publish_without_context(EffectsEnabled::new())?; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // 3. Publish event AFTER EffectsEnabled + bus.event_bus().try_send( + EnclaveEvent::::test_event("after-effects") + .id(2) + .seq(2) + .build(), + )?; + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!( + immediate_count.load(Ordering::SeqCst), + 2, + "Immediate subscription should receive events after EffectsEnabled too" + ); + assert_eq!( + gated_count.load(Ordering::SeqCst), + 1, + "Gated subscription should receive events after EffectsEnabled" + ); + + Ok(()) + } }