diff --git a/crates/beacon_state/data/src/encode.rs b/crates/beacon_state/data/src/encode.rs index e62068b..99c6a22 100644 --- a/crates/beacon_state/data/src/encode.rs +++ b/crates/beacon_state/data/src/encode.rs @@ -13,7 +13,7 @@ use blst::min_pk::PublicKey; use crate::{ BeaconState, decompose::FIXED_PART, - types::{B256, Checkpoint, Eth1Data, SyncCommittee}, + types::{B256, Checkpoint, Eth1Data, SLOTS_PER_EPOCH, SyncCommittee}, }; // SSZ-serialised sizes of the fixed-width records (Fulu); mirror `decompose`. @@ -258,11 +258,23 @@ impl BeaconState { // F11_OFF/F12_OFF validators, balances w_u32(w, offs[2])?; w_u32(w, offs[3])?; - // F13 randao_mixes, F14 slashings - write_b256_slice(w, &epoch_base.randao_mixes)?; - for s in epoch_base.slashings.iter() { - w_u64(w, *s)?; + + // The current epoch's bucket is only + // flushed to the array tier at the epoch boundary, so mid-epoch the live + // value lives in the `*_current` caches (the hashing path substitutes + // them in `effective_{randao_mixes,slashings}_into`). + // F13 randao_mixes + let current_epoch = (sl.slot / SLOTS_PER_EPOCH) as usize; + let randao_curr = current_epoch % epoch_base.randao_mixes.len(); + write_b256_slice(w, &epoch_base.randao_mixes[..randao_curr])?; + w.write_all(&sl.randao_mix_current)?; + write_b256_slice(w, &epoch_base.randao_mixes[randao_curr + 1..])?; + // F14 slashings. + let slashings_curr = current_epoch % epoch_base.slashings.len(); + for (i, s) in epoch_base.slashings.iter().enumerate() { + w_u64(w, if i == slashings_curr { sl.current_epoch_slashings } else { *s })?; } + // F15_OFF/F16_OFF previous/current participation w_u32(w, offs[4])?; w_u32(w, offs[5])?; @@ -426,7 +438,10 @@ pub fn decode_checkpoint_pubkeys(bytes: &[u8]) -> Result, Pubkeys #[cfg(test)] mod tests { - use crate::{BeaconState, SpecConfig, decode_checkpoint_pubkeys, encode::Section}; + use crate::{ + BeaconState, EpochGroup, EpochStateFinalized, SLOTS_PER_HISTORICAL_ROOT, SlotState, + SlotStateFinalized, SlotStateGroup, SpecConfig, decode_checkpoint_pubkeys, encode::Section, + }; /// `encode → decompose → encode` must be a byte-exact fixed point, and the /// chunked streaming path must match the one-pass path. The pre-bootstrap @@ -456,7 +471,47 @@ mod tests { assert_eq!(once, streamed, "chunk-streamed SSZ differs from one-pass"); } - /// Pubkeys sidecar: write → decode must reproduce the decompressed keys. + /// Regression: a checkpoint persisted mid-epoch must encode the *live* + /// current-epoch randao mix / slashings (the `*_current` caches), not the + /// stale array bucket — that bucket is only refreshed at the epoch + /// boundary. + #[test] + fn encode_uses_live_current_epoch_caches() { + let cur = 3usize; + let mut bs = BeaconState::pre_bootstrap(); + + // Stale ring buckets, distinct from the live `*_current` caches the + // encode path must emit instead. + let mut epoch = EpochStateFinalized::default(); + epoch.randao_mixes[cur] = [0xAA; 32]; + epoch.slashings[cur] = 111; + bs.epoch = EpochGroup::new(epoch); + + let zero_roots = || vec![[0u8; 32]; SLOTS_PER_HISTORICAL_ROOT].into_boxed_slice(); + let slot = SlotState { + slot: 96, // epoch 3 + randao_mix_current: [0xBB; 32], + current_epoch_slashings: 222, + ..Default::default() + }; + bs.slot_states = + SlotStateGroup::new(SlotStateFinalized::from_parts(slot, zero_roots(), zero_roots())); + + let mut ssz = Vec::with_capacity(bs.ssz_len()); + bs.encode_ssz(&mut ssz).unwrap(); + let decoded = BeaconState::decompose(&ssz, &SpecConfig::mainnet(), None).unwrap(); + + let de = decoded.epoch.finalized(); + let ds = decoded.slot_states.finalized().state(); + assert_eq!(de.randao_mixes[cur], [0xBB; 32], "randao current bucket = live"); + assert_eq!(ds.randao_mix_current, [0xBB; 32]); + assert_eq!(de.slashings[cur], 222, "slashings current bucket = live"); + assert_eq!(ds.current_epoch_slashings, 222); + } + + /// Write the decompressed-pubkey sidecar, decode it, rebuild the validators + /// from SSZ + sidecar, and confirm the pubkeys match. Also that a corrupted + /// sidecar is rejected (never silently accepted). #[test] fn pubkeys_sidecar_round_trip() { let bs = BeaconState::pre_bootstrap(); diff --git a/crates/beacon_state/tile/src/lib.rs b/crates/beacon_state/tile/src/lib.rs index ba10a90..916a740 100644 --- a/crates/beacon_state/tile/src/lib.rs +++ b/crates/beacon_state/tile/src/lib.rs @@ -9,6 +9,7 @@ pub mod ssz_hash; pub mod state_transition; pub mod tile; mod validate; +mod weak_subjectivity; #[cfg(test)] pub(crate) mod test_signing; diff --git a/crates/beacon_state/tile/src/tile.rs b/crates/beacon_state/tile/src/tile.rs index 748a55e..891c268 100644 --- a/crates/beacon_state/tile/src/tile.rs +++ b/crates/beacon_state/tile/src/tile.rs @@ -12,8 +12,8 @@ use silver_beacon_state_data::{ }; use silver_common::{ BeaconStateEvent, BlockSource, DataColumnsAvailable, GossipTopic, NewGossipMsg, P2pStreamId, - PeerEvent, RpcInbound, RpcResponse, RpcResponseInbound, RpcSeverity, SilverSpine, SyncUpdate, - TCacheRead, TRandomAccess, TRead, hex32, + PeerEvent, ReplayBlock, RpcInbound, RpcResponse, RpcResponseInbound, RpcSeverity, SilverSpine, + SyncUpdate, TCacheRead, TRandomAccess, TRead, hex32, ssz_view::{ self, AttesterSlashingView, MAX_ATTESTATIONS_ELECTRA, MAX_ATTESTING_INDICES, PROPOSER_SLASHING_SIZE, ProposerSlashingView, SIGNED_BLS_CHANGE_SIZE, @@ -30,6 +30,7 @@ use crate::{ fork_choice::{BlockImport, ForkChoice, MAX_FORK_CHOICE_NODES, VoteTracker, compute_deltas}, shuffling::{self, DOMAIN_BEACON_ATTESTER}, ssz_hash, state_transition, validate, + weak_subjectivity::weak_subjectivity_period, }; /// Survivor set re-anchored at finalization: every fork-choice node's bundle @@ -187,6 +188,9 @@ pub struct BeaconStateTile { gossip_consumer: TRandomAccess, rpc_consumer: TRandomAccess, + replay_consumer: TRandomAccess, + + verify_weak_subjectivity: bool, } type Producers = ::Producers; @@ -194,12 +198,15 @@ type Producers = ::Producers; impl BeaconStateTile { /// If `checkpoint_state` is non-empty, bootstraps immediately; otherwise /// starts inert in `Mode::Syncing` (call `bootstrap` before the loop). + #[allow(clippy::too_many_arguments)] pub fn new( ticker: SlotTicker, spec: SpecConfig, mut state: BeaconStateOwner, gossip_consumer: TRandomAccess, rpc_consumer: TRandomAccess, + replay_consumer: TRandomAccess, + verify_weak_subjectivity: bool, checkpoint_state: &[u8], decompressed_pubkeys: &[u8], ) -> Self { @@ -244,6 +251,8 @@ impl BeaconStateTile { ), gossip_consumer, rpc_consumer, + replay_consumer, + verify_weak_subjectivity, }; if !checkpoint_state.is_empty() { @@ -362,6 +371,8 @@ impl BeaconStateTile { self.fork_choice = ForkChoice::init(trusted, trusted, slot, block_root, anchor_state_root, anchor); self.state.publish_state_id(anchor); + + self.assert_within_weak_subjectivity(); } /// Index bundle of fork-choice's canonical tip. For gossip-object @@ -513,6 +524,24 @@ impl BeaconStateTile { self.configured_fork_digest = Some(digest); } + pub fn assert_within_weak_subjectivity(&mut self) { + if !self.verify_weak_subjectivity { + return; + } + let ws_period = { + let view = self.state.read_view(self.last_applied); + weak_subjectivity_period(&view, &mut self.stf_scratch.active) + }; + let checkpoint_epoch = self.head_state_slot() / SLOTS_PER_EPOCH; + let current_epoch = self.ticker.current_slot() / SLOTS_PER_EPOCH; + assert!( + current_epoch <= checkpoint_epoch + ws_period, + "checkpoint epoch {checkpoint_epoch} is outside the weak-subjectivity period \ + ({ws_period} epochs); wall epoch {current_epoch} — refusing stale anchor \ + (override with --disable-weak-subjectivity)", + ); + } + fn status_payload(&mut self) -> [u8; STATUS_V2_SIZE] { let fork_digest = self.fork_digest(); @@ -1105,6 +1134,28 @@ impl BeaconStateTile { fn da_boundary(&self) -> Slot { self.sync_finalized_slot.max(self.fork_choice.finalized_checkpoint.epoch * SLOTS_PER_EPOCH) } + fn replay_block(&mut self, data: &[u8]) { + if !SignedBeaconBlockView::check_size(data) { + tracing::error!(len = data.len(), "replayed on-disk block malformed"); + return; + } + + let block_slot = SignedBeaconBlockView::slot(data); + let feedback = self.apply_block_impl(data, false); + match feedback { + Feedback::Reject(block_root) => tracing::error!( + block_slot, + block_root = ?block_root.map(|r| hex32(&r)), + "replayed block rejected", + ), + _ => tracing::info!( + head_slot = self.head_state_slot(), + block_slot, + "replayed block: {:?}", + feedback + ), + } + } fn apply_block_impl(&mut self, data: &[u8], gate_da: bool) -> Feedback { let parsed = match self.precheck_block(data) { @@ -1810,6 +1861,21 @@ impl Tile for BeaconStateTile { adapter.consume(|m: DataColumnsAvailable, producers| { self.handle_data_columns_available(m, producers); }); + + adapter.consume(|m: ReplayBlock, producers| match m { + ReplayBlock::Block { ssz } => { + let acquired = self.replay_consumer.acquire(ssz); + let data = acquired.buffer().ok().map(|(d, _)| d as *const [u8]); + if let Some(p) = data { + self.replay_block(unsafe { &*p }); + } + } + ReplayBlock::Done => { + producers.produce(self.status_event()); + producers.produce(BeaconStateEvent::ReplayComplete); + } + }); + self.replay_consumer.free(); } } @@ -1936,16 +2002,35 @@ mod tests { /// Tile whose ticker reports `wall_slot` as the current slot. fn make_tile_at_wall_slot(wall_slot: u64) -> BeaconStateTile { + make_tile_at_wall_slot_ws(wall_slot, true) + } + + fn make_tile_at_wall_slot_ws( + wall_slot: u64, + verify_weak_subjectivity: bool, + ) -> BeaconStateTile { let secs_per_slot = 12u64; let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); let genesis = now.saturating_sub(wall_slot * secs_per_slot + 1); let ticker = SlotTicker::new(genesis, Duration::from_secs(12), Duration::from_secs(4)); let gossip_p = TCache::producer("test_gossip", 1 << 20); let event_p = TCache::producer("test_event", 1 << 20); + let replay_p = TCache::producer("test_replay", 1 << 20); let gossip_c = gossip_p.cache_ref().random_access("test_gossip", true).unwrap(); let rpc_c = event_p.cache_ref().random_access("test_event", true).unwrap(); + let replay_c = replay_p.cache_ref().random_access("test_replay", true).unwrap(); let state = BeaconStateOwner::pre_bootstrap(); - BeaconStateTile::new(ticker, SpecConfig::mainnet(), state, gossip_c, rpc_c, &[], &[]) + BeaconStateTile::new( + ticker, + SpecConfig::mainnet(), + state, + gossip_c, + rpc_c, + replay_c, + verify_weak_subjectivity, + &[], + &[], + ) } fn placeholder_pubkey(i: usize) -> BLSPubkey { diff --git a/crates/beacon_state/tile/src/weak_subjectivity.rs b/crates/beacon_state/tile/src/weak_subjectivity.rs new file mode 100644 index 0000000..430f014 --- /dev/null +++ b/crates/beacon_state/tile/src/weak_subjectivity.rs @@ -0,0 +1,50 @@ +use silver_beacon_state_data::{SLOTS_PER_EPOCH, StateReadView}; + +use crate::shuffling; + +// Constants are deliberately the *pre-MaxEB* ones: the period's derivation +// assumes a 32-ETH per-validator cap and count-based churn, and has not been +// re-derived for EIP-7251. `MAX_EB_ETH` is therefore 32 — not the runtime +// 2048-ETH `MAX_EFFECTIVE_BALANCE` — matching the spec-as-written and other +// clients. A known approximation under MaxEB. +const SAFETY_DECAY: u64 = 10; +const MIN_WITHDRAWABILITY_DELAY: u64 = 256; +const MAX_EB_ETH: u64 = 32; +const ETH_TO_GWEI: u64 = 1_000_000_000; +const MAX_DEPOSITS: u64 = 16; +const MIN_PER_EPOCH_CHURN_LIMIT: u64 = 4; +const CHURN_LIMIT_QUOTIENT: u64 = 1 << 16; + +pub(crate) fn weak_subjectivity_period(view: &StateReadView, scratch: &mut Vec) -> u64 { + let epoch = view.slot.state().slot / SLOTS_PER_EPOCH; + scratch.clear(); + shuffling::get_active_validator_indices_into(&view.validators, epoch, scratch); + let n = scratch.len() as u64; + if n == 0 { + return MIN_WITHDRAWABILITY_DELAY; + } + + // get_total_active_balance, floored at one increment, averaged to whole ETH. + let total = scratch + .iter() + .map(|&i| view.validators.effective_balance(i as usize)) + .sum::() + .max(ETH_TO_GWEI); + let d = SAFETY_DECAY; + let t = total / n / ETH_TO_GWEI; + let cap = MAX_EB_ETH; + let delta = (n / CHURN_LIMIT_QUOTIENT).max(MIN_PER_EPOCH_CHURN_LIMIT); + let big_delta = MAX_DEPOSITS * SLOTS_PER_EPOCH; + + let mut ws = MIN_WITHDRAWABILITY_DELAY; + // On the else side the branch guarantees `t <= cap*(200+3d)/(200+12d) < cap`, + // so `cap - t > 0` (no div-by-zero) and the subtraction above never wraps. + if cap * (200 + 3 * d) < t * (200 + 12 * d) { + let churn = n * (t * (200 + 12 * d) - cap * (200 + 3 * d)) / (600 * delta * (2 * t + cap)); + let top_ups = n * (200 + 3 * d) / (600 * big_delta); + ws += churn.max(top_ups); + } else { + ws += 3 * n * d * t / (200 * big_delta * (cap - t)); + } + ws +} diff --git a/crates/beacon_state/tile/tests/common.rs b/crates/beacon_state/tile/tests/common.rs index aaf4fd9..a64efb2 100644 --- a/crates/beacon_state/tile/tests/common.rs +++ b/crates/beacon_state/tile/tests/common.rs @@ -90,6 +90,8 @@ pub struct Harness { inj_adapter: SpineAdapter, gossip_in_producer: TProducer, rpc_in_producer: TProducer, + // Kept alive to back the tile's replay consumer; unused by these tests. + _replay_in_producer: TProducer, outbound_log: Vec, _base_dir: PathBuf, // owned to keep temp files around for the run } @@ -100,6 +102,7 @@ pub enum OutboundKind { PersistBlock, BlockRejected, SendGossip, + ReplayComplete, } impl OutboundKind { @@ -109,6 +112,7 @@ impl OutboundKind { "persist_block" => Self::PersistBlock, "block_rejected" => Self::BlockRejected, "send_gossip" => Self::SendGossip, + "replay_complete" => Self::ReplayComplete, _ => return None, }) } @@ -118,21 +122,32 @@ impl OutboundKind { BeaconStateEvent::Status { .. } => Self::Status, BeaconStateEvent::PersistBlock { .. } => Self::PersistBlock, BeaconStateEvent::BlockRejected { .. } => Self::BlockRejected, + BeaconStateEvent::ReplayComplete => Self::ReplayComplete, } } } impl Harness { pub fn new(wall_slot: u64, checkpoint_ssz: &[u8]) -> Self { - Self::build(wall_slot, |ticker, gc, rc| { + Self::build(wall_slot, |ticker, gc, rc, repc| { let state = BeaconStateOwner::pre_bootstrap(); - BeaconStateTile::new(ticker, SpecConfig::mainnet(), state, gc, rc, checkpoint_ssz, &[]) + BeaconStateTile::new( + ticker, + silver_beacon_state_data::SpecConfig::mainnet(), + state, + gc, + rc, + repc, + true, + checkpoint_ssz, + &[], + ) }) } fn build(wall_slot: u64, build_tile: F) -> Self where - F: FnOnce(SlotTicker, TRandomAccess, TRandomAccess) -> BeaconStateTile, + F: FnOnce(SlotTicker, TRandomAccess, TRandomAccess, TRandomAccess) -> BeaconStateTile, { static SEQ: AtomicU64 = AtomicU64::new(0); let seq = SEQ.fetch_add(1, Ordering::Relaxed); @@ -154,11 +169,14 @@ impl Harness { let gossip_in_producer = TCache::producer("gossip_in", 1 << 24); let rpc_in_producer = TCache::producer("rpc_in", 1 << 24); + let replay_in_producer = TCache::producer("replay_in", 1 << 24); let gossip_consumer = gossip_in_producer.cache_ref().random_access("test", true).expect("gossip ra"); let rpc_consumer = rpc_in_producer.cache_ref().random_access("test", true).expect("rpc ra"); + let replay_consumer = + replay_in_producer.cache_ref().random_access("test", true).expect("replay ra"); - let tile = build_tile(ticker, gossip_consumer, rpc_consumer); + let tile = build_tile(ticker, gossip_consumer, rpc_consumer, replay_consumer); // Order matters: attach tile first so its tile_id stays 0 for the // real consumer of `inbound`; Injector gets tile_id 1. @@ -180,6 +198,7 @@ impl Harness { inj_adapter, gossip_in_producer, rpc_in_producer, + _replay_in_producer: replay_in_producer, outbound_log: Vec::new(), _base_dir: base, } diff --git a/crates/bin/src/main.rs b/crates/bin/src/main.rs index de3a80a..9283b97 100644 --- a/crates/bin/src/main.rs +++ b/crates/bin/src/main.rs @@ -1,4 +1,4 @@ -use std::{error::Error, path::Path, str::FromStr, sync::Arc, time::Instant}; +use std::{error::Error, str::FromStr, sync::Arc, time::Instant}; use flux::{ tile::{TileConfig, attach_tile}, @@ -15,7 +15,7 @@ use silver_discovery::{DiscV5, Discovery}; use silver_gossip::GossipHandler; use silver_network::{Context, NetworkTile, P2p}; use silver_peer::PeerManager; -use silver_storage::tile::StorageTile; +use silver_storage::{latest_local_checkpoint, tile::StorageTile}; use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan}; #[cfg(not(feature = "alloc-profile"))] @@ -36,34 +36,7 @@ fn main() -> Result<(), Box> { tracing::debug!("start"); - let args = std::env::args().collect::>(); - let config_path = args.iter().position(|a| a == "--config").and_then(|i| args.get(i + 1)); - - let config = match config_path { - // Devnet / custom: every network-specific value (fork_digest, - // genesis, bootstrap ENRs, external IP, ports, secret key) comes - // from the file — no source edits needed. - Some(path) => Config::from_file(path)?, - // Default: mainnet, random identity, hardcoded bootnodes below. - None => { - let mut secret = [0u8; 32]; - rand::thread_rng().fill_bytes(&mut secret); - let fork_digest = [0x8c, 0x9f, 0x62, 0xfe]; - let next_fork_version = [6, 0, 0, 0]; - let next_fork_epoch = u64::MAX; - let mut config = Config::new(secret, fork_digest, next_fork_version, next_fork_epoch) - .with_discovery_port(31133) - .with_quic_port(31123); - - if let Some(ckpt) = args.get(1).filter(|a| !a.starts_with("--")) { - config = config.with_checkpoint(ckpt.to_string()); - if let Some(pk) = args.get(2).filter(|a| !a.starts_with("--")) { - config = config.with_checkpoint_pubkeys(pk.to_string()); - } - } - config - } - }; + let config = load_config()?; tracing::info!("loaded config with fork digest: {}", hex::encode(config.fork_digest())); @@ -89,10 +62,11 @@ fn main() -> Result<(), Box> { incoming_rpc_producer.cache_ref().random_access("ds_incoming_rpc", true)?; let persist_rpc_consumer_ds = incoming_rpc_producer.cache_ref().random_access("ds_persist_incoming_rpc", true)?; - - // rpc producer let outgoing_rpc_producer = TCache::multi_producer("outgoing_rpc", config.outgoing_rpc_tcache_size()); + let replay_blocks_producer = TCache::producer("replay_blocks", 1 << 25); + let replay_blocks_consumer = + replay_blocks_producer.cache_ref().random_access("bs_replay", true)?; // Tiles. let keypair = config.keypair()?; @@ -127,10 +101,9 @@ fn main() -> Result<(), Box> { }; let now = Instant::now(); - // File configs supply bootnodes via chain_config.bootstrap_enrs; the - // default mainnet run falls back to the hardcoded set. - let bootnodes: Vec = if config_path.is_some() { - config.chain_config().bootstrap_enrs + + let bootnodes = if !config.chain_config().bootstrap_enrs.is_empty() { + config.chain_config().bootstrap_enrs.clone() } else { vec![ Enr::from_str( @@ -144,6 +117,7 @@ fn main() -> Result<(), Box> { )?, ] }; + for enr in &bootnodes { discv5.add_enr(enr, now); } @@ -164,35 +138,29 @@ fn main() -> Result<(), Box> { hex::encode(config.fork_digest()), )?; + let chain_config = config.chain_config(); + let ticker = SlotTicker::new( + chain_config.genesis_unix_secs, + chain_config.slot_duration(), + chain_config.playload_lookahead(), + ); + + let (checkpoint, checkpoint_pubkeys) = load_checkpoint(&config)?; + let booting_from_local_checkpoint = !checkpoint.is_empty(); + let control_tile = Controller::new( PeerManager::new( gossip_topics, config.peer_score_params(), - config.syncing(), + config.syncing_config(), config.fork_digest(), local_enr.into(), das_custody_groups, + booting_from_local_checkpoint, ), outgoing_rpc_producer.clone(), ); - let chain_config = config.chain_config(); - let ticker = SlotTicker::new( - chain_config.genesis_unix_secs, - chain_config.slot_duration(), - chain_config.playload_lookahead(), - ); - - let checkpoint = match chain_config.checkpoint_file { - Some(file) => load_checkpoint(file)?, - None => vec![], - }; - // Pubkey sidecar only meaningful alongside a checkpoint. - let checkpoint_pubkeys = match chain_config.checkpoint_pubkeys_file { - Some(file) if !checkpoint.is_empty() => load_checkpoint(file)?, - _ => vec![], - }; - let state = BeaconStateOwner::pre_bootstrap(); let state_reader = state.reader(); @@ -203,6 +171,8 @@ fn main() -> Result<(), Box> { state, ssz_gossip_consumer, incoming_rpc_consumer, + replay_blocks_consumer, + !config.disable_weak_subjectivity_check(), &checkpoint, &checkpoint_pubkeys, ); @@ -218,13 +188,14 @@ fn main() -> Result<(), Box> { incoming_rpc_consumer_ds, persist_rpc_consumer_ds, outgoing_rpc_producer, + replay_blocks_producer, state_reader, das_custody_groups, config.fork_digest(), config.data_storage_dir().into(), + booting_from_local_checkpoint, ); - // Spine let spine = SilverSpine::new(None); // TODO panic handler spine.start(None, None, |scoped_spine| { @@ -239,9 +210,62 @@ fn main() -> Result<(), Box> { Ok(()) } -fn load_checkpoint + std::fmt::Debug>( - file_path: P, -) -> Result, std::io::Error> { - tracing::info!("loading checkpoint file: {file_path:?}"); - std::fs::read(file_path) +fn load_config() -> Result { + let args = std::env::args().collect::>(); + let config_path = args.iter().position(|a| a == "--config").and_then(|i| args.get(i + 1)); + let mut config = match config_path { + // Devnet / custom: every network-specific value (fork_digest, + // genesis, bootstrap ENRs, external IP, ports, secret key) comes + // from the file — no source edits needed. + Some(path) => Config::from_file(path)?, + // Default: mainnet, random identity, hardcoded bootnodes below. + None => { + let mut secret = [0u8; 32]; + rand::thread_rng().fill_bytes(&mut secret); + let fork_digest = [0x8c, 0x9f, 0x62, 0xfe]; + let next_fork_version = [6, 0, 0, 0]; + let next_fork_epoch = u64::MAX; + let mut config = Config::new(secret, fork_digest, next_fork_version, next_fork_epoch) + .with_discovery_port(31133) + .with_quic_port(31123); + + if let Some(ckpt) = args.get(1).filter(|a| !a.starts_with("--")) { + config = config.with_checkpoint(ckpt.to_string()); + if let Some(pk) = args.get(2).filter(|a| !a.starts_with("--")) { + config = config.with_checkpoint_pubkeys(pk.to_string()); + } + } + + config + } + }; + + // CLI override (applies on top of either source). + if args.iter().any(|a| a == "--disable-weak-subjectivity") { + config = config.with_disable_weak_subjectivity_check(true); + } + + Ok(config) +} + +fn load_checkpoint(config: &Config) -> Result<(Vec, Vec), std::io::Error> { + let chain_config = config.chain_config(); + match &chain_config.checkpoint_file { + Some(file) => { + let checkpoint = std::fs::read(file)?; + let pubkeys = match &chain_config.checkpoint_pubkeys_file { + Some(file) if !checkpoint.is_empty() => std::fs::read(file)?, + _ => vec![], + }; + Ok((checkpoint, pubkeys)) + } + None => match latest_local_checkpoint(config.data_storage_dir()) { + Some((slot, ssz_path, pubkeys_path)) => { + tracing::info!(slot, "bootstrapping from on-disk checkpoint"); + let pubkeys = pubkeys_path.map(std::fs::read).transpose()?.unwrap_or_default(); + Ok((std::fs::read(ssz_path)?, pubkeys)) + } + None => Ok((vec![], vec![])), + }, + } } diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 85bdce8..5ccd7de 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -22,11 +22,11 @@ pub use crate::{ MAX_PAYLOAD_BODIES_PER_REQ, MULTISTREAM_V1, MultiProducer as TMultiProducer, NewGossipMsg, P2pSend, P2pStreamId, PayloadValidationStatus, PeerControl, PeerEvent, PeerStatus, Producer as TProducer, REJECT_RESPONSE, REQUEST_ID_PREFIX_MASK, RPC_PROTOCOLS, - RandomAccessConsumer as TRandomAccess, RequestCategory, Reservation as TReservation, - RpcInbound, RpcMsg, RpcOutbound, RpcRequest, RpcRequestInbound, RpcRequestOutbound, - RpcResponse, RpcResponseInbound, RpcResponseOutbound, RpcSeverity, SilverSpine, - SilverSpineProducers, StreamProtocol, SyncUpdate, TCache, TCacheProducer, TCacheRead, - TCacheRef, WithdrawalInline, + RandomAccessConsumer as TRandomAccess, ReplayBlock, RequestCategory, + Reservation as TReservation, RpcInbound, RpcMsg, RpcOutbound, RpcRequest, + RpcRequestInbound, RpcRequestOutbound, RpcResponse, RpcResponseInbound, + RpcResponseOutbound, RpcSeverity, SilverSpine, SilverSpineProducers, StreamProtocol, + SyncUpdate, TCache, TCacheProducer, TCacheRead, TCacheRef, WithdrawalInline, }, util::{create_self_signed_certificate, decode_varint, encode_varint, hex32}, wheel::Wheel, diff --git a/crates/common/src/spine.rs b/crates/common/src/spine.rs index 3019ceb..55f066d 100644 --- a/crates/common/src/spine.rs +++ b/crates/common/src/spine.rs @@ -9,8 +9,8 @@ pub use messages::{ EngineGetPayloadResp, EngineHealthEvent, EngineNewPayloadReq, EngineNewPayloadResp, EnginePreparePayloadReq, EngineReq, EngineResp, GossipMsgOut, IpBytes, MAX_BLOBS_PER_BLOCK, MAX_PAYLOAD_BODIES_PER_REQ, NewGossipMsg, P2pSend, PayloadValidationStatus, PeerControl, - PeerEvent, PeerStatus, REQUEST_ID_PREFIX_MASK, RequestCategory, RpcInbound, RpcMsg, - RpcOutbound, RpcRequest, RpcRequestInbound, RpcRequestOutbound, RpcResponse, + PeerEvent, PeerStatus, REQUEST_ID_PREFIX_MASK, ReplayBlock, RequestCategory, RpcInbound, + RpcMsg, RpcOutbound, RpcRequest, RpcRequestInbound, RpcRequestOutbound, RpcResponse, RpcResponseInbound, RpcResponseOutbound, RpcSeverity, SyncUpdate, WithdrawalInline, }; pub use stream_id::P2pStreamId; @@ -51,6 +51,8 @@ pub struct SilverSpine { pub data_columns: SpineQueue, #[queue(size(2usize.pow(10)))] pub sync_target: SpineQueue, + #[queue(size(2usize.pow(12)))] + pub replay_blocks: SpineQueue, #[queue(size(2usize.pow(10)))] pub engine_reqs: SpineQueue, diff --git a/crates/common/src/spine/messages.rs b/crates/common/src/spine/messages.rs index 5f77f7c..910f200 100644 --- a/crates/common/src/spine/messages.rs +++ b/crates/common/src/spine/messages.rs @@ -618,6 +618,14 @@ pub enum BeaconStateEvent { Status { ssz: [u8; STATUS_V2_SIZE], latest_block_slot: u64, wall_slot: u64 }, PersistBlock { ssz: TCacheRead, source: BlockSource }, BlockRejected { block_root: [u8; 32], source: BlockSource }, + ReplayComplete, +} + +#[derive(Clone, Copy, Debug)] +#[repr(C, u8)] +pub enum ReplayBlock { + Block { ssz: TCacheRead }, + Done, } /// Maximum blob commitments per block (Fulu target; increase as the spec @@ -851,7 +859,7 @@ impl BeaconStateEvent { match self { Self::Status { .. } => SszView::Status(StatusView {}), Self::PersistBlock { .. } => SszView::SignedBeaconBlock(SignedBeaconBlockView {}), - Self::BlockRejected { .. } => SszView::None, + Self::BlockRejected { .. } | Self::ReplayComplete => SszView::None, } } } diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 16af1c5..c4dce0a 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -1,5 +1,6 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use chain_config::ChainConfig; pub use discovery_config::DiscoveryConfig; pub use peer_score_params::ScoreParams; use secp256k1::PublicKey; @@ -7,8 +8,6 @@ use serde::{Deserialize, Serialize}; use silver_common::{Enr, Error, GossipTopic, Identify, Keypair, NodeId, PeerId, StreamProtocol}; pub use syncing_config::SyncingConfig; -use crate::chain_config::ChainConfig; - mod chain_config; mod discovery_config; mod peer_score_params; @@ -106,6 +105,8 @@ pub struct Config { outgoing_rpc_tcache_size: usize, #[serde(default = "default_data_dir")] data_storage_dir: String, + #[serde(default)] + disable_weak_subjectivity_check: bool, } impl Config { @@ -137,6 +138,7 @@ impl Config { incoming_rpc_tcache_size: 2 << 27, // ssz outgoing_rpc_tcache_size: 2 << 24, // ssz data_storage_dir: default_data_dir(), + disable_weak_subjectivity_check: false, } } @@ -178,6 +180,11 @@ impl Config { self } + pub fn with_disable_weak_subjectivity_check(mut self, disable: bool) -> Self { + self.disable_weak_subjectivity_check = disable; + self + } + pub fn keypair(&self) -> Result { Keypair::from_secret(&self.secret_key) } @@ -253,8 +260,8 @@ impl Config { Ok(identify) } - pub fn chain_config(&self) -> ChainConfig { - self.chain_config.clone() + pub fn chain_config(&self) -> &ChainConfig { + &self.chain_config } pub fn discovery_config(&self) -> DiscoveryConfig { @@ -265,7 +272,7 @@ impl Config { self.peer_score_params.clone() } - pub fn syncing(&self) -> SyncingConfig { + pub fn syncing_config(&self) -> SyncingConfig { self.syncing.clone() } @@ -300,6 +307,10 @@ impl Config { pub fn data_storage_dir(&self) -> &str { &self.data_storage_dir } + + pub fn disable_weak_subjectivity_check(&self) -> bool { + self.disable_weak_subjectivity_check + } } #[cfg(test)] diff --git a/crates/control/src/tile.rs b/crates/control/src/tile.rs index 6b93dcb..f75b219 100644 --- a/crates/control/src/tile.rs +++ b/crates/control/src/tile.rs @@ -134,6 +134,9 @@ impl Tile for Controller { BeaconStateEvent::BlockRejected { block_root, source } => { self.peer_manager.record_block_rejected(block_root, source); } + BeaconStateEvent::ReplayComplete => { + self.peer_manager.on_local_replay_complete(); + } _ => {} }); diff --git a/crates/e2e/src/stack.rs b/crates/e2e/src/stack.rs index 013ab49..7f85979 100644 --- a/crates/e2e/src/stack.rs +++ b/crates/e2e/src/stack.rs @@ -262,6 +262,7 @@ impl PublisherStack { [0u8; 4], [0u8; METADATA_SIZE], 0, + false, ), TCache::multi_producer("dummy_rpc_out", 32), // dummpy rpc out ); @@ -369,6 +370,7 @@ impl EchoStack { [0u8; 4], [0u8; METADATA_SIZE], 0, + false, ), TCache::multi_producer("dummy_rpc_out", 32), // dummpy rpc out ); diff --git a/crates/e2e/src/utils.rs b/crates/e2e/src/utils.rs index fb47d76..2d0c594 100644 --- a/crates/e2e/src/utils.rs +++ b/crates/e2e/src/utils.rs @@ -131,8 +131,10 @@ impl PmBsHarness { let gossip_p = TCache::producer("gossip_in", 1 << 20); let rpc_cap = (n_blocks * 300 * 1024).next_power_of_two().max(1 << 22); let rpc_p = TCache::producer("rpc_in", rpc_cap); + let replay_p = TCache::producer("replay_in", 1 << 20); let gossip_c = gossip_p.cache_ref().random_access("test", true).expect("gossip ra"); let rpc_c = rpc_p.cache_ref().random_access("test", true).expect("rpc ra"); + let replay_c = replay_p.cache_ref().random_access("test", true).expect("replay ra"); let state = BeaconStateOwner::pre_bootstrap(); let mut bs = BeaconStateTile::new( @@ -141,6 +143,8 @@ impl PmBsHarness { state, gossip_c, rpc_c, + replay_c, + true, checkpoint, &[], ); @@ -157,6 +161,7 @@ impl PmBsHarness { [0u8; 4], // overwritten via set_status from BS's first emission [0u8; METADATA_SIZE], 0, + false, ); let mut ctl = Controller::new(pm, TCache::multi_producer("rpc_out_dummy", 32)); let mut ctl_a = SpineAdapter::connect_tile(&ctl, &mut spine); diff --git a/crates/e2e/tests/checkpoint_load.rs b/crates/e2e/tests/checkpoint_load.rs index ccc01b0..7a861c6 100644 --- a/crates/e2e/tests/checkpoint_load.rs +++ b/crates/e2e/tests/checkpoint_load.rs @@ -153,8 +153,10 @@ fn finalized_state_loads() { let ticker = SlotTicker::new(genesis_time, Duration::from_secs(12), Duration::from_secs(4)); let gossip_p = TCache::producer("gossip_in", 1 << 20); let rpc_p = TCache::producer("rpc_in", 1 << 20); + let replay_p = TCache::producer("replay_in", 1 << 20); let gossip_c = gossip_p.cache_ref().random_access("test", false).unwrap(); let rpc_c = rpc_p.cache_ref().random_access("test", false).unwrap(); + let replay_c = replay_p.cache_ref().random_access("test", false).unwrap(); let state = BeaconStateOwner::pre_bootstrap(); let mut tile = BeaconStateTile::new( @@ -163,6 +165,8 @@ fn finalized_state_loads() { state, gossip_c, rpc_c, + replay_c, + true, &ssz, &[], ); @@ -318,17 +322,21 @@ fn tile_apply_block_ef_fixture() { .decompress_vec(&std::fs::read(&block_path).unwrap()) .expect("snappy block"); - // Offset genesis_time so wall_slot is far ahead of the small EF slot. - let genesis_time = u64::from_le_bytes(pre_ssz[0..8].try_into().unwrap()); + // Wall just past the block: far enough that it isn't future-rejected, but + // close enough that the pre-state stays within its weak-subjectivity period. + let block_slot = silver_common::ssz_view::SignedBeaconBlockView::slot(&block_ssz); + let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); let ticker = SlotTicker::new( - genesis_time.saturating_sub(60 * 60 * 24 * 365), + now.saturating_sub((block_slot + 1) * 12), Duration::from_secs(12), Duration::from_secs(4), ); let gossip_p = TCache::producer("gossip_ef", 1 << 20); let rpc_p = TCache::producer("rpc_ef", 1 << 20); + let replay_p = TCache::producer("replay_ef", 1 << 20); let gossip_c = gossip_p.cache_ref().random_access("test", false).unwrap(); let rpc_c = rpc_p.cache_ref().random_access("test", false).unwrap(); + let replay_c = replay_p.cache_ref().random_access("test", false).unwrap(); let state = BeaconStateOwner::pre_bootstrap(); let mut tile = BeaconStateTile::new( @@ -337,6 +345,8 @@ fn tile_apply_block_ef_fixture() { state, gossip_c, rpc_c, + replay_c, + true, &pre_ssz, &[], ); diff --git a/crates/gossip/src/message.rs b/crates/gossip/src/message.rs index 1807e6f..dbc3e5b 100644 --- a/crates/gossip/src/message.rs +++ b/crates/gossip/src/message.rs @@ -35,7 +35,7 @@ pub(super) fn handle_incoming( }; let topic = GossipTopic::from_wire(topic_string, fork_digest_hex)?; - tracing::info!(?stream_id, ?topic, "Gossip message received"); + tracing::trace!(?stream_id, ?topic, "Gossip message received"); // Decompress: block snappy. let len = read_message_length(snappy_data, &topic).inspect_err(|_| { diff --git a/crates/peer/src/manager.rs b/crates/peer/src/manager.rs index 80a836f..7816526 100644 --- a/crates/peer/src/manager.rs +++ b/crates/peer/src/manager.rs @@ -5,7 +5,7 @@ use std::{ collections::{HashMap, HashSet, VecDeque}, net::{IpAddr, SocketAddr}, - time::Instant, + time::{Duration, Instant}, }; use fxhash::{FxHashMap, FxHashSet}; @@ -32,6 +32,10 @@ const IP_COLOC_CAP: usize = 128; const ARCHIVE_CAP: usize = 512; const SYNC_AGG_CAP: usize = 64; +/// If BS never sends `ReplayComplete` (signal lost, storage stalled), drop the +/// gate after this timeout so network sync can't be stranded. +const REPLAY_GATE_TIMEOUT: Duration = Duration::from_secs(300); + pub struct PeerManager { /// Live peers keyed by connection handle. peers: HashMap, @@ -187,6 +191,13 @@ pub struct PeerManager { /// `latest_block_slot` on the Status event. pub(crate) local_head_imported_slot: u64, + /// Gates `maybe_issue_syncreq` while the storage tile replays the on-disk + /// chain into BS, so we don't network-fetch blocks we already have. + awaiting_local_replay: bool, + /// First `tick` instant seen while `awaiting_local_replay`; anchors + /// REPLAY_GATE_TIMEOUT. `None` until the first tick. + awaiting_replay_since: Option, + /// Peers burnt as backer candidates for the duration of the current /// catchup. A peer lands here when it misbehaves on an RPC while it is /// our active inflight backer — `pick_sync_peer` skips them so the next @@ -299,6 +310,7 @@ impl PeerManager { fork_digest: [u8; 4], metadata: [u8; METADATA_SIZE], custody_columns: u128, + awaiting_local_replay: bool, ) -> Self { let now = Instant::now(); let mesh = @@ -343,6 +355,8 @@ impl PeerManager { col_tried_for_range: FxHashSet::with_capacity_and_hasher(PEERS_CAP, Default::default()), col_stall_logged: false, local_head_imported_slot: 0, + awaiting_local_replay, + awaiting_replay_since: None, burnt_for_target: FxHashSet::with_capacity_and_hasher(PEERS_CAP, Default::default()), finalized_counts: FxHashMap::with_capacity_and_hasher(SYNC_AGG_CAP, Default::default()), head_counts: FxHashMap::with_capacity_and_hasher(SYNC_AGG_CAP, Default::default()), @@ -390,6 +404,14 @@ impl PeerManager { self.local_head_imported_slot = slot; } + pub fn on_local_replay_complete(&mut self) { + if self.awaiting_local_replay { + tracing::info!("local replay complete; unblocking network sync"); + self.awaiting_local_replay = false; + self.target_dirty = true; + } + } + pub fn set_wall_slot(&mut self, wall_slot: u64) { if self.local_wall_slot != wall_slot { self.local_wall_slot = wall_slot; @@ -977,6 +999,17 @@ impl PeerManager { self.last_heartbeat = now; } + if self.awaiting_local_replay { + let since = *self.awaiting_replay_since.get_or_insert(now); + if now.saturating_duration_since(since) >= REPLAY_GATE_TIMEOUT { + tracing::warn!( + "local-replay gate timed out without ReplayComplete; unblocking sync" + ); + self.awaiting_local_replay = false; + self.target_dirty = true; + } + } + // 2) Activate P3 tracking for peers whose grace window has elapsed. self.activate_p3_where_due(now); @@ -2045,7 +2078,11 @@ mod tests { #[derive(Default)] struct Captured(Vec); - fn fixture(our_topics: Vec, params: ScoreParams) -> (PeerManager, Captured) { + fn fixture( + our_topics: Vec, + params: ScoreParams, + awaiting_replay: bool, + ) -> (PeerManager, Captured) { ( PeerManager::new( our_topics, @@ -2054,6 +2091,7 @@ mod tests { [0u8; 4], [0u8; METADATA_SIZE], 0, + awaiting_replay, ), Captured::default(), ) @@ -2089,7 +2127,7 @@ mod tests { #[test] fn connect_with_no_topics_emits_nothing() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); mgr.set_synced(true); connect(&mut mgr, &mut cap, 1, 1, now); assert!(subscribe_events(&cap).is_empty()); @@ -2099,7 +2137,7 @@ mod tests { fn connect_emits_subscribe_per_our_topic() { let now = Instant::now(); let topics = vec![GossipTopic::BeaconBlock, GossipTopic::VoluntaryExit]; - let (mut mgr, mut cap) = fixture(topics.clone(), ScoreParams::default()); + let (mut mgr, mut cap) = fixture(topics.clone(), ScoreParams::default(), false); mgr.set_synced(true); connect(&mut mgr, &mut cap, 1, 1, now); let subs = subscribe_events(&cap); @@ -2113,7 +2151,7 @@ mod tests { fn peer_subscribes_and_we_graft_when_mesh_under_d_low() { let now = Instant::now(); let topics = vec![GossipTopic::BeaconBlock]; - let (mut mgr, mut cap) = fixture(topics, ScoreParams::default()); + let (mut mgr, mut cap) = fixture(topics, ScoreParams::default(), false); mgr.set_synced(true); connect(&mut mgr, &mut cap, 1, 1, now); cap.0.clear(); @@ -2143,7 +2181,7 @@ mod tests { params.behaviour_penalty_weight = -10.0; // excess^2 * -10 params.graylist_threshold = -80.0; // behaviour_penalty=3 → score=-90 params.ip_ban_threshold = 1; // single eviction → BanIp for this test - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); for _ in 0..5 { @@ -2173,7 +2211,7 @@ mod tests { let mut params = ScoreParams::default(); params.iwant_followup = Duration::from_secs(3); params.heartbeat_interval = Duration::from_millis(100); - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); let hash = silver_common::MessageId { id: [7u8; 20] }; @@ -2201,7 +2239,7 @@ mod tests { let mut params = ScoreParams::default(); params.max_ihave_length = 3; params.graylist_threshold = -100_000.0; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); let hash = silver_common::MessageId { id: [9u8; 20] }; @@ -2228,7 +2266,7 @@ mod tests { let mut params = ScoreParams::default(); params.iwant_followup = Duration::from_secs(3); params.heartbeat_interval = Duration::from_millis(100); - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); let hash = silver_common::MessageId { id: [77u8; 20] }; @@ -2255,7 +2293,7 @@ mod tests { let mut params = ScoreParams::default(); params.ip_colocation_threshold = 2; params.ip_colocation_weight = -5.0; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); for i in 1..=5u8 { mgr.handle_event( @@ -2280,7 +2318,7 @@ mod tests { fn disconnect_archives_and_reconnect_restores() { let now = Instant::now(); let params = ScoreParams::default(); - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); mgr.handle_event(PeerEvent::P2pGossipInvalidFrame { p2p_peer: 1 }, now, &mut |c| { @@ -2308,7 +2346,7 @@ mod tests { let mut now = Instant::now(); let mut params = ScoreParams::default(); params.archived_ttl = Duration::from_secs(10); - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); mgr.handle_event( PeerEvent::P2pDisconnect { p2p_peer: 1, peer_id: peer_id(1) }, @@ -2336,7 +2374,7 @@ mod tests { let mut params = ScoreParams::default(); params.iwant_followup = Duration::from_secs(3); params.heartbeat_interval = Duration::from_millis(100); - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); let hash = silver_common::MessageId { id: [7u8; 20] }; @@ -2396,7 +2434,7 @@ mod tests { let mut params = ScoreParams::default(); params.iwant_followup = Duration::from_secs(3); params.heartbeat_interval = Duration::from_millis(100); - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); connect(&mut mgr, &mut cap, 2, 2, now); @@ -2468,7 +2506,7 @@ mod tests { let mut params = ScoreParams::default(); params.d_lazy = 3; params.d_low = 1; // so the first subscriber grafts into mesh, rest stay non-mesh - let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params); + let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params, false); mgr.set_synced(true); for i in 1..=4u8 { @@ -2524,7 +2562,7 @@ mod tests { let mut params = ScoreParams::default(); params.gossip_threshold = -1.0; params.graylist_threshold = -1_000_000.0; - let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params); + let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params, false); connect(&mut mgr, &mut cap, 1, 1, now); mgr.handle_event( PeerEvent::P2pGossipTopicSubscribe { p2p_peer: 1, topic: GossipTopic::BeaconBlock }, @@ -2562,7 +2600,7 @@ mod tests { fn iwant_request_above_threshold_emits_forward() { use silver_common::{TCache, TCacheRead}; let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); mgr.set_synced(true); @@ -2595,7 +2633,7 @@ mod tests { let mut params = ScoreParams::default(); params.gossip_threshold = -1.0; params.graylist_threshold = -1_000_000.0; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); for _ in 0..5 { @@ -2636,7 +2674,7 @@ mod tests { params.d_low = 0; params.d = 0; params.d_high = 8; - let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params); + let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params, false); mgr.set_synced(true); for i in 1..=4u8 { @@ -2703,7 +2741,7 @@ mod tests { params.gossip_threshold = -1.0; params.graylist_threshold = -1_000_000.0; params.d_high = 8; - let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params); + let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params, false); for i in 1..=2u8 { connect(&mut mgr, &mut cap, i as usize, i, now); @@ -2758,7 +2796,7 @@ mod tests { params.d_low = 0; params.d = 0; params.d_high = 8; - let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params); + let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconBlock], params, false); mgr.set_synced(true); for i in 1..=3u8 { @@ -2867,7 +2905,7 @@ mod tests { let now = Instant::now(); let mut params = ScoreParams::default(); params.target_peers = 4; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); // ENR must carry an `eth2` field matching the fixture's [0u8;4] // fork digest now that `our_fork_digest` is always set. let enr = @@ -2916,7 +2954,7 @@ mod tests { let now = Instant::now(); let mut params = ScoreParams::default(); params.target_peers = 2; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); connect(&mut mgr, &mut cap, 2, 2, now); @@ -2941,7 +2979,7 @@ mod tests { params.graylist_threshold = -80.0; params.target_peers = 8; params.ip_ban_threshold = 1; // single eviction → BanIp for this test - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); // Connect a peer on 10.0.0.42, drive their score below graylist, tick // to evict — that should record 10.0.0.42 in `banned_ips`. @@ -2983,7 +3021,7 @@ mod tests { // Decay must be slow enough that "5 invalid frames -> tick -> ban" // still trips the gate after the test's first tick. params.behaviour_penalty_decay = 0.999; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 42, 42, now); for _ in 0..5 { @@ -3028,7 +3066,7 @@ mod tests { params.ip_ban_threshold = 3; // Slow decay so we don't flap above threshold between iterations. params.behaviour_penalty_decay = 0.999; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); let ip = std::net::Ipv4Addr::new(10, 0, 0, 99); @@ -3078,7 +3116,7 @@ mod tests { params.ip_ban_threshold = 2; params.banned_ip_ttl = Duration::from_secs(10); params.behaviour_penalty_decay = 0.999; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); let ip = std::net::Ipv4Addr::new(10, 0, 0, 88); // First eviction: bumps count to 1. No BanIp yet (threshold=2). @@ -3144,7 +3182,7 @@ mod tests { params.ip_ban_threshold = 1; params.banned_ip_ttl = Duration::from_secs(10); params.behaviour_penalty_decay = 0.999; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 42, 42, now); for _ in 0..5 { @@ -3189,7 +3227,7 @@ mod tests { params.banned_ip_ttl = Duration::from_secs(60); params.ip_ban_threshold = 2; params.behaviour_penalty_decay = 0.999; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); for _ in 0..5 { @@ -3224,7 +3262,7 @@ mod tests { params.banned_peer_ttl = Duration::from_secs(3600); params.ip_ban_threshold = 100; params.behaviour_penalty_decay = 0.999; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); // Connect peer with seed=7 on 10.0.0.7 → drive below graylist → tick. connect(&mut mgr, &mut cap, 1, 7, now); @@ -3255,7 +3293,7 @@ mod tests { params.target_peers = 4; params.discovery_query_interval = Duration::from_secs(5); params.heartbeat_interval = Duration::from_millis(10); - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); // First tick after construction: under target, throttle has elapsed // (last_discovery was set to construction time, query_interval=5s). @@ -3280,7 +3318,7 @@ mod tests { let now = Instant::now(); let mut params = ScoreParams::default(); params.target_peers = 4; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); mgr.set_fork_digest([0xAA, 0xBB, 0xCC, 0xDD]); let mut wrong_eth2 = [0u8; 16]; @@ -3301,7 +3339,7 @@ mod tests { let now = Instant::now(); let mut params = ScoreParams::default(); params.target_peers = 4; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); mgr.set_fork_digest([0xAA, 0xBB, 0xCC, 0xDD]); // ENR with no eth2 field — same drop policy as lighthouse. @@ -3321,7 +3359,7 @@ mod tests { params.target_peers = 2; params.max_priority_peers = 4; // Subscribe to attnet 5 — our required mask flips bit 5. - let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconAttestation(5)], params); + let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconAttestation(5)], params, false); connect(&mut mgr, &mut cap, 1, 1, now); connect(&mut mgr, &mut cap, 2, 2, now); cap.0.clear(); @@ -3353,7 +3391,7 @@ mod tests { let mut params = ScoreParams::default(); params.target_peers = 2; params.max_priority_peers = 2; // already at the priority cap - let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconAttestation(5)], params); + let (mut mgr, mut cap) = fixture(vec![GossipTopic::BeaconAttestation(5)], params, false); connect(&mut mgr, &mut cap, 1, 1, now); connect(&mut mgr, &mut cap, 2, 2, now); cap.0.clear(); @@ -3375,7 +3413,7 @@ mod tests { fn rpc_fatal_misbehaviour_evicts_on_tick() { let now = Instant::now(); // Defaults: graylist_threshold = -80, Fatal delta = -200. - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); cap.0.clear(); @@ -3399,7 +3437,7 @@ mod tests { fn rpc_low_tolerance_penalises_but_keeps_peer() { let now = Instant::now(); // Defaults: graylist_threshold = -80, Low delta = -10. - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); mgr.handle_event( @@ -3426,7 +3464,7 @@ mod tests { let now = Instant::now(); // Eight Low reports at -10 each = -80, exactly at graylist (strict < // check, so push to nine to trip). - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); for _ in 0..9 { @@ -3454,7 +3492,7 @@ mod tests { let mut params = ScoreParams::default(); params.behaviour_penalty_decay = 0.5; params.decay_to_zero = 0.01; - let (mut mgr, mut cap) = fixture(vec![], params); + let (mut mgr, mut cap) = fixture(vec![], params, false); connect(&mut mgr, &mut cap, 1, 1, now); mgr.handle_event(PeerEvent::P2pGossipInvalidFrame { p2p_peer: 1 }, now, &mut |c| { @@ -3520,7 +3558,7 @@ mod tests { #[test] fn p2p_status_fork_digest_mismatch_evicts() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); mgr.set_fork_digest(fork_a()); cap.0.clear(); @@ -3539,7 +3577,7 @@ mod tests { #[test] fn p2p_status_matching_fork_digest_keeps_peer_and_parses_db() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); mgr.set_fork_digest(fork_a()); @@ -3556,7 +3594,7 @@ mod tests { #[test] fn p2p_status_divergent_finalized_root_evicts() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); set_local(&mut mgr, status_v2_ssz(fork_a(), [0xAA; 32], 42, [0xCC; 32], 42 * 32 + 5)); mgr.set_wall_slot(42 * 32 + 5); @@ -3582,7 +3620,7 @@ mod tests { #[test] fn p2p_status_same_finalized_root_accepted() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); set_local(&mut mgr, status_v2_ssz(fork_a(), [0xAA; 32], 42, [0xCC; 32], 42 * 32 + 5)); mgr.set_wall_slot(42 * 32 + 5); @@ -3600,7 +3638,7 @@ mod tests { #[test] fn p2p_status_rejected_finalized_root_evicts() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); set_local(&mut mgr, status_v2_ssz(fork_a(), [0xAA; 32], 42, [0xCC; 32], 42 * 32 + 5)); mgr.set_wall_slot(42 * 32 + 5); @@ -3633,7 +3671,7 @@ mod tests { // lockstep so ENR-discovery filter and inbound Status check never // diverge. let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); connect(&mut mgr, &mut cap, 1, 1, now); set_local(&mut mgr, status_v2_ssz(fork_b(), [0; 32], 0, [0; 32], 0)); @@ -3666,7 +3704,7 @@ mod tests { #[test] fn select_target_no_local_state_returns_following() { - let (mut mgr, _cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, _cap) = fixture(vec![], ScoreParams::default(), false); assert!(mgr.maybe_emit_sync_target().is_none(), "no dirty signal"); mgr.target_dirty = true; // No local_state → can't decide; stays Following (and equals last @@ -3677,7 +3715,7 @@ mod tests { #[test] fn select_target_picks_max_vote_finalized_target() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_snapshot(&mut mgr, /* finalized_epoch */ 10, /* head_slot */ 320); // 2 peers back finalized (epoch=20, root=X); 1 peer backs (epoch=15, root=Y). @@ -3703,7 +3741,7 @@ mod tests { #[test] fn select_target_skips_blacklisted_finalized() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_snapshot(&mut mgr, 10, 320); let x = [0xBB; 32]; @@ -3726,7 +3764,7 @@ mod tests { #[test] fn select_target_pins_until_reached() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_snapshot(&mut mgr, 10, 320); let x = [0xBB; 32]; @@ -3757,7 +3795,7 @@ mod tests { #[test] fn select_target_unpins_after_reaching_finalized() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_snapshot(&mut mgr, 10, 320); let x = [0xBB; 32]; @@ -3782,7 +3820,7 @@ mod tests { // just not FFG-finalized). Must NOT trigger a SyncingFinalized // catch-up to re-download blocks we hold. let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); // finalized_epoch=10, head_slot=700 (> 20 * 32 = 640). set_snapshot(&mut mgr, 10, 700); @@ -3803,7 +3841,7 @@ mod tests { // Pinned to a finalized target, then our head advances past its epoch // boundary without FFG-finalizing it → target is considered reached. let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_snapshot(&mut mgr, 10, 320); let x = [0xBB; 32]; @@ -3826,7 +3864,7 @@ mod tests { #[test] fn select_target_falls_through_to_head_catchup() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); // Finalized is current; head is behind. set_local(&mut mgr, status_v2_ssz(fork_a(), [0xAA; 32], 100, [0xCC; 32], 3200)); mgr.set_wall_slot(100_000); @@ -3862,7 +3900,7 @@ mod tests { #[test] fn select_target_following_when_no_one_is_ahead() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_snapshot(&mut mgr, 100, 3200); // Peer is at the same finalized + head; nothing to chase. @@ -3881,7 +3919,7 @@ mod tests { #[test] fn select_target_emits_only_on_change() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_snapshot(&mut mgr, 10, 320); let x = [0xBB; 32]; @@ -3898,10 +3936,31 @@ mod tests { assert_eq!(mgr.maybe_emit_sync_target(), None); } + #[test] + fn awaiting_local_replay_gates_syncreq_until_complete() { + let now = Instant::now(); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), true); + set_snapshot(&mut mgr, 10, 320); + + // A peer is well ahead — target selection still runs during replay. + connect(&mut mgr, &mut cap, 1, 1, now); + send_status(&mut mgr, &mut cap, 1, make_status_v2(fork_a(), [0xBB; 32], 20, [0; 32], 640)); + assert!(matches!(mgr.maybe_emit_sync_target(), Some(SyncUpdate::SyncingFinalized { .. }))); + + // Gated: no BlocksByRange while the storage tile is still replaying. + mgr.maybe_issue_syncreq(now, &mut |c| cap.0.push(c)); + assert!(mgr.inflight_syncreq.is_none(), "no syncreq while awaiting replay"); + + // Released on the BS ReplayComplete signal — now it issues. + mgr.on_local_replay_complete(); + mgr.maybe_issue_syncreq(now, &mut |c| cap.0.push(c)); + assert!(mgr.inflight_syncreq.is_some(), "syncreq issues after replay completes"); + } + #[test] fn block_rejected_in_finalized_catchup_blacklists_target() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_snapshot(&mut mgr, 10, 320); let bad_target = [0xBB; 32]; @@ -3952,7 +4011,7 @@ mod tests { // == target_root`. Guards against stuck-pinned off-by-one and against // an epoch match with a drifted root being treated as reached. let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); let target = [0xBB; 32]; // Local: epoch 10, head 320, finalized_root [0xAA] (snapshot default). @@ -4002,7 +4061,7 @@ mod tests { // from a local apply-lag (peer delivered, head didn't advance). // Only the former scores + burns. let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); let target = [0xBB; 32]; set_snapshot(&mut mgr, 10, 320); connect(&mut mgr, &mut cap, 1, 1, now); @@ -4047,7 +4106,7 @@ mod tests { // the delivered watermark is more than one batch ahead of head_slot, // issuance pauses until apply catches up. let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); let target = [0xBB; 32]; set_snapshot(&mut mgr, 10, 320); // head_slot 320, never advances here connect(&mut mgr, &mut cap, 1, 1, now); @@ -4089,7 +4148,7 @@ mod tests { // re-issue from our apply-lagged head_slot and re-import the same // blocks. let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); // Local well behind head; finalized current so head sync (not // finalized) is selected. wall_slot near head so the behind-wall // guard doesn't fire. @@ -4408,7 +4467,7 @@ mod tests { // Following + head trailing wall_slot by > head_lag_threshold_slots // ⇒ fell_behind(). Strict `>` at the boundary; gated on `is_synced` // and on having a local status. - let (mut mgr, _cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, _cap) = fixture(vec![], ScoreParams::default(), false); // No local status yet → false regardless of flag. assert!(!mgr.fell_behind()); @@ -4442,7 +4501,7 @@ mod tests { // march its head over the empty genesis→wall gap). Once caught up it // latches synced, and a later empty-slot tail keeps `Following`. let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_local(&mut mgr, status_v2_ssz(fork_a(), [0xAA; 32], 0, [0; 32], 0)); mgr.set_wall_slot(100_000); @@ -4478,7 +4537,7 @@ mod tests { #[test] fn select_target_filters_too_far_ahead_finalized() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); set_local(&mut mgr, status_v2_ssz(fork_a(), [0xAA; 32], 10, [0xCC; 32], 320)); mgr.set_wall_slot(320); @@ -4500,7 +4559,7 @@ mod tests { #[test] fn peer_data_columns_prioritization_and_slot_reservation() { let now = Instant::now(); - let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default(), false); // Connect and identify Peer 1 let kp1 = Keypair::from_secret(&[1u8; 32]).unwrap(); diff --git a/crates/peer/src/manager/rpc.rs b/crates/peer/src/manager/rpc.rs index 72c1588..dab3b60 100644 --- a/crates/peer/src/manager/rpc.rs +++ b/crates/peer/src/manager/rpc.rs @@ -593,6 +593,10 @@ impl PeerManager { /// BlocksByRange capacity, build the SSZ request, emit it, and set /// `inflight_syncreq`. pub fn maybe_issue_syncreq(&mut self, now: Instant, emit: &mut impl FnMut(PeerControl)) { + if self.awaiting_local_replay { + return; + } + // Columns first, unconditionally: the early returns below (flow // control, target reached) must not starve the column driver — // with the DA check, missing columns are exactly what stalls the diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index f3a7599..75f221e 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -4,3 +4,4 @@ pub mod tile; pub mod util; pub use counters::StorageCounters; +pub use store::latest_local_checkpoint; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 7eb98a7..638ee36 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -22,6 +22,7 @@ mod checkpoint; mod io; use checkpoint::CheckpointWriter; +pub use checkpoint::latest_local_checkpoint; /// `DataColumnSidecarsByRange` is bounded by /// `count * NUMBER_OF_COLUMNS <= MAX_REQUEST_DATA_COLUMN_SIDECARS` @@ -476,6 +477,35 @@ impl Store { &self.head_root } + pub(super) fn replay_block_paths(&self, custody: u128) -> Vec<(u64, PathBuf, bool)> { + let checkpoint_slot = self.last_persisted_finalized_slot; + let mut paths = Vec::with_capacity(self.unfinalized.len()); + for (block_root, block) in &self.unfinalized { + if block.slot > checkpoint_slot { + let mask = self.unfinalized_columns.get(block_root).map_or(0, |&(_, m)| m); + paths.push(( + block.slot, + self.unfinalized_dir().join(io::unfinalized_name( + block.slot, + &block.parent_root, + block_root, + )), + mask & custody == custody, + )); + } + } + for &slot in self.root_index.values() { + if slot > checkpoint_slot { + paths.push(( + slot, + self.block_slot_dir(slot).join(format!("{slot}_block.ssz")), + true, + )); + } + } + paths + } + #[timed] pub(super) fn rpc_request( &mut self, diff --git a/crates/storage/src/store/checkpoint.rs b/crates/storage/src/store/checkpoint.rs index ebd9efa..25f2c84 100644 --- a/crates/storage/src/store/checkpoint.rs +++ b/crates/storage/src/store/checkpoint.rs @@ -89,6 +89,15 @@ fn committed_checkpoint_slots(dir: &Path) -> Vec { slots } +pub fn latest_local_checkpoint(store_dir: &str) -> Option<(u64, PathBuf, Option)> { + let dir = Path::new(store_dir).join(FINALIZED_CHECKPOINTS_DIR); + let slot = *committed_checkpoint_slots(&dir).first()?; + let slot_dir = dir.join(slot.to_string()); + let pubkeys = slot_dir.join(format!("{slot}.pubkeys")); + let pubkeys = pubkeys.exists().then_some(pubkeys); + Some((slot, slot_dir.join(format!("{slot}.ssz")), pubkeys)) +} + /// Initialise the checkpoints dir on load: create it, drop incomplete dirs /// (crashed mid-write, no committed `.ssz`), prune to the newest /// MAX_FINALIZED_CHECKPOINTS, and return the newest committed slot (0 if none). diff --git a/crates/storage/src/tile.rs b/crates/storage/src/tile.rs index 40d10ca..cc91083 100644 --- a/crates/storage/src/tile.rs +++ b/crates/storage/src/tile.rs @@ -1,11 +1,17 @@ -use std::time::{Duration, Instant}; +use std::{ + collections::VecDeque, + fs::File, + io::Read, + path::PathBuf, + time::{Duration, Instant}, +}; use flux::{spine::SpineAdapter, tile::Tile}; use silver_beacon_state_data::{BeaconStateReader, SLOTS_PER_EPOCH}; use silver_common::{ BASE_REQUEST_ID, BeaconStateEvent, DataColumnsAvailable, NewGossipMsg, P2pSend, P2pStreamId, - PeerEvent, RpcInbound, RpcSeverity, SilverSpine, StreamProtocol, SyncUpdate, TMultiProducer, - TRandomAccess, TRead, Wheel, + PeerEvent, ReplayBlock, RpcInbound, RpcSeverity, SilverSpine, StreamProtocol, SyncUpdate, + TCacheProducer, TMultiProducer, TProducer, TRandomAccess, TRead, Wheel, ssz_view::{DataColumnSidecarView, SignedBeaconBlockView, StatusView}, }; use silver_metrics::timed; @@ -13,6 +19,8 @@ use silver_metrics::timed; use crate::{StorageCounters, store::Store, util}; const MAX_RETRIES: u8 = 5; +const MAX_REPLAY_BLOCKS_PER_LOOP: usize = 16; + /// Persist a finalized-state checkpoint only when within this many slots of /// the wall-clock head (i.e. not fast-syncing) — avoids stalling the writer /// and re-encoding every intermediate finalized epoch while catching up. @@ -66,6 +74,10 @@ pub struct StorageTile { // we are caught up to head; consumed when a persist is started (the // in-flight checkpoint then lives on the `Store`, driven by `file_io`). persist_pending: bool, + + replay_blocks: VecDeque<(PathBuf, bool)>, + replay_producer: TProducer, + replay_done: bool, } impl StorageTile { @@ -76,13 +88,23 @@ impl StorageTile { rpc_consumer: TRandomAccess, persist_rpc_consumer: TRandomAccess, rpc_producer: TMultiProducer, + replay_producer: TProducer, beacon_state: BeaconStateReader, custody_group_columns: u128, fork_digest: [u8; 4], data_store_dir: String, + replay_from_disk: bool, ) -> Self { let store = Store::load(data_store_dir).expect("failed to load storage store"); let checkpointed_epoch = store.last_persisted_finalized_slot() / SLOTS_PER_EPOCH; + let replay_blocks = if replay_from_disk { + let mut paths = store.replay_block_paths(custody_group_columns); + paths.sort_unstable_by_key(|(slot, _, _)| *slot); + paths.into_iter().map(|(_, path, cols)| (path, cols)).collect() + } else { + VecDeque::new() + }; + Self { custody_group_columns, request_id: BASE_REQUEST_ID, @@ -99,6 +121,72 @@ impl StorageTile { outstanding_requests: Wheel::new(Duration::from_millis(100)), checkpointed_epoch, persist_pending: false, + replay_blocks, + replay_producer, + replay_done: !replay_from_disk, + } + } + + fn drive_replay(&mut self, adapter: &mut SpineAdapter) { + if self.replay_done { + return; + } + + let mut sent = 0; + while sent < MAX_REPLAY_BLOCKS_PER_LOOP { + let Some(&(ref path, cols_on_disk)) = self.replay_blocks.front() else { + break; + }; + + let (mut file, len) = match File::open(path).and_then(|f| { + let len = f.metadata()?.len() as usize; + Ok((f, len)) + }) { + Ok(pair) => pair, + Err(e) => { + tracing::warn!(?e, ?path, "replay block open failed; skipping"); + self.replay_blocks.pop_front(); + continue; + } + }; + + let Some(mut reservation) = self.replay_producer.reserve(len, true) else { + return; // tcache full — retry next loop + }; + let buf = match reservation.buffer() { + Ok(buf) => buf, + Err(e) => { + tracing::error!(?e, "replay reservation buffer failed; skipping"); + self.replay_blocks.pop_front(); + continue; + } + }; + if let Err(e) = file.read_exact(&mut buf[..len]) { + tracing::error!(?e, ?path, "replay block read failed; skipping"); + self.replay_blocks.pop_front(); + continue; + } + + // TODO: request the missing columns instead of dropping? + let block = &buf[..len]; + if !cols_on_disk && + SignedBeaconBlockView::check_size(block) && + SignedBeaconBlockView::has_data_columns(block) + { + tracing::warn!(?path, "replay skip: custody columns missing on disk"); + self.replay_blocks.pop_front(); + continue; + } + + reservation.increment_offset(len); + adapter.produce(ReplayBlock::Block { ssz: reservation.read() }); + self.replay_blocks.pop_front(); + sent += 1; + } + + if self.replay_blocks.is_empty() { + adapter.produce(ReplayBlock::Done); + self.replay_done = true; } } @@ -346,6 +434,8 @@ impl StorageTile { impl Tile for StorageTile { fn loop_body(&mut self, adapter: &mut SpineAdapter) { + self.drive_replay(adapter); + self.gossip_consumer.free(); self.rpc_consumer.free(); self.persist_gossip_consumer.free(); @@ -602,6 +692,7 @@ mod tests { persist_rpc_tc.cache_ref().random_access("persist_rpc_cons", true).unwrap(); let rpc_producer = TCache::multi_producer("rpc_out", 1024 * 1024); + let replay_producer = TCache::producer("replay_out", 1024 * 1024); let beacon_state = BeaconStateOwner::pre_bootstrap().reader(); @@ -611,10 +702,12 @@ mod tests { rpc_consumer, persist_rpc_consumer, rpc_producer, + replay_producer, beacon_state, custody_columns, [1, 2, 3, 4], store_dir.clone(), + false, ); let mut block_bytes = vec![0u8; 784]; @@ -695,4 +788,114 @@ mod tests { let _ = std::fs::remove_dir_all(&store_dir); } + + /// Synthetic SignedBeaconBlock: message at 100, slot at [100..108), body + /// at 184. `has_data_columns` is `blob_kzg_commitments_offset < + /// execution_requests_offset`, so equal offsets ⇒ no columns. + fn make_block(slot: u64, with_data_columns: bool) -> Vec { + let mut b = vec![0u8; 784]; + b[0..4].copy_from_slice(&100u32.to_le_bytes()); + b[100..108].copy_from_slice(&slot.to_le_bytes()); + b[180..184].copy_from_slice(&84u32.to_le_bytes()); + let (blob_off, exec_off): (u32, u32) = + if with_data_columns { (400, 500) } else { (500, 500) }; + b[184 + 388..184 + 392].copy_from_slice(&blob_off.to_le_bytes()); + b[184 + 392..184 + 396].copy_from_slice(&exec_off.to_le_bytes()); + b + } + + #[test] + fn replay_skips_blocks_missing_custody_columns() { + // Checkpoint at slot 32 plus three unfinalized blocks above it. Custody + // set = {3, 7}. Needs-columns comes from the block bytes, presence + // from the disk bitmask: + // slot 33 — has columns, full custody set on disk ⇒ replayed + // slot 34 — has columns, only column 3 on disk ⇒ skipped + // slot 35 — columnless block, nothing on disk ⇒ replayed + // Only the unavailable block is dropped; replay continues past it and + // ends with Done so the peer manager resyncs the gap. + let custody = (1u128 << 3) | (1u128 << 7); + let store_dir = format!("/tmp/test_storage_replay_da_{}", rand::random::()); + let _ = std::fs::remove_dir_all(&store_dir); + + // Committed-checkpoint marker → last_persisted_finalized_slot = 32. + let ckpt = format!("{store_dir}/finalized_checkpoints/32"); + std::fs::create_dir_all(&ckpt).unwrap(); + std::fs::write(format!("{ckpt}/32.ssz"), b"x").unwrap(); + + // Unfinalized blocks: `__.ssz`. The root in the + // name keys the column bitmask; needs-columns is parsed from the bytes. + let unfin = format!("{store_dir}/unfinalized"); + std::fs::create_dir_all(&unfin).unwrap(); + let (root_a, root_b, root_c) = ("a".repeat(64), "b".repeat(64), "c".repeat(64)); + for (slot, root, dc) in [(33, &root_a, true), (34, &root_b, true), (35, &root_c, false)] { + std::fs::write( + format!("{unfin}/{slot}_{}_{}.ssz", "0".repeat(64), root), + make_block(slot, dc), + ) + .unwrap(); + } + + // Custody columns on disk: `__.ssz`. + let cols = format!("{store_dir}/unfinalized_columns"); + std::fs::create_dir_all(&cols).unwrap(); + for col in [3, 7] { + std::fs::write(format!("{cols}/33_{root_a}_{col}.ssz"), b"c").unwrap(); + } + std::fs::write(format!("{cols}/34_{root_b}_3.ssz"), b"c").unwrap(); // partial + + let gossip_tc = TCache::producer("g", 1 << 20); + let pg_tc = TCache::producer("pg", 1 << 20); + let rpc_tc = TCache::producer("r", 1 << 20); + let pr_tc = TCache::producer("pr", 1 << 20); + let mut tile = StorageTile::new( + gossip_tc.cache_ref().random_access("g", true).unwrap(), + pg_tc.cache_ref().random_access("pg", true).unwrap(), + rpc_tc.cache_ref().random_access("r", true).unwrap(), + pr_tc.cache_ref().random_access("pr", true).unwrap(), + TCache::multi_producer("rpc_out", 1 << 20), + TCache::producer("replay_out", 1 << 20), + BeaconStateOwner::pre_bootstrap().reader(), + custody, + [1, 2, 3, 4], + store_dir.clone(), + true, + ); + assert_eq!(tile.replay_blocks.len(), 3, "skip decided at replay, not load"); + + // Spine + injector: the tile produces, the injector drains. + let base = std::env::temp_dir().join(format!("silver-replay-da-{}", rand::random::())); + std::fs::create_dir_all(&base).unwrap(); + let mut spine = Box::new(SilverSpine::new_with_base_dir(&base, None)); + let mut tile_adapter = SpineAdapter::connect_tile(&tile, &mut spine); + let inj = Injector; + let mut inj_adapter = SpineAdapter::connect_tile(&inj, &mut spine); + // Prime injector cursors while queues are empty. + inj_adapter.consume(|_: DataColumnsAvailable, _| {}); + inj_adapter.consume(|_: ReplayBlock, _| {}); + + tile.drive_replay(&mut tile_adapter); + + let mut das = 0; + inj_adapter.consume(|_: DataColumnsAvailable, _| das += 1); + let (mut blocks, mut done) = (0, 0); + inj_adapter.consume(|m: ReplayBlock, _| match m { + ReplayBlock::Block { .. } => blocks += 1, + ReplayBlock::Done => done += 1, + }); + + assert_eq!(das, 0, "replay emits no separate DataColumnsAvailable event"); + assert_eq!(blocks, 2, "slots 33 and 35 replayed; 34 skipped"); + assert_eq!(done, 1, "replay terminated with Done"); + assert!(tile.replay_done); + + let _ = std::fs::remove_dir_all(&store_dir); + let _ = std::fs::remove_dir_all(&base); + } + + struct Injector; + + impl Tile for Injector { + fn loop_body(&mut self, _: &mut SpineAdapter) {} + } }