From 000b2e21862ea0b3a2489cdec30606092edd7a5c Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Mon, 2 Mar 2026 16:19:29 +0500 Subject: [PATCH 1/4] fix: replace two-layer EVM reconnect loop with single provider lifecycle --- .../src/ciphernode_builder.rs | 9 +- crates/ciphernode-builder/src/evm_system.rs | 18 +- crates/evm/src/evm_parser.rs | 5 +- crates/evm/src/evm_read_interface.rs | 339 +++++++++++++----- crates/evm/src/helpers.rs | 13 +- 5 files changed, 281 insertions(+), 103 deletions(-) diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index b7e936fba0..2eeb63b9af 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -18,7 +18,7 @@ use e3_events::{ AggregateConfig, AggregateId, BusHandle, EnclaveEvent, EventBus, EventBusConfig, EvmEventConfig, }; use e3_evm::{BondingRegistrySolReader, CiphernodeRegistrySolReader, EnclaveSolWriter}; -use e3_evm::{CiphernodeRegistrySol, EnclaveSolReader}; +use e3_evm::{CiphernodeRegistrySol, EnclaveSolReader, ProviderConfig}; use e3_fhe::ext::FheExtension; use e3_fhe_params::BfvPreset; use e3_keyshare::ext::ThresholdKeyshareExtension; @@ -613,7 +613,14 @@ async fn setup_evm_system( let provider = provider_cache.ensure_read_provider(chain).await?; let chain_id = provider.chain_id(); evm_config.insert(chain_id, chain.try_into()?); + + // Create a provider factory so the EVM reader can reconnect on transport failures + let rpc_url = chain.rpc_url()?; + let provider_factory = + ProviderConfig::new(rpc_url, chain.rpc_auth.clone()).into_read_provider_factory(); + let mut system = EvmSystemChainBuilder::new(&bus, &provider); + system.with_provider_factory(provider_factory); if contract_components.enclave { let write_provider = provider_cache.ensure_write_provider(chain).await?; diff --git a/crates/ciphernode-builder/src/evm_system.rs b/crates/ciphernode-builder/src/evm_system.rs index 0609b7b15f..fa68574a21 100644 --- a/crates/ciphernode-builder/src/evm_system.rs +++ b/crates/ciphernode-builder/src/evm_system.rs @@ -11,7 +11,7 @@ use alloy::{primitives::Address, providers::Provider}; use e3_events::{run_once, BusHandle, EventSubscriber, EventType, HistoricalEvmSyncStart}; use e3_evm::{ EthProvider, EvmChainGateway, EvmEventProcessor, EvmReadInterface, EvmRouter, Filters, - FixHistoricalOrder, + FixHistoricalOrder, ProviderFactory, }; pub trait RouteFn: FnOnce(EvmEventProcessor) -> EvmEventProcessor + Send {} @@ -22,6 +22,7 @@ type RouteFactory = Box; // Build the event system for a single chain pub struct EvmSystemChainBuilder

{ provider: EthProvider

, + provider_factory: Option>, bus: BusHandle, chain_id: u64, route_factories: Vec<(Address, RouteFactory)>, @@ -33,11 +34,17 @@ impl EvmSystemChainBuilder

{ Self { bus: bus.clone(), provider: provider.clone(), + provider_factory: None, chain_id, route_factories: Vec::new(), } } + pub fn with_provider_factory(&mut self, factory: ProviderFactory

) -> &mut Self { + self.provider_factory = Some(factory); + self + } + pub fn with_contract( &mut self, address: Address, @@ -61,6 +68,7 @@ impl EvmSystemChainBuilder

{ // Clone self refs for closure let bus = self.bus.clone(); let provider = self.provider.clone(); + let provider_factory = self.provider_factory.clone(); let chain_id = self.chain_id; // Only gets consumed once so fine to use replace to clean out route_factories @@ -78,7 +86,13 @@ impl EvmSystemChainBuilder

{ let filters = filters_from_router(&router, deploy_block); // Setup and start the read interface and the router - EvmReadInterface::setup(&provider, router.start(), &bus, filters); + EvmReadInterface::setup_with_factory( + &provider, + provider_factory, + router.start(), + &bus, + filters, + ); Ok(()) } }); diff --git a/crates/evm/src/evm_parser.rs b/crates/evm/src/evm_parser.rs index 473e67629c..f529a8a627 100644 --- a/crates/evm/src/evm_parser.rs +++ b/crates/evm/src/evm_parser.rs @@ -5,15 +5,18 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use actix::{Actor, Handler}; +use alloy::primitives::{LogData, B256}; use e3_events::{hlc::HlcTimestamp, EnclaveEventData}; use e3_utils::MAILBOX_LIMIT; use tracing::debug; use crate::{ events::{EnclaveEvmEvent, EvmEventProcessor, EvmLog}, - EvmEvent, ExtractorFn, + EvmEvent, }; +pub type ExtractorFn = fn(&LogData, Option<&B256>, u64) -> Option; + pub struct EvmParser { next: EvmEventProcessor, extractor: ExtractorFn, diff --git a/crates/evm/src/evm_read_interface.rs b/crates/evm/src/evm_read_interface.rs index 44093726b7..6ba29bfe69 100644 --- a/crates/evm/src/evm_read_interface.rs +++ b/crates/evm/src/evm_read_interface.rs @@ -5,36 +5,34 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use crate::events::{EnclaveEvmEvent, EvmEventProcessor}; -use crate::helpers::EthProvider; +use crate::helpers::{EthProvider, ProviderFactory}; use crate::log_fetcher::{backfill_to_head, fetch_logs_chunked, process_log, TimestampTracker}; use crate::HistoricalSyncComplete; use actix::prelude::*; -use actix::{Addr, Recipient}; use alloy::eips::BlockNumberOrTag; -use alloy::primitives::{LogData, B256}; +use alloy::primitives::B256; use alloy::providers::Provider; use alloy::rpc::types::Filter; use alloy_primitives::Address; use anyhow::anyhow; -use e3_events::{BusHandle, ErrorDispatcher, Event, EventSubscriber, EventType}; -use e3_events::{EType, EnclaveEvent, EnclaveEventData, EventId}; -use e3_utils::MAILBOX_LIMIT; +use e3_events::{ + BusHandle, EType, EnclaveEvent, EnclaveEventData, ErrorDispatcher, Event, EventId, +}; +use e3_events::{EventSubscriber, EventType}; +use e3_utils::{retry_with_backoff, RetryError, MAILBOX_LIMIT}; use futures_util::stream::StreamExt; use std::collections::{HashMap, HashSet}; +use std::time::Duration; use tokio::select; use tokio::sync::oneshot; use tracing::{error, info, instrument, warn}; const MAX_RECONNECT_DELAY_SECS: u64 = 60; - -pub type ExtractorFn = fn(&LogData, Option<&B256>, u64) -> Option; - -pub struct EvmReadInterfaceParams

{ - provider: EthProvider

, - next: Recipient, - bus: BusHandle, - filters: Filters, -} +/// Maximum attempts to recreate a provider via the factory before adding an +/// extra outer delay. Uses exponential backoff between each attempt. +const PROVIDER_RECREATE_MAX_ATTEMPTS: u32 = 3; +/// Initial delay (ms) between provider-recreation attempts. +const PROVIDER_RECREATE_INITIAL_DELAY_MS: u64 = 2000; #[derive(Default, serde::Serialize, serde::Deserialize, Clone)] pub struct EvmReadInterfaceState { @@ -75,6 +73,8 @@ impl Filters { pub struct EvmReadInterface

{ /// The alloy provider provider: Option>, + /// Optional factory to recreate the provider when the transport dies + provider_factory: Option>, /// A shutdown receiver to listen to for shutdown signals sent to the loop this is only used /// internally. You should send the Shutdown signal to the reader directly or via the EventBus shutdown_rx: Option>, @@ -89,33 +89,34 @@ pub struct EvmReadInterface

{ } impl EvmReadInterface

{ - pub fn new(params: EvmReadInterfaceParams

) -> Self { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - Self { - provider: Some(params.provider), - shutdown_rx: Some(shutdown_rx), - shutdown_tx: Some(shutdown_tx), - next: params.next, - bus: params.bus, - filters: params.filters, - } + pub fn setup( + provider: &EthProvider

, + next: impl Into, + bus: &BusHandle, + filters: Filters, + ) -> Addr { + Self::setup_with_factory(provider, None, next, bus, filters) } - pub fn setup( + pub fn setup_with_factory( provider: &EthProvider

, + provider_factory: Option>, next: impl Into, bus: &BusHandle, filters: Filters, ) -> Addr { - let params = EvmReadInterfaceParams { - provider: provider.clone(), + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let reader = Self { + provider: Some(provider.clone()), + provider_factory, + shutdown_rx: Some(shutdown_rx), + shutdown_tx: Some(shutdown_tx), next: next.into(), bus: bus.clone(), filters, }; - let addr = EvmReadInterface::new(params).start(); - + let addr = reader.start(); bus.subscribe(EventType::Shutdown, addr.clone().into()); addr } @@ -130,6 +131,7 @@ impl Actor for EvmReadInterface

{ let bus = self.bus.clone(); let next = self.next.clone(); let filters = self.filters.clone(); + let provider_factory = self.provider_factory.take(); let Some(provider) = self.provider.take() else { error!("Could not start event reader as provider has already been used."); @@ -142,26 +144,160 @@ impl Actor for EvmReadInterface

{ }; ctx.spawn( - async move { stream_from_evm(provider, next, shutdown, &bus, filters).await } - .into_actor(self), + async move { + stream_from_evm(provider, provider_factory, next, shutdown, &bus, filters).await + } + .into_actor(self), + ); + } +} + +struct Backoff { + delay_secs: u64, + max_delay_secs: u64, +} + +impl Backoff { + fn new(max_delay_secs: u64) -> Self { + Self { + delay_secs: 1, + max_delay_secs, + } + } + + fn reset(&mut self) { + self.delay_secs = 1; + } + + fn next_delay(&mut self) -> Duration { + let delay = Duration::from_secs(self.delay_secs); + self.delay_secs = (self.delay_secs * 2).min(self.max_delay_secs); + delay + } +} + +fn is_transport_dead(e: &anyhow::Error) -> bool { + let msg = e.to_string(); + msg.contains("backend connection task has stopped") + || msg.contains("connection closed") + || msg.contains("broken pipe") + || msg.contains("connection reset") + || msg.contains("WebSocket connection closed") + || msg.contains("transport error") +} + +async fn sleep_or_shutdown(duration: Duration, shutdown: &mut oneshot::Receiver<()>) -> bool { + select! { + _ = tokio::time::sleep(duration) => false, + _ = &mut *shutdown => { + info!("Shutdown signal received during backoff"); + true + } + } +} + +async fn recreate_provider( + factory: &ProviderFactory

, + shutdown: &mut oneshot::Receiver<()>, + chain_id: u64, + backoff: &mut Backoff, +) -> Option> { + loop { + // Check for shutdown before each attempt cycle + if shutdown.try_recv().is_ok() { + return None; + } + + let delay = backoff.next_delay(); + warn!( + chain_id, + delay_secs = delay.as_secs(), + "Waiting before provider recreation attempt" ); + if sleep_or_shutdown(delay, shutdown).await { + return None; + } + + // Use retry_with_backoff for bounded attempts to create + health-check + let factory_clone = factory.clone(); + let result = retry_with_backoff( + || { + let f = factory_clone.clone(); + async move { + let provider = f().await.map_err(|e| { + warn!(chain_id, error = %e, "Factory failed to create provider"); + RetryError::Retry(e) + })?; + + // Health check: verify the new transport is actually alive + provider.provider().get_block_number().await.map_err(|e| { + warn!(chain_id, error = %e, "New provider failed health check"); + RetryError::Retry(anyhow!("Health check failed: {}", e)) + })?; + + Ok(provider) + } + }, + PROVIDER_RECREATE_MAX_ATTEMPTS, + PROVIDER_RECREATE_INITIAL_DELAY_MS, + ) + .await; + + match result { + Ok(new_provider) => { + info!(chain_id, "Provider recreated and verified"); + backoff.reset(); + return Some(new_provider); + } + Err(e) => { + error!( + chain_id, + error = %e, + "All provider recreation attempts failed, will retry with longer backoff" + ); + continue; + } + } } } +async fn get_new_provider_or_exit( + factory: &Option>, + shutdown: &mut oneshot::Receiver<()>, + chain_id: u64, + backoff: &mut Backoff, + bus: &BusHandle, +) -> Option> { + let Some(factory) = factory else { + error!( + chain_id, + "Transport died and no provider factory configured" + ); + bus.err( + EType::Evm, + anyhow!("Transport died and no provider factory configured"), + ); + return None; + }; + recreate_provider(factory, shutdown, chain_id, backoff).await +} + #[instrument(name = "evm_interface", skip_all)] async fn stream_from_evm( provider: EthProvider

, + provider_factory: Option>, next: EvmEventProcessor, mut shutdown: oneshot::Receiver<()>, bus: &BusHandle, filters: Filters, ) { let chain_id = provider.chain_id(); - let provider_ref = provider.provider(); let mut timestamp_tracker = TimestampTracker::new(); + let mut backoff = Backoff::new(MAX_RECONNECT_DELAY_SECS); + + // ── Phase 1: Historical sync (must succeed, fatal on failure) ── - // Determine chain head for historical fetch - let latest_block = match provider_ref.get_block_number().await { + let latest_block = match provider.provider().get_block_number().await { Ok(bn) => bn, Err(e) => { error!(chain_id, error = %e, "Failed to get latest block number"); @@ -170,9 +306,8 @@ async fn stream_from_evm( } }; - // Historical events — chunked to respect RPC block-range limits let last_id = match fetch_logs_chunked( - provider_ref, + provider.provider(), &filters.historical, filters.start_block, latest_block, @@ -197,74 +332,65 @@ async fn stream_from_evm( HistoricalSyncComplete::new(chain_id, last_id), )); - // Live subscription with gap-fill on connect/reconnect - subscribe_live_events( - provider_ref, - &next, - &mut shutdown, - bus, - &filters.current, - chain_id, - &mut timestamp_tracker, - latest_block, - ) - .await; -} + // ── Phase 2: Live event loop with provider lifecycle management ── + // + // Single flat loop: backfill → subscribe → consume stream → repeat. + // On transport death, immediately recreate the provider. + // On transient errors, retry with exponential backoff. -async fn subscribe_live_events( - provider: &P, - next: &EvmEventProcessor, - shutdown: &mut oneshot::Receiver<()>, - bus: &BusHandle, - filter: &Filter, - chain_id: u64, - timestamp_tracker: &mut TimestampTracker, - mut last_block: u64, -) { - let mut reconnect_attempt: u32 = 0; + let mut last_block = latest_block; + let mut current_provider = provider; loop { - if reconnect_attempt > 0 { - let delay_secs = (2u64.pow(reconnect_attempt.min(6))).min(MAX_RECONNECT_DELAY_SECS); - warn!( - chain_id, - reconnect_attempt, delay_secs, "Reconnecting to live event stream" - ); - tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await; - } - - if shutdown.try_recv().is_ok() { - info!("Shutdown signal received, stopping EVM stream"); - return; - } - - // Backfill any blocks missed since last_block. This handles: - // - Blocks mined between historical fetch completion and first subscription - // - Blocks mined during reconnection downtime - // - Geth's eth_subscribe silently ignoring fromBlock + // Step 1: Backfill any blocks missed since last_block match backfill_to_head( - provider, - filter, + current_provider.provider(), + &filters.current, chain_id, - next, - timestamp_tracker, + &next, + &mut timestamp_tracker, &mut last_block, ) .await { - Ok(_) => {} + Ok(_) => backoff.reset(), + Err(e) if is_transport_dead(&e) => { + warn!(chain_id, error = %e, "Transport dead during backfill"); + let Some(p) = get_new_provider_or_exit( + &provider_factory, + &mut shutdown, + chain_id, + &mut backoff, + bus, + ) + .await + else { + return; + }; + current_provider = p; + continue; + } Err(e) => { - warn!(chain_id, error = %e, "Gap backfill failed, will retry"); - reconnect_attempt += 1; + warn!(chain_id, error = %e, "Transient backfill failure"); + if sleep_or_shutdown(backoff.next_delay(), &mut shutdown).await { + return; + } continue; } } - match provider.subscribe_logs(filter).await { + // Step 2: Subscribe to live events + let sub_result = current_provider + .provider() + .subscribe_logs(&filters.current) + .await + .map_err(|e| anyhow!("{}", e)); + + match sub_result { Ok(subscription) => { + backoff.reset(); let sub_id: B256 = subscription.local_id().clone(); let mut stream = subscription.into_stream(); - reconnect_attempt = 0; info!(chain_id, "Live event subscription active"); loop { @@ -275,31 +401,48 @@ async fn subscribe_live_events( if let Some(bn) = log.block_number { last_block = last_block.max(bn); } - process_log(provider, log, chain_id, next, timestamp_tracker).await; + process_log( + current_provider.provider(), + log, chain_id, &next, &mut timestamp_tracker, + ).await; } None => { + // Stream ended (server-side close, idle timeout, etc.) + // Loop back to backfill + resubscribe with no penalty. warn!(chain_id, "Live event stream ended, will reconnect"); break; } } } - _ = &mut *shutdown => { + _ = &mut shutdown => { info!("Shutdown signal received, stopping EVM stream"); - match provider.unsubscribe(sub_id).await { - Ok(_) => info!("Unsubscribed successfully from EVM event stream"), - Err(err) => error!(chain_id, error = %err, "Cannot unsubscribe from EVM event stream"), - }; + let _ = current_provider.provider().unsubscribe(sub_id).await; return; } } } - - reconnect_attempt += 1; + } + Err(e) if is_transport_dead(&e) => { + warn!(chain_id, error = %e, "Transport dead during subscribe"); + let Some(p) = get_new_provider_or_exit( + &provider_factory, + &mut shutdown, + chain_id, + &mut backoff, + bus, + ) + .await + else { + return; + }; + current_provider = p; } Err(e) => { - error!(chain_id, reconnect_attempt, error = %e, "Failed to subscribe to live events"); + error!(chain_id, error = %e, "Failed to subscribe to live events"); bus.err(EType::Evm, anyhow!("{}", e)); - reconnect_attempt += 1; + if sleep_or_shutdown(backoff.next_delay(), &mut shutdown).await { + return; + } } } } diff --git a/crates/evm/src/helpers.rs b/crates/evm/src/helpers.rs index 5233b0d4ad..2ef2567650 100644 --- a/crates/evm/src/helpers.rs +++ b/crates/evm/src/helpers.rs @@ -33,7 +33,7 @@ use e3_config::{RpcAuth, RPC}; use e3_crypto::Cipher; use e3_data::Repository; use e3_utils::{retry_with_backoff, RetryError}; -use std::{env, future::Future, sync::Arc}; +use std::{env, future::Future, pin::Pin, sync::Arc}; use tracing::info; use zeroize::{Zeroize, Zeroizing}; @@ -87,6 +87,10 @@ impl EthProvider

{ } } +pub type ProviderFactory

= + Arc Pin>> + Send>> + Send + Sync>; + +#[derive(Clone)] pub struct ProviderConfig { rpc: RPC, auth: RpcAuth, @@ -169,6 +173,13 @@ impl ProviderConfig { Ok(ws_connect) } + pub fn into_read_provider_factory(self) -> ProviderFactory { + Arc::new(move || { + let config = self.clone(); + Box::pin(async move { config.create_readonly_provider().await }) + }) + } + fn create_http_client(&self) -> Result { let mut headers = HeaderMap::new(); if let Some(auth_header) = self.auth.to_header_value() { From 06b27864391b1ae0b8b675eeaaa10296182456a7 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Mon, 2 Mar 2026 16:31:51 +0500 Subject: [PATCH 2/4] fix: review comments --- .../src/ciphernode_builder.rs | 1 - crates/evm/src/evm_read_interface.rs | 18 +++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index 2eeb63b9af..1e21494717 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -614,7 +614,6 @@ async fn setup_evm_system( let chain_id = provider.chain_id(); evm_config.insert(chain_id, chain.try_into()?); - // Create a provider factory so the EVM reader can reconnect on transport failures let rpc_url = chain.rpc_url()?; let provider_factory = ProviderConfig::new(rpc_url, chain.rpc_auth.clone()).into_read_provider_factory(); diff --git a/crates/evm/src/evm_read_interface.rs b/crates/evm/src/evm_read_interface.rs index 6ba29bfe69..041bc9eb0c 100644 --- a/crates/evm/src/evm_read_interface.rs +++ b/crates/evm/src/evm_read_interface.rs @@ -29,7 +29,7 @@ use tracing::{error, info, instrument, warn}; const MAX_RECONNECT_DELAY_SECS: u64 = 60; /// Maximum attempts to recreate a provider via the factory before adding an -/// extra outer delay. Uses exponential backoff between each attempt. +/// extra outer delay. const PROVIDER_RECREATE_MAX_ATTEMPTS: u32 = 3; /// Initial delay (ms) between provider-recreation attempts. const PROVIDER_RECREATE_INITIAL_DELAY_MS: u64 = 2000; @@ -203,7 +203,6 @@ async fn recreate_provider( backoff: &mut Backoff, ) -> Option> { loop { - // Check for shutdown before each attempt cycle if shutdown.try_recv().is_ok() { return None; } @@ -218,7 +217,6 @@ async fn recreate_provider( return None; } - // Use retry_with_backoff for bounded attempts to create + health-check let factory_clone = factory.clone(); let result = retry_with_backoff( || { @@ -235,6 +233,20 @@ async fn recreate_provider( RetryError::Retry(anyhow!("Health check failed: {}", e)) })?; + let new_chain_id = provider.chain_id(); + if new_chain_id != chain_id { + let err = anyhow!( + "Chain ID mismatch: expected {}, got {}", + chain_id, + new_chain_id + ); + error!( + chain_id, + new_chain_id, "Recreated provider is on wrong chain" + ); + return Err(RetryError::Failure(err)); + } + Ok(provider) } }, From 6d5e5082c8b8fa29f5ed3a41fd03fdbde4640f44 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Mon, 2 Mar 2026 16:52:50 +0500 Subject: [PATCH 3/4] fix: review comments --- crates/evm/src/evm_read_interface.rs | 123 ++++++++++++++------------- 1 file changed, 62 insertions(+), 61 deletions(-) diff --git a/crates/evm/src/evm_read_interface.rs b/crates/evm/src/evm_read_interface.rs index 041bc9eb0c..709eeea7f5 100644 --- a/crates/evm/src/evm_read_interface.rs +++ b/crates/evm/src/evm_read_interface.rs @@ -33,6 +33,8 @@ const MAX_RECONNECT_DELAY_SECS: u64 = 60; const PROVIDER_RECREATE_MAX_ATTEMPTS: u32 = 3; /// Initial delay (ms) between provider-recreation attempts. const PROVIDER_RECREATE_INITIAL_DELAY_MS: u64 = 2000; +/// Consecutive failures before we assume the provider is dead and recreate it. +const MAX_RETRIES_BEFORE_RECREATE: u32 = 3; #[derive(Default, serde::Serialize, serde::Deserialize, Clone)] pub struct EvmReadInterfaceState { @@ -176,16 +178,6 @@ impl Backoff { } } -fn is_transport_dead(e: &anyhow::Error) -> bool { - let msg = e.to_string(); - msg.contains("backend connection task has stopped") - || msg.contains("connection closed") - || msg.contains("broken pipe") - || msg.contains("connection reset") - || msg.contains("WebSocket connection closed") - || msg.contains("transport error") -} - async fn sleep_or_shutdown(duration: Duration, shutdown: &mut oneshot::Receiver<()>) -> bool { select! { _ = tokio::time::sleep(duration) => false, @@ -233,20 +225,6 @@ async fn recreate_provider( RetryError::Retry(anyhow!("Health check failed: {}", e)) })?; - let new_chain_id = provider.chain_id(); - if new_chain_id != chain_id { - let err = anyhow!( - "Chain ID mismatch: expected {}, got {}", - chain_id, - new_chain_id - ); - error!( - chain_id, - new_chain_id, "Recreated provider is on wrong chain" - ); - return Err(RetryError::Failure(err)); - } - Ok(provider) } }, @@ -257,6 +235,14 @@ async fn recreate_provider( match result { Ok(new_provider) => { + let new_chain_id = new_provider.chain_id(); + if new_chain_id != chain_id { + error!( + chain_id, + new_chain_id, "Recreated provider is on wrong chain — fatal" + ); + return None; + } info!(chain_id, "Provider recreated and verified"); backoff.reset(); return Some(new_provider); @@ -291,7 +277,14 @@ async fn get_new_provider_or_exit( ); return None; }; - recreate_provider(factory, shutdown, chain_id, backoff).await + let result = recreate_provider(factory, shutdown, chain_id, backoff).await; + if result.is_none() { + bus.err( + EType::Evm, + anyhow!("Provider recreation failed for chain {}", chain_id), + ); + } + result } #[instrument(name = "evm_interface", skip_all)] @@ -352,6 +345,7 @@ async fn stream_from_evm( let mut last_block = latest_block; let mut current_provider = provider; + let mut consecutive_failures: u32 = 0; loop { // Step 1: Backfill any blocks missed since last_block @@ -365,25 +359,29 @@ async fn stream_from_evm( ) .await { - Ok(_) => backoff.reset(), - Err(e) if is_transport_dead(&e) => { - warn!(chain_id, error = %e, "Transport dead during backfill"); - let Some(p) = get_new_provider_or_exit( - &provider_factory, - &mut shutdown, - chain_id, - &mut backoff, - bus, - ) - .await - else { - return; - }; - current_provider = p; - continue; + Ok(_) => { + backoff.reset(); + consecutive_failures = 0; } Err(e) => { - warn!(chain_id, error = %e, "Transient backfill failure"); + consecutive_failures += 1; + warn!(chain_id, error = %e, consecutive_failures, "Backfill failed"); + if consecutive_failures >= MAX_RETRIES_BEFORE_RECREATE { + let Some(p) = get_new_provider_or_exit( + &provider_factory, + &mut shutdown, + chain_id, + &mut backoff, + bus, + ) + .await + else { + return; + }; + current_provider = p; + consecutive_failures = 0; + continue; + } if sleep_or_shutdown(backoff.next_delay(), &mut shutdown).await { return; } @@ -401,6 +399,7 @@ async fn stream_from_evm( match sub_result { Ok(subscription) => { backoff.reset(); + consecutive_failures = 0; let sub_id: B256 = subscription.local_id().clone(); let mut stream = subscription.into_stream(); info!(chain_id, "Live event subscription active"); @@ -434,26 +433,28 @@ async fn stream_from_evm( } } } - Err(e) if is_transport_dead(&e) => { - warn!(chain_id, error = %e, "Transport dead during subscribe"); - let Some(p) = get_new_provider_or_exit( - &provider_factory, - &mut shutdown, - chain_id, - &mut backoff, - bus, - ) - .await - else { - return; - }; - current_provider = p; - } Err(e) => { - error!(chain_id, error = %e, "Failed to subscribe to live events"); - bus.err(EType::Evm, anyhow!("{}", e)); - if sleep_or_shutdown(backoff.next_delay(), &mut shutdown).await { - return; + consecutive_failures += 1; + error!(chain_id, error = %e, consecutive_failures, "Failed to subscribe to live events"); + if consecutive_failures >= MAX_RETRIES_BEFORE_RECREATE { + let Some(p) = get_new_provider_or_exit( + &provider_factory, + &mut shutdown, + chain_id, + &mut backoff, + bus, + ) + .await + else { + return; + }; + current_provider = p; + consecutive_failures = 0; + } else { + bus.err(EType::Evm, anyhow!("{}", e)); + if sleep_or_shutdown(backoff.next_delay(), &mut shutdown).await { + return; + } } } } From d36aa1117bca819d3822d8285017035394553d99 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Mon, 2 Mar 2026 17:03:17 +0500 Subject: [PATCH 4/4] fix: review comments --- crates/evm/src/evm_read_interface.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/crates/evm/src/evm_read_interface.rs b/crates/evm/src/evm_read_interface.rs index 709eeea7f5..cc5b3d8e9c 100644 --- a/crates/evm/src/evm_read_interface.rs +++ b/crates/evm/src/evm_read_interface.rs @@ -278,7 +278,7 @@ async fn get_new_provider_or_exit( return None; }; let result = recreate_provider(factory, shutdown, chain_id, backoff).await; - if result.is_none() { + if result.is_none() && shutdown.try_recv().is_err() { bus.err( EType::Evm, anyhow!("Provider recreation failed for chain {}", chain_id), @@ -419,8 +419,8 @@ async fn stream_from_evm( } None => { // Stream ended (server-side close, idle timeout, etc.) - // Loop back to backfill + resubscribe with no penalty. - warn!(chain_id, "Live event stream ended, will reconnect"); + consecutive_failures += 1; + warn!(chain_id, consecutive_failures, "Live event stream ended, will reconnect"); break; } } @@ -432,6 +432,26 @@ async fn stream_from_evm( } } } + + if consecutive_failures >= MAX_RETRIES_BEFORE_RECREATE { + let Some(p) = get_new_provider_or_exit( + &provider_factory, + &mut shutdown, + chain_id, + &mut backoff, + bus, + ) + .await + else { + return; + }; + current_provider = p; + consecutive_failures = 0; + } else if consecutive_failures > 0 { + if sleep_or_shutdown(backoff.next_delay(), &mut shutdown).await { + return; + } + } } Err(e) => { consecutive_failures += 1;