Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 70 additions & 7 deletions crates/common/src/spine/tcache/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ pub(super) struct Buckets {
bucket_mask: u64,
// max difference between head and tail, before 'forced' eviction
lag_threshold: u64,
// Out-of-order acquire lookback: the tail never advances within this
// many seqs of the newest acquire's bucket, so late acquires up to
// this far behind still land at or above the tail. 10% of capacity,
// rounded up to a bucket.
guard: u64,
}

impl Buckets {
Expand All @@ -277,24 +282,35 @@ impl Buckets {
bucket_shift: bucket_size.trailing_zeros() as u64,
bucket_mask: !(bucket_size - 1),
lag_threshold: lag_threshold(cache_capacity as u32),
guard: (cache_capacity / 10).next_multiple_of(bucket_size).max(bucket_size),
}
}

fn acquire(&mut self, seq: u64) {
// Out-of-order acquire beyond the guard window: the producer may
// already be reclaiming this slot, and bucket_index aliases behind
// the tail onto in-window buckets — counting it would corrupt a
// live bucket (the matching release is dropped below tail). Skip;
// the read surfaces as StaleSeq at buffer() time.
if self.tail_seq != u64::MAX && seq < self.tail_seq {
tracing::warn!(seq, tail = self.tail_seq, head = self.head_seq, "acquire below tail");
return;
}

let bucket_idx = self.bucket_index(seq);
self.buckets[bucket_idx] += 1;

self.head_seq = self.head_seq.max(seq);

if self.tail_seq == u64::MAX {
self.tail_seq = self.bucket_start_seq(seq);
self.tail_seq = self.bucket_start_seq(seq).saturating_sub(self.guard);
}

// rollup tail for completed buckets - maintain at least 1 bucket between
// head and tail to avoid any boundary cases where we might roll the tail before
// an entry is acquired.
// Rollup tail for completed buckets — keep `guard` seqs of slack
// below the newest acquire's bucket so bounded out-of-order
// acquires never land behind the tail.
let head_bucket_seq = self.bucket_start_seq(seq);
while head_bucket_seq > (self.tail_seq + self.bucket_size) {
while head_bucket_seq > (self.tail_seq + self.guard) {
let tail_bucket = self.bucket_index(self.tail_seq);
if self.head_seq - self.tail_seq > self.lag_threshold {
tracing::warn!(
Expand Down Expand Up @@ -356,11 +372,58 @@ mod tests {
fn buckets_acquire_initialises_tail_to_bucket_boundary() {
let mut b = Buckets::new(64, 1024);
b.acquire(100);
// 100 lives in bucket 1 (64..128). Tail is bucket-aligned to 64.
assert_eq!(b.tail_seq, 64);
// 100 lives in bucket 1 (64..128); tail starts `guard` (10% of
// 1024 → 128) below its bucket start, saturating at 0.
assert_eq!(b.tail_seq, 0);
assert_eq!(b.head_seq, 100);
}

/// A late acquire within the guard window (10% of capacity behind the
/// newest acquire) must land at or above the tail and be tracked.
#[test]
fn buckets_out_of_order_acquire_within_guard() {
// guard = (1024/10).next_multiple_of(64) = 128.
let mut b = Buckets::new(64, 1024);
b.acquire(0);
b.release(0, "");
b.acquire(300);
// Tail rolled over the released bucket but held 128 back from
// bucket_start(300) = 256.
assert_eq!(b.tail_seq, 128);
// Late low acquire inside the window: tracked, not dropped.
b.acquire(200);
assert!(b.tail_seq <= 200);
b.release(200, "");
b.release(300, "");
}

/// An acquire below the tail (out-of-order beyond the guard window)
/// must not be counted: bucket_index aliases behind-tail seqs onto
/// live buckets and the matching release is dropped, so counting
/// would permanently stall the tail on a phantom holder.
#[test]
fn buckets_acquire_below_tail_dropped_without_corruption() {
let mut b = Buckets::new(64, 1024); // guard 128, lag threshold 921
b.acquire(0);
b.release(0, "");
b.acquire(1000); // forces tail well past bucket 0 (tail = 832)
let tail = b.tail_seq;
assert!(tail >= 128);

// Below-tail acquire: dropped (warn), tail untouched.
b.acquire(1);
assert_eq!(b.tail_seq, tail);
// Its release is the existing below-tail no-op.
b.release(1, "");

// seq 1 aliases onto the same ring bucket as seqs 1024..1088; had
// the acquire been counted, the tail would stall there forever
// (head - tail stays under the lag threshold, so no force-evict).
b.release(1000, "");
b.acquire(1600);
assert!(b.tail_seq > 1024, "tail stalled at {} on a phantom holder", b.tail_seq);
}

#[test]
fn buckets_release_advances_tail_when_head_is_ahead() {
let mut b = Buckets::new(64, 1024);
Expand Down
Loading