feat: add chunked historical log fetching with RPC block range#1331
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
No actionable comments were generated in the recent review. 🎉 📝 WalkthroughWalkthroughRestructures EVM ingestion and sync lifecycle: richer SyncStatus with Init buffering and BufferUntilLive/Live states; introduces chunked, retrying log fetcher and live subscribe with backfill and reconnect/backoff; removes outer timeout from historical collection; adds selection logging in sortition; adds tokio dev-dep and new internal log_fetcher module. Changes
Sequence DiagramsequenceDiagram
participant Reader as EVM Read Interface / LogFetcher
participant Bus as Event Bus
participant Gateway as EVM Chain Gateway
participant SyncActor as Sync Actor
Reader->>Reader: fetch_logs_chunked(start..head) (chunked, retries)
loop per chunk
Reader->>Bus: emit EnclaveEvent<Unsequenced>
end
Reader->>Gateway: HistoricalSyncComplete(last_id, latest_block)
alt Gateway in Init
Gateway->>Gateway: store pending_sync_complete (warn)
Gateway->>Gateway: buffer incoming events
else Gateway forwarded to SyncActor
Gateway->>SyncActor: forward_to_sync_actor(buffer, pending_sync_complete)
SyncActor-->>Gateway: ack/processing
Gateway->>Gateway: transition -> BufferUntilLive
SyncActor->>Bus: process/drain buffered events
end
Gateway->>Gateway: transition BufferUntilLive -> Live
Reader->>Reader: subscribe_live_events (backoff, backfill_to_head)
loop live logs
Reader->>Bus: emit live EnclaveEvent<Unsequenced>
end
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/sync/src/sync.rs (1)
115-158:⚠️ Potential issue | 🟠 MajorThe progress loop will spin indefinitely if a chain's historical fetch fails and the sender is never dropped.
If
fetch_historical_logs_chunkedfails (line 27 ofevm_read_interface.rs),stream_from_evmreturns early without sendingHistoricalSyncComplete. This means the sender held byEvmChainGatewayis never dropped—it's only released whenbuffer_until_live()is called in response toHistoricalSyncComplete(line 195 ofevm_chain_gateway.rs). As a result,collect_historical_evm_eventswill timeout every 30 seconds indefinitely, logging "Still waiting for historical events from chains" repeatedly, with no upper bound on total wait time and no way to detect that a chain will never report. The sync process hangs silently rather than failing.Add a maximum total timeout or a mechanism for failed chains to explicitly signal the collector (e.g., send a sentinel/error variant), to guarantee that
sync()eventually unblocks and returns an error.
🤖 Fix all issues with AI agents
In `@crates/evm/src/evm_read_interface.rs`:
- Around line 258-299: The historical fetch (fetch_historical_logs_chunked)
returns last_id but the live subscription (subscribe_live_events) starts from
Latest, creating a gap for blocks mined between fetch completion and
subscription; modify the flow so subscribe_live_events is given the last fetched
block (last_id) and uses from_block = last_id + 1 (or re-fetch a small
overlapping range and deduplicate) — pass last_id (or computed latest_block)
into subscribe_live_events and update its logic to accept a starting
BlockNumber/NumberOrTag and start the live subscription from that block instead
of BlockNumberOrTag::Latest.
🧹 Nitpick comments (6)
crates/sortition/src/backends.rs (1)
206-219: Consider extracting the duplicated logging block.Lines 167-178 and 208-219 are identical. A small helper (e.g.,
log_sortition_result) taking&e3_id,chain_id,size, and&winnerswould eliminate the duplication. Not urgent given only two call sites.crates/sync/src/sync.rs (1)
125-153: Duplicate or unexpected messages from the samechain_idare silently discarded.Line 128 checks
!received.contains(&msg.chain_id), which correctly deduplicates. However, if an unexpectedchain_idarrives (not inexpected), it is also silently dropped. This is fine for correctness but worth noting for debuggability — adebug!orwarn!log for unexpected chain IDs would help diagnose misconfiguration.🔧 Optional: log unexpected chain IDs
if expected.contains(&msg.chain_id) && !received.contains(&msg.chain_id) { info!( chain_id = msg.chain_id, events = msg.events.len(), chains_received = received.len() + 1, chains_expected = expected.len(), "Received historical events from chain" ); received.insert(msg.chain_id); results.append(&mut msg.events); + } else if !expected.contains(&msg.chain_id) { + warn!(chain_id = msg.chain_id, "Received events from unexpected chain, ignoring"); }crates/evm/src/evm_read_interface.rs (2)
301-368: Reconnection backoff sleep is not interruptible by shutdown.On lines 314-319, the reconnect sleep can last up to 60 seconds. During this time, the shutdown signal (checked on line 322 via
try_recv) is not polled. This means a shutdown request could be delayed by up to a minute.🔧 Use `select!` to make sleep interruptible
if reconnect_attempt > 0 { let delay_secs = (2u64.pow(reconnect_attempt.min(6))).min(MAX_RECONNECT_DELAY_SECS); warn!( chain_id, reconnect_attempt, delay_secs, "Reconnecting to live event stream" ); - tokio::time::sleep(std::time::Duration::from_secs(delay_secs)).await; - } - - if shutdown.try_recv().is_ok() { - info!("Shutdown signal received, stopping EVM stream"); - return; + select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(delay_secs)) => {} + _ = &mut *shutdown => { + info!("Shutdown signal received during reconnect backoff, stopping EVM stream"); + return; + } + } + } else if shutdown.try_recv().is_ok() { + info!("Shutdown signal received, stopping EVM stream"); + return; }
27-29: Constants are reasonable defaults; consider making them configurable.
GET_LOGS_CHUNK_SIZE = 10_000is a safe default for most RPC providers (many limit to 10k blocks). However, some providers have lower limits (e.g., 2,000 blocks) while others support much higher ranges. Making the chunk size configurable (e.g., via environment variable or config) would improve flexibility without code changes.This is non-blocking — the current defaults are sensible for the immediate fix.
crates/evm/src/evm_chain_gateway.rs (2)
184-197: SecondHistoricalSyncCompleteduringInitsilently overwrites the first.If
forward_historical_sync_completeis called twice while still inInit, line 195 overwritespending_sync_completewith the new event, losing the first. Since eachEvmChainGatewayshould be per-chain and receive only oneHistoricalSyncComplete, this shouldn't happen in practice — but a debug assertion or warning would make the invariant explicit and aid future debugging.🔧 Optional: warn on unexpected duplicate
if let SyncStatus::Init { pending_sync_complete, .. } = &mut self.status { + if pending_sync_complete.is_some() { + warn!( + chain_id = event.chain_id, + "Received duplicate HistoricalSyncComplete during Init, overwriting" + ); + } warn!( chain_id = event.chain_id, "HistoricalSyncComplete arrived during Init, buffering" ); *pending_sync_complete = Some(event); return Ok(()); }
273-380: Test does not cover the newInit-buffering scenario.The existing test verifies the happy path (
SyncStart→ events →SyncComplete→SyncEnded). Consider adding a test whereHistoricalSyncCompletearrives beforeHistoricalEvmSyncStartto exercise the newpending_sync_completebuffering path — this is the race condition that motivated the state machine change.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@crates/evm/src/evm_read_interface.rs`:
- Line 342: The subscription with provider.subscribe_logs(filter) won't backfill
when using Geth because fromBlock is ignored; after the match that creates the
subscription, call provider.get_logs or provider.request("eth_getLogs", ...) to
fetch logs from (result.latest_block + 1) through the current chain head (use
provider.get_block_number()/block_number) and feed those logs into the same
processing/dedup pipeline (respecting the existing ids: HashSet<EventId>), then
continue consuming the live subscription; ensure you compute the range using
result.latest_block and the current block number and merge results so overlaps
are deduplicated by ids.
🧹 Nitpick comments (1)
crates/evm/src/evm_read_interface.rs (1)
376-379:bus.err()called on every reconnect failure may flood downstream.Each failed subscription attempt emits an error to the bus. If the provider is unavailable for an extended period, this generates a steady stream of error events at exponential-backoff intervals. Consider emitting the error only on the first failure and on periodic intervals, or after a maximum retry threshold is reached.
ryardley
left a comment
There was a problem hiding this comment.
Added a couple of ideas. I think as this gets more complex we should have a test around this so we are all clear on the behavior. I think it would be better if we can avoid passing pending_sync_complete as this is leaking state information outside of the reader.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@crates/evm/src/log_fetcher.rs`:
- Around line 144-186: The current backfill_to_head updates *last_block only on
full success, causing already-sent events to be re-fetched if fetch_logs_chunked
fails mid-way; change the flow so progress is committed incrementally: modify
fetch_logs_chunked (or add a new helper) to return the highest block it
successfully processed (or accept &mut last_block and update it after each
successfully-sent chunk), then in backfill_to_head use that returned
highest_processed_block to set *last_block even on partial success so
already-delivered chunks are not re-sent; reference functions/vars:
backfill_to_head, fetch_logs_chunked, *last_block, and next (EvmEventProcessor)
when locating where to update progress.
🧹 Nitpick comments (2)
crates/evm/src/log_fetcher.rs (2)
35-41: Silent failure when block timestamp fetch fails.
fetch_block_timestampswallows errors via.ok()with no logging. Combined withTimestampTracker::getusingunwrap_or(0)(line 210), a failed timestamp lookup silently produces0— which could confuse downstream consumers expecting real timestamps.♻️ Add a warning when timestamp fetch fails
async fn fetch_block_timestamp(&self, block_number: u64) -> Option<u64> { - self.get_block_by_number(block_number.into()) + match self.get_block_by_number(block_number.into()) .await - .ok() - .flatten() - .map(|b| b.header.timestamp) + { + Ok(Some(b)) => Some(b.header.timestamp), + Ok(None) => { + warn!(block_number, "Block not found when fetching timestamp"); + None + } + Err(e) => { + warn!(block_number, error = %e, "Failed to fetch block timestamp"); + None + } + } }
264-283: Consider asserting on the filter ranges passed to the mock.The
MockLogProvider::fetch_logsignores the filter parameter, so tests don't verify that the correct block ranges are passed for each chunk. A recording mock that captures(from_block, to_block)per call would catch chunk-boundary regressions.
Closes #1330
After wiping node data (
rm -rf .config/enclave/.enclave/data), nodes fail to synchronize historical blockchain events and are never selected for E3 committees. The node appears to start successfully but silently drops all historical state, resulting in zero sortition participation.The
EvmReadInterfacemakes a single unboundedeth_getLogscall fromdeploy_blocktolatest. Most RPC providers limiteth_getLogsto few thousand blocks, causing the call to fail for long block range and an early return that killed both historical sync AND live event subscription. Without historical events,ticketPricestayed at 0 so all nodes got 0 available tickets and sortition selected nobody.So to fix
Summary by CodeRabbit
Bug Fixes
New Features
Tests