diff --git a/Cargo.lock b/Cargo.lock index 078f699f3c8..eba135b0152 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8485,6 +8485,7 @@ dependencies = [ "safe_arith", "serde", "smallvec", + "snap", "ssz_types", "state_processing", "strum", diff --git a/TODO-static-block-storage.md b/TODO-static-block-storage.md new file mode 100644 index 00000000000..385cb13b59c --- /dev/null +++ b/TODO-static-block-storage.md @@ -0,0 +1,42 @@ +# Static Block Storage TODO + +Current spec: [`specs/static-blocks.md`](./specs/static-blocks.md) + +Implemented: +- static block file format spec +- `StaticBlockStore::open/get/put` +- snappy-framed block records +- fixed-size `.off` sidecar files +- global `static_blocks.conf` commit marker +- startup healing for interrupted writes + +Remaining: + +1. Wire startup/config. + - add CLI/config path for enabling static block storage + - initialize `HotColdDB::static_blocks` + - reject checkpoint sync, late activation, and historical backfill init modes + +2. Bump schema. + - `DBColumn::BeaconBlockSlot` was added + - update schema version in `beacon_node/store/src/metadata.rs` + +3. Verify static fallback reads. + - after `static_blocks.get(slot)`, decode and verify the block root matches the requested root + - treat mismatches as corruption + +4. Update invariants. + - archived finalized blocks no longer require hot-db block bodies + - root/slot indices must remain consistent with static storage + +5. Add tests. + - archive/read happy path + - skip-slot dedup + - out-of-order put rejection + - crash windows around data, `.off`, and `.conf` + - wrong `BeaconBlockSlot` + - unsupported startup modes + +6. Decide decompression bound wiring. + - current implementation uses a local 10 MiB bound + - consider passing consensus `max_payload_size` or another store config value diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 50028fe73ff..3e103c18636 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -27,6 +27,7 @@ redb = { version = "2.1.3", optional = true } safe_arith = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } +snap = { workspace = true } ssz_types = { workspace = true } state_processing = { workspace = true } strum = { workspace = true } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index a07cc838863..d198f446d3c 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -1,5 +1,7 @@ use crate::config::StoreConfigError; use crate::hot_cold_store::{HotColdDBError, StateSummaryIteratorError}; +use crate::static_blobs::StaticBlobStoreError; +use crate::static_blocks::StaticBlockStoreError; use crate::{DBColumn, hdiff}; #[cfg(feature = "leveldb")] use leveldb::error::Error as LevelDBError; @@ -14,6 +16,8 @@ pub enum Error { SszDecodeError(DecodeError), BeaconStateError(BeaconStateError), HotColdDBError(HotColdDBError), + StaticBlockStoreError(StaticBlockStoreError), + StaticBlobStoreError(StaticBlobStoreError), DBError { message: String, }, @@ -129,6 +133,18 @@ impl From for Error { } } +impl From for Error { + fn from(e: StaticBlockStoreError) -> Error { + Error::StaticBlockStoreError(e) + } +} + +impl From for Error { + fn from(e: StaticBlobStoreError) -> Error { + Error::StaticBlobStoreError(e) + } +} + impl From for Error { fn from(e: BeaconStateError) -> Error { Error::BeaconStateError(e) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 291fdadcf51..a32f2dd9c46 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -12,6 +12,7 @@ use crate::metadata::{ SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, SchemaVersion, }; use crate::state_cache::{PutStateOutcome, StateCache}; +use crate::static_blobs::StaticBlobStore; use crate::static_blocks::StaticBlockStore; use crate::{ BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, StoreItem, @@ -76,6 +77,8 @@ pub struct HotColdDB, Cold: ItemStore> { /// reads fall through to it after missing in `hot_db`. When `None` (legacy mode), all /// finalized blinded blocks remain in `hot_db` as today. pub static_blocks: Option>, + /// Optional slot-keyed archive for finalized blob sidecars. + pub static_blobs: Option>, /// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded. block_cache: Option>>, /// Cache of beacon states. @@ -242,6 +245,7 @@ impl HotColdDB, MemoryStore> { blobs_db: MemoryStore::open(), hot_db: MemoryStore::open(), static_blocks: None, + static_blobs: None, block_cache: NonZeroUsize::new(config.block_cache_size) .map(BlockCache::new) .map(Mutex::new), @@ -297,6 +301,7 @@ impl HotColdDB, BeaconNodeBackend> { cold_db: BeaconNodeBackend::open(&config, cold_path)?, hot_db, static_blocks: None, + static_blobs: None, block_cache: NonZeroUsize::new(config.block_cache_size) .map(BlockCache::new) .map(Mutex::new), @@ -2700,10 +2705,37 @@ impl, Cold: ItemStore> HotColdDB Ok(BlobSidecarListFromRoot::NoBlobs) } } - None => Ok(BlobSidecarListFromRoot::NoRoot), + None => self.get_static_blobs(block_root), } } + /// Fetch blobs from the slot-keyed static archive after a blob-db miss. + fn get_static_blobs(&self, block_root: &Hash256) -> Result, Error> { + let Some(static_blobs) = &self.static_blobs else { + return Ok(BlobSidecarListFromRoot::NoRoot); + }; + let Some(slot) = self.get_finalized_blinded_block_slot(block_root)? else { + return Ok(BlobSidecarListFromRoot::NoRoot); + }; + let Some(blobs_bytes) = static_blobs.get(slot)? else { + return Ok(BlobSidecarListFromRoot::NoBlobs); + }; + + let blobs: Vec>> = Vec::<_>::from_ssz_bytes(&blobs_bytes)?; + let Some(max_blobs_per_block) = blobs + .first() + .map(|blob| self.spec.max_blobs_per_block(blob.epoch())) + else { + return Ok(BlobSidecarListFromRoot::NoBlobs); + }; + + let blobs = BlobSidecarList::new(blobs, max_blobs_per_block as usize)?; + self.block_cache + .as_ref() + .inspect(|cache| cache.lock().put_blobs(*block_root, blobs.clone())); + Ok(BlobSidecarListFromRoot::Blobs(blobs)) + } + /// Fetch all keys in the data_column column with prefix `block_root` pub fn get_data_column_keys(&self, block_root: Hash256) -> Result, Error> { self.blobs_db diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index f2b4a54ded3..8d7c2a3c165 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -21,6 +21,7 @@ pub mod metadata; pub mod metrics; pub mod reconstruct; pub mod state_cache; +pub mod static_blobs; pub mod static_blocks; pub mod database; @@ -30,6 +31,7 @@ pub use self::blob_sidecar_list_from_root::BlobSidecarListFromRoot; pub use self::config::StoreConfig; pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split}; pub use self::memory_store::MemoryStore; +pub use self::static_blobs::StaticBlobStore; pub use self::static_blocks::StaticBlockStore; pub use crate::metadata::BlobInfo; pub use errors::Error; diff --git a/beacon_node/store/src/static_blobs.rs b/beacon_node/store/src/static_blobs.rs new file mode 100644 index 00000000000..c221a9c299e --- /dev/null +++ b/beacon_node/store/src/static_blobs.rs @@ -0,0 +1,59 @@ +//! Slot-keyed archive API for finalized blob sidecars. +//! +//! This is the minimal surface needed to test HotColdDB integration. The file +//! backend is intentionally not implemented yet. + +use std::{ + fmt, io, + path::{Path, PathBuf}, +}; +use types::Slot; + +#[derive(Debug)] +pub struct StaticBlobStore { + root_dir: PathBuf, +} + +#[derive(Debug)] +pub enum StaticBlobStoreError { + Io(io::Error), + Unsupported(&'static str), +} + +impl fmt::Display for StaticBlobStoreError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Io(e) => write!(f, "static blob store io error: {e}"), + Self::Unsupported(message) => { + write!(f, "static blob store unsupported operation: {message}") + } + } + } +} + +impl From for StaticBlobStoreError { + fn from(e: io::Error) -> Self { + Self::Io(e) + } +} + +impl StaticBlobStore { + /// Open the archive rooted at `path`. + pub fn open(path: &Path) -> Result { + Ok(Self { + root_dir: path.to_path_buf(), + }) + } + + /// Read SSZ-encoded blob sidecars for `slot`, if present. + pub fn get(&self, _slot: Slot) -> Result>, StaticBlobStoreError> { + let _ = &self.root_dir; + Err(StaticBlobStoreError::Unsupported("get")) + } + + /// Store SSZ-encoded blob sidecars at `slot`. + pub fn put(&self, _slot: Slot, _bytes: &[u8]) -> Result<(), StaticBlobStoreError> { + let _ = &self.root_dir; + Err(StaticBlobStoreError::Unsupported("put")) + } +} diff --git a/beacon_node/store/src/static_blocks.rs b/beacon_node/store/src/static_blocks.rs index a94e83a7511..c0f5cfda458 100644 --- a/beacon_node/store/src/static_blocks.rs +++ b/beacon_node/store/src/static_blocks.rs @@ -1,37 +1,368 @@ //! Slot-keyed durable archive for finalized blinded blocks. //! //! `StaticBlockStore` is a black box from `HotColdDB`'s perspective: hand it block bytes, -//! ask it for them back by slot, ask it how far it has durably stored. Era boundaries, -//! file format, manifest layout, sealing, and rename semantics are entirely internal. +//! ask it for them back by slot. File mapping, recovery, and rename semantics are internal. //! //! Contract: //! - `put(slot, bytes)` is durable on return. The caller is allowed to rely on this for //! source-of-truth flips (e.g. writing a reverse-index entry, deleting from hot KV). +//! +//! See `specs/static-blocks.md` for the on-disk format. -use crate::Error; -use std::path::{Path, PathBuf}; +use snap::{read::FrameDecoder, write::FrameEncoder}; +use std::{ + fmt, + fs::{self, File, OpenOptions}, + io::{self, Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + sync::Mutex, +}; use types::Slot; +const SLOTS_PER_FILE: u64 = 8192; +const OFFSET_SIZE: u64 = 8; +const OFFSET_FILE_LEN: u64 = SLOTS_PER_FILE * OFFSET_SIZE; +const CONFIG_FILE: &str = "static_blocks.conf"; +const CONFIG_TMP_FILE: &str = "static_blocks.conf.tmp"; +const CONFIG_MAGIC: &[u8; 8] = b"LHSTBLK1"; +const CONFIG_LEN: usize = 24; +// Empty-store sentinel for `highest_written_slot` in `static_blocks.conf`. +const EMPTY_SLOT: u64 = u64::MAX; +// e2store version record. +const VERSION_RECORD: [u8; 8] = [0x65, 0x32, 0, 0, 0, 0, 0, 0]; +// CompressedSignedBeaconBlock e2store record type. +const BLOCK_RECORD_TYPE: [u8; 2] = [0x01, 0x00]; +const MAX_DECOMPRESSED_BLOCK_BYTES: u64 = 10 * 1024 * 1024; + #[derive(Debug)] pub struct StaticBlockStore { - #[allow(dead_code)] root_dir: PathBuf, + highest_written_slot: Mutex>, +} + +struct Config { + highest_written_slot: Option, + current_data_len: u64, +} + +type StoreResult = std::result::Result; + +#[derive(Debug)] +pub enum StaticBlockStoreError { + Io(io::Error), + Compression(io::Error), + Invalid(String), +} + +impl fmt::Display for StaticBlockStoreError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Io(e) => write!(f, "static block store io error: {e}"), + Self::Compression(e) => write!(f, "static block store compression error: {e}"), + Self::Invalid(message) => write!(f, "static block store invalid data: {message}"), + } + } +} + +impl From for StaticBlockStoreError { + fn from(e: io::Error) -> Self { + Self::Io(e) + } } impl StaticBlockStore { /// Open the archive rooted at `path`. - pub fn open(_path: &Path) -> Result { - todo!() + pub fn open(path: &Path) -> StoreResult { + fs::create_dir_all(path)?; + + let store = Self { + root_dir: path.to_path_buf(), + highest_written_slot: Mutex::new(None), + }; + + if !store.config_path().exists() { + store.write_config(None, 0)?; + } + + let config = store.read_config()?; + if let Some(slot) = config.highest_written_slot { + store.heal_current_file(slot, config.current_data_len)?; + } + *store.lock_highest()? = config.highest_written_slot; + + Ok(store) } /// Read the block at `slot`, if present. - pub fn get(&self, _slot: Slot) -> Result>, Error> { - todo!() + pub fn get(&self, slot: Slot) -> StoreResult>> { + let Some(highest_written_slot) = *self.lock_highest()? else { + return Ok(None); + }; + if slot > highest_written_slot { + return Ok(None); + } + + let file_id = file_id(slot); + let offset = self.read_offset(file_id, slot)?; + if offset == 0 { + return Ok(None); + } + + let data_path = self.data_path(file_id); + let mut data_file = File::open(&data_path)?; + data_file.seek(SeekFrom::Start(offset))?; + + let mut header = [0; 8]; + data_file.read_exact(&mut header)?; + if header[0..2] != BLOCK_RECORD_TYPE || header[6..8] != [0, 0] { + return Err(StaticBlockStoreError::Invalid( + "invalid static block record header".into(), + )); + } + + let len = u32::from_le_bytes([header[2], header[3], header[4], header[5]]) as usize; + let mut compressed = vec![0; len]; + data_file.read_exact(&mut compressed)?; + + decompress_block(&compressed) } /// Durably store `bytes` at `slot`. Must not return `Ok` until the bytes are recoverable /// after a crash. - pub fn put(&self, _slot: Slot, _bytes: &[u8]) -> Result<(), Error> { - todo!() + pub fn put(&self, slot: Slot, bytes: &[u8]) -> StoreResult<()> { + let mut highest_written_slot = self.lock_highest()?; + if highest_written_slot.is_some_and(|highest| slot <= highest) { + return Err(StaticBlockStoreError::Invalid( + "static block put out of order".into(), + )); + } + + let compressed = compress_block(bytes)?; + let compressed_len = u32::try_from(compressed.len()).map_err(|_| { + StaticBlockStoreError::Invalid("compressed static block too large".into()) + })?; + + let target_file_id = file_id(slot); + // Discard an uncommitted next-file tail after a crash. + let reset_file = (*highest_written_slot).map(file_id) != Some(target_file_id); + let off_pos = offset_position(slot); + let data_path = self.data_path(target_file_id); + let off_path = self.offset_path(target_file_id); + + let mut data_file = OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(&data_path)?; + if reset_file { + data_file.set_len(0)?; + } + + if data_file.metadata()?.len() == 0 { + data_file.write_all(&VERSION_RECORD)?; + } + + let offset = data_file.seek(SeekFrom::End(0))?; + write_block_record(&mut data_file, compressed_len, &compressed)?; + let data_len = data_file.seek(SeekFrom::End(0))?; + // Data and offset files must hit disk before the config commit marker. + data_file.sync_all()?; + + let mut off_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(&off_path)?; + if reset_file { + off_file.set_len(0)?; + } + if off_file.metadata()?.len() < OFFSET_FILE_LEN { + off_file.set_len(OFFSET_FILE_LEN)?; + } + off_file.seek(SeekFrom::Start(off_pos))?; + off_file.write_all(&offset.to_le_bytes())?; + off_file.sync_all()?; + + // Atomic config update is the commit point. + self.write_config(Some(slot), data_len)?; + *highest_written_slot = Some(slot); + + Ok(()) } + + /// Truncate uncommitted data and clear uncommitted offsets after restart. + fn heal_current_file(&self, slot: Slot, current_data_len: u64) -> StoreResult<()> { + let file_id = file_id(slot); + let data_path = self.data_path(file_id); + let data_file = OpenOptions::new().read(true).write(true).open(&data_path)?; + let data_len = data_file.metadata()?.len(); + if data_len < current_data_len { + return Err(StaticBlockStoreError::Invalid( + "static block data file shorter than committed length".into(), + )); + } + if data_len != current_data_len { + data_file.set_len(current_data_len)?; + data_file.sync_all()?; + } + + let off_path = self.offset_path(file_id); + let mut off_file = OpenOptions::new().read(true).write(true).open(&off_path)?; + let required_len = offset_position(slot) + OFFSET_SIZE; + let off_len = off_file.metadata()?.len(); + if off_len < required_len { + return Err(StaticBlockStoreError::Invalid( + "static block offset file shorter than committed slot".into(), + )); + } + if off_len < OFFSET_FILE_LEN { + off_file.set_len(OFFSET_FILE_LEN)?; + } + + let clear_start = required_len; + if clear_start < OFFSET_FILE_LEN { + // Remove offsets to entries beyond the committed slot. + off_file.seek(SeekFrom::Start(clear_start))?; + let zeroes = vec![0; (OFFSET_FILE_LEN - clear_start) as usize]; + off_file.write_all(&zeroes)?; + off_file.sync_all()?; + } + + Ok(()) + } + + /// Read the global commit marker. + fn read_config(&self) -> StoreResult { + let path = self.config_path(); + let bytes = fs::read(&path)?; + if bytes.len() != CONFIG_LEN || &bytes[0..8] != CONFIG_MAGIC { + return Err(StaticBlockStoreError::Invalid( + "invalid static block config".into(), + )); + } + + let highest = u64::from_le_bytes(bytes[8..16].try_into().expect("slice length checked")); + let current_data_len = + u64::from_le_bytes(bytes[16..24].try_into().expect("slice length checked")); + + Ok(Config { + highest_written_slot: (highest != EMPTY_SLOT).then(|| Slot::new(highest)), + current_data_len, + }) + } + + /// Atomically write the global commit marker. + fn write_config( + &self, + highest_written_slot: Option, + current_data_len: u64, + ) -> StoreResult<()> { + let path = self.config_path(); + let tmp_path = self.root_dir.join(CONFIG_TMP_FILE); + let mut bytes = [0; CONFIG_LEN]; + bytes[0..8].copy_from_slice(CONFIG_MAGIC); + bytes[8..16].copy_from_slice( + &highest_written_slot + .map_or(EMPTY_SLOT, |slot| slot.as_u64()) + .to_le_bytes(), + ); + bytes[16..24].copy_from_slice(¤t_data_len.to_le_bytes()); + + { + let mut tmp = File::create(&tmp_path)?; + tmp.write_all(&bytes)?; + tmp.sync_all()?; + } + + fs::rename(&tmp_path, &path)?; + sync_dir(&self.root_dir) + } + + /// Read the slot's absolute data-file offset. + fn read_offset(&self, file_id: u64, slot: Slot) -> StoreResult { + let off_path = self.offset_path(file_id); + let mut off_file = File::open(&off_path)?; + let mut bytes = [0; 8]; + off_file.seek(SeekFrom::Start(offset_position(slot)))?; + off_file.read_exact(&mut bytes)?; + Ok(u64::from_le_bytes(bytes)) + } + + /// Lock writer state. + fn lock_highest(&self) -> StoreResult>> { + self.highest_written_slot + .lock() + .map_err(|_| StaticBlockStoreError::Invalid("static block mutex poisoned".into())) + } + + /// Path to the global config file. + fn config_path(&self) -> PathBuf { + self.root_dir.join(CONFIG_FILE) + } + + /// Path to a data file. + fn data_path(&self, file_id: u64) -> PathBuf { + self.root_dir.join(format!("static_blocks_{file_id:05}")) + } + + /// Path to a sidecar offset file. + fn offset_path(&self, file_id: u64) -> PathBuf { + self.root_dir + .join(format!("static_blocks_{file_id:05}.off")) + } +} + +/// File id containing `slot`. +fn file_id(slot: Slot) -> u64 { + slot.as_u64() / SLOTS_PER_FILE +} + +/// Byte position of `slot` in its `.off` file. +fn offset_position(slot: Slot) -> u64 { + (slot.as_u64() % SLOTS_PER_FILE) * OFFSET_SIZE +} + +/// Snappy-frame SSZ block bytes. +fn compress_block(bytes: &[u8]) -> StoreResult> { + let mut encoder = FrameEncoder::new(Vec::new()); + encoder + .write_all(bytes) + .map_err(StaticBlockStoreError::Compression)?; + encoder + .flush() + .map_err(StaticBlockStoreError::Compression)?; + Ok(encoder.get_ref().clone()) +} + +/// Append one compressed block record. +fn write_block_record(file: &mut File, compressed_len: u32, compressed: &[u8]) -> StoreResult<()> { + file.write_all(&BLOCK_RECORD_TYPE)?; + file.write_all(&compressed_len.to_le_bytes())?; + file.write_all(&0u16.to_le_bytes())?; + file.write_all(compressed)?; + Ok(()) +} + +/// Decode one compressed block record payload. +fn decompress_block(bytes: &[u8]) -> StoreResult>> { + let decoder = FrameDecoder::new(bytes); + let mut limited = decoder.take(MAX_DECOMPRESSED_BLOCK_BYTES + 1); + let mut decompressed = Vec::new(); + limited + .read_to_end(&mut decompressed) + .map_err(StaticBlockStoreError::Compression)?; + if decompressed.len() as u64 > MAX_DECOMPRESSED_BLOCK_BYTES { + return Err(StaticBlockStoreError::Invalid( + "static block exceeds decompressed size limit".into(), + )); + } + Ok(Some(decompressed)) +} + +/// Fsync directory entries after rename/create. +fn sync_dir(path: &Path) -> StoreResult<()> { + let dir = File::open(path)?; + dir.sync_all()?; + Ok(()) } diff --git a/specs/static-blocks.md b/specs/static-blocks.md index cb9737099bb..db57867e6bf 100644 --- a/specs/static-blocks.md +++ b/specs/static-blocks.md @@ -11,8 +11,8 @@ at startup. ## API A field on `HotColdDB`. Not a `KeyValueStore`. No `Hash256` in the API; the -archive is purely slot-keyed. Eras, manifests, file rotation, fsync ordering, -atomic rename — all internal. +archive is purely slot-keyed. File rotation, fsync ordering, and crash recovery +are internal. ```rust fn open(path: &Path) -> Result; @@ -23,6 +23,174 @@ fn put(slot: Slot, bytes: &[u8]) -> Result<()>; // durable on return `put` durability on return is the only caller-visible contract; the source- of-truth flip in `migrate_database` relies on it. +## Static file format + +Files live together in one directory: + +``` +static_blocks_00000 +static_blocks_00000.off +static_blocks_00001 +static_blocks_00001.off +static_blocks.conf +``` + +Mapping: + +``` +SLOTS_PER_FILE = 8192 +file_id = slot / SLOTS_PER_FILE +index = slot % SLOTS_PER_FILE +off_pos = index * 8 +``` + +The data file name uses `file_id` as a zero-padded decimal number. The slot +range is derived from the id and is not encoded in the name. + +Each data file starts with the e2store version record: + +``` +65 32 00 00 00 00 00 00 +``` + +Block records are appended after it: + +``` +type: [0x01, 0x00] +length: compressed_data.len() as u32, little-endian +reserved: u16 = 0 +data: snappy-framed(SSZ-encoded blinded SignedBeaconBlock bytes) +``` + +The `.off` file is fixed-size: `8192 * 8` bytes. Each entry is a little-endian +`u64` absolute byte offset into the matching data file. Offset `0` means no +block is present for that slot. Real block offsets are nonzero because the data +file starts with the version record. + +`static_blocks.conf` is global to the static block store and is fixed-size: + +``` +magic: [u8; 8] = b"LHSTBLK1" +highest_written_slot: u64 little-endian, u64::MAX means empty +current_data_len: u64 little-endian +``` + +`current_data_len` applies to the current file, derived from +`highest_written_slot / SLOTS_PER_FILE`. + +Config updates are atomic: + +1. Write the full config to `static_blocks.conf.tmp`. +2. Fsync `static_blocks.conf.tmp`. +3. Rename it over `static_blocks.conf`. +4. Fsync the directory. + +## `put` contract + +`put(slot, bytes)` requires: + +``` +highest_written_slot == None || slot > highest_written_slot +snappy_framed(bytes).len() <= u32::MAX +``` + +Skipped slots are allowed. They leave zero offsets in `.off`. + +Write sequence: + +1. Lock the writer. +2. Reject `slot <= highest_written_slot`. +3. Compute `file_id`, `index`, and `off_pos`. +4. Create or open `static_blocks_{file_id:05}`. +5. If the data file is new, write the e2store version record. +6. Create or open `static_blocks_{file_id:05}.off`. +7. If the `.off` file is new, initialize it to `8192 * 8` zero bytes. +8. Compress `bytes` with snappy-framed compression. +9. Append the compressed block record to the data file, remembering the offset + of its 8-byte record header. +10. Fsync the data file. +11. Write the offset as `u64` little-endian at `off_pos` in the `.off` file. +12. Fsync the `.off` file. +13. Atomically update `static_blocks.conf` with: + ``` + highest_written_slot = slot + current_data_len = data_file_len + ``` +14. Fsync the directory after the rename. + +A write is committed only when `static_blocks.conf` reflects it. + +On open, the store reads `static_blocks.conf`, truncates the current data file +to `current_data_len`, and clears offsets after `highest_written_slot` in the +current `.off` file. + +Crash behavior: + +| Crash point | Restart behavior | +| - | - | +| Before `static_blocks.conf` update | Previous slot remains committed; appended data is truncated and offset tail is cleared. | +| During `static_blocks.conf.tmp` write | Previous `static_blocks.conf` remains the commit marker. | +| After `static_blocks.conf` rename | New slot is committed. | + +## `get` contract + +`get(slot)`: + +1. Compute `file_id`, `index`, and `off_pos`. +2. Open `static_blocks_{file_id:05}.off`. +3. Read the `u64` little-endian offset at `off_pos`. +4. If the offset is `0`, return `None`. +5. Open `static_blocks_{file_id:05}`. +6. Seek to the offset. +7. Read and validate the 8-byte block record header: + ``` + type == [0x01, 0x00] + reserved == 0 + ``` +8. Read `length` compressed bytes. +9. Snappy-decompress the bytes with the consensus maximum + `SignedBeaconBlock` SSZ size for the active fork as the output bound. +10. Return the decompressed SSZ bytes. + +If decompression exceeds the bound, return a corruption error. + +Missing files are treated as `None` only when the slot is beyond +`highest_written_slot`. Missing files for committed slots are corruption. + +## `open` contract + +In-memory state is minimal: + +``` +dir +highest_written_slot +mutex +``` + +Files are opened inside `put` and `get`; the store does not cache current file +handles in v1. + +`static_blocks.conf` uses `u64::MAX` as the empty-store sentinel for +`highest_written_slot`. + +`open(path)`: + +1. Create `path` if it does not exist. +2. If `static_blocks.conf` does not exist, create it with: + ``` + magic = b"LHSTBLK1" + highest_written_slot = u64::MAX + current_data_len = 0 + ``` +3. Read and validate `static_blocks.conf`. +4. If `highest_written_slot == u64::MAX`, initialize in-memory + `highest_written_slot = None` and return. +5. Derive the current file from `highest_written_slot / SLOTS_PER_FILE`. +6. Truncate the current data file to `current_data_len`. +7. Clear `.off` entries after `highest_written_slot` in the current `.off` + file by writing zeroes. +8. Initialize in-memory `highest_written_slot = Some(slot)`. + ## Interaction with existing DBs | Concern | Today | With static blocks |