diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index f41e5e035f..c8446c4095 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -37,4 +37,5 @@ e3-entrypoint = { workspace = true } e3-ciphernode-builder = { workspace = true } e3-test-helpers = { workspace = true } e3-events = { workspace = true, features = ["test-helpers"] } +tokio = { workspace = true, features = ["test-util"] } tracing-subscriber = { workspace = true } diff --git a/crates/evm/src/evm_chain_gateway.rs b/crates/evm/src/evm_chain_gateway.rs index c0ba917892..ca43f25d7f 100644 --- a/crates/evm/src/evm_chain_gateway.rs +++ b/crates/evm/src/evm_chain_gateway.rs @@ -19,6 +19,7 @@ use e3_events::{ }; use e3_events::{Event, EventPublisher}; use e3_utils::MAILBOX_LIMIT; +use tracing::warn; /// This component sits between the Evm ingestion for a chain and the Sync actor and the Bus. /// It coordinates event flow between these components. @@ -39,12 +40,17 @@ impl ForwardToSyncActorData { } } -/// This state machine coordinates the function of the EvmChainGateway +/// State machine coordinating event flow through the EvmChainGateway. +/// +/// Init -> ForwardToSyncActor -> BufferUntilLive -> Live #[derive(Clone, Debug)] enum SyncStatus { - /// Intial State - Init(Vec>), // Include a buffer to hold events that arrive too early - /// After HistoricalEvmSyncStart we forward all events to SyncActor + /// Buffers events until HistoricalEvmSyncStart arrives. + Init { + buffer: Vec>, + pending_sync_complete: Option, + }, + /// Forward events to the sync actor for ordering. ForwardToSyncActor(ForwardToSyncActorData), /// Once the chain has completed historical sync then we buffer all "live" events until sync is /// complete @@ -55,7 +61,10 @@ enum SyncStatus { impl Default for SyncStatus { fn default() -> Self { - Self::Init(Vec::new()) + Self::Init { + buffer: Vec::new(), + pending_sync_complete: None, + } } } @@ -63,8 +72,15 @@ impl SyncStatus { pub fn forward_to_sync_actor( &mut self, sender: Recipient, - ) -> Result>> { - let Self::Init(buffer) = self else { + ) -> Result<( + Vec>, + Option, + )> { + let Self::Init { + buffer, + pending_sync_complete, + } = self + else { bail!( "Cannot change state to ForwardToSyncActor when state is {:?}", self @@ -72,11 +88,12 @@ impl SyncStatus { }; let buffer = std::mem::take(buffer); + let pending = pending_sync_complete.take(); *self = SyncStatus::ForwardToSyncActor(ForwardToSyncActorData { sender: Some(sender), buffer: Vec::new(), }); - Ok(buffer) + Ok((buffer, pending)) } pub fn buffer_until_live(&mut self) -> Result { @@ -120,16 +137,20 @@ impl EvmChainGateway { } fn handle_sync_start(&mut self, msg: HistoricalEvmSyncStart) -> Result<()> { - // Received a HistoricalEvmSyncStart event from the event bus. Get the sender within that event and forward - // all events to that actor let sender = msg .sender .context("No sender on HistoricalEvmSyncStart Message")?; - let mut buffer = self.status.forward_to_sync_actor(sender)?; - // Drain any events that were buffered early + let (mut buffer, pending_sync_complete) = self.status.forward_to_sync_actor(sender)?; + for evt in buffer.drain(..) { self.process_evm_event(evt)?; } + + // HistoricalSyncComplete may have arrived before HistoricalEvmSyncStart + if let Some(event) = pending_sync_complete { + warn!("Processing buffered HistoricalSyncComplete that arrived during Init"); + self.forward_historical_sync_complete(event)?; + } Ok(()) } @@ -161,6 +182,20 @@ impl EvmChainGateway { } fn forward_historical_sync_complete(&mut self, event: HistoricalSyncComplete) -> Result<()> { + // Buffer if we're still in Init - will be replayed when HistoricalEvmSyncStart arrives + if let SyncStatus::Init { + pending_sync_complete, + .. + } = &mut self.status + { + warn!( + chain_id = event.chain_id, + "HistoricalSyncComplete arrived during Init, buffering" + ); + *pending_sync_complete = Some(event); + return Ok(()); + } + let state = self.status.buffer_until_live()?; let sender = state .sender @@ -172,7 +207,7 @@ impl EvmChainGateway { fn process_evm_event(&mut self, msg: EnclaveEvent) -> Result<()> { match &mut self.status { - SyncStatus::Init(buffer) => buffer.push(msg), + SyncStatus::Init { buffer, .. } => buffer.push(msg), SyncStatus::BufferUntilLive(buffer) => buffer.push(msg), SyncStatus::ForwardToSyncActor(state) => state.add_event(msg), SyncStatus::Live => self.publish_evm_event(msg)?, diff --git a/crates/evm/src/evm_read_interface.rs b/crates/evm/src/evm_read_interface.rs index 54fbab66f3..ef832fd4c9 100644 --- a/crates/evm/src/evm_read_interface.rs +++ b/crates/evm/src/evm_read_interface.rs @@ -6,6 +6,7 @@ use crate::events::{EnclaveEvmEvent, EvmEventProcessor, EvmLog}; use crate::helpers::EthProvider; +use crate::log_fetcher::{backfill_to_head, fetch_logs_chunked, process_log, TimestampTracker}; use crate::HistoricalSyncComplete; use actix::prelude::*; use actix::{Addr, Recipient}; @@ -24,6 +25,8 @@ use tokio::select; use tokio::sync::oneshot; use tracing::{debug, error, info, instrument, warn}; +const MAX_RECONNECT_DELAY_SECS: u64 = 60; + pub type ExtractorFn = fn(&LogData, Option<&B256>, u64) -> Option; pub struct EvmReadInterfaceParams

