From 06f9c2e1d1448c3bb9e6ea43b1b070219a26f823 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 7 Jan 2026 12:05:35 +0100 Subject: [PATCH 01/13] Append commit instead of individual transactions to commitlog This moves the following responsibilities to the datastore: - maintenance of the transaction offset - deciding how many transactions are in a commit --- crates/commitlog/src/commitlog.rs | 198 +++++++------ crates/commitlog/src/lib.rs | 74 +---- crates/commitlog/src/repo/mod.rs | 4 - crates/commitlog/src/segment.rs | 262 ++++++------------ crates/commitlog/src/tests/helpers.rs | 25 +- crates/commitlog/src/tests/partial.rs | 174 +----------- crates/commitlog/tests/random_payload/mod.rs | 23 +- crates/commitlog/tests/streaming/mod.rs | 7 +- crates/core/src/db/durability.rs | 22 +- crates/core/src/db/relational_db.rs | 13 +- .../subscription/module_subscription_actor.rs | 11 +- crates/durability/src/imp/local.rs | 73 +---- crates/durability/src/imp/mod.rs | 4 +- crates/durability/src/lib.rs | 14 +- 14 files changed, 254 insertions(+), 650 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 03c590e4950..03d2bb0ef38 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -118,7 +118,7 @@ impl Generic { /// [`io::ErrorKind::InvalidInput`] is returned. /// /// Also see [`Self::commit`]. - pub fn set_epoch(&mut self, epoch: u64) -> io::Result> { + pub fn set_epoch(&mut self, epoch: u64) -> io::Result<()> { use std::cmp::Ordering::*; match epoch.cmp(&self.head.epoch()) { @@ -126,58 +126,15 @@ impl Generic { io::ErrorKind::InvalidInput, "new epoch is smaller than current epoch", )), - Equal => Ok(None), + Equal => Ok(()), Greater => { - let res = self.commit()?; + self.flush()?; self.head.set_epoch(epoch); - Ok(res) + Ok(()) } } } - /// Write the currently buffered data to storage and rotate segments as - /// necessary. - /// - /// Note that this does not imply that the data is durable, in particular - /// when a filesystem storage backend is used. Call [`Self::sync`] to flush - /// any OS buffers to stable storage. - /// - /// # Errors - /// - /// If an error occurs writing the data, the current [`Commit`] buffer is - /// retained, but a new segment is created. Retrying in case of an `Err` - /// return value thus will write the current data to that new segment. - /// - /// If this fails, however, the next attempt to create a new segment will - /// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind - /// this means that something is seriously wrong underlying storage, and the - /// caller should stop writing to the log. - pub fn commit(&mut self) -> io::Result> { - self.panicked = true; - let writer = &mut self.head; - let sz = writer.commit.encoded_len(); - // If the segment is empty, but the commit exceeds the max size, - // we got a huge commit which needs to be written even if that - // results in a huge segment. - let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size; - let writer = if should_rotate { - self.sync(); - self.start_new_segment()? - } else { - writer - }; - - let ret = writer.commit().or_else(|e| { - warn!("Commit failed: {e}"); - // Nb.: Don't risk a panic by calling `self.sync()`. - // We already gave up on the last commit, and will retry it next time. - self.start_new_segment()?; - Err(e) - }); - self.panicked = false; - ret - } - /// Force the currently active segment to be flushed to storage. 
/// /// Using a filesystem backend, this means to call `fsync(2)`. @@ -195,6 +152,16 @@ impl Generic { self.panicked = false; } + pub fn flush(&mut self) -> io::Result<()> { + self.head.flush() + } + + fn flush_and_sync(&mut self) -> io::Result<()> { + self.flush()?; + self.sync(); + Ok(()) + } + /// The last transaction offset written to disk, or `None` if nothing has /// been written yet. /// @@ -303,8 +270,17 @@ impl Generic { } impl Generic { - pub fn append(&mut self, record: T) -> Result<(), T> { - self.head.append(record) + pub fn commit>>(&mut self, transactions: impl IntoIterator) -> io::Result<()> { + self.panicked = true; + let writer = &mut self.head; + writer.commit(transactions)?; + if writer.len() >= self.opts.max_segment_size { + self.flush_and_sync()?; + self.start_new_segment()?; + } + self.panicked = false; + + Ok(()) } pub fn transactions_from<'a, D>( @@ -348,8 +324,8 @@ impl Generic { impl Drop for Generic { fn drop(&mut self) { if !self.panicked { - if let Err(e) = self.head.commit() { - warn!("failed to commit on drop: {e}"); + if let Err(e) = self.flush_and_sync() { + warn!("failed to flush on drop: {e:#}"); } } } @@ -920,7 +896,7 @@ fn range_is_empty(range: &impl RangeBounds) -> bool { #[cfg(test)] mod tests { - use std::{cell::Cell, iter::repeat, num::NonZeroU16}; + use std::{cell::Cell, iter::repeat}; use pretty_assertions::assert_matches; @@ -933,30 +909,31 @@ mod tests { #[test] fn rotate_segments_simple() { let mut log = mem_log::<[u8; 32]>(128); - for _ in 0..3 { - log.append([0; 32]).unwrap(); - log.commit().unwrap(); + for i in 0..4 { + log.commit([(i, [0; 32])]).unwrap(); } + log.flush_and_sync().unwrap(); let offsets = log.repo.existing_offsets().unwrap(); assert_eq!(&offsets[..offsets.len() - 1], &log.tail); - assert_eq!(offsets[offsets.len() - 1], 2); + // TODO: We overshoot the max segment size. + assert_eq!(&offsets, &[0, 3]); } #[test] fn huge_commit() { let mut log = mem_log::<[u8; 32]>(32); - log.append([0; 32]).unwrap(); - log.append([1; 32]).unwrap(); - log.commit().unwrap(); - assert!(log.head.len() > log.opts.max_segment_size); + log.commit([(0, [0; 32]), (1, [1; 32])]).unwrap(); + log.flush_and_sync().unwrap(); + // First segment got rotated out. + assert_eq!(&log.tail, &[0]); - log.append([2; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(2, [2; 32])]).unwrap(); + log.flush_and_sync().unwrap(); - assert_eq!(&log.tail, &[0]); - assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]); + // Second segment got rotated out and segment 3 is created. + assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2, 3]); } #[test] @@ -1052,14 +1029,31 @@ mod tests { fn traverse_commits_ignores_duplicates() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - let commit1 = log.head.commit.clone(); - log.commit().unwrap(); - log.head.commit = commit1.clone(); - log.commit().unwrap(); - log.append([43; 32]).unwrap(); - let commit2 = log.head.commit.clone(); - log.commit().unwrap(); + let tx1 = [42u8; 32]; + let tx2 = [43u8; 32]; + + log.commit([(0, tx1)]).unwrap(); + let commit1 = Commit { + min_tx_offset: 0, + n: 1, + records: tx1.to_vec(), + ..log.head.commit.clone() + }; + + // Reset the commit offset, so we can write the same commit twice. + log.head.commit.min_tx_offset = 0; + log.commit([(0, tx1)]).unwrap(); + + // Write another one. 
+ log.commit([(1, tx2)]).unwrap(); + let commit2 = Commit { + min_tx_offset: 1, + n: 1, + records: tx2.to_vec(), + ..log.head.commit.clone() + }; + + log.flush_and_sync().unwrap(); assert_eq!( [commit1, commit2].as_slice(), @@ -1074,15 +1068,14 @@ mod tests { fn traverse_commits_errors_when_forked() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - log.commit().unwrap(); - log.head.commit = Commit { - min_tx_offset: 0, - n: 1, - records: [43; 32].to_vec(), - epoch: 0, - }; - log.commit().unwrap(); + log.commit([(0, [42; 32])]).unwrap(); + // Reset the commit offset, + // and write a different commit at the same offset. + // This is considered a fork. + log.head.commit.min_tx_offset = 0; + log.commit([(0, [43; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); let res = log.commits_from(0).collect::, _>>(); assert!( @@ -1095,11 +1088,11 @@ mod tests { fn traverse_commits_errors_when_offset_not_contiguous() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(0, [42; 32])]).unwrap(); log.head.commit.min_tx_offset = 18; - log.append([42; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(18, [42; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); let res = log.commits_from(0).collect::, _>>(); assert!( @@ -1111,7 +1104,7 @@ mod tests { prev_error: None }) ), - "expected fork error: {res:?}" + "expected out-of-order error: {res:?}" ) } @@ -1221,7 +1214,7 @@ mod tests { #[test] fn reopen() { let mut log = mem_log::<[u8; 32]>(1024); - let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle()); + let total_txs = fill_log(&mut log, 100, (1..=10).cycle()); assert_eq!( total_txs, log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() @@ -1231,12 +1224,11 @@ mod tests { log.repo.clone(), Options { max_segment_size: 1024, - max_records_in_commit: NonZeroU16::new(10).unwrap(), ..Options::default() }, ) .unwrap(); - total_txs += fill_log(&mut log, 100, (1..=10).cycle()); + let total_txs = fill_log(&mut log, 100, (1..=10).cycle()); assert_eq!( total_txs, @@ -1245,24 +1237,22 @@ mod tests { } #[test] - fn set_same_epoch_does_nothing() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); - assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); - let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap(); - assert_eq!(committed, None); - } - - #[test] - fn set_new_epoch_commits() { + fn set_new_epoch() { let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); - log.append(<_>::default()).unwrap(); - let committed = log - .set_epoch(42) - .unwrap() - .expect("should have committed the pending transaction"); + log.commit([(0, [12; 32])]).unwrap(); + log.set_epoch(42).unwrap(); assert_eq!(log.epoch(), 42); - assert_eq!(committed.tx_range.start, 0); + log.commit([(1, [13; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); + + let epochs = log + .commits_from(0) + .map(Result::unwrap) + .map(|commit| commit.epoch) + .collect::>(); + assert_eq!(&[Commit::DEFAULT_EPOCH, 42], epochs.as_slice()); } #[test] diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 1e7a8c0047e..725d501671f 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -1,6 +1,6 @@ use std::{ io, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::RangeBounds, sync::{Arc, RwLock}, }; @@ -20,7 +20,7 @@ mod varint; pub use crate::{ commit::{Commit, 
StoredCommit}, payload::{Decoder, Encode}, - repo::fs::SizeOnDisk, + repo::{fs::SizeOnDisk, TxOffset}, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, varchar::Varchar, }; @@ -57,14 +57,6 @@ pub struct Options { /// Default: 1GiB #[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))] pub max_segment_size: u64, - /// The maximum number of records in a commit. - /// - /// If this number is exceeded, the commit is flushed to disk even without - /// explicitly calling [`Commitlog::flush`]. - /// - /// Default: 1 - #[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))] - pub max_records_in_commit: NonZeroU16, /// Whenever at least this many bytes have been written to the currently /// active segment, an entry is added to its offset index. /// @@ -106,7 +98,6 @@ impl Default for Options { impl Options { pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024; - pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; @@ -114,7 +105,6 @@ impl Options { pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, max_segment_size: Self::default_max_segment_size(), - max_records_in_commit: Self::default_max_records_in_commit(), offset_index_interval_bytes: Self::default_offset_index_interval_bytes(), offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(), preallocate_segments: Self::default_preallocate_segments(), @@ -128,10 +118,6 @@ impl Options { Self::DEFAULT_MAX_SEGMENT_SIZE } - pub const fn default_max_records_in_commit() -> NonZeroU16 { - Self::DEFAULT_MAX_RECORDS_IN_COMMIT - } - pub const fn default_offset_index_interval_bytes() -> NonZeroU64 { Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES } @@ -262,7 +248,7 @@ impl Commitlog { pub fn flush(&self) -> io::Result> { let mut inner = self.inner.write().unwrap(); trace!("flush commitlog"); - inner.commit()?; + inner.flush()?; Ok(inner.max_committed_offset()) } @@ -282,7 +268,7 @@ impl Commitlog { pub fn flush_and_sync(&self) -> io::Result> { let mut inner = self.inner.write().unwrap(); trace!("flush and sync commitlog"); - inner.commit()?; + inner.flush()?; inner.sync(); Ok(inner.max_committed_offset()) @@ -383,57 +369,9 @@ impl Commitlog { } impl Commitlog { - /// Append the record `txdata` to the log. - /// - /// If the internal buffer exceeds [`Options::max_records_in_commit`], the - /// argument is returned in an `Err`. The caller should [`Self::flush`] the - /// log and try again. - /// - /// In case the log is appended to from multiple threads, this may result in - /// a busy loop trying to acquire a slot in the buffer. In such scenarios, - /// [`Self::append_maybe_flush`] is preferable. - pub fn append(&self, txdata: T) -> Result<(), T> { - let mut inner = self.inner.write().unwrap(); - inner.append(txdata) - } - - /// Append the record `txdata` to the log. - /// - /// The `txdata` payload is buffered in memory until either: - /// - /// - [`Self::flush`] is called explicitly, or - /// - [`Options::max_records_in_commit`] is exceeded - /// - /// In the latter case, [`Self::append`] flushes implicitly, _before_ - /// appending the `txdata` argument. - /// - /// I.e. the argument is not guaranteed to be flushed after the method - /// returns. 
If that is desired, [`Self::flush`] must be called explicitly. - /// - /// If writing `txdata` to the commitlog results in a new segment file being opened, - /// we will send a message down `on_new_segment`. - /// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`. - /// - /// # Errors - /// - /// If the log needs to be flushed, but an I/O error occurs, ownership of - /// `txdata` is returned back to the caller alongside the [`io::Error`]. - /// - /// The value can then be used to retry appending. - pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append> { + pub fn commit>>(&self, transactions: impl IntoIterator) -> io::Result<()> { let mut inner = self.inner.write().unwrap(); - - if let Err(txdata) = inner.append(txdata) { - if let Err(source) = inner.commit() { - return Err(error::Append { txdata, source }); - } - - // `inner.commit.n` must be zero at this point - let res = inner.append(txdata); - debug_assert!(res.is_ok(), "failed to append while holding write lock"); - } - - Ok(()) + inner.commit(transactions) } /// Obtain an iterator which traverses the log from the start, yielding diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 5be633bf5c7..2ab5579c8b2 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -219,8 +219,6 @@ pub fn create_segment_writer( min_tx_offset: offset, bytes_written: Header::LEN as u64, - max_records_in_commit: opts.max_records_in_commit, - offset_index_head: create_offset_index_writer(repo, offset, opts), }) } @@ -293,8 +291,6 @@ pub fn resume_segment_writer( min_tx_offset: tx_range.start, bytes_written: size_in_bytes, - max_records_in_commit: opts.max_records_in_commit, - offset_index_head: create_offset_index_writer(repo, offset, opts), })) } diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7e8c054467b..e2baa07d0b9 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -1,7 +1,7 @@ use std::{ fs::File, io::{self, BufWriter, ErrorKind, SeekFrom, Write as _}, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::Range, }; @@ -100,70 +100,52 @@ pub struct Writer { pub(crate) min_tx_offset: u64, pub(crate) bytes_written: u64, - pub(crate) max_records_in_commit: NonZeroU16, - pub(crate) offset_index_head: Option, } impl Writer { - /// Append the record (aka transaction) `T` to the segment. - /// - /// If the number of currently buffered records would exceed `max_records_in_commit` - /// after the method returns, the argument is returned in an `Err` and not - /// appended to this writer's buffer. - /// - /// Otherwise, the `record` is encoded and and stored in the buffer. - /// - /// An `Err` result indicates that [`Self::commit`] should be called in - /// order to flush the buffered records to persistent storage. 
- pub fn append(&mut self, record: T) -> Result<(), T> { - if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() { - Err(record) - } else { - self.commit.n += 1; - record.encode_record(&mut self.commit.records); - Ok(()) - } - } + pub fn commit>, U: Encode>( + &mut self, + transactions: impl IntoIterator, + ) -> io::Result<()> { + let mut txs = transactions.into_iter().peekable(); + while txs.peek().is_some() { + for tx in txs.by_ref().take(u16::MAX as usize) { + let tx = tx.into(); + let expected_offset = self.commit.min_tx_offset + self.commit.n as u64; + if tx.offset != expected_offset { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset), + )); + } + self.commit.n += 1; + tx.txdata.encode_record(&mut self.commit.records); + } - /// Write the current [`Commit`] to the underlying [`io::Write`]. - /// - /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero). - /// In this case, `None` is returned. - /// - /// Otherwise `Some` [`Committed`] is returned, providing some metadata about - /// the commit. - pub fn commit(&mut self) -> io::Result> { - if self.commit.n == 0 { - return Ok(None); - } - let checksum = self.commit.write(&mut self.inner)?; - self.inner.flush()?; - - let commit_len = self.commit.encoded_len() as u64; - self.offset_index_head.as_mut().map(|index| { - debug!( - "append_after commit min_tx_offset={} bytes_written={} commit_len={}", - self.commit.min_tx_offset, self.bytes_written, commit_len - ); - index - .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) - .map_err(|e| { - debug!("failed to append to offset index: {e:?}"); - }) - }); + let _checksum = self + .commit + .write(&mut self.inner) + .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e)); + let commit_len = self.commit.encoded_len() as u64; - let tx_range_start = self.commit.min_tx_offset; + if let Some(index) = self.offset_index_head.as_mut() { + let _ = index + .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) + .inspect_err(|e| debug!("failed to append to offset index: {e}")); + } - self.bytes_written += commit_len; - self.commit.min_tx_offset += self.commit.n as u64; - self.commit.n = 0; - self.commit.records.clear(); + self.bytes_written += commit_len; + self.commit.min_tx_offset += self.commit.n as u64; + self.commit.n = 0; + self.commit.records.clear(); + } - Ok(Some(Committed { - tx_range: tx_range_start..self.commit.min_tx_offset, - checksum, - })) + Ok(()) + } + + pub fn flush(&mut self) -> io::Result<()> { + self.inner.flush() } /// Get the current epoch. 
@@ -537,6 +519,12 @@ pub struct Transaction { pub txdata: T, } +impl From<(u64, T)> for Transaction { + fn from((offset, txdata): (u64, T)) -> Self { + Self { offset, txdata } + } +} + pub struct Commits { pub header: Header, reader: R, @@ -744,16 +732,14 @@ impl Metadata { #[cfg(test)] mod tests { - use std::num::NonZeroU16; - - use super::*; - use crate::{payload::ArrayDecoder, repo, Options}; use itertools::Itertools; use pretty_assertions::assert_matches; - use proptest::prelude::*; use spacetimedb_paths::server::CommitLogDir; use tempfile::tempdir; + use super::*; + use crate::{payload::ArrayDecoder, repo, Options}; + #[test] fn header_roundtrip() { let hdr = Header { @@ -772,20 +758,9 @@ mod tests { fn write_read_roundtrip() { let repo = repo::Memory::unlimited(); - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(4).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); - writer.append([0; 32]).unwrap(); - writer.append([1; 32]).unwrap(); - writer.append([2; 32]).unwrap(); - writer.commit().unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); + writer.commit([(0, [0; 32]), (1, [1; 32]), (2, [2; 32])]).unwrap(); + writer.flush().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let header = reader.header; @@ -810,27 +785,15 @@ mod tests { fn metadata() { let repo = repo::Memory::unlimited(); - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); // Commit 0..2 - writer.append([0; 32]).unwrap(); - writer.append([0; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(0, [0; 32]), (1, [0; 32])]).unwrap(); // Commit 2..3 - writer.append([1; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(2, [1; 32])]).unwrap(); // Commit 3..5 - writer.append([2; 32]).unwrap(); - writer.append([2; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(3, [2; 32]), (4, [2; 32])]).unwrap(); + + writer.flush().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let metadata = reader.metadata().unwrap(); @@ -851,37 +814,32 @@ mod tests { #[test] fn commits() { let repo = repo::Memory::unlimited(); - let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; + let commits = vec![ + vec![(0, [1; 32]), (1, [2; 32])], + vec![(2, [3; 32])], + vec![(3, [4; 32]), (4, [5; 32])], + ]; - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); for commit in &commits { - for tx in commit { - writer.append(*tx).unwrap(); - } - writer.commit().unwrap(); + writer.commit(commit.clone()).unwrap(); } + writer.flush().unwrap(); + let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let mut commits1 = Vec::with_capacity(commits.len()); let mut min_tx_offset = 0; for txs in commits { + let n = txs.len(); commits1.push(Commit { min_tx_offset, - n: txs.len() as u16, - records: txs.concat(), + n: n as u16, + records: 
itertools::concat(txs.into_iter().map(|(_, payload)| payload.to_vec())), epoch: 0, }); - min_tx_offset += txs.len() as u64; + min_tx_offset += n as u64; } let commits2 = reader .commits() @@ -894,73 +852,25 @@ mod tests { #[test] fn transactions() { let repo = repo::Memory::unlimited(); - let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; + let commits = vec![ + vec![(0, [1; 32]), (1, [2; 32])], + vec![(2, [3; 32])], + vec![(3, [4; 32]), (4, [5; 32])], + ]; - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); for commit in &commits { - for tx in commit { - writer.append(*tx).unwrap(); - } - writer.commit().unwrap(); + writer.commit(commit.clone()).unwrap(); } + writer.flush().unwrap(); + let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let txs = reader .transactions(&ArrayDecoder) .collect::, _>>() .unwrap(); - assert_eq!( - txs, - commits - .into_iter() - .flatten() - .enumerate() - .map(|(offset, txdata)| Transaction { - offset: offset as u64, - txdata - }) - .collect::>() - ); - } - - proptest! { - #[test] - fn max_records_in_commit(max_records_in_commit in any::()) { - let mut writer = Writer { - commit: Commit::default(), - inner: BufWriter::new(Vec::new()), - - min_tx_offset: 0, - bytes_written: 0, - - max_records_in_commit, - - offset_index_head: None, - }; - - for i in 0..max_records_in_commit.get() { - assert!( - writer.append([0; 16]).is_ok(), - "less than {} records written: {}", - max_records_in_commit.get(), - i - ); - } - assert!( - writer.append([0; 16]).is_err(), - "more than {} records written", - max_records_in_commit.get() - ); - } + assert_eq!(txs, commits.into_iter().flatten().map(Into::into).collect::>()); } #[test] @@ -972,20 +882,14 @@ mod tests { min_tx_offset: 0, bytes_written: 0, - max_records_in_commit: NonZeroU16::MAX, offset_index_head: None, }; assert_eq!(0, writer.next_tx_offset()); - writer.append([0; 16]).unwrap(); - assert_eq!(0, writer.next_tx_offset()); - writer.commit().unwrap(); - assert_eq!(1, writer.next_tx_offset()); - writer.commit().unwrap(); + writer.commit([(0, [0; 16])]).unwrap(); assert_eq!(1, writer.next_tx_offset()); - writer.append([1; 16]).unwrap(); - writer.append([1; 16]).unwrap(); - writer.commit().unwrap(); + writer.commit([(1, [1; 16])]).unwrap(); + writer.commit([(2, [1; 16])]).unwrap(); assert_eq!(3, writer.next_tx_offset()); } diff --git a/crates/commitlog/src/tests/helpers.rs b/crates/commitlog/src/tests/helpers.rs index c645b2cdc65..b7740b92e5b 100644 --- a/crates/commitlog/src/tests/helpers.rs +++ b/crates/commitlog/src/tests/helpers.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, num::NonZeroU16}; +use std::fmt::Debug; use env_logger::Env; @@ -13,7 +13,6 @@ pub fn mem_log(max_segment_size: u64) -> commitlog::Generic(ShortMem::new(800)); - let total_commits = 100; - let total_txs = fill_log_enospc(&mut log, total_commits, (1..=10).cycle()); - - assert_eq!( - total_txs, - log.transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); - assert_eq!(total_commits, log.commits_from(0).map(Result::unwrap).count()); -} - -// Note: Write errors cause the in-flight commit to be written to a fresh -// segment. So as long as we write through the public API, partial writes -// never surface (i.e. 
the log is contiguous). -#[test] -fn reopen() { - enable_logging(); - - let repo = ShortMem::new(800); - let num_commits = 10; - - let mut total_txs = 0; - for i in 0..2 { - let mut log = open_log::<[u8; 32]>(repo.clone()); - total_txs += fill_log_enospc(&mut log, num_commits, (1..=10).cycle()); - - debug!("fill {} done", i + 1); - } - - assert_eq!( - total_txs, - open_log::<[u8; 32]>(repo.clone()) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); - - // Let's see if we hit a funny case in any of the segments. - for offset in repo.existing_offsets().unwrap().into_iter().rev() { - let meta = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, offset) - .unwrap() - .metadata() - .unwrap(); - debug!("dropping segment: segment::{meta:?}"); - repo.remove_segment(offset).unwrap(); - assert_eq!( - meta.tx_range.start, - open_log::<[u8; 32]>(repo.clone()) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() as u64 - ); - } -} - -#[test] -fn overwrite_reopen() { - enable_logging(); - - let repo = ShortMem::new(800); - let num_commits = 10; - let txs_per_commit = 5; - - let mut log = open_log::<[u8; 32]>(repo.clone()); - let mut total_txs = fill_log_enospc(&mut log, num_commits, repeat(txs_per_commit)); - - let last_segment_offset = repo.existing_offsets().unwrap().last().copied().unwrap(); - let last_commit: Commit = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, last_segment_offset) - .unwrap() - .commits() - .map(Result::unwrap) - .last() - .unwrap() - .into(); - debug!("last commit: {last_commit:?}"); - - { - let mut last_segment = repo.open_segment_writer(last_segment_offset).unwrap(); - let pos = last_segment.len() - last_commit.encoded_len() + 1; - last_segment.modify_byte_at(pos, |_| 255); - } - - let mut log = open_log::<[u8; 32]>(repo.clone()); - for (i, commit) in log.commits_from(0).enumerate() { - if i < num_commits - 1 { - commit.expect("all but last commit should be good"); - } else { - let last_good_offset = txs_per_commit * (num_commits - 1); - assert!( - matches!( - commit, - Err(error::Traversal::Checksum { offset, .. }) if offset == last_good_offset as u64, - ), - "expected checksum error with offset={last_good_offset}: {commit:?}" - ); - } + for i in 0..100 { + log.commit([(i, [b'z'; 32])]).unwrap(); } - - // Write some more data. - total_txs += fill_log_enospc(&mut log, num_commits, repeat(txs_per_commit)); - // Log should be contiguous, but missing one corrupted commit. - assert_eq!( - total_txs - txs_per_commit, - log.transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); - // Check that this is true if we reopen the log. 
- assert_eq!( - total_txs - txs_per_commit, - open_log::<[u8; 32]>(repo) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); } /// Edge case surfaced in production: @@ -154,7 +41,6 @@ fn first_commit_in_last_segment_corrupt() { let repo = repo::Memory::unlimited(); let options = Options { max_segment_size: 512, - max_records_in_commit: NonZeroU16::new(1).unwrap(), ..<_>::default() }; { @@ -180,7 +66,6 @@ fn open_log(repo: ShortMem) -> commitlog::Generic { repo, Options { max_segment_size: 1024, - max_records_in_commit: NonZeroU16::new(10).unwrap(), ..Options::default() }, ) @@ -197,16 +82,6 @@ struct ShortSegment { max_len: u64, } -impl ShortSegment { - pub fn len(&self) -> usize { - self.inner.len() - } - - pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { - self.inner.modify_byte_at(pos, f); - } -} - impl SegmentLen for ShortSegment { fn segment_len(&mut self) -> io::Result { self.inner.segment_len() @@ -314,38 +189,3 @@ impl Repo for ShortMem { self.inner.existing_offsets() } } - -/// Like [`crate::tests::helpers::fill_log`], but expect that ENOSPC happens at -/// least once. -fn fill_log_enospc( - log: &mut commitlog::Generic, - num_commits: usize, - txs_per_commit: impl Iterator, -) -> usize -where - T: Debug + Default + Encode, -{ - let mut seen_enospc = false; - - let mut total_txs = 0; - for (_, n) in (0..num_commits).zip(txs_per_commit) { - for _ in 0..n { - log.append(T::default()).unwrap(); - total_txs += 1; - } - let res = log.commit(); - if let Err(Some(os)) = res.as_ref().map_err(|e| e.raw_os_error()) { - if os == ENOSPC { - debug!("fill: ignoring ENOSPC"); - seen_enospc = true; - log.commit().unwrap(); - continue; - } - } - res.unwrap(); - } - - assert!(seen_enospc, "expected to see ENOSPC"); - - total_txs -} diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index 85ab653480d..49e47ed42d1 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -1,5 +1,3 @@ -use std::num::NonZeroU16; - use log::info; use spacetimedb_commitlog::tests::helpers::enable_logging; use spacetimedb_commitlog::{payload, Commitlog, Options}; @@ -18,7 +16,6 @@ fn smoke() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -27,18 +24,18 @@ fn smoke() { let n_txs = 500; let payload = gen_payload(); - for _ in 0..n_txs { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..n_txs { + clog.commit([(i, payload)]).unwrap(); } let committed_offset = clog.flush_and_sync().unwrap(); - assert_eq!(n_txs - 1, committed_offset.unwrap() as usize); + assert_eq!(n_txs - 1, committed_offset.unwrap()); assert_eq!( - n_txs, + n_txs as usize, clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).count() ); // We set max_records_in_commit to 1, so n_commits == n_txs - assert_eq!(n_txs, clog.commits().map(Result::unwrap).count()); + assert_eq!(n_txs as usize, clog.commits().map(Result::unwrap).count()); } #[test] @@ -48,7 +45,6 @@ fn resets() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 512, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -56,8 +52,8 @@ fn resets() { .unwrap(); let payload = gen_payload(); - for _ in 0..50 { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..50 { + clog.commit([(i, payload)]).unwrap(); } clog.flush_and_sync().unwrap(); @@ -88,7 +84,6 @@ 
fn compression() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -98,8 +93,8 @@ fn compression() { // try to generate commitlogs that will be amenable to compression - // random data doesn't compress well, so try and have there be repetition let payloads = (0..4).map(|_| gen_payload()).cycle().take(1024).collect::>(); - for payload in &payloads { - clog.append_maybe_flush(*payload).unwrap(); + for (i, payload) in payloads.iter().enumerate() { + clog.commit([(i as u64, *payload)]).unwrap(); } clog.flush_and_sync().unwrap(); diff --git a/crates/commitlog/tests/streaming/mod.rs b/crates/commitlog/tests/streaming/mod.rs index 0474ae75b30..3c2077bacaf 100644 --- a/crates/commitlog/tests/streaming/mod.rs +++ b/crates/commitlog/tests/streaming/mod.rs @@ -1,6 +1,6 @@ use std::{ io, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::RangeBounds, path::{Path, PathBuf}, }; @@ -183,7 +183,6 @@ async fn assert_equal_dirs(src: &Path, dst: &Path) { fn default_options() -> Options { Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, // Write an index entry for every commit. offset_index_interval_bytes: NonZeroU64::new(256).unwrap(), offset_index_require_segment_fsync: false, @@ -195,8 +194,8 @@ async fn fill_log(path: PathBuf) { spawn_blocking(move || { let clog = Commitlog::open(CommitLogDir::from_path_unchecked(path), default_options(), None).unwrap(); let payload = random_payload::gen_payload(); - for _ in 0..100 { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..100 { + clog.commit([(i, payload)]).unwrap(); } clog.flush_and_sync().unwrap(); }) diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 0ea88bdb4b9..1476b56edbf 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -8,7 +8,7 @@ use spacetimedb_commitlog::payload::{ }; use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; -use spacetimedb_durability::{DurableOffset, TxOffset}; +use spacetimedb_durability::{DurableOffset, Transaction, TxOffset}; use spacetimedb_lib::Identity; use spacetimedb_primitives::TableId; use tokio::{ @@ -199,14 +199,14 @@ impl DurabilityWorkerActor { } pub fn do_durability(durability: &Durability, reducer_context: Option, tx_data: &TxData) { - if tx_data.tx_offset().is_none() { + let Some(tx_offset) = tx_data.tx_offset() else { let name = reducer_context.as_ref().map(|rcx| &*rcx.name); debug_assert!( !tx_data.has_rows_or_connect_disconnect(name), "tx_data has no rows but has connect/disconnect: `{name:?}`" ); return; - } + }; let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) }; @@ -252,9 +252,14 @@ impl DurabilityWorkerActor { }), }; - // TODO: Should measure queuing time + actual write // This does not block, as per trait docs. 
- durability.append_tx(txdata); + durability.commit( + [Transaction { + offset: tx_offset, + txdata, + }] + .into(), + ); } } @@ -292,9 +297,12 @@ mod tests { impl spacetimedb_durability::Durability for CountingDurability { type TxData = Txdata; - fn append_tx(&self, _tx: Self::TxData) { + fn commit(&self, txs: Box<[Transaction]>) { + let Some(max_offset) = txs.iter().map(|x| x.offset).max() else { + return; + }; self.appended.send_modify(|offset| { - *offset = offset.map(|x| x + 1).or(Some(0)); + offset.replace(max_offset); }); } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index b409a8121cd..60f8e6bea97 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -3,7 +3,7 @@ use crate::db::MetricsRecorderQueue; use crate::error::{DBError, RestoreSnapshotError}; use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; -use crate::util::{asyncify, spawn_rayon}; +use crate::util::asyncify; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use enum_map::EnumMap; @@ -1716,7 +1716,6 @@ pub async fn local_durability( snapshot_worker: Option<&SnapshotWorker>, ) -> Result<(LocalDurability, DiskSizeFn), DBError> { let rt = tokio::runtime::Handle::current(); - // TODO: Should this better be spawn_blocking? let on_new_segment = snapshot_worker.map(|snapshot_worker| { let snapshot_worker = snapshot_worker.clone(); Arc::new(move || { @@ -1725,17 +1724,11 @@ pub async fn local_durability( snapshot_worker.request_snapshot_ignore_closed(); }) as Arc }); - let local = spawn_rayon(move || { + let local = asyncify(move || { durability::Local::open( replica_dir.clone(), rt, - durability::local::Options { - commitlog: commitlog::Options { - max_records_in_commit: 1.try_into().unwrap(), - ..Default::default() - }, - ..Default::default() - }, + <_>::default(), // Give the durability a handle to request a new snapshot run, // which it will send down whenever we rotate commitlog segments. 
on_new_segment, diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 38df7cea554..5d2c494cba2 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1466,7 +1466,7 @@ mod tests { use spacetimedb_commitlog::{commitlog, repo}; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; - use spacetimedb_durability::{Durability, EmptyHistory, TxOffset}; + use spacetimedb_durability::{Durability, EmptyHistory, Transaction, TxOffset}; use spacetimedb_execution::dml::MutDatastore; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; @@ -1551,13 +1551,10 @@ mod tests { impl Durability for ManualDurability { type TxData = Txdata; - fn append_tx(&self, tx: Self::TxData) { + fn commit(&self, txs: Box<[Transaction]>) { let mut commitlog = self.commitlog.write().unwrap(); - if let Err(tx) = commitlog.append(tx) { - commitlog.commit().expect("error flushing commitlog"); - commitlog.append(tx).expect("should be able to append after flush"); - } - commitlog.commit().expect("error flushing commitlog"); + commitlog.commit(txs).expect("commit failed"); + commitlog.flush().expect("error flushing commitlog"); } fn durable_tx_offset(&self) -> spacetimedb_durability::DurableOffset { diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index e3a8afa34ac..f0c6daf474f 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -1,7 +1,5 @@ use std::{ io, - num::NonZeroU16, - panic, path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, @@ -80,7 +78,7 @@ pub struct Local { /// [`PersisterTask`]. /// /// Note that this is unbounded! - queue: mpsc::UnboundedSender>, + queue: mpsc::UnboundedSender>]>>, /// How many transactions are sitting in the `queue`. /// /// This is mainly for observability purposes, and can thus be updated with @@ -132,7 +130,6 @@ impl Local { queue_depth: queue_depth.clone(), sync_interval: opts.sync_interval, - max_records_in_commit: opts.commitlog.max_records_in_commit, lock, } @@ -191,7 +188,6 @@ struct Actor { queue_depth: Arc, sync_interval: Duration, - max_records_in_commit: NonZeroU16, #[allow(unused)] lock: Lock, @@ -201,7 +197,7 @@ impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( self, - mut txdata_rx: mpsc::UnboundedReceiver>, + mut commits_rx: mpsc::UnboundedReceiver>]>>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -224,7 +220,7 @@ impl Actor { biased; Some(reply) = shutdown_rx.recv() => { - txdata_rx.close(); + commits_rx.close(); let _ = reply.send(self.lock.notified()); }, @@ -235,21 +231,16 @@ impl Actor { } }, - data = txdata_rx.recv() => { - let Some(txdata) = data else { + commit = commits_rx.recv() => { + let Some(commit) = commit else { break; }; self.queue_depth.fetch_sub(1, Relaxed); - // If we are writing one commit per tx, trying to buffer is - // fairly pointless. Immediately flush instead. - // - // Otherwise, try `Commitlog::append` as a fast-path - // that doesn't require `spawn_blocking`. 
- if self.max_records_in_commit.get() == 1 { - self.flush_append(txdata, true).await; - } else if let Err(retry) = self.clog.append(txdata) { - self.flush_append(retry, false).await - } + let clog = self.clog.clone(); + spawn_blocking(move || clog.commit(commit)) + .await + .expect("commitlog write panicked") + .expect("commitlog write failed"); }, } } @@ -261,30 +252,6 @@ impl Actor { info!("exiting durability actor"); } - #[instrument(skip_all)] - async fn flush_append(&self, txdata: Txdata, flush_after: bool) { - let clog = self.clog.clone(); - let span = Span::current(); - spawn_blocking(move || { - let _span = span.enter(); - let mut retry = Some(txdata); - while let Some(txdata) = retry.take() { - if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) { - flush_error("append-maybe-flush", &source); - retry = Some(txdata); - } - } - - if flush_after { - clog.flush() - .map(drop) - .unwrap_or_else(|e| flush_error("flush-after", &e)); - } - }) - .await - .expect("commitlog append blocking task panicked") - } - #[instrument(skip_all)] async fn flush_and_sync(&self) -> io::Result> { // Skip if nothing changed. @@ -302,7 +269,7 @@ impl Actor { }) .await .expect("commitlog flush-and-sync blocking task panicked") - .inspect_err(|e| flush_error("flush-and-sync", e)) + .inspect_err(|e| warn!("error flushing commitlog: {e:#}")) .inspect(|maybe_offset| { if let Some(new_offset) = maybe_offset { trace!("synced to offset {new_offset}"); @@ -342,25 +309,11 @@ impl Drop for Lock { } } -/// Handle an error flushing the commitlog. -/// -/// Panics if the error indicates that the log may be permanently unwritable. -#[inline] -#[track_caller] -fn flush_error(task: &str, e: &io::Error) { - warn!("error flushing commitlog ({task}): {e:?}"); - if matches!(e.kind(), io::ErrorKind::AlreadyExists | io::ErrorKind::StorageFull) { - panic!("{e}"); - } -} - impl Durability for Local { type TxData = Txdata; - fn append_tx(&self, tx: Self::TxData) { - if self.queue.send(tx).is_err() { - panic!("durability actor crashed"); - } + fn commit(&self, txs: Box<[Transaction]>) { + self.queue.send(txs).expect("durability actor crashed"); self.queue_depth.fetch_add(1, Relaxed); } diff --git a/crates/durability/src/imp/mod.rs b/crates/durability/src/imp/mod.rs index 6f83106d0b3..562b9b56eab 100644 --- a/crates/durability/src/imp/mod.rs +++ b/crates/durability/src/imp/mod.rs @@ -15,7 +15,7 @@ mod testing { use futures::FutureExt as _; use tokio::sync::watch; - use crate::{Close, Durability, DurableOffset, TxOffset}; + use crate::{Close, Durability, DurableOffset, Transaction, TxOffset}; /// A [`Durability`] impl that sends all transactions into the void. /// @@ -41,7 +41,7 @@ mod testing { impl Durability for NoDurability { type TxData = T; - fn append_tx(&self, _: Self::TxData) { + fn commit(&self, _: Box<[Transaction]>) { if self.closed.load(Ordering::Relaxed) { panic!("`close` was called on this `NoDurability` instance"); } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 2038d90eeb9..0d40bcd76ad 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -104,19 +104,9 @@ pub trait Durability: Send + Sync { /// The payload representing a single transaction. type TxData; - /// Submit the transaction payload to be made durable. - /// - /// This method must never block, and accept new transactions even if they - /// cannot be made durable immediately. - /// - /// A permanent failure of the durable storage may be signalled by panicking. 
-    fn append_tx(&self, tx: Self::TxData);
+    fn commit(&self, txs: Box<[Transaction<Self::TxData>]>);
 
-    /// The [`TxOffset`] considered durable.
-    ///
-    /// A `None` return value indicates that the durable offset is not known,
-    /// either because nothing has been persisted yet, or because the status
-    /// cannot be retrieved.
+    /// Obtain a handle to the [DurableOffset].
     fn durable_tx_offset(&self) -> DurableOffset;
 
     /// Asynchronously request the durability to shut down, without dropping it.

From a0de7d99840acfa562a689822bafe73b56c3ce21 Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Tue, 27 Jan 2026 18:06:13 +0100
Subject: [PATCH 02/13] Restore some commentary

---
 crates/durability/src/lib.rs | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs
index 0d40bcd76ad..43c236a6388 100644
--- a/crates/durability/src/lib.rs
+++ b/crates/durability/src/lib.rs
@@ -95,15 +95,26 @@ pub type Close = BoxFuture<'static, Option<TxOffset>>;
 ///
 /// NOTE: This is a preliminary definition, still under consideration.
 ///
-/// A durability implementation accepts a payload representing a single database
-/// transaction via [`Durability::append_tx`] in a non-blocking fashion. The
-/// payload _should_ become durable eventually. [`TxOffset`]s reported by
-/// [`Durability::durable_tx_offset`] shall be considered durable to the
-/// extent the implementation can guarantee.
+/// A durability implementation accepts one or more [Transaction]s to be made
+/// durable via [Durability::commit] in a non-blocking fashion.
+///
+/// A batch of transactions is eventually made durable atomically.
+/// Note that this means that a torn write can render the whole batch
+/// inaccessible, so small batches are usually preferable.
+///
+/// Once a transaction becomes durable, the [DurableOffset] is updated.
+/// What durable means depends on the implementation; informally, it can be
+/// thought of as "written to disk".
 pub trait Durability: Send + Sync {
     /// The payload representing a single transaction.
     type TxData;
 
+    /// Submit a batch of [Transaction]s to be made durable.
+    ///
+    /// This method must never block, and must accept new transactions even
+    /// if they cannot be made durable immediately.
+    ///
+    /// Errors may be signalled by panicking.
     fn commit(&self, txs: Box<[Transaction<Self::TxData>]>);
 
     /// Obtain a handle to the [DurableOffset].
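The reworked trait is easiest to see from the call side. The sketch below is illustrative only — `persist_batch` and its parameters are not part of this series — and it assumes `TxOffset` is the `u64` offset alias exported by `spacetimedb_durability`:

```rust
use spacetimedb_durability::{Durability, Transaction, TxOffset};

/// Hypothetical helper: submit `payloads` as a single atomic batch,
/// assigning contiguous offsets starting at `base`. After this series,
/// offset assignment is the datastore's responsibility, not the
/// durability layer's.
fn persist_batch<D>(durability: &D, base: TxOffset, payloads: Vec<Vec<u8>>)
where
    D: Durability<TxData = Vec<u8>>,
{
    let txs: Box<[Transaction<Vec<u8>>]> = payloads
        .into_iter()
        .enumerate()
        .map(|(i, txdata)| Transaction {
            offset: base + i as TxOffset,
            txdata,
        })
        .collect();
    // Per the trait docs: `commit` must not block; implementations queue
    // the batch and make it durable atomically in the background.
    durability.commit(txs);
}
```

Note the torn-write caveat in the restored commentary: because a batch becomes durable atomically, callers that care about partial progress should keep batches small.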
From e73abcc5226d4562a52e7e7e777fc1e2e6a106c2 Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 10:26:20 +0100
Subject: [PATCH 03/13] Clear commit before returning error

---
 crates/commitlog/src/segment.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs
index e2baa07d0b9..d0acb3eb8ce 100644
--- a/crates/commitlog/src/segment.rs
+++ b/crates/commitlog/src/segment.rs
@@ -114,6 +114,9 @@ impl<W: io::Write> Writer<W> {
                 let tx = tx.into();
                 let expected_offset = self.commit.min_tx_offset + self.commit.n as u64;
                 if tx.offset != expected_offset {
+                    self.commit.n = 0;
+                    self.commit.records.clear();
+
                     return Err(io::Error::new(
                         io::ErrorKind::InvalidInput,
                         format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset),

From d5b29ccdd957a7b00c849b4dcd2140c10d67a15a Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 10:27:49 +0100
Subject: [PATCH 04/13] More commentary

---
 crates/commitlog/src/commitlog.rs | 41 +++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs
index 03d2bb0ef38..ce3d72f2899 100644
--- a/crates/commitlog/src/commitlog.rs
+++ b/crates/commitlog/src/commitlog.rs
@@ -270,6 +270,47 @@ impl<R: Repo, T> Generic<R, T> {
 }
 
 impl<R: Repo, T: Encode> Generic<R, T> {
+    /// Write `transactions` to the log.
+    ///
+    /// This will store all transactions as a single [Commit] if possible
+    /// (if the iterator contains more than `u16::MAX` elements, additional
+    /// commits are created).
+    ///
+    /// Data is buffered by the underlying segment [Writer], so not all data
+    /// submitted here may have been written to disk when this method returns.
+    /// Call [Self::flush] to force flushing to the OS.
+    ///
+    /// If, after writing the transactions, the writer's total written bytes
+    /// exceed [Options::max_segment_size], the current segment is flushed,
+    /// `fsync`ed and closed, and a new segment is created.
+    ///
+    /// # Errors
+    ///
+    /// An `Err` value is returned in the following cases:
+    ///
+    /// - if the transaction sequence is invalid, e.g. because the transaction
+    ///   offsets are not contiguous.
+    ///
+    ///   In this case, the current commit will **not** be written.
+    ///   If the input does not fit in a single commit, _some_ commits may have
+    ///   been written when the invalid input is encountered.
+    ///
+    /// - if the current segment needs to be rotated, and an I/O error occurs
+    ///   flushing it to storage.
+    ///
+    /// - if creating the new segment fails due to an I/O error.
+    ///
+    /// # Panics
+    ///
+    /// The method panics if:
+    ///
+    /// - writing to the underlying [Writer] fails
+    ///
+    ///   This is likely caused by some storage issue. As we cannot tell with
+    ///   certainty how much data (if any) has been written, the internal state
+    ///   becomes invalid and thus a panic is raised.
+    ///
+    /// - if [Self::sync] panics (called when rotating segments)
     pub fn commit<Txs: Into<Transaction<T>>>(&mut self, transactions: impl IntoIterator<Item = Txs>) -> io::Result<()> {
         self.panicked = true;
         let writer = &mut self.head;

From de918d58d702bdf1efdf798eb72bcf65e7224c8d Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 11:19:54 +0100
Subject: [PATCH 05/13] Panic if >u16::MAX transactions

Allowing to restore `Committed` return
---
 crates/commitlog/src/commitlog.rs | 24 ++++++----
 crates/commitlog/src/segment.rs   | 77 ++++++++++++++++++-------------
 2 files changed, 59 insertions(+), 42 deletions(-)

diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs
index ce3d72f2899..9b0e8104049 100644
--- a/crates/commitlog/src/commitlog.rs
+++ b/crates/commitlog/src/commitlog.rs
@@ -272,12 +272,10 @@ impl<R: Repo, T> Generic<R, T> {
 impl<R: Repo, T: Encode> Generic<R, T> {
     /// Write `transactions` to the log.
     ///
-    /// This will store all transactions as a single [Commit] if possible
-    /// (if the iterator contains more than `u16::MAX` elements, additional
-    /// commits are created).
+    /// This will store all transactions as a single [Commit]
+    /// (note that `transactions` must not yield more than [u16::MAX] elements).
     ///
-    /// Data is buffered by the underlying segment [Writer], so not all data
-    /// submitted here may have been written to disk when this method returns.
+    /// Data is buffered by the underlying segment [Writer].
     /// Call [Self::flush] to force flushing to the OS.
     ///
     /// If, after writing the transactions, the writer's total written bytes
@@ -291,26 +289,34 @@ impl<R: Repo, T: Encode> Generic<R, T> {
     /// - if the transaction sequence is invalid, e.g. because the transaction
     ///   offsets are not contiguous.
     ///
-    ///   In this case, the current commit will **not** be written.
-    ///   If the input does not fit in a single commit, _some_ commits may have
-    ///   been written when the invalid input is encountered.
+    ///   In this case, **none** of the `transactions` will be written.
     ///
     /// - if the current segment needs to be rotated, and an I/O error occurs
     ///   flushing it to storage.
     ///
+    ///   In this case, unwritten data remains buffered, and the current segment
+    ///   remains open. Calling [Self::flush] afterwards may (or may not)
+    ///   succeed, and calling [Self::commit] again with new data could grow
+    ///   the segment further beyond [Options::max_segment_size] if successful.
+    ///
+    ///   It is advisable to close and reopen the commitlog handle before
+    ///   attempting further writes.
+    ///
     /// - if creating the new segment fails due to an I/O error.
     ///
     /// # Panics
     ///
     /// The method panics if:
     ///
+    /// - `transactions` exceeds [u16::MAX] elements
+    ///
     /// - writing to the underlying [Writer] fails
     ///
     ///   This is likely caused by some storage issue. As we cannot tell with
     ///   certainty how much data (if any) has been written, the internal state
     ///   becomes invalid and thus a panic is raised.
     ///
-    /// - if [Self::sync] panics (called when rotating segments)
+    /// - [Self::sync] panics (called when rotating segments)
     pub fn commit<Txs: Into<Transaction<T>>>(&mut self, transactions: impl IntoIterator<Item = Txs>) -> io::Result<()> {
         self.panicked = true;
         let writer = &mut self.head;
diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs
index d0acb3eb8ce..c11f1b305f2 100644
--- a/crates/commitlog/src/segment.rs
+++ b/crates/commitlog/src/segment.rs
@@ -107,44 +107,55 @@ impl<W: io::Write> Writer<W> {
     pub fn commit<T: Into<Transaction<U>>, U: Encode>(
         &mut self,
         transactions: impl IntoIterator<Item = T>,
-    ) -> io::Result<()> {
-        let mut txs = transactions.into_iter().peekable();
-        while txs.peek().is_some() {
-            for tx in txs.by_ref().take(u16::MAX as usize) {
-                let tx = tx.into();
-                let expected_offset = self.commit.min_tx_offset + self.commit.n as u64;
-                if tx.offset != expected_offset {
-                    self.commit.n = 0;
-                    self.commit.records.clear();
-
-                    return Err(io::Error::new(
-                        io::ErrorKind::InvalidInput,
-                        format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset),
-                    ));
-                }
+    ) -> io::Result<Option<Committed>> {
+        for tx in transactions {
+            let tx = tx.into();
+            let expected_offset = self.commit.min_tx_offset + self.commit.n as u64;
+            if tx.offset != expected_offset {
+                self.commit.n = 0;
+                self.commit.records.clear();
+
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidInput,
+                    format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset),
+                ));
+            }
+            // Increment `n` with `checked_add`, which panics (rather than
+            // silently wrapping) if a commit would exceed `u16::MAX` records:
+ self.commit.n = self + .commit + .n + .checked_add(1) + .expect("maximum number of transactions in single commit exceeded"); + tx.txdata.encode_record(&mut self.commit.records); + } - self.bytes_written += commit_len; - self.commit.min_tx_offset += self.commit.n as u64; - self.commit.n = 0; - self.commit.records.clear(); + if self.commit.n == 0 { + return Ok(None); } - Ok(()) + let checksum = self + .commit + .write(&mut self.inner) + .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e)); + let commit_len = self.commit.encoded_len() as u64; + + if let Some(index) = self.offset_index_head.as_mut() { + let _ = index + .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) + .inspect_err(|e| debug!("failed to append to offset index: {e}")); + } + + let tx_range_start = self.commit.min_tx_offset; + + self.bytes_written += commit_len; + self.commit.min_tx_offset += self.commit.n as u64; + self.commit.n = 0; + self.commit.records.clear(); + + Ok(Some(Committed { + tx_range: tx_range_start..self.commit.min_tx_offset, + checksum, + })) } pub fn flush(&mut self) -> io::Result<()> { From f722edb90e9a8aa703750d0f8fbc3486bea9b146 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 28 Jan 2026 11:39:28 +0100 Subject: [PATCH 06/13] Docs --- crates/commitlog/src/commitlog.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 9b0e8104049..9d7358d626e 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -139,6 +139,11 @@ impl Generic { /// /// Using a filesystem backend, this means to call `fsync(2)`. /// + /// **Note** that this does not flush the buffered data from calls to + /// [Self::commit], it only instructs the underlying storage to flush its + /// buffers. Call [Self::flush] prior to this method to ensure data from + /// all previous [Self::commit] calls is flushed to the underlying storage. + /// /// # Panics /// /// As an `fsync` failure leaves a file in a more of less undefined state, @@ -152,10 +157,16 @@ impl Generic { self.panicked = false; } + /// Flush the buffered data from previous calls to [Self::commit] to the + /// underlying storage. + /// + /// Call [Self::sync] to instruct the underlying storage to flush its + /// buffers as well. pub fn flush(&mut self) -> io::Result<()> { self.head.flush() } + /// Calls [Self::flush] and then [Self::sync]. fn flush_and_sync(&mut self) -> io::Result<()> { self.flush()?; self.sync(); @@ -272,16 +283,22 @@ impl Generic { impl Generic { /// Write `transactions` to the log. /// - /// This will store all transactions as a single [Commit] + /// This will store all `transactions` as a single [Commit] /// (note that `transactions` must not yield more than [u16::MAX] elements). /// /// Data is buffered by the underlying segment [Writer]. - /// Call [Self::flush] to force flushing to the OS. + /// Call [Self::flush] to force flushing to the underlying storage. /// /// If, after writing the transactions, the writer's total written bytes /// exceed [Options::max_segment_size], the current segment is flushed, /// `fsync`ed and closed, and a new segment is created. /// + /// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed], + /// which contains the offset range and checksum of the commit. + /// + /// Note that supplying empty `transactions` may cause the current segment + /// to be rotated. 
From 550aa4bf86c42fd9b374d11bdd75087df4a334fb Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 11:41:52 +0100
Subject: [PATCH 07/13] `set_epoch` doesn't need to flush

---
 crates/commitlog/src/commitlog.rs | 21 +++++----------------
 1 file changed, 5 insertions(+), 16 deletions(-)

diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs
index 9d7358d626e..ef33d2d5491 100644
--- a/crates/commitlog/src/commitlog.rs
+++ b/crates/commitlog/src/commitlog.rs
@@ -107,32 +107,21 @@ impl Generic {
     /// Update the current epoch.
     ///
-    /// Calls [`Self::commit`] to flush all data of the previous epoch, and
-    /// returns the result.
-    ///
     /// Does nothing if the given `epoch` is equal to the current epoch.
     ///
     /// # Errors
     ///
     /// If `epoch` is smaller than the current epoch, an error of kind
     /// [`io::ErrorKind::InvalidInput`] is returned.
-    ///
-    /// Also see [`Self::commit`].
     pub fn set_epoch(&mut self, epoch: u64) -> io::Result<()> {
-        use std::cmp::Ordering::*;
-
-        match epoch.cmp(&self.head.epoch()) {
-            Less => Err(io::Error::new(
+        if epoch < self.head.epoch() {
+            return Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
                 "new epoch is smaller than current epoch",
-            )),
-            Equal => Ok(()),
-            Greater => {
-                self.flush()?;
-                self.head.set_epoch(epoch);
-                Ok(())
-            }
+            ));
         }
+        self.head.set_epoch(epoch);
+        Ok(())
    }
 
     /// Force the currently active segment to be flushed to storage.
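What remains after dropping the flush is purely a monotonicity check: smaller epochs are rejected, equal epochs are a no-op, larger epochs are recorded. In isolation (illustrative helper, not the crate's API):

    use std::io;

    // The rule `set_epoch` now enforces, with no I/O side effects.
    fn check_epoch(current: u64, new: u64) -> io::Result<()> {
        if new < current {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "new epoch is smaller than current epoch",
            ));
        }
        // Equal epochs are a no-op; greater epochs are simply recorded.
        Ok(())
    }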
From 876f07b65285d867616bd7e55c46c9958155819c Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 11:42:43 +0100
Subject: [PATCH 08/13] Return `Committed` from all `commit` methods

---
 crates/commitlog/src/commitlog.rs | 9 ++++++---
 crates/commitlog/src/lib.rs       | 6 +++++-
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs
index ef33d2d5491..03105ad5bd1 100644
--- a/crates/commitlog/src/commitlog.rs
+++ b/crates/commitlog/src/commitlog.rs
@@ -323,17 +323,20 @@ impl Generic {
     /// becomes invalid and thus a panic is raised.
     ///
     /// - [Self::sync] panics (called when rotating segments)
-    pub fn commit<Tx: Into<Transaction<T>>>(&mut self, transactions: impl IntoIterator<Item = Tx>) -> io::Result<()> {
+    pub fn commit<Tx: Into<Transaction<T>>>(
+        &mut self,
+        transactions: impl IntoIterator<Item = Tx>,
+    ) -> io::Result<Option<Committed>> {
         self.panicked = true;
         let writer = &mut self.head;
-        writer.commit(transactions)?;
+        let committed = writer.commit(transactions)?;
         if writer.len() >= self.opts.max_segment_size {
             self.flush_and_sync()?;
             self.start_new_segment()?;
         }
         self.panicked = false;
 
-        Ok(())
+        Ok(committed)
     }
 
     pub fn transactions_from<'a, D>(
diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs
index 725d501671f..5a1c543a9ee 100644
--- a/crates/commitlog/src/lib.rs
+++ b/crates/commitlog/src/lib.rs
@@ -17,6 +17,7 @@ pub mod segment;
 mod varchar;
 mod varint;
 
+use crate::segment::Committed;
 pub use crate::{
     commit::{Commit, StoredCommit},
     payload::{Decoder, Encode},
@@ -370,6 +371,9 @@ impl Commitlog {
 
 impl Commitlog {
-    pub fn commit<Tx: Into<Transaction<T>>>(&self, transactions: impl IntoIterator<Item = Tx>) -> io::Result<()> {
+    pub fn commit<Tx: Into<Transaction<T>>>(
+        &self,
+        transactions: impl IntoIterator<Item = Tx>,
+    ) -> io::Result<Option<Committed>> {
         let mut inner = self.inner.write().unwrap();
         inner.commit(transactions)
     }
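A caller can use the returned value to, for example, advance a durable-offset watermark. A sketch under the assumption that `Committed` stays a plain struct with an end-exclusive `tx_range` (the checksum type below is a placeholder):

    use std::ops::Range;

    // Shape of the new return value; the checksum type is illustrative.
    struct Committed {
        tx_range: Range<u64>,
        checksum: u32,
    }

    // Hypothetical helper: the last durable transaction after a commit.
    // `tx_range` is end-exclusive and non-empty whenever `Some` is returned,
    // so the last written offset is `end - 1`.
    fn last_committed_offset(committed: Option<&Committed>) -> Option<u64> {
        committed.map(|c| c.tx_range.end - 1)
    }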
From 0cf8defc692dd11f18511718f84f482c642cbb33 Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 11:43:29 +0100
Subject: [PATCH 09/13] Docs

---
 crates/commitlog/src/lib.rs | 46 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs
index 5a1c543a9ee..41cec481446 100644
--- a/crates/commitlog/src/lib.rs
+++ b/crates/commitlog/src/lib.rs
@@ -370,7 +370,53 @@ impl Commitlog {
 }
 
 impl Commitlog {
+    /// Write `transactions` to the log.
+    ///
+    /// This will store all `transactions` as a single [Commit]
+    /// (note that `transactions` must not yield more than [u16::MAX] elements).
+    ///
+    /// Data is buffered internally, call [Self::flush] to force flushing to
+    /// the underlying storage.
+    ///
+    /// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed],
+    /// which contains the offset range and checksum of the commit.
+    ///
+    /// # Errors
+    ///
+    /// An `Err` value is returned in the following cases:
+    ///
+    /// - if the transaction sequence is invalid, e.g. because the transaction
+    /// offsets are not contiguous.
+    ///
+    /// In this case, **none** of the `transactions` will be written.
+    ///
+    /// - if the current segment needs to be rotated, and an I/O error occurs
+    /// flushing it to storage.
+    ///
+    /// In this case, unwritten data remains buffered, and the current segment
+    /// remains open. Calling [Self::flush] afterwards may (or may not)
+    /// succeed, and calling [Self::commit] again with new data could grow
+    /// the segment further beyond [Options::max_segment_size] if successful.
+    ///
+    /// It is advisable to close and reopen the commitlog handle before
+    /// attempting further writes.
+    ///
+    /// - if creating the new segment fails due to an I/O error.
+    ///
+    /// # Panics
+    ///
+    /// The method panics if:
+    ///
+    /// - `transactions` exceeds [u16::MAX] elements
+    ///
+    /// - writing to the underlying buffered writer fails
+    ///
+    /// This is likely caused by some storage issue. As we cannot tell with
+    /// certainty how much data (if any) has been written, the internal state
+    /// becomes invalid and thus a panic is raised.
+    ///
+    /// - [Self::sync] panics (called when rotating segments)
     pub fn commit<Tx: Into<Transaction<T>>>(
         &self,
         transactions: impl IntoIterator<Item = Tx>,
     ) -> io::Result<Option<Committed>> {

From a50b42299f6f7d58f1b6fe63441e691e0aaef60d Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 12:10:10 +0100
Subject: [PATCH 10/13] Use assert

---
 crates/commitlog/src/segment.rs | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs
index c11f1b305f2..8de47e5a05a 100644
--- a/crates/commitlog/src/segment.rs
+++ b/crates/commitlog/src/segment.rs
@@ -120,12 +120,11 @@ impl Writer {
                     format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset),
                 ));
             }
-            // Increment `n` using checked add for error context.
-            self.commit.n = self
-                .commit
-                .n
-                .checked_add(1)
-                .expect("maximum number of transactions in single commit exceeded");
+            assert!(
+                self.commit.n < u16::MAX,
+                "maximum number of transactions in a single commit exceeded"
+            );
+            self.commit.n += 1;
             tx.txdata.encode_record(&mut self.commit.records);
         }
 
@@ -136,6 +135,9 @@ impl Writer {
         let checksum = self
             .commit
             .write(&mut self.inner)
+            // Panic here as we don't know how much of the commit has been
+            // written (if anything). Further commits would leave corrupted data
+            // in the log.
            .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e));
         let commit_len = self.commit.encoded_len() as u64;
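Both guards reject the same boundary, the 2^16-th transaction; the `assert!` form just fails before `n` is touched and carries a clearer message. A quick equivalence check (standalone, illustrative only):

    // The removed `checked_add` guard and the new `assert!` guard agree.
    fn guard_checked(n: u16) -> Option<u16> {
        n.checked_add(1) // None at u16::MAX
    }

    fn guard_assert(n: u16) -> u16 {
        assert!(n < u16::MAX, "maximum number of transactions in a single commit exceeded");
        n + 1 // cannot overflow given the assert
    }

    #[test]
    fn guards_agree() {
        for n in [0u16, 1, u16::MAX - 1] {
            assert_eq!(guard_checked(n), Some(guard_assert(n)));
        }
        assert!(guard_checked(u16::MAX).is_none());
    }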
From fd274774a4410f7bbc67044e41ec3c36a9eb0c6f Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 16:12:15 +0100
Subject: [PATCH 11/13] Restore the "commit corruption after ENOSPC" test

---
 crates/commitlog/src/commitlog.rs     |  16 +---
 crates/commitlog/src/lib.rs           |  13 +--
 crates/commitlog/src/tests/partial.rs | 125 ++++++++++++++++++++++++--
 3 files changed, 123 insertions(+), 31 deletions(-)

diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs
index 03105ad5bd1..3d6df97d7c2 100644
--- a/crates/commitlog/src/commitlog.rs
+++ b/crates/commitlog/src/commitlog.rs
@@ -297,17 +297,6 @@ impl Generic {
     ///
     /// In this case, **none** of the `transactions` will be written.
     ///
-    /// - if the current segment needs to be rotated, and an I/O error occurs
-    /// flushing it to storage.
-    ///
-    /// In this case, unwritten data remains buffered, and the current segment
-    /// remains open. Calling [Self::flush] afterwards may (or may not)
-    /// succeed, and calling [Self::commit] again with new data could grow
-    /// the segment further beyond [Options::max_segment_size] if successful.
-    ///
-    /// It is advisable to close and reopen the commitlog handle before
-    /// attempting further writes.
-    ///
     /// - if creating the new segment fails due to an I/O error.
     ///
     /// # Panics
@@ -316,7 +305,7 @@ impl Generic {
     ///
     /// - `transactions` exceeds [u16::MAX] elements
     ///
-    /// - writing to the underlying [Writer] fails
+    /// - [Self::flush] or writing to the underlying [Writer] fails
     ///
     /// This is likely caused by some storage issue. As we cannot tell with
     /// certainty how much data (if any) has been written, the internal state
@@ -331,7 +320,8 @@ impl Generic {
         let writer = &mut self.head;
         let committed = writer.commit(transactions)?;
         if writer.len() >= self.opts.max_segment_size {
-            self.flush_and_sync()?;
+            self.flush().expect("failed to flush segment");
+            self.sync();
             self.start_new_segment()?;
         }
         self.panicked = false;
diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs
index 41cec481446..a830928ce5a 100644
--- a/crates/commitlog/src/lib.rs
+++ b/crates/commitlog/src/lib.rs
@@ -390,17 +390,6 @@ impl Commitlog {
     ///
     /// In this case, **none** of the `transactions` will be written.
     ///
-    /// - if the current segment needs to be rotated, and an I/O error occurs
-    /// flushing it to storage.
-    ///
-    /// In this case, unwritten data remains buffered, and the current segment
-    /// remains open. Calling [Self::flush] afterwards may (or may not)
-    /// succeed, and calling [Self::commit] again with new data could grow
-    /// the segment further beyond [Options::max_segment_size] if successful.
-    ///
-    /// It is advisable to close and reopen the commitlog handle before
-    /// attempting further writes.
-    ///
     /// - if creating the new segment fails due to an I/O error.
     ///
     /// # Panics
@@ -409,7 +398,7 @@ impl Commitlog {
     ///
     /// - `transactions` exceeds [u16::MAX] elements
     ///
-    /// - writing to the underlying buffered writer fails
+    /// - [Self::flush] or writing to the underlying buffered writer fails
     ///
     /// This is likely caused by some storage issue. As we cannot tell with
     /// certainty how much data (if any) has been written, the internal state
diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs
index ebda9e29532..11c22fb08cd 100644
--- a/crates/commitlog/src/tests/partial.rs
+++ b/crates/commitlog/src/tests/partial.rs
@@ -3,28 +3,131 @@ use std::{
     fmt::{self, Debug},
     io::{self, Seek as _, SeekFrom},
     iter,
+    ops::Range,
 };
 
-use log::debug;
+use log::{debug, info};
 use pretty_assertions::assert_matches;
 
 use crate::{
-    commitlog,
+    commitlog, payload,
     repo::{self, Repo, SegmentLen},
     segment::{self, FileLike},
     tests::helpers::{enable_logging, fill_log_with},
-    Options,
+    Commit, Options, TxOffset, DEFAULT_LOG_FORMAT_VERSION,
 };
 
 #[test]
-#[should_panic]
-fn panics_on_enospc() {
+#[should_panic(expected = "failed to flush segment")]
+fn panics_on_partial_write() {
     enable_logging();
 
     let mut log = open_log::<[u8; 32]>(ShortMem::new(800));
-    for i in 0..100 {
+    for i in 0..20 {
+        info!("commit {i}");
+        log.commit([(i, [b'z'; 32])]).expect("unexpected `Err` result");
+    }
+}
+
+fn fill_log(mut log: commitlog::Generic<ShortMem, [u8; 32]>, range: Range<TxOffset>) {
+    debug!("writing range {range:?}");
+
+    let end = range.end;
+    for i in range {
+        info!("commit {i}");
         log.commit([(i, [b'z'; 32])]).unwrap();
     }
+    log.flush().unwrap();
+
+    // Try to write one more, which should fail.
+    log.commit([(end, [b'x'; 32])]).unwrap();
+    assert_matches!(
+        log.flush(),
+        Err(e) if e.kind() == io::ErrorKind::StorageFull
+    );
 }
 
+/// Tests that, when a partial write occurs, we can read all flushed commits
+/// up until the faulty one.
+#[test]
+fn read_log_up_to_partial_write() {
+    enable_logging();
+
+    const MAX_SEGMENT_SIZE: usize = 800;
+    const TXDATA_SIZE: usize = 32;
+    const COMMIT_SIZE: usize = Commit::FRAMING_LEN + TXDATA_SIZE;
+    const TOTAL_TXS: usize = MAX_SEGMENT_SIZE / COMMIT_SIZE;
+
+    let repo = ShortMem::new(MAX_SEGMENT_SIZE as u64);
+    fill_log(open_log::<[u8; TXDATA_SIZE]>(repo.clone()), 0..(TOTAL_TXS as u64));
+
+    let txs = commitlog::transactions_from(
+        repo,
+        DEFAULT_LOG_FORMAT_VERSION,
+        0,
+        &payload::ArrayDecoder::<TXDATA_SIZE>,
+    )
+    .unwrap()
+    .map(Result::unwrap)
+    .count();
+
+    assert_eq!(txs, TOTAL_TXS);
+}
+
+/// Tests:
+///
+/// - fill log until a partial write occurs
+/// - corrupt the last successfully written commit
+/// - fill log until a partial write occurs
+///
+/// The log should detect the corrupt commit, create a fresh segment, and write
+/// the second batch until ENOSPC. Traversal should work.
+#[test]
+fn reopen_with_corrupt_last_commit() {
+    enable_logging();
+
+    const MAX_SEGMENT_SIZE: usize = 800;
+    const TXDATA_SIZE: usize = 32;
+    const COMMIT_SIZE: usize = Commit::FRAMING_LEN + TXDATA_SIZE;
+    const TXS_PER_SEGMENT: u64 = (MAX_SEGMENT_SIZE / COMMIT_SIZE) as u64;
+    const TOTAL_TXS: u64 = (TXS_PER_SEGMENT * 2) - 1;
+
+    let repo = ShortMem::new(MAX_SEGMENT_SIZE as u64);
+
+    // Fill with as many txs as possible until ENOSPC.
+    fill_log(open_log::<[u8; TXDATA_SIZE]>(repo.clone()), 0..TXS_PER_SEGMENT);
+
+    // Invalidate the checksum of the last commit.
+    let last_segment_offset = repo.existing_offsets().unwrap().last().copied().unwrap();
+    let last_commit: Commit = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, last_segment_offset)
+        .unwrap()
+        .commits()
+        .map(Result::unwrap)
+        .last()
+        .unwrap()
+        .into();
+    {
+        let mut last_segment = repo.open_segment_writer(last_segment_offset).unwrap();
+        let pos = last_segment.len() - last_commit.encoded_len() + 1;
+        last_segment.modify_byte_at(pos, |_| 255);
+    }
+
+    // Write a second batch, starting with the offset of the corrupt commit.
+    fill_log(
+        open_log::<[u8; TXDATA_SIZE]>(repo.clone()),
+        last_commit.min_tx_offset..TOTAL_TXS,
+    );
+
+    let txs = commitlog::transactions_from(
+        repo,
+        DEFAULT_LOG_FORMAT_VERSION,
+        0,
+        &payload::ArrayDecoder::<TXDATA_SIZE>,
+    )
+    .unwrap()
+    .map(Result::unwrap)
+    .count();
+
+    assert_eq!(txs as u64, TOTAL_TXS);
+}
+
 /// Edge case surfaced in production:
@@ -82,6 +185,16 @@ struct ShortSegment {
     max_len: u64,
 }
 
+impl ShortSegment {
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) {
+        self.inner.modify_byte_at(pos, f);
+    }
+}
+
 impl SegmentLen for ShortSegment {
     fn segment_len(&mut self) -> io::Result<u64> {
         self.inner.segment_len()
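The corruption step works because flipping a single byte inside the commit's framing is enough to fail checksum verification on reopen. Illustrated with a trivial XOR checksum as a stand-in (the commitlog's actual checksum algorithm is not shown in this series):

    // Any sane checksum changes when one byte of the input changes.
    fn xor_checksum(data: &[u8]) -> u8 {
        data.iter().fold(0, |acc, b| acc ^ b)
    }

    #[test]
    fn one_byte_flip_invalidates_checksum() {
        let mut commit = vec![0xde, 0xad, 0xbe, 0xef];
        let good = xor_checksum(&commit);
        commit[3] = 255; // mirrors `modify_byte_at(pos, |_| 255)` above
        assert_ne!(good, xor_checksum(&commit));
    }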
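One possible direction for the TODO below: let callers keep ownership of a growable buffer and drain it per commit, so the allocation is reused across batches. A sketch only, not a proposal tied to the actual `Durability` trait:

    // Drain a caller-owned Vec so its allocation survives across commits,
    // instead of building a fresh Box<[_]> each time.
    fn commit_reusing<T>(buf: &mut Vec<T>, mut sink: impl FnMut(T)) {
        for tx in buf.drain(..) {
            sink(tx); // hand each transaction to the durability layer
        }
        // `buf` is now empty but keeps its capacity for the next batch.
    }

From f37fad3f77b0a260b608f17439f68c8eadc6bdd8 Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 17:17:48 +0100
Subject: [PATCH 12/13] Add TODO

---
 crates/durability/src/lib.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs
index 43c236a6388..d4ca740e36a 100644
--- a/crates/durability/src/lib.rs
+++ b/crates/durability/src/lib.rs
@@ -115,6 +115,9 @@ pub trait Durability: Send + Sync {
     /// cannot be made durable immediately.
     ///
     /// Errors may be signalled by panicking.
+    //
+    // TODO(perf): Can we avoid allocating a new `Box<[_]>` for every commit,
+    // or at least reuse boxes?
     fn commit(&self, txs: Box<[Transaction]>);
 
     /// Obtain a handle to the [DurableOffset].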
From 4a16bf5f8447fb0103f4503307a07758af13663b Mon Sep 17 00:00:00 2001
From: Kim Altintop
Date: Wed, 28 Jan 2026 17:22:02 +0100
Subject: [PATCH 13/13] Fix fallocate tests

---
 crates/durability/tests/io/fallocate.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs
index 5dc67dd13d3..4a484c42823 100644
--- a/crates/durability/tests/io/fallocate.rs
+++ b/crates/durability/tests/io/fallocate.rs
@@ -98,8 +98,8 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() {
         // Mark initial segment as seen.
         new_segment_rx.borrow_and_update();
         // Write past available space.
-        for _ in 0..256 {
-            durability.append_tx(txdata.clone());
+        for offset in 0..256 {
+            durability.commit([(offset, txdata.clone()).into()].into());
         }
         // Ensure new segment is created.
         new_segment_rx.changed().await?;
@@ -107,7 +107,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() {
         sleep(Duration::from_millis(5)).await;
         // Durability actor should have crashed, so this should panic.
         info!("trying append on crashed durability");
-        durability.append_tx(txdata.clone());
+        durability.commit([(256, txdata.clone()).into()].into());
     }
 
     Ok(())
@@ -168,7 +168,6 @@ async fn local_durability(
         spacetimedb_durability::local::Options {
             commitlog: spacetimedb_commitlog::Options {
                 max_segment_size,
-                max_records_in_commit: 1.try_into().unwrap(),
                 preallocate_segments: true,
                 ..<_>::default()
            },
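For reference, the `(offset, txdata).into()` conversion the updated test relies on could look like the following. This is illustrative only; the real `Transaction` type and its `From` impl live in the durability crate and are not shown in this series:

    // Hypothetical shape of the tuple-to-transaction conversion.
    struct Transaction<T> {
        offset: u64,
        txdata: T,
    }

    impl<T> From<(u64, T)> for Transaction<T> {
        fn from((offset, txdata): (u64, T)) -> Self {
            Self { offset, txdata }
        }
    }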