From 9b7ecb83251d82b0493208c42d7e99cf8f0fa9b1 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Sun, 21 Dec 2025 19:26:43 -0300 Subject: [PATCH 1/4] Era files consumer --- Cargo.lock | 139 ++++++--- Cargo.toml | 3 +- beacon_node/lighthouse_network/src/config.rs | 3 + .../lighthouse_network/src/types/globals.rs | 5 + beacon_node/network/Cargo.toml | 1 + .../src/network_beacon_processor/mod.rs | 2 +- .../network_beacon_processor/sync_methods.rs | 24 +- .../network/src/sync/backfill_sync/mod.rs | 26 ++ .../network/src/sync/backfill_sync_era/mod.rs | 277 ++++++++++++++++++ beacon_node/network/src/sync/manager.rs | 118 ++++++-- beacon_node/network/src/sync/mod.rs | 1 + beacon_node/src/cli.rs | 9 + beacon_node/src/config.rs | 3 + 13 files changed, 528 insertions(+), 83 deletions(-) create mode 100644 beacon_node/network/src/sync/backfill_sync_era/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 6ed7bfd0b60..f883dccb655 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -325,9 +325,9 @@ dependencies = [ [[package]] name = "alloy-primitives" -version = "1.4.1" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "355bf68a433e0fd7f7d33d5a9fc2583fde70bf5c530f63b80845f8da5505cf28" +checksum = "7db950a29746be9e2f2c6288c8bd7a6202a81f999ce109a2933d2379970ec0fa" dependencies = [ "alloy-rlp", "arbitrary", @@ -346,6 +346,7 @@ dependencies = [ "proptest", "proptest-derive", "rand 0.9.2", + "rapidhash", "ruint", "rustc-hash 2.1.1", "serde", @@ -1235,8 +1236,8 @@ dependencies = [ "eth2_network_config", "ethereum_hashing", "ethereum_serde_utils", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "execution_layer", "fixed_bytes", "fork_choice", @@ -1495,7 +1496,7 @@ dependencies = [ "blst", "ethereum_hashing", "ethereum_serde_utils", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "fixed_bytes", "hex", "rand 0.9.2", @@ -1542,7 +1543,7 @@ dependencies = [ "clap", "clap_utils", "eth2_network_config", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "hex", "lighthouse_network", "log", @@ -1600,7 +1601,7 @@ dependencies = [ "bls", "context_deserialize", "eth2", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "lighthouse_version", "mockito", "reqwest", @@ -1888,7 +1889,7 @@ dependencies = [ "clap", "dirs", "eth2_network_config", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "hex", "serde", "serde_json", @@ -1907,7 +1908,7 @@ dependencies = [ "environment", "eth2", "eth2_config", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "execution_layer", "futures", "genesis", @@ -2554,7 +2555,7 @@ dependencies = [ "alloy-json-abi", "alloy-primitives", "bls", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "hex", "reqwest", "serde_json", @@ -2865,8 +2866,8 @@ dependencies = [ "context_deserialize", "educe", "eth2_network_config", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "execution_layer", "fork_choice", "fs2", @@ -3145,8 +3146,8 @@ dependencies = [ "eip_3076", "eth2_keystore", "ethereum_serde_utils", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "futures", "futures-util", "mediatype", @@ -3228,7 +3229,7 @@ dependencies = [ "bytes", "discv5", "eth2_config", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "fixed_bytes", "kzg", "pretty_reqwest_error", @@ -3293,6 +3294,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "ethereum_ssz" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dcddb2554d19cde19b099fadddde576929d7a4d0c1cd3512d1fd95cf174375c" +dependencies = [ + "alloy-primitives", + "ethereum_serde_utils", + "itertools 0.13.0", + "serde", + "serde_derive", + "smallvec", + "typenum", +] + [[package]] name = "ethereum_ssz" version = "0.10.0" @@ -3310,6 +3326,18 @@ dependencies = [ "typenum", ] +[[package]] +name = "ethereum_ssz_derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a657b6b3b7e153637dc6bdc6566ad9279d9ee11a15b12cfb24a2e04360637e9f" +dependencies = [ + "darling 0.20.11", + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "ethereum_ssz_derive" version = "0.10.0" @@ -3403,7 +3431,7 @@ dependencies = [ "bytes", "eth2", "ethereum_serde_utils", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "fixed_bytes", "fork_choice", "hash-db", @@ -3609,8 +3637,8 @@ name = "fork_choice" version = "0.1.0" dependencies = [ "beacon_chain", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "fixed_bytes", "logging", "metrics", @@ -3804,7 +3832,7 @@ version = "0.2.0" dependencies = [ "bls", "ethereum_hashing", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "int_to_bytes", "merkle_proof", "rayon", @@ -4250,7 +4278,7 @@ dependencies = [ "either", "eth2", "ethereum_serde_utils", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "execution_layer", "fixed_bytes", "futures", @@ -4916,8 +4944,8 @@ dependencies = [ "educe", "ethereum_hashing", "ethereum_serde_utils", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "hex", "rayon", "rust_eth_kzg", @@ -4957,7 +4985,7 @@ dependencies = [ "eth2_network_config", "eth2_wallet", "ethereum_hashing", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "execution_layer", "fixed_bytes", "hex", @@ -5517,8 +5545,8 @@ dependencies = [ "discv5", "either", "eth2", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "fixed_bytes", "fnv", "futures", @@ -5890,8 +5918,8 @@ dependencies = [ "context_deserialize", "educe", "ethereum_hashing", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "itertools 0.13.0", "parking_lot", "rayon", @@ -6233,7 +6261,7 @@ dependencies = [ "educe", "eth2", "eth2_network_config", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "execution_layer", "fixed_bytes", "fnv", @@ -6257,6 +6285,7 @@ dependencies = [ "rand 0.9.2", "rand_chacha 0.3.1", "rand_chacha 0.9.0", + "reth-era", "serde_json", "slot_clock", "smallvec", @@ -6675,8 +6704,8 @@ dependencies = [ "bitvec", "bls", "educe", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "fixed_bytes", "itertools 0.10.5", "maplit", @@ -7185,8 +7214,8 @@ dependencies = [ name = "proto_array" version = "0.2.0" dependencies = [ - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "fixed_bytes", "safe_arith", "serde", @@ -7419,6 +7448,15 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rapidhash" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2988730ee014541157f48ce4dcc603940e00915edc3c7f9a8d78092256bb2493" +dependencies = [ + "rustversion", +] + [[package]] name = "rayon" version = "1.11.0" @@ -7597,6 +7635,21 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7" +[[package]] +name = "reth-era" +version = "1.9.3" +source = "git+https://github.com/paradigmxyz/reth?rev=62abfdaeb54e8a205a8ee085ddebd56047d93374#62abfdaeb54e8a205a8ee085ddebd56047d93374" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-rlp", + "ethereum_ssz 0.9.1", + "ethereum_ssz_derive 0.9.1", + "snap", + "thiserror 2.0.17", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -8433,8 +8486,8 @@ dependencies = [ "bls", "byteorder", "educe", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "filesystem", "fixed_bytes", "flate2", @@ -8586,7 +8639,7 @@ dependencies = [ "context_deserialize", "educe", "ethereum_serde_utils", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "itertools 0.14.0", "serde", "serde_derive", @@ -8610,8 +8663,8 @@ dependencies = [ "bls", "educe", "ethereum_hashing", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "fixed_bytes", "int_to_bytes", "integer-sqrt", @@ -8638,7 +8691,7 @@ version = "0.1.0" dependencies = [ "beacon_chain", "bls", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "fixed_bytes", "state_processing", "tokio", @@ -8660,8 +8713,8 @@ dependencies = [ "criterion", "db-key", "directory", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "fixed_bytes", "itertools 0.10.5", "leveldb", @@ -9474,7 +9527,7 @@ checksum = "2db21caa355767db4fd6129876e5ae278a8699f4a6959b1e3e7aff610b532d52" dependencies = [ "alloy-primitives", "ethereum_hashing", - "ethereum_ssz", + "ethereum_ssz 0.10.0", "smallvec", "typenum", ] @@ -9539,8 +9592,8 @@ dependencies = [ "eth2_interop_keypairs", "ethereum_hashing", "ethereum_serde_utils", - "ethereum_ssz", - "ethereum_ssz_derive", + "ethereum_ssz 0.10.0", + "ethereum_ssz_derive 0.10.0", "fixed_bytes", "hex", "int_to_bytes", diff --git a/Cargo.toml b/Cargo.toml index d5d1687c764..581e92d0cfa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,7 +99,7 @@ alloy-consensus = { version = "1", default-features = false } alloy-dyn-abi = { version = "1", default-features = false } alloy-json-abi = { version = "1", default-features = false } alloy-network = { version = "1", default-features = false } -alloy-primitives = { version = "1", default-features = false, features = ["rlp", "getrandom"] } +alloy-primitives = { version = "1.5", default-features = false, features = ["rlp", "getrandom"] } alloy-provider = { version = "1", default-features = false, features = ["reqwest"] } alloy-rlp = { version = "0.3", default-features = false } alloy-rpc-types-eth = { version = "1", default-features = false, features = ["serde"] } @@ -138,6 +138,7 @@ eip_3076 = { path = "common/eip_3076" } either = "1.9" environment = { path = "lighthouse/environment" } eth2 = { path = "common/eth2" } +reth-era = { git = "https://github.com/paradigmxyz/reth", package = "reth-era", rev = "62abfdaeb54e8a205a8ee085ddebd56047d93374" } eth2_config = { path = "common/eth2_config" } eth2_key_derivation = { path = "crypto/eth2_key_derivation" } eth2_keystore = { path = "crypto/eth2_keystore" } diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 416ca73e08e..a9299538f7b 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -118,6 +118,8 @@ pub struct Config { /// Whether we are running a block proposer only node. pub proposer_only: bool, + /// Optional directory containing `.era` files for backfill. + pub era_files_dir: Option, /// Whether metrics are enabled. pub metrics_enabled: bool, @@ -357,6 +359,7 @@ impl Default for Config { shutdown_after_sync: false, topics: Vec::new(), proposer_only: false, + era_files_dir: None, metrics_enabled: false, enable_light_client_server: true, outbound_rate_limiter_config: None, diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index f46eb05ceb0..d9c6fc495a5 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -165,6 +165,11 @@ impl NetworkGlobals { self.backfill_state.read().clone() } + /// Set the current backfill state. + pub fn set_backfill_state(&self, state: BackFillState) { + *self.backfill_state.write() = state; + } + /// Returns a `Client` type if one is known for the `PeerId`. pub fn client(&self, peer_id: &PeerId) -> Client { self.peers diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index bf261965760..6526f071fc4 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -36,6 +36,7 @@ metrics = { workspace = true } operation_pool = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } +reth-era = { workspace = true } slot_clock = { workspace = true } smallvec = { workspace = true } ssz_types = { workspace = true } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index bebda36d71c..2d2d8c39b0d 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -529,7 +529,7 @@ impl NetworkBeaconProcessor { }; Work::ChainSegment(Box::pin(process_fn)) } - ChainSegmentProcessId::BackSyncBatchId(_) => { + ChainSegmentProcessId::BackSyncBatchId(_) | ChainSegmentProcessId::BackSyncEraBatchId(_) => { let process_fn = move || processor.process_chain_segment_backfill(process_id, blocks); Work::ChainSegmentBackfill(Box::new(process_fn)) diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index e49ae134fe4..6501f587afa 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -42,6 +42,8 @@ pub enum ChainSegmentProcessId { RangeBatchId(ChainId, Epoch), /// Processing ID for a backfill syncing batch. BackSyncBatchId(Epoch), + /// Processing ID for an era backfill syncing batch. + BackSyncEraBatchId(u64), } /// Returned when a chain segment import fails. @@ -616,14 +618,20 @@ impl NetworkBeaconProcessor { process_id: ChainSegmentProcessId, downloaded_blocks: Vec>, ) { - let ChainSegmentProcessId::BackSyncBatchId(epoch) = process_id else { - // this a request from RangeSync, this should _never_ happen - crit!( - error = - "process_chain_segment_backfill called on a variant other than BackSyncBatchId", - "Please notify the devs" - ); - return; + let epoch = match process_id { + ChainSegmentProcessId::BackSyncBatchId(epoch) => epoch, + ChainSegmentProcessId::BackSyncEraBatchId(_) => { + // Era backfill doesn't encode epochs; use 0 for logging. + Epoch::new(0) + } + _ => { + crit!( + error = + "process_chain_segment_backfill called on a variant other than BackSyncBatchId or BackSyncEraBatchId", + "Please notify the devs" + ); + return; + } }; let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 6c0cbd7e554..f6043cab4db 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -204,6 +204,32 @@ impl BackFillSync { } } + /// Refreshes backfill start state from the beacon chain. + /// + /// Intended for external backfill sources that may have advanced the anchor. + pub fn reset_from_store(&mut self) { + let anchor_info = self.beacon_chain.store.get_anchor_info(); + if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) { + self.set_state(BackFillState::Completed); + return; + } + + let start_epoch = anchor_info + .oldest_block_slot + .epoch(T::EthSpec::slots_per_epoch()); + + self.current_start = start_epoch; + self.processing_target = start_epoch; + self.to_be_downloaded = start_epoch; + self.last_batch_downloaded = false; + self.current_processing_batch = None; + self.batches.clear(); + self.participating_peers.clear(); + self.restart_failed_sync = false; + self.validated_batches = 0; + self.set_state(BackFillState::Paused); + } + /// Starts or resumes syncing. /// /// If resuming is successful, reports back the current syncing metrics. diff --git a/beacon_node/network/src/sync/backfill_sync_era/mod.rs b/beacon_node/network/src/sync/backfill_sync_era/mod.rs new file mode 100644 index 00000000000..7a04588fb96 --- /dev/null +++ b/beacon_node/network/src/sync/backfill_sync_era/mod.rs @@ -0,0 +1,277 @@ +use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::backfill_sync::{ProcessResult, SyncStart}; +use crate::sync::manager::BatchProcessResult; +use crate::sync::network_context::SyncNetworkContext; +use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use lighthouse_network::NetworkGlobals; +use lighthouse_network::types::BackFillState; +use reth_era::common::file_ops::StreamReader; +use reth_era::era::file::EraReader; +use std::fs::{self, File}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tracing::{debug, error, info}; +use typenum::Unsigned; +use types::{EthSpec, SignedBeaconBlock, Slot}; + +#[derive(Debug)] +#[allow(dead_code)] +pub enum BackFillEraError { + InternalError(String), + BadEraFile(String), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum BackFillEraState { + NotStarted, + Syncing { era_number: u64 }, + Completed, + Disabled, +} + +pub struct BackFillSyncEra { + state: BackFillEraState, + initial_era: Option, + era_files_dir: PathBuf, + beacon_chain: Arc>, + network_globals: Arc>, +} + +impl BackFillSyncEra { + pub fn new( + beacon_chain: Arc>, + network_globals: Arc>, + era_files_dir: PathBuf, + ) -> Self { + Self { + state: BackFillEraState::NotStarted, + initial_era: None, + era_files_dir, + beacon_chain, + network_globals, + } + } + + pub fn pause(&mut self) { + if matches!(self.state, BackFillEraState::Syncing { .. }) { + self.state = BackFillEraState::NotStarted; + self.network_globals + .set_backfill_state(BackFillState::Paused); + } + } + + pub fn start( + &mut self, + network: &mut SyncNetworkContext, + ) -> Result { + if self.state == BackFillEraState::Disabled { + return Ok(SyncStart::NotSyncing); + } + + let anchor_info = self.beacon_chain.store.get_anchor_info(); + if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) { + self.state = BackFillEraState::Completed; + self.network_globals + .set_backfill_state(BackFillState::Completed); + return Ok(SyncStart::NotSyncing); + } + + if self.state == BackFillEraState::NotStarted { + let start_slot = anchor_info.oldest_block_slot; + let start_era = era_number_for_slot::(start_slot); + self.initial_era = Some(start_era); + if let Err(e) = self.send_next_file(network, start_era) { + self.disable("failed to read era file"); + return Err(e); + } else { + self.network_globals + .set_backfill_state(BackFillState::Syncing); + } + } + + Ok(self.syncing_progress()) + } + + pub fn on_batch_process_result( + &mut self, + network: &mut SyncNetworkContext, + era_number: u64, + result: &BatchProcessResult, + ) -> Result { + let current_era = match self.state { + BackFillEraState::Syncing { era_number } => era_number, + _ => return Ok(ProcessResult::Successful), + }; + + if current_era != era_number { + debug!( + current_era, + era_number, "Ignoring backfill processing result for unknown era number" + ); + return Ok(ProcessResult::Successful); + } + + match result { + BatchProcessResult::Success { .. } => { + if era_start_slot::(current_era) + <= self.beacon_chain.genesis_backfill_slot + { + self.state = BackFillEraState::Completed; + self.network_globals + .set_backfill_state(BackFillState::Completed); + info!("Era backfill sync completed"); + Ok(ProcessResult::SyncCompleted) + } else { + let next_era = current_era.saturating_sub(1); + if let Err(e) = self.send_next_file(network, next_era) { + self.disable("failed to read era file"); + return Err(e); + } + Ok(ProcessResult::Successful) + } + } + BatchProcessResult::FaultyFailure { .. } | BatchProcessResult::NonFaultyFailure => { + self.disable("batch processing failed"); + Err(BackFillEraError::BadEraFile(format!( + "ERA backfill batch {era_number} failed processing" + ))) + } + } + } + + fn send_next_file( + &mut self, + network: &mut SyncNetworkContext, + era_number: u64, + ) -> Result<(), BackFillEraError> { + let blocks = + read_batch::(&self.era_files_dir, era_number, &self.beacon_chain.spec)?; + + if let Err(e) = network.beacon_processor().send_chain_segment( + ChainSegmentProcessId::BackSyncEraBatchId(era_number), + blocks, + ) { + self.state = BackFillEraState::NotStarted; + return Err(BackFillEraError::InternalError(format!( + "failed to send era backfill batch: {e}" + ))); + } else { + self.state = BackFillEraState::Syncing { era_number }; + } + + Ok(()) + } + + fn syncing_progress(&self) -> SyncStart { + let current_era = match self.state { + BackFillEraState::Syncing { era_number } => era_number, + _ => return SyncStart::NotSyncing, + }; + let Some(initial_era) = self.initial_era else { + return SyncStart::NotSyncing; + }; + + let slots_per_era = slots_per_era::(); + let completed = + (initial_era.saturating_sub(current_era)).saturating_mul(slots_per_era) as usize; + let remaining = era_start_slot::(current_era) + .saturating_sub(self.beacon_chain.genesis_backfill_slot) + .as_usize(); + + SyncStart::Syncing { + completed, + remaining, + } + } + + fn disable(&mut self, reason: &str) { + error!( + reason, + "Era backfill disabled, falling back to network backfill" + ); + self.state = BackFillEraState::Disabled; + self.network_globals + .set_backfill_state(BackFillState::Paused); + } +} + +fn read_batch( + era_files_dir: &Path, + era_number: u64, + spec: &types::ChainSpec, +) -> Result>, BackFillEraError> { + let path = find_era_file(era_files_dir, era_number) + .map_err(|e| BackFillEraError::BadEraFile(format!("Bad era file name: {e:?}")))? + .ok_or_else(|| { + BackFillEraError::BadEraFile(format!("No era file for number {era_number}")) + })?; + let file = File::open(&path) + .map_err(|e| BackFillEraError::BadEraFile(format!("Unable to read era file: {e:?}")))?; + let reader = EraReader::new(file); + let mut blocks = Vec::new(); + + for block in reader.iter() { + let compressed = block + .map_err(|e| BackFillEraError::BadEraFile(format!("Error reading era block: {e:?}")))?; + let ssz_bytes = compressed.decompress().map_err(|e| { + BackFillEraError::BadEraFile(format!("failed to decompress block: {e:?}")) + })?; + let block = SignedBeaconBlock::::from_ssz_bytes(&ssz_bytes, spec) + .map_err(|e| BackFillEraError::BadEraFile(format!("failed to decode block: {e:?}")))?; + blocks.push(RpcBlock::new_without_blobs(None, Arc::new(block))); + } + + Ok(blocks) +} + +fn parse_era_number(path: &Path) -> Result, String> { + let Some(stem) = path.file_stem().and_then(|name| name.to_str()) else { + return Ok(None); + }; + let mut parts = stem.split('-'); + let _config = parts.next(); + let Some(era_str) = parts.next() else { + return Ok(None); + }; + let era_number = era_str + .parse::() + .map_err(|_| format!("invalid era number in file: {}", path.display()))?; + Ok(Some(era_number)) +} + +fn find_era_file(dir: &Path, era_number: u64) -> Result, String> { + let mut found: Option = None; + for entry in fs::read_dir(dir).map_err(|e| format!("Error reading dir: {e:?}"))? { + let entry = entry.map_err(|e| format!("Error reading dir entry: {e:?}"))?; + let path = entry.path(); + if path.extension().and_then(|ext| ext.to_str()) != Some("era") { + continue; + } + let parsed = match parse_era_number(&path)? { + Some(parsed) => parsed, + None => continue, + }; + if parsed == era_number { + if found.is_some() { + return Err(format!("multiple era files found for era {era_number}")); + } + found = Some(path); + } + } + + Ok(found) +} + +fn era_number_for_slot(slot: Slot) -> u64 { + slot.as_u64() + .saturating_div(E::SlotsPerHistoricalRoot::to_u64()) +} + +fn era_start_slot(era_number: u64) -> Slot { + Slot::new(era_number.saturating_mul(E::SlotsPerHistoricalRoot::to_u64())) +} + +fn slots_per_era() -> u64 { + E::SlotsPerHistoricalRoot::to_u64() +} diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 338f21ce987..2da45b3547d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -34,6 +34,7 @@ //! search for the block and subsequently search for parents if needed. use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; +use super::backfill_sync_era::BackFillSyncEra; use super::block_lookups::BlockLookups; use super::network_context::{ CustodyByRootResult, RangeBlockComponent, RangeRequestId, RpcEvent, SyncNetworkContext, @@ -251,6 +252,9 @@ pub struct SyncManager { /// Backfill syncing. backfill_sync: BackFillSync, + /// Backfill syncing with ERA files + backfill_sync_era: Option>, + /// Custody syncing. custody_backfill_sync: CustodyBackFillSync, @@ -314,6 +318,9 @@ impl SyncManager { ), range_sync: RangeSync::new(beacon_chain.clone()), backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals.clone()), + backfill_sync_era: network_globals.config.era_files_dir.clone().map(|dir| { + BackFillSyncEra::new(beacon_chain.clone(), network_globals.clone(), dir) + }), custody_backfill_sync: CustodyBackFillSync::new(beacon_chain.clone(), network_globals), block_lookups: BlockLookups::new(), notified_unknown_roots: LRUTimeCache::new(Duration::from_secs( @@ -636,52 +643,76 @@ impl SyncManager { // complete a backfill sync. #[cfg(not(feature = "disable-backfill"))] if matches!(sync_state, SyncState::Synced) { - // Determine if we need to start/resume/restart a backfill sync. - match self.backfill_sync.start(&mut self.network) { - Ok(SyncStart::Syncing { - completed, - remaining, - }) => { - sync_state = SyncState::BackFillSyncing { + let mut era_backfill_active = false; + if let Some(era_backfill_sync) = self.backfill_sync_era.as_mut() { + match era_backfill_sync.start(&mut self.network) { + Ok(SyncStart::Syncing { completed, remaining, - }; - } - Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if the backfill sync state didn't start. - Err(e) => { - error!(error = ?e, "Backfill sync failed to start"); + }) => { + sync_state = SyncState::BackFillSyncing { + completed, + remaining, + }; + era_backfill_active = true; + } + Ok(SyncStart::NotSyncing) => {} + Err(e) => { + error!(error = ?e, "Era backfill sync failed to start"); + } } } - // If backfill is complete, check if we have a pending custody backfill to complete - let anchor_info = self.chain.store.get_anchor_info(); - if anchor_info.block_backfill_complete(self.chain.genesis_backfill_slot) { - match self.custody_backfill_sync.start(&mut self.network) { + if !era_backfill_active { + self.backfill_sync.reset_from_store(); + // Determine if we need to start/resume/restart a backfill sync. + match self.backfill_sync.start(&mut self.network) { Ok(SyncStart::Syncing { completed, remaining, }) => { - sync_state = SyncState::CustodyBackFillSyncing { + sync_state = SyncState::BackFillSyncing { completed, remaining, }; } - Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if custody sync state didn't start. + Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if the backfill sync state didn't start. Err(e) => { - use crate::sync::custody_backfill_sync::CustodyBackfillError; + error!(error = ?e, "Backfill sync failed to start"); + } + } - match &e { - CustodyBackfillError::BatchDownloadFailed(_) - | CustodyBackfillError::BatchProcessingFailed(_) => { - debug!(error=?e, "Custody backfill batch processing or downloading failed"); - } - CustodyBackfillError::BatchInvalidState(_, reason) => { - error!(error=?e, reason, "Custody backfill sync failed due to invalid batch state") - } - CustodyBackfillError::InvalidSyncState(reason) => { - error!(error=?e, reason, "Custody backfill sync failed due to invalid sync state") + // If backfill is complete, check if we have a pending custody backfill to complete + let anchor_info = self.chain.store.get_anchor_info(); + if anchor_info.block_backfill_complete(self.chain.genesis_backfill_slot) + { + match self.custody_backfill_sync.start(&mut self.network) { + Ok(SyncStart::Syncing { + completed, + remaining, + }) => { + sync_state = SyncState::CustodyBackFillSyncing { + completed, + remaining, + }; + } + Ok(SyncStart::NotSyncing) => {} // Ignore updating the state if custody sync state didn't start. + Err(e) => { + use crate::sync::custody_backfill_sync::CustodyBackfillError; + + match &e { + CustodyBackfillError::BatchDownloadFailed(_) + | CustodyBackfillError::BatchProcessingFailed(_) => { + debug!(error=?e, "Custody backfill batch processing or downloading failed"); + } + CustodyBackfillError::BatchInvalidState(_, reason) => { + error!(error=?e, reason, "Custody backfill sync failed due to invalid batch state") + } + CustodyBackfillError::InvalidSyncState(reason) => { + error!(error=?e, reason, "Custody backfill sync failed due to invalid sync state") + } + CustodyBackfillError::Paused => {} } - CustodyBackfillError::Paused => {} } } } @@ -695,6 +726,9 @@ impl SyncManager { // Range sync is in progress. If there is a backfill or custody sync in progress pause it. #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); + if let Some(era_backfill_sync) = self.backfill_sync_era.as_mut() { + era_backfill_sync.pause(); + } self.custody_backfill_sync .pause("Range sync in progress".to_string()); @@ -708,6 +742,9 @@ impl SyncManager { // in progress pause it. #[cfg(not(feature = "disable-backfill"))] self.backfill_sync.pause(); + if let Some(era_backfill_sync) = self.backfill_sync_era.as_mut() { + era_backfill_sync.pause(); + } self.custody_backfill_sync .pause("Range sync in progress".to_string()); @@ -941,6 +978,27 @@ impl SyncManager { } } } + ChainSegmentProcessId::BackSyncEraBatchId(era_number) => { + if let Some(era_backfill_sync) = self.backfill_sync_era.as_mut() { + match era_backfill_sync.on_batch_process_result( + &mut self.network, + era_number, + &result, + ) { + Ok(ProcessResult::Successful) => {} + Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), + Err(error) => { + error!(error = ?error, "Era backfill sync failed"); + self.update_sync_state(); + } + } + } else { + debug!( + era_number = %era_number, + "Ignoring era backfill batch result without era backfill enabled" + ); + } + } }, SyncMessage::CustodyBatchProcessed { result, batch_id } => { match self.custody_backfill_sync.on_batch_process_result( diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 054bab654c2..b59bb4571b5 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -6,6 +6,7 @@ mod batch; mod block_lookups; mod block_sidecar_coupling; mod custody_backfill_sync; +mod backfill_sync_era; pub mod manager; mod network_context; mod peer_sync_info; diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index e4c7c6ff1fe..c0771cc8997 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -414,6 +414,15 @@ pub fn cli_app() -> Command { .help_heading(FLAG_HEADER) .display_order(0) ) + .arg( + Arg::new("era-files-dir") + .long("era-files-dir") + .value_name("DIR") + .help("Directory containing `.era` files to use for backfill.") + .action(ArgAction::Set) + .help_heading(FLAG_HEADER) + .display_order(0) + ) .arg( Arg::new("complete-blob-backfill") .long("complete-blob-backfill") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 26dd3b6642e..f874bd14997 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -811,6 +811,9 @@ pub fn get_config( client_config.chain.genesis_backfill = true; } + client_config.network.era_files_dir = + clap_utils::parse_optional(cli_args, "era-files-dir")?; + client_config.chain.complete_blob_backfill = cli_args.get_flag("complete-blob-backfill"); // Ensure `prune_blobs` is false whenever complete-blob-backfill is set. This overrides any From 79e758f19f65c1f804c5f1cdd1b19e32f066cb46 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 23 Dec 2025 16:57:51 -0300 Subject: [PATCH 2/4] Remove extra fn --- .../network_beacon_processor/sync_methods.rs | 38 ++++++------------- .../network/src/sync/backfill_sync/mod.rs | 26 ------------- beacon_node/network/src/sync/manager.rs | 1 - 3 files changed, 12 insertions(+), 53 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 6501f587afa..e1fa09b0d35 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -618,22 +618,7 @@ impl NetworkBeaconProcessor { process_id: ChainSegmentProcessId, downloaded_blocks: Vec>, ) { - let epoch = match process_id { - ChainSegmentProcessId::BackSyncBatchId(epoch) => epoch, - ChainSegmentProcessId::BackSyncEraBatchId(_) => { - // Era backfill doesn't encode epochs; use 0 for logging. - Epoch::new(0) - } - _ => { - crit!( - error = - "process_chain_segment_backfill called on a variant other than BackSyncBatchId or BackSyncEraBatchId", - "Please notify the devs" - ); - return; - } - }; - + let block_count = downloaded_blocks.len(); let start_slot = downloaded_blocks.first().map(|b| b.slot().as_u64()); let end_slot = downloaded_blocks.last().map(|b| b.slot().as_u64()); let sent_blocks = downloaded_blocks.len(); @@ -649,15 +634,16 @@ impl NetworkBeaconProcessor { let result = match self.process_backfill_blocks(downloaded_blocks) { (imported_blocks, Ok(_)) => { debug!( - batch_epoch = %epoch, - first_block_slot = start_slot, - keep_execution_payload = !self.chain.store.get_config().prune_payloads, - last_block_slot = end_slot, - processed_blocks = sent_blocks, - processed_blobs = n_blobs, - processed_data_columns = n_data_columns, - service= "sync", - "Backfill batch processed"); + block_count, + first_block_slot = start_slot, + keep_execution_payload = !self.chain.store.get_config().prune_payloads, + last_block_slot = end_slot, + processed_blocks = sent_blocks, + processed_blobs = n_blobs, + processed_data_columns = n_data_columns, + service = "sync", + "Backfill batch processed" + ); BatchProcessResult::Success { sent_blocks, imported_blocks, @@ -665,7 +651,7 @@ impl NetworkBeaconProcessor { } (_, Err(e)) => { debug!( - batch_epoch = %epoch, + block_count, first_block_slot = start_slot, last_block_slot = end_slot, processed_blobs = n_blobs, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index f6043cab4db..6c0cbd7e554 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -204,32 +204,6 @@ impl BackFillSync { } } - /// Refreshes backfill start state from the beacon chain. - /// - /// Intended for external backfill sources that may have advanced the anchor. - pub fn reset_from_store(&mut self) { - let anchor_info = self.beacon_chain.store.get_anchor_info(); - if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) { - self.set_state(BackFillState::Completed); - return; - } - - let start_epoch = anchor_info - .oldest_block_slot - .epoch(T::EthSpec::slots_per_epoch()); - - self.current_start = start_epoch; - self.processing_target = start_epoch; - self.to_be_downloaded = start_epoch; - self.last_batch_downloaded = false; - self.current_processing_batch = None; - self.batches.clear(); - self.participating_peers.clear(); - self.restart_failed_sync = false; - self.validated_batches = 0; - self.set_state(BackFillState::Paused); - } - /// Starts or resumes syncing. /// /// If resuming is successful, reports back the current syncing metrics. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 2da45b3547d..9ebefa9fd6a 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -664,7 +664,6 @@ impl SyncManager { } if !era_backfill_active { - self.backfill_sync.reset_from_store(); // Determine if we need to start/resume/restart a backfill sync. match self.backfill_sync.start(&mut self.network) { Ok(SyncStart::Syncing { From 8c1ded1f4f8bf983398d5470325f1566b5ebcafc Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:39:27 -0300 Subject: [PATCH 3/4] Fill DB on startup --- Cargo.lock | 1 + beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/builder.rs | 17 +++ .../beacon_chain/src/era_file_consumer.rs | 117 ++++++++++++++++++ beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/client/src/builder.rs | 13 +- beacon_node/client/src/config.rs | 3 + beacon_node/src/config.rs | 9 +- beacon_node/store/src/config.rs | 4 + beacon_node/store/src/reconstruct.rs | 2 +- 10 files changed, 164 insertions(+), 4 deletions(-) create mode 100644 beacon_node/beacon_chain/src/era_file_consumer.rs diff --git a/Cargo.lock b/Cargo.lock index f883dccb655..3fa59045af3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1264,6 +1264,7 @@ dependencies = [ "proto_array", "rand 0.9.2", "rayon", + "reth-era", "safe_arith", "sensitive_url", "serde", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 734cfdf32bb..b4a03294be0 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -48,6 +48,7 @@ parking_lot = { workspace = true } proto_array = { workspace = true } rand = { workspace = true } rayon = { workspace = true } +reth-era = { workspace = true } safe_arith = { workspace = true } sensitive_url = { workspace = true } serde = { workspace = true } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 58dbf1c35e8..c620a569f9d 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -18,6 +18,7 @@ use crate::persisted_custody::load_custody_context; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; +use crate::era_file_consumer::import_era_files; use crate::{ BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, ServerSentEventHandler, }; @@ -37,6 +38,7 @@ use slasher::Slasher; use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::{AllCaches, per_slot_processing}; use std::marker::PhantomData; +use std::path::Path; use std::sync::Arc; use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; @@ -205,6 +207,21 @@ where self } + /// Import trusted era files into the store before building the chain. + pub fn era_files( + self, + era_files_dir: &Path, + genesis_state: BeaconState, + ) -> Result { + let builder = self.genesis_state(genesis_state)?; + let store = builder + .store + .clone() + .ok_or("era_files requires a store.")?; + import_era_files(&store, era_files_dir, &builder.spec)?; + Ok(builder) + } + /// Sets the store migrator config (optional). pub fn store_migrator_config(mut self, config: MigratorConfig) -> Self { self.store_migrator_config = Some(config); diff --git a/beacon_node/beacon_chain/src/era_file_consumer.rs b/beacon_node/beacon_chain/src/era_file_consumer.rs new file mode 100644 index 00000000000..135a858a7d0 --- /dev/null +++ b/beacon_node/beacon_chain/src/era_file_consumer.rs @@ -0,0 +1,117 @@ +use reth_era::common::file_ops::StreamReader; +use reth_era::era::file::EraReader; +use reth_era::era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock}; +use std::fs::{self, File}; +use std::path::{Path, PathBuf}; +use store::{HotColdDB, ItemStore}; +use tracing::{info, warn}; +use types::{BeaconState, ChainSpec, EthSpec, SignedBeaconBlock}; + +pub(crate) fn import_era_files, Cold: ItemStore>( + store: &HotColdDB, + era_files_dir: &Path, + spec: &ChainSpec, +) -> Result<(), String> { + let mut era_files = list_era_files(era_files_dir)?; + era_files.sort_by_key(|(era_number, _)| *era_number); + + let network_name = spec + .config_name + .clone() + .unwrap_or_else(|| "unknown".to_string()); + + for (era_number, path) in era_files { + info!(era_number, ?path, "Importing era file"); + import_era_file(store, &path, &network_name, spec) + .map_err(|error| format!("era file import failed: {error}"))?; + } + + Ok(()) +} + +fn import_era_file, Cold: ItemStore>( + store: &HotColdDB, + path: &Path, + network_name: &str, + spec: &ChainSpec, +) -> Result<(), String> { + let file = File::open(path).map_err(|error| format!("failed to open era file: {error}"))?; + let era_file = EraReader::new(file) + .read_and_assemble(network_name.to_string()) + .map_err(|error| format!("failed to parse era file: {error:?}"))?; + + for compressed_block in era_file.group.blocks { + let block = decode_block::(compressed_block, spec)?; + let block_root = block.canonical_root(); + store + .put_block(&block_root, block) + .map_err(|error| format!("failed to store block: {error:?}"))?; + } + + let mut state = decode_state::(era_file.group.era_state, spec)?; + let state_root = state + .canonical_root() + .map_err(|error| format!("failed to hash state: {error:?}"))?; + store + .put_state(&state_root, &state) + .map_err(|error| format!("failed to store state: {error:?}"))?; + + Ok(()) +} + +fn decode_block( + compressed: CompressedSignedBeaconBlock, + spec: &ChainSpec, +) -> Result, String> { + let bytes = compressed + .decompress() + .map_err(|error| format!("failed to decompress block: {error:?}"))?; + SignedBeaconBlock::from_ssz_bytes(&bytes, spec) + .map_err(|error| format!("failed to decode block: {error:?}")) +} + +fn decode_state( + compressed: CompressedBeaconState, + spec: &ChainSpec, +) -> Result, String> { + let bytes = compressed + .decompress() + .map_err(|error| format!("failed to decompress state: {error:?}"))?; + BeaconState::from_ssz_bytes(&bytes, spec) + .map_err(|error| format!("failed to decode state: {error:?}")) +} + +fn list_era_files(dir: &Path) -> Result, String> { + let entries = fs::read_dir(dir).map_err(|error| format!("failed to read era dir: {error}"))?; + let mut era_files = Vec::new(); + + for entry in entries { + let entry = entry.map_err(|error| format!("failed to read era entry: {error}"))?; + let path = entry.path(); + let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else { + continue; + }; + + if !file_name.ends_with(".era") { + continue; + } + + let Some((prefix, _hash_part)) = file_name.rsplit_once('-') else { + continue; + }; + let Some((_network_name, era_part)) = prefix.rsplit_once('-') else { + continue; + }; + let Some(era_number) = era_part.parse().ok() else { + continue; + }; + + era_files.push((era_number, path)); + } + + if era_files.is_empty() { + warn!(?dir, "Era files directory is empty"); + } + + Ok(era_files) +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 4ac3e54742d..4cd4603c1f6 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -21,6 +21,7 @@ pub mod custody_context; pub mod data_availability_checker; pub mod data_column_verification; mod early_attester_cache; +mod era_file_consumer; mod errors; pub mod events; pub mod execution_payload; diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index c48021e45d4..5b852b48999 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -248,7 +248,12 @@ where ClientGenesis::DepositContract } else if chain_exists { - if matches!(client_genesis, ClientGenesis::WeakSubjSszBytes { .. }) + if matches!(client_genesis, ClientGenesis::EraFiles { .. }) { + info!( + msg = "database already exists, use --purge-db to force era import", + "Refusing to import era files" + ); + } else if matches!(client_genesis, ClientGenesis::WeakSubjSszBytes { .. }) || matches!(client_genesis, ClientGenesis::CheckpointSyncUrl { .. }) { info!( @@ -460,6 +465,12 @@ where return Err("Loading genesis from deposit contract no longer supported".to_string()); } ClientGenesis::FromStore => builder.resume_from_db()?, + ClientGenesis::EraFiles { era_files_dir } => { + info!(?era_files_dir, "Importing era files"); + let genesis_state = genesis_state(&runtime_context, &config).await?; + let builder = builder.era_files(&era_files_dir, genesis_state)?; + builder.reconstruct_historic_states_from_era_states()? + } }; self.beacon_chain_builder = Some(beacon_chain_builder); diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index aeaa196df86..db5ad0d217a 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -45,6 +45,9 @@ pub enum ClientGenesis { CheckpointSyncUrl { url: SensitiveUrl, }, + EraFiles { + era_files_dir: PathBuf, + }, } /// The core configuration of a Lighthouse beacon node. diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index f874bd14997..bd5b0305e25 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -811,8 +811,13 @@ pub fn get_config( client_config.chain.genesis_backfill = true; } - client_config.network.era_files_dir = - clap_utils::parse_optional(cli_args, "era-files-dir")?; + if let Some(dir) = clap_utils::parse_optional::(cli_args, "era-files-dir")? { + let path = PathBuf::from(dir); + client_config.store.era_files_dir = Some(path.clone()); + client_config.genesis = ClientGenesis::EraFiles { + era_files_dir: path, + }; + } client_config.chain.complete_blob_backfill = cli_args.get_flag("complete-blob-backfill"); diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 0aa00e659bc..9b17af22587 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -5,6 +5,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::io::{Read, Write}; use std::num::NonZeroUsize; +use std::path::PathBuf; use strum::{Display, EnumString, VariantNames}; use superstruct::superstruct; use types::EthSpec; @@ -64,6 +65,8 @@ pub struct StoreConfig { /// The margin for blob pruning in epochs. The oldest blobs are pruned up until /// data_availability_boundary - blob_prune_margin_epochs. Default: 0. pub blob_prune_margin_epochs: u64, + /// Optional path for era file import/production. + pub era_files_dir: Option, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. @@ -120,6 +123,7 @@ impl Default for StoreConfig { prune_blobs: true, epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE, blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS, + era_files_dir: None, } } } diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 7aca692ef9b..d97be9ac774 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -9,7 +9,7 @@ use state_processing::{ }; use std::sync::Arc; use tracing::{debug, info}; -use types::EthSpec; +use types::{EthSpec, Slot}; impl HotColdDB where From 4d6d79eb7e0bc2dc0978176b4a606c000c5a5373 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 26 Dec 2025 17:56:48 -0300 Subject: [PATCH 4/4] Working initial era file loading --- beacon_node/beacon_chain/src/builder.rs | 46 ++++- .../beacon_chain/src/era_file_consumer.rs | 57 +++++- beacon_node/client/src/builder.rs | 3 +- beacon_node/store/src/hot_cold_store.rs | 13 +- beacon_node/store/src/reconstruct.rs | 165 ++++++++++-------- .../src/per_block_processing.rs | 5 +- .../src/per_block_processing/errors.rs | 5 +- 7 files changed, 201 insertions(+), 93 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index c620a569f9d..d687bf15839 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -6,6 +6,7 @@ use crate::beacon_chain::{ use crate::beacon_proposer_cache::BeaconProposerCache; use crate::custody_context::NodeCustodyType; use crate::data_availability_checker::DataAvailabilityChecker; +use crate::era_file_consumer::import_era_files; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; @@ -18,7 +19,6 @@ use crate::persisted_custody::load_custody_context; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; -use crate::era_file_consumer::import_era_files; use crate::{ BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, ServerSentEventHandler, }; @@ -211,14 +211,44 @@ where pub fn era_files( self, era_files_dir: &Path, - genesis_state: BeaconState, + mut genesis_state: BeaconState, ) -> Result { - let builder = self.genesis_state(genesis_state)?; - let store = builder - .store - .clone() - .ok_or("era_files requires a store.")?; - import_era_files(&store, era_files_dir, &builder.spec)?; + let genesis_state_root = genesis_state + .canonical_root() + .map_err(|e| format!("Error computing genesis state root: {e:?}"))?; + + let builder = self.genesis_state(genesis_state.clone())?; + let store = builder.store.clone().ok_or("era_files requires a store.")?; + + { + let mut ops = vec![]; + store + .store_cold_state(&genesis_state_root, &genesis_state, &mut ops) + .map_err(|e| format!("Error building genesis state write ops: {e:?}"))?; + store + .cold_db + .do_atomically(ops) + .map_err(|e| format!("Error writing genesis state: {e:?}"))?; + } + + let max_era = import_era_files(&store, era_files_dir, &builder.spec)?; + let slots_per_historical_root = E::slots_per_historical_root() as u64; + (1..=max_era).into_par_iter().try_for_each(|era_number| { + let start_slot = Slot::new((era_number - 1) * slots_per_historical_root); + let end_slot = Slot::new(era_number * slots_per_historical_root); + store + .reconstruct_historic_states_on_range( + // Start reconstruction with state at the era file start, but the state has the + // block already applied. So start with the block at the next slot. + start_slot, + start_slot + Slot::new(1), + end_slot, + ) + .map_err(|error| { + format!("Era reconstruction failed for era {era_number}: {error:?}") + }) + })?; + Ok(builder) } diff --git a/beacon_node/beacon_chain/src/era_file_consumer.rs b/beacon_node/beacon_chain/src/era_file_consumer.rs index 135a858a7d0..47f6a4979a5 100644 --- a/beacon_node/beacon_chain/src/era_file_consumer.rs +++ b/beacon_node/beacon_chain/src/era_file_consumer.rs @@ -3,15 +3,15 @@ use reth_era::era::file::EraReader; use reth_era::era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock}; use std::fs::{self, File}; use std::path::{Path, PathBuf}; -use store::{HotColdDB, ItemStore}; +use store::{DBColumn, HotColdDB, ItemStore, KeyValueStoreOp}; use tracing::{info, warn}; -use types::{BeaconState, ChainSpec, EthSpec, SignedBeaconBlock}; +use types::{BeaconState, ChainSpec, EthSpec, SignedBeaconBlock, Slot}; pub(crate) fn import_era_files, Cold: ItemStore>( store: &HotColdDB, era_files_dir: &Path, spec: &ChainSpec, -) -> Result<(), String> { +) -> Result { let mut era_files = list_era_files(era_files_dir)?; era_files.sort_by_key(|(era_number, _)| *era_number); @@ -20,18 +20,21 @@ pub(crate) fn import_era_files, Cold: ItemStore .clone() .unwrap_or_else(|| "unknown".to_string()); + let mut max_era = None; for (era_number, path) in era_files { info!(era_number, ?path, "Importing era file"); - import_era_file(store, &path, &network_name, spec) - .map_err(|error| format!("era file import failed: {error}"))?; + import_era_file(store, &path, era_number, &network_name, spec) + .map_err(|error| format!("era file {era_number} {path:?} import failed: {error}"))?; + max_era = Some(era_number); } - Ok(()) + max_era.ok_or_else(|| "era files directory is empty".to_string()) } fn import_era_file, Cold: ItemStore>( store: &HotColdDB, path: &Path, + era_number: u64, network_name: &str, spec: &ChainSpec, ) -> Result<(), String> { @@ -52,9 +55,11 @@ fn import_era_file, Cold: ItemStore>( let state_root = state .canonical_root() .map_err(|error| format!("failed to hash state: {error:?}"))?; + // Use put_cold_state as the split is not updated and we need the state into the cold store. store - .put_state(&state_root, &state) + .put_cold_state(&state_root, &state) .map_err(|error| format!("failed to store state: {error:?}"))?; + write_block_root_index_for_era(store, &state, era_number)?; Ok(()) } @@ -81,6 +86,44 @@ fn decode_state( .map_err(|error| format!("failed to decode state: {error:?}")) } +fn write_block_root_index_for_era, Cold: ItemStore>( + store: &HotColdDB, + state: &BeaconState, + era_number: u64, +) -> Result<(), String> { + let end_slot = state.slot(); + let slots_per_historical_root = E::slots_per_historical_root() as u64; + let expected_end_slot = Slot::new(era_number * slots_per_historical_root); + if end_slot != expected_end_slot { + return Err(format!( + "era state slot mismatch: expected {expected_end_slot}, got {end_slot}" + )); + } + + let start_slot = end_slot.saturating_sub(slots_per_historical_root); + + let ops = (start_slot.as_u64()..end_slot.as_u64()) + .map(|slot_u64| { + let slot = Slot::new(slot_u64); + let block_root = state + .get_block_root(slot) + .map_err(|error| format!("failed to read block root {slot}: {error:?}"))?; + Ok(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconBlockRoots, + slot_u64.to_be_bytes().to_vec(), + block_root.as_slice().to_vec(), + )) + }) + .collect::, String>>()?; + + store + .cold_db + .do_atomically(ops) + .map_err(|error| format!("failed to store block root index: {error:?}"))?; + + Ok(()) +} + fn list_era_files(dir: &Path) -> Result, String> { let entries = fs::read_dir(dir).map_err(|error| format!("failed to read era dir: {error}"))?; let mut era_files = Vec::new(); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 5b852b48999..a7db467b1d3 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -468,8 +468,7 @@ where ClientGenesis::EraFiles { era_files_dir } => { info!(?era_files_dir, "Importing era files"); let genesis_state = genesis_state(&runtime_context, &config).await?; - let builder = builder.era_files(&era_files_dir, genesis_state)?; - builder.reconstruct_historic_states_from_era_states()? + builder.era_files(&era_files_dir, genesis_state)? } }; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c4137191744..99657178145 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -196,7 +196,7 @@ pub enum HotColdDBError { RestorePointDecodeError(ssz::DecodeError), BlockReplayBeaconError(BeaconStateError), BlockReplaySlotError(SlotProcessingError), - BlockReplayBlockError(BlockProcessingError), + BlockReplayBlockError(Slot, BlockProcessingError), InvalidSlotsPerRestorePoint { slots_per_restore_point: u64, slots_per_historical_root: u64, @@ -1039,6 +1039,17 @@ impl, Cold: ItemStore> HotColdDB } } + /// Store and commit a state into the cold db store. + pub fn put_cold_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + let mut ops: Vec = Vec::new(); + self.store_cold_state(state_root, state, &mut ops)?; + self.cold_db.do_atomically(ops) + } + /// Fetch a state from the store. /// /// If `slot` is provided then it will be used as a hint as to which database should diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index d97be9ac774..5f20dcb161c 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -1,8 +1,9 @@ //! Implementation of historic state reconstruction (given complete block history). +use crate::forwards_iter::FrozenForwardsIterator; use crate::hot_cold_store::{HotColdDB, HotColdDBError}; -use crate::metrics; +use crate::{DBColumn, KeyValueStoreOp, metrics}; use crate::{Error, ItemStore}; -use itertools::{Itertools, process_results}; +use itertools::process_results; use state_processing::{ BlockSignatureStrategy, ConsensusContext, VerifyBlockRoot, per_block_processing, per_slot_processing, @@ -53,40 +54,103 @@ where return Ok(()); } - // If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch - // boundary when `num_blocks` is a multiple of an epoch boundary. We want to be *inclusive* - // of the state at slot `lower_limit_slot + num_blocks`. - let block_root_iter = self - .forwards_block_roots_iterator_until(lower_limit_slot, upper_limit_slot - 1, || { - Err(Error::StateShouldNotBeRequired(upper_limit_slot - 1)) - })? - .take(num_blocks.map_or(usize::MAX, |n| n + 1)); + let from_slot = lower_limit_slot; + let to_slot = if let Some(num_blocks) = num_blocks { + std::cmp::min(upper_limit_slot, from_slot + Slot::new(num_blocks as u64)) + } else { + upper_limit_slot + }; + + self.reconstruct_historic_states_on_range(from_slot, from_slot, to_slot)?; + + let remaining = upper_limit_slot + .as_u64() + .saturating_sub(1) + .saturating_sub(to_slot.as_u64()); + info!( + slot = %to_slot, + remaining = %remaining, + "State reconstruction in progress" + ); + + // Update anchor. + let old_anchor = anchor.clone(); + + let reconstruction_complete = to_slot == upper_limit_slot; + if reconstruction_complete { + let new_anchor = old_anchor.as_archive_anchor(); + self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?; + + return Ok(()); + } else { + // The lower limit has been raised, store it. + anchor.state_lower_limit = to_slot; + + self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?; + } + + // Check that the split point wasn't mutated during the state reconstruction process. + // It shouldn't have been, due to the serialization of requests through the store migrator, + // so this is just a paranoid check. + let latest_split = self.get_split_info(); + if split != latest_split { + return Err(Error::SplitPointModified(latest_split.slot, split.slot)); + } + + Ok(()) + } + + pub fn reconstruct_historic_states_on_range( + self: &Arc, + with_state_at_slot: Slot, + from_slot: Slot, + to_slot: Slot, + ) -> Result<(), Error> { + debug!( + %from_slot, + %to_slot, + "Starting state reconstruction batch" + ); + + let _t = metrics::start_timer(&metrics::STORE_BEACON_RECONSTRUCTION_TIME); + + let block_root_iter = + FrozenForwardsIterator::new(self, DBColumn::BeaconBlockRoots, from_slot, to_slot)?; // The state to be advanced. - let mut state = self.load_cold_state_by_slot(lower_limit_slot)?; + let mut state = self.load_cold_state_by_slot(with_state_at_slot)?; state.build_caches(&self.spec)?; process_results(block_root_iter, |iter| -> Result<(), Error> { let mut io_batch = vec![]; - let mut prev_state_root = None; - for ((prev_block_root, _), (block_root, slot)) in iter.tuple_windows() { - let is_skipped_slot = prev_block_root == block_root; - - let block = if is_skipped_slot { - None - } else { - Some( - self.get_blinded_block(&block_root)? - .ok_or(Error::BlockNotFound(block_root))?, - ) + for (block_root, slot) in iter { + io_batch.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconBlockRoots, + slot.as_u64().to_be_bytes().to_vec(), + block_root.as_slice().to_vec(), + )); + + let block = { + let block = self + .get_blinded_block(&block_root)? + .ok_or(Error::BlockNotFound(block_root))?; + if block.slot() == slot && block.slot() > self.spec.genesis_slot { + // If block.slot != slot means it's a skipped slot. + // Also skip applying the genesis slot. + Some(block) + } else { + None + } }; // Advance state to slot. - per_slot_processing(&mut state, prev_state_root.take(), &self.spec) - .map_err(HotColdDBError::BlockReplaySlotError)?; + while state.slot() < slot { + per_slot_processing(&mut state, prev_state_root.take(), &self.spec) + .map_err(HotColdDBError::BlockReplaySlotError)?; + } // Apply block. if let Some(block) = block { @@ -102,7 +166,7 @@ where &mut ctxt, &self.spec, ) - .map_err(HotColdDBError::BlockReplayBlockError)?; + .map_err(|e| HotColdDBError::BlockReplayBlockError(block.slot(), e))?; prev_state_root = Some(block.state_root()); } @@ -114,58 +178,21 @@ where // Stage state for storage in freezer DB. self.store_cold_state(&state_root, &state, &mut io_batch)?; - let batch_complete = - num_blocks.is_some_and(|n_blocks| slot == lower_limit_slot + n_blocks as u64); - let reconstruction_complete = slot + 1 == upper_limit_slot; + let batch_complete = slot + 1 == to_slot; // Commit the I/O batch if: // // - The diff/snapshot for this slot is required for future slots, or // - The reconstruction batch is complete (we are about to return), or // - Reconstruction is complete. - if self.hierarchy.should_commit_immediately(slot)? - || batch_complete - || reconstruction_complete - { - info!( - %slot, - remaining = %(upper_limit_slot - 1 - slot), - "State reconstruction in progress" - ); - + if self.hierarchy.should_commit_immediately(slot)? || batch_complete { self.cold_db.do_atomically(std::mem::take(&mut io_batch))?; - // Update anchor. - let old_anchor = anchor.clone(); - - if reconstruction_complete { - // The two limits have met in the middle! We're done! - // Perform one last integrity check on the state reached. - let computed_state_root = state.update_tree_hash_cache()?; - if computed_state_root != state_root { - return Err(Error::StateReconstructionRootMismatch { - slot, - expected: state_root, - computed: computed_state_root, - }); - } - - let new_anchor = old_anchor.as_archive_anchor(); - self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?; - - return Ok(()); - } else { - // The lower limit has been raised, store it. - anchor.state_lower_limit = slot; - - self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?; - } - // If this is the end of the batch, return Ok. The caller will run another // batch when there is idle capacity. if batch_complete { debug!( - start_slot = %lower_limit_slot, + start_slot = %from_slot, end_slot = %slot, "Finished state reconstruction batch" ); @@ -179,14 +206,6 @@ where Err(Error::StateReconstructionLogicError) })??; - // Check that the split point wasn't mutated during the state reconstruction process. - // It shouldn't have been, due to the serialization of requests through the store migrator, - // so this is just a paranoid check. - let latest_split = self.get_split_info(); - if split != latest_split { - return Err(Error::SplitPointModified(latest_split.slot, split.slot)); - } - Ok(()) } } diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index 07149ff2ee8..4a32ba9f0ae 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -212,7 +212,10 @@ pub fn process_block_header( // Verify that the slots match verify!( block_header.slot == state.slot(), - HeaderInvalid::StateSlotMismatch + HeaderInvalid::StateSlotMismatch { + state: state.slot(), + header: block_header.slot, + } ); // Verify that the block is newer than the latest block header diff --git a/consensus/state_processing/src/per_block_processing/errors.rs b/consensus/state_processing/src/per_block_processing/errors.rs index ff7c0204e24..4be30dcbef4 100644 --- a/consensus/state_processing/src/per_block_processing/errors.rs +++ b/consensus/state_processing/src/per_block_processing/errors.rs @@ -267,7 +267,10 @@ impl From for BlockOperationError { #[derive(Debug, PartialEq, Clone)] pub enum HeaderInvalid { ProposalSignatureInvalid, - StateSlotMismatch, + StateSlotMismatch { + state: Slot, + header: Slot, + }, OlderThanLatestBlockHeader { latest_block_header_slot: Slot, block_slot: Slot,