Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
e63c21b
Era files consumer
dapplion Dec 21, 2025
7360f6c
Remove extra fn
dapplion Dec 23, 2025
a3a6db1
Fill DB on startup
dapplion Dec 24, 2025
d55b53e
Working initial era file loading
dapplion Dec 26, 2025
c48c3f9
Remove era backfill
dapplion Dec 28, 2025
a251763
Track progress in era file reconstruction
dapplion Dec 28, 2025
9531d28
Cleaner impl with more checks
dapplion Dec 28, 2025
6fd9b41
Add checks precapella
dapplion Dec 30, 2025
e7ac055
Parallel import
dapplion Dec 30, 2025
cf6788c
Add logs
dapplion Dec 30, 2025
e2bf767
add more logs
dapplion Dec 30, 2025
3cd3063
Debug stalled loop
dapplion Jan 2, 2026
e7c4d9c
Improve error
dapplion Jan 5, 2026
ba72270
Fix off by one condition on using historical summaries
dapplion Jan 5, 2026
f348a5c
Add ERA file producer on a running beacon node
dapplion Dec 22, 2025
83342a1
Era files consumer
dapplion Dec 21, 2025
2f7a52e
Remove extra fn
dapplion Dec 23, 2025
cb242aa
Fill DB on startup
dapplion Dec 24, 2025
3a460b4
Working initial era file loading
dapplion Dec 26, 2025
8078d7a
Remove era backfill
dapplion Dec 28, 2025
2fb4316
Track progress in era file reconstruction
dapplion Dec 28, 2025
8965fd0
Cleaner impl with more checks
dapplion Dec 28, 2025
ddba894
Add checks precapella
dapplion Dec 30, 2025
1f94cee
Parallel import
dapplion Dec 30, 2025
595a451
Add logs
dapplion Dec 30, 2025
eb27fad
add more logs
dapplion Dec 30, 2025
3a87d8d
Debug stalled loop
dapplion Jan 2, 2026
4fac1e3
Improve error
dapplion Jan 5, 2026
df77910
Fix off by one condition on using historical summaries
dapplion Jan 5, 2026
6e51914
fix lint: remove unused import, sort deps
dapplion Feb 10, 2026
bbe4b40
fix test: provide StateSlotMismatch struct fields
dapplion Feb 10, 2026
9989d6d
fix test and sort Cargo.toml
dapplion Feb 10, 2026
34d5e03
regenerate CLI docs and update deps for cargo audit
dapplion Feb 10, 2026
6b863b5
Add ERA file CI test vectors with comprehensive verification
dapplion Feb 12, 2026
de03695
merge consumer tests into producer branch
dapplion Feb 12, 2026
cf71081
add era producer tests
dapplion Feb 12, 2026
1c4ae7f
fix era producer bugs: first block skip, filename numbering, state of…
dapplion Feb 13, 2026
8ae7381
update ERA test vectors README with test coverage details
dapplion Feb 13, 2026
17329cf
add ERA file corruption tests
dapplion Feb 13, 2026
b794d6f
simplify corruption tests with pre-committed corrupt files
dapplion Feb 13, 2026
c0b28da
add corruption tests for ERA consumer
dapplion Feb 13, 2026
8bf05cd
fix clippy lint: use sort_by_key instead of sort_by
dapplion Feb 13, 2026
bf98b5d
era: add trusted state root verification (PR #65 item #6)
dapplion Feb 13, 2026
e5f2bf5
fix clippy: collapse nested if statements in ERA code
dapplion Feb 13, 2026
058ac92
fix warnings: remove unused variables in ERA tests
dapplion Feb 13, 2026
1ab6b4d
fix clippy lints and formatting
dapplion Feb 13, 2026
c9368bd
refactor ERA tests: consolidate, simplify, fix corrupt file generation
dapplion Feb 13, 2026
aaa6e94
verify block hash chain + state roots, add mutated balance test
dapplion Feb 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
845 changes: 457 additions & 388 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ alloy-consensus = { version = "1", default-features = false }
alloy-dyn-abi = { version = "1", default-features = false }
alloy-json-abi = { version = "1", default-features = false }
alloy-network = { version = "1", default-features = false }
alloy-primitives = { version = "1", default-features = false, features = ["rlp", "getrandom"] }
alloy-primitives = { version = "1.5", default-features = false, features = ["rlp", "getrandom"] }
alloy-provider = { version = "1", default-features = false, features = ["reqwest"] }
alloy-rlp = { version = "0.3", default-features = false }
alloy-rpc-types-eth = { version = "1", default-features = false, features = ["serde"] }
Expand Down Expand Up @@ -138,6 +138,7 @@ eip_3076 = { path = "common/eip_3076" }
either = "1.9"
environment = { path = "lighthouse/environment" }
eth2 = { path = "common/eth2" }
reth-era = { git = "https://github.com/paradigmxyz/reth", package = "reth-era", rev = "62abfdaeb54e8a205a8ee085ddebd56047d93374" }
eth2_config = { path = "common/eth2_config" }
eth2_key_derivation = { path = "crypto/eth2_key_derivation" }
eth2_keystore = { path = "crypto/eth2_keystore" }
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ parking_lot = { workspace = true }
proto_array = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
reth-era = { workspace = true }
safe_arith = { workspace = true }
sensitive_url = { workspace = true }
serde = { workspace = true }
Expand Down Expand Up @@ -76,6 +77,8 @@ maplit = { workspace = true }
mockall = { workspace = true }
mockall_double = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
tempfile = { workspace = true }

[[bench]]
name = "benches"
Expand Down
181 changes: 180 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::beacon_chain::{
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::custody_context::NodeCustodyType;
use crate::data_availability_checker::DataAvailabilityChecker;
use crate::era_file_consumer::EraFileDir;
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin};
Expand Down Expand Up @@ -37,8 +38,10 @@ use slasher::Slasher;
use slot_clock::{SlotClock, TestingSlotClock};
use state_processing::{AllCaches, per_slot_processing};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use tracing::{debug, error, info};
Expand Down Expand Up @@ -206,6 +209,142 @@ where
self
}

