Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ e3-entrypoint = { workspace = true }
e3-ciphernode-builder = { workspace = true }
e3-test-helpers = { workspace = true }
e3-events = { workspace = true, features = ["test-helpers"] }
tokio = { workspace = true, features = ["test-util"] }
tracing-subscriber = { workspace = true }
61 changes: 48 additions & 13 deletions crates/evm/src/evm_chain_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use e3_events::{
};
use e3_events::{Event, EventPublisher};
use e3_utils::MAILBOX_LIMIT;
use tracing::warn;

/// This component sits between the Evm ingestion for a chain and the Sync actor and the Bus.
/// It coordinates event flow between these components.
Expand All @@ -39,12 +40,17 @@ impl ForwardToSyncActorData {
}
}

/// This state machine coordinates the function of the EvmChainGateway
/// State machine coordinating event flow through the EvmChainGateway.
///
/// Init -> ForwardToSyncActor -> BufferUntilLive -> Live
#[derive(Clone, Debug)]
enum SyncStatus {
/// Intial State
Init(Vec<EnclaveEvent<Unsequenced>>), // Include a buffer to hold events that arrive too early
/// After HistoricalEvmSyncStart we forward all events to SyncActor
/// Buffers events until HistoricalEvmSyncStart arrives.
Init {
buffer: Vec<EnclaveEvent<Unsequenced>>,
pending_sync_complete: Option<HistoricalSyncComplete>,
},
/// Forward events to the sync actor for ordering.
ForwardToSyncActor(ForwardToSyncActorData),
/// Once the chain has completed historical sync then we buffer all "live" events until sync is
/// complete
Expand All @@ -55,28 +61,39 @@ enum SyncStatus {

impl Default for SyncStatus {
fn default() -> Self {
Self::Init(Vec::new())
Self::Init {
buffer: Vec::new(),
pending_sync_complete: None,
}
}
}

impl SyncStatus {
pub fn forward_to_sync_actor(
&mut self,
sender: Recipient<HistoricalEvmEventsReceived>,
) -> Result<Vec<EnclaveEvent<Unsequenced>>> {
let Self::Init(buffer) = self else {
) -> Result<(
Vec<EnclaveEvent<Unsequenced>>,
Option<HistoricalSyncComplete>,
)> {
let Self::Init {
buffer,
pending_sync_complete,
} = self
else {
bail!(
"Cannot change state to ForwardToSyncActor when state is {:?}",
self
);
};

let buffer = std::mem::take(buffer);
let pending = pending_sync_complete.take();
*self = SyncStatus::ForwardToSyncActor(ForwardToSyncActorData {
sender: Some(sender),
buffer: Vec::new(),
});
Ok(buffer)
Ok((buffer, pending))
}

pub fn buffer_until_live(&mut self) -> Result<ForwardToSyncActorData> {
Expand Down Expand Up @@ -120,16 +137,20 @@ impl EvmChainGateway {
}

fn handle_sync_start(&mut self, msg: HistoricalEvmSyncStart) -> Result<()> {
// Received a HistoricalEvmSyncStart event from the event bus. Get the sender within that event and forward
// all events to that actor
let sender = msg
.sender
.context("No sender on HistoricalEvmSyncStart Message")?;
let mut buffer = self.status.forward_to_sync_actor(sender)?;
// Drain any events that were buffered early
let (mut buffer, pending_sync_complete) = self.status.forward_to_sync_actor(sender)?;

for evt in buffer.drain(..) {
self.process_evm_event(evt)?;
}

// HistoricalSyncComplete may have arrived before HistoricalEvmSyncStart
Comment thread
hmzakhalid marked this conversation as resolved.
if let Some(event) = pending_sync_complete {
warn!("Processing buffered HistoricalSyncComplete that arrived during Init");
self.forward_historical_sync_complete(event)?;
}
Ok(())
}

Expand Down Expand Up @@ -161,6 +182,20 @@ impl EvmChainGateway {
}

fn forward_historical_sync_complete(&mut self, event: HistoricalSyncComplete) -> Result<()> {
// Buffer if we're still in Init - will be replayed when HistoricalEvmSyncStart arrives
if let SyncStatus::Init {
pending_sync_complete,
..
} = &mut self.status
{
warn!(
chain_id = event.chain_id,
"HistoricalSyncComplete arrived during Init, buffering"
);
*pending_sync_complete = Some(event);
return Ok(());
}

let state = self.status.buffer_until_live()?;
let sender = state
.sender
Expand All @@ -172,7 +207,7 @@ impl EvmChainGateway {

fn process_evm_event(&mut self, msg: EnclaveEvent<Unsequenced>) -> Result<()> {
match &mut self.status {
SyncStatus::Init(buffer) => buffer.push(msg),
SyncStatus::Init { buffer, .. } => buffer.push(msg),
SyncStatus::BufferUntilLive(buffer) => buffer.push(msg),
SyncStatus::ForwardToSyncActor(state) => state.add_event(msg),
SyncStatus::Live => self.publish_evm_event(msg)?,
Expand Down
Loading
Loading