{ @@ -43,6 +46,7 @@ pub struct EvmReadInterfaceState { pub struct Filters { historical: Filter, current: Filter, + start_block: u64, } impl Filters { @@ -57,6 +61,7 @@ impl Filters { Self { historical, current, + start_block, } } @@ -122,7 +127,6 @@ impl Actor for EvmReadInterface

{ fn started(&mut self, ctx: &mut Self::Context) { ctx.set_mailbox_capacity(MAILBOX_LIMIT); - // let reader_addr = ctx.address(); let bus = self.bus.clone(); let next = self.next.clone(); let filters = self.filters.clone(); @@ -132,7 +136,6 @@ impl Actor for EvmReadInterface

{ return; }; - // let extractor = self.extractor; let Some(shutdown) = self.shutdown_rx.take() else { bus.err(EType::Evm, anyhow!("shutdown already called")); return; @@ -145,9 +148,6 @@ impl Actor for EvmReadInterface

{ } } -// TODO: split this up into: -// 1. historical request (will finish) -// 2. current listener (run indefinitely) #[instrument(name = "evm_interface", skip_all)] async fn stream_from_evm( provider: EthProvider

, @@ -158,69 +158,151 @@ async fn stream_from_evm( ) { let chain_id = provider.chain_id(); let provider_ref = provider.provider(); - let mut last_id: Option = None; let mut timestamp_tracker = TimestampTracker::new(); - // Historical events - match provider_ref.get_logs(&filters.historical).await { - Ok(historical_logs) => { - info!("Fetched {} historical events", historical_logs.len()); - for log in historical_logs { - let timestamp = timestamp_tracker.get(provider_ref, log.block_number).await; - let evt = EnclaveEvmEvent::Log(EvmLog::new(log, chain_id, timestamp)); - last_id = Some(evt.get_id()); - debug!("Sending event({})", evt.get_id()); - next.do_send(evt) - } + + // Determine chain head for historical fetch + let latest_block = match provider_ref.get_block_number().await { + Ok(bn) => bn, + Err(e) => { + error!(chain_id, error = %e, "Failed to get latest block number"); + bus.err(EType::Evm, anyhow!(e)); + return; + } + }; + + // Historical events — chunked to respect RPC block-range limits + let last_id = match fetch_logs_chunked( + provider_ref, + &filters.historical, + filters.start_block, + latest_block, + chain_id, + &next, + &mut timestamp_tracker, + ) + .await + { + Ok(id) => { + info!(chain_id, "Historical sync succeeded"); + id } Err(e) => { - error!("Failed to fetch historical events: {}", e); + error!(chain_id, error = %e, "Failed to fetch historical events — node cannot operate without full state, exiting"); bus.err(EType::Evm, anyhow!(e)); return; } - } - let historical_sync_event = HistoricalSyncComplete::new(chain_id, last_id); - info!( - "Historical Sync Complete event({})", - historical_sync_event.get_id() - ); + }; + next.do_send(EnclaveEvmEvent::HistoricalSyncComplete( - historical_sync_event, + HistoricalSyncComplete::new(chain_id, last_id), )); - info!("Subscribing to live events"); - match provider_ref.subscribe_logs(&filters.current).await { - Ok(subscription) => { - let id: B256 = subscription.local_id().clone(); - let mut stream = subscription.into_stream(); - - loop { - select! { - maybe_log = stream.next() => { - match maybe_log { - Some(log) => { - let timestamp = timestamp_tracker.get(provider_ref, log.block_number).await; - next.do_send(EnclaveEvmEvent::Log(EvmLog::new(log, chain_id, timestamp))) + // Live subscription with gap-fill on connect/reconnect + subscribe_live_events( + provider_ref, + &next, + &mut shutdown, + bus, + &filters.current, + chain_id, + &mut timestamp_tracker, + latest_block, + ) + .await; +} + +async fn subscribe_live_events( + provider: &P, + next: &EvmEventProcessor, + shutdown: &mut oneshot::Receiver<()>, + bus: &BusHandle, + filter: &Filter, + chain_id: u64, + timestamp_tracker: &mut TimestampTracker, + mut last_block: u64, +) { + let mut reconnect_attempt: u32 = 0; + + loop { + if reconnect_attempt > 0 { + let delay_secs = (2u64.pow(reconnect_attempt.min(6))).min(MAX_RECONNECT_DELAY_SECS); + warn!( + chain_id, + reconnect_attempt, delay_secs, "Reconnecting to live event stream" + ); + tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await; + } + + if shutdown.try_recv().is_ok() { + info!("Shutdown signal received, stopping EVM stream"); + return; + } + + // Backfill any blocks missed since last_block. This handles: + // - Blocks mined between historical fetch completion and first subscription + // - Blocks mined during reconnection downtime + // - Geth's eth_subscribe silently ignoring fromBlock + match backfill_to_head( + provider, + filter, + chain_id, + next, + timestamp_tracker, + &mut last_block, + ) + .await + { + Ok(_) => {} + Err(e) => { + warn!(chain_id, error = %e, "Gap backfill failed, will retry"); + reconnect_attempt += 1; + continue; + } + } + + match provider.subscribe_logs(filter).await { + Ok(subscription) => { + let sub_id: B256 = subscription.local_id().clone(); + let mut stream = subscription.into_stream(); + reconnect_attempt = 0; + info!(chain_id, "Live event subscription active"); + + loop { + select! { + maybe_log = stream.next() => { + match maybe_log { + Some(log) => { + if let Some(bn) = log.block_number { + last_block = last_block.max(bn); + } + process_log(provider, log, chain_id, next, timestamp_tracker).await; + } + None => { + warn!(chain_id, "Live event stream ended, will reconnect"); + break; + } } - None => break, // Stream ended } - } - _ = &mut shutdown => { - warn!("Received shutdown signal, stopping EVM stream"); - match provider_ref.unsubscribe(id).await { - Ok(_) => info!("Unsubscribed successfully from EVM event stream"), - Err(err) => error!("Cannot unsubscribe from EVM event stream: {}", err), - }; - break; + _ = &mut *shutdown => { + info!("Shutdown signal received, stopping EVM stream"); + match provider.unsubscribe(sub_id).await { + Ok(_) => info!("Unsubscribed successfully from EVM event stream"), + Err(err) => error!(chain_id, error = %err, "Cannot unsubscribe from EVM event stream"), + }; + return; + } } } + + reconnect_attempt += 1; + } + Err(e) => { + error!(chain_id, reconnect_attempt, error = %e, "Failed to subscribe to live events"); + bus.err(EType::Evm, anyhow!("{}", e)); + reconnect_attempt += 1; } - } - Err(e) => { - bus.err(EType::Evm, anyhow!("{}", e)); } } - - info!("Exiting stream loop"); } impl Handler for EvmReadInterface

{ @@ -234,38 +316,3 @@ impl Handler for EvmReadInterface

, // (block_number, timestamp) -} - -impl TimestampTracker { - fn new() -> Self { - Self { current: None } - } - - async fn get(&mut self, provider: &P, block_number: Option) -> u64 { - let Some(bn) = block_number else { - error!("BLOCK NUMBER NOT FOUND ON LOG!"); - return 0; - }; - - if let Some((cached_bn, ts)) = self.current { - if bn == cached_bn { - return ts; - } - } - - let ts = provider - .get_block_by_number(bn.into()) - .await - .ok() - .flatten() - .map(|b| b.header.timestamp) - .unwrap_or(0); - - self.current = Some((bn, ts)); - ts - } -} diff --git a/crates/evm/src/lib.rs b/crates/evm/src/lib.rs index 93045c9804..cb6a4717dc 100644 --- a/crates/evm/src/lib.rs +++ b/crates/evm/src/lib.rs @@ -16,6 +16,7 @@ mod evm_read_interface; mod evm_router; mod fix_historical_order; pub mod helpers; +mod log_fetcher; mod repo; mod sync_start_extractor; diff --git a/crates/evm/src/log_fetcher.rs b/crates/evm/src/log_fetcher.rs new file mode 100644 index 0000000000..8ea2b5ef34 --- /dev/null +++ b/crates/evm/src/log_fetcher.rs @@ -0,0 +1,495 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use crate::events::{EnclaveEvmEvent, EvmEventProcessor, EvmLog}; +use alloy::providers::Provider; +use alloy::rpc::types::{Filter, Log}; +use anyhow::anyhow; +use async_trait::async_trait; +use e3_events::CorrelationId; +use tracing::{debug, error, info, warn}; + +const GET_LOGS_CHUNK_SIZE: u64 = 10_000; +const GET_LOGS_MAX_RETRIES: u32 = 3; + +/// Trait abstracting provider methods needed for log fetching. +/// Enables unit testing without a real EVM provider. +#[async_trait] +pub(crate) trait LogProvider: Send + Sync { + async fn fetch_logs(&self, filter: &Filter) -> Result, anyhow::Error>; + async fn fetch_block_number(&self) -> Result; + async fn fetch_block_timestamp(&self, block_number: u64) -> Option; +} + +#[async_trait] +impl LogProvider for P { + async fn fetch_logs(&self, filter: &Filter) -> Result, anyhow::Error> { + self.get_logs(filter).await.map_err(|e| anyhow!("{}", e)) + } + async fn fetch_block_number(&self) -> Result { + self.get_block_number().await.map_err(|e| anyhow!("{}", e)) + } + async fn fetch_block_timestamp(&self, block_number: u64) -> Option { + self.get_block_by_number(block_number.into()) + .await + .ok() + .flatten() + .map(|b| b.header.timestamp) + } +} + +pub(crate) async fn process_log( + provider: &L, + log: Log, + chain_id: u64, + next: &EvmEventProcessor, + timestamp_tracker: &mut TimestampTracker, +) -> CorrelationId { + let timestamp = timestamp_tracker.get(provider, log.block_number).await; + let evt = EnclaveEvmEvent::Log(EvmLog::new(log, chain_id, timestamp)); + let id = evt.get_id(); + debug!("Sending event({})", id); + next.do_send(evt); + id +} + +/// Fetch logs in chunks from `from_block` to `to_block` with retry logic per chunk. +/// Returns the CorrelationId of the last processed event, if any. +pub(crate) async fn fetch_logs_chunked( + provider: &L, + filter: &Filter, + from_block: u64, + to_block: u64, + chain_id: u64, + next: &EvmEventProcessor, + timestamp_tracker: &mut TimestampTracker, +) -> Result, anyhow::Error> { + if to_block < from_block { + return Ok(None); + } + + let total_blocks = to_block - from_block + 1; + let total_chunks = (total_blocks + GET_LOGS_CHUNK_SIZE - 1) / GET_LOGS_CHUNK_SIZE; + + info!( + chain_id, + from_block, to_block, total_chunks, "Fetching logs in chunks" + ); + + let mut cursor = from_block; + let mut last_id: Option = None; + let mut chunk_idx = 0u64; + + while cursor <= to_block { + let chunk_end = (cursor + GET_LOGS_CHUNK_SIZE - 1).min(to_block); + chunk_idx += 1; + + let chunk_filter = filter.clone().from_block(cursor).to_block(chunk_end); + + let mut success = false; + for attempt in 1..=GET_LOGS_MAX_RETRIES { + match provider.fetch_logs(&chunk_filter).await { + Ok(logs) => { + info!( + chain_id, + chunk = chunk_idx, + total_chunks, + from = cursor, + to = chunk_end, + events = logs.len(), + "Fetched log chunk" + ); + for log in logs { + last_id = Some( + process_log(provider, log, chain_id, next, timestamp_tracker).await, + ); + } + success = true; + break; + } + Err(e) => { + warn!( + chain_id, chunk = chunk_idx, + from = cursor, to = chunk_end, + attempt, max_retries = GET_LOGS_MAX_RETRIES, + error = %e, "Failed to fetch log chunk, retrying" + ); + if attempt < GET_LOGS_MAX_RETRIES { + tokio::time::sleep(std::time::Duration::from_secs(2u64.pow(attempt))).await; + } + } + } + } + + if !success { + return Err(anyhow!( + "Failed to fetch logs for chain {} blocks {}..={} after {} retries", + chain_id, + cursor, + chunk_end, + GET_LOGS_MAX_RETRIES + )); + } + + cursor = chunk_end + 1; + } + + info!(chain_id, chunks_fetched = chunk_idx, "Log fetch complete"); + Ok(last_id) +} + +/// Fetch any blocks between `last_block` and the chain head to fill gaps. +/// Handles blocks missed during reconnection or due to Geth's eth_subscribe +/// silently ignoring the fromBlock parameter. +pub(crate) async fn backfill_to_head( + provider: &L, + filter: &Filter, + chain_id: u64, + next: &EvmEventProcessor, + timestamp_tracker: &mut TimestampTracker, + last_block: &mut u64, +) -> Result<(), anyhow::Error> { + let current_head = provider + .fetch_block_number() + .await + .map_err(|e| anyhow!("Failed to get block number for gap backfill: {}", e))?; + + let gap_start = *last_block + 1; + if gap_start > current_head { + return Ok(()); + } + + info!( + chain_id, + from = gap_start, + to = current_head, + blocks = current_head - gap_start + 1, + "Backfilling missed blocks" + ); + + let mut cursor = gap_start; + while cursor <= current_head { + let chunk_end = (cursor + GET_LOGS_CHUNK_SIZE - 1).min(current_head); + + fetch_logs_chunked( + provider, + filter, + cursor, + chunk_end, + chain_id, + next, + timestamp_tracker, + ) + .await?; + + *last_block = chunk_end; + cursor = chunk_end + 1; + } + + Ok(()) +} + +/// Cache utility to keep track of timestamps +pub(crate) struct TimestampTracker { + current: Option<(u64, u64)>, // (block_number, timestamp) +} + +impl TimestampTracker { + pub fn new() -> Self { + Self { current: None } + } + + pub async fn get(&mut self, provider: &L, block_number: Option) -> u64 { + let Some(bn) = block_number else { + error!("BLOCK NUMBER NOT FOUND ON LOG!"); + return 0; + }; + + if let Some((cached_bn, ts)) = self.current { + if bn == cached_bn { + return ts; + } + } + + let ts = provider.fetch_block_timestamp(bn).await.unwrap_or(0); + + self.current = Some((bn, ts)); + ts + } +} + +#[cfg(test)] +mod tests { + use super::*; + use actix::prelude::*; + use std::collections::VecDeque; + use std::sync::{Arc, Mutex}; + use tokio::sync::mpsc; + + #[derive(Clone)] + struct MockLogProvider { + inner: Arc>, + } + + struct MockState { + block_number: u64, + log_responses: VecDeque, String>>, + get_logs_calls: u32, + } + + impl MockLogProvider { + fn new(block_number: u64) -> Self { + Self { + inner: Arc::new(Mutex::new(MockState { + block_number, + log_responses: VecDeque::new(), + get_logs_calls: 0, + })), + } + } + + fn push_logs(&self, logs: Vec) { + self.inner.lock().unwrap().log_responses.push_back(Ok(logs)); + } + + fn push_error(&self, msg: &str) { + self.inner + .lock() + .unwrap() + .log_responses + .push_back(Err(msg.to_string())); + } + + fn get_logs_call_count(&self) -> u32 { + self.inner.lock().unwrap().get_logs_calls + } + } + + #[async_trait] + impl LogProvider for MockLogProvider { + async fn fetch_logs(&self, _filter: &Filter) -> Result, anyhow::Error> { + let mut state = self.inner.lock().unwrap(); + state.get_logs_calls += 1; + match state.log_responses.pop_front() { + Some(Ok(logs)) => Ok(logs), + Some(Err(msg)) => Err(anyhow!("{}", msg)), + None => Ok(vec![]), + } + } + + async fn fetch_block_number(&self) -> Result { + Ok(self.inner.lock().unwrap().block_number) + } + + async fn fetch_block_timestamp(&self, _block_number: u64) -> Option { + Some(0) + } + } + + struct TestCollector { + tx: mpsc::UnboundedSender, + } + + impl Actor for TestCollector { + type Context = Context; + } + + impl Handler for TestCollector { + type Result = (); + fn handle(&mut self, msg: EnclaveEvmEvent, _: &mut Self::Context) { + let _ = self.tx.send(msg); + } + } + + fn make_test_log(block_number: u64) -> Log { + Log { + block_number: Some(block_number), + ..Default::default() + } + } + + fn setup_collector() -> (EvmEventProcessor, mpsc::UnboundedReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + let addr = TestCollector { tx }.start(); + (addr.recipient(), rx) + } + + #[actix::test] + async fn test_fetch_logs_empty_range() { + let mock = MockLogProvider::new(100); + let (next, _rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + + let result = fetch_logs_chunked(&mock, &filter, 200, 100, 1, &next, &mut ts).await; + + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); + assert_eq!(mock.get_logs_call_count(), 0); + } + + #[actix::test] + async fn test_fetch_logs_single_chunk() { + let mock = MockLogProvider::new(5000); + mock.push_logs(vec![ + make_test_log(100), + make_test_log(200), + make_test_log(300), + ]); + let (next, mut rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + + let result = fetch_logs_chunked(&mock, &filter, 0, 5000, 1, &next, &mut ts).await; + + assert!(result.is_ok()); + assert!(result.unwrap().is_some()); + assert_eq!(mock.get_logs_call_count(), 1); + + // Allow actix message delivery + tokio::task::yield_now().await; + let mut count = 0; + while rx.try_recv().is_ok() { + count += 1; + } + assert_eq!(count, 3); + } + + #[actix::test] + async fn test_fetch_logs_multiple_chunks() { + // 25k blocks → 3 chunks: [0..9999], [10000..19999], [20000..24999] + let mock = MockLogProvider::new(25000); + mock.push_logs(vec![make_test_log(5000)]); + mock.push_logs(vec![make_test_log(15000)]); + mock.push_logs(vec![make_test_log(22000)]); + let (next, _rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + + let result = fetch_logs_chunked(&mock, &filter, 0, 24999, 1, &next, &mut ts).await; + + assert!(result.is_ok()); + assert!(result.unwrap().is_some()); + assert_eq!(mock.get_logs_call_count(), 3); + } + + #[actix::test] + async fn test_fetch_logs_retry_then_success() { + tokio::time::pause(); // Skip retry delays + + let mock = MockLogProvider::new(5000); + mock.push_error("temporary RPC error"); + mock.push_logs(vec![make_test_log(100)]); + let (next, _rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + + let result = fetch_logs_chunked(&mock, &filter, 0, 5000, 1, &next, &mut ts).await; + + assert!(result.is_ok()); + assert!(result.unwrap().is_some()); + assert_eq!(mock.get_logs_call_count(), 2); + } + + #[actix::test] + async fn test_fetch_logs_all_retries_exhausted() { + tokio::time::pause(); + + let mock = MockLogProvider::new(5000); + for _ in 0..GET_LOGS_MAX_RETRIES { + mock.push_error("persistent RPC error"); + } + let (next, _rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + + let result = fetch_logs_chunked(&mock, &filter, 0, 5000, 1, &next, &mut ts).await; + + let err = result.expect_err("expected error after all retries exhausted"); + assert!( + err.to_string().contains("Failed to fetch logs"), + "unexpected error: {err}" + ); + assert_eq!(mock.get_logs_call_count(), GET_LOGS_MAX_RETRIES); + } + + #[actix::test] + async fn test_backfill_no_gap() { + let mock = MockLogProvider::new(100); + let (next, _rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + let mut last_block = 100u64; + + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block).await; + + assert!(result.is_ok()); + assert_eq!(last_block, 100); + assert_eq!(mock.get_logs_call_count(), 0); + } + + #[actix::test] + async fn test_backfill_with_gap() { + let mock = MockLogProvider::new(200); + mock.push_logs(vec![make_test_log(150), make_test_log(180)]); + let (next, mut rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + let mut last_block = 100u64; + + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block).await; + + assert!(result.is_ok()); + assert_eq!(last_block, 200); + assert_eq!(mock.get_logs_call_count(), 1); + + tokio::task::yield_now().await; + let mut count = 0; + while rx.try_recv().is_ok() { + count += 1; + } + assert_eq!(count, 2); + } + + #[actix::test] + async fn test_backfill_partial_failure_preserves_progress() { + tokio::time::pause(); + + // Head at 25000, last_block at 100 + // Gap: blocks 101..=25000 → 3 chunks: + // chunk 1: [101, 10100] + // chunk 2: [10101, 20100] + // chunk 3: [20101, 25000] + let mock = MockLogProvider::new(25000); + // Chunk 1 succeeds + mock.push_logs(vec![make_test_log(500)]); + // Chunk 2 succeeds + mock.push_logs(vec![make_test_log(15000)]); + // Chunk 3: all retries fail + for _ in 0..GET_LOGS_MAX_RETRIES { + mock.push_error("RPC error"); + } + + let (next, _rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + let mut last_block = 100u64; + + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block).await; + + // Should fail because chunk 3 exhausted retries + assert!(result.is_err()); + // But last_block must have advanced past the two successful chunks + assert_eq!(last_block, 20100); + + // On retry: gap_start = 20101, head still 25000 → single chunk succeeds + mock.push_logs(vec![make_test_log(22000)]); + + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block).await; + assert!(result.is_ok()); + assert_eq!(last_block, 25000); + } +} diff --git a/crates/sortition/src/backends.rs b/crates/sortition/src/backends.rs index 2f28eb4ab0..45565a6cda 100644 --- a/crates/sortition/src/backends.rs +++ b/crates/sortition/src/backends.rs @@ -162,7 +162,21 @@ impl SortitionList for ScoreBackend { return Ok(false); } - let winners = ScoreSortition::new(size).get_committee(e3_id, seed, &nodes)?; + let winners = ScoreSortition::new(size).get_committee(e3_id.clone(), seed, &nodes)?; + + let selected_nodes: Vec = winners + .iter() + .map(|w| format!("{}(ticket:{})", w.address, w.ticket_id)) + .collect(); + info!( + e3_id = %e3_id, + chain_id = chain_id, + committee_size = size, + selected_count = winners.len(), + nodes = ?selected_nodes, + "Sortition completed - selected nodes" + ); + let want: Address = address.parse()?; Ok(winners.iter().any(|w| w.address == want)) } @@ -189,7 +203,21 @@ impl SortitionList for ScoreBackend { return Ok(None); } - let winners = ScoreSortition::new(size).get_committee(e3_id, seed, &nodes)?; + let winners = ScoreSortition::new(size).get_committee(e3_id.clone(), seed, &nodes)?; + + let selected_nodes: Vec = winners + .iter() + .map(|w| format!("{}(ticket:{})", w.address, w.ticket_id)) + .collect(); + info!( + e3_id = %e3_id, + chain_id = chain_id, + committee_size = size, + selected_count = winners.len(), + nodes = ?selected_nodes, + "Sortition completed - selected nodes" + ); + let want: alloy::primitives::Address = address.parse()?; let maybe = winners diff --git a/crates/sortition/src/sortition.rs b/crates/sortition/src/sortition.rs index 56063df067..a690a7f08c 100644 --- a/crates/sortition/src/sortition.rs +++ b/crates/sortition/src/sortition.rs @@ -463,11 +463,29 @@ impl Handler> for Sortition { "Performing Sortition with buffer" ); - self.ciphernode_selector.do_send(WithSortitionTicket::new( - msg, - self.get_node_index(e3_id, seed, total_selection_size, chain_id), - &self.address, - )) + let node_index = self.get_node_index(e3_id.clone(), seed, total_selection_size, chain_id); + + match &node_index { + Some((index, ticket_id)) => { + info!( + e3_id = %e3_id, + node = %self.address, + index = index, + ticket_id = ?ticket_id, + "This node was SELECTED for sortition" + ); + } + None => { + info!( + e3_id = %e3_id, + node = %self.address, + "This node was NOT selected for sortition" + ); + } + } + + self.ciphernode_selector + .do_send(WithSortitionTicket::new(msg, node_index, &self.address)) } } diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index a81eed2737..668449bdb9 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -19,8 +19,8 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, time::Duration, }; -use tokio::{sync::mpsc::Receiver, time::timeout}; -use tracing::info; +use tokio::sync::mpsc::Receiver; +use tracing::{info, warn}; pub async fn sync( bus: &BusHandle, @@ -68,8 +68,7 @@ pub async fn sync( info!("Loading historical blockchain events..."); let (addr, rx) = actix_toolbox::mpsc::(256); bus.publish_without_context(HistoricalEvmSyncStart::new(addr, evm_config.clone()))?; - let historical_evm_events = - collect_historical_evm_events(rx, &evm_config, Duration::from_secs(30)).await; + let historical_evm_events = collect_historical_evm_events(rx, &evm_config).await; info!( "{} historical blockchain events loaded.", historical_evm_events.len() @@ -116,33 +115,42 @@ pub async fn sync( pub async fn collect_historical_evm_events( mut receiver: Receiver, config: &EvmEventConfig, - max_dur: Duration, ) -> Vec> { // Get expected chain IDs from config let expected = config.chains(); let mut received = HashSet::new(); let mut results = Vec::new(); + let progress_interval = Duration::from_secs(30); - let fut = async { - while received.len() < expected.len() { - if let Some(mut msg) = receiver.recv().await { - // Only accept messages for expected chains we haven't received yet + while received.len() < expected.len() { + match tokio::time::timeout(progress_interval, receiver.recv()).await { + Ok(Some(mut msg)) => { if expected.contains(&msg.chain_id) && !received.contains(&msg.chain_id) { + info!( + chain_id = msg.chain_id, + events = msg.events.len(), + chains_received = received.len() + 1, + chains_expected = expected.len(), + "Received historical events from chain" + ); received.insert(msg.chain_id); results.append(&mut msg.events); } - } else { + } + Ok(None) => { + // Channel closed — sender dropped + warn!("Historical events channel closed before all chains reported"); break; } - } - }; - - if let Err(_) = timeout(max_dur, fut).await { - for chain_id in expected.difference(&received) { - eprintln!( - "Error: Timeout waiting for historical events from chain {}", - chain_id - ); + Err(_) => { + // Not a failure — just a progress heartbeat + let remaining: Vec<_> = expected.difference(&received).collect(); + info!( + ?remaining, + "Still waiting for historical events from chains" + ); + continue; + } } }