/// Import trusted era files into the store before building the chain.
pub fn era_files(
self,
era_files_dir: &Path,
mut genesis_state: BeaconState<E>,
trusted_state_root: Option<Hash256>,
trusted_slot: Option<Slot>,
) -> Result<Self, String> {
let genesis_state_root = genesis_state
.canonical_root()
.map_err(|e| format!("Error computing genesis state root: {e:?}"))?;

let builder = self.genesis_state(genesis_state.clone())?;
let store = builder.store.clone().ok_or("era_files requires a store.")?;

// Explicitly store the genesis state in the cold DB. In testing this seems necessary.
// TODO(era): Review why ^
{
let mut ops = vec![];
store
.store_cold_state(&genesis_state_root, &genesis_state, &mut ops)
.map_err(|e| format!("Error building genesis state write ops: {e:?}"))?;
store
.cold_db
.do_atomically(ops)
.map_err(|e| format!("Error writing genesis state: {e:?}"))?;
}

// Import all blocks and states from the ERA files.
let era_dir = EraFileDir::new::<E>(era_files_dir, &builder.spec)?;
let max_era = era_dir.max_era();
let slots_per_historical_root = E::slots_per_historical_root() as u64;

let imported_era_files_pointer = store
.get_era_import_pointer()
.map_err(|error| format!("Era import pointer read failed: {error:?}"))?
.unwrap_or(0);
info!(
?era_files_dir,
max_slot = max_era * slots_per_historical_root,
current_slot = imported_era_files_pointer * slots_per_historical_root,
"Importing blocks and states from ERA files"
);
let mut import_progress = Speedo::default();
let mut import_last_log = Instant::now();
for era_number in imported_era_files_pointer + 1..=max_era {
debug!(?era_files_dir, era_number, "Importing ERA file");
era_dir.import_era_file(
&store,
era_number,
&builder.spec,
trusted_state_root,
trusted_slot,
)?;
debug!(?era_files_dir, era_number, "Imported ERA file");
store
.set_era_import_pointer(era_number)
.map_err(|error| format!("Era import pointer write failed: {error:?}"))?;
debug!(?era_files_dir, era_number, "Persisted era pointer");

let now = Instant::now();
let done_slots = era_number * slots_per_historical_root;
import_progress.observe(Slot::new(done_slots), now);
if now.duration_since(import_last_log) >= Duration::from_secs(5) {
import_last_log = now;
info!(
completed_era_files = era_number,
total_era_files = max_era,
completed_slots = done_slots,
total_slots = max_era * slots_per_historical_root,
slots_per_second = import_progress.slots_per_second().unwrap_or(0.0),
"Importing era files"
);
}
}

info!(
?era_files_dir,
max_slot = max_era * slots_per_historical_root,
"Reconstructing states from ERA files"
);
let total_era_files = max_era;
let completed_era_files = Arc::new(AtomicU64::new(0));
let progress = Arc::new(Mutex::new((Speedo::default(), Instant::now())));

(1..=max_era).into_par_iter().try_for_each(|era_number| {
let already_reconstructed = store
.era_reconstruction_done(era_number)
.map_err(|error| format!("Era reconstruction marker read failed: {error:?}"))?;

if !already_reconstructed {
let start_slot = Slot::new((era_number - 1) * slots_per_historical_root);
let end_slot = Slot::new(era_number * slots_per_historical_root);
store
.reconstruct_historic_states_on_range(
// Start reconstruction with state at the era file start, but the state has the
// block already applied. So start with the block at the next slot.
start_slot,
start_slot + Slot::new(1),
end_slot,
)
.map_err(|error| {
format!("Era reconstruction failed for era {era_number}: {error:?}")
})?;

store
.set_era_reconstruction_done(era_number)
.map_err(|error| {
format!("Era reconstruction marker write failed: {error:?}")
})?;
}

let now = Instant::now();
let done_era_files = completed_era_files.fetch_add(1, Ordering::Relaxed) + 1;
let done_slots = done_era_files.saturating_mul(slots_per_historical_root);
let mut progress = progress.lock();
let (speedo, last_log) = &mut *progress;
speedo.observe(Slot::new(done_slots), now);
if now.duration_since(*last_log) >= Duration::from_secs(5) {
*last_log = now;
info!(
completed_era_files = done_era_files,
total_era_files,
completed_slots = done_slots,
total_slots = total_era_files.saturating_mul(slots_per_historical_root),
slots_per_second = speedo.slots_per_second().unwrap_or(0.0),
"Reconstructing from era files"
);
}

Ok::<(), String>(())
})?;

Ok(builder)
}

