Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ use crate::validator_monitor::{
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot,
CachedHead, metrics,
AvailabilityPendingExecutedBlock, BalancesCache, BeaconChainError, BeaconSnapshot, CachedHead,
metrics,
};
use eth2::types::{
EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes,
Expand Down Expand Up @@ -134,7 +134,7 @@ use types::data_column_sidecar::ColumnIndex;
use types::payload::BlockProductionVersion;
use types::*;

pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
pub type ForkChoiceError = fork_choice::Error;

/// Alias to appease clippy.
type HashBlockTuple<E> = (Hash256, RpcBlock<E>);
Expand Down Expand Up @@ -344,7 +344,7 @@ pub enum BlockProcessStatus<E: EthSpec> {
pub type LightClientProducerEvent<T> = (Hash256, Slot, SyncAggregate<T>);

pub type BeaconForkChoice<T> = ForkChoice<
BeaconForkChoiceStore<
BalancesCache<
<T as BeaconChainTypes>::EthSpec,
<T as BeaconChainTypes>::HotStore,
<T as BeaconChainTypes>::ColdStore,
Expand Down Expand Up @@ -619,13 +619,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let persisted_fork_choice =
PersistedForkChoice::from_bytes(&persisted_fork_choice_bytes, store.get_config())?;
let fc_store =
BeaconForkChoiceStore::from_persisted(persisted_fork_choice.fork_choice_store, store)?;

Ok(Some(ForkChoice::from_persisted(
persisted_fork_choice.fork_choice,
reset_payload_statuses,
fc_store,
BalancesCache::new(store.clone()),
spec,
)?))
}
Expand Down
177 changes: 51 additions & 126 deletions beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
//! Additionally, the `BalancesCache` struct is defined; a cache designed to avoid database
//! reads when fork choice requires the validator balances of the justified state.

use crate::{BeaconSnapshot, metrics};
use crate::BeaconSnapshot;
use derivative::Derivative;
use fork_choice::ForkChoiceStore;
use fork_choice::{BalancesGetter, Error as ForkChoiceError};
use proto_array::JustifiedBalances;
use safe_arith::ArithError;
use ssz_derive::{Decode, Encode};
Expand All @@ -16,8 +16,7 @@ use std::sync::Arc;
use store::{Error as StoreError, HotColdDB, ItemStore};
use superstruct::superstruct;
use types::{
AbstractExecPayload, BeaconBlockRef, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec,
FixedBytesExtended, Hash256, Slot,
BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, FixedBytesExtended, Hash256, Slot,
};

#[derive(Debug)]
Expand Down Expand Up @@ -64,17 +63,22 @@ pub(crate) type CacheItem = CacheItemV8;
variant_attributes(derive(PartialEq, Clone, Default, Debug, Encode, Decode)),
no_enum
)]
pub struct BalancesCache {
pub struct BalancesCacheInner {
pub(crate) items: Vec<CacheItemV8>,
}

pub type BalancesCache = BalancesCacheV8;
pub type BalancesCacheInner = BalancesCacheInnerV8;

impl BalancesCache {
pub struct BalancesCache<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
pub(crate) cache: BalancesCacheInner,
pub(crate) store: Arc<HotColdDB<E, Hot, Cold>>,
}

impl BalancesCacheInner {
/// Inspect the given `state` and determine the root of the block at the first slot of
/// `state.current_epoch`. If there is not already some entry for the given block root, then
/// add the effective balances from the `state` to the cache.
pub fn process_state<E: EthSpec>(
fn process_state<E: EthSpec>(
&mut self,
block_root: Hash256,
state: &BeaconState<E>,
Expand Down Expand Up @@ -119,20 +123,56 @@ impl BalancesCache {
/// Get the balances for the given `block_root`, if any.
///
/// If some balances are found, they are cloned from the cache.
pub fn get(&mut self, block_root: Hash256, epoch: Epoch) -> Option<Vec<u64>> {
fn get(&self, block_root: Hash256, epoch: Epoch) -> Option<Vec<u64>> {
let i = self.position(block_root, epoch)?;
Some(self.items[i].balances.clone())
}
}

impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BalancesCache<E, Hot, Cold> {
pub fn new(store: Arc<HotColdDB<E, Hot, Cold>>) -> Self {
Self {
store,
cache: <_>::default(),
}
}
}

impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BalancesGetter
for BalancesCache<E, Hot, Cold>
{
fn get_balances(
&self,
state_root: Hash256,
checkpoint: Checkpoint,
) -> Result<JustifiedBalances, ForkChoiceError> {
if let Some(balances) = self.cache.get(checkpoint.root, checkpoint.epoch) {
return Ok(JustifiedBalances::from_effective_balances(balances)
.map_err(|e| format!("Invalid cached balances {e:?}"))?);
}

let state_slot = checkpoint.epoch.start_slot(E::slots_per_epoch());
let state = self
.store
.get_state(&state_root, Some(state_slot), false)
.map_err(|e| format!("Error getting state {state_root:?} {e:?}"))?
.ok_or_else(|| {
ForkChoiceError::BalancesGetterError(format!("Missing state {state_root:?}"))
})?;
let balances = JustifiedBalances::from_justified_state(&state)
.map_err(|e| format!("Invalid state balances {e:?}"))?;
return Ok(balances);
}
}

/// Implements `fork_choice::ForkChoiceStore` in order to provide a persistent backing to the
/// `fork_choice::ForkChoice` struct.
#[derive(Debug, Derivative)]
#[derivative(PartialEq(bound = "E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>"))]
pub struct BeaconForkChoiceStore<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
#[derivative(PartialEq = "ignore")]
store: Arc<HotColdDB<E, Hot, Cold>>,
balances_cache: BalancesCache,
balances_cache: BalancesCacheInner,
time: Slot,
finalized_checkpoint: Checkpoint,
justified_checkpoint: Checkpoint,
Expand Down Expand Up @@ -292,121 +332,6 @@ where
}
}

impl<E, Hot, Cold> ForkChoiceStore<E> for BeaconForkChoiceStore<E, Hot, Cold>
where
E: EthSpec,
Hot: ItemStore<E>,
Cold: ItemStore<E>,
{
type Error = Error;

fn get_current_slot(&self) -> Slot {
self.time
}

fn set_current_slot(&mut self, slot: Slot) {
self.time = slot
}

fn on_verified_block<Payload: AbstractExecPayload<E>>(
&mut self,
_block: BeaconBlockRef<E, Payload>,
block_root: Hash256,
state: &BeaconState<E>,
) -> Result<(), Self::Error> {
self.balances_cache.process_state(block_root, state)
}

fn justified_checkpoint(&self) -> &Checkpoint {
&self.justified_checkpoint
}

fn justified_state_root(&self) -> Hash256 {
self.justified_state_root
}

fn justified_balances(&self) -> &JustifiedBalances {
&self.justified_balances
}

fn finalized_checkpoint(&self) -> &Checkpoint {
&self.finalized_checkpoint
}

fn unrealized_justified_checkpoint(&self) -> &Checkpoint {
&self.unrealized_justified_checkpoint
}

fn unrealized_justified_state_root(&self) -> Hash256 {
self.unrealized_justified_state_root
}

fn unrealized_finalized_checkpoint(&self) -> &Checkpoint {
&self.unrealized_finalized_checkpoint
}

fn proposer_boost_root(&self) -> Hash256 {
self.proposer_boost_root
}

fn set_finalized_checkpoint(&mut self, checkpoint: Checkpoint) {
self.finalized_checkpoint = checkpoint
}

fn set_justified_checkpoint(
&mut self,
checkpoint: Checkpoint,
justified_state_root: Hash256,
) -> Result<(), Error> {
self.justified_checkpoint = checkpoint;
self.justified_state_root = justified_state_root;

if let Some(balances) = self.balances_cache.get(
self.justified_checkpoint.root,
self.justified_checkpoint.epoch,
) {
// NOTE: could avoid this re-calculation by introducing a `PersistedCacheItem`.
metrics::inc_counter(&metrics::BALANCES_CACHE_HITS);
self.justified_balances = JustifiedBalances::from_effective_balances(balances)?;
} else {
metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES);

// Justified state is reasonably useful to cache, it might be finalized soon.
let update_cache = true;
let state = self
.store
.get_hot_state(&self.justified_state_root, update_cache)
.map_err(Error::FailedToReadState)?
.ok_or(Error::MissingState(self.justified_state_root))?;

self.justified_balances = JustifiedBalances::from_justified_state(&state)?;
}

Ok(())
}

fn set_unrealized_justified_checkpoint(&mut self, checkpoint: Checkpoint, state_root: Hash256) {
self.unrealized_justified_checkpoint = checkpoint;
self.unrealized_justified_state_root = state_root;
}

fn set_unrealized_finalized_checkpoint(&mut self, checkpoint: Checkpoint) {
self.unrealized_finalized_checkpoint = checkpoint;
}

fn set_proposer_boost_root(&mut self, proposer_boost_root: Hash256) {
self.proposer_boost_root = proposer_boost_root;
}

fn equivocating_indices(&self) -> &BTreeSet<u64> {
&self.equivocating_indices
}

fn extend_equivocating_indices(&mut self, indices: impl IntoIterator<Item = u64>) {
self.equivocating_indices.extend(indices);
}
}

pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV28;

/// A container which allows persisting the `BeaconForkChoiceStore` to the on-disk database.
Expand All @@ -418,7 +343,7 @@ pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV28;
pub struct PersistedForkChoiceStore {
/// The balances cache was removed from disk storage in schema V28.
#[superstruct(only(V17))]
pub balances_cache: BalancesCacheV8,
pub balances_cache: BalancesCacheInnerV8,
pub time: Slot,
pub finalized_checkpoint: Checkpoint,
pub justified_checkpoint: Checkpoint,
Expand Down
43 changes: 16 additions & 27 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::ChainConfig;
use crate::CustodyContext;
use crate::beacon_chain::{
BEACON_CHAIN_DB_KEY, CanonicalHead, LightClientProducerEvent, OP_POOL_DB_KEY,
BEACON_CHAIN_DB_KEY, BeaconForkChoice, CanonicalHead, LightClientProducerEvent, OP_POOL_DB_KEY,
};
use crate::beacon_fork_choice_store::BalancesCache;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::custody_context::NodeCustodyType;
use crate::data_availability_checker::DataAvailabilityChecker;
Expand All @@ -18,9 +19,7 @@ use crate::persisted_custody::load_custody_context;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, ServerSentEventHandler,
};
use crate::{BeaconChain, BeaconChainTypes, BeaconSnapshot, ServerSentEventHandler};
use execution_layer::ExecutionLayer;
use fork_choice::{ForkChoice, ResetPayloadStatuses};
use futures::channel::mpsc::Sender;
Expand Down Expand Up @@ -81,9 +80,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
genesis_block_root: Option<Hash256>,
genesis_state_root: Option<Hash256>,
#[allow(clippy::type_complexity)]
fork_choice: Option<
ForkChoice<BeaconForkChoiceStore<T::EthSpec, T::HotStore, T::ColdStore>, T::EthSpec>,
>,
fork_choice: Option<BeaconForkChoice<T>>,
op_pool: Option<OperationPool<T::EthSpec>>,
execution_layer: Option<ExecutionLayer<T::EthSpec>>,
event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
Expand Down Expand Up @@ -380,7 +377,7 @@ where
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
);

let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
let (mut genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
self = updated_builder;

// Stage the database's metadata fields for atomic storage when `build` is called.
Expand All @@ -395,16 +392,15 @@ where
.map_err(|e| format!("Failed to initialize genesis data column info: {:?}", e))?,
);

let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, genesis.clone())
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
let current_slot = None;
let balances_getter = BalancesCache::new(store);
let current_slot = Slot::new(0);

let fork_choice = ForkChoice::from_anchor(
fc_store,
genesis.beacon_block_root,
&genesis.beacon_block,
&genesis.beacon_state,
&mut genesis.beacon_state,
current_slot,
balances_getter,
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
Expand Down Expand Up @@ -611,21 +607,14 @@ where
.map_err(|e| format!("Failed to initialize data column info: {:?}", e))?,
);

let snapshot = BeaconSnapshot {
beacon_block_root: weak_subj_block_root,
beacon_block: Arc::new(weak_subj_block),
beacon_state: weak_subj_state,
};

let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, snapshot.clone())
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
let balances_getter = BalancesCache::new(store);

let fork_choice = ForkChoice::from_anchor(
fc_store,
snapshot.beacon_block_root,
&snapshot.beacon_block,
&snapshot.beacon_state,
Some(weak_subj_slot),
weak_subj_block_root,
&weak_subj_block,
&mut weak_subj_state,
weak_subj_slot,
balances_getter,
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
Expand Down Expand Up @@ -797,7 +786,7 @@ where
head_block_root,
&head_state,
store.clone(),
Some(current_slot),
current_slot,
&self.spec,
)?;
}
Expand Down
Loading
Loading