From a1f7d4d2e2ede6d865221835008c2257be38c500 Mon Sep 17 00:00:00 2001 From: vladimir-ea Date: Fri, 12 Jun 2026 17:53:08 +0100 Subject: [PATCH] ra consumer may acquire+ release out of order --- crates/common/src/spine/tcache/consumer.rs | 77 ++++++++++++++++++++-- 1 file changed, 70 insertions(+), 7 deletions(-) diff --git a/crates/common/src/spine/tcache/consumer.rs b/crates/common/src/spine/tcache/consumer.rs index bcc483f..caf10f7 100644 --- a/crates/common/src/spine/tcache/consumer.rs +++ b/crates/common/src/spine/tcache/consumer.rs @@ -260,6 +260,11 @@ pub(super) struct Buckets { bucket_mask: u64, // max difference between head and tail, before 'forced' eviction lag_threshold: u64, + // Out-of-order acquire lookback: the tail never advances within this + // many seqs of the newest acquire's bucket, so late acquires up to + // this far behind still land at or above the tail. 10% of capacity, + // rounded up to a bucket. + guard: u64, } impl Buckets { @@ -277,24 +282,35 @@ impl Buckets { bucket_shift: bucket_size.trailing_zeros() as u64, bucket_mask: !(bucket_size - 1), lag_threshold: lag_threshold(cache_capacity as u32), + guard: (cache_capacity / 10).next_multiple_of(bucket_size).max(bucket_size), } } fn acquire(&mut self, seq: u64) { + // Out-of-order acquire beyond the guard window: the producer may + // already be reclaiming this slot, and bucket_index aliases behind + // the tail onto in-window buckets — counting it would corrupt a + // live bucket (the matching release is dropped below tail). Skip; + // the read surfaces as StaleSeq at buffer() time. + if self.tail_seq != u64::MAX && seq < self.tail_seq { + tracing::warn!(seq, tail = self.tail_seq, head = self.head_seq, "acquire below tail"); + return; + } + let bucket_idx = self.bucket_index(seq); self.buckets[bucket_idx] += 1; self.head_seq = self.head_seq.max(seq); if self.tail_seq == u64::MAX { - self.tail_seq = self.bucket_start_seq(seq); + self.tail_seq = self.bucket_start_seq(seq).saturating_sub(self.guard); } - // rollup tail for completed buckets - maintain at least 1 bucket between - // head and tail to avoid any boundary cases where we might roll the tail before - // an entry is acquired. + // Rollup tail for completed buckets — keep `guard` seqs of slack + // below the newest acquire's bucket so bounded out-of-order + // acquires never land behind the tail. let head_bucket_seq = self.bucket_start_seq(seq); - while head_bucket_seq > (self.tail_seq + self.bucket_size) { + while head_bucket_seq > (self.tail_seq + self.guard) { let tail_bucket = self.bucket_index(self.tail_seq); if self.head_seq - self.tail_seq > self.lag_threshold { tracing::warn!( @@ -356,11 +372,58 @@ mod tests { fn buckets_acquire_initialises_tail_to_bucket_boundary() { let mut b = Buckets::new(64, 1024); b.acquire(100); - // 100 lives in bucket 1 (64..128). Tail is bucket-aligned to 64. - assert_eq!(b.tail_seq, 64); + // 100 lives in bucket 1 (64..128); tail starts `guard` (10% of + // 1024 → 128) below its bucket start, saturating at 0. + assert_eq!(b.tail_seq, 0); assert_eq!(b.head_seq, 100); } + /// A late acquire within the guard window (10% of capacity behind the + /// newest acquire) must land at or above the tail and be tracked. + #[test] + fn buckets_out_of_order_acquire_within_guard() { + // guard = (1024/10).next_multiple_of(64) = 128. + let mut b = Buckets::new(64, 1024); + b.acquire(0); + b.release(0, ""); + b.acquire(300); + // Tail rolled over the released bucket but held 128 back from + // bucket_start(300) = 256. + assert_eq!(b.tail_seq, 128); + // Late low acquire inside the window: tracked, not dropped. + b.acquire(200); + assert!(b.tail_seq <= 200); + b.release(200, ""); + b.release(300, ""); + } + + /// An acquire below the tail (out-of-order beyond the guard window) + /// must not be counted: bucket_index aliases behind-tail seqs onto + /// live buckets and the matching release is dropped, so counting + /// would permanently stall the tail on a phantom holder. + #[test] + fn buckets_acquire_below_tail_dropped_without_corruption() { + let mut b = Buckets::new(64, 1024); // guard 128, lag threshold 921 + b.acquire(0); + b.release(0, ""); + b.acquire(1000); // forces tail well past bucket 0 (tail = 832) + let tail = b.tail_seq; + assert!(tail >= 128); + + // Below-tail acquire: dropped (warn), tail untouched. + b.acquire(1); + assert_eq!(b.tail_seq, tail); + // Its release is the existing below-tail no-op. + b.release(1, ""); + + // seq 1 aliases onto the same ring bucket as seqs 1024..1088; had + // the acquire been counted, the tail would stall there forever + // (head - tail stays under the lag threshold, so no force-evict). + b.release(1000, ""); + b.acquire(1600); + assert!(b.tail_seq > 1024, "tail stalled at {} on a phantom holder", b.tail_seq); + } + #[test] fn buckets_release_advances_tail_when_head_is_ahead() { let mut b = Buckets::new(64, 1024);