diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 1d66bd30e78..c3a56a86a70 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -148,6 +148,31 @@ jobs: cache-provider: warpbuild - name: Run beacon_chain tests for all known forks run: make test-beacon-chain + beacon-chain-store-tests-static-cold: + name: beacon-chain-store-tests-static-cold + needs: [check-labels] + if: needs.check-labels.outputs.skip_ci != 'true' + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + steps: + - uses: actions/checkout@v5 + - if: github.repository != 'sigp/lighthouse' + name: Get latest version of stable Rust + uses: moonrepo/setup-rust@v1 + with: + channel: stable + cache-target: release + bins: cargo-nextest + - if: github.repository == 'sigp/lighthouse' + uses: Swatinem/rust-cache@v2 + with: + cache-provider: warpbuild + - name: Run beacon_chain store_tests against the static cold backend + env: + COLD_BACKEND: static + FORK_NAME: fulu + run: cargo nextest run --release --features "fork_from_env,slasher/lmdb,$TEST_FEATURES" -p beacon_chain --test beacon_chain_tests -E 'test(/^store_tests::/)' --no-fail-fast http-api-tests: name: http-api-tests needs: [check-labels] @@ -493,6 +518,7 @@ jobs: 'forbidden-files-check', 'release-tests-ubuntu', 'beacon-chain-tests', + 'beacon-chain-store-tests-static-cold', 'op-pool-tests', 'network-tests', 'slasher-tests', diff --git a/Cargo.lock b/Cargo.lock index 078f699f3c8..eba135b0152 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8485,6 +8485,7 @@ dependencies = [ "safe_arith", "serde", "smallvec", + "snap", "ssz_types", "state_processing", "strum", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f618cf63217..8e996f5ef3b 100644 --- 
a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -320,7 +320,7 @@ pub enum StateSkipConfig { pub trait BeaconChainTypes: Send + Sync + 'static { type HotStore: store::ItemStore; - type ColdStore: store::ItemStore; + type ColdStore: store::ColdStore; type SlotClock: slot_clock::SlotClock; type EthSpec: types::EthSpec; } diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 95fde28f5b2..6d63aa6136e 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -14,7 +14,7 @@ use ssz_derive::{Decode, Encode}; use std::collections::BTreeSet; use std::marker::PhantomData; use std::sync::Arc; -use store::{Error as StoreError, HotColdDB, ItemStore}; +use store::{ColdStore, Error as StoreError, HotColdDB, ItemStore}; use superstruct::superstruct; use types::{ AbstractExecPayload, BeaconBlockRef, BeaconState, BeaconStateError, Checkpoint, Epoch, EthSpec, @@ -129,8 +129,8 @@ impl BalancesCache { /// Implements `fork_choice::ForkChoiceStore` in order to provide a persistent backing to the /// `fork_choice::ForkChoice` struct. #[derive(Debug, Educe)] -#[educe(PartialEq(bound(E: EthSpec, Hot: ItemStore, Cold: ItemStore)))] -pub struct BeaconForkChoiceStore, Cold: ItemStore> { +#[educe(PartialEq(bound(E: EthSpec, Hot: ItemStore, Cold: ColdStore)))] +pub struct BeaconForkChoiceStore, Cold: ColdStore> { #[educe(PartialEq(ignore))] store: Arc>, balances_cache: BalancesCache, @@ -151,7 +151,7 @@ impl BeaconForkChoiceStore where E: EthSpec, Hot: ItemStore, - Cold: ItemStore, + Cold: ColdStore, { /// Initialize `Self` from some `anchor` checkpoint which may or may not be the genesis state. 
/// @@ -268,7 +268,7 @@ impl ForkChoiceStore for BeaconForkChoiceStore where E: EthSpec, Hot: ItemStore, - Cold: ItemStore, + Cold: ColdStore, { type Error = Error; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index d70561db9ba..1086759b831 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -40,7 +40,7 @@ use state_processing::per_slot_processing; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; +use store::{ColdStore, DBColumnCold, Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; use tracing::{debug, error, info, warn}; use tree_hash::TreeHash; @@ -60,7 +60,7 @@ impl BeaconChainTypes for Witness where THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + TColdStore: ColdStore + 'static, TSlotClock: SlotClock + 'static, E: EthSpec + 'static, { @@ -115,7 +115,7 @@ impl BeaconChainBuilder> where THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + TColdStore: ColdStore + 'static, TSlotClock: SlotClock + 'static, E: EthSpec + 'static, { @@ -340,7 +340,7 @@ where .map_err(|e| format!("Failed to store genesis block: {:?}", e))?; store .store_frozen_block_root_at_skip_slots(Slot::new(0), Slot::new(1), beacon_block_root) - .and_then(|ops| store.cold_db.do_atomically(ops)) + .and_then(|ops| store.cold_db.put_batch(DBColumnCold::BlockRoots, ops)) .map_err(|e| format!("Failed to store genesis block root: {e:?}"))?; // Store the genesis block under the `ZERO_HASH` key. @@ -435,6 +435,21 @@ where .clone() .ok_or("weak_subjectivity_state requires a store")?; + // The static cold backend is append-only in ascending slot order. 
A + // checkpoint / weak-subjectivity start writes the anchor state in the + // middle of the chain and then backfills earlier slots, which the + // static format can't represent. Refuse the combination at startup + // rather than failing later with an out-of-order put. + if matches!( + store.get_config().cold_backend, + store::config::ColdBackendKind::Static + ) { + return Err("static cold backend only supports starting from genesis; \ + checkpoint sync and weak subjectivity sync require the kv \ + cold backend" + .to_string()); + } + // Ensure the state is advanced to an epoch boundary. let slots_per_epoch = E::slots_per_epoch(); if weak_subj_state.slot() % slots_per_epoch != 0 { @@ -558,7 +573,7 @@ where .map_err(|e| format!("Error writing frozen block roots: {e:?}"))?; store .cold_db - .do_atomically(block_root_batch) + .put_batch(DBColumnCold::BlockRoots, block_root_batch) .map_err(|e| format!("Error writing frozen block roots: {e:?}"))?; debug!( from = %weak_subj_block.slot(), @@ -1152,7 +1167,7 @@ impl BeaconChainBuilder> where THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + TColdStore: ColdStore + 'static, E: EthSpec + 'static, { /// Sets the `BeaconChain` slot clock to `TestingSlotClock`. 
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 7d1bba2de98..ab9dd1af1f0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -791,7 +791,10 @@ mod test { use fork_choice::PayloadVerificationStatus; use logging::create_test_tracing_subscriber; use state_processing::ConsensusContext; - use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend}; + use store::{ + ColdStore, HotColdDB, ItemStore, StoreConfig, + database::interface::{BeaconNodeBackend, ColdBackend}, + }; use tempfile::{TempDir, tempdir}; use tracing::info; use types::MinimalEthSpec; @@ -802,7 +805,7 @@ mod test { fn get_store_with_spec( db_path: &TempDir, spec: Arc, - ) -> Arc, BeaconNodeBackend>> { + ) -> Arc, ColdBackend>> { let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); let blobs_path = db_path.path().join("blobs_db"); @@ -861,7 +864,7 @@ mod test { where E: EthSpec, Hot: ItemStore, - Cold: ItemStore, + Cold: ColdStore, { let chain = &harness.chain; let head = chain.head_snapshot(); @@ -947,7 +950,7 @@ mod test { E: EthSpec, T: BeaconChainTypes< HotStore = BeaconNodeBackend, - ColdStore = BeaconNodeBackend, + ColdStore = ColdBackend, EthSpec = E, >, { diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index bfda52558e4..bdad76abbb1 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -10,7 +10,10 @@ use std::borrow::Cow; use std::iter; use std::time::Duration; use store::metadata::DataColumnInfo; -use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp}; +use store::{ + AnchorInfo, BlobInfo, ColdStore, 
DBColumnCold, Error as StoreError, KeyValueStore, + KeyValueStoreOp, +}; use strum::IntoStaticStr; use tracing::{debug, debug_span, instrument}; use types::{Hash256, Slot}; @@ -108,7 +111,7 @@ impl BeaconChain { let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot; let mut blob_batch = Vec::::new(); - let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); + let mut cold_batch: Vec<(Slot, Vec)> = Vec::with_capacity(blocks_to_import.len()); let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); @@ -174,11 +177,7 @@ impl BeaconChain { // Store block roots, including at all skip slots in the freezer DB. for slot in (block.slot().as_u64()..prev_block_slot.as_u64()).rev() { debug!(%slot, ?block_root, "Storing frozen block to root mapping"); - cold_batch.push(KeyValueStoreOp::PutKeyValue( - DBColumn::BeaconBlockRoots, - slot.to_be_bytes().to_vec(), - block_root.as_slice().to_vec(), - )); + cold_batch.push((Slot::new(slot), block_root.as_slice().to_vec())); } prev_block_slot = block.slot(); @@ -191,11 +190,7 @@ impl BeaconChain { if expected_block_root == self.genesis_block_root { let genesis_slot = self.spec.genesis_slot; for slot in genesis_slot.as_u64()..prev_block_slot.as_u64() { - cold_batch.push(KeyValueStoreOp::PutKeyValue( - DBColumn::BeaconBlockRoots, - slot.to_be_bytes().to_vec(), - self.genesis_block_root.as_slice().to_vec(), - )); + cold_batch.push((Slot::new(slot), self.genesis_block_root.as_slice().to_vec())); } prev_block_slot = genesis_slot; expected_block_root = Hash256::zero(); @@ -261,7 +256,9 @@ impl BeaconChain { } { let _span = debug_span!("backfill_write_cold_db").entered(); - self.store.cold_db.do_atomically(cold_batch)?; + self.store + .cold_db + .put_batch(DBColumnCold::BlockRoots, cold_batch)?; } let mut anchor_and_blob_batch = Vec::with_capacity(3); diff --git a/beacon_node/beacon_chain/src/migrate.rs 
b/beacon_node/beacon_chain/src/migrate.rs index 3c17c1ebba4..f86ee3b7cda 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, mpsc}; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::{HotColdDBError, migrate_database}; -use store::{Error, ItemStore, Split, StoreOp}; +use store::{ColdStore, Error, ItemStore, Split, StoreOp}; pub use store::{HotColdDB, MemoryStore}; use tracing::{debug, error, info, warn}; use types::{BeaconState, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256, Slot}; @@ -30,7 +30,7 @@ pub const DEFAULT_EPOCHS_PER_MIGRATION: u64 = 1; /// The background migrator runs a thread to perform pruning and migrate state from the hot /// to the cold database. -pub struct BackgroundMigrator, Cold: ItemStore> { +pub struct BackgroundMigrator, Cold: ColdStore> { db: Arc>, /// Record of when the last migration ran, for enforcing `epochs_per_migration`. prev_migration: Arc>, @@ -135,7 +135,7 @@ pub struct FinalizationNotification { pub prev_migration: Arc>, } -impl, Cold: ItemStore> BackgroundMigrator { +impl, Cold: ColdStore> BackgroundMigrator { /// Create a new `BackgroundMigrator` and spawn its thread if necessary. pub fn new(db: Arc>, config: MigratorConfig) -> Self { // Estimate last migration run from DB split slot. 
diff --git a/beacon_node/beacon_chain/src/persisted_custody.rs b/beacon_node/beacon_chain/src/persisted_custody.rs index ba221c67b5f..e13ab83319a 100644 --- a/beacon_node/beacon_chain/src/persisted_custody.rs +++ b/beacon_node/beacon_chain/src/persisted_custody.rs @@ -1,7 +1,7 @@ use crate::custody_context::CustodyContextSsz; use ssz::{Decode, Encode}; use std::sync::Arc; -use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; +use store::{ColdStore, DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; use types::{EthSpec, Hash256}; /// 32-byte key for accessing the `CustodyContext`. All zero because `CustodyContext` has its own column. @@ -9,7 +9,7 @@ pub const CUSTODY_DB_KEY: Hash256 = Hash256::ZERO; pub struct PersistedCustody(pub CustodyContextSsz); -pub fn load_custody_context, Cold: ItemStore>( +pub fn load_custody_context, Cold: ColdStore>( store: Arc>, ) -> Option { let res: Result, _> = @@ -22,7 +22,7 @@ pub fn load_custody_context, Cold: ItemStore>( } /// Attempt to persist the custody context object to `self.store`. 
-pub fn persist_custody_context, Cold: ItemStore>( +pub fn persist_custody_context, Cold: ColdStore>( store: Arc>, custody_context: CustodyContextSsz, ) -> Result<(), store::Error> { diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index ca55811a706..5cf36812d51 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -65,8 +65,8 @@ use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock}; use std::time::Duration; -use store::database::interface::BeaconNodeBackend; -use store::{HotColdDB, ItemStore, MemoryStore, config::StoreConfig}; +use store::database::interface::{BeaconNodeBackend, ColdBackend}; +use store::{ColdStore, HotColdDB, ItemStore, MemoryStore, config::StoreConfig}; use task_executor::TaskExecutor; use task_executor::{ShutdownReason, test_utils::TestRuntime}; use tracing::debug; @@ -124,7 +124,7 @@ pub fn get_kzg(spec: &ChainSpec) -> Arc { pub type BaseHarnessType = Witness; -pub type DiskHarnessType = BaseHarnessType, BeaconNodeBackend>; +pub type DiskHarnessType = BaseHarnessType, ColdBackend>; pub type EphemeralHarnessType = BaseHarnessType, MemoryStore>; pub type BoxedMutator = Box< @@ -350,7 +350,7 @@ impl Builder> { /// Disk store, start from genesis. pub fn fresh_disk_store( mut self, - store: Arc, BeaconNodeBackend>>, + store: Arc, ColdBackend>>, ) -> Self { let validator_keypairs = self .validator_keypairs @@ -384,7 +384,7 @@ impl Builder> { /// Disk store, resume. 
pub fn resumed_disk_store( mut self, - store: Arc, BeaconNodeBackend>>, + store: Arc, ColdBackend>>, ) -> Self { let mutator = move |builder: BeaconChainBuilder<_>| { builder @@ -400,7 +400,7 @@ impl Builder> where E: EthSpec, Hot: ItemStore, - Cold: ItemStore, + Cold: ColdStore, { pub fn new(eth_spec_instance: E) -> Self { let runtime = TestRuntime::default(); @@ -761,7 +761,7 @@ impl BeaconChainHarness> where E: EthSpec, Hot: ItemStore, - Cold: ItemStore, + Cold: ColdStore, { pub fn builder(eth_spec_instance: E) -> Builder> { create_test_tracing_subscriber(); diff --git a/beacon_node/beacon_chain/tests/op_verification.rs b/beacon_node/beacon_chain/tests/op_verification.rs index 2f97f10745e..df35b729883 100644 --- a/beacon_node/beacon_chain/tests/op_verification.rs +++ b/beacon_node/beacon_chain/tests/op_verification.rs @@ -15,7 +15,7 @@ use state_processing::per_block_processing::errors::{ }; use std::sync::{Arc, LazyLock}; use store::StoreConfig; -use store::database::interface::BeaconNodeBackend; +use store::database::interface::{BeaconNodeBackend, ColdBackend}; use tempfile::{TempDir, tempdir}; use types::*; @@ -27,7 +27,7 @@ static KEYPAIRS: LazyLock> = type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; -type HotColdDB = store::HotColdDB, BeaconNodeBackend>; +type HotColdDB = store::HotColdDB, ColdBackend>; fn get_store(db_path: &TempDir) -> Arc { let spec = Arc::new(test_spec::()); diff --git a/beacon_node/beacon_chain/tests/prepare_payload.rs b/beacon_node/beacon_chain/tests/prepare_payload.rs index 47dd1ef517e..66c9e4ba734 100644 --- a/beacon_node/beacon_chain/tests/prepare_payload.rs +++ b/beacon_node/beacon_chain/tests/prepare_payload.rs @@ -15,7 +15,7 @@ use state_processing::{ state_advance::complete_state_advance, }; use std::sync::{Arc, LazyLock}; -use store::database::interface::BeaconNodeBackend; +use store::database::interface::{BeaconNodeBackend, ColdBackend}; use store::{HotColdDB, StoreConfig}; use tempfile::{TempDir, tempdir}; 
use types::*; @@ -34,7 +34,7 @@ type TestHarness = BeaconChainHarness>; fn get_store( db_path: &TempDir, spec: Arc, -) -> Arc, BeaconNodeBackend>> { +) -> Arc, ColdBackend>> { let store_config = StoreConfig { prune_payloads: false, ..StoreConfig::default() @@ -46,7 +46,7 @@ fn get_store_generic( db_path: &TempDir, config: StoreConfig, spec: Arc, -) -> Arc, BeaconNodeBackend>> { +) -> Arc, ColdBackend>> { create_test_tracing_subscriber(); let hot_path = db_path.path().join("chain_db"); let cold_path = db_path.path().join("freezer_db"); @@ -64,7 +64,7 @@ fn get_store_generic( } fn get_harness( - store: Arc, BeaconNodeBackend>>, + store: Arc, ColdBackend>>, validator_count: usize, ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. @@ -81,7 +81,7 @@ fn get_harness( } fn get_harness_generic( - store: Arc, BeaconNodeBackend>>, + store: Arc, ColdBackend>>, validator_count: usize, chain_config: ChainConfig, node_custody_type: NodeCustodyType, diff --git a/beacon_node/beacon_chain/tests/schema_stability.rs b/beacon_node/beacon_chain/tests/schema_stability.rs index 8200748ae6c..5c277802fdc 100644 --- a/beacon_node/beacon_chain/tests/schema_stability.rs +++ b/beacon_node/beacon_chain/tests/schema_stability.rs @@ -11,7 +11,7 @@ use ssz::Encode; use std::sync::{Arc, LazyLock}; use store::{ DBColumn, HotColdDB, StoreConfig, StoreItem, - database::interface::BeaconNodeBackend, + database::interface::{BeaconNodeBackend, ColdBackend}, hot_cold_store::Split, metadata::{DataColumnCustodyInfo, DataColumnInfo}, }; @@ -20,7 +20,7 @@ use tempfile::{TempDir, tempdir}; use types::{ChainSpec, Hash256, MainnetEthSpec, Slot}; type E = MainnetEthSpec; -type Store = Arc, BeaconNodeBackend>>; +type Store = Arc, ColdBackend>>; type TestHarness = BeaconChainHarness>; const VALIDATOR_COUNT: usize = 32; @@ -107,7 +107,7 @@ fn check_db_columns() { let expected_columns = vec![ "bma", "blk", "blb", "bdc", "bdi", "ste", "hsd", "hsn", "bsn", "bsd", "bss", 
"bs3", "bcs", "bst", "exp", "pay", "bch", "opo", "etc", "frk", "pkc", "brp", "bsx", "bsr", "bbx", "bbr", - "bhr", "brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy", + "bbs", "bhr", "brm", "dht", "cus", "otb", "bhs", "olc", "lcu", "scb", "scm", "dmy", ]; assert_eq!(expected_columns, current_columns); } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 1576092c814..4d0f7e8a6c7 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -43,10 +43,11 @@ use std::convert::TryInto; use std::str::FromStr; use std::sync::{Arc, LazyLock}; use std::time::Duration; -use store::database::interface::BeaconNodeBackend; -use store::metadata::{CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN, SchemaVersion}; +use store::database::interface::{BeaconNodeBackend, ColdBackend}; +use store::metadata::{CURRENT_SCHEMA_VERSION, SchemaVersion}; use store::{ BlobInfo, DBColumn, HotColdDB, StoreConfig, + config::ColdBackendKind, hdiff::HierarchyConfig, iter::{BlockRootsIterator, StateRootsIterator}, }; @@ -68,19 +69,30 @@ static KEYPAIRS: LazyLock> = type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; -fn get_store(db_path: &TempDir) -> Arc, BeaconNodeBackend>> { +fn get_store(db_path: &TempDir) -> Arc, ColdBackend>> { let store_config = StoreConfig { prune_payloads: false, + cold_backend: cold_backend_from_env(), ..StoreConfig::default() }; get_store_generic(db_path, store_config, test_spec::()) } +/// Pick the cold backend from `COLD_BACKEND=static|kv` so the same test suite +/// can be run against both backends without duplicating tests. Default is the +/// historical KV backend. 
+fn cold_backend_from_env() -> ColdBackendKind { + match std::env::var("COLD_BACKEND").as_deref() { + Ok("static") => ColdBackendKind::Static, + _ => ColdBackendKind::Kv, + } +} + fn get_store_generic( db_path: &TempDir, config: StoreConfig, spec: ChainSpec, -) -> Arc, BeaconNodeBackend>> { +) -> Arc, ColdBackend>> { create_test_tracing_subscriber(); let hot_path = db_path.path().join("chain_db"); let cold_path = db_path.path().join("freezer_db"); @@ -98,7 +110,7 @@ fn get_store_generic( } fn get_harness( - store: Arc, BeaconNodeBackend>>, + store: Arc, ColdBackend>>, validator_count: usize, ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. @@ -115,7 +127,7 @@ fn get_harness( } fn get_harness_import_all_data_columns( - store: Arc, BeaconNodeBackend>>, + store: Arc, ColdBackend>>, validator_count: usize, ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. @@ -133,7 +145,7 @@ fn get_harness_import_all_data_columns( } fn get_harness_generic( - store: Arc, BeaconNodeBackend>>, + store: Arc, ColdBackend>>, validator_count: usize, chain_config: ChainConfig, node_custody_type: NodeCustodyType, @@ -167,7 +179,7 @@ fn check_db_invariants(harness: &TestHarness) { } fn get_states_descendant_of_block( - store: &HotColdDB, BeaconNodeBackend>, + store: &HotColdDB, ColdBackend>, block_root: Hash256, ) -> Vec<(Hash256, Slot)> { let summaries = store.load_hot_state_summaries().unwrap(); @@ -3002,6 +3014,12 @@ async fn weak_subjectivity_sync_test( backfill_batch_size: Option, provide_blobs: bool, ) { + // Static cold backend rejects checkpoint+backfill at construction; nothing + // here would exercise it usefully under that mode. + if cold_backend_from_env() == ColdBackendKind::Static { + return; + } + // Build an initial chain on one harness, representing a synced node with full history. 
let num_final_blocks = E::slots_per_epoch() * 2; @@ -5018,75 +5036,6 @@ fn check_data_column_existence( } } -#[tokio::test] -async fn prune_historic_states() { - let num_blocks_produced = E::slots_per_epoch() * 5; - let db_path = tempdir().unwrap(); - let store = get_store(&db_path); - let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); - let genesis_state_root = harness.chain.genesis_state_root; - - let genesis_state = harness - .chain - .get_state(&genesis_state_root, None, CACHE_STATE_IN_TESTS) - .unwrap() - .unwrap(); - - harness - .extend_chain( - num_blocks_produced as usize, - BlockStrategy::OnCanonicalHead, - AttestationStrategy::AllValidators, - ) - .await; - - // Check historical states are present. - let first_epoch_state_roots = harness - .chain - .forwards_iter_state_roots(Slot::new(0)) - .unwrap() - .take(E::slots_per_epoch() as usize) - .map(Result::unwrap) - .collect::>(); - for &(state_root, slot) in &first_epoch_state_roots { - assert!( - store - .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) - .unwrap() - .is_some() - ); - } - - store - .prune_historic_states(genesis_state_root, &genesis_state) - .unwrap(); - - // Check that anchor info is updated. - let anchor_info = store.get_anchor_info(); - assert_eq!(anchor_info.state_lower_limit, 0); - assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN); - - // Ensure all epoch 0 states other than the genesis have been pruned. - for &(state_root, slot) in &first_epoch_state_roots { - assert_eq!( - store - .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) - .unwrap() - .is_some(), - slot == 0 - ); - } - - // Run for another two epochs. 
- let additional_blocks_produced = 2 * E::slots_per_epoch(); - harness - .extend_slots(additional_blocks_produced as usize) - .await; - - check_finalization(&harness, num_blocks_produced + additional_blocks_produced); - check_split_slot(&harness, store); -} - // Test the function `get_ancestor_state_root` for slots prior to the split where we only have // sparse summaries stored. #[tokio::test] @@ -5851,7 +5800,7 @@ async fn test_gloas_hot_state_hierarchy() { /// Check that the HotColdDB's split_slot is equal to the start slot of the last finalized epoch. fn check_split_slot( harness: &TestHarness, - store: Arc, BeaconNodeBackend>>, + store: Arc, ColdBackend>>, ) { let split_slot = store.get_split_slot(); assert_eq!( diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 9dfb8304bc8..bf4467a2646 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -40,7 +40,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; -use store::database::interface::BeaconNodeBackend; +use store::database::interface::{BeaconNodeBackend, ColdBackend}; use timer::spawn_timer; use tracing::{debug, info, instrument, warn}; use types::data::compute_ordered_custody_column_indices; @@ -98,7 +98,7 @@ where TSlotClock: SlotClock + Clone + 'static, E: EthSpec + 'static, THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + TColdStore: store::ColdStore + 'static, { /// Instantiates a new, empty builder. /// @@ -811,7 +811,7 @@ where TSlotClock: SlotClock + Clone + 'static, E: EthSpec + 'static, THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, + TColdStore: store::ColdStore + 'static, { /// Consumes the internal `BeaconChainBuilder`, attaching the resulting `BeaconChain` to self. 
#[instrument(skip_all)] @@ -842,8 +842,7 @@ where } } -impl - ClientBuilder, BeaconNodeBackend>> +impl ClientBuilder, ColdBackend>> where TSlotClock: SlotClock + 'static, E: EthSpec + 'static, @@ -885,7 +884,7 @@ impl ClientBuilder + 'static, - TColdStore: ItemStore + 'static, + TColdStore: store::ColdStore + 'static, { /// Specifies that the slot clock should read the time from the computers system clock. pub fn system_time_slot_clock(mut self) -> Result { diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs index 113b3cdd323..f3195c7c21a 100644 --- a/beacon_node/network/src/persisted_dht.rs +++ b/beacon_node/network/src/persisted_dht.rs @@ -1,12 +1,12 @@ use lighthouse_network::Enr; use std::sync::Arc; -use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; +use store::{ColdStore, DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; use types::{EthSpec, Hash256}; /// 32-byte key for accessing the `DhtEnrs`. All zero because `DhtEnrs` has its own column. pub const DHT_DB_KEY: Hash256 = Hash256::ZERO; -pub fn load_dht, Cold: ItemStore>( +pub fn load_dht, Cold: ColdStore>( store: Arc>, ) -> Vec { // Load DHT from store @@ -20,7 +20,7 @@ pub fn load_dht, Cold: ItemStore>( } /// Attempt to persist the ENR's in the DHT to `self.store`. -pub fn persist_dht, Cold: ItemStore>( +pub fn persist_dht, Cold: ColdStore>( store: Arc>, enrs: Vec, ) -> Result<(), store::Error> { @@ -28,7 +28,7 @@ pub fn persist_dht, Cold: ItemStore>( } /// Attempts to clear any DHT entries. 
-pub fn clear_dht, Cold: ItemStore>( +pub fn clear_dht, Cold: ColdStore>( store: Arc>, ) -> Result<(), store::Error> { store.hot_db.delete::(&DHT_DB_KEY) diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 51cda0fac3b..ad40c080f6e 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1622,6 +1622,18 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) + .arg( + Arg::new("cold-backend") + .long("cold-backend") + .value_name("BACKEND") + .value_parser(store::config::ColdBackendKind::VARIANTS.to_vec()) + .help("Cold (freezer) DB backend. \"kv\" stores cold data in the \ + same KV as the hot DB. \"static\" stores cold data in \ + slot-keyed static files; only supported when starting \ + from genesis.") + .action(ArgAction::Set) + .display_order(0) + ) .arg( Arg::new("delay-block-publishing") .long("delay-block-publishing") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 8ba2c0f3214..31e0b59a6e2 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -418,6 +418,10 @@ pub fn get_config( client_config.store.backend = backend; } + if let Some(cold_backend) = clap_utils::parse_optional(cli_args, "cold-backend")? { + client_config.store.cold_backend = cold_backend; + } + if let Some(hierarchy_config) = clap_utils::parse_optional(cli_args, "hierarchy-exponents")? 
{ client_config.store.hierarchy_config = hierarchy_config; } diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index e33da17e266..e1ab0c5a60e 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -14,13 +14,13 @@ use network_utils::enr_ext::peer_id_to_node_id; use slasher::{DatabaseBackendOverride, Slasher}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; -use store::database::interface::BeaconNodeBackend; +use store::database::interface::{BeaconNodeBackend, ColdBackend}; use tracing::{info, warn}; use types::{ChainSpec, Epoch, EthSpec, ForkName}; /// A type-alias to the tighten the definition of a production-intended `Client`. pub type ProductionClient = - Client, BeaconNodeBackend>>; + Client, ColdBackend>>; /// The beacon node `Client` that is used in production. /// diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 50028fe73ff..3e103c18636 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -27,6 +27,7 @@ redb = { version = "2.1.3", optional = true } safe_arith = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } +snap = { workspace = true } ssz_types = { workspace = true } state_processing = { workspace = true } strum = { workspace = true } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 29705283fa9..1488f11ce53 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -55,6 +55,8 @@ pub struct StoreConfig { pub prune_payloads: bool, /// Database backend to use. pub backend: DatabaseBackend, + /// Which cold backend to use for the freezer DB. + pub cold_backend: ColdBackendKind, /// State diff hierarchy. pub hierarchy_config: HierarchyConfig, /// Whether to prune blobs older than the blob data availability boundary. 
@@ -100,6 +102,7 @@ pub enum StoreConfigError { }, ZeroEpochsPerBlobPrune, InvalidVersionByte(Option), + InvalidColdBackendByte(u8), } impl Default for StoreConfig { @@ -116,6 +119,7 @@ impl Default for StoreConfig { compact_on_prune: true, prune_payloads: true, backend: DEFAULT_BACKEND, + cold_backend: ColdBackendKind::default(), hierarchy_config: HierarchyConfig::default(), prune_blobs: true, epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE, @@ -276,3 +280,52 @@ pub enum DatabaseBackend { #[cfg(feature = "redb")] Redb, } + +/// Cold backend selector. +#[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, EnumString, VariantNames, +)] +#[strum(serialize_all = "lowercase")] +pub enum ColdBackendKind { + /// Cold data lives in the same KV backend as the hot DB. Default. + #[default] + Kv, + /// Cold data lives in slot-keyed static files. + Static, +} + +impl ColdBackendKind { + /// One-byte tag persisted under `COLD_BACKEND_KEY` in `BeaconMeta`. + /// Stable across builds — never reorder or reuse a value. + pub fn as_byte(self) -> u8 { + match self { + Self::Kv => 0, + Self::Static => 1, + } + } + + pub fn from_byte(byte: u8) -> Result { + match byte { + 0 => Ok(Self::Kv), + 1 => Ok(Self::Static), + other => Err(StoreConfigError::InvalidColdBackendByte(other)), + } + } +} + +impl StoreItem for ColdBackendKind { + fn db_column() -> DBColumn { + DBColumn::BeaconMeta + } + + fn as_store_bytes(&self) -> Vec { + vec![self.as_byte()] + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + let &[byte] = bytes else { + return Err(StoreConfigError::InvalidColdBackendByte(0).into()); + }; + Ok(Self::from_byte(byte)?) 
+ } +} diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index 5646f1179c8..4ce22df9af4 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -2,11 +2,16 @@ use crate::database::leveldb_impl; #[cfg(feature = "redb")] use crate::database::redb_impl; -use crate::{ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValueStore, metrics}; +use crate::{ + ColdStore, ColumnIter, ColumnKeyIter, DBColumn, DBColumnCold, DBColumnColdIndex, Error, + IndexIter, ItemStore, Key, KeyValueStore, SlotIter, StaticColdStore, metrics, +}; use crate::{KeyValueStoreOp, StoreConfig, config::DatabaseBackend}; +use ssz::{Decode, Encode}; use std::collections::HashSet; use std::path::Path; use types::EthSpec; +use types::{Hash256, Slot}; pub enum BeaconNodeBackend { #[cfg(feature = "leveldb")] @@ -17,6 +22,119 @@ pub enum BeaconNodeBackend { impl ItemStore for BeaconNodeBackend {} +/// Runtime selector for the cold backend. +/// +/// Held by the production `HotColdDB` so the cold strategy can be picked at +/// startup based on `StoreConfig`. `Kv` keeps the existing behaviour +/// (everything in the KV store); `Static` uses the slot-keyed file backend. +/// +/// The `Kv` arm inlines the byte-translation (slot/root → bytes) directly here +/// rather than going through an intermediate `impl ColdStore for BeaconNodeBackend` +/// — `BeaconNodeBackend` is only ever a `ColdStore` via this enum, so the +/// indirection isn't earning anything. 
+pub enum ColdBackend { + Kv(BeaconNodeBackend), + Static(StaticColdStore), +} + +impl ColdStore for ColdBackend { + fn get(&self, c: DBColumnCold, slot: Slot) -> Result>, Error> { + match self { + Self::Kv(db) => db.get_bytes(c.db_column(), &slot.as_u64().to_be_bytes()), + Self::Static(db) => ColdStore::::get(db, c, slot), + } + } + fn put_batch(&self, c: DBColumnCold, items: Vec<(Slot, Vec)>) -> Result<(), Error> { + match self { + Self::Kv(db) => { + let col = c.db_column(); + let ops = items + .into_iter() + .map(|(slot, value)| { + crate::KeyValueStoreOp::PutKeyValue( + col, + slot.as_u64().to_be_bytes().to_vec(), + value, + ) + }) + .collect(); + db.do_atomically(ops) + } + Self::Static(db) => ColdStore::::put_batch(db, c, items), + } + } + fn contains(&self, c: DBColumnCold, slot: Slot) -> Result { + match self { + Self::Kv(db) => db.key_exists(c.db_column(), &slot.as_u64().to_be_bytes()), + Self::Static(db) => ColdStore::::contains(db, c, slot), + } + } + fn iter_from(&self, c: DBColumnCold, from: Slot) -> SlotIter<'_> { + match self { + Self::Kv(db) => Box::new( + db.iter_column_from::>(c.db_column(), &from.as_u64().to_be_bytes()) + .map(|res| { + res.and_then(|(key_bytes, value)| { + let bytes: [u8; 8] = + key_bytes.try_into().map_err(|_| Error::InvalidBytes)?; + Ok((Slot::new(u64::from_be_bytes(bytes)), value)) + }) + }), + ), + Self::Static(db) => ColdStore::::iter_from(db, c, from), + } + } + // `Slot::as_ssz_bytes()` is byte-identical to the legacy + // `ColdStateSummary { slot }` wrapper so existing dbs round-trip without + // migration. Pinned by `ssz_compat_with_legacy_summary` in `lib.rs`. + fn get_index(&self, c: DBColumnColdIndex, root: Hash256) -> Result, Error> { + match self { + Self::Kv(db) => Ok(db + .get_bytes(c.db_column(), root.as_slice())? 
+ .map(|bytes| Slot::from_ssz_bytes(&bytes)) + .transpose()?), + Self::Static(db) => ColdStore::::get_index(db, c, root), + } + } + fn put_index_batch( + &self, + c: DBColumnColdIndex, + items: Vec<(Hash256, Slot)>, + ) -> Result<(), Error> { + match self { + Self::Kv(db) => { + let col = c.db_column(); + let ops = items + .into_iter() + .map(|(root, slot)| { + crate::KeyValueStoreOp::PutKeyValue( + col, + root.as_slice().to_vec(), + slot.as_ssz_bytes(), + ) + }) + .collect(); + db.do_atomically(ops) + } + Self::Static(db) => ColdStore::::put_index_batch(db, c, items), + } + } + fn iter_index(&self, c: DBColumnColdIndex) -> IndexIter<'_> { + match self { + Self::Kv(db) => Box::new(db.iter_column::(c.db_column()).map(|res| { + res.and_then(|(root, value)| Ok((root, Slot::from_ssz_bytes(&value)?))) + })), + Self::Static(db) => ColdStore::::iter_index(db, c), + } + } + fn sync(&self) -> Result<(), Error> { + match self { + Self::Kv(db) => KeyValueStore::sync(db), + Self::Static(db) => ColdStore::::sync(db), + } + } +} + impl KeyValueStore for BeaconNodeBackend { fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error> { match self { diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index a07cc838863..68dc28998e3 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -1,5 +1,6 @@ use crate::config::StoreConfigError; use crate::hot_cold_store::{HotColdDBError, StateSummaryIteratorError}; +use crate::static_cold::StaticColdStoreError; use crate::{DBColumn, hdiff}; #[cfg(feature = "leveldb")] use leveldb::error::Error as LevelDBError; @@ -14,6 +15,7 @@ pub enum Error { SszDecodeError(DecodeError), BeaconStateError(BeaconStateError), HotColdDBError(HotColdDBError), + StaticColdStoreError(StaticColdStoreError), DBError { message: String, }, @@ -38,6 +40,21 @@ pub enum Error { MissingHistoricBlocks { oldest_block_slot: Slot, }, + /// State reconstruction is not supported with the static cold backend. 
+ /// + /// The static-file backend is strict-ascending append-only, but online + /// reconstruction writes states at slots already below the high-water + /// mark. Per `specs/static-cold-backend.md`, a full node never becomes + /// archive by online reconstruction. + ReconstructionUnsupportedOnStaticCold, + /// The configured cold backend differs from the one persisted on disk. + /// + /// Switching cold backends in-place is unsupported because the on-disk + /// formats are incompatible. + ColdBackendMismatch { + on_disk: crate::config::ColdBackendKind, + configured: crate::config::ColdBackendKind, + }, /// State reconstruction failed because it didn't reach the upper limit slot. /// /// This should never happen (it's a logic error). @@ -129,6 +146,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: StaticColdStoreError) -> Error { + Error::StaticColdStoreError(e) + } +} + impl From for Error { fn from(e: BeaconStateError) -> Error { Error::BeaconStateError(e) diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs index 255b7d8eac8..6478e6695e0 100644 --- a/beacon_node/store/src/forwards_iter.rs +++ b/beacon_node/store/src/forwards_iter.rs @@ -1,6 +1,6 @@ use crate::errors::{Error, Result}; use crate::iter::{BlockRootsIterator, StateRootsIterator}; -use crate::{ColumnIter, DBColumn, HotColdDB, ItemStore}; +use crate::{ColdStore, DBColumn, DBColumnCold, HotColdDB, ItemStore, SlotIter}; use itertools::process_results; use std::marker::PhantomData; use types::{BeaconState, EthSpec, Hash256, Slot}; @@ -9,7 +9,7 @@ pub type HybridForwardsBlockRootsIterator<'a, E, Hot, Cold> = pub type HybridForwardsStateRootsIterator<'a, E, Hot, Cold> = HybridForwardsIterator<'a, E, Hot, Cold>; -impl, Cold: ItemStore> HotColdDB { +impl, Cold: ColdStore> HotColdDB { fn simple_forwards_iterator( &self, column: DBColumn, @@ -116,15 +116,15 @@ impl, Cold: ItemStore> HotColdDB } /// Forwards root iterator that makes use of a slot -> root 
mapping in the freezer DB. -pub struct FrozenForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { - inner: ColumnIter<'a, Vec>, +pub struct FrozenForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> { + inner: SlotIter<'a>, column: DBColumn, next_slot: Slot, end_slot: Slot, _phantom: PhantomData<(E, Hot, Cold)>, } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> FrozenForwardsIterator<'a, E, Hot, Cold> { /// `end_slot` is EXCLUSIVE here. @@ -134,12 +134,13 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> start_slot: Slot, end_slot: Slot, ) -> Result { - if column != DBColumn::BeaconBlockRoots && column != DBColumn::BeaconStateRoots { - return Err(Error::ForwardsIterInvalidColumn(column)); - } - let start = start_slot.as_u64().to_be_bytes(); + let cold_column = match column { + DBColumn::BeaconBlockRoots => DBColumnCold::BlockRoots, + DBColumn::BeaconStateRoots => DBColumnCold::StateRoots, + _ => return Err(Error::ForwardsIterInvalidColumn(column)), + }; Ok(Self { - inner: store.cold_db.iter_column_from(column, &start), + inner: store.cold_db.iter_from(cold_column, start_slot), column, next_slot: start_slot, end_slot, @@ -148,7 +149,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -impl, Cold: ItemStore> Iterator +impl, Cold: ColdStore> Iterator for FrozenForwardsIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, Slot)>; @@ -160,13 +161,7 @@ impl, Cold: ItemStore> Iterator self.inner .as_mut() .next()? - .and_then(|(slot_bytes, root_bytes)| { - let slot = slot_bytes - .clone() - .try_into() - .map(u64::from_be_bytes) - .map(Slot::new) - .map_err(|_| Error::InvalidBytes)?; + .and_then(|(slot, root_bytes)| { if root_bytes.len() != std::mem::size_of::() { return Err(Error::InvalidBytes); } @@ -199,7 +194,7 @@ impl Iterator for SimpleForwardsIterator { } /// Fusion of the above two approaches to forwards iteration. Fast and efficient. 
-pub enum HybridForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub enum HybridForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> { PreFinalization { iter: Box>, store: &'a HotColdDB, @@ -220,7 +215,7 @@ pub enum HybridForwardsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemSto Finished, } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> HybridForwardsIterator<'a, E, Hot, Cold> { /// Construct a new hybrid iterator. @@ -349,7 +344,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -impl, Cold: ItemStore> Iterator +impl, Cold: ColdStore> Iterator for HybridForwardsIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, Slot)>; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index e9b9de76e61..3b4026aa32d 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1,20 +1,21 @@ use crate::config::{OnDiskStoreConfig, StoreConfig}; -use crate::database::interface::BeaconNodeBackend; +use crate::database::interface::{BeaconNodeBackend, ColdBackend}; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; use crate::hdiff::{HDiff, HDiffBuffer, HierarchyConfig, HierarchyModuli, StorageStrategy}; use crate::historic_state_cache::HistoricStateCache; use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::memory_store::MemoryStore; use crate::metadata::{ - ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, AnchorInfo, BLOB_INFO_KEY, BlobInfo, + ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, AnchorInfo, BLOB_INFO_KEY, BlobInfo, COLD_BACKEND_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, CompactionTimestamp, DATA_COLUMN_CUSTODY_INFO_KEY, DATA_COLUMN_INFO_KEY, DataColumnCustodyInfo, DataColumnInfo, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, SchemaVersion, }; use 
crate::state_cache::{PutStateOutcome, StateCache}; +use crate::static_cold::StaticColdStore; use crate::{ - BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, StoreItem, - StoreOp, get_data_column_key, + BlobSidecarListFromRoot, ColdStore, DBColumn, DBColumnCold, DBColumnColdIndex, DatabaseBlock, + Error, ItemStore, KeyValueStoreOp, StoreItem, StoreOp, get_data_column_key, metrics::{self, COLD_METRIC, HOT_METRIC}, parse_data_column_key, }; @@ -49,7 +50,7 @@ use zstd::{Decoder, Encoder}; /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores /// intermittent "restore point" states pre-finalization. #[derive(Debug)] -pub struct HotColdDB, Cold: ItemStore> { +pub struct HotColdDB, Cold: ColdStore> { /// The slot and state root at the point where the database is split between hot and cold. /// /// States with slots less than `split.slot` are in the cold DB, while states with slots @@ -66,7 +67,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// Cold database containing compact historical data. pub cold_db: Cold, /// Database containing blobs. If None, store falls back to use `cold_db`. - pub blobs_db: Cold, + pub blobs_db: Hot, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. @@ -88,6 +89,21 @@ pub struct HotColdDB, Cold: ItemStore> { _phantom: PhantomData, } +/// Pending cold-DB writes for a single commit unit (one state, one reconstruct +/// batch, etc.). +/// +/// Slot-keyed bulk data goes in `data`; the matching `BeaconColdStateSummary` +/// root index goes in `state_summary_index`. `commit_cold_batch` flushes them +/// in the correct order (data, sync, then index). +/// +/// Slots within `data` for any single column must arrive strictly ascending — +/// the static cold backend rejects out-of-order puts. 
+#[derive(Default)] +pub struct ColdBatch { + pub data: Vec<(DBColumnCold, Slot, Vec)>, + pub state_summary_index: Vec<(Hash256, Slot)>, +} + #[derive(Debug)] struct BlockCache { block_cache: LruCache>, @@ -189,7 +205,6 @@ pub enum HotColdDBError { MissingExecutionPayloadEnvelope(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, - MissingFrozenBlockSlot(Hash256), MissingFrozenBlock(Slot), MissingPathToBlobsDatabase, BlobsPreviouslyInDefaultStore, @@ -258,7 +273,7 @@ impl HotColdDB, MemoryStore> { } } -impl HotColdDB, BeaconNodeBackend> { +impl HotColdDB, ColdBackend> { /// Open a new or existing database, with the given paths to the hot and cold DBs. /// /// The `migrate_schema` function is passed in so that the parent `BeaconChain` can provide @@ -282,13 +297,27 @@ impl HotColdDB, BeaconNodeBackend> { let anchor_info = RwLock::new(Self::load_anchor_info(&hot_db)?); debug!(?anchor_info, "Loaded anchor info"); + // Pin the cold backend kind to the directory before we touch it. + // Static and Kv layouts are incompatible on disk, so refuse to open + // a directory written by the other one. If no record exists yet + // (fresh DB), persist the configured kind so this check is + // load-bearing on every subsequent open. + Self::check_or_init_cold_backend_kind(&hot_db, config.cold_backend)?; + let db = HotColdDB { split: RwLock::new(Split::default()), anchor_info, blob_info: RwLock::new(BlobInfo::default()), data_column_info: RwLock::new(DataColumnInfo::default()), blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?, - cold_db: BeaconNodeBackend::open(&config, cold_path)?, + cold_db: match config.cold_backend { + crate::config::ColdBackendKind::Kv => { + ColdBackend::Kv(BeaconNodeBackend::open(&config, cold_path)?) + } + crate::config::ColdBackendKind::Static => { + ColdBackend::Static(StaticColdStore::open(cold_path, &config)?) 
+ } + }, hot_db, block_cache: NonZeroUsize::new(config.block_cache_size) .map(BlockCache::new) @@ -451,7 +480,7 @@ impl HotColdDB, BeaconNodeBackend> { } } -impl, Cold: ItemStore> HotColdDB { +impl, Cold: ColdStore> HotColdDB { fn cold_storage_strategy(&self, slot: Slot) -> Result { // The start slot for the freezer HDiff is always 0 Ok(self.hierarchy.storage_strategy(slot, Slot::new(0))?) @@ -726,16 +755,34 @@ impl, Cold: ItemStore> HotColdDB /// /// This is useful for e.g. ignoring the slot-indicated fork to forcefully load a block as if it /// were for a different fork. + /// + /// Reads hot first, then falls back to the cold archive via the + /// `BlockSlot` index. Under KV cold the index is empty so the fallback + /// always returns None — behaviour is identical to a hot-only read. + /// Under Static cold (genesis-archive or era-import), blocks at slot < + /// split.slot live in cold only, and the fallback is the read path. pub fn get_block_with>( &self, block_root: &Hash256, decoder: impl FnOnce(&[u8]) -> Result, ssz::DecodeError>, ) -> Result>, Error> { - self.hot_db + if let Some(bytes) = self + .hot_db .get_bytes(DBColumn::BeaconBlock, block_root.as_slice())? - .map(|block_bytes| decoder(&block_bytes)) - .transpose() - .map_err(|e| e.into()) + { + return decoder(&bytes).map(Some).map_err(Into::into); + } + let Some(slot) = self + .cold_db + .get_index(DBColumnColdIndex::BlockSlot, *block_root)? + else { + return Ok(None); + }; + let bytes = self + .cold_db + .get(DBColumnCold::Block, slot)? + .ok_or(HotColdDBError::MissingFrozenBlock(slot))?; + decoder(&bytes).map(Some).map_err(Into::into) } pub fn get_payload_envelope( @@ -937,9 +984,23 @@ impl, Cold: ItemStore> HotColdDB } /// Determine whether a block exists in the database. + /// + /// Mirrors `get_block_with`: hot first, then cold via the `BlockSlot` + /// index. 
pub fn block_exists(&self, block_root: &Hash256) -> Result { - self.hot_db - .key_exists(DBColumn::BeaconBlock, block_root.as_slice()) + if self + .hot_db + .key_exists(DBColumn::BeaconBlock, block_root.as_slice())? + { + return Ok(true); + } + let Some(slot) = self + .cold_db + .get_index(DBColumnColdIndex::BlockSlot, *block_root)? + else { + return Ok(false); + }; + self.cold_db.contains(DBColumnCold::Block, slot) } /// Delete a block from the store and the block cache. @@ -1075,11 +1136,12 @@ impl, Cold: ItemStore> HotColdDB /// Store a state in the store. pub fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { - let mut ops: Vec = Vec::new(); if state.slot() < self.get_split_slot() { - self.store_cold_state(state_root, state, &mut ops)?; - self.cold_db.do_atomically(ops) + let mut batch = ColdBatch::default(); + self.store_cold_state(state_root, state, &mut batch)?; + self.commit_cold_batch(batch) } else { + let mut ops: Vec = Vec::new(); self.store_hot_state(state_root, state, &mut ops)?; self.hot_db.do_atomically(ops) } @@ -2075,16 +2137,42 @@ impl, Cold: ItemStore> HotColdDB ) } + /// Commit `batch` to the cold backend. Slot-keyed bulk data goes first, the + /// `BeaconColdStateSummary` root index lands after a sync, so a crash leaves + /// no dangling index entry. + pub fn commit_cold_batch(&self, batch: ColdBatch) -> Result<(), Error> { + self.commit_cold_data(batch.data)?; + self.cold_db.sync()?; + self.cold_db.put_index_batch( + DBColumnColdIndex::ColdStateSummary, + batch.state_summary_index, + ) + } + + /// Group slot-keyed `data` by column and write each column to the cold backend. + /// Used by callers (e.g. migration) that want to commit data per-iteration but + /// defer the matching index entries to end-of-batch. 
+ pub fn commit_cold_data(&self, data: Vec<(DBColumnCold, Slot, Vec)>) -> Result<(), Error> { + let mut groups: HashMap)>> = HashMap::new(); + for (col, slot, value) in data { + groups.entry(col).or_default().push((slot, value)); + } + for (col, items) in groups { + self.cold_db.put_batch(col, items)?; + } + Ok(()) + } + pub fn store_cold_state_summary( &self, state_root: &Hash256, slot: Slot, - ops: &mut Vec, + batch: &mut ColdBatch, ) -> Result<(), Error> { - ops.push(ColdStateSummary { slot }.as_kv_store_op(*state_root)); - ops.push(KeyValueStoreOp::PutKeyValue( - DBColumn::BeaconStateRoots, - slot.as_u64().to_be_bytes().to_vec(), + batch.state_summary_index.push((*state_root, slot)); + batch.data.push(( + DBColumnCold::StateRoots, + slot, state_root.as_slice().to_vec(), )); Ok(()) @@ -2095,9 +2183,9 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, state: &BeaconState, - ops: &mut Vec, + batch: &mut ColdBatch, ) -> Result<(), Error> { - self.store_cold_state_summary(state_root, state.slot(), ops)?; + self.store_cold_state_summary(state_root, state.slot(), batch)?; let slot = state.slot(); match self.cold_storage_strategy(slot)? 
{ @@ -2116,7 +2204,7 @@ impl, Cold: ItemStore> HotColdDB %slot, "Storing cold state" ); - self.store_cold_state_as_snapshot(state, ops)?; + self.store_cold_state_as_snapshot(state, batch)?; } StorageStrategy::DiffFrom(from) => { debug!( @@ -2125,7 +2213,7 @@ impl, Cold: ItemStore> HotColdDB %slot, "Storing cold state" ); - self.store_cold_state_as_diff(state, from, ops)?; + self.store_cold_state_as_diff(state, from, batch)?; } } Ok(()) @@ -2134,7 +2222,7 @@ impl, Cold: ItemStore> HotColdDB pub fn store_cold_state_as_snapshot( &self, state: &BeaconState, - ops: &mut Vec, + batch: &mut ColdBatch, ) -> Result<(), Error> { let bytes = state.as_ssz_bytes(); let compressed_value = { @@ -2147,19 +2235,14 @@ impl, Cold: ItemStore> HotColdDB out }; - ops.push(KeyValueStoreOp::PutKeyValue( - DBColumn::BeaconStateSnapshot, - state.slot().as_u64().to_be_bytes().to_vec(), - compressed_value, - )); + batch + .data + .push((DBColumnCold::StateSnapshot, state.slot(), compressed_value)); Ok(()) } fn load_cold_state_bytes_as_snapshot(&self, slot: Slot) -> Result>, Error> { - match self - .cold_db - .get_bytes(DBColumn::BeaconStateSnapshot, &slot.as_u64().to_be_bytes())? - { + match self.cold_db.get(DBColumnCold::StateSnapshot, slot)? { Some(bytes) => { let _timer = metrics::start_timer(&metrics::STORE_BEACON_STATE_FREEZER_DECOMPRESS_TIME); @@ -2250,7 +2333,7 @@ impl, Cold: ItemStore> HotColdDB &self, state: &BeaconState, from_slot: Slot, - ops: &mut Vec, + batch: &mut ColdBatch, ) -> Result<(), Error> { // Load diff base state bytes. 
let (_, base_buffer) = { @@ -2273,11 +2356,9 @@ impl, Cold: ItemStore> HotColdDB diff_bytes.len() as f64, ); - ops.push(KeyValueStoreOp::PutKeyValue( - DBColumn::BeaconStateDiff, - state.slot().as_u64().to_be_bytes().to_vec(), - diff_bytes, - )); + batch + .data + .push((DBColumnCold::StateDiff, state.slot(), diff_bytes)); Ok(()) } @@ -2399,7 +2480,7 @@ impl, Cold: ItemStore> HotColdDB let bytes = { let _t = metrics::start_timer_vec(&metrics::BEACON_HDIFF_READ_TIME, COLD_METRIC); self.cold_db - .get_bytes(DBColumn::BeaconStateDiff, &slot.as_u64().to_be_bytes())? + .get(DBColumnCold::StateDiff, slot)? .ok_or(HotColdDBError::MissingHDiff(slot))? }; let hdiff = { @@ -3040,6 +3121,25 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.put(&CONFIG_KEY, &self.config.as_disk_config()) } + /// Pin the cold backend kind to the DB's hot metadata on first open and + /// refuse subsequent opens that disagree. Static and Kv use incompatible + /// on-disk layouts in the cold path, so silently switching would leave + /// orphaned data behind and quietly corrupt new writes. + fn check_or_init_cold_backend_kind( + hot_db: &BeaconNodeBackend, + configured: crate::config::ColdBackendKind, + ) -> Result<(), Error> { + use crate::config::ColdBackendKind; + match hot_db.get::(&COLD_BACKEND_KEY)? { + Some(on_disk) if on_disk != configured => Err(Error::ColdBackendMismatch { + on_disk, + configured, + }), + Some(_) => Ok(()), + None => hot_db.put(&COLD_BACKEND_KEY, &configured), + } + } + /// Load the split point from disk, sans block root. fn load_split_partial(&self) -> Result, Error> { self.hot_db @@ -3071,10 +3171,8 @@ impl, Cold: ItemStore> HotColdDB /// Load a frozen state's slot, given its root. pub fn load_cold_state_slot(&self, state_root: &Hash256) -> Result, Error> { - Ok(self - .cold_db - .get(state_root)? - .map(|s: ColdStateSummary| s.slot)) + self.cold_db + .get_index(DBColumnColdIndex::ColdStateSummary, *state_root) } /// Load a hot state's summary, given its root. 
@@ -3113,23 +3211,6 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Run a compaction pass on the freezer DB to free up space used by deleted states. - pub fn compact_freezer(&self) -> Result<(), Error> { - let columns = vec![ - DBColumn::BeaconColdStateSummary, - DBColumn::BeaconStateSnapshot, - DBColumn::BeaconStateDiff, - DBColumn::BeaconStateRoots, - ]; - - for column in columns { - info!(?column, "Starting compaction"); - self.cold_db.compact_column(column)?; - info!(?column, "Finishing compaction"); - } - Ok(()) - } - /// Return `true` if compaction on finalization/pruning is enabled. pub fn compact_on_prune(&self) -> bool { self.config.compact_on_prune @@ -3159,14 +3240,10 @@ impl, Cold: ItemStore> HotColdDB start_slot: Slot, end_slot: Slot, block_root: Hash256, - ) -> Result, Error> { + ) -> Result)>, Error> { let mut ops = vec![]; for slot in start_slot.as_u64()..end_slot.as_u64() { - ops.push(KeyValueStoreOp::PutKeyValue( - DBColumn::BeaconBlockRoots, - slot.to_be_bytes().to_vec(), - block_root.as_slice().to_vec(), - )); + ops.push((Slot::new(slot), block_root.as_slice().to_vec())); } Ok(ops) } @@ -3177,7 +3254,7 @@ impl, Cold: ItemStore> HotColdDB pub fn get_cold_block_root(&self, slot: Slot) -> Result, Error> { Ok(self .cold_db - .get_bytes(DBColumn::BeaconBlockRoots, &slot.as_u64().to_be_bytes())? + .get(DBColumnCold::BlockRoots, slot)? .map(|bytes| Hash256::from_ssz_bytes(&bytes)) .transpose()?) } @@ -3190,7 +3267,7 @@ impl, Cold: ItemStore> HotColdDB pub fn get_cold_state_root(&self, slot: Slot) -> Result, Error> { Ok(self .cold_db - .get_bytes(DBColumn::BeaconStateRoots, &slot.as_u64().to_be_bytes())? + .get(DBColumnCold::StateRoots, slot)? .map(|bytes| Hash256::from_ssz_bytes(&bytes)) .transpose()?) } @@ -3483,67 +3560,6 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Delete *all* states from the freezer database and update the anchor accordingly. 
- /// - /// WARNING: this method deletes the genesis state and replaces it with the provided - /// `genesis_state`. This is to support its use in schema migrations where the storage scheme of - /// the genesis state may be modified. It is the responsibility of the caller to ensure that the - /// genesis state is correct, else a corrupt database will be created. - pub fn prune_historic_states( - &self, - genesis_state_root: Hash256, - genesis_state: &BeaconState, - ) -> Result<(), Error> { - // Update the anchor to use the dummy state upper limit and disable historic state storage. - let old_anchor = self.get_anchor_info(); - let new_anchor = AnchorInfo { - state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, - state_lower_limit: Slot::new(0), - ..old_anchor.clone() - }; - - // Commit the anchor change immediately: if the cold database ops fail they can always be - // retried, and we can't do them atomically with this change anyway. - self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?; - - // Stage freezer data for deletion. Do not bother loading and deserializing values as this - // wastes time and is less schema-agnostic. My hope is that this method will be useful for - // migrating to the tree-states schema (delete everything in the freezer then start afresh). - let mut cold_ops = vec![]; - - let columns = vec![ - DBColumn::BeaconColdStateSummary, - DBColumn::BeaconStateSnapshot, - DBColumn::BeaconStateDiff, - DBColumn::BeaconStateRoots, - ]; - - for column in columns { - for res in self.cold_db.iter_column_keys::>(column) { - let key = res?; - cold_ops.push(KeyValueStoreOp::DeleteKey(column, key)); - } - } - let delete_ops = cold_ops.len(); - - // If we just deleted the genesis state, re-store it using the current* schema. 
- if self.get_split_slot() > 0 { - info!( - state_root = ?genesis_state_root, - "Re-storing genesis state" - ); - self.store_cold_state(&genesis_state_root, genesis_state, &mut cold_ops)?; - } - - info!(delete_ops, "Deleting historic states"); - self.cold_db.do_atomically(cold_ops)?; - - // In order to reclaim space, we need to compact the freezer DB as well. - self.compact_freezer()?; - - Ok(()) - } - fn update_blob_or_data_column_info( &self, start_epoch: Epoch, @@ -3575,7 +3591,7 @@ impl, Cold: ItemStore> HotColdDB /// This function previously did a combination of freezer migration alongside pruning. Now it is /// *just* responsible for copying relevant data to the freezer, while pruning is implemented /// in `prune_hot_db`. -pub fn migrate_database, Cold: ItemStore>( +pub fn migrate_database, Cold: ColdStore>( store: Arc>, finalized_state_root: Hash256, finalized_block_root: Hash256, @@ -3607,7 +3623,29 @@ pub fn migrate_database, Cold: ItemStore>( return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into()); } - let mut cold_db_block_ops = vec![]; + let mut cold_db_block_ops: Vec<(Slot, Vec)> = vec![]; + // Cold-DB root index for state summaries (state_root -> slot). + // Committed after the slot-keyed cold data so a crash leaves no dangling indices. + let mut cold_state_summary_index: Vec<(Hash256, Slot)> = vec![]; + + // The static cold backend is a self-sufficient archive — it owns the + // bulk historic-block bytes, not just the slot index. The KV cold + // backend keeps blocks in the hot DB indefinitely, so duplicating them + // would only waste space. + let move_blocks_to_static_cold = matches!( + store.get_config().cold_backend, + crate::config::ColdBackendKind::Static + ); + let mut cold_db_block_data: Vec<(Slot, Vec)> = vec![]; + // `block_root -> slot` index for blocks moved into the cold archive, + // committed after the slot-keyed bulk so a crash leaves no dangling index + // entry. Reads on hot miss go index -> bulk. 
+ let mut cold_block_slot_index: Vec<(Hash256, Slot)> = vec![]; + // Hot block roots whose bytes are now durably in cold; deleted from hot + // after the cold index is committed. Hot keeps blocks at slot >= + // split.slot; cold owns slot < split.slot under Static. + let mut hot_block_delete_roots: Vec = vec![]; + let mut last_seen_block_root: Option = None; // Iterate in descending order until the current split slot let state_roots: Vec<_> = @@ -3617,27 +3655,48 @@ pub fn migrate_database, Cold: ItemStore>( })?; // Then, iterate states in slot ascending order, as they are stored wrt previous states. - for (block_root, state_root, slot) in state_roots.into_iter().rev() { + for (block_root, state_root, slot) in state_roots.iter().rev() { // Store the slot to block root mapping. - cold_db_block_ops.push(KeyValueStoreOp::PutKeyValue( - DBColumn::BeaconBlockRoots, - slot.as_u64().to_be_bytes().to_vec(), - block_root.as_slice().to_vec(), - )); + cold_db_block_ops.push((*slot, block_root.as_slice().to_vec())); + + // Move the block bytes into static cold at the slot they were + // proposed at. `RootsIterator` yields the same `block_root` for + // every skip slot until the next block — dedup by tracking the + // most recent root, then keep only the first occurrence whose + // iteration slot matches `block.slot()` (skip-slot continuations + // from the previous finalization window have `block.slot() < slot` + // and are already in cold from that earlier migration). + if move_blocks_to_static_cold && Some(*block_root) != last_seen_block_root { + last_seen_block_root = Some(*block_root); + if let Some(block_bytes) = store + .hot_db + .get_bytes(DBColumn::BeaconBlock, block_root.as_slice())? 
+ { + let block = SignedBeaconBlock::>::from_ssz_bytes( + &block_bytes, + &store.spec, + )?; + if block.slot() == *slot { + cold_db_block_data.push((*slot, block_bytes)); + cold_block_slot_index.push((*block_root, *slot)); + hot_block_delete_roots.push(*block_root); + } + } + } // Do not try to store states if a restore point is yet to be stored, or will never be // stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state // which always needs to be copied from the hot DB to the freezer and should not be deleted. - if slot != 0 && slot < anchor_info.state_upper_limit { + if *slot != 0 && *slot < anchor_info.state_upper_limit { continue; } - let mut cold_db_state_ops = vec![]; + let mut batch = ColdBatch::default(); // Only store the cold state if it's on a diff boundary. // Calling `store_cold_state_summary` instead of `store_cold_state` for those allows us // to skip loading many hot states. - if let StorageStrategy::ReplayFrom(from) = store.cold_storage_strategy(slot)? { + if let StorageStrategy::ReplayFrom(from) = store.cold_storage_strategy(*slot)? { // Store slot -> state_root and state_root -> slot mappings. debug!( strategy = "replay", @@ -3645,20 +3704,22 @@ pub fn migrate_database, Cold: ItemStore>( %slot, "Storing cold state" ); - store.store_cold_state_summary(&state_root, slot, &mut cold_db_state_ops)?; + store.store_cold_state_summary(state_root, *slot, &mut batch)?; } else { // This is some state that we want to migrate to the freezer db. // There is no reason to cache this state. let state: BeaconState = store - .get_hot_state(&state_root, false)? - .ok_or(HotColdDBError::MissingStateToFreeze(state_root))?; + .get_hot_state(state_root, false)? 
+ .ok_or(HotColdDBError::MissingStateToFreeze(*state_root))?; - store.store_cold_state(&state_root, &state, &mut cold_db_state_ops)?; + store.store_cold_state(state_root, &state, &mut batch)?; } // Cold states are diffed with respect to each other, so we need to finish writing previous - // states before storing new ones. - store.cold_db.do_atomically(cold_db_state_ops)?; + // slot-keyed cold data before staging new entries. Index entries accumulate to the end of + // the migration so all root indices land after every cold-bulk write is durable. + store.commit_cold_data(batch.data)?; + cold_state_summary_index.append(&mut batch.state_summary_index); } // Warning: Critical section. We have to take care not to put any of the two databases in an @@ -3669,8 +3730,36 @@ pub fn migrate_database, Cold: ItemStore>( // potentially re-doing the migration to copy data to the freezer, for consistency. If we crash // after writing all new block & state data to the freezer but before updating the split, then // in the worst case we will restart with the old split and re-run the migration. - store.cold_db.do_atomically(cold_db_block_ops)?; + // + // Slot-keyed cold data lands first; the BeaconColdStateSummary root index is committed after, + // so a mid-migration crash leaves cold data without dangling indices. + // + // TODO(static): a crash between `put_batch(BlockRoots, …)` and + // `put_index_batch(ColdStateSummary, …)` leaves BlockRoots committed for + // the new range but ColdStateSummary missing. The next restart re-runs the + // migration, which re-derives the summary, but until then invariant 11 + // (`check_cold_state_root_indices`) will fire transiently. KV mode has the + // same window. Worth reviewing whether the index could move to per-iter + // commit alongside the slot-keyed data, or whether the invariant should + // be relaxed during the `split.slot < latest finalized` window. 
+ store + .cold_db + .put_batch(DBColumnCold::BlockRoots, cold_db_block_ops)?; + if !cold_db_block_data.is_empty() { + store + .cold_db + .put_batch(DBColumnCold::Block, cold_db_block_data)?; + } store.cold_db.sync()?; + store.cold_db.put_index_batch( + DBColumnColdIndex::ColdStateSummary, + cold_state_summary_index, + )?; + if !cold_block_slot_index.is_empty() { + store + .cold_db + .put_index_batch(DBColumnColdIndex::BlockSlot, cold_block_slot_index)?; + } let new_split = { let mut split_guard = store.split.write(); let latest_split = *split_guard; @@ -3707,6 +3796,20 @@ pub fn migrate_database, Cold: ItemStore>( new_split }; + // Reclaim hot-DB space for blocks that are now durably in cold. Run AFTER + // the split commit: if we crash here, the next `get_block_with` for one + // of these roots will hit the cold fallback (BlockSlot index then Block + // bulk) and find them. A crash *before* this point is also safe — hot + // still has the bytes, and the next migration's idempotent re-puts cover + // any partial cold state. + if !hot_block_delete_roots.is_empty() { + let hot_delete_ops: Vec = hot_block_delete_roots + .into_iter() + .map(|root| KeyValueStoreOp::DeleteKey(DBColumn::BeaconBlock, root.as_slice().to_vec())) + .collect(); + store.hot_db.do_atomically(hot_delete_ops)?; + } + // Update the cache's view of the finalized state. store.update_finalized_state( finalized_state_root, @@ -3786,7 +3889,7 @@ pub enum StateSummaryIteratorError { /// Return the ancestor state root of a state beyond SlotsPerHistoricalRoot using the roots iterator /// and the store -pub fn get_ancestor_state_root<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore>( +pub fn get_ancestor_state_root<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore>( store: &'a HotColdDB, from_state: &'a BeaconState, target_slot: Slot, @@ -3993,7 +4096,7 @@ impl StoreItem for HotStateSummary { impl HotStateSummary { /// Construct a new summary of the given state. 
- pub fn new, Cold: ItemStore>( + pub fn new, Cold: ColdStore>( store: &HotColdDB, state_root: Hash256, state: &BeaconState, @@ -4041,26 +4144,6 @@ impl HotStateSummary { } } -/// Struct for summarising a state in the freezer database. -#[derive(Debug, Clone, Copy, Default, Encode, Decode)] -pub(crate) struct ColdStateSummary { - pub slot: Slot, -} - -impl StoreItem for ColdStateSummary { - fn db_column() -> DBColumn { - DBColumn::BeaconColdStateSummary - } - - fn as_store_bytes(&self) -> Vec { - self.as_ssz_bytes() - } - - fn from_store_bytes(bytes: &[u8]) -> Result { - Ok(Self::from_ssz_bytes(bytes)?) - } -} - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct BytesKey { pub key: Vec, diff --git a/beacon_node/store/src/invariants.rs b/beacon_node/store/src/invariants.rs index d251fb8800a..fe2e3e802ca 100644 --- a/beacon_node/store/src/invariants.rs +++ b/beacon_node/store/src/invariants.rs @@ -6,8 +6,8 @@ //! See the `check_invariants` and `check_database_invariants` methods for the full list. use crate::hdiff::StorageStrategy; -use crate::hot_cold_store::{ColdStateSummary, HotStateSummary}; -use crate::{DBColumn, Error, ItemStore}; +use crate::hot_cold_store::HotStateSummary; +use crate::{ColdStore, DBColumn, DBColumnCold, DBColumnColdIndex, Error, ItemStore}; use crate::{HotColdDB, Split}; use serde::Serialize; use ssz::Decode; @@ -242,7 +242,7 @@ pub enum InvariantViolation { ColdStateBaseSummaryMissing { slot: Slot, base_slot: Slot }, } -impl, Cold: ItemStore> HotColdDB { +impl, Cold: ColdStore> HotColdDB { /// Run all database invariant checks. /// /// The `ctx` parameter provides data from the beacon chain layer (fork choice, state cache, @@ -557,6 +557,13 @@ impl, Cold: ItemStore> HotColdDB Ok(result) } + // TODO(static): re-walk invariants 10/11/12 under the static cold backend. 
+ // The text-form preconditions and the "block in hot_db" check (#10) were + // written against the KV-cold world where finalized blocks live in hot DB + // forever. Under static cold, archived blocks may live elsewhere (TBD per + // TODO-static-block-storage.md item 2), and `cold_db.iter_index` over a + // sparse static column is O(highest - from). Confirm or update each. + /// Invariant 10 (Cold DB): Block root indices. /// /// ```text @@ -581,10 +588,7 @@ impl, Cold: ItemStore> HotColdDB for slot_val in anchor_info.oldest_block_slot.as_u64()..split.slot.as_u64() { let slot = Slot::new(slot_val); - let slot_bytes = slot_val.to_be_bytes(); - let block_root_bytes = self - .cold_db - .get_bytes(DBColumn::BeaconBlockRoots, &slot_bytes)?; + let block_root_bytes = self.cold_db.get(DBColumnCold::BlockRoots, slot)?; let Some(root_bytes) = block_root_bytes else { result.add_violation(InvariantViolation::ColdBlockRootMissing { @@ -603,10 +607,11 @@ impl, Cold: ItemStore> HotColdDB } let block_root = Hash256::from_slice(&root_bytes); - let block_exists = self - .hot_db - .key_exists(DBColumn::BeaconBlock, block_root.as_slice())?; - if !block_exists { + // Check both hot and (for Static cold) the cold archive via the + // BlockSlot index — under Static cold, finalized canonical + // blocks are deleted from hot once they're durable in cold, so a + // hot-only check would flag every migrated slot as an orphan. + if !self.block_exists(&block_root)? { result.add_violation(InvariantViolation::ColdBlockRootOrphan { slot, block_root }); } } @@ -635,11 +640,7 @@ impl, Cold: ItemStore> HotColdDB if slot <= anchor_info.state_lower_limit || slot >= cmp::min(split.slot, anchor_info.state_upper_limit) { - let slot_bytes = slot_val.to_be_bytes(); - let Some(root_bytes) = self - .cold_db - .get_bytes(DBColumn::BeaconStateRoots, &slot_bytes)? - else { + let Some(root_bytes) = self.cold_db.get(DBColumnCold::StateRoots, slot)? 
else { result.add_violation(InvariantViolation::ColdStateRootMissing { slot, state_lower_limit: anchor_info.state_lower_limit, @@ -660,7 +661,7 @@ impl, Cold: ItemStore> HotColdDB match self .cold_db - .get_bytes(DBColumn::BeaconColdStateSummary, state_root.as_slice())? + .get_index(DBColumnColdIndex::ColdStateSummary, state_root)? { None => { result.add_violation(InvariantViolation::ColdStateRootMissingSummary { @@ -668,13 +669,12 @@ impl, Cold: ItemStore> HotColdDB state_root, }); } - Some(summary_bytes) => { - let summary = ColdStateSummary::from_ssz_bytes(&summary_bytes)?; - if summary.slot != slot { + Some(summary_slot) => { + if summary_slot != slot { result.add_violation(InvariantViolation::ColdStateRootSlotMismatch { slot, state_root, - summary_slot: summary.slot, + summary_slot, }); } } @@ -701,46 +701,40 @@ impl, Cold: ItemStore> HotColdDB let mut summary_slots = HashSet::new(); let mut base_slot_refs = Vec::new(); - for res in self - .cold_db - .iter_column::(DBColumn::BeaconColdStateSummary) - { - let (state_root, value) = res?; - let summary = ColdStateSummary::from_ssz_bytes(&value)?; - - summary_slots.insert(summary.slot); + for res in self.cold_db.iter_index(DBColumnColdIndex::ColdStateSummary) { + let (state_root, summary_slot) = res?; - let slot_bytes = summary.slot.as_u64().to_be_bytes(); + summary_slots.insert(summary_slot); match self .hierarchy - .storage_strategy(summary.slot, Slot::new(0))? + .storage_strategy(summary_slot, Slot::new(0))? 
{ StorageStrategy::Snapshot => { let has_snapshot = self .cold_db - .key_exists(DBColumn::BeaconStateSnapshot, &slot_bytes)?; + .contains(DBColumnCold::StateSnapshot, summary_slot)?; if !has_snapshot { result.add_violation(InvariantViolation::ColdStateMissingSnapshot { state_root, - slot: summary.slot, + slot: summary_slot, }); } } StorageStrategy::DiffFrom(base_slot) => { let has_diff = self .cold_db - .key_exists(DBColumn::BeaconStateDiff, &slot_bytes)?; + .contains(DBColumnCold::StateDiff, summary_slot)?; if !has_diff { result.add_violation(InvariantViolation::ColdStateMissingDiff { state_root, - slot: summary.slot, + slot: summary_slot, }); } - base_slot_refs.push((summary.slot, base_slot)); + base_slot_refs.push((summary_slot, base_slot)); } StorageStrategy::ReplayFrom(base_slot) => { - base_slot_refs.push((summary.slot, base_slot)); + base_slot_refs.push((summary_slot, base_slot)); } } } diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 0cb803d1ed7..07aad13d21c 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -1,5 +1,5 @@ use crate::errors::HandleUnavailable; -use crate::{Error, HotColdDB, ItemStore}; +use crate::{ColdStore, Error, HotColdDB, ItemStore}; use std::borrow::Cow; use std::marker::PhantomData; use typenum::Unsigned; @@ -13,12 +13,12 @@ use types::{ /// /// It is assumed that all ancestors for this object are stored in the database. If this is not the /// case, the iterator will start returning `None` prior to genesis. -pub trait AncestorIter<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore, I: Iterator> { +pub trait AncestorIter<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore, I: Iterator> { /// Returns an iterator over the roots of the ancestors of `self`. 
fn try_iter_ancestor_roots(&self, store: &'a HotColdDB) -> Option; } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> AncestorIter<'a, E, Hot, Cold, BlockRootsIterator<'a, E, Hot, Cold>> for SignedBeaconBlock { /// Iterates across all available prior block roots of `self`, starting at the most recent and ending @@ -37,7 +37,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> AncestorIter<'a, E, Hot, Cold, StateRootsIterator<'a, E, Hot, Cold>> for BeaconState { /// Iterates across all available prior state roots of `self`, starting at the most recent and ending @@ -51,11 +51,11 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -pub struct StateRootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct StateRootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> { inner: RootsIterator<'a, E, Hot, Cold>, } -impl, Cold: ItemStore> Clone +impl, Cold: ColdStore> Clone for StateRootsIterator<'_, E, Hot, Cold> { fn clone(&self) -> Self { @@ -65,7 +65,7 @@ impl, Cold: ItemStore> Clone } } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> StateRootsIterator<'a, E, Hot, Cold> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> StateRootsIterator<'a, E, Hot, Cold> { pub fn new(store: &'a HotColdDB, beacon_state: &'a BeaconState) -> Self { Self { inner: RootsIterator::new(store, beacon_state), @@ -79,7 +79,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> StateRootsIterator<' } } -impl, Cold: ItemStore> Iterator +impl, Cold: ColdStore> Iterator for StateRootsIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, Slot), Error>; @@ -99,11 +99,11 @@ impl, Cold: ItemStore> Iterator /// exhausted. /// /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. 
-pub struct BlockRootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct BlockRootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> { inner: RootsIterator<'a, E, Hot, Cold>, } -impl, Cold: ItemStore> Clone +impl, Cold: ColdStore> Clone for BlockRootsIterator<'_, E, Hot, Cold> { fn clone(&self) -> Self { @@ -113,7 +113,7 @@ impl, Cold: ItemStore> Clone } } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockRootsIterator<'a, E, Hot, Cold> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> BlockRootsIterator<'a, E, Hot, Cold> { /// Create a new iterator over all block roots in the given `beacon_state` and prior states. pub fn new(store: &'a HotColdDB, beacon_state: &'a BeaconState) -> Self { Self { @@ -138,7 +138,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockRootsIterator<' } } -impl, Cold: ItemStore> Iterator +impl, Cold: ColdStore> Iterator for BlockRootsIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, Slot), Error>; @@ -151,13 +151,13 @@ impl, Cold: ItemStore> Iterator } /// Iterator over state and block roots that backtracks using the vectors from a `BeaconState`. 
-pub struct RootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct RootsIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> { store: &'a HotColdDB, beacon_state: Cow<'a, BeaconState>, slot: Slot, } -impl, Cold: ItemStore> Clone for RootsIterator<'_, E, Hot, Cold> { +impl, Cold: ColdStore> Clone for RootsIterator<'_, E, Hot, Cold> { fn clone(&self) -> Self { Self { store: self.store, @@ -167,7 +167,7 @@ impl, Cold: ItemStore> Clone for RootsIterator< } } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, E, Hot, Cold> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> RootsIterator<'a, E, Hot, Cold> { pub fn new(store: &'a HotColdDB, beacon_state: &'a BeaconState) -> Self { Self { store, @@ -234,7 +234,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, E, } } -impl, Cold: ItemStore> Iterator +impl, Cold: ColdStore> Iterator for RootsIterator<'_, E, Hot, Cold> { /// (block_root, state_root, slot) @@ -246,13 +246,13 @@ impl, Cold: ItemStore> Iterator } /// Block iterator that uses the `parent_root` of each block to backtrack. 
-pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> { store: &'a HotColdDB, next_block_root: Hash256, _phantom: PhantomData, } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> ParentRootBlockIterator<'a, E, Hot, Cold> { pub fn new(store: &'a HotColdDB, start_block_root: Hash256) -> Self { @@ -283,7 +283,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } } -impl, Cold: ItemStore> Iterator +impl, Cold: ColdStore> Iterator for ParentRootBlockIterator<'_, E, Hot, Cold> { type Item = Result<(Hash256, SignedBeaconBlock>), Error>; @@ -295,11 +295,11 @@ impl, Cold: ItemStore> Iterator #[derive(Clone)] /// Extends `BlockRootsIterator`, returning `SignedBeaconBlock` instances, instead of their roots. -pub struct BlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { +pub struct BlockIterator<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> { roots: BlockRootsIterator<'a, E, Hot, Cold>, } -impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, E, Hot, Cold> { +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ColdStore> BlockIterator<'a, E, Hot, Cold> { /// Create a new iterator over all blocks in the given `beacon_state` and prior states. pub fn new(store: &'a HotColdDB, beacon_state: &'a BeaconState) -> Self { Self { @@ -324,7 +324,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> BlockIterator<'a, E, } } -impl, Cold: ItemStore> Iterator +impl, Cold: ColdStore> Iterator for BlockIterator<'_, E, Hot, Cold> { type Item = Result>, Error>; @@ -338,7 +338,7 @@ impl, Cold: ItemStore> Iterator /// /// Return `Err(HistoryUnavailable)` in the case where no more backtrack states are available /// due to weak subjectivity sync. 
-fn next_historical_root_backtrack_state, Cold: ItemStore>( +fn next_historical_root_backtrack_state, Cold: ColdStore>( store: &HotColdDB, current_state: &BeaconState, ) -> Result, Error> { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index bd8caa3ad5b..61b49a91dc8 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -21,6 +21,7 @@ pub mod metadata; pub mod metrics; pub mod reconstruct; pub mod state_cache; +pub mod static_cold; pub mod database; pub mod iter; @@ -29,6 +30,7 @@ pub use self::blob_sidecar_list_from_root::BlobSidecarListFromRoot; pub use self::config::StoreConfig; pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split}; pub use self::memory_store::MemoryStore; +pub use self::static_cold::StaticColdStore; pub use crate::metadata::BlobInfo; pub use errors::Error; pub use metadata::AnchorInfo; @@ -105,6 +107,92 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { ) -> Result<(), Error>; } +pub type SlotIter<'a> = Box), Error>> + 'a>; +pub type IndexIter<'a> = Box> + 'a>; + +/// Slot-keyed cold columns served by the static archive. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] +pub enum DBColumnCold { + Block, + BlockRoots, + StateRoots, + StateSnapshot, + StateDiff, +} + +impl DBColumnCold { + pub fn db_column(self) -> DBColumn { + match self { + Self::Block => DBColumn::BeaconBlock, + Self::BlockRoots => DBColumn::BeaconBlockRoots, + Self::StateRoots => DBColumn::BeaconStateRoots, + Self::StateSnapshot => DBColumn::BeaconStateSnapshot, + Self::StateDiff => DBColumn::BeaconStateDiff, + } + } + + pub fn try_from_db_column(column: DBColumn) -> Option { + match column { + DBColumn::BeaconBlock => Some(Self::Block), + DBColumn::BeaconBlockRoots => Some(Self::BlockRoots), + DBColumn::BeaconStateRoots => Some(Self::StateRoots), + DBColumn::BeaconStateSnapshot => Some(Self::StateSnapshot), + DBColumn::BeaconStateDiff => Some(Self::StateDiff), + _ => None, + } + } +} + +/// Root-keyed indices owned by the cold backend. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum DBColumnColdIndex { + /// `block_root -> slot` for blocks moved into the cold archive. Populated + /// by `migrate_database` (Static cold) and era-file import. Empty under + /// KV cold. Consulted by `HotColdDB::get_block_with` to resolve root-keyed + /// reads against the slot-keyed cold archive. + BlockSlot, + /// `state_root -> slot` for cold state summaries. + ColdStateSummary, +} + +impl DBColumnColdIndex { + pub fn db_column(self) -> DBColumn { + match self { + Self::BlockSlot => DBColumn::BeaconBlockSlot, + Self::ColdStateSummary => DBColumn::BeaconColdStateSummary, + } + } +} + +pub trait ColdStore: Sync + Send + Sized + 'static { + // Slot-keyed bulk data. + fn get(&self, column: DBColumnCold, slot: Slot) -> Result>, Error>; + + /// Append `items` to `column`. Slots within `items` must be strictly + /// ascending and strictly greater than every slot already written to the + /// column — the static-file backend rejects out-of-order puts. 
KV + /// backends accept any order, so this is enforced by the caller, not the + /// trait. + fn put_batch(&self, column: DBColumnCold, items: Vec<(Slot, Vec)>) -> Result<(), Error>; + + fn contains(&self, column: DBColumnCold, slot: Slot) -> Result; + + fn iter_from(&self, column: DBColumnCold, from: Slot) -> SlotIter<'_>; + + // Root-keyed indices owned by the cold backend. + fn get_index(&self, column: DBColumnColdIndex, root: Hash256) -> Result, Error>; + + fn put_index_batch( + &self, + column: DBColumnColdIndex, + items: Vec<(Hash256, Slot)>, + ) -> Result<(), Error>; + + fn iter_index(&self, column: DBColumnColdIndex) -> IndexIter<'_>; + + fn sync(&self) -> Result<(), Error>; +} + pub trait Key: Sized + 'static { fn from_bytes(key: &[u8]) -> Result; } @@ -349,6 +437,12 @@ pub enum DBColumn { /// Can be removed once schema v22 is buried by a hard fork. #[strum(serialize = "bbr")] BeaconBlockRootsChunked, + /// `block_root -> slot` index for blocks moved into the cold archive. + /// Populated by `migrate_database` (Static cold) and era-file import. + /// Empty under KV cold. Consulted by `HotColdDB::get_block_with` to + /// resolve root-keyed reads against the slot-keyed cold archive. + #[strum(serialize = "bbs")] + BeaconBlockSlot, /// DEPRECATED. Can be removed once schema v22 is buried by a hard fork. #[strum(serialize = "bhr")] BeaconHistoricalRoots, @@ -404,6 +498,7 @@ impl DBColumn { Self::OverflowLRUCache => 33, // DEPRECATED Self::BeaconMeta | Self::BeaconBlock + | Self::BeaconBlockSlot | Self::BeaconState | Self::BeaconBlob | Self::BeaconStateSummary @@ -493,6 +588,21 @@ mod tests { } } + /// Mirrors the wrapper that older releases stored in `BeaconColdStateSummary`. 
+ #[derive(Encode, Decode)] + struct LegacyColdStateSummary { + slot: Slot, + } + + #[test] + fn ssz_compat_with_legacy_summary() { + let slot = Slot::new(42); + assert_eq!( + slot.as_ssz_bytes(), + LegacyColdStateSummary { slot }.as_ssz_bytes(), + ); + } + fn test_impl(store: impl ItemStore) { let key = Hash256::random(); let item = StorableThing { a: 1, b: 42 }; diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 6baef61c9d8..2777518e423 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,8 +1,10 @@ use crate::{ - ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValueStore, KeyValueStoreOp, - errors::Error as DBError, get_key_for_col, hot_cold_store::BytesKey, + ColdStore, ColumnIter, ColumnKeyIter, DBColumn, DBColumnCold, DBColumnColdIndex, Error, + IndexIter, ItemStore, Key, KeyValueStore, KeyValueStoreOp, SlotIter, get_key_for_col, + hot_cold_store::BytesKey, }; use parking_lot::RwLock; +use ssz::{Decode, Encode}; use std::collections::{BTreeMap, HashSet}; use std::marker::PhantomData; use types::*; @@ -93,11 +95,13 @@ impl KeyValueStore for MemoryStore { .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec())) .collect::>(); Box::new(keys.into_iter().filter_map(move |key| { - self.get_bytes(column, &key).transpose().map(|res| { - let k = K::from_bytes(&key)?; - let v = res?; - Ok((k, v)) - }) + KeyValueStore::get_bytes(self, column, &key) + .transpose() + .map(|res| { + let k = K::from_bytes(&key)?; + let v = res?; + Ok((k, v)) + }) })) } @@ -124,7 +128,7 @@ impl KeyValueStore for MemoryStore { Box::new(keys.into_iter().map(move |key| K::from_bytes(&key))) } - fn delete_batch(&self, col: DBColumn, ops: HashSet<&[u8]>) -> Result<(), DBError> { + fn delete_batch(&self, col: DBColumn, ops: HashSet<&[u8]>) -> Result<(), Error> { for op in ops { let column_key = get_key_for_col(col, op); 
self.db.write().remove(&BytesKey::from_vec(column_key)); @@ -149,3 +153,72 @@ impl KeyValueStore for MemoryStore { } impl ItemStore for MemoryStore {} + +// Mirrors `ColdBackend::Kv` in `database/interface.rs` — both translate the +// slot/root-typed `ColdStore` API into the byte-keyed `KeyValueStore` API in +// the same way. Kept inline here (and there) rather than behind a shared helper +// because there are only two impls and the indirection wasn't earning its +// keep. +// +// `Slot::as_ssz_bytes()` is byte-identical to the legacy +// `ColdStateSummary { slot }` wrapper, so existing dbs round-trip without +// migration. Pinned by the `ssz_compat_with_legacy_summary` test in `lib.rs`. +impl ColdStore for MemoryStore { + fn get(&self, c: DBColumnCold, slot: Slot) -> Result>, Error> { + self.get_bytes(c.db_column(), &slot.as_u64().to_be_bytes()) + } + fn put_batch(&self, c: DBColumnCold, items: Vec<(Slot, Vec)>) -> Result<(), Error> { + let col = c.db_column(); + let ops = items + .into_iter() + .map(|(slot, value)| { + KeyValueStoreOp::PutKeyValue(col, slot.as_u64().to_be_bytes().to_vec(), value) + }) + .collect(); + self.do_atomically(ops) + } + fn contains(&self, c: DBColumnCold, slot: Slot) -> Result { + self.key_exists(c.db_column(), &slot.as_u64().to_be_bytes()) + } + fn iter_from(&self, c: DBColumnCold, from: Slot) -> SlotIter<'_> { + Box::new( + self.iter_column_from::>(c.db_column(), &from.as_u64().to_be_bytes()) + .map(|res| { + res.and_then(|(key_bytes, value)| { + let bytes: [u8; 8] = + key_bytes.try_into().map_err(|_| Error::InvalidBytes)?; + Ok((Slot::new(u64::from_be_bytes(bytes)), value)) + }) + }), + ) + } + fn get_index(&self, c: DBColumnColdIndex, root: Hash256) -> Result, Error> { + Ok(self + .get_bytes(c.db_column(), root.as_slice())? + .map(|bytes| Slot::from_ssz_bytes(&bytes)) + .transpose()?) 
+ } + fn put_index_batch( + &self, + c: DBColumnColdIndex, + items: Vec<(Hash256, Slot)>, + ) -> Result<(), Error> { + let col = c.db_column(); + let ops = items + .into_iter() + .map(|(root, slot)| { + KeyValueStoreOp::PutKeyValue(col, root.as_slice().to_vec(), slot.as_ssz_bytes()) + }) + .collect(); + self.do_atomically(ops) + } + fn iter_index(&self, c: DBColumnColdIndex) -> IndexIter<'_> { + Box::new( + self.iter_column::(c.db_column()) + .map(|res| res.and_then(|(root, value)| Ok((root, Slot::from_ssz_bytes(&value)?)))), + ) + } + fn sync(&self) -> Result<(), Error> { + KeyValueStore::sync(self) + } +} diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 215cdb2b64d..2386291b799 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -19,6 +19,10 @@ pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5); pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6); pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7); pub const DATA_COLUMN_CUSTODY_INFO_KEY: Hash256 = Hash256::repeat_byte(8); +/// Records which cold backend (`Kv` or `Static`) was used to write this DB. +/// Set on first open, checked on every subsequent open. Switching backends +/// in-place is unsupported. +pub const COLD_BACKEND_KEY: Hash256 = Hash256::repeat_byte(9); /// State upper limit value used to indicate that a node is not storing historic states. pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX); @@ -141,11 +145,6 @@ impl AnchorInfo { self.state_lower_limit == 0 && self.state_upper_limit >= split_slot } - /// Return true if no historic states other than genesis *will ever be stored*. - pub fn full_state_pruning_enabled(&self) -> bool { - self.state_lower_limit == 0 && self.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN - } - /// Compute the correct `AnchorInfo` for an archive node created from the current node. 
/// /// This method ensures that the `anchor_slot` which is used for the hot database's diff grid is diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 04a519af020..8bace0ab2ce 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -1,8 +1,9 @@ //! Implementation of historic state reconstruction (given complete block history). +use crate::config::ColdBackendKind; use crate::forwards_iter::FrozenForwardsIterator; -use crate::hot_cold_store::{HotColdDB, HotColdDBError}; +use crate::hot_cold_store::{ColdBatch, HotColdDB, HotColdDBError}; use crate::metrics; -use crate::{DBColumn, Error, ItemStore}; +use crate::{ColdStore, DBColumn, Error, ItemStore}; use itertools::{Itertools, process_results}; use state_processing::{ BlockSignatureStrategy, ConsensusContext, VerifyBlockRoot, per_block_processing, @@ -16,12 +17,21 @@ impl HotColdDB where E: EthSpec, Hot: ItemStore, - Cold: ItemStore, + Cold: ColdStore, { pub fn reconstruct_historic_states( self: &Arc, num_blocks: Option, ) -> Result<(), Error> { + // Online reconstruction writes historic states into the cold backend + // at slots that are already below the static-cold high-water mark for + // each column. The static backend is strict-ascending and would reject + // those puts as out-of-order. Per `specs/static-cold-backend.md`, a + // full node never becomes archive by online reconstruction — refuse. + if matches!(self.config.cold_backend, ColdBackendKind::Static) { + return Err(Error::ReconstructionUnsupportedOnStaticCold); + } + let mut anchor = self.get_anchor_info(); // Nothing to do, history is complete. 
@@ -129,7 +139,7 @@ where state.build_caches(&self.spec)?; process_results(block_root_iter, |iter| -> Result<(), Error> { - let mut io_batch = vec![]; + let mut batch = ColdBatch::default(); let mut prev_state_root = None; for ((prev_block_root, _), (block_root, slot)) in iter.tuple_windows() { @@ -172,7 +182,7 @@ where .or_else(|_| state.update_tree_hash_cache())?; // Stage state for storage in freezer DB. - self.store_cold_state(&state_root, &state, &mut io_batch)?; + self.store_cold_state(&state_root, &state, &mut batch)?; let batch_complete = slot + 1 == to_slot; @@ -181,7 +191,7 @@ where // - The diff/snapshot for this slot is required for future slots, or // - The reconstruction batch is complete (we are about to return). if self.hierarchy.should_commit_immediately(slot)? || batch_complete { - self.cold_db.do_atomically(std::mem::take(&mut io_batch))?; + self.commit_cold_batch(std::mem::take(&mut batch))?; if batch_complete { // Perform one last integrity check on the state reached. diff --git a/beacon_node/store/src/static_cold.rs b/beacon_node/store/src/static_cold.rs new file mode 100644 index 00000000000..1da8f2219b8 --- /dev/null +++ b/beacon_node/store/src/static_cold.rs @@ -0,0 +1,853 @@ +//! Slot-keyed durable archive for finalized cold-DB columns. +//! +//! `StaticColdStore` is a black box: `(column, slot, bytes)` in, same back. +//! See `specs/static-cold-backend.md` for the abstraction-level contract. +//! +//! # Layout +//! +//! ```text +//! / +//! {blk,bbr,bsr,bss,bsd}/ # one subdir per DBColumnCold +//! data_{file_id:05} # file_id = slot / 8192 +//! data_{file_id:05}.off # 8192 × u64 LE offsets, 0 = no record +//! column.conf # 36-byte commit marker, atomic-renamed +//! index/ # embedded KV for DBColumnColdIndex +//! ``` +//! +//! # File format +//! +//! Data file: e2store version record (`65 32 00 00 00 00 00 00`), then records +//! appended as `type[2] | length[4 LE] | reserved[2]=0 | payload` (snappy- +//! framed if `column.compression`). 
Per-column tags in `column_config`. +//! +//! `column.conf`: `b"LHSTBLK2" | highest_slot u64 LE (u64::MAX = empty) | +//! current_data_len u64 LE | record_type[2] | compression u8 | reserved | max_value_bytes u64 LE`. +//! Atomic update: write `.tmp`, fsync, rename, fsync dir. +//! +//! # Put contract +//! +//! Durable on return. Slots arrive ascending **or** are identical-value +//! re-puts of an already-committed slot (so `migrate_database` retries after +//! a mid-loop crash are safe). Previously-skipped slots (offset 0) cannot +//! be filled — that would break the append-only data file. +//! +//! # Recovery on open +//! +//! Data file truncated to `current_data_len`; `.off` entries beyond +//! `highest_slot` cleared. The `column.conf` rename is the commit point. +//! +//! # TODO(static): tests +//! +//! - happy path `open` / `get` / `put` per `DBColumnCold` +//! - out-of-order put rejection +//! - identical-value re-put at any committed slot succeeds; mismatched +//! value or skipped-slot fill rejected +//! - crash windows around data, `.off`, and `column.conf` (heal on open) +//! - `max_value_bytes` ratchet-up persists on next open +//! 
- `COLD_BACKEND_KEY` mismatch refuses to start + +use crate::config::StoreConfig; +use crate::database::interface::BeaconNodeBackend; +use crate::{DBColumnCold, KeyValueStore}; +use parking_lot::Mutex; +use snap::{read::FrameDecoder, write::FrameEncoder}; +use std::{ + collections::HashMap, + fmt, + fs::{self, File, OpenOptions}, + io::{self, Read, Seek, SeekFrom, Write}, + marker::PhantomData, + path::{Path, PathBuf}, +}; +use strum::IntoEnumIterator; +use types::{EthSpec, Slot}; + +const SLOTS_PER_FILE: u64 = 8192; +const OFFSET_SIZE: u64 = 8; +const OFFSET_FILE_LEN: u64 = SLOTS_PER_FILE * OFFSET_SIZE; +const CONFIG_FILE: &str = "column.conf"; +const CONFIG_TMP_FILE: &str = "column.conf.tmp"; +const DATA_FILE_PREFIX: &str = "data_"; +const CONFIG_MAGIC: &[u8; 8] = b"LHSTBLK2"; +const CONFIG_LEN: usize = 36; +/// Empty-store sentinel for `highest_written_slot` in the per-column config. +const EMPTY_SLOT: u64 = u64::MAX; +/// e2store version record, written once at the start of each data file. +const VERSION_RECORD: [u8; 8] = [0x65, 0x32, 0, 0, 0, 0, 0, 0]; + +const COMPRESSION_NONE: u8 = 0; +const COMPRESSION_SNAPPY: u8 = 1; + +/// Per-column configuration. On first creation of a column the values come +/// from `column_config`; thereafter they are persisted in the column's +/// `column.conf` and the on-disk values win over current-build defaults (except `max_value_bytes`, which is ratcheted up to a larger build default). +#[derive(Debug, Clone, Copy)] +struct ColumnConfig { + /// On-disk subdirectory name under the store root. Stable across builds. + subdir: &'static str, + /// e2store record type tag for this column. + record_type: [u8; 2], + /// Whether values are snappy-framed before write. + compression: bool, + /// Upper bound on a single decoded record's size in bytes. + max_value_bytes: u64, +} + +/// Per-column file format defaults. 
+fn column_config(column: DBColumnCold) -> ColumnConfig { + match column { + DBColumnCold::Block => ColumnConfig { + subdir: "blk", + record_type: [0x01, 0x00], + compression: true, + max_value_bytes: 10 * 1024 * 1024, + }, + DBColumnCold::BlockRoots => ColumnConfig { + subdir: "bbr", + record_type: [0x02, 0x00], + compression: false, + max_value_bytes: 64, + }, + DBColumnCold::StateRoots => ColumnConfig { + subdir: "bsr", + record_type: [0x03, 0x00], + compression: false, + max_value_bytes: 64, + }, + DBColumnCold::StateSnapshot => ColumnConfig { + subdir: "bss", + record_type: [0x04, 0x00], + compression: false, + max_value_bytes: 1024 * 1024 * 1024, + }, + DBColumnCold::StateDiff => ColumnConfig { + // HDiff is already compressed internally (zstd'd validator and + // balance chunks; xdelta3 state diff). No benefit to wrapping it + // in snappy here. + subdir: "bsd", + record_type: [0x05, 0x00], + compression: false, + max_value_bytes: 1024 * 1024 * 1024, + }, + } +} + +pub struct StaticColdStore { + /// All cold columns the static archive backs, opened eagerly at boot. + /// Frozen after construction; per-column writer state is locked inside + /// each `Column`. + columns: HashMap, + /// Embedded KV for root-keyed indices (e.g. `ColdStateSummary`). The + /// slot-keyed file backend is the bulk archive; this side-table lets us + /// look up `state_root → slot` without scanning the bulk files. 
+ index_db: BeaconNodeBackend, + _phantom: PhantomData, +} + +type StoreResult = std::result::Result; + +#[derive(Debug)] +pub enum StaticColdStoreError { + Io(io::Error), + Compression(io::Error), + Invalid(String), + Unsupported(&'static str), +} + +impl fmt::Display for StaticColdStoreError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Io(e) => write!(f, "static cold store io error: {e}"), + Self::Compression(e) => write!(f, "static cold store compression error: {e}"), + Self::Invalid(message) => write!(f, "static cold store invalid data: {message}"), + Self::Unsupported(op) => write!(f, "static cold store does not support {op}"), + } + } +} + +impl From for StaticColdStoreError { + fn from(e: io::Error) -> Self { + Self::Io(e) + } +} + +impl StaticColdStore { + /// Open the archive rooted at `path`. Every cold column is opened eagerly + /// so subsequent reads/writes are pure hashmap lookups with no I/O on the + /// hot path. An embedded KV is opened at `/index/` for the + /// root-keyed indices. + pub fn open(path: &Path, config: &StoreConfig) -> Result { + fs::create_dir_all(path).map_err(StaticColdStoreError::Io)?; + let mut columns = HashMap::new(); + for column in DBColumnCold::iter() { + let cfg = column_config(column); + columns.insert(column, Column::open(path.join(cfg.subdir), cfg)?); + } + let index_db = BeaconNodeBackend::open(config, &path.join("index"))?; + Ok(Self { + columns, + index_db, + _phantom: PhantomData, + }) + } + + /// Read the value at `(column, slot)`, if present. + pub fn get(&self, column: DBColumnCold, slot: Slot) -> StoreResult>> { + self.columns[&column].get(slot) + } + + /// Durably store `bytes` at `(column, slot)`. Slots within a column must + /// arrive strictly ascending. A re-put of an identical value at the + /// current highest slot is treated as a no-op so callers that pre-write + /// a slot at startup (e.g. 
genesis block_root) don't trip the + /// out-of-order check on the first migration. + pub fn put(&self, column: DBColumnCold, slot: Slot, bytes: &[u8]) -> StoreResult<()> { + self.columns[&column].put(slot, bytes) + } + + /// Return `true` if a value exists at `(column, slot)`. Cheaper than `get` + /// because only the `.off` sidecar is consulted; the data file is not read. + pub fn contains(&self, column: DBColumnCold, slot: Slot) -> StoreResult { + self.columns[&column].contains(slot) + } +} + +/// Single-column slot-keyed file set. Owns one subdirectory of data + `.off` + +/// config files. +#[derive(Debug)] +struct Column { + root_dir: PathBuf, + config: ColumnConfig, + highest_written_slot: Mutex>, +} + +struct ColumnConfigOnDisk { + highest_written_slot: Option, + current_data_len: u64, + record_type: [u8; 2], + compression: bool, + max_value_bytes: u64, +} + +impl Column { + fn open(root_dir: PathBuf, defaults: ColumnConfig) -> StoreResult { + fs::create_dir_all(&root_dir)?; + + // First-open: persist current-build defaults. Re-open: persisted + // settings win over `defaults`, which preserves on-disk readability + // even if the build's defaults change later. + let config_path = root_dir.join(CONFIG_FILE); + let tmp_path = root_dir.join(CONFIG_TMP_FILE); + if !config_path.exists() { + atomic_write_config(&config_path, &tmp_path, &root_dir, None, 0, &defaults)?; + } + + let on_disk = read_config(&config_path)?; + // record_type and compression are sticky — they're load-bearing for + // reading old records, so on-disk wins over build-time defaults. + // max_value_bytes is a soft bound used to cap accepted record sizes; + // ratchet it up if the build's default is larger so a newer build + // can write bigger records than an older one persisted, then + // re-persist immediately so future opens see the new bound. 
+ let max_value_bytes = on_disk.max_value_bytes.max(defaults.max_value_bytes); + let config = ColumnConfig { + subdir: defaults.subdir, + record_type: on_disk.record_type, + compression: on_disk.compression, + max_value_bytes, + }; + if max_value_bytes != on_disk.max_value_bytes { + atomic_write_config( + &config_path, + &tmp_path, + &root_dir, + on_disk.highest_written_slot, + on_disk.current_data_len, + &config, + )?; + } + + let handle = Self { + root_dir, + config, + highest_written_slot: Mutex::new(None), + }; + + if let Some(slot) = on_disk.highest_written_slot { + handle.heal_current_file(slot, on_disk.current_data_len)?; + } + *handle.highest_written_slot.lock() = on_disk.highest_written_slot; + + Ok(handle) + } + + fn get(&self, slot: Slot) -> StoreResult>> { + let Some(highest_written_slot) = *self.highest_written_slot.lock() else { + return Ok(None); + }; + if slot > highest_written_slot { + return Ok(None); + } + self.read_record(slot) + } + + /// Read a record at `slot` without consulting the writer mutex. Used by + /// callers that already hold the lock (`put` for the idempotency check) + /// or have another reason to know the slot is committed. 
+ fn read_record(&self, slot: Slot) -> StoreResult>> { + let file_id = file_id(slot); + let offset = self.read_offset(file_id, slot)?; + if offset == 0 { + return Ok(None); + } + + let data_path = self.data_path(file_id); + let mut data_file = File::open(&data_path)?; + data_file.seek(SeekFrom::Start(offset))?; + + let mut header = [0; 8]; + data_file.read_exact(&mut header)?; + if header[0..2] != self.config.record_type || header[6..8] != [0, 0] { + return Err(StaticColdStoreError::Invalid( + "invalid static cold record header".into(), + )); + } + + let len = u32::from_le_bytes([header[2], header[3], header[4], header[5]]) as usize; + let mut payload = vec![0; len]; + data_file.read_exact(&mut payload)?; + + if self.config.compression { + decompress_record(&payload, self.config.max_value_bytes).map(Some) + } else { + if (payload.len() as u64) > self.config.max_value_bytes { + return Err(StaticColdStoreError::Invalid( + "static cold record exceeds size limit".into(), + )); + } + Ok(Some(payload)) + } + } + + fn contains(&self, slot: Slot) -> StoreResult { + let Some(highest_written_slot) = *self.highest_written_slot.lock() else { + return Ok(false); + }; + if slot > highest_written_slot { + return Ok(false); + } + Ok(self.read_offset(file_id(slot), slot)? != 0) + } + + fn put(&self, slot: Slot, bytes: &[u8]) -> StoreResult<()> { + let mut highest_written_slot = self.highest_written_slot.lock(); + if let Some(highest) = *highest_written_slot + && slot <= highest + { + // Idempotent re-put: any committed slot can be re-put with the + // identical value. Required so a `migrate_database` retry after a + // mid-loop crash can re-walk slots that were already committed in + // the previous attempt without tripping the strict-ascending + // invariant. A previously-skipped slot (offset zero) cannot be + // filled in — that would break the append-only data file. 
+ let existing = self.read_record(slot)?.ok_or_else(|| { + StaticColdStoreError::Invalid(format!( + "static cold re-put at slot {slot} <= highest {highest} \ + but no record exists; cannot fill a previously-skipped slot" + )) + })?; + if existing == bytes { + return Ok(()); + } + return Err(StaticColdStoreError::Invalid(format!( + "static cold re-put at slot {slot} with mismatched value" + ))); + } + + let payload = if self.config.compression { + compress_record(bytes)? + } else { + bytes.to_vec() + }; + let payload_len = u32::try_from(payload.len()) + .map_err(|_| StaticColdStoreError::Invalid("static cold record too large".into()))?; + + let target_file_id = file_id(slot); + // Discard an uncommitted next-file tail after a crash. + let reset_file = (*highest_written_slot).map(file_id) != Some(target_file_id); + let off_pos = offset_position(slot); + let data_path = self.data_path(target_file_id); + let off_path = self.offset_path(target_file_id); + + let mut data_file = OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(&data_path)?; + if reset_file { + data_file.set_len(0)?; + } + + if data_file.metadata()?.len() == 0 { + data_file.write_all(&VERSION_RECORD)?; + } + + let offset = data_file.seek(SeekFrom::End(0))?; + write_record( + &mut data_file, + self.config.record_type, + payload_len, + &payload, + )?; + let data_len = data_file.seek(SeekFrom::End(0))?; + // Data and offset files must hit disk before the config commit marker. + data_file.sync_all()?; + + let mut off_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(&off_path)?; + if reset_file { + off_file.set_len(0)?; + } + if off_file.metadata()?.len() < OFFSET_FILE_LEN { + off_file.set_len(OFFSET_FILE_LEN)?; + } + off_file.seek(SeekFrom::Start(off_pos))?; + off_file.write_all(&offset.to_le_bytes())?; + off_file.sync_all()?; + + // Atomic config update is the commit point. 
+ self.write_config(Some(slot), data_len)?; + *highest_written_slot = Some(slot); + + Ok(()) + } + + /// Append `items` to the column with one fsync per file (data + offset), + /// not per slot. Whole batch is durable on return — the same caller-visible + /// contract as `put` — but with O(1) syncs per underlying file instead of + /// O(n) per item. + /// + /// The implementation walks `items` once, grouping them by `file_id`. For + /// each group it opens the data file and offset file once, appends every + /// record's bytes (collecting `(slot, offset)` pairs in memory), writes the + /// offset table, fsyncs both files, then commits via `write_config`. Idempotent + /// re-put of `items[0]` at `highest_written_slot` is honored as in `put`. + fn put_batch(&self, items: Vec<(Slot, Vec)>) -> StoreResult<()> { + if items.is_empty() { + return Ok(()); + } + + // Validate ascending order up front (cheap, catches caller bugs). + for w in items.windows(2) { + if w[1].0 <= w[0].0 { + return Err(StaticColdStoreError::Invalid( + "static cold put_batch slots must be strictly ascending".into(), + )); + } + } + + let mut highest_written_slot = self.highest_written_slot.lock(); + let mut iter = items.into_iter().peekable(); + + // Idempotent re-put: if the first item is exactly highest_written_slot + // with matching bytes, drop it from the batch. 
+ if let (Some(highest), Some((first_slot, _))) = (*highest_written_slot, iter.peek()) { + if *first_slot < highest { + return Err(StaticColdStoreError::Invalid( + "static cold put_batch out of order vs highest_written_slot".into(), + )); + } + if *first_slot == highest { + let (slot, value) = iter.next().expect("peeked"); + let existing = self.read_record(slot)?.ok_or_else(|| { + StaticColdStoreError::Invalid( + "static cold missing record at highest slot".into(), + ) + })?; + if existing != value { + return Err(StaticColdStoreError::Invalid( + "static cold re-put with mismatched value".into(), + )); + } + } + } + + // Group remaining items by file_id, write each group with a single + // fsync per file. + let mut last_slot: Option = None; + let mut last_data_len: u64 = 0; + while iter.peek().is_some() { + let target_file_id = file_id(iter.peek().expect("peeked").0); + let mut group: Vec<(Slot, Vec)> = Vec::new(); + while let Some(&(slot, _)) = iter.peek() { + if file_id(slot) != target_file_id { + break; + } + group.push(iter.next().expect("peeked")); + } + + let reset_file = (*highest_written_slot).map(file_id) != Some(target_file_id); + let data_path = self.data_path(target_file_id); + let off_path = self.offset_path(target_file_id); + + // Data file: append all records, then fsync once. + let mut data_file = OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(&data_path)?; + if reset_file { + data_file.set_len(0)?; + } + if data_file.metadata()?.len() == 0 { + data_file.write_all(&VERSION_RECORD)?; + } + // BufWriter coalesces the small-record header writes (8 bytes) and + // the small payloads into larger syscalls. 
+ let mut offsets: Vec<(Slot, u64)> = Vec::with_capacity(group.len()); + { + let mut writer = std::io::BufWriter::with_capacity(1 << 20, &mut data_file); + let mut cursor = writer.get_ref().metadata()?.len(); + for (slot, value) in &group { + let payload: std::borrow::Cow<'_, [u8]> = if self.config.compression { + compress_record(value)?.into() + } else { + value.as_slice().into() + }; + let payload_len = u32::try_from(payload.len()).map_err(|_| { + StaticColdStoreError::Invalid("static cold record too large".into()) + })?; + offsets.push((*slot, cursor)); + // Inline `write_record` to avoid the `&mut File` -> BufWriter mismatch. + writer.write_all(&self.config.record_type)?; + writer.write_all(&payload_len.to_le_bytes())?; + writer.write_all(&0u16.to_le_bytes())?; + writer.write_all(&payload)?; + cursor += 8 + payload.len() as u64; + } + writer.flush()?; + } + let data_len = data_file.seek(SeekFrom::End(0))?; + data_file.sync_all()?; + + // Offset file: open, ensure full size, write all offsets in seek+write + // pairs (8 bytes each), then fsync once. + let mut off_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(&off_path)?; + if reset_file { + off_file.set_len(0)?; + } + if off_file.metadata()?.len() < OFFSET_FILE_LEN { + off_file.set_len(OFFSET_FILE_LEN)?; + } + for (slot, offset) in &offsets { + off_file.seek(SeekFrom::Start(offset_position(*slot)))?; + off_file.write_all(&offset.to_le_bytes())?; + } + off_file.sync_all()?; + + // Track final slot/data_len for the single config commit at end of batch. + if let Some((s, _)) = group.last() { + last_slot = Some(*s); + last_data_len = data_len; + } + *highest_written_slot = last_slot; + } + + // Single atomic config commit covering the whole batch. 
+ if let Some(s) = last_slot { + self.write_config(Some(s), last_data_len)?; + } + + Ok(()) + } + + fn heal_current_file(&self, slot: Slot, current_data_len: u64) -> StoreResult<()> { + let file_id = file_id(slot); + let data_path = self.data_path(file_id); + let data_file = OpenOptions::new().read(true).write(true).open(&data_path)?; + let data_len = data_file.metadata()?.len(); + if data_len < current_data_len { + return Err(StaticColdStoreError::Invalid( + "static cold data file shorter than committed length".into(), + )); + } + if data_len != current_data_len { + data_file.set_len(current_data_len)?; + data_file.sync_all()?; + } + + let off_path = self.offset_path(file_id); + let mut off_file = OpenOptions::new().read(true).write(true).open(&off_path)?; + let required_len = offset_position(slot) + OFFSET_SIZE; + let off_len = off_file.metadata()?.len(); + if off_len < required_len { + return Err(StaticColdStoreError::Invalid( + "static cold offset file shorter than committed slot".into(), + )); + } + if off_len < OFFSET_FILE_LEN { + off_file.set_len(OFFSET_FILE_LEN)?; + } + + let clear_start = required_len; + if clear_start < OFFSET_FILE_LEN { + // Remove offsets to entries beyond the committed slot. 
+ off_file.seek(SeekFrom::Start(clear_start))?; + let zeroes = vec![0; (OFFSET_FILE_LEN - clear_start) as usize]; + off_file.write_all(&zeroes)?; + off_file.sync_all()?; + } + + Ok(()) + } + + fn write_config( + &self, + highest_written_slot: Option, + current_data_len: u64, + ) -> StoreResult<()> { + atomic_write_config( + &self.config_path(), + &self.root_dir.join(CONFIG_TMP_FILE), + &self.root_dir, + highest_written_slot, + current_data_len, + &self.config, + ) + } + + fn read_offset(&self, file_id: u64, slot: Slot) -> StoreResult { + let off_path = self.offset_path(file_id); + let mut off_file = File::open(&off_path)?; + let mut bytes = [0; 8]; + off_file.seek(SeekFrom::Start(offset_position(slot)))?; + off_file.read_exact(&mut bytes)?; + Ok(u64::from_le_bytes(bytes)) + } + + fn config_path(&self) -> PathBuf { + self.root_dir.join(CONFIG_FILE) + } + + fn data_path(&self, file_id: u64) -> PathBuf { + self.root_dir + .join(format!("{DATA_FILE_PREFIX}{file_id:05}")) + } + + fn offset_path(&self, file_id: u64) -> PathBuf { + self.root_dir + .join(format!("{DATA_FILE_PREFIX}{file_id:05}.off")) + } +} + +fn read_config(path: &Path) -> StoreResult { + let bytes = fs::read(path)?; + if bytes.len() != CONFIG_LEN || &bytes[0..8] != CONFIG_MAGIC { + return Err(StaticColdStoreError::Invalid( + "invalid static cold config".into(), + )); + } + let highest = u64::from_le_bytes(bytes[8..16].try_into().expect("slice length checked")); + let current_data_len = + u64::from_le_bytes(bytes[16..24].try_into().expect("slice length checked")); + let record_type = [bytes[24], bytes[25]]; + let compression = match bytes[26] { + COMPRESSION_NONE => false, + COMPRESSION_SNAPPY => true, + other => { + return Err(StaticColdStoreError::Invalid(format!( + "unknown compression flag {other}" + ))); + } + }; + let max_value_bytes = + u64::from_le_bytes(bytes[28..36].try_into().expect("slice length checked")); + Ok(ColumnConfigOnDisk { + highest_written_slot: (highest != EMPTY_SLOT).then(|| 
Slot::new(highest)), + current_data_len, + record_type, + compression, + max_value_bytes, + }) +} + +fn atomic_write_config( + config_path: &Path, + tmp_path: &Path, + root_dir: &Path, + highest_written_slot: Option<Slot>, + current_data_len: u64, + config: &ColumnConfig, +) -> StoreResult<()> { + let mut bytes = [0u8; CONFIG_LEN]; + bytes[0..8].copy_from_slice(CONFIG_MAGIC); + bytes[8..16].copy_from_slice( + &highest_written_slot + .map_or(EMPTY_SLOT, |slot| slot.as_u64()) + .to_le_bytes(), + ); + bytes[16..24].copy_from_slice(&current_data_len.to_le_bytes()); + bytes[24..26].copy_from_slice(&config.record_type); + bytes[26] = if config.compression { + COMPRESSION_SNAPPY + } else { + COMPRESSION_NONE + }; + bytes[27] = 0; + bytes[28..36].copy_from_slice(&config.max_value_bytes.to_le_bytes()); + + { + let mut tmp = File::create(tmp_path)?; + tmp.write_all(&bytes)?; + tmp.sync_all()?; + } + + fs::rename(tmp_path, config_path)?; + sync_dir(root_dir) +} + +fn file_id(slot: Slot) -> u64 { + slot.as_u64() / SLOTS_PER_FILE +} + +fn offset_position(slot: Slot) -> u64 { + (slot.as_u64() % SLOTS_PER_FILE) * OFFSET_SIZE +} + +fn compress_record(bytes: &[u8]) -> StoreResult<Vec<u8>> { + let mut encoder = FrameEncoder::new(Vec::new()); + encoder + .write_all(bytes) + .map_err(StaticColdStoreError::Compression)?; + encoder.flush().map_err(StaticColdStoreError::Compression)?; + Ok(encoder.get_ref().clone()) +} + +fn write_record( + file: &mut File, + record_type: [u8; 2], + payload_len: u32, + payload: &[u8], +) -> StoreResult<()> { + file.write_all(&record_type)?; + file.write_all(&payload_len.to_le_bytes())?; + file.write_all(&0u16.to_le_bytes())?; + file.write_all(payload)?; + Ok(()) +} + +fn decompress_record(bytes: &[u8], max_value_bytes: u64) -> StoreResult<Vec<u8>> { + let decoder = FrameDecoder::new(bytes); + let mut limited = decoder.take(max_value_bytes + 1); + let mut decompressed = Vec::new(); + limited + .read_to_end(&mut decompressed) + .map_err(StaticColdStoreError::Compression)?; + if 
decompressed.len() as u64 > max_value_bytes { + return Err(StaticColdStoreError::Invalid( + "static cold record exceeds decompressed size limit".into(), + )); + } + Ok(decompressed) +} + +fn sync_dir(path: &Path) -> StoreResult<()> { + let dir = File::open(path)?; + dir.sync_all()?; + Ok(()) +} + +// Slot-keyed columns are served from the static files; root-keyed index +// columns are served from the embedded KV at `/index/`. +impl crate::ColdStore for StaticColdStore { + fn get(&self, c: DBColumnCold, slot: Slot) -> Result>, crate::Error> { + StaticColdStore::get(self, c, slot).map_err(Into::into) + } + + fn put_batch(&self, c: DBColumnCold, items: Vec<(Slot, Vec)>) -> Result<(), crate::Error> { + self.columns[&c].put_batch(items).map_err(Into::into) + } + + fn contains(&self, c: DBColumnCold, slot: Slot) -> Result { + StaticColdStore::contains(self, c, slot).map_err(Into::into) + } + + fn iter_from(&self, c: DBColumnCold, from: Slot) -> crate::SlotIter<'_> { + // TODO(static): this is O(highest - from) reads, one File::open per slot, + // and most slots in sparse columns (StateSnapshot/StateDiff) yield None. + // Acceptable today because iter_from is only used by infrequent paths + // (forwards iter, invariants). Improve if it becomes a hotspot. + let column = &self.columns[&c]; + let Some(highest) = *column.highest_written_slot.lock() else { + return Box::new(std::iter::empty()); + }; + if from > highest { + return Box::new(std::iter::empty()); + } + let column_ref = column; + Box::new( + (from.as_u64()..=highest.as_u64()) + .map(Slot::new) + .filter_map(move |slot| match column_ref.read_record(slot) { + Ok(Some(value)) => Some(Ok((slot, value))), + Ok(None) => None, + Err(e) => Some(Err(e.into())), + }), + ) + } + + fn get_index( + &self, + c: crate::DBColumnColdIndex, + root: types::Hash256, + ) -> Result, crate::Error> { + use ssz::Decode; + Ok(self + .index_db + .get_bytes(c.db_column(), root.as_slice())? 
+ .map(|bytes| Slot::from_ssz_bytes(&bytes)) + .transpose()?) + } + + fn put_index_batch( + &self, + c: crate::DBColumnColdIndex, + items: Vec<(types::Hash256, Slot)>, + ) -> Result<(), crate::Error> { + use ssz::Encode; + let col = c.db_column(); + let ops = items + .into_iter() + .map(|(root, slot)| { + crate::KeyValueStoreOp::PutKeyValue( + col, + root.as_slice().to_vec(), + slot.as_ssz_bytes(), + ) + }) + .collect(); + self.index_db.do_atomically(ops) + } + + fn iter_index(&self, c: crate::DBColumnColdIndex) -> crate::IndexIter<'_> { + use ssz::Decode; + Box::new( + self.index_db + .iter_column::(c.db_column()) + .map(|res| res.and_then(|(root, value)| Ok((root, Slot::from_ssz_bytes(&value)?)))), + ) + } + + fn sync(&self) -> Result<(), crate::Error> { + KeyValueStore::sync(&self.index_db) + } +} diff --git a/book/src/help_bn.md b/book/src/help_bn.md index b580bcae528..d43bc44516e 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -71,6 +71,10 @@ Options: --checkpoint-sync-url-timeout Set the timeout for checkpoint sync calls to remote beacon node HTTP endpoint. [default: 180] + --cold-backend + Cold (freezer) DB backend. "kv" stores cold data in the same KV as the + hot DB. "static" stores cold data in slot-keyed static files; only + supported when starting from genesis. [possible values: kv, static] -d, --datadir Used to specify a custom root data directory for lighthouse keys and databases. 
Defaults to $HOME/.lighthouse/{network} where network is diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index cb332546f94..b8d1ea2ea84 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -78,7 +78,6 @@ pub enum DatabaseManagerSubcommand { Version(Version), PrunePayloads(PrunePayloads), PruneBlobs(PruneBlobs), - PruneStates(PruneStates), Compact(Compact), } @@ -176,21 +175,6 @@ pub struct PrunePayloads {} )] pub struct PruneBlobs {} -#[derive(Parser, Clone, Deserialize, Serialize, Debug)] -#[clap( - about = "Prune all beacon states from the freezer database.", - alias = "prune_states" -)] -pub struct PruneStates { - #[clap( - long, - help = "Commit to pruning states irreversably. Without this flag the command will \ - just check that the database is capable of being pruned.", - help_heading = FLAG_HEADER, - )] - pub confirm: bool, -} - #[derive(Parser, Clone, Deserialize, Serialize, Debug)] #[clap(about = "Compact database manually.")] pub struct Compact { diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 608400fa7ed..f6552c6b7b0 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -1,7 +1,6 @@ pub mod cli; use crate::cli::DatabaseManager; use crate::cli::Migrate; -use crate::cli::PruneStates; use beacon_chain::{ builder::Witness, schema_change::migrate_schema, slot_clock::SystemTimeSlotClock, }; @@ -17,13 +16,13 @@ use std::path::PathBuf; use store::KeyValueStore; use store::{ DBColumn, HotColdDB, - database::interface::BeaconNodeBackend, + database::interface::{BeaconNodeBackend, ColdBackend}, errors::Error, metadata::{CURRENT_SCHEMA_VERSION, SchemaVersion}, }; use strum::{EnumString, VariantNames}; -use tracing::{info, warn}; -use types::{BeaconState, EthSpec, Slot}; +use tracing::info; +use types::EthSpec; fn parse_client_config( cli_args: &ArgMatches, @@ -55,7 +54,7 @@ pub fn display_db_version( let blobs_path = client_config.get_blobs_db_path(); let mut 
version = CURRENT_SCHEMA_VERSION; - HotColdDB::, BeaconNodeBackend>::open( + HotColdDB::, ColdBackend>::open( &hot_path, &cold_path, &blobs_path, @@ -309,7 +308,7 @@ pub fn migrate_db( let mut from = CURRENT_SCHEMA_VERSION; let to = migrate_config.to; - let db = HotColdDB::, BeaconNodeBackend>::open( + let db = HotColdDB::, ColdBackend>::open( &hot_path, &cold_path, &blobs_path, @@ -339,7 +338,7 @@ pub fn prune_payloads( let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, BeaconNodeBackend>::open( + let db = HotColdDB::, ColdBackend>::open( &hot_path, &cold_path, &blobs_path, @@ -363,7 +362,7 @@ pub fn prune_blobs( let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, BeaconNodeBackend>::open( + let db = HotColdDB::, ColdBackend>::open( &hot_path, &cold_path, &blobs_path, @@ -377,75 +376,6 @@ pub fn prune_blobs( db.try_prune_most_blobs(true) } -pub struct PruneStatesConfig { - confirm: bool, -} -fn parse_prune_states_config( - prune_states_config: &PruneStates, -) -> Result { - let confirm = prune_states_config.confirm; - Ok(PruneStatesConfig { confirm }) -} - -pub fn prune_states( - client_config: ClientConfig, - prune_config: PruneStatesConfig, - mut genesis_state: BeaconState, - runtime_context: &RuntimeContext, -) -> Result<(), String> { - let spec = &runtime_context.eth2_config.spec; - let hot_path = client_config.get_db_path(); - let cold_path = client_config.get_freezer_db_path(); - let blobs_path = client_config.get_blobs_db_path(); - - let db = HotColdDB::, BeaconNodeBackend>::open( - &hot_path, - &cold_path, - &blobs_path, - |_, _, _| Ok(()), - client_config.store, - spec.clone(), - ) - .map_err(|e| format!("Unable to open database: {e:?}"))?; - - // Load the genesis state from the database to ensure we're deleting states for the - // correct network, and that we don't end up storing the wrong genesis 
state. - let genesis_from_db = db - .load_cold_state_by_slot(Slot::new(0)) - .map_err(|e| format!("Error reading genesis state: {e:?}"))?; - - if genesis_from_db.genesis_validators_root() != genesis_state.genesis_validators_root() { - return Err(format!( - "Error: Wrong network. Genesis state in DB does not match {} genesis.", - spec.config_name.as_deref().unwrap_or("") - )); - } - - // Check that the user has confirmed they want to proceed. - if !prune_config.confirm { - if db.get_anchor_info().full_state_pruning_enabled() { - info!("States have already been pruned"); - return Ok(()); - } - - info!("Ready to prune states"); - warn!("Pruning states is irreversible"); - warn!("Re-run this command with --confirm to commit to state deletion"); - info!("Nothing has been pruned on this run"); - return Err("Error: confirmation flag required".into()); - } - - // Delete all historic state data and *re-store* the genesis state. - let genesis_state_root = genesis_state - .update_tree_hash_cache() - .map_err(|e| format!("Error computing genesis state root: {e:?}"))?; - db.prune_historic_states(genesis_state_root, &genesis_state) - .map_err(|e| format!("Failed to prune due to error: {e:?}"))?; - - info!("Historic states pruned successfully"); - Ok(()) -} - /// Run the database manager, returning an error string if the operation did not succeed. pub fn run( cli_args: &ArgMatches, @@ -456,26 +386,6 @@ pub fn run( let context = env.core_context(); let format_err = |e| format!("Fatal error: {:?}", e); - let get_genesis_state = || { - let executor = env.core_context().executor; - let network_config = context - .eth2_network_config - .clone() - .ok_or("Missing network config")?; - - executor - .block_on_dangerous( - network_config.genesis_state::( - client_config.genesis_state_url.as_deref(), - client_config.genesis_state_url_timeout, - ), - "get_genesis_state", - ) - .ok_or("Shutting down")? - .map_err(|e| format!("Error getting genesis state: {e}"))? 
- .ok_or("Genesis state missing".to_string()) - }; - match &db_manager_config.subcommand { cli::DatabaseManagerSubcommand::Migrate(migrate_config) => { let migrate_config = parse_migrate_config(migrate_config)?; @@ -494,11 +404,6 @@ pub fn run( cli::DatabaseManagerSubcommand::PruneBlobs(_) => { prune_blobs(client_config, &context).map_err(format_err) } - cli::DatabaseManagerSubcommand::PruneStates(prune_states_config) => { - let prune_config = parse_prune_states_config(prune_states_config)?; - let genesis_state = get_genesis_state()?; - prune_states(client_config, prune_config, genesis_state, &context) - } cli::DatabaseManagerSubcommand::Compact(compact_config) => { let compact_config = parse_compact_config(compact_config)?; compact_db::(compact_config, client_config).map_err(format_err) diff --git a/specs/era-storage.md b/specs/era-storage.md new file mode 100644 index 00000000000..854a9c1d65e --- /dev/null +++ b/specs/era-storage.md @@ -0,0 +1,284 @@ +# Era Blob Storage + +Static-file backend for `BlobSidecar` archival, using E2Store-compatible `.erb` +files. Slot-indexed, append-only forward, sealed in fixed-size eras. + +Stored data is blobs only. Column sidecars are derived on read. + +**Initialization is via genesis sync or import of an existing era set. 
+Checkpoint sync and P2P blob backfill are incompatible with this backend +and rejected at startup.** + +## Required APIs (active forks: Fulu, Gloas) + +``` +get_blobs(slot).into_columns ≡ + get_data_column_sidecars_from_block( + block, + [compute_cells_and_kzg_proofs(b) for b in blobs] + ) +``` +(consensus-specs/fulu/validator.md) + +### REST (beacon-APIs) + +| Endpoint | blobs_db | era backend | +| - | - | - | +| `GET /eth/v1/beacon/blobs/{block_id}?versioned_hashes=…` | `get_blobs(root)`, HTTP filters by hash | resolve slot → `era.get_blobs(slot)` | +| `GET /eth/v1/debug/beacon/data_column_sidecars/{block_id}?indices=…` | `get_data_columns(root)` | resolve slot → `era.get_blobs(slot).into_columns` | + +### P2P Req/Resp (Fulu, carried into Gloas) + +| Method | blobs_db | era backend | +| - | - | - | +| `BlobSidecarsByRange` | blobs_db per slot | `era.get_blobs(slot)` | +| `BlobSidecarsByRoot` | blobs_db per root | resolve root → slot → `BlobSidecarsByRange` | +| `DataColumnSidecarsByRange` | `BeaconDataColumn` per slot | `era.get_blobs(slot).into_columns` | +| `DataColumnSidecarsByRoot` | `BeaconDataColumn` per root | resolve root → slot → `DataColumnSidecarsByRange` | + +`era.get_blobs(slot)` returns the full per-slot list; HTTP / wire-layer +projection (`versioned_hashes`, `indices`, `columns`) happens above the +store. Blob wire methods are deprecated as of +`FULU_FORK_EPOCH + MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS`. + +## Constants + +| Name | Value | +| - | - | +| `SLOTS_PER_ERA` | `SLOTS_PER_HISTORICAL_ROOT` (`8192`) | +| `ERA_SEAL_DELAY` | `2 * SLOTS_PER_EPOCH` | + +## Custom types + +| Name | SSZ | +| - | - | +| `EraNumber` | `uint64` | +| `EraBlobPointers` | `{ back: Slot, forward: Option<Slot> }` | +| `Manifest` | `{ sealed_eras: List[EraNumber], anchor_era: EraNumber }` | + +## Layout + +``` +{datadir}/beacon/era/ + manifest + {network}-{era:05}-{root_prefix}.erb +``` + +## API + +The era store is **not** a `KeyValueStore`.
It is a narrow, slot-indexed type +held as a field on `HotColdDB`: + +```rust +fn get_blobs(slot: Slot) -> Result<Option<BlobSidecarList<E>>>; +fn append_blobs(slot: Slot, blobs: BlobSidecarList<E>) -> Result<()>; // requires slot > forward_pointer +fn forward_pointer() -> Option<Slot>; +fn back_pointer() -> Slot; // set once at init, read-only +fn is_sealed(era: EraNumber) -> bool; +fn seal(era: EraNumber) -> Result<()>; +``` + +### Invariants + +- `back_pointer` is set at init and never changes: + - genesis sync → `back_pointer = 0` + - era-file init → `back_pointer = lowest slot in imported set` +- `forward_pointer` is `None` at genesis-sync init, or `= highest slot in + imported set` after era-file init. It only advances; `append_blobs` + requires `slot > forward_pointer`. +- Out-of-order writes return `Err(OutOfOrder)`. The store does not de-dupe. +- `get_blobs(slot)`: + - in-range, no blobs at slot → `Some([])` + - in-range, blobs present → `Some(list)` + - out-of-range → `None` + +### Gap fills + +When `append_blobs(slot, ...)` advances multiple slots ahead of +`forward_pointer`, intermediate slots are auto-filled with empties internally. +Required for the pre-Deneb → Deneb jump on first append.
+ +## Helpers + +```python +def era_of(slot: Slot) -> EraNumber: + return slot // SLOTS_PER_ERA + +def era_range(e: EraNumber) -> (Slot, Slot): + return (e * SLOTS_PER_ERA, (e + 1) * SLOTS_PER_ERA) + +def can_seal(e, back, forward, finalized) -> bool: + start, end = era_range(e) + return (forward is not None + and back <= start and end <= forward + and end + ERA_SEAL_DELAY <= finalized) +``` + +## Triggers + +| Op | Caller | Source | +| - | - | - | +| `append_blobs` | new step on the migrator thread, after `migrate_database`, before `try_prune_blobs` | drains `blobs_db` for slots that became finalized this migration | +| `seal` | post `append_blobs`; also at startup over imported eras | when `can_seal(e)` holds | + +`append_blobs` fires only post-finality; the no-rewrite invariant of `.erb` +files is preserved against reorgs by construction. There is no backfill +trigger — historical data arrives only via era-file import at init. + +## Sealing + +For each `e` with `can_seal(e)`: +1. Write `.erb.tmp`, append `SlotIndex`, fsync. +2. Atomic rename to final filename. +3. Update `manifest`. +4. Delete overlay rows for `era_range(e)`. + +Crash mid-seal leaves a `*.tmp` discarded on restart. Sealing is idempotent. + +## Read + +`HotColdDB::get_blobs(block_root)` becomes: +1. Resolve `slot` from `block_root` (see Status quo — there is no slot index + for blobs today; one of the three options below is required). +2. If `era_of(slot)` is sealed → `era_store.get_blobs(slot)`. +3. Else → `blobs_db` as today. + +Root → slot resolution options (era-mode only): +- (a) extend the call sites to pass `slot` alongside `block_root` (most + callers already have it: block import, blob-by-range RPC). +- (b) maintain a `(root → era)` map in the era manifest, sealed eras only. +- (c) on miss, load the block header from the cold DB to recover its slot. + +Default plan: (a) where the caller has it cheaply, (b) as fallback for the +HTTP-by-root path. 
The era store itself stays purely slot-indexed. + +## Pruning + +Era and existing pruning interlock by capping the prune cursor: + +``` +prune_horizon = min(retention_horizon, lowest_unsealed_era_start) +``` + +Sealing must precede the prune cursor advancing into a given era. Pruning +itself is unchanged; only the cursor calculation gains the era clamp when era +mode is enabled. + +## Status quo + +### Storage + +- `blobs_db` is a separate physical DB next to `chain_db` and `freezer_db`, + same backend (LevelDB / Redb). + `beacon_node/store/src/hot_cold_store.rs:266-290`. +- `DBColumn::BeaconBlob` rows are **keyed by `block_root` only**; the value + is the entire `BlobSidecarList` for that block, SSZ-encoded as a single + row. `beacon_node/store/src/lib.rs:257`. +- No slot index for blobs anywhere. `get_blobs(block_root) -> BlobSidecarListFromRoot` + is the only read API. `beacon_node/store/src/hot_cold_store.rs:2625`. + +### Lifecycle + +- **Init.** `BlobInfo.oldest_blob_slot = max(anchor_slot, deneb_fork_slot)` in + `init_blob_info`. `hot_cold_store.rs:2854`. +- **Forward sync.** `put_blobs(block_root, blobs)` writes directly to + `blobs_db` per block, no batching. `hot_cold_store.rs:958`. +- **Backfill.** `import_historical_block_batch` builds `StoreOp::PutBlobs` + ops, commits via `blobs_db.do_atomically(blob_batch)` at line 256, then + CAS-updates `oldest_blob_slot` to the min slot seen. + `beacon_chain/src/historical_blocks.rs:159-294`. +- **Finalization migration.** `migrate_database` does **not** touch blobs. + Hot/cold split applies only to states and block roots. + `hot_cold_store.rs:3578-3726`. +- **Pruning.** `try_prune_blobs` runs on the migrator thread post-migrate. + Walks blocks backwards from `min(data_availability_boundary - margin, + split.epoch - 1)`, deletes blob rows by block_root, advances + `oldest_blob_slot` to `end_slot + 1`. `hot_cold_store.rs:3320-3483`. 
+ +### Implications for the era backend + +- **Blobs are root-keyed; routing by slot needs resolution.** Blob reads + today never compute a slot; era-mode introduces that need (see the Read + section above for the chosen approach). +- **`append_blobs` cannot live inside `migrate_database`** — that function + doesn't process blobs today. It hooks as a **new step on the migrator + thread**, after `migrate_database` returns and before `try_prune_blobs` + runs. +- **No backfill hook.** `import_historical_block_batch` is unused under era + mode; checkpoint sync and blob backfill are rejected at startup. +- **`prune_horizon` clamp** lives inside `try_prune_blobs`: when era mode is + on, intersect the existing horizon with `lowest_unsealed_era_start`. + Trivially additive. + +## Integration + +### `beacon_node/store` + +- `EraBlobStore` is a field on `HotColdDB`, gated by a runtime flag. **No new + `BeaconNodeBackend` variant.** +- `HotColdDB::get_blobs` adds the `era_of(slot)` check before falling through + to `blobs_db`. +- All other store paths unchanged. + +### Metadata + +- `AnchorInfo` unchanged. +- `BlobInfo` unchanged. +- New `BeaconMeta` entries: `EraBlobPointers { back, forward }`, `EraManifest`. +- Bump `SchemaVersion`. + +### CLI + +- `--store-era-blobs` (default off). Mutually exclusive with + `--checkpoint-sync-url` and any blob-backfill flag; node refuses to start + if both are set. +- `--era-import-dir ` — directory of `.era` (blocks + boundary state) + + `.erb` (blobs) files consumed at init. Required if not genesis-syncing. +- `lcli era blobs export` to produce `.erb` files from an archival node. + +## Initialization + +Two paths only; the backend refuses to start in any other configuration. + +### Genesis sync + +- `back_pointer = 0`, `forward_pointer = None`. +- Forward sync fills `blobs_db` from genesis. 
As eras finalize and pass + `ERA_SEAL_DELAY`, `append_blobs` drains them into the era backend; sealing + produces `.erb` files; the overlay rows are deleted. + +### Era-file import + +- User supplies `--era-import-dir` containing matched `.era` and `.erb` + files. +- At startup: + 1. `.era` consumer (existing `era-file` branch) loads blocks + boundary + state, bootstrapping the chain. + 2. Each `.erb` file is validated against the imported blocks: per + sidecar, check `kzg_commitment` against + `block.body.blob_kzg_commitments[index]` and run + `verify_blob_kzg_proof`. + 3. Validated `.erb` files are linked into `{datadir}/beacon/era/`; + manifest is updated; eras are marked sealed. +- `back_pointer = lowest slot in imported set`, + `forward_pointer = highest slot in imported set`. +- Forward sync continues from there. + +### Compatibility + +- **Checkpoint sync incompatible.** A checkpoint-synced node has a gap from + genesis to anchor that requires backfill — disabled here. Startup error. +- **P2P blob backfill incompatible.** Same reason. Startup error. +- **No in-place opt-in for existing nodes.** A populated default-backend + node must `lcli era blobs export` its data, also export `.era` blocks + + state, drop `chain_db` / `freezer_db` / `blobs_db`, and reinitialize via + era-file import. + +Non-canonical blobs never reach the era backend. `append_blobs` runs after +`prune_hot_db` (`migrate.rs:769-778`) deletes orphaned blobs in the same +migrator pass; era-file import only accepts validated canonical data. + +## Coexistence + +Era is additive. Default `blobs_db` paths are untouched. `.erb` output is +spec-compatible with Nimbus. diff --git a/specs/static-cold-backend.md b/specs/static-cold-backend.md new file mode 100644 index 00000000000..f142b73011f --- /dev/null +++ b/specs/static-cold-backend.md @@ -0,0 +1,72 @@ +# Static Cold Backend + +Goal: make the cold archive backend pluggable. 
+ +Supported cold backends: + +- current KV cold DB +- static range files + +## Node modes + +| Startup path | Mode | +| - | - | +| Genesis sync with static archive enabled | archive | +| Checkpoint sync with complete static history imported | archive | +| Checkpoint sync without complete static history | full node | + +A full node does not become archive by P2P backfill or online reconstruction. + +## Ownership + +| Store | Owns | +| - | - | +| Hot DB | head data, fork-choice data, unfinalized data, P2P-required recent block window, metadata | +| Cold backend | finalized archive ranges, root-to-slot indices for finalized data (block_root → slot, state_root → slot) | + +## Writers + +Static cold files are written only by: + +- genesis sync, in finalized slot order +- verified complete range import + +Network backfill may write recent blocks to Hot DB, but never to static cold. +Online reconstruction never writes static cold. + +## Availability + +A static range is either complete or absent. Reads below the hot/recent window +require the matching static range. If it is absent, the node is not archive for +that range. + +The current KV cold DB remains a valid cold backend. + +## Backend API + +Slot-keyed bulk: `get`, `put_batch`, `exists`, `iter_from`, `sync`. No deletes. +Batched puts are best-effort, not atomic. + +Root-keyed indices: `get_index(col, root)`, `put_index_batch(col, items)`, where +`col` is one of `BlockSlot` or `ColdStateSummary`. The static-file backend embeds +the same KV implementation Lighthouse uses for the main DB at `/index/` to +serve these. Crash-safety rule: slot-keyed bulk data is committed before the +matching root index entry, so a crash leaves cold data without a dangling index. 
+ +### `put_batch` durability and fsync semantics + +`put_batch(items)` is durable on return for the batch as a whole — the same +caller-visible contract as N×`put` — but it performs O(1) fsyncs per +underlying file regardless of batch size, instead of the 4 fsyncs per slot +that the per-item path issues (data file, offset file, config tmp, config +dir). Within a column, slots in `items` must be strictly ascending; items +that span multiple `file_id` boundaries are handled by grouping internally, +with one data fsync and one offset fsync per touched file plus a single +atomic config commit at the end of the batch. + +## Removed + +- `lighthouse db prune-states` and `HotColdDB::prune_historic_states`. They + produce a "cold blocks present, cold states absent" mode that is not in the + startup-path table above, and the spec does not support runtime mode + transitions in either direction.