diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 1488f11ce53..fa2cb0c36d1 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -66,6 +66,7 @@ pub struct StoreConfig { /// The margin for blob pruning in epochs. The oldest blobs are pruned up until /// data_availability_boundary - blob_prune_margin_epochs. Default: 0. pub blob_prune_margin_epochs: u64, + pub allow_backfill: bool, } /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params. @@ -124,6 +125,7 @@ impl Default for StoreConfig { prune_blobs: true, epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE, blob_prune_margin_epochs: DEFAULT_BLOB_PUNE_MARGIN_EPOCHS, + allow_backfill: false, } } } diff --git a/beacon_node/store/src/static_cold.rs b/beacon_node/store/src/static_cold.rs index 1da8f2219b8..c1e51b16075 100644 --- a/beacon_node/store/src/static_cold.rs +++ b/beacon_node/store/src/static_cold.rs @@ -68,16 +68,23 @@ const OFFSET_FILE_LEN: u64 = SLOTS_PER_FILE * OFFSET_SIZE; const CONFIG_FILE: &str = "column.conf"; const CONFIG_TMP_FILE: &str = "column.conf.tmp"; const DATA_FILE_PREFIX: &str = "data_"; -const CONFIG_MAGIC: &[u8; 8] = b"LHSTBLK2"; -const CONFIG_LEN: usize = 36; +const CONFIG_MAGIC_V2: &[u8; 8] = b"LHSTBLK2"; +const CONFIG_MAGIC: &[u8; 8] = b"LHSTBLK3"; +const CONFIG_LEN_V2: usize = 36; +const CONFIG_LEN: usize = 52; /// Empty-store sentinel for `highest_written_slot` in the per-column config. const EMPTY_SLOT: u64 = u64::MAX; +/// Sentinel: no backfill file tracked. +const NO_BACKFILL_FILE: u64 = u64::MAX; /// e2store version record, written once at the start of each data file. const VERSION_RECORD: [u8; 8] = [0x65, 0x32, 0, 0, 0, 0, 0, 0]; const COMPRESSION_NONE: u8 = 0; const COMPRESSION_SNAPPY: u8 = 1; +/// Bit 0 of the flags byte. +const FLAG_ALLOW_BACKFILL: u8 = 0b0000_0001; + /// Per-column configuration. 
On first creation of a column the values come /// from `column_config`; thereafter they are persisted in the column file-set /// `static_blocks.conf` and the on-disk values win over current-build defaults. @@ -91,6 +98,7 @@ struct ColumnConfig { compression: bool, /// Upper bound on a single decoded record's size in bytes. max_value_bytes: u64, + allow_backfill: bool, } /// Per-column file format defaults. @@ -101,24 +109,28 @@ fn column_config(column: DBColumnCold) -> ColumnConfig { record_type: [0x01, 0x00], compression: true, max_value_bytes: 10 * 1024 * 1024, + allow_backfill: false, }, DBColumnCold::BlockRoots => ColumnConfig { subdir: "bbr", record_type: [0x02, 0x00], compression: false, max_value_bytes: 64, + allow_backfill: false, }, DBColumnCold::StateRoots => ColumnConfig { subdir: "bsr", record_type: [0x03, 0x00], compression: false, max_value_bytes: 64, + allow_backfill: false, }, DBColumnCold::StateSnapshot => ColumnConfig { subdir: "bss", record_type: [0x04, 0x00], compression: false, max_value_bytes: 1024 * 1024 * 1024, + allow_backfill: false, }, DBColumnCold::StateDiff => ColumnConfig { // HDiff is already compressed internally (zstd'd validator and @@ -128,6 +140,7 @@ fn column_config(column: DBColumnCold) -> ColumnConfig { record_type: [0x05, 0x00], compression: false, max_value_bytes: 1024 * 1024 * 1024, + allow_backfill: false, }, } } @@ -180,7 +193,8 @@ impl StaticColdStore { fs::create_dir_all(path).map_err(StaticColdStoreError::Io)?; let mut columns = HashMap::new(); for column in DBColumnCold::iter() { - let cfg = column_config(column); + let mut cfg = column_config(column); + cfg.allow_backfill = config.allow_backfill; columns.insert(column, Column::open(path.join(cfg.subdir), cfg)?); } let index_db = BeaconNodeBackend::open(config, &path.join("index"))?; @@ -212,13 +226,24 @@ impl StaticColdStore { } } +#[derive(Debug, Default)] +struct ColumnWriteState { + highest_written_slot: Option, + /// Committed length of the data file that 
contains `highest_written_slot`. + current_data_len: u64, + /// File-id of the most recently backfilled non-current file (`NO_BACKFILL_FILE` = none). + backfill_file_id: u64, + /// Committed length of `backfill_file_id` (0 when no backfill file). + backfill_data_len: u64, +} + /// Single-column slot-keyed file set. Owns one subdirectory of data + `.off` + /// config files. #[derive(Debug)] struct Column { root_dir: PathBuf, config: ColumnConfig, - highest_written_slot: Mutex>, + state: Mutex, } struct ColumnConfigOnDisk { @@ -227,6 +252,9 @@ struct ColumnConfigOnDisk { record_type: [u8; 2], compression: bool, max_value_bytes: u64, + allow_backfill: bool, + backfill_file_id: u64, + backfill_data_len: u64, } impl Column { @@ -239,7 +267,16 @@ impl Column { let config_path = root_dir.join(CONFIG_FILE); let tmp_path = root_dir.join(CONFIG_TMP_FILE); if !config_path.exists() { - atomic_write_config(&config_path, &tmp_path, &root_dir, None, 0, &defaults)?; + atomic_write_config( + &config_path, + &tmp_path, + &root_dir, + None, + 0, + &defaults, + NO_BACKFILL_FILE, + 0, + )?; } let on_disk = read_config(&config_path)?; @@ -250,13 +287,21 @@ impl Column { // can write bigger records than an older one persisted, then // re-persist immediately so future opens see the new bound. let max_value_bytes = on_disk.max_value_bytes.max(defaults.max_value_bytes); + // allow_backfill is sticky, once set: a store opened with backfill + // enabled keeps it even if later opened without the flag. + let allow_backfill = on_disk.allow_backfill || defaults.allow_backfill; let config = ColumnConfig { subdir: defaults.subdir, record_type: on_disk.record_type, compression: on_disk.compression, max_value_bytes, + allow_backfill, }; - if max_value_bytes != on_disk.max_value_bytes { + + // Re-persist if anything changed (max_value_bytes ratchet or + // allow_backfill upgrade from V2 -> V3). 
The format written is + // determined solely by config.allow_backfill inside atomic_write_config + if max_value_bytes != on_disk.max_value_bytes || allow_backfill != on_disk.allow_backfill { atomic_write_config( &config_path, &tmp_path, @@ -264,28 +309,42 @@ impl Column { on_disk.highest_written_slot, on_disk.current_data_len, &config, + on_disk.backfill_file_id, + on_disk.backfill_data_len, )?; } let handle = Self { root_dir, config, - highest_written_slot: Mutex::new(None), + state: Mutex::new(ColumnWriteState::default()), }; if let Some(slot) = on_disk.highest_written_slot { - handle.heal_current_file(slot, on_disk.current_data_len)?; + handle.heal_on_open( + slot, + on_disk.current_data_len, + on_disk.allow_backfill, + on_disk.backfill_file_id, + on_disk.backfill_data_len, + )?; } - *handle.highest_written_slot.lock() = on_disk.highest_written_slot; + *handle.state.lock() = ColumnWriteState { + highest_written_slot: on_disk.highest_written_slot, + current_data_len: on_disk.current_data_len, + backfill_file_id: on_disk.backfill_file_id, + backfill_data_len: on_disk.backfill_data_len, + }; Ok(handle) } fn get(&self, slot: Slot) -> StoreResult>> { - let Some(highest_written_slot) = *self.highest_written_slot.lock() else { + let highest = self.state.lock().highest_written_slot; + let Some(highest) = highest else { return Ok(None); }; - if slot > highest_written_slot { + if slot > highest { return Ok(None); } self.read_record(slot) @@ -330,40 +389,47 @@ impl Column { } fn contains(&self, slot: Slot) -> StoreResult { - let Some(highest_written_slot) = *self.highest_written_slot.lock() else { + let highest = self.state.lock().highest_written_slot; + let Some(highest) = highest else { return Ok(false); }; - if slot > highest_written_slot { + if slot > highest { return Ok(false); } Ok(self.read_offset(file_id(slot), slot)? 
!= 0) } fn put(&self, slot: Slot, bytes: &[u8]) -> StoreResult<()> { - let mut highest_written_slot = self.highest_written_slot.lock(); - if let Some(highest) = *highest_written_slot - && slot <= highest - { - // Idempotent re-put: any committed slot can be re-put with the - // identical value. Required so a `migrate_database` retry after a - // mid-loop crash can re-walk slots that were already committed in - // the previous attempt without tripping the strict-ascending - // invariant. A previously-skipped slot (offset zero) cannot be - // filled in — that would break the append-only data file. - let existing = self.read_record(slot)?.ok_or_else(|| { - StaticColdStoreError::Invalid(format!( - "static cold re-put at slot {slot} <= highest {highest} \ + let mut state = self.state.lock(); + if let Some(highest) = state.highest_written_slot { + if slot <= highest { + if self.config.allow_backfill { + return self.put_backfill(slot, bytes, &mut state); + } + + // Non-backfill: only identical re-put allowed + // Idempotent re-put: any committed slot can be re-put with the + // identical value. Required so a `migrate_database` retry after a + // mid-loop crash can re-walk slots that were already committed in + // the previous attempt without tripping the strict-ascending + // invariant. A previously-skipped slot (offset zero) cannot be + // filled in — that would break the append-only data file. 
+ let existing = self.read_record(slot)?.ok_or_else(|| { + StaticColdStoreError::Invalid(format!( + "static cold re-put at slot {slot} <= highest {highest} \ but no record exists; cannot fill a previously-skipped slot" - )) - })?; - if existing == bytes { - return Ok(()); + )) + })?; + if existing == bytes { + return Ok(()); + } + return Err(StaticColdStoreError::Invalid(format!( + "static cold re-put at slot {slot} with mismatched value" + ))); } - return Err(StaticColdStoreError::Invalid(format!( - "static cold re-put at slot {slot} with mismatched value" - ))); } + // Sequential write let payload = if self.config.compression { compress_record(bytes)? } else { @@ -374,7 +440,7 @@ impl Column { let target_file_id = file_id(slot); // Discard an uncommitted next-file tail after a crash. - let reset_file = (*highest_written_slot).map(file_id) != Some(target_file_id); + let reset_file = state.highest_written_slot.map(file_id) != Some(target_file_id); let off_pos = offset_position(slot); let data_path = self.data_path(target_file_id); let off_path = self.offset_path(target_file_id); @@ -419,9 +485,107 @@ impl Column { off_file.write_all(&offset.to_le_bytes())?; off_file.sync_all()?; - // Atomic config update is the commit point. - self.write_config(Some(slot), data_len)?; - *highest_written_slot = Some(slot); + atomic_write_config( + &self.config_path(), + &self.root_dir.join(CONFIG_TMP_FILE), + &self.root_dir, + Some(slot), + data_len, + &self.config, + state.backfill_file_id, + state.backfill_data_len, + )?; + state.highest_written_slot = Some(slot); + state.current_data_len = data_len; + + Ok(()) + } + + /// Write a previously-skipped slot. Called from `put` and `put_batch` + /// with `state` lock held + fn put_backfill( + &self, + slot: Slot, + bytes: &[u8], + state: &mut ColumnWriteState, + ) -> StoreResult<()> { + let fid = file_id(slot); + let offset = self.read_offset(fid, slot)?; + + if offset != 0 { + // Slot already committed - idempotent check. 
+ let existing = self.read_record(slot)?.ok_or_else(|| { + StaticColdStoreError::Invalid( + "static cold backfill: offset nonzero but record missing".into(), + ) + })?; + if existing == bytes { + return Ok(()); + } + return Err(StaticColdStoreError::Invalid(format!( + "static cold backfill at slot {slot} conflicts with existing record" + ))); + } + + // offset == 0: fill the gap. + let payload = if self.config.compression { + compress_record(bytes)? + } else { + bytes.to_vec() + }; + let payload_len = u32::try_from(payload.len()) + .map_err(|_| StaticColdStoreError::Invalid("static cold record too large".into()))?; + + // Open for append - do NOT truncate: the file may already hold committed records; a never-written file is created and gets a version record. + let mut data_file = OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(self.data_path(fid))?; + if data_file.metadata()?.len() == 0 { + data_file.write_all(&VERSION_RECORD)?; + } + let data_offset = data_file.seek(SeekFrom::End(0))?; + write_record( + &mut data_file, + self.config.record_type, + payload_len, + &payload, + )?; + data_file.sync_all()?; + let new_data_len = data_file.metadata()?.len(); + + // Write offset (the offset file is created or extended to full size if it is missing or short). 
+ let mut off_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(self.offset_path(fid))?; + if off_file.metadata()?.len() < OFFSET_FILE_LEN { + off_file.set_len(OFFSET_FILE_LEN)?; + } + off_file.seek(SeekFrom::Start(offset_position(slot)))?; + off_file.write_all(&data_offset.to_le_bytes())?; + off_file.sync_all()?; + + if Some(fid) == state.highest_written_slot.map(file_id) { + state.current_data_len = new_data_len; + } else { + state.backfill_file_id = fid; + state.backfill_data_len = new_data_len; + } + + atomic_write_config( + &self.config_path(), + &self.root_dir.join(CONFIG_TMP_FILE), + &self.root_dir, + state.highest_written_slot, + state.current_data_len, + &self.config, + state.backfill_file_id, + state.backfill_data_len, + )?; Ok(()) } @@ -450,16 +614,28 @@ impl Column { } } - let mut highest_written_slot = self.highest_written_slot.lock(); + let mut state = self.state.lock(); let mut iter = items.into_iter().peekable(); // Idempotent re-put: if the first item is exactly highest_written_slot // with matching bytes, drop it from the batch. 
- if let (Some(highest), Some((first_slot, _))) = (*highest_written_slot, iter.peek()) { + if let (Some(highest), Some((first_slot, _))) = (state.highest_written_slot, iter.peek()) { if *first_slot < highest { - return Err(StaticColdStoreError::Invalid( - "static cold put_batch out of order vs highest_written_slot".into(), - )); + if !self.config.allow_backfill { + return Err(StaticColdStoreError::Invalid( + "static cold put_batch out of order vs highest_written_slot".into(), + )); + } + // Backfill batch + let items: Vec<_> = iter.collect(); + for (slot, _) in &items { + if *slot > highest { + return Err(StaticColdStoreError::Invalid( + "static cold put_batch mixed sequential/backfill".into(), + )); + } + } + return self.put_batch_backfill(items, &mut state); } if *first_slot == highest { let (slot, value) = iter.next().expect("peeked"); @@ -490,7 +666,7 @@ impl Column { group.push(iter.next().expect("peeked")); } - let reset_file = (*highest_written_slot).map(file_id) != Some(target_file_id); + let reset_file = state.highest_written_slot.map(file_id) != Some(target_file_id); let data_path = self.data_path(target_file_id); let off_path = self.offset_path(target_file_id); @@ -559,33 +735,211 @@ impl Column { last_slot = Some(*s); last_data_len = data_len; } - *highest_written_slot = last_slot; + state.highest_written_slot = last_slot; } - // Single atomic config commit covering the whole batch. if let Some(s) = last_slot { - self.write_config(Some(s), last_data_len)?; + state.current_data_len = last_data_len; + atomic_write_config( + &self.config_path(), + &self.root_dir.join(CONFIG_TMP_FILE), + &self.root_dir, + Some(s), + last_data_len, + &self.config, + state.backfill_file_id, + state.backfill_data_len, + )?; } Ok(()) } - fn heal_current_file(&self, slot: Slot, current_data_len: u64) -> StoreResult<()> { - let file_id = file_id(slot); + /// Batched backfill: write all skipped slots (offset==0) in one pass per + /// file, single config commit at end. 
+ fn put_batch_backfill( + &self, + items: Vec<(Slot, Vec)>, + state: &mut ColumnWriteState, + ) -> StoreResult<()> { + // Validate and separate: skip already-committed identical items, + // collect items to write. + let mut to_write: Vec<(Slot, Vec)> = Vec::with_capacity(items.len()); + for (slot, bytes) in items { + let offset = self.read_offset(file_id(slot), slot)?; + if offset != 0 { + let existing = self.read_record(slot)?.ok_or_else(|| { + StaticColdStoreError::Invalid( + "static cold backfill: offset nonzero but record missing".into(), + ) + })?; + if existing != bytes { + return Err(StaticColdStoreError::Invalid(format!( + "static cold backfill at slot {slot} conflicts with existing record" + ))); + } + continue; // idempotent + } + to_write.push((slot, bytes)); + } + if to_write.is_empty() { + return Ok(()); + } + + // Group by file_id (items are ascending so groups are naturally ascending). + let mut groups: Vec<(u64, Vec<(Slot, Vec)>)> = Vec::new(); + for (slot, bytes) in &to_write { + let file_id = file_id(*slot); + if groups.last().map_or(true, |(f, _)| *f != file_id) { + groups.push((file_id, Vec::new())); + } + groups + .last_mut() + .expect("just pushed") + .1 + .push((*slot, bytes.clone())); + } + + let highest_file_id = state.highest_written_slot.map(file_id); + + let non_current_files: std::collections::HashSet = to_write + .iter() + .map(|(slot, _)| file_id(*slot)) + .filter(|file_id| Some(*file_id) != highest_file_id) + .collect(); + if non_current_files.len() > 1 { + return Err(StaticColdStoreError::Invalid( + "static cold backfill batch targets multiple non-current files; split into separate batches".into(), + )); + } + + for (file_id, group) in &groups { + let mut data_file = OpenOptions::new() + .read(true) + .append(true) + .create(true) + .open(self.data_path(*file_id))?; + if data_file.metadata()?.len() == 0 { + data_file.write_all(&VERSION_RECORD)?; + } + + let mut offsets: Vec<(Slot, u64)> = Vec::with_capacity(group.len()); + { + let 
mut w = std::io::BufWriter::with_capacity(1 << 20, &mut data_file); + let mut cursor = w.get_ref().metadata()?.len(); + for (slot, bytes) in group { + let payload = if self.config.compression { + compress_record(bytes)? + } else { + bytes.to_vec() + }; + let payload_len = u32::try_from(payload.len()).map_err(|_| { + StaticColdStoreError::Invalid("static cold record too large".into()) + })?; + offsets.push((*slot, cursor)); + w.write_all(&self.config.record_type)?; + w.write_all(&payload_len.to_le_bytes())?; + w.write_all(&0u16.to_le_bytes())?; + w.write_all(&payload)?; + cursor += 8 + payload.len() as u64; + } + w.flush()?; + } + let new_data_len = data_file.seek(SeekFrom::End(0))?; + data_file.sync_all()?; + + let mut off_file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(self.offset_path(*file_id))?; + if off_file.metadata()?.len() < OFFSET_FILE_LEN { + off_file.set_len(OFFSET_FILE_LEN)?; + } + for (slot, offset) in &offsets { + off_file.seek(SeekFrom::Start(offset_position(*slot)))?; + off_file.write_all(&offset.to_le_bytes())?; + } + off_file.sync_all()?; + + if Some(*file_id) == highest_file_id { + state.current_data_len = new_data_len; + } else { + state.backfill_file_id = *file_id; + state.backfill_data_len = new_data_len; + } + } + + // Single atomic config commit. 
+ atomic_write_config( + &self.config_path(), + &self.root_dir.join(CONFIG_TMP_FILE), + &self.root_dir, + state.highest_written_slot, + state.current_data_len, + &self.config, + state.backfill_file_id, + state.backfill_data_len, + ) + } + + fn heal_on_open( + &self, + slot: Slot, + current_data_len: u64, + allow_backfill: bool, + backfill_file_id: u64, + backfill_data_len: u64, + ) -> StoreResult<()> { + let current_file_id = file_id(slot); + self.heal_file(current_file_id, Some(slot), current_data_len)?; + + if allow_backfill { + self.scan_and_zero_dangling_offsets(current_file_id, current_data_len)?; + + if backfill_file_id != NO_BACKFILL_FILE && backfill_file_id != current_file_id { + self.heal_file(backfill_file_id, None, backfill_data_len)?; + self.scan_and_zero_dangling_offsets(backfill_file_id, backfill_data_len)?; + } + } + Ok(()) + } + + /// Truncate `data_{file_id}` to `committed_len`. If `highest_slot` is Some, + /// also clear trailing offset entries beyond that slot. + fn heal_file( + &self, + file_id: u64, + highest_slot: Option, + committed_len: u64, + ) -> StoreResult<()> { let data_path = self.data_path(file_id); + if !data_path.exists() { + return Ok(()); + } + let data_file = OpenOptions::new().read(true).write(true).open(&data_path)?; let data_len = data_file.metadata()?.len(); - if data_len < current_data_len { + if data_len < committed_len { return Err(StaticColdStoreError::Invalid( "static cold data file shorter than committed length".into(), )); } - if data_len != current_data_len { - data_file.set_len(current_data_len)?; + if data_len != committed_len { + data_file.set_len(committed_len)?; data_file.sync_all()?; } + let Some(slot) = highest_slot else { + return Ok(()); + }; + let off_path = self.offset_path(file_id); + if !off_path.exists() { + return Ok(()); + } + let mut off_file = OpenOptions::new().read(true).write(true).open(&off_path)?; let required_len = offset_position(slot) + OFFSET_SIZE; let off_len = off_file.metadata()?.len(); 
@@ -597,37 +951,55 @@ impl Column { if off_len < OFFSET_FILE_LEN { off_file.set_len(OFFSET_FILE_LEN)?; } - let clear_start = required_len; if clear_start < OFFSET_FILE_LEN { - // Remove offsets to entries beyond the committed slot. off_file.seek(SeekFrom::Start(clear_start))?; - let zeroes = vec![0; (OFFSET_FILE_LEN - clear_start) as usize]; - off_file.write_all(&zeroes)?; + off_file.write_all(&vec![0u8; (OFFSET_FILE_LEN - clear_start) as usize])?; off_file.sync_all()?; } - Ok(()) } - fn write_config( + /// Zero any offset entries that point past `committed_data_len`. + /// O(SLOTS_PER_FILE) = O(8192). Called only from `heal_on_open`. + fn scan_and_zero_dangling_offsets( &self, - highest_written_slot: Option, - current_data_len: u64, + file_id: u64, + committed_data_len: u64, ) -> StoreResult<()> { - atomic_write_config( - &self.config_path(), - &self.root_dir.join(CONFIG_TMP_FILE), - &self.root_dir, - highest_written_slot, - current_data_len, - &self.config, - ) + let off_path = self.offset_path(file_id); + if !off_path.exists() { + return Ok(()); + } + + let mut off_file = OpenOptions::new().read(true).write(true).open(&off_path)?; + let entries = (off_file.metadata()?.len() / OFFSET_SIZE) as usize; + let mut any = false; + let mut buf = [0u8; 8]; + for i in 0..entries { + let pos = (i as u64) * OFFSET_SIZE; + off_file.seek(SeekFrom::Start(pos))?; + off_file.read_exact(&mut buf)?; + let offset = u64::from_le_bytes(buf); + if offset != 0 && offset >= committed_data_len { + off_file.seek(SeekFrom::Start(pos))?; + off_file.write_all(&0u64.to_le_bytes())?; + any = true; + } + } + if any { + off_file.sync_all()?; + } + Ok(()) } fn read_offset(&self, file_id: u64, slot: Slot) -> StoreResult { let off_path = self.offset_path(file_id); - let mut off_file = File::open(&off_path)?; + let mut off_file = match File::open(&off_path) { + Ok(f) => f, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0), + Err(e) => return Err(e.into()), + }; let mut bytes = [0; 
8]; off_file.seek(SeekFrom::Start(offset_position(slot)))?; off_file.read_exact(&mut bytes)?; @@ -651,11 +1023,23 @@ impl Column { fn read_config(path: &Path) -> StoreResult { let bytes = fs::read(path)?; - if bytes.len() != CONFIG_LEN || &bytes[0..8] != CONFIG_MAGIC { - return Err(StaticColdStoreError::Invalid( - "invalid static cold config".into(), - )); - } + + let (allow_backfill, backfill_file_id, backfill_data_len) = + if bytes.len() == CONFIG_LEN && &bytes[0..8] == CONFIG_MAGIC { + let flags = bytes[27]; + ( + (flags & FLAG_ALLOW_BACKFILL) != 0, + u64::from_le_bytes(bytes[36..44].try_into().expect("slice length checked")), + u64::from_le_bytes(bytes[44..52].try_into().expect("slice length checked")), + ) + } else if bytes.len() == CONFIG_LEN_V2 && &bytes[0..8] == CONFIG_MAGIC_V2 { + (false, NO_BACKFILL_FILE, 0u64) + } else { + return Err(StaticColdStoreError::Invalid( + "invalid static cold config".into(), + )); + }; + let highest = u64::from_le_bytes(bytes[8..16].try_into().expect("slice length checked")); let current_data_len = u64::from_le_bytes(bytes[16..24].try_into().expect("slice length checked")); @@ -671,12 +1055,16 @@ fn read_config(path: &Path) -> StoreResult { }; let max_value_bytes = u64::from_le_bytes(bytes[28..36].try_into().expect("slice length checked")); + Ok(ColumnConfigOnDisk { highest_written_slot: (highest != EMPTY_SLOT).then(|| Slot::new(highest)), current_data_len, record_type, compression, max_value_bytes, + allow_backfill, + backfill_file_id, + backfill_data_len, }) } @@ -687,23 +1075,56 @@ fn atomic_write_config( highest_written_slot: Option, current_data_len: u64, config: &ColumnConfig, + backfill_file_id: u64, + backfill_data_len: u64, ) -> StoreResult<()> { - let mut bytes = [0u8; CONFIG_LEN]; - bytes[0..8].copy_from_slice(CONFIG_MAGIC); - bytes[8..16].copy_from_slice( - &highest_written_slot - .map_or(EMPTY_SLOT, |slot| slot.as_u64()) - .to_le_bytes(), - ); - bytes[16..24].copy_from_slice(¤t_data_len.to_le_bytes()); - 
bytes[24..26].copy_from_slice(&config.record_type); - bytes[26] = if config.compression { - COMPRESSION_SNAPPY + let bytes: Vec = if config.allow_backfill { + let mut b = [0u8; CONFIG_LEN]; + + b[0..8].copy_from_slice(CONFIG_MAGIC); + b[8..16].copy_from_slice( + &highest_written_slot + .map_or(EMPTY_SLOT, |slot| slot.as_u64()) + .to_le_bytes(), + ); + b[16..24].copy_from_slice(¤t_data_len.to_le_bytes()); + b[24..26].copy_from_slice(&config.record_type); + b[26] = if config.compression { + COMPRESSION_SNAPPY + } else { + COMPRESSION_NONE + }; + b[27] = if config.allow_backfill { + FLAG_ALLOW_BACKFILL + } else { + 0 + }; + b[28..36].copy_from_slice(&config.max_value_bytes.to_le_bytes()); + b[36..44].copy_from_slice(&backfill_file_id.to_le_bytes()); + b[44..52].copy_from_slice(&backfill_data_len.to_le_bytes()); + + b.to_vec() } else { - COMPRESSION_NONE + let mut b = [0u8; CONFIG_LEN_V2]; + + b[0..8].copy_from_slice(CONFIG_MAGIC_V2); + b[8..16].copy_from_slice( + &highest_written_slot + .map_or(EMPTY_SLOT, |slot| slot.as_u64()) + .to_le_bytes(), + ); + b[16..24].copy_from_slice(¤t_data_len.to_le_bytes()); + b[24..26].copy_from_slice(&config.record_type); + b[26] = if config.compression { + COMPRESSION_SNAPPY + } else { + COMPRESSION_NONE + }; + b[27] = 0; + b[28..36].copy_from_slice(&config.max_value_bytes.to_le_bytes()); + + b.to_vec() }; - bytes[27] = 0; - bytes[28..36].copy_from_slice(&config.max_value_bytes.to_le_bytes()); { let mut tmp = File::create(tmp_path)?; @@ -787,9 +1208,15 @@ impl crate::ColdStore for StaticColdStore { // Acceptable today because iter_from is only used by infrequent paths // (forwards iter, invariants). Improve if it becomes a hotspot. 
let column = &self.columns[&c]; - let Some(highest) = *column.highest_written_slot.lock() else { - return Box::new(std::iter::empty()); + + let highest = { + let state = column.state.lock(); + let Some(highest) = state.highest_written_slot else { + return Box::new(std::iter::empty()); + }; + highest }; + if from > highest { return Box::new(std::iter::empty()); } @@ -851,3 +1278,450 @@ impl crate::ColdStore for StaticColdStore { KeyValueStore::sync(&self.index_db) } } + +#[cfg(test)] +mod tests { + use crate::ColdStore; + + use super::*; + use tempfile::TempDir; + use types::MainnetEthSpec; + + const SLOT_8192: u64 = 8192; + + fn make_store(temp: &TempDir, allow_backfill: bool) -> StaticColdStore { + let mut config = StoreConfig::default(); + config.allow_backfill = allow_backfill; + let path = temp.path(); + StaticColdStore::open(path, &config).unwrap() + } + + fn write_slot( + store: &StaticColdStore, + col: DBColumnCold, + slot: u64, + val: &[u8], + ) { + store.put(col, Slot::new(slot), val).unwrap(); + } + + fn get_slot( + store: &StaticColdStore, + col: DBColumnCold, + slot: u64, + ) -> Option> { + store.get(col, Slot::new(slot)).unwrap() + } + + fn col_dir(temp: &TempDir, col: DBColumnCold) -> std::path::PathBuf { + temp.path().join(column_config(col).subdir) + } + + fn data_path(dir: &std::path::Path, file_id: u64) -> std::path::PathBuf { + dir.join(format!("{DATA_FILE_PREFIX}{file_id:05}")) + } + + fn offset_path(dir: &std::path::Path, file_id: u64) -> std::path::PathBuf { + dir.join(format!("{DATA_FILE_PREFIX}{file_id:05}.off")) + } + + /// Append raw record to data file without updating offset or config + fn write_raw_record( + data_path: &std::path::Path, + record_type: [u8; 2], + payload: &[u8], + ) -> std::io::Result { + let mut file = OpenOptions::new() + .append(true) + .create(true) + .open(data_path)?; + let len = u32::try_from(payload.len()).unwrap(); + let offset = file.seek(std::io::SeekFrom::End(0))?; + file.write_all(&record_type)?; + 
file.write_all(&len.to_le_bytes())?; + file.write_all(&0u16.to_le_bytes())?; + file.write_all(payload)?; + Ok(offset) + } + + /// Write offset entry directly + fn write_offset(off_path: &std::path::Path, slot: u64, offset: u64) -> std::io::Result<()> { + let pos = (slot % SLOT_8192) * 8; + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(off_path)?; + let current_len = file.metadata()?.len(); + if current_len < SLOT_8192 * 8 { + file.set_len(SLOT_8192 * 8)?; + } + file.seek(std::io::SeekFrom::Start(pos))?; + file.write_all(&offset.to_le_bytes())?; + Ok(()) + } + + #[test] + fn test_backfill_skipped_slot_readable() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, 10, b"v10"); + write_slot(&store, col, 11, b"v11"); + // skipe 12 + write_slot(&store, col, 13, b"v13"); + + write_slot(&store, col, 12, b"v12"); + + assert_eq!(get_slot(&store, col, 12), Some(b"v12".to_vec())); + } + + #[test] + fn test_backfill_into_current_file() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + // Write slots in file 0 (0-8191) + write_slot(&store, col, 100, b"v100"); + write_slot(&store, col, 200, b"v200"); + // Backfill slot 50 (same file 0) + write_slot(&store, col, 50, b"v50"); + + assert_eq!(get_slot(&store, col, 50), Some(b"v50".to_vec())); + assert_eq!(get_slot(&store, col, 200), Some(b"v200".to_vec())); + } + + #[test] + fn test_backfill_into_sealed_file() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + // highest in file 1 + write_slot(&store, col, SLOT_8192 + 100, b"v"); + // backfill into file 0 (sealed) + write_slot(&store, col, 50, b"v50"); + + assert_eq!(get_slot(&store, col, 50), Some(b"v50".to_vec())); + assert_eq!(get_slot(&store, col, SLOT_8192 + 100), Some(b"v".to_vec())); + } + + #[test] + fn 
test_backfill_rejected_when_disabled() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, false); // allow_backfill = false + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, 10, b"v10"); + let res = store.put(col, Slot::new(5), b"v5"); + + assert!(res.is_err()); + let err = res.unwrap_err(); + assert!( + err.to_string() + .contains("cannot fill a previously-skipped slot") + ); + } + + #[test] + fn test_backfill_does_not_advance_highest_written_slot() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, 100, b"v100"); + write_slot(&store, col, 50, b"v50"); // backfill + + // highest should still be 100 + let val = get_slot(&store, col, 100); + assert!(val.is_some()); + let val50 = get_slot(&store, col, 50); + assert!(val50.is_some()); + } + + #[test] + fn test_backfill_idempotent_reput() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, 10, b"v10"); + write_slot(&store, col, 5, b"v5"); + // Re-put same slot with identical value + let res = store.put(col, Slot::new(5), b"v5"); + assert!(res.is_ok()); + assert_eq!(get_slot(&store, col, 5), Some(b"v5".to_vec())); + } + + #[test] + fn test_backfill_rejects_already_populated_slot_with_different_data() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, 10, b"v10"); + write_slot(&store, col, 5, b"v5"); + // Re-put different value + let res = store.put(col, Slot::new(5), b"different"); + assert!(res.is_err()); + assert!(res.unwrap_err().to_string().contains("conflicts")); + } + + #[test] + fn test_backfill_compressed_column_round_trip() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::Block; // compression: true + + write_slot(&store, col, SLOT_8192 + 100, b"v"); + 
write_slot(&store, col, 50, &[0xAB; 8192]); + + assert_eq!(get_slot(&store, col, 50), Some(vec![0xAB; 8192])); + } + + #[test] + fn test_iter_from_includes_backfilled_slots() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, 0, b"v0"); + write_slot(&store, col, 10, b"v10"); + write_slot(&store, col, 5, b"v5"); // backfill + + let results: Vec<(Slot, Vec)> = ColdStore::iter_from(&store, col, Slot::new(0)) + .collect::>() + .unwrap(); + + assert_eq!(results.len(), 3); + assert_eq!(results[0].0, Slot::new(0)); + assert_eq!(results[1].0, Slot::new(5)); + assert_eq!(results[2].0, Slot::new(10)); + } + + #[test] + fn test_contains_backfilled_slot() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, 10, b"v10"); + assert!(!store.contains(col, Slot::new(5)).unwrap()); + + write_slot(&store, col, 5, b"v5"); // backfill + assert!(store.contains(col, Slot::new(5)).unwrap()); + } + + // Batched backfill + + #[test] + fn test_backfill_batch_multiple_slots_same_file() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, SLOT_8192 + 100, b"v"); + let items: Vec<(Slot, Vec)> = (0..5) + .map(|i| (Slot::new(i), format!("v{i}").into_bytes())) + .collect(); + store.put_batch(col, items).unwrap(); + + for i in 0..5 { + assert_eq!( + get_slot(&store, col, i as u64), + Some(format!("v{i}").into_bytes()) + ); + } + } + + #[test] + fn test_backfill_batch_across_file_boundaries() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, SLOT_8192 * 2, b"v"); + // Batch spans files 0 and 1 (both non-current vs highest in file 2) + let items: Vec<(Slot, _)> = vec![ + (Slot::new(50), b"v50".to_vec()), + ((SLOT_8192 + 50).into(), 
b"v".to_vec()), + ]; + let res = store.put_batch(col, items); + + assert!(res.is_err()); + } + + #[test] + fn test_backfill_mixed_sequential_and_backfill_batch_rejected() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, 100, b"v100"); + // Batch has backfill slot (50) and sequential slot (200) + let items: Vec<(Slot, _)> = vec![ + (Slot::new(50), b"v50".to_vec()), + (Slot::new(200), b"v200".to_vec()), + ]; + let res = store.put_batch(col, items); + + assert!(res.is_err()); + } + + // Crash recovery + + #[test] + fn test_heal_truncates_data_and_zeros_dangling_offset_after_crash() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + let dir = col_dir(&tmp, col); + + write_slot(&store, col, 10, b"v10"); + + // Crash: backfill data + offset written, conf not updated + let offset = write_raw_record(&data_path(&dir, 0), [0x02, 0x00], b"backfill").unwrap(); + write_offset(&offset_path(&dir, 0), 5, offset).unwrap(); + + drop(store); + + let store2 = make_store(&tmp, true); + assert_eq!(get_slot(&store2, col, 10), Some(b"v10".to_vec())); + assert_eq!(get_slot(&store2, col, 5), None); + } + + #[test] + fn test_heal_preserves_committed_backfill_data() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + write_slot(&store, col, SLOT_8192 + 100, b"v"); + write_slot(&store, col, 50, b"v50"); + + drop(store); + + let store2 = make_store(&tmp, true); + assert_eq!(get_slot(&store2, col, 50), Some(b"v50".to_vec())); + } + + #[test] + fn test_sequential_write_after_backfill_correct_data_len() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + + // Backfill into sealed file + write_slot(&store, col, SLOT_8192 + 100, b"v"); + write_slot(&store, col, 50, b"v50"); + // Sequential write into current file 
+ write_slot(&store, col, SLOT_8192 + 200, b"v200"); + + drop(store); + + let store2 = make_store(&tmp, true); + assert_eq!(get_slot(&store2, col, 50), Some(b"v50".to_vec())); + assert_eq!( + get_slot(&store2, col, SLOT_8192 + 200), + Some(b"v200".to_vec()) + ); + } + + /// Test heal truncates uncommitted data in sealed backfill file + #[test] + fn test_heal_truncates_uncommitted_backfill_in_sealed_file() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + let dir = col_dir(&tmp, col); + + // Commit highest in file 1 + write_slot(&store, col, SLOT_8192 + 100, b"v100"); + + // Crash: backfill data + offset written, conf not updated + let backfill_offset = + write_raw_record(&data_path(&dir, 0), [0x02, 0x00], b"backfill").unwrap(); + write_offset(&offset_path(&dir, 0), 50, backfill_offset).unwrap(); + + drop(store); + + let store2 = make_store(&tmp, true); + assert_eq!( + get_slot(&store2, col, SLOT_8192 + 100), + Some(b"v100".to_vec()) + ); + } + + /// Test multiple sequential uncommitted records get truncated + #[test] + fn test_heal_truncates_multiple_uncommitted_backfills() { + let tmp = TempDir::new().unwrap(); + let store = make_store(&tmp, true); + let col = DBColumnCold::BlockRoots; + let dir = col_dir(&tmp, col); + + write_slot(&store, col, 100, b"v100"); + + // Write 3 uncommitted backfill records + for slot in [10, 20, 30] { + let offset = write_raw_record( + &data_path(&dir, 0), + [0x02, 0x00], + format!("v{slot}").as_bytes(), + ) + .unwrap(); + write_offset(&offset_path(&dir, 0), slot, offset).unwrap(); + } + // Partial write of 4th (simulates crash mid-write) + { + let mut file = OpenOptions::new() + .append(true) + .open(&data_path(&dir, 0)) + .unwrap(); + file.write_all(&[0x02, 0x00, 8, 0, 0, 0, 0, 0]).unwrap(); + } + + drop(store); + + let store2 = make_store(&tmp, true); + assert_eq!(get_slot(&store2, col, 100), Some(b"v100".to_vec())); + // All uncommitted should be truncated + 
assert_eq!(get_slot(&store2, col, 10), None); + assert_eq!(get_slot(&store2, col, 20), None); + assert_eq!(get_slot(&store2, col, 30), None); + } + + #[test] + fn test_v2_conf_upgrades_to_v3_when_backfill_enabled() { + let tmp = TempDir::new().unwrap(); + let col = DBColumnCold::BlockRoots; + let dir = col_dir(&tmp, col); + let conf_path = dir.join(CONFIG_FILE); + + // Open with backfill disabled (writes v2 format) + { + let store = make_store(&tmp, false); + write_slot(&store, col, 10, b"v10"); + } + + // Verify v2 format on disk + let bytes = std::fs::read(&conf_path).unwrap(); + assert_eq!(&bytes[0..8], b"LHSTBLK2"); + assert_eq!(bytes.len(), CONFIG_LEN_V2); + + // Reopen with backfill enabled - should upgrade to v3 + { + let store = make_store(&tmp, true); + write_slot(&store, col, 5, b"v5"); + assert_eq!(get_slot(&store, col, 5), Some(b"v5".to_vec())); + } + + // Verify v3 format on disk + let bytes = std::fs::read(&conf_path).unwrap(); + assert_eq!(&bytes[0..8], b"LHSTBLK3"); + assert_eq!(bytes.len(), CONFIG_LEN); + assert_eq!(bytes[27] & FLAG_ALLOW_BACKFILL, FLAG_ALLOW_BACKFILL); + } +}