Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions crates/events/src/enclave_event/enable_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use crate::CorrelationId;
use actix::Message;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display};

/// Dispatched once the sync process is complete and live listening should continue
/// Dispatched once effects (side-effects) should be activated after a sync pass
#[derive(Message, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[rtype(result = "()")]
pub struct EffectsEnabled;
pub struct EffectsEnabled {
pub correlation_id: CorrelationId,
}
Comment thread
ryardley marked this conversation as resolved.

impl EffectsEnabled {
pub fn new() -> Self {
Self {}
Self {
correlation_id: CorrelationId::new(),
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions crates/events/src/enclave_event/sync_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use crate::CorrelationId;
use actix::Message;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display};

/// Dispatched once the sync process is complete and live listening should continue
#[derive(Message, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[rtype(result = "()")]
pub struct SyncEnded;
pub struct SyncEnded {
pub correlation_id: CorrelationId,
}
Comment thread
ryardley marked this conversation as resolved.

impl SyncEnded {
pub fn new() -> Self {
Self {}
Self {
correlation_id: CorrelationId::new(),
}
}
}

Expand Down
70 changes: 23 additions & 47 deletions crates/net/src/net_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ use rand::prelude::IteratorRandom;
use std::{
collections::HashMap,
io::Error,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{select, sync::broadcast, sync::mpsc};
Expand Down Expand Up @@ -129,7 +126,6 @@ impl NetInterface {
let cmd_rx = &mut self.cmd_rx;
let mut correlator = Correlator::new();
let mut peer_failures = PeerFailureTracker::new();
let shutdown_flag = Arc::new(AtomicBool::new(false));

// Subscribe to topic
self.swarm
Expand Down Expand Up @@ -158,26 +154,23 @@ impl NetInterface {
});

loop {
if shutdown_flag.load(Ordering::Relaxed) {
info!("Shutdown flag set, exiting event loop");
break;
}

select! {
// Process commands
Some(command) = cmd_rx.recv() => {
match process_swarm_command(&mut self.swarm, &event_tx, &shutdown_flag, &mut correlator, command).await {
Ok(should_break) => {
if should_break {
break;
}
},
Err(e) => error!("Error processing NetCommand: {e}")
if let NetCommand::Shutdown = command {
if let Err(e) = handle_shutdown(&mut self.swarm) {
error!("Error processing NetCommand: {e}");
}
break;
}

if let Err(e) = process_swarm_command(&mut self.swarm, &event_tx, &mut correlator, command).await {
error!("Error processing NetCommand: {e}")
}
}
// Process events
event = self.swarm.select_next_some() => {
match process_swarm_event(&mut self.swarm, &event_tx, &shutdown_flag, &mut correlator, &mut peer_failures, event).await {
match process_swarm_event(&mut self.swarm, &event_tx, &mut correlator, &mut peer_failures, event).await {
Ok(_) => (),
Err(e) => error!("Error processing NetEvent: {e}")
}
Expand Down Expand Up @@ -252,14 +245,10 @@ fn create_behaviour(
async fn process_swarm_event(
swarm: &mut Swarm<NodeBehaviour>,
event_tx: &broadcast::Sender<NetEvent>,
shutdown_flag: &Arc<AtomicBool>,
correlator: &mut Correlator,
peer_failures: &mut PeerFailureTracker,
event: SwarmEvent<NodeBehaviourEvent>,
) -> Result<()> {
if shutdown_flag.load(Ordering::Relaxed) {
return Ok(()); // Skip processing during shutdown
}
match event {
SwarmEvent::ConnectionEstablished {
peer_id,
Expand Down Expand Up @@ -511,31 +500,25 @@ async fn process_swarm_event(
Ok(())
}

/// Process all swarm commands. Returns Ok(true) if the loop should break (shutdown).
/// Process all swarm commands except shutdown.
async fn process_swarm_command(
swarm: &mut Swarm<NodeBehaviour>,
event_tx: &broadcast::Sender<NetEvent>,
shutdown_flag: &Arc<AtomicBool>,
correlator: &mut Correlator,
command: NetCommand,
) -> Result<bool> {
if shutdown_flag.load(Ordering::Relaxed) {
// Signal shutdown by returning true
return Ok(true);
}

) -> Result<()> {
match command {
NetCommand::GossipPublish {
data,
topic,
correlation_id,
} => {
handle_gossip_publish(swarm, event_tx, data, topic, correlation_id)?;
Ok(false)
Ok(())
}
NetCommand::Dial(multi) => {
handle_dial(swarm, event_tx, multi)?;
Ok(false)
Ok(())
}
NetCommand::DhtPutRecord {
correlation_id,
Expand All @@ -552,29 +535,28 @@ async fn process_swarm_command(
expires,
value,
)?;
Ok(false)
Ok(())
}
NetCommand::DhtGetRecord {
correlation_id,
key,
} => {
handle_get_record(swarm, correlator, correlation_id, key)?;
Ok(false)
}
NetCommand::Shutdown => {
handle_shutdown(swarm, shutdown_flag)?;
Ok(true)
Ok(())
}
NetCommand::OutgoingSyncRequest {
correlation_id,
value,
} => {
handle_outgoing_sync_request(swarm, correlator, correlation_id, value)?;
Ok(false)
Ok(())
}
NetCommand::SyncResponse { value, channel } => {
handle_sync_response(swarm, channel, value)?;
Ok(false)
Ok(())
}
NetCommand::Shutdown => {
unreachable!("shutdown command must be handled in NetInterface::start")
}
}
}
Expand Down Expand Up @@ -685,15 +667,9 @@ fn handle_get_record(
Ok(())
}

fn handle_shutdown(
swarm: &mut Swarm<NodeBehaviour>,
shutdown_flag: &Arc<AtomicBool>,
) -> Result<()> {
fn handle_shutdown(swarm: &mut Swarm<NodeBehaviour>) -> Result<()> {
info!("Starting graceful shutdown");

// Set the shutdown flag
shutdown_flag.store(true, Ordering::Relaxed);

// Disconnect all peers
let peers: Vec<_> = swarm.connected_peers().copied().collect();
for peer in peers {
Expand Down
126 changes: 122 additions & 4 deletions crates/sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use anyhow::Result;
use e3_data::Repositories;
use e3_events::{
AggregateConfig, AggregateId, BusHandle, CorrelationId, EffectsEnabled, EnclaveEvent,
EventContextAccessors, EventPublisher, EventStoreQueryBy, EventStoreQueryResponse,
EvmEventConfig, EvmEventConfigChain, HistoricalEvmEventsReceived, HistoricalEvmSyncStart,
HistoricalNetEventsReceived, HistoricalNetSyncStart, SeqAgg, SyncEnded, Unsequenced,
EnclaveEventData, Event, EventContextAccessors, EventPublisher, EventStoreQueryBy,
EventStoreQueryResponse, EvmEventConfig, EvmEventConfigChain, HistoricalEvmEventsReceived,
HistoricalEvmSyncStart, HistoricalNetEventsReceived, HistoricalNetSyncStart, SeqAgg, SyncEnded,
Unsequenced,
};
use e3_utils::actix::channel as actix_toolbox;
use std::{
Expand All @@ -22,6 +23,15 @@ use std::{
use tokio::sync::mpsc::Receiver;
use tracing::{info, warn};

fn is_infrastructure_event(event: &EnclaveEvent) -> bool {
matches!(
event.get_data(),
EnclaveEventData::SyncEnded(_)
| EnclaveEventData::EffectsEnabled(_)
| EnclaveEventData::HistoricalEvmSyncStart(_)
)
}

pub async fn sync(
bus: &BusHandle,
default_config: &EvmEventConfig,
Expand Down Expand Up @@ -55,8 +65,17 @@ pub async fn sync(
info!("{} EventStore events loaded.", events.len());

info!("Replaying events to actors...");
// 4. Replay the EventStore events to all listeners (except effects)
// 4. Replay the EventStore events to all listeners (except effects).
// Skip infrastructure events (SyncEnded, EffectsEnabled, HistoricalEvmSyncStart) because
// they will be re-published by this sync process (steps 5, 8, 10). Replaying them here
// would poison the EventBus bloom-filter deduplication: the replayed event has the same
// EventId (payload hash) as the one we publish later, causing the later event to be
// silently dropped. This is critical for SyncEnded, if the EvmChainGateway never
// receives it, the gateway stays in BufferUntilLive and all live EVM events are lost.
Comment thread
hmzakhalid marked this conversation as resolved.
for event in events {
if is_infrastructure_event(&event) {
continue;
}
bus.event_bus().try_send(event)?;
}
info!("Events replayed.");
Expand Down Expand Up @@ -105,6 +124,8 @@ pub async fn sync(
}
info!("Historical events published.");

// 10. Publish the SyncEnded event
info!("Publishing SyncEnded event...");
bus.publish_without_context(SyncEnded::new())?;
info!("Sync finished.");
// normal live operations
Expand Down Expand Up @@ -258,3 +279,100 @@ impl SnapshotLoaded {
Self { snapshot }
}
}

#[cfg(test)]
mod tests {
use super::is_infrastructure_event;
use e3_ciphernode_builder::EventSystem;
use e3_events::{
EffectsEnabled, EnclaveEvent, EnclaveEventData, Event, EventConstructorWithTimestamp,
EventSource, EvmEventConfig, HistoricalEvmSyncStart, SyncEnded, TakeEvents, TestEvent,
Unsequenced,
};

fn make_sequenced(data: impl Into<EnclaveEventData>, seq: u64) -> EnclaveEvent {
EnclaveEvent::<Unsequenced>::new_with_timestamp(
data.into(),
None,
1000,
None,
EventSource::Local,
)
.into_sequenced(seq)
}

/// `sender` is `Option<Recipient<…>>` — `None` is safe here since we're not dispatching.
fn make_historical_evm_sync_start() -> HistoricalEvmSyncStart {
HistoricalEvmSyncStart {
evm_config: EvmEventConfig::new(),
sender: None,
}
}

#[test]
fn infrastructure_events_are_detected() {
let sync_ended = make_sequenced(SyncEnded::new(), 1);
let effects_enabled = make_sequenced(EffectsEnabled::new(), 2);
let evm_sync_start = make_sequenced(make_historical_evm_sync_start(), 3);
let test_event = make_sequenced(TestEvent::new("hello", 42), 4);

assert!(is_infrastructure_event(&sync_ended));
assert!(is_infrastructure_event(&effects_enabled));
assert!(is_infrastructure_event(&evm_sync_start));
assert!(!is_infrastructure_event(&test_event));
}

/// Regression: infrastructure events replayed from the EventStore must be filtered before
/// they reach the bus. If they aren't, the bloom-filter deduplicates the copy that `sync()`
/// re-publishes later, causing it to be silently dropped.
#[actix::test]
async fn infrastructure_events_are_filtered_during_replay() -> anyhow::Result<()> {
let system = EventSystem::new("test-sync-replay").with_fresh_bus();
let bus = system.handle()?;
let history = bus.history();

let events: Vec<EnclaveEvent> = vec![
make_sequenced(TestEvent::new("before", 1), 1),
make_sequenced(SyncEnded::new(), 2),
make_sequenced(EffectsEnabled::new(), 3),
make_sequenced(make_historical_evm_sync_start(), 4),
make_sequenced(TestEvent::new("after", 2), 5),
];

for event in events {
if is_infrastructure_event(&event) {
continue;
}
bus.event_bus().try_send(event)?;
}

let received = history.send(TakeEvents::new(2)).await?;

let event_types: Vec<&'static str> = received
.iter()
.map(|e| match e.get_data() {
EnclaveEventData::TestEvent(_) => "TestEvent",
EnclaveEventData::SyncEnded(_) => "SyncEnded",
EnclaveEventData::EffectsEnabled(_) => "EffectsEnabled",
EnclaveEventData::HistoricalEvmSyncStart(_) => "HistoricalEvmSyncStart",
_ => "other",
})
.collect();

assert_eq!(event_types, vec!["TestEvent", "TestEvent"]);

let msgs: Vec<String> = received
.iter()
.filter_map(|e| {
if let EnclaveEventData::TestEvent(t) = e.get_data() {
Some(t.msg.clone())
} else {
None
}
})
.collect();
assert_eq!(msgs, vec!["before", "after"]);

Ok(())
}
}
Loading