Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
838 changes: 453 additions & 385 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ alloy-consensus = { version = "1", default-features = false }
alloy-dyn-abi = { version = "1", default-features = false }
alloy-json-abi = { version = "1", default-features = false }
alloy-network = { version = "1", default-features = false }
alloy-primitives = { version = "1", default-features = false, features = ["rlp", "getrandom"] }
alloy-primitives = { version = "1.5", default-features = false, features = ["rlp", "getrandom"] }
alloy-provider = { version = "1", default-features = false, features = ["reqwest"] }
alloy-rlp = { version = "0.3", default-features = false }
alloy-rpc-types-eth = { version = "1", default-features = false, features = ["serde"] }
Expand Down Expand Up @@ -225,6 +225,7 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"rustls-tls",
] }
reth-era = { git = "https://github.com/paradigmxyz/reth", package = "reth-era", rev = "62abfdaeb54e8a205a8ee085ddebd56047d93374" }
ring = "0.17"
rpds = "0.11"
rusqlite = { version = "0.28", features = ["bundled"] }
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ parking_lot = { workspace = true }
proto_array = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
reth-era = { workspace = true }
safe_arith = { workspace = true }
sensitive_url = { workspace = true }
serde = { workspace = true }
Expand Down
173 changes: 172 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::beacon_chain::{
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::custody_context::NodeCustodyType;
use crate::data_availability_checker::DataAvailabilityChecker;
use crate::era_file_consumer::EraFileDir;
use crate::fork_choice_signal::ForkChoiceSignalTx;
use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary};
use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin};
Expand Down Expand Up @@ -37,8 +38,10 @@ use slasher::Slasher;
use slot_clock::{SlotClock, TestingSlotClock};
use state_processing::{AllCaches, per_slot_processing};
use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::{ShutdownReason, TaskExecutor};
use tracing::{debug, error, info};
Expand Down Expand Up @@ -206,6 +209,134 @@ where
self
}

