Skip to content
270 changes: 153 additions & 117 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,81 +107,32 @@ impl<R: Repo, T> Generic<R, T> {

/// Update the current epoch.
///
/// Calls [`Self::commit`] to flush all data of the previous epoch, and
/// returns the result.
///
/// Does nothing if the given `epoch` is equal to the current epoch.
///
/// # Errors
///
/// If `epoch` is smaller than the current epoch, an error of kind
/// [`io::ErrorKind::InvalidInput`] is returned.
///
/// Also see [`Self::commit`].
pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
use std::cmp::Ordering::*;

match epoch.cmp(&self.head.epoch()) {
Less => Err(io::Error::new(
pub fn set_epoch(&mut self, epoch: u64) -> io::Result<()> {
if epoch < self.head.epoch() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"new epoch is smaller than current epoch",
)),
Equal => Ok(None),
Greater => {
let res = self.commit()?;
self.head.set_epoch(epoch);
Ok(res)
}
));
}
}

/// Write the currently buffered data to storage and rotate segments as
/// necessary.
///
/// Note that this does not imply that the data is durable, in particular
/// when a filesystem storage backend is used. Call [`Self::sync`] to flush
/// any OS buffers to stable storage.
///
/// # Errors
///
/// If an error occurs writing the data, the current [`Commit`] buffer is
/// retained, but a new segment is created. Retrying in case of an `Err`
/// return value thus will write the current data to that new segment.
///
/// If this fails, however, the next attempt to create a new segment will
/// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind
/// this means that something is seriously wrong underlying storage, and the
/// caller should stop writing to the log.
pub fn commit(&mut self) -> io::Result<Option<Committed>> {
self.panicked = true;
let writer = &mut self.head;
let sz = writer.commit.encoded_len();
// If the segment is empty, but the commit exceeds the max size,
// we got a huge commit which needs to be written even if that
// results in a huge segment.
let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
let writer = if should_rotate {
self.sync();
self.start_new_segment()?
} else {
writer
};

let ret = writer.commit().or_else(|e| {
warn!("Commit failed: {e}");
// Nb.: Don't risk a panic by calling `self.sync()`.
// We already gave up on the last commit, and will retry it next time.
self.start_new_segment()?;
Err(e)
});
self.panicked = false;
ret
self.head.set_epoch(epoch);
Ok(())
}

/// Force the currently active segment to be flushed to storage.
///
/// Using a filesystem backend, this means to call `fsync(2)`.
///
/// **Note** that this does not flush the buffered data from calls to
/// [Self::commit], it only instructs the underlying storage to flush its
/// buffers. Call [Self::flush] prior to this method to ensure data from
/// all previous [Self::commit] calls is flushed to the underlying storage.
///
/// # Panics
///
/// As an `fsync` failure leaves a file in a more of less undefined state,
Expand All @@ -195,6 +146,22 @@ impl<R: Repo, T> Generic<R, T> {
self.panicked = false;
}

/// Flush the buffered data from previous calls to [Self::commit] to the
/// underlying storage.
///
/// Call [Self::sync] to instruct the underlying storage to flush its
/// buffers as well.
pub fn flush(&mut self) -> io::Result<()> {
self.head.flush()
}

/// Calls [Self::flush] and then [Self::sync].
fn flush_and_sync(&mut self) -> io::Result<()> {
self.flush()?;
self.sync();
Ok(())
}

/// The last transaction offset written to disk, or `None` if nothing has
/// been written yet.
///
Expand Down Expand Up @@ -303,8 +270,63 @@ impl<R: Repo, T> Generic<R, T> {
}