/// Sets the store migrator config (optional).
pub fn store_migrator_config(mut self, config: MigratorConfig) -> Self {
self.store_migrator_config = Some(config);
Expand Down Expand Up @@ -1240,6 +1379,46 @@ fn build_data_columns_from_blobs<E: EthSpec>(
Ok(data_columns)
}

/// Track recent slot completion rates for era reconstruction.
#[derive(Default)]
struct Speedo(Vec<(Slot, Instant)>);

impl Speedo {
fn observe(&mut self, slot: Slot, instant: Instant) {
const SPEEDO_OBSERVATIONS: usize = 4;
if self.0.len() > SPEEDO_OBSERVATIONS {
self.0.remove(0);
}
self.0.push((slot, instant));
}

fn slots_per_second(&self) -> Option<f64> {
let speeds = self
.0
.windows(2)
.filter_map(|windows| {
let (slot_a, instant_a) = windows[0];
let (slot_b, instant_b) = windows[1];
let distance = f64::from((slot_b - slot_a).as_u64() as u32);
let seconds = f64::from((instant_b - instant_a).as_millis() as u32) / 1_000.0;
if seconds > 0.0 {
Some(distance / seconds)
} else {
None
}
})
.collect::<Vec<f64>>();

let count = speeds.len();
let sum: f64 = speeds.iter().sum();
if count > 0 {
Some(sum / f64::from(count as u32))
} else {
None
}
}
}

#[cfg(not(debug_assertions))]
#[cfg(test)]
mod test {
Expand Down
Loading
Loading