/// Import trusted era files into the store before building the chain.
pub fn era_files(
self,
era_files_dir: &Path,
mut genesis_state: BeaconState<E>,
) -> Result<Self, String> {
let genesis_state_root = genesis_state
.canonical_root()
.map_err(|e| format!("Error computing genesis state root: {e:?}"))?;

let builder = self.genesis_state(genesis_state.clone())?;
let store = builder.store.clone().ok_or("era_files requires a store.")?;

// Explicitly store the genesis state in the cold DB. In testing this seems necessary.
// TODO(era): Review why ^
{
let mut ops = vec![];
store
.store_cold_state(&genesis_state_root, &genesis_state, &mut ops)
.map_err(|e| format!("Error building genesis state write ops: {e:?}"))?;
store
.cold_db
.do_atomically(ops)
.map_err(|e| format!("Error writing genesis state: {e:?}"))?;
}

// Import all blocks and states from the ERA files.
let era_dir = EraFileDir::new::<E>(era_files_dir, &builder.spec)?;
let max_era = era_dir.max_era();
let slots_per_historical_root = E::slots_per_historical_root() as u64;

let imported_era_files_pointer = store
.get_era_import_pointer()
.map_err(|error| format!("Era import pointer read failed: {error:?}"))?
.unwrap_or(0);
info!(
?era_files_dir,
max_slot = max_era * slots_per_historical_root,
current_slot = imported_era_files_pointer * slots_per_historical_root,
"Importing blocks and states from ERA files"
);
let mut import_progress = Speedo::default();
let mut import_last_log = Instant::now();
for era_number in imported_era_files_pointer + 1..=max_era {
debug!(?era_files_dir, era_number, "Importing ERA file");
era_dir.import_era_file(&store, era_number, &builder.spec)?;
debug!(?era_files_dir, era_number, "Imported ERA file");
store
.set_era_import_pointer(era_number)
.map_err(|error| format!("Era import pointer write failed: {error:?}"))?;
debug!(?era_files_dir, era_number, "Persisted era pointer");

let now = Instant::now();
let done_slots = era_number * slots_per_historical_root;
import_progress.observe(Slot::new(done_slots), now);
if now.duration_since(import_last_log) >= Duration::from_secs(5) {
import_last_log = now;
info!(
completed_era_files = era_number,
total_era_files = max_era,
completed_slots = done_slots,
total_slots = max_era * slots_per_historical_root,
slots_per_second = import_progress.slots_per_second().unwrap_or(0.0),
"Importing era files"
);
}
}

info!(
?era_files_dir,
max_slot = max_era * slots_per_historical_root,
"Reconstructing states from ERA files"
);
let total_era_files = max_era;
let completed_era_files = Arc::new(AtomicU64::new(0));
let progress = Arc::new(Mutex::new((Speedo::default(), Instant::now())));

(1..=max_era).into_par_iter().try_for_each(|era_number| {
let already_reconstructed = store
.era_reconstruction_done(era_number)
.map_err(|error| format!("Era reconstruction marker read failed: {error:?}"))?;

if !already_reconstructed {
let start_slot = Slot::new((era_number - 1) * slots_per_historical_root);
let end_slot = Slot::new(era_number * slots_per_historical_root);
store
.reconstruct_historic_states_on_range(
// Start reconstruction with state at the era file start, but the state has the
// block already applied. So start with the block at the next slot.
start_slot,
start_slot + Slot::new(1),
end_slot,
)
.map_err(|error| {
format!("Era reconstruction failed for era {era_number}: {error:?}")
})?;

store
.set_era_reconstruction_done(era_number)
.map_err(|error| {
format!("Era reconstruction marker write failed: {error:?}")
})?;
}

let now = Instant::now();
let done_era_files = completed_era_files.fetch_add(1, Ordering::Relaxed) + 1;
let done_slots = done_era_files.saturating_mul(slots_per_historical_root);
let mut progress = progress.lock();
let (speedo, last_log) = &mut *progress;
speedo.observe(Slot::new(done_slots), now);
if now.duration_since(*last_log) >= Duration::from_secs(5) {
*last_log = now;
info!(
completed_era_files = done_era_files,
total_era_files,
completed_slots = done_slots,
total_slots = total_era_files.saturating_mul(slots_per_historical_root),
slots_per_second = speedo.slots_per_second().unwrap_or(0.0),
"Reconstructing from era files"
);
}

Ok::<(), String>(())
})?;

Ok(builder)
}

/// Sets the store migrator config (optional).
pub fn store_migrator_config(mut self, config: MigratorConfig) -> Self {
self.store_migrator_config = Some(config);
Expand Down Expand Up @@ -1240,6 +1371,46 @@ fn build_data_columns_from_blobs<E: EthSpec>(
Ok(data_columns)
}

/// Track recent slot completion rates for era reconstruction.
#[derive(Default)]
struct Speedo(Vec<(Slot, Instant)>);

impl Speedo {
fn observe(&mut self, slot: Slot, instant: Instant) {
const SPEEDO_OBSERVATIONS: usize = 4;
if self.0.len() > SPEEDO_OBSERVATIONS {
self.0.remove(0);
}
self.0.push((slot, instant));
}

fn slots_per_second(&self) -> Option<f64> {
let speeds = self
.0
.windows(2)
.filter_map(|windows| {
let (slot_a, instant_a) = windows[0];
let (slot_b, instant_b) = windows[1];
let distance = f64::from((slot_b - slot_a).as_u64() as u32);
let seconds = f64::from((instant_b - instant_a).as_millis() as u32) / 1_000.0;
if seconds > 0.0 {
Some(distance / seconds)
} else {
None
}
})
.collect::<Vec<f64>>();

let count = speeds.len();
let sum: f64 = speeds.iter().sum();
if count > 0 {
Some(sum / f64::from(count as u32))
} else {
None
}
}
}

#[cfg(not(debug_assertions))]
#[cfg(test)]
mod test {
Expand Down
Loading
Loading