impl<R: Repo, T: Encode> Generic<R, T> {
pub fn append(&mut self, record: T) -> Result<(), T> {
self.head.append(record)
/// Write `transactions` to the log.
///
/// This will store all `transactions` as a single [Commit]
/// (note that `transactions` must not yield more than [u16::MAX] elements).
///
/// Data is buffered by the underlying segment [Writer].
/// Call [Self::flush] to force flushing to the underlying storage.
///
/// If, after writing the transactions, the writer's total written bytes
/// exceed [Options::max_segment_size], the current segment is flushed,
/// `fsync`ed and closed, and a new segment is created.
///
/// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed],
/// which contains the offset range and checksum of the commit.
///
/// Note that supplying empty `transactions` may cause the current segment
/// to be rotated.
///
/// # Errors
///
/// An `Err` value is returned in the following cases:
///
/// - if the transaction sequence is invalid, e.g. because the transaction
/// offsets are not contiguous.
///
/// In this case, **none** of the `transactions` will be written.
///
/// - if creating the new segment fails due to an I/O error.
///
/// # Panics
///
/// The method panics if:
///
/// - `transactions` exceeds [u16::MAX] elements
///
/// - [Self::flush] or writing to the underlying [Writer] fails
///
/// This is likely caused by some storage issue. As we cannot tell with
/// certainty how much data (if any) has been written, the internal state
/// becomes invalid and thus a panic is raised.
///
/// - [Self::sync] panics (called when rotating segments)
pub fn commit<U: Into<Transaction<T>>>(
&mut self,
transactions: impl IntoIterator<Item = U>,
) -> io::Result<Option<Committed>> {
self.panicked = true;
let writer = &mut self.head;
let committed = writer.commit(transactions)?;
if writer.len() >= self.opts.max_segment_size {
self.flush().expect("failed to flush segment");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seemed a bit surprising to me at first -- but the BufWriter has no way of knowing how many bytes did make it. So if flush fails, the buffer is basically garbage.

self.sync();
self.start_new_segment()?;
}
self.panicked = false;

Ok(committed)
}

pub fn transactions_from<'a, D>(
Expand Down Expand Up @@ -348,8 +370,8 @@ impl<R: Repo, T: Encode> Generic<R, T> {
impl<R: Repo, T> Drop for Generic<R, T> {
fn drop(&mut self) {
if !self.panicked {
if let Err(e) = self.head.commit() {
warn!("failed to commit on drop: {e}");
if let Err(e) = self.flush_and_sync() {
warn!("failed to flush on drop: {e:#}");
}
}
}
Expand Down Expand Up @@ -920,7 +942,7 @@ fn range_is_empty(range: &impl RangeBounds<u64>) -> bool {

#[cfg(test)]
mod tests {
use std::{cell::Cell, iter::repeat, num::NonZeroU16};
use std::{cell::Cell, iter::repeat};

use pretty_assertions::assert_matches;

Expand All @@ -933,30 +955,31 @@ mod tests {
#[test]
fn rotate_segments_simple() {
let mut log = mem_log::<[u8; 32]>(128);
for _ in 0..3 {
log.append([0; 32]).unwrap();
log.commit().unwrap();
for i in 0..4 {
log.commit([(i, [0; 32])]).unwrap();
}
log.flush_and_sync().unwrap();

let offsets = log.repo.existing_offsets().unwrap();
assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
assert_eq!(offsets[offsets.len() - 1], 2);
// TODO: We overshoot the max segment size.
assert_eq!(&offsets, &[0, 3]);
}

#[test]
fn huge_commit() {
let mut log = mem_log::<[u8; 32]>(32);

log.append([0; 32]).unwrap();
log.append([1; 32]).unwrap();
log.commit().unwrap();
assert!(log.head.len() > log.opts.max_segment_size);
log.commit([(0, [0; 32]), (1, [1; 32])]).unwrap();
log.flush_and_sync().unwrap();
// First segment got rotated out.
assert_eq!(&log.tail, &[0]);

log.append([2; 32]).unwrap();
log.commit().unwrap();
log.commit([(2, [2; 32])]).unwrap();
log.flush_and_sync().unwrap();

assert_eq!(&log.tail, &[0]);
assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
// Second segment got rotated out and segment 3 is created.
assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2, 3]);
}

#[test]
Expand Down Expand Up @@ -1052,14 +1075,31 @@ mod tests {
fn traverse_commits_ignores_duplicates() {
let mut log = mem_log::<[u8; 32]>(1024);

log.append([42; 32]).unwrap();
let commit1 = log.head.commit.clone();
log.commit().unwrap();
log.head.commit = commit1.clone();
log.commit().unwrap();
log.append([43; 32]).unwrap();
let commit2 = log.head.commit.clone();
log.commit().unwrap();
let tx1 = [42u8; 32];
let tx2 = [43u8; 32];

log.commit([(0, tx1)]).unwrap();
let commit1 = Commit {
min_tx_offset: 0,
n: 1,
records: tx1.to_vec(),
..log.head.commit.clone()
};

// Reset the commit offset, so we can write the same commit twice.
log.head.commit.min_tx_offset = 0;
log.commit([(0, tx1)]).unwrap();

// Write another one.
log.commit([(1, tx2)]).unwrap();
let commit2 = Commit {
min_tx_offset: 1,
n: 1,
records: tx2.to_vec(),
..log.head.commit.clone()
};

log.flush_and_sync().unwrap();

assert_eq!(
[commit1, commit2].as_slice(),
Expand All @@ -1074,15 +1114,14 @@ mod tests {
fn traverse_commits_errors_when_forked() {
let mut log = mem_log::<[u8; 32]>(1024);

log.append([42; 32]).unwrap();
log.commit().unwrap();
log.head.commit = Commit {
min_tx_offset: 0,
n: 1,
records: [43; 32].to_vec(),
epoch: 0,
};
log.commit().unwrap();
log.commit([(0, [42; 32])]).unwrap();
// Reset the commit offset,
// and write a different commit at the same offset.
// This is considered a fork.
log.head.commit.min_tx_offset = 0;
log.commit([(0, [43; 32])]).unwrap();

log.flush_and_sync().unwrap();

let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
assert!(
Expand All @@ -1095,11 +1134,11 @@ mod tests {
fn traverse_commits_errors_when_offset_not_contiguous() {
let mut log = mem_log::<[u8; 32]>(1024);

log.append([42; 32]).unwrap();
log.commit().unwrap();
log.commit([(0, [42; 32])]).unwrap();
log.head.commit.min_tx_offset = 18;
log.append([42; 32]).unwrap();
log.commit().unwrap();
log.commit([(18, [42; 32])]).unwrap();

log.flush_and_sync().unwrap();

let res = log.commits_from(0).collect::<Result<Vec<_>, _>>();
assert!(
Expand All @@ -1111,7 +1150,7 @@ mod tests {
prev_error: None
})
),
"expected fork error: {res:?}"
"expected out-of-order error: {res:?}"
)
}

Expand Down Expand Up @@ -1221,7 +1260,7 @@ mod tests {
#[test]
fn reopen() {
let mut log = mem_log::<[u8; 32]>(1024);
let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle());
let total_txs = fill_log(&mut log, 100, (1..=10).cycle());
assert_eq!(
total_txs,
log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count()
Expand All @@ -1231,12 +1270,11 @@ mod tests {
log.repo.clone(),
Options {
max_segment_size: 1024,
max_records_in_commit: NonZeroU16::new(10).unwrap(),
..Options::default()
},
)
.unwrap();
total_txs += fill_log(&mut log, 100, (1..=10).cycle());
let total_txs = fill_log(&mut log, 100, (1..=10).cycle());

assert_eq!(
total_txs,
Expand All @@ -1245,24 +1283,22 @@ mod tests {
}

#[test]
fn set_same_epoch_does_nothing() {
fn set_new_epoch() {
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap();
assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap();
assert_eq!(committed, None);
}

#[test]
fn set_new_epoch_commits() {
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap();
assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
log.append(<_>::default()).unwrap();
let committed = log
.set_epoch(42)
.unwrap()
.expect("should have committed the pending transaction");
log.commit([(0, [12; 32])]).unwrap();
log.set_epoch(42).unwrap();
assert_eq!(log.epoch(), 42);
assert_eq!(committed.tx_range.start, 0);
log.commit([(1, [13; 32])]).unwrap();

log.flush_and_sync().unwrap();

let epochs = log
.commits_from(0)
.map(Result::unwrap)
.map(|commit| commit.epoch)
.collect::<Vec<_>>();
assert_eq!(&[Commit::DEFAULT_EPOCH, 42], epochs.as_slice());
}

#[test]
Expand Down
Loading
Loading