From ba85c73218f07aa444aa8709964bc76109ab6630 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Feb 2026 08:42:05 +0500 Subject: [PATCH 1/3] fix: event dedup halting --- crates/sync/src/sync.rs | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index 668449bdb9..900e763818 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -10,9 +10,10 @@ use anyhow::Result; use e3_data::Repositories; use e3_events::{ AggregateConfig, AggregateId, BusHandle, CorrelationId, EffectsEnabled, EnclaveEvent, - EventContextAccessors, EventPublisher, EventStoreQueryBy, EventStoreQueryResponse, - EvmEventConfig, EvmEventConfigChain, HistoricalEvmEventsReceived, HistoricalEvmSyncStart, - HistoricalNetEventsReceived, HistoricalNetSyncStart, SeqAgg, SyncEnded, Unsequenced, + EnclaveEventData, Event, EventContextAccessors, EventPublisher, EventStoreQueryBy, + EventStoreQueryResponse, EvmEventConfig, EvmEventConfigChain, HistoricalEvmEventsReceived, + HistoricalEvmSyncStart, HistoricalNetEventsReceived, HistoricalNetSyncStart, SeqAgg, SyncEnded, + Unsequenced, }; use e3_utils::actix::channel as actix_toolbox; use std::{ @@ -22,6 +23,15 @@ use std::{ use tokio::sync::mpsc::Receiver; use tracing::{info, warn}; +fn is_infrastructure_event(event: &EnclaveEvent) -> bool { + matches!( + event.get_data(), + EnclaveEventData::SyncEnded(_) + | EnclaveEventData::EffectsEnabled(_) + | EnclaveEventData::HistoricalEvmSyncStart(_) + ) +} + pub async fn sync( bus: &BusHandle, default_config: &EvmEventConfig, @@ -55,8 +65,17 @@ pub async fn sync( info!("{} EventStore events loaded.", events.len()); info!("Replaying events to actors..."); - // 4. Replay the EventStore events to all listeners (except effects) + // 4. Replay the EventStore events to all listeners (except effects). + // Skip infrastructure events (SyncEnded, EffectsEnabled, HistoricalEvmSyncStart) because + // they will be re-published by this sync process (steps 5, 8, 10). Replaying them here + // would poison the EventBus bloom-filter deduplication: the replayed event has the same + // EventId (payload hash) as the one we publish later, causing the later event to be + // silently dropped. This is critical for SyncEnded, if the EvmChainGateway never + // receives it, the gateway stays in BufferUntilLive and all live EVM events are lost. for event in events { + if is_infrastructure_event(&event) { + continue; + } bus.event_bus().try_send(event)?; } info!("Events replayed."); From 709fbfc56b49c95602866b22a2fb28c284cad265 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Feb 2026 08:55:11 +0500 Subject: [PATCH 2/3] chore: refactor net shutdown --- crates/net/src/net_interface.rs | 70 +++++++++++---------------------- 1 file changed, 23 insertions(+), 47 deletions(-) diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 7a931b3988..246a3c87eb 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -31,10 +31,7 @@ use rand::prelude::IteratorRandom; use std::{ collections::HashMap, io::Error, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, time::{Duration, Instant}, }; use tokio::{select, sync::broadcast, sync::mpsc}; @@ -129,7 +126,6 @@ impl NetInterface { let cmd_rx = &mut self.cmd_rx; let mut correlator = Correlator::new(); let mut peer_failures = PeerFailureTracker::new(); - let shutdown_flag = Arc::new(AtomicBool::new(false)); // Subscribe to topic self.swarm @@ -158,26 +154,23 @@ impl NetInterface { }); loop { - if shutdown_flag.load(Ordering::Relaxed) { - info!("Shutdown flag set, exiting event loop"); - break; - } - select! { // Process commands Some(command) = cmd_rx.recv() => { - match process_swarm_command(&mut self.swarm, &event_tx, &shutdown_flag, &mut correlator, command).await { - Ok(should_break) => { - if should_break { - break; - } - }, - Err(e) => error!("Error processing NetCommand: {e}") + if let NetCommand::Shutdown = command { + if let Err(e) = handle_shutdown(&mut self.swarm) { + error!("Error processing NetCommand: {e}"); + } + break; + } + + if let Err(e) = process_swarm_command(&mut self.swarm, &event_tx, &mut correlator, command).await { + error!("Error processing NetCommand: {e}") } } // Process events event = self.swarm.select_next_some() => { - match process_swarm_event(&mut self.swarm, &event_tx, &shutdown_flag, &mut correlator, &mut peer_failures, event).await { + match process_swarm_event(&mut self.swarm, &event_tx, &mut correlator, &mut peer_failures, event).await { Ok(_) => (), Err(e) => error!("Error processing NetEvent: {e}") } @@ -252,14 +245,10 @@ fn create_behaviour( async fn process_swarm_event( swarm: &mut Swarm, event_tx: &broadcast::Sender, - shutdown_flag: &Arc, correlator: &mut Correlator, peer_failures: &mut PeerFailureTracker, event: SwarmEvent, ) -> Result<()> { - if shutdown_flag.load(Ordering::Relaxed) { - return Ok(()); // Skip processing during shutdown - } match event { SwarmEvent::ConnectionEstablished { peer_id, @@ -511,19 +500,13 @@ async fn process_swarm_event( Ok(()) } -/// Process all swarm commands. Returns Ok(true) if the loop should break (shutdown). +/// Process all swarm commands except shutdown. async fn process_swarm_command( swarm: &mut Swarm, event_tx: &broadcast::Sender, - shutdown_flag: &Arc, correlator: &mut Correlator, command: NetCommand, -) -> Result { - if shutdown_flag.load(Ordering::Relaxed) { - // Signal shutdown by returning true - return Ok(true); - } - +) -> Result<()> { match command { NetCommand::GossipPublish { data, @@ -531,11 +514,11 @@ async fn process_swarm_command( correlation_id, } => { handle_gossip_publish(swarm, event_tx, data, topic, correlation_id)?; - Ok(false) + Ok(()) } NetCommand::Dial(multi) => { handle_dial(swarm, event_tx, multi)?; - Ok(false) + Ok(()) } NetCommand::DhtPutRecord { correlation_id, @@ -552,29 +535,28 @@ async fn process_swarm_command( expires, value, )?; - Ok(false) + Ok(()) } NetCommand::DhtGetRecord { correlation_id, key, } => { handle_get_record(swarm, correlator, correlation_id, key)?; - Ok(false) - } - NetCommand::Shutdown => { - handle_shutdown(swarm, shutdown_flag)?; - Ok(true) + Ok(()) } NetCommand::OutgoingSyncRequest { correlation_id, value, } => { handle_outgoing_sync_request(swarm, correlator, correlation_id, value)?; - Ok(false) + Ok(()) } NetCommand::SyncResponse { value, channel } => { handle_sync_response(swarm, channel, value)?; - Ok(false) + Ok(()) + } + NetCommand::Shutdown => { + unreachable!("shutdown command must be handled in NetInterface::start") } } } @@ -685,15 +667,9 @@ fn handle_get_record( Ok(()) } -fn handle_shutdown( - swarm: &mut Swarm, - shutdown_flag: &Arc, -) -> Result<()> { +fn handle_shutdown(swarm: &mut Swarm) -> Result<()> { info!("Starting graceful shutdown"); - // Set the shutdown flag - shutdown_flag.store(true, Ordering::Relaxed); - // Disconnect all peers let peers: Vec<_> = swarm.connected_peers().copied().collect(); for peer in peers { From dfa0cacf0035f7eb5d95a4e848aa61a7c1bac481 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Wed, 18 Feb 2026 12:11:04 +0500 Subject: [PATCH 3/3] fix: add correlation id to events --- .../src/enclave_event/enable_effects.rs | 11 ++- crates/events/src/enclave_event/sync_end.rs | 9 +- crates/sync/src/sync.rs | 99 +++++++++++++++++++ 3 files changed, 114 insertions(+), 5 deletions(-) diff --git a/crates/events/src/enclave_event/enable_effects.rs b/crates/events/src/enclave_event/enable_effects.rs index 16ae123770..4657fab0bd 100644 --- a/crates/events/src/enclave_event/enable_effects.rs +++ b/crates/events/src/enclave_event/enable_effects.rs @@ -4,18 +4,23 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use crate::CorrelationId; use actix::Message; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; -/// Dispatched once the sync process is complete and live listening should continue +/// Dispatched once effects (side-effects) should be activated after a sync pass #[derive(Message, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[rtype(result = "()")] -pub struct EffectsEnabled; +pub struct EffectsEnabled { + pub correlation_id: CorrelationId, +} impl EffectsEnabled { pub fn new() -> Self { - Self {} + Self { + correlation_id: CorrelationId::new(), + } } } diff --git a/crates/events/src/enclave_event/sync_end.rs b/crates/events/src/enclave_event/sync_end.rs index 6b2118a72b..881cf0dd71 100644 --- a/crates/events/src/enclave_event/sync_end.rs +++ b/crates/events/src/enclave_event/sync_end.rs @@ -4,6 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use crate::CorrelationId; use actix::Message; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; @@ -11,11 +12,15 @@ use std::fmt::{self, Display}; /// Dispatched once the sync process is complete and live listening should continue #[derive(Message, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[rtype(result = "()")] -pub struct SyncEnded; +pub struct SyncEnded { + pub correlation_id: CorrelationId, +} impl SyncEnded { pub fn new() -> Self { - Self {} + Self { + correlation_id: CorrelationId::new(), + } } } diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index 900e763818..4d41bbb6ef 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -124,6 +124,8 @@ pub async fn sync( } info!("Historical events published."); + // 10. Publish the SyncEnded event + info!("Publishing SyncEnded event..."); bus.publish_without_context(SyncEnded::new())?; info!("Sync finished."); // normal live operations @@ -277,3 +279,100 @@ impl SnapshotLoaded { Self { snapshot } } } + +#[cfg(test)] +mod tests { + use super::is_infrastructure_event; + use e3_ciphernode_builder::EventSystem; + use e3_events::{ + EffectsEnabled, EnclaveEvent, EnclaveEventData, Event, EventConstructorWithTimestamp, + EventSource, EvmEventConfig, HistoricalEvmSyncStart, SyncEnded, TakeEvents, TestEvent, + Unsequenced, + }; + + fn make_sequenced(data: impl Into, seq: u64) -> EnclaveEvent { + EnclaveEvent::::new_with_timestamp( + data.into(), + None, + 1000, + None, + EventSource::Local, + ) + .into_sequenced(seq) + } + + /// `sender` is `Option>` — `None` is safe here since we're not dispatching. + fn make_historical_evm_sync_start() -> HistoricalEvmSyncStart { + HistoricalEvmSyncStart { + evm_config: EvmEventConfig::new(), + sender: None, + } + } + + #[test] + fn infrastructure_events_are_detected() { + let sync_ended = make_sequenced(SyncEnded::new(), 1); + let effects_enabled = make_sequenced(EffectsEnabled::new(), 2); + let evm_sync_start = make_sequenced(make_historical_evm_sync_start(), 3); + let test_event = make_sequenced(TestEvent::new("hello", 42), 4); + + assert!(is_infrastructure_event(&sync_ended)); + assert!(is_infrastructure_event(&effects_enabled)); + assert!(is_infrastructure_event(&evm_sync_start)); + assert!(!is_infrastructure_event(&test_event)); + } + + /// Regression: infrastructure events replayed from the EventStore must be filtered before + /// they reach the bus. If they aren't, the bloom-filter deduplicates the copy that `sync()` + /// re-publishes later, causing it to be silently dropped. + #[actix::test] + async fn infrastructure_events_are_filtered_during_replay() -> anyhow::Result<()> { + let system = EventSystem::new("test-sync-replay").with_fresh_bus(); + let bus = system.handle()?; + let history = bus.history(); + + let events: Vec = vec![ + make_sequenced(TestEvent::new("before", 1), 1), + make_sequenced(SyncEnded::new(), 2), + make_sequenced(EffectsEnabled::new(), 3), + make_sequenced(make_historical_evm_sync_start(), 4), + make_sequenced(TestEvent::new("after", 2), 5), + ]; + + for event in events { + if is_infrastructure_event(&event) { + continue; + } + bus.event_bus().try_send(event)?; + } + + let received = history.send(TakeEvents::new(2)).await?; + + let event_types: Vec<&'static str> = received + .iter() + .map(|e| match e.get_data() { + EnclaveEventData::TestEvent(_) => "TestEvent", + EnclaveEventData::SyncEnded(_) => "SyncEnded", + EnclaveEventData::EffectsEnabled(_) => "EffectsEnabled", + EnclaveEventData::HistoricalEvmSyncStart(_) => "HistoricalEvmSyncStart", + _ => "other", + }) + .collect(); + + assert_eq!(event_types, vec!["TestEvent", "TestEvent"]); + + let msgs: Vec = received + .iter() + .filter_map(|e| { + if let EnclaveEventData::TestEvent(t) = e.get_data() { + Some(t.msg.clone()) + } else { + None + } + }) + .collect(); + assert_eq!(msgs, vec!["before", "after"]); + + Ok(()) + } +}