diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 03c590e4950..3d6df97d7c2 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -107,81 +107,32 @@ impl Generic { /// Update the current epoch. /// - /// Calls [`Self::commit`] to flush all data of the previous epoch, and - /// returns the result. - /// /// Does nothing if the given `epoch` is equal to the current epoch. /// /// # Errors /// /// If `epoch` is smaller than the current epoch, an error of kind /// [`io::ErrorKind::InvalidInput`] is returned. - /// - /// Also see [`Self::commit`]. - pub fn set_epoch(&mut self, epoch: u64) -> io::Result> { - use std::cmp::Ordering::*; - - match epoch.cmp(&self.head.epoch()) { - Less => Err(io::Error::new( + pub fn set_epoch(&mut self, epoch: u64) -> io::Result<()> { + if epoch < self.head.epoch() { + return Err(io::Error::new( io::ErrorKind::InvalidInput, "new epoch is smaller than current epoch", - )), - Equal => Ok(None), - Greater => { - let res = self.commit()?; - self.head.set_epoch(epoch); - Ok(res) - } + )); } - } - - /// Write the currently buffered data to storage and rotate segments as - /// necessary. - /// - /// Note that this does not imply that the data is durable, in particular - /// when a filesystem storage backend is used. Call [`Self::sync`] to flush - /// any OS buffers to stable storage. - /// - /// # Errors - /// - /// If an error occurs writing the data, the current [`Commit`] buffer is - /// retained, but a new segment is created. Retrying in case of an `Err` - /// return value thus will write the current data to that new segment. - /// - /// If this fails, however, the next attempt to create a new segment will - /// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind - /// this means that something is seriously wrong underlying storage, and the - /// caller should stop writing to the log. - pub fn commit(&mut self) -> io::Result> { - self.panicked = true; - let writer = &mut self.head; - let sz = writer.commit.encoded_len(); - // If the segment is empty, but the commit exceeds the max size, - // we got a huge commit which needs to be written even if that - // results in a huge segment. - let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size; - let writer = if should_rotate { - self.sync(); - self.start_new_segment()? - } else { - writer - }; - - let ret = writer.commit().or_else(|e| { - warn!("Commit failed: {e}"); - // Nb.: Don't risk a panic by calling `self.sync()`. - // We already gave up on the last commit, and will retry it next time. - self.start_new_segment()?; - Err(e) - }); - self.panicked = false; - ret + self.head.set_epoch(epoch); + Ok(()) } /// Force the currently active segment to be flushed to storage. /// /// Using a filesystem backend, this means to call `fsync(2)`. /// + /// **Note** that this does not flush the buffered data from calls to + /// [Self::commit], it only instructs the underlying storage to flush its + /// buffers. Call [Self::flush] prior to this method to ensure data from + /// all previous [Self::commit] calls is flushed to the underlying storage. + /// /// # Panics /// /// As an `fsync` failure leaves a file in a more of less undefined state, @@ -195,6 +146,22 @@ impl Generic { self.panicked = false; } + /// Flush the buffered data from previous calls to [Self::commit] to the + /// underlying storage. + /// + /// Call [Self::sync] to instruct the underlying storage to flush its + /// buffers as well. + pub fn flush(&mut self) -> io::Result<()> { + self.head.flush() + } + + /// Calls [Self::flush] and then [Self::sync]. + fn flush_and_sync(&mut self) -> io::Result<()> { + self.flush()?; + self.sync(); + Ok(()) + } + /// The last transaction offset written to disk, or `None` if nothing has /// been written yet. /// @@ -303,8 +270,63 @@ impl Generic { } impl Generic { - pub fn append(&mut self, record: T) -> Result<(), T> { - self.head.append(record) + /// Write `transactions` to the log. + /// + /// This will store all `transactions` as a single [Commit] + /// (note that `transactions` must not yield more than [u16::MAX] elements). + /// + /// Data is buffered by the underlying segment [Writer]. + /// Call [Self::flush] to force flushing to the underlying storage. + /// + /// If, after writing the transactions, the writer's total written bytes + /// exceed [Options::max_segment_size], the current segment is flushed, + /// `fsync`ed and closed, and a new segment is created. + /// + /// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed], + /// which contains the offset range and checksum of the commit. + /// + /// Note that supplying empty `transactions` may cause the current segment + /// to be rotated. + /// + /// # Errors + /// + /// An `Err` value is returned in the following cases: + /// + /// - if the transaction sequence is invalid, e.g. because the transaction + /// offsets are not contiguous. + /// + /// In this case, **none** of the `transactions` will be written. + /// + /// - if creating the new segment fails due to an I/O error. + /// + /// # Panics + /// + /// The method panics if: + /// + /// - `transactions` exceeds [u16::MAX] elements + /// + /// - [Self::flush] or writing to the underlying [Writer] fails + /// + /// This is likely caused by some storage issue. As we cannot tell with + /// certainty how much data (if any) has been written, the internal state + /// becomes invalid and thus a panic is raised. + /// + /// - [Self::sync] panics (called when rotating segments) + pub fn commit>>( + &mut self, + transactions: impl IntoIterator, + ) -> io::Result> { + self.panicked = true; + let writer = &mut self.head; + let committed = writer.commit(transactions)?; + if writer.len() >= self.opts.max_segment_size { + self.flush().expect("failed to flush segment"); + self.sync(); + self.start_new_segment()?; + } + self.panicked = false; + + Ok(committed) } pub fn transactions_from<'a, D>( @@ -348,8 +370,8 @@ impl Generic { impl Drop for Generic { fn drop(&mut self) { if !self.panicked { - if let Err(e) = self.head.commit() { - warn!("failed to commit on drop: {e}"); + if let Err(e) = self.flush_and_sync() { + warn!("failed to flush on drop: {e:#}"); } } } @@ -920,7 +942,7 @@ fn range_is_empty(range: &impl RangeBounds) -> bool { #[cfg(test)] mod tests { - use std::{cell::Cell, iter::repeat, num::NonZeroU16}; + use std::{cell::Cell, iter::repeat}; use pretty_assertions::assert_matches; @@ -933,30 +955,31 @@ mod tests { #[test] fn rotate_segments_simple() { let mut log = mem_log::<[u8; 32]>(128); - for _ in 0..3 { - log.append([0; 32]).unwrap(); - log.commit().unwrap(); + for i in 0..4 { + log.commit([(i, [0; 32])]).unwrap(); } + log.flush_and_sync().unwrap(); let offsets = log.repo.existing_offsets().unwrap(); assert_eq!(&offsets[..offsets.len() - 1], &log.tail); - assert_eq!(offsets[offsets.len() - 1], 2); + // TODO: We overshoot the max segment size. + assert_eq!(&offsets, &[0, 3]); } #[test] fn huge_commit() { let mut log = mem_log::<[u8; 32]>(32); - log.append([0; 32]).unwrap(); - log.append([1; 32]).unwrap(); - log.commit().unwrap(); - assert!(log.head.len() > log.opts.max_segment_size); + log.commit([(0, [0; 32]), (1, [1; 32])]).unwrap(); + log.flush_and_sync().unwrap(); + // First segment got rotated out. + assert_eq!(&log.tail, &[0]); - log.append([2; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(2, [2; 32])]).unwrap(); + log.flush_and_sync().unwrap(); - assert_eq!(&log.tail, &[0]); - assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]); + // Second segment got rotated out and segment 3 is created. + assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2, 3]); } #[test] @@ -1052,14 +1075,31 @@ mod tests { fn traverse_commits_ignores_duplicates() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - let commit1 = log.head.commit.clone(); - log.commit().unwrap(); - log.head.commit = commit1.clone(); - log.commit().unwrap(); - log.append([43; 32]).unwrap(); - let commit2 = log.head.commit.clone(); - log.commit().unwrap(); + let tx1 = [42u8; 32]; + let tx2 = [43u8; 32]; + + log.commit([(0, tx1)]).unwrap(); + let commit1 = Commit { + min_tx_offset: 0, + n: 1, + records: tx1.to_vec(), + ..log.head.commit.clone() + }; + + // Reset the commit offset, so we can write the same commit twice. + log.head.commit.min_tx_offset = 0; + log.commit([(0, tx1)]).unwrap(); + + // Write another one. + log.commit([(1, tx2)]).unwrap(); + let commit2 = Commit { + min_tx_offset: 1, + n: 1, + records: tx2.to_vec(), + ..log.head.commit.clone() + }; + + log.flush_and_sync().unwrap(); assert_eq!( [commit1, commit2].as_slice(), @@ -1074,15 +1114,14 @@ mod tests { fn traverse_commits_errors_when_forked() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - log.commit().unwrap(); - log.head.commit = Commit { - min_tx_offset: 0, - n: 1, - records: [43; 32].to_vec(), - epoch: 0, - }; - log.commit().unwrap(); + log.commit([(0, [42; 32])]).unwrap(); + // Reset the commit offset, + // and write a different commit at the same offset. + // This is considered a fork. + log.head.commit.min_tx_offset = 0; + log.commit([(0, [43; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); let res = log.commits_from(0).collect::, _>>(); assert!( @@ -1095,11 +1134,11 @@ mod tests { fn traverse_commits_errors_when_offset_not_contiguous() { let mut log = mem_log::<[u8; 32]>(1024); - log.append([42; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(0, [42; 32])]).unwrap(); log.head.commit.min_tx_offset = 18; - log.append([42; 32]).unwrap(); - log.commit().unwrap(); + log.commit([(18, [42; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); let res = log.commits_from(0).collect::, _>>(); assert!( @@ -1111,7 +1150,7 @@ mod tests { prev_error: None }) ), - "expected fork error: {res:?}" + "expected out-of-order error: {res:?}" ) } @@ -1221,7 +1260,7 @@ mod tests { #[test] fn reopen() { let mut log = mem_log::<[u8; 32]>(1024); - let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle()); + let total_txs = fill_log(&mut log, 100, (1..=10).cycle()); assert_eq!( total_txs, log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() @@ -1231,12 +1270,11 @@ mod tests { log.repo.clone(), Options { max_segment_size: 1024, - max_records_in_commit: NonZeroU16::new(10).unwrap(), ..Options::default() }, ) .unwrap(); - total_txs += fill_log(&mut log, 100, (1..=10).cycle()); + let total_txs = fill_log(&mut log, 100, (1..=10).cycle()); assert_eq!( total_txs, @@ -1245,24 +1283,22 @@ mod tests { } #[test] - fn set_same_epoch_does_nothing() { + fn set_new_epoch() { let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); - let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap(); - assert_eq!(committed, None); - } - - #[test] - fn set_new_epoch_commits() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); - assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); - log.append(<_>::default()).unwrap(); - let committed = log - .set_epoch(42) - .unwrap() - .expect("should have committed the pending transaction"); + log.commit([(0, [12; 32])]).unwrap(); + log.set_epoch(42).unwrap(); assert_eq!(log.epoch(), 42); - assert_eq!(committed.tx_range.start, 0); + log.commit([(1, [13; 32])]).unwrap(); + + log.flush_and_sync().unwrap(); + + let epochs = log + .commits_from(0) + .map(Result::unwrap) + .map(|commit| commit.epoch) + .collect::>(); + assert_eq!(&[Commit::DEFAULT_EPOCH, 42], epochs.as_slice()); } #[test] diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 1e7a8c0047e..a830928ce5a 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -1,6 +1,6 @@ use std::{ io, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::RangeBounds, sync::{Arc, RwLock}, }; @@ -17,10 +17,11 @@ pub mod segment; mod varchar; mod varint; +use crate::segment::Committed; pub use crate::{ commit::{Commit, StoredCommit}, payload::{Decoder, Encode}, - repo::fs::SizeOnDisk, + repo::{fs::SizeOnDisk, TxOffset}, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, varchar::Varchar, }; @@ -57,14 +58,6 @@ pub struct Options { /// Default: 1GiB #[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))] pub max_segment_size: u64, - /// The maximum number of records in a commit. - /// - /// If this number is exceeded, the commit is flushed to disk even without - /// explicitly calling [`Commitlog::flush`]. - /// - /// Default: 1 - #[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))] - pub max_records_in_commit: NonZeroU16, /// Whenever at least this many bytes have been written to the currently /// active segment, an entry is added to its offset index. /// @@ -106,7 +99,6 @@ impl Default for Options { impl Options { pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024; - pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; @@ -114,7 +106,6 @@ impl Options { pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, max_segment_size: Self::default_max_segment_size(), - max_records_in_commit: Self::default_max_records_in_commit(), offset_index_interval_bytes: Self::default_offset_index_interval_bytes(), offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(), preallocate_segments: Self::default_preallocate_segments(), @@ -128,10 +119,6 @@ impl Options { Self::DEFAULT_MAX_SEGMENT_SIZE } - pub const fn default_max_records_in_commit() -> NonZeroU16 { - Self::DEFAULT_MAX_RECORDS_IN_COMMIT - } - pub const fn default_offset_index_interval_bytes() -> NonZeroU64 { Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES } @@ -262,7 +249,7 @@ impl Commitlog { pub fn flush(&self) -> io::Result> { let mut inner = self.inner.write().unwrap(); trace!("flush commitlog"); - inner.commit()?; + inner.flush()?; Ok(inner.max_committed_offset()) } @@ -282,7 +269,7 @@ impl Commitlog { pub fn flush_and_sync(&self) -> io::Result> { let mut inner = self.inner.write().unwrap(); trace!("flush and sync commitlog"); - inner.commit()?; + inner.flush()?; inner.sync(); Ok(inner.max_committed_offset()) @@ -383,57 +370,47 @@ impl Commitlog { } impl Commitlog { - /// Append the record `txdata` to the log. + /// Write `transactions` to the log. /// - /// If the internal buffer exceeds [`Options::max_records_in_commit`], the - /// argument is returned in an `Err`. The caller should [`Self::flush`] the - /// log and try again. + /// This will store all `transactions` as a single [Commit] + /// (note that `transactions` must not yield more than [u16::MAX] elements). /// - /// In case the log is appended to from multiple threads, this may result in - /// a busy loop trying to acquire a slot in the buffer. In such scenarios, - /// [`Self::append_maybe_flush`] is preferable. - pub fn append(&self, txdata: T) -> Result<(), T> { - let mut inner = self.inner.write().unwrap(); - inner.append(txdata) - } - - /// Append the record `txdata` to the log. + /// Data is buffered internally, call [Self::flush] to force flushing to + /// the underlying storage. /// - /// The `txdata` payload is buffered in memory until either: + /// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed], + /// which contains the offset range and checksum of the commit. /// - /// - [`Self::flush`] is called explicitly, or - /// - [`Options::max_records_in_commit`] is exceeded + /// # Errors /// - /// In the latter case, [`Self::append`] flushes implicitly, _before_ - /// appending the `txdata` argument. + /// An `Err` value is returned in the following cases: /// - /// I.e. the argument is not guaranteed to be flushed after the method - /// returns. If that is desired, [`Self::flush`] must be called explicitly. + /// - if the transaction sequence is invalid, e.g. because the transaction + /// offsets are not contiguous. /// - /// If writing `txdata` to the commitlog results in a new segment file being opened, - /// we will send a message down `on_new_segment`. - /// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`. + /// In this case, **none** of the `transactions` will be written. /// - /// # Errors + /// - if creating the new segment fails due to an I/O error. /// - /// If the log needs to be flushed, but an I/O error occurs, ownership of - /// `txdata` is returned back to the caller alongside the [`io::Error`]. + /// # Panics /// - /// The value can then be used to retry appending. - pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append> { + /// The method panics if: + /// + /// - `transactions` exceeds [u16::MAX] elements + /// + /// - [Self::flush] or writing to the underlying buffered writer fails + /// + /// This is likely caused by some storage issue. As we cannot tell with + /// certainty how much data (if any) has been written, the internal state + /// becomes invalid and thus a panic is raised. + /// + /// - [Self::sync] panics (called when rotating segments) + pub fn commit>>( + &self, + transactions: impl IntoIterator, + ) -> io::Result> { let mut inner = self.inner.write().unwrap(); - - if let Err(txdata) = inner.append(txdata) { - if let Err(source) = inner.commit() { - return Err(error::Append { txdata, source }); - } - - // `inner.commit.n` must be zero at this point - let res = inner.append(txdata); - debug_assert!(res.is_ok(), "failed to append while holding write lock"); - } - - Ok(()) + inner.commit(transactions) } /// Obtain an iterator which traverses the log from the start, yielding diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 5be633bf5c7..2ab5579c8b2 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -219,8 +219,6 @@ pub fn create_segment_writer( min_tx_offset: offset, bytes_written: Header::LEN as u64, - max_records_in_commit: opts.max_records_in_commit, - offset_index_head: create_offset_index_writer(repo, offset, opts), }) } @@ -293,8 +291,6 @@ pub fn resume_segment_writer( min_tx_offset: tx_range.start, bytes_written: size_in_bytes, - max_records_in_commit: opts.max_records_in_commit, - offset_index_head: create_offset_index_writer(repo, offset, opts), })) } diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7e8c054467b..8de47e5a05a 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -1,7 +1,7 @@ use std::{ fs::File, io::{self, BufWriter, ErrorKind, SeekFrom, Write as _}, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::Range, }; @@ -100,58 +100,52 @@ pub struct Writer { pub(crate) min_tx_offset: u64, pub(crate) bytes_written: u64, - pub(crate) max_records_in_commit: NonZeroU16, - pub(crate) offset_index_head: Option, } impl Writer { - /// Append the record (aka transaction) `T` to the segment. - /// - /// If the number of currently buffered records would exceed `max_records_in_commit` - /// after the method returns, the argument is returned in an `Err` and not - /// appended to this writer's buffer. - /// - /// Otherwise, the `record` is encoded and and stored in the buffer. - /// - /// An `Err` result indicates that [`Self::commit`] should be called in - /// order to flush the buffered records to persistent storage. - pub fn append(&mut self, record: T) -> Result<(), T> { - if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() { - Err(record) - } else { + pub fn commit>, U: Encode>( + &mut self, + transactions: impl IntoIterator, + ) -> io::Result> { + for tx in transactions { + let tx = tx.into(); + let expected_offset = self.commit.min_tx_offset + self.commit.n as u64; + if tx.offset != expected_offset { + self.commit.n = 0; + self.commit.records.clear(); + + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset), + )); + } + assert!( + self.commit.n < u16::MAX, + "maximum number of transactions in a single commit exceeded" + ); self.commit.n += 1; - record.encode_record(&mut self.commit.records); - Ok(()) + tx.txdata.encode_record(&mut self.commit.records); } - } - /// Write the current [`Commit`] to the underlying [`io::Write`]. - /// - /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero). - /// In this case, `None` is returned. - /// - /// Otherwise `Some` [`Committed`] is returned, providing some metadata about - /// the commit. - pub fn commit(&mut self) -> io::Result> { if self.commit.n == 0 { return Ok(None); } - let checksum = self.commit.write(&mut self.inner)?; - self.inner.flush()?; + let checksum = self + .commit + .write(&mut self.inner) + // Panic here as we don't know how much of the commit has been + // written (if anything). Further commits would leave corrupted data + // in the log. + .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e)); let commit_len = self.commit.encoded_len() as u64; - self.offset_index_head.as_mut().map(|index| { - debug!( - "append_after commit min_tx_offset={} bytes_written={} commit_len={}", - self.commit.min_tx_offset, self.bytes_written, commit_len - ); - index + + if let Some(index) = self.offset_index_head.as_mut() { + let _ = index .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) - .map_err(|e| { - debug!("failed to append to offset index: {e:?}"); - }) - }); + .inspect_err(|e| debug!("failed to append to offset index: {e}")); + } let tx_range_start = self.commit.min_tx_offset; @@ -166,6 +160,10 @@ impl Writer { })) } + pub fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } + /// Get the current epoch. pub fn epoch(&self) -> u64 { self.commit.epoch @@ -537,6 +535,12 @@ pub struct Transaction { pub txdata: T, } +impl From<(u64, T)> for Transaction { + fn from((offset, txdata): (u64, T)) -> Self { + Self { offset, txdata } + } +} + pub struct Commits { pub header: Header, reader: R, @@ -744,16 +748,14 @@ impl Metadata { #[cfg(test)] mod tests { - use std::num::NonZeroU16; - - use super::*; - use crate::{payload::ArrayDecoder, repo, Options}; use itertools::Itertools; use pretty_assertions::assert_matches; - use proptest::prelude::*; use spacetimedb_paths::server::CommitLogDir; use tempfile::tempdir; + use super::*; + use crate::{payload::ArrayDecoder, repo, Options}; + #[test] fn header_roundtrip() { let hdr = Header { @@ -772,20 +774,9 @@ mod tests { fn write_read_roundtrip() { let repo = repo::Memory::unlimited(); - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(4).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); - writer.append([0; 32]).unwrap(); - writer.append([1; 32]).unwrap(); - writer.append([2; 32]).unwrap(); - writer.commit().unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); + writer.commit([(0, [0; 32]), (1, [1; 32]), (2, [2; 32])]).unwrap(); + writer.flush().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let header = reader.header; @@ -810,27 +801,15 @@ mod tests { fn metadata() { let repo = repo::Memory::unlimited(); - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); // Commit 0..2 - writer.append([0; 32]).unwrap(); - writer.append([0; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(0, [0; 32]), (1, [0; 32])]).unwrap(); // Commit 2..3 - writer.append([1; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(2, [1; 32])]).unwrap(); // Commit 3..5 - writer.append([2; 32]).unwrap(); - writer.append([2; 32]).unwrap(); - writer.commit().unwrap(); + writer.commit([(3, [2; 32]), (4, [2; 32])]).unwrap(); + + writer.flush().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let metadata = reader.metadata().unwrap(); @@ -851,37 +830,32 @@ mod tests { #[test] fn commits() { let repo = repo::Memory::unlimited(); - let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; + let commits = vec![ + vec![(0, [1; 32]), (1, [2; 32])], + vec![(2, [3; 32])], + vec![(3, [4; 32]), (4, [5; 32])], + ]; - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); for commit in &commits { - for tx in commit { - writer.append(*tx).unwrap(); - } - writer.commit().unwrap(); + writer.commit(commit.clone()).unwrap(); } + writer.flush().unwrap(); + let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let mut commits1 = Vec::with_capacity(commits.len()); let mut min_tx_offset = 0; for txs in commits { + let n = txs.len(); commits1.push(Commit { min_tx_offset, - n: txs.len() as u16, - records: txs.concat(), + n: n as u16, + records: itertools::concat(txs.into_iter().map(|(_, payload)| payload.to_vec())), epoch: 0, }); - min_tx_offset += txs.len() as u64; + min_tx_offset += n as u64; } let commits2 = reader .commits() @@ -894,73 +868,25 @@ mod tests { #[test] fn transactions() { let repo = repo::Memory::unlimited(); - let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; + let commits = vec![ + vec![(0, [1; 32]), (1, [2; 32])], + vec![(2, [3; 32])], + vec![(3, [4; 32]), (4, [5; 32])], + ]; - let mut writer = repo::create_segment_writer( - &repo, - Options { - max_records_in_commit: NonZeroU16::new(3).unwrap(), - ..<_>::default() - }, - Commit::DEFAULT_EPOCH, - 0, - ) - .unwrap(); + let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); for commit in &commits { - for tx in commit { - writer.append(*tx).unwrap(); - } - writer.commit().unwrap(); + writer.commit(commit.clone()).unwrap(); } + writer.flush().unwrap(); + let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let txs = reader .transactions(&ArrayDecoder) .collect::, _>>() .unwrap(); - assert_eq!( - txs, - commits - .into_iter() - .flatten() - .enumerate() - .map(|(offset, txdata)| Transaction { - offset: offset as u64, - txdata - }) - .collect::>() - ); - } - - proptest! { - #[test] - fn max_records_in_commit(max_records_in_commit in any::()) { - let mut writer = Writer { - commit: Commit::default(), - inner: BufWriter::new(Vec::new()), - - min_tx_offset: 0, - bytes_written: 0, - - max_records_in_commit, - - offset_index_head: None, - }; - - for i in 0..max_records_in_commit.get() { - assert!( - writer.append([0; 16]).is_ok(), - "less than {} records written: {}", - max_records_in_commit.get(), - i - ); - } - assert!( - writer.append([0; 16]).is_err(), - "more than {} records written", - max_records_in_commit.get() - ); - } + assert_eq!(txs, commits.into_iter().flatten().map(Into::into).collect::>()); } #[test] @@ -972,20 +898,14 @@ mod tests { min_tx_offset: 0, bytes_written: 0, - max_records_in_commit: NonZeroU16::MAX, offset_index_head: None, }; assert_eq!(0, writer.next_tx_offset()); - writer.append([0; 16]).unwrap(); - assert_eq!(0, writer.next_tx_offset()); - writer.commit().unwrap(); - assert_eq!(1, writer.next_tx_offset()); - writer.commit().unwrap(); + writer.commit([(0, [0; 16])]).unwrap(); assert_eq!(1, writer.next_tx_offset()); - writer.append([1; 16]).unwrap(); - writer.append([1; 16]).unwrap(); - writer.commit().unwrap(); + writer.commit([(1, [1; 16])]).unwrap(); + writer.commit([(2, [1; 16])]).unwrap(); assert_eq!(3, writer.next_tx_offset()); } diff --git a/crates/commitlog/src/tests/helpers.rs b/crates/commitlog/src/tests/helpers.rs index c645b2cdc65..b7740b92e5b 100644 --- a/crates/commitlog/src/tests/helpers.rs +++ b/crates/commitlog/src/tests/helpers.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, num::NonZeroU16}; +use std::fmt::Debug; use env_logger::Env; @@ -13,7 +13,6 @@ pub fn mem_log(max_segment_size: u64) -> commitlog::Generic(ShortMem::new(800)); - let total_commits = 100; - let total_txs = fill_log_enospc(&mut log, total_commits, (1..=10).cycle()); - - assert_eq!( - total_txs, - log.transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); - assert_eq!(total_commits, log.commits_from(0).map(Result::unwrap).count()); + for i in 0..20 { + info!("commit {i}"); + log.commit([(i, [b'z'; 32])]).expect("unexpected `Err` result"); + } } -// Note: Write errors cause the in-flight commit to be written to a fresh -// segment. So as long as we write through the public API, partial writes -// never surface (i.e. the log is contiguous). +fn fill_log(mut log: commitlog::Generic, range: Range) { + debug!("writing range {range:?}"); + + let end = range.end; + for i in range { + info!("commit {i}"); + log.commit([(i, [b'z'; 32])]).unwrap(); + } + log.flush().unwrap(); + + // Try to write one more, which should fail. + log.commit([(end, [b'x'; 32])]).unwrap(); + assert_matches!( + log.flush(), + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); +} +/// Tests that, when a partial write occurs, we can read all flushed commits +/// up until the faulty one. #[test] -fn reopen() { +fn read_log_up_to_partial_write() { enable_logging(); - let repo = ShortMem::new(800); - let num_commits = 10; - - let mut total_txs = 0; - for i in 0..2 { - let mut log = open_log::<[u8; 32]>(repo.clone()); - total_txs += fill_log_enospc(&mut log, num_commits, (1..=10).cycle()); + const MAX_SEGMENT_SIZE: usize = 800; + const TXDATA_SIZE: usize = 32; + const COMMIT_SIZE: usize = Commit::FRAMING_LEN + TXDATA_SIZE; + const TOTAL_TXS: usize = MAX_SEGMENT_SIZE / COMMIT_SIZE; - debug!("fill {} done", i + 1); - } + let repo = ShortMem::new(MAX_SEGMENT_SIZE as u64); + fill_log(open_log::<[u8; TXDATA_SIZE]>(repo.clone()), 0..(TOTAL_TXS as u64)); - assert_eq!( - total_txs, - open_log::<[u8; 32]>(repo.clone()) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); + let txs = commitlog::transactions_from( + repo, + DEFAULT_LOG_FORMAT_VERSION, + 0, + &payload::ArrayDecoder::, + ) + .unwrap() + .map(Result::unwrap) + .count(); - // Let's see if we hit a funny case in any of the segments. - for offset in repo.existing_offsets().unwrap().into_iter().rev() { - let meta = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, offset) - .unwrap() - .metadata() - .unwrap(); - debug!("dropping segment: segment::{meta:?}"); - repo.remove_segment(offset).unwrap(); - assert_eq!( - meta.tx_range.start, - open_log::<[u8; 32]>(repo.clone()) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() as u64 - ); - } + assert_eq!(txs, TOTAL_TXS,); } +/// Tests: +/// +/// - fill log until a partial write occurs +/// - corrupt the last successfully written commit +/// - fill log until a partial write occurs +/// +/// The log should detect the corrupt commit, create a fresh segment, and write +/// the second batch until ENOSPC. Traversal should work. #[test] -fn overwrite_reopen() { +fn reopen_with_corrupt_last_commit() { enable_logging(); - let repo = ShortMem::new(800); - let num_commits = 10; - let txs_per_commit = 5; + const MAX_SEGMENT_SIZE: usize = 800; + const TXDATA_SIZE: usize = 32; + const COMMIT_SIZE: usize = Commit::FRAMING_LEN + TXDATA_SIZE; + const TXS_PER_SEGMENT: u64 = (MAX_SEGMENT_SIZE / COMMIT_SIZE) as u64; + const TOTAL_TXS: u64 = (TXS_PER_SEGMENT * 2) - 1; + + let repo = ShortMem::new(MAX_SEGMENT_SIZE as u64); - let mut log = open_log::<[u8; 32]>(repo.clone()); - let mut total_txs = fill_log_enospc(&mut log, num_commits, repeat(txs_per_commit)); + // Fill with as many txs as possible until ENOSPC. + fill_log(open_log::<[u8; TXDATA_SIZE]>(repo.clone()), 0..TXS_PER_SEGMENT); + // Invalidate the checksum of the last commit. let last_segment_offset = repo.existing_offsets().unwrap().last().copied().unwrap(); let last_commit: Commit = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, last_segment_offset) .unwrap() @@ -97,47 +105,29 @@ fn overwrite_reopen() { .last() .unwrap() .into(); - debug!("last commit: {last_commit:?}"); - { let mut last_segment = repo.open_segment_writer(last_segment_offset).unwrap(); let pos = last_segment.len() - last_commit.encoded_len() + 1; last_segment.modify_byte_at(pos, |_| 255); } - let mut log = open_log::<[u8; 32]>(repo.clone()); - for (i, commit) in log.commits_from(0).enumerate() { - if i < num_commits - 1 { - commit.expect("all but last commit should be good"); - } else { - let last_good_offset = txs_per_commit * (num_commits - 1); - assert!( - matches!( - commit, - Err(error::Traversal::Checksum { offset, .. }) if offset == last_good_offset as u64, - ), - "expected checksum error with offset={last_good_offset}: {commit:?}" - ); - } - } - - // Write some more data. - total_txs += fill_log_enospc(&mut log, num_commits, repeat(txs_per_commit)); - // Log should be contiguous, but missing one corrupted commit. - assert_eq!( - total_txs - txs_per_commit, - log.transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() - ); - // Check that this is true if we reopen the log. - assert_eq!( - total_txs - txs_per_commit, - open_log::<[u8; 32]>(repo) - .transactions_from(0, &payload::ArrayDecoder) - .map(Result::unwrap) - .count() + // Write a second batch, starting with the offset of the corrupt commit. + fill_log( + open_log::<[u8; TXDATA_SIZE]>(repo.clone()), + last_commit.min_tx_offset..TOTAL_TXS, ); + + let txs = commitlog::transactions_from( + repo, + DEFAULT_LOG_FORMAT_VERSION, + 0, + &payload::ArrayDecoder::, + ) + .unwrap() + .map(Result::unwrap) + .count(); + + assert_eq!(txs as u64, TOTAL_TXS); } /// Edge case surfaced in production: @@ -154,7 +144,6 @@ fn first_commit_in_last_segment_corrupt() { let repo = repo::Memory::unlimited(); let options = Options { max_segment_size: 512, - max_records_in_commit: NonZeroU16::new(1).unwrap(), ..<_>::default() }; { @@ -180,7 +169,6 @@ fn open_log(repo: ShortMem) -> commitlog::Generic { repo, Options { max_segment_size: 1024, - max_records_in_commit: NonZeroU16::new(10).unwrap(), ..Options::default() }, ) @@ -314,38 +302,3 @@ impl Repo for ShortMem { self.inner.existing_offsets() } } - -/// Like [`crate::tests::helpers::fill_log`], but expect that ENOSPC happens at -/// least once. -fn fill_log_enospc( - log: &mut commitlog::Generic, - num_commits: usize, - txs_per_commit: impl Iterator, -) -> usize -where - T: Debug + Default + Encode, -{ - let mut seen_enospc = false; - - let mut total_txs = 0; - for (_, n) in (0..num_commits).zip(txs_per_commit) { - for _ in 0..n { - log.append(T::default()).unwrap(); - total_txs += 1; - } - let res = log.commit(); - if let Err(Some(os)) = res.as_ref().map_err(|e| e.raw_os_error()) { - if os == ENOSPC { - debug!("fill: ignoring ENOSPC"); - seen_enospc = true; - log.commit().unwrap(); - continue; - } - } - res.unwrap(); - } - - assert!(seen_enospc, "expected to see ENOSPC"); - - total_txs -} diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index 85ab653480d..49e47ed42d1 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -1,5 +1,3 @@ -use std::num::NonZeroU16; - use log::info; use spacetimedb_commitlog::tests::helpers::enable_logging; use spacetimedb_commitlog::{payload, Commitlog, Options}; @@ -18,7 +16,6 @@ fn smoke() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -27,18 +24,18 @@ fn smoke() { let n_txs = 500; let payload = gen_payload(); - for _ in 0..n_txs { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..n_txs { + clog.commit([(i, payload)]).unwrap(); } let committed_offset = clog.flush_and_sync().unwrap(); - assert_eq!(n_txs - 1, committed_offset.unwrap() as usize); + assert_eq!(n_txs - 1, committed_offset.unwrap()); assert_eq!( - n_txs, + n_txs as usize, clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).count() ); // We set max_records_in_commit to 1, so n_commits == n_txs - assert_eq!(n_txs, clog.commits().map(Result::unwrap).count()); + assert_eq!(n_txs as usize, clog.commits().map(Result::unwrap).count()); } #[test] @@ -48,7 +45,6 @@ fn resets() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 512, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -56,8 +52,8 @@ fn resets() { .unwrap(); let payload = gen_payload(); - for _ in 0..50 { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..50 { + clog.commit([(i, payload)]).unwrap(); } clog.flush_and_sync().unwrap(); @@ -88,7 +84,6 @@ fn compression() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -98,8 +93,8 @@ fn compression() { // try to generate commitlogs that will be amenable to compression - // random data doesn't compress well, so try and have there be repetition let payloads = (0..4).map(|_| gen_payload()).cycle().take(1024).collect::>(); - for payload in &payloads { - clog.append_maybe_flush(*payload).unwrap(); + for (i, payload) in payloads.iter().enumerate() { + clog.commit([(i as u64, *payload)]).unwrap(); } clog.flush_and_sync().unwrap(); diff --git a/crates/commitlog/tests/streaming/mod.rs b/crates/commitlog/tests/streaming/mod.rs index 0474ae75b30..3c2077bacaf 100644 --- a/crates/commitlog/tests/streaming/mod.rs +++ b/crates/commitlog/tests/streaming/mod.rs @@ -1,6 +1,6 @@ use std::{ io, - num::{NonZeroU16, NonZeroU64}, + num::NonZeroU64, ops::RangeBounds, path::{Path, PathBuf}, }; @@ -183,7 +183,6 @@ async fn assert_equal_dirs(src: &Path, dst: &Path) { fn default_options() -> Options { Options { max_segment_size: 8 * 1024, - max_records_in_commit: NonZeroU16::MIN, // Write an index entry for every commit. offset_index_interval_bytes: NonZeroU64::new(256).unwrap(), offset_index_require_segment_fsync: false, @@ -195,8 +194,8 @@ async fn fill_log(path: PathBuf) { spawn_blocking(move || { let clog = Commitlog::open(CommitLogDir::from_path_unchecked(path), default_options(), None).unwrap(); let payload = random_payload::gen_payload(); - for _ in 0..100 { - clog.append_maybe_flush(payload).unwrap(); + for i in 0..100 { + clog.commit([(i, payload)]).unwrap(); } clog.flush_and_sync().unwrap(); }) diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index 0ea88bdb4b9..1476b56edbf 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -8,7 +8,7 @@ use spacetimedb_commitlog::payload::{ }; use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; -use spacetimedb_durability::{DurableOffset, TxOffset}; +use spacetimedb_durability::{DurableOffset, Transaction, TxOffset}; use spacetimedb_lib::Identity; use spacetimedb_primitives::TableId; use tokio::{ @@ -199,14 +199,14 @@ impl DurabilityWorkerActor { } pub fn do_durability(durability: &Durability, reducer_context: Option, tx_data: &TxData) { - if tx_data.tx_offset().is_none() { + let Some(tx_offset) = tx_data.tx_offset() else { let name = reducer_context.as_ref().map(|rcx| &*rcx.name); debug_assert!( !tx_data.has_rows_or_connect_disconnect(name), "tx_data has no rows but has connect/disconnect: `{name:?}`" ); return; - } + }; let is_persistent_table = |table_id: &TableId| -> bool { !tx_data.is_ephemeral_table(table_id) }; @@ -252,9 +252,14 @@ impl DurabilityWorkerActor { }), }; - // TODO: Should measure queuing time + actual write // This does not block, as per trait docs. - durability.append_tx(txdata); + durability.commit( + [Transaction { + offset: tx_offset, + txdata, + }] + .into(), + ); } } @@ -292,9 +297,12 @@ mod tests { impl spacetimedb_durability::Durability for CountingDurability { type TxData = Txdata; - fn append_tx(&self, _tx: Self::TxData) { + fn commit(&self, txs: Box<[Transaction]>) { + let Some(max_offset) = txs.iter().map(|x| x.offset).max() else { + return; + }; self.appended.send_modify(|offset| { - *offset = offset.map(|x| x + 1).or(Some(0)); + offset.replace(max_offset); }); } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index b409a8121cd..60f8e6bea97 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -3,7 +3,7 @@ use crate::db::MetricsRecorderQueue; use crate::error::{DBError, RestoreSnapshotError}; use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; -use crate::util::{asyncify, spawn_rayon}; +use crate::util::asyncify; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use enum_map::EnumMap; @@ -1716,7 +1716,6 @@ pub async fn local_durability( snapshot_worker: Option<&SnapshotWorker>, ) -> Result<(LocalDurability, DiskSizeFn), DBError> { let rt = tokio::runtime::Handle::current(); - // TODO: Should this better be spawn_blocking? let on_new_segment = snapshot_worker.map(|snapshot_worker| { let snapshot_worker = snapshot_worker.clone(); Arc::new(move || { @@ -1725,17 +1724,11 @@ pub async fn local_durability( snapshot_worker.request_snapshot_ignore_closed(); }) as Arc }); - let local = spawn_rayon(move || { + let local = asyncify(move || { durability::Local::open( replica_dir.clone(), rt, - durability::local::Options { - commitlog: commitlog::Options { - max_records_in_commit: 1.try_into().unwrap(), - ..Default::default() - }, - ..Default::default() - }, + <_>::default(), // Give the durability a handle to request a new snapshot run, // which it will send down whenever we rotate commitlog segments. on_new_segment, diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 38df7cea554..5d2c494cba2 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1466,7 +1466,7 @@ mod tests { use spacetimedb_commitlog::{commitlog, repo}; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; - use spacetimedb_durability::{Durability, EmptyHistory, TxOffset}; + use spacetimedb_durability::{Durability, EmptyHistory, Transaction, TxOffset}; use spacetimedb_execution::dml::MutDatastore; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; @@ -1551,13 +1551,10 @@ mod tests { impl Durability for ManualDurability { type TxData = Txdata; - fn append_tx(&self, tx: Self::TxData) { + fn commit(&self, txs: Box<[Transaction]>) { let mut commitlog = self.commitlog.write().unwrap(); - if let Err(tx) = commitlog.append(tx) { - commitlog.commit().expect("error flushing commitlog"); - commitlog.append(tx).expect("should be able to append after flush"); - } - commitlog.commit().expect("error flushing commitlog"); + commitlog.commit(txs).expect("commit failed"); + commitlog.flush().expect("error flushing commitlog"); } fn durable_tx_offset(&self) -> spacetimedb_durability::DurableOffset { diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index e3a8afa34ac..f0c6daf474f 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -1,7 +1,5 @@ use std::{ io, - num::NonZeroU16, - panic, path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, @@ -80,7 +78,7 @@ pub struct Local { /// [`PersisterTask`]. /// /// Note that this is unbounded! - queue: mpsc::UnboundedSender>, + queue: mpsc::UnboundedSender>]>>, /// How many transactions are sitting in the `queue`. /// /// This is mainly for observability purposes, and can thus be updated with @@ -132,7 +130,6 @@ impl Local { queue_depth: queue_depth.clone(), sync_interval: opts.sync_interval, - max_records_in_commit: opts.commitlog.max_records_in_commit, lock, } @@ -191,7 +188,6 @@ struct Actor { queue_depth: Arc, sync_interval: Duration, - max_records_in_commit: NonZeroU16, #[allow(unused)] lock: Lock, @@ -201,7 +197,7 @@ impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( self, - mut txdata_rx: mpsc::UnboundedReceiver>, + mut commits_rx: mpsc::UnboundedReceiver>]>>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -224,7 +220,7 @@ impl Actor { biased; Some(reply) = shutdown_rx.recv() => { - txdata_rx.close(); + commits_rx.close(); let _ = reply.send(self.lock.notified()); }, @@ -235,21 +231,16 @@ impl Actor { } }, - data = txdata_rx.recv() => { - let Some(txdata) = data else { + commit = commits_rx.recv() => { + let Some(commit) = commit else { break; }; self.queue_depth.fetch_sub(1, Relaxed); - // If we are writing one commit per tx, trying to buffer is - // fairly pointless. Immediately flush instead. - // - // Otherwise, try `Commitlog::append` as a fast-path - // that doesn't require `spawn_blocking`. - if self.max_records_in_commit.get() == 1 { - self.flush_append(txdata, true).await; - } else if let Err(retry) = self.clog.append(txdata) { - self.flush_append(retry, false).await - } + let clog = self.clog.clone(); + spawn_blocking(move || clog.commit(commit)) + .await + .expect("commitlog write panicked") + .expect("commitlog write failed"); }, } } @@ -261,30 +252,6 @@ impl Actor { info!("exiting durability actor"); } - #[instrument(skip_all)] - async fn flush_append(&self, txdata: Txdata, flush_after: bool) { - let clog = self.clog.clone(); - let span = Span::current(); - spawn_blocking(move || { - let _span = span.enter(); - let mut retry = Some(txdata); - while let Some(txdata) = retry.take() { - if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) { - flush_error("append-maybe-flush", &source); - retry = Some(txdata); - } - } - - if flush_after { - clog.flush() - .map(drop) - .unwrap_or_else(|e| flush_error("flush-after", &e)); - } - }) - .await - .expect("commitlog append blocking task panicked") - } - #[instrument(skip_all)] async fn flush_and_sync(&self) -> io::Result> { // Skip if nothing changed. @@ -302,7 +269,7 @@ impl Actor { }) .await .expect("commitlog flush-and-sync blocking task panicked") - .inspect_err(|e| flush_error("flush-and-sync", e)) + .inspect_err(|e| warn!("error flushing commitlog: {e:#}")) .inspect(|maybe_offset| { if let Some(new_offset) = maybe_offset { trace!("synced to offset {new_offset}"); @@ -342,25 +309,11 @@ impl Drop for Lock { } } -/// Handle an error flushing the commitlog. -/// -/// Panics if the error indicates that the log may be permanently unwritable. -#[inline] -#[track_caller] -fn flush_error(task: &str, e: &io::Error) { - warn!("error flushing commitlog ({task}): {e:?}"); - if matches!(e.kind(), io::ErrorKind::AlreadyExists | io::ErrorKind::StorageFull) { - panic!("{e}"); - } -} - impl Durability for Local { type TxData = Txdata; - fn append_tx(&self, tx: Self::TxData) { - if self.queue.send(tx).is_err() { - panic!("durability actor crashed"); - } + fn commit(&self, txs: Box<[Transaction]>) { + self.queue.send(txs).expect("durability actor crashed"); self.queue_depth.fetch_add(1, Relaxed); } diff --git a/crates/durability/src/imp/mod.rs b/crates/durability/src/imp/mod.rs index 6f83106d0b3..562b9b56eab 100644 --- a/crates/durability/src/imp/mod.rs +++ b/crates/durability/src/imp/mod.rs @@ -15,7 +15,7 @@ mod testing { use futures::FutureExt as _; use tokio::sync::watch; - use crate::{Close, Durability, DurableOffset, TxOffset}; + use crate::{Close, Durability, DurableOffset, Transaction, TxOffset}; /// A [`Durability`] impl that sends all transactions into the void. /// @@ -41,7 +41,7 @@ mod testing { impl Durability for NoDurability { type TxData = T; - fn append_tx(&self, _: Self::TxData) { + fn commit(&self, _: Box<[Transaction]>) { if self.closed.load(Ordering::Relaxed) { panic!("`close` was called on this `NoDurability` instance"); } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 2038d90eeb9..d4ca740e36a 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -95,28 +95,32 @@ pub type Close = BoxFuture<'static, Option>; /// /// NOTE: This is a preliminary definition, still under consideration. /// -/// A durability implementation accepts a payload representing a single database -/// transaction via [`Durability::append_tx`] in a non-blocking fashion. The -/// payload _should_ become durable eventually. [`TxOffset`]s reported by -/// [`Durability::durable_tx_offset`] shall be considered durable to the -/// extent the implementation can guarantee. +/// A durability implementation accepts one or more [Transaction]s to be made +/// durable via [Durability::commit] in a non-blocking fashion. +/// +/// A batch of transactions is eventually made durable atomically. +/// Note that this means that a torn write can render the whole batch +/// inaccessible, so small batches are usually preferable. +/// +/// Once a transaction becomes durable, the [DurableOffset] is updated. +/// What durable means depends on the implementation, informally it can be +/// thought of as "written to disk". pub trait Durability: Send + Sync { /// The payload representing a single transaction. type TxData; - /// Submit the transaction payload to be made durable. + /// Submit a batch of [Transaction]s to be made durable. /// /// This method must never block, and accept new transactions even if they /// cannot be made durable immediately. /// - /// A permanent failure of the durable storage may be signalled by panicking. - fn append_tx(&self, tx: Self::TxData); + /// Errors may be signalled by panicking. + // + // TODO(perf): Can we avoid allocating a new `Box<[_]>` for every commit, + // or at least reuse boxes? + fn commit(&self, txs: Box<[Transaction]>); - /// The [`TxOffset`] considered durable. - /// - /// A `None` return value indicates that the durable offset is not known, - /// either because nothing has been persisted yet, or because the status - /// cannot be retrieved. + /// Obtain a handle to the [DurableOffset]. fn durable_tx_offset(&self) -> DurableOffset; /// Asynchronously request the durability to shut down, without dropping it. diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index 5dc67dd13d3..4a484c42823 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -98,8 +98,8 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { // Mark initial segment as seen. new_segment_rx.borrow_and_update(); // Write past available space. - for _ in 0..256 { - durability.append_tx(txdata.clone()); + for offset in 0..256 { + durability.commit([(offset, txdata.clone()).into()].into()); } // Ensure new segment is created. new_segment_rx.changed().await?; @@ -107,7 +107,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { sleep(Duration::from_millis(5)).await; // Durability actor should have crashed, so this should panic. info!("trying append on crashed durability"); - durability.append_tx(txdata.clone()); + durability.commit([(256, txdata.clone()).into()].into()); } Ok(()) @@ -168,7 +168,6 @@ async fn local_durability( spacetimedb_durability::local::Options { commitlog: spacetimedb_commitlog::Options { max_segment_size, - max_records_in_commit: 1.try_into().unwrap(), preallocate_segments: true, ..<_>::default() },