From 718613e09b2953ee7b917257f06b247407ccc99d Mon Sep 17 00:00:00 2001 From: Louis Ponet Date: Sun, 7 Jun 2026 09:01:09 +0100 Subject: [PATCH 1/4] better handling of cursors --- crates/flux-communication/src/queue/mod.rs | 61 ++++++++++++++++------ 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/crates/flux-communication/src/queue/mod.rs b/crates/flux-communication/src/queue/mod.rs index de0ecb0..5970327 100644 --- a/crates/flux-communication/src/queue/mod.rs +++ b/crates/flux-communication/src/queue/mod.rs @@ -162,33 +162,27 @@ impl QueueHeader { self.acquire_group_lock(); - // 1. Exact match — reuse the existing slot for i in 0..MAX_GROUPS { + // 1. Exact match — reuse the existing slot if self.group_labels[i] == key { self.release_group_lock(); return &raw const self.group_cursors[i].cursor; } - } - - // 2. Empty slot - for i in 0..MAX_GROUPS { + // 2. Empty slot if self.group_labels[i] == ArrayStr::::new() { self.group_labels[i] = key; self.group_cursors[i].cursor.store(0, Ordering::Relaxed); self.release_group_lock(); return &raw const self.group_cursors[i].cursor; } - } - - // 3. Slot owned by a dead process — reclaim it - for i in 0..MAX_GROUPS { - if let Some(pid) = pid_from_label(self.group_labels[i].as_str()) { - if !is_pid_alive(pid) { - self.group_labels[i] = key; - self.group_cursors[i].cursor.store(0, Ordering::Relaxed); - self.release_group_lock(); - return &raw const self.group_cursors[i].cursor; - } + // 3. Slot owned by a dead process — reclaim it + if let Some(pid) = pid_from_label(self.group_labels[i].as_str()) && + !is_pid_alive(pid) + { + self.group_labels[i] = key; + self.group_cursors[i].cursor.store(0, Ordering::Relaxed); + self.release_group_lock(); + return &raw const self.group_cursors[i].cursor; } } @@ -214,7 +208,9 @@ impl QueueHeader { return out; } - if !label.is_empty() { + if !label.is_empty() && + pid_from_label(self.group_labels[i].as_str()).is_none_or(is_pid_alive) + { let label = label.as_str(); let cursor = self.group_cursors[i].cursor.load(Ordering::Relaxed); out.push((label, cursor)); @@ -222,6 +218,27 @@ impl QueueHeader { } out } + + pub fn max_writable_msgs_without_speeding_past(&self) -> usize { + let mut min_cursor = self.count.load(Ordering::Relaxed); + for i in 0..MAX_GROUPS { + let label = &self.group_labels[i]; + + // Guard: if the stored length exceeds the fixed-size buffer the + // memory is uninitialised / from an incompatible header layout. + if label.len() > GROUP_LABEL_LEN { + return self.len(); + } + + if !label.is_empty() && + pid_from_label(self.group_labels[i].as_str()).is_none_or(is_pid_alive) + { + min_cursor = + min_cursor.min(self.group_cursors[i].cursor.load(Ordering::Relaxed) - 1); + } + } + self.len().saturating_sub(self.count.load(Ordering::Relaxed).saturating_sub(min_cursor)) + } } pub(crate) fn shmem_map_create_or_open(flink_path: &Path, size: usize) -> (*mut u8, bool, usize) { @@ -508,6 +525,9 @@ impl InnerQueue { unsafe { (&*v).validate(len) }?; Ok(v) } + pub fn max_writable_msgs_without_speeding_past(&self) -> usize { + self.header.max_writable_msgs_without_speeding_past() + } } #[allow(clippy::non_send_fields_in_send_ty)] @@ -596,6 +616,10 @@ impl Queue { let head = self.count(); unsafe { (*cursor).fetch_max(head, Ordering::Relaxed) }; } + + pub fn active_groups(&self) -> Vec<(&str, usize)> { + self.header.active_groups() + } } unsafe impl Send for Queue {} @@ -644,6 +668,9 @@ impl Producer { self.queue.produce(msg) } } + pub fn max_writable_msgs_without_speeding_past(&self) -> usize { + self.queue.max_writable_msgs_without_speeding_past() + } #[inline] pub fn produce_without_first(&self, msg: &T) -> usize { From 5f3cb210f8eb00c4b7898d75cadd5d88d9841b19 Mon Sep 17 00:00:00 2001 From: Louis Ponet Date: Sun, 7 Jun 2026 10:42:05 +0100 Subject: [PATCH 2/4] fix --- crates/flux-communication/src/queue/mod.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/flux-communication/src/queue/mod.rs b/crates/flux-communication/src/queue/mod.rs index 5970327..1b8eafe 100644 --- a/crates/flux-communication/src/queue/mod.rs +++ b/crates/flux-communication/src/queue/mod.rs @@ -162,12 +162,14 @@ impl QueueHeader { self.acquire_group_lock(); + // 1. Exact match — reuse the existing slot for i in 0..MAX_GROUPS { - // 1. Exact match — reuse the existing slot if self.group_labels[i] == key { self.release_group_lock(); return &raw const self.group_cursors[i].cursor; } + } + for i in 0..MAX_GROUPS { // 2. Empty slot if self.group_labels[i] == ArrayStr::::new() { self.group_labels[i] = key; @@ -233,8 +235,8 @@ impl QueueHeader { if !label.is_empty() && pid_from_label(self.group_labels[i].as_str()).is_none_or(is_pid_alive) { - min_cursor = - min_cursor.min(self.group_cursors[i].cursor.load(Ordering::Relaxed) - 1); + min_cursor = min_cursor + .min(self.group_cursors[i].cursor.load(Ordering::Relaxed).saturating_sub(1)); } } self.len().saturating_sub(self.count.load(Ordering::Relaxed).saturating_sub(min_cursor)) From 4af87abed6e72fc021c5dece6eaf8f3907624d8c Mon Sep 17 00:00:00 2001 From: Louis Ponet Date: Sun, 7 Jun 2026 11:17:17 +0100 Subject: [PATCH 3/4] test(queue): cover active groups and cursor capacity Update the dead-PID group test for active_groups filtering and the intended single-loop slot reclamation behavior. Add a regression test showing max_writable_msgs_without_speeding_past reports the exact number of writes that can occur before a broadcast consumer would be overwritten. --- .../src/queue/tests_basic.rs | 52 +++++++++++++++---- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/crates/flux-communication/src/queue/tests_basic.rs b/crates/flux-communication/src/queue/tests_basic.rs index 34fc2fd..17520be 100644 --- a/crates/flux-communication/src/queue/tests_basic.rs +++ b/crates/flux-communication/src/queue/tests_basic.rs @@ -211,15 +211,45 @@ fn dead_pid_slot_reclaimed() { let cursor = header.find_or_insert_group(dead_pid_label); unsafe { &*cursor }.store(42, Ordering::Relaxed); - assert_eq!(header.active_groups().len(), 1); - - // Requesting a new group should reclaim the dead-PID slot once all empty - // slots are exhausted. But because the first 255 slots are empty, the - // new label will simply take one of those. To actually test reclamation, - // fill ALL slots first with dead-PID labels, then insert a new key. - // Instead, verify that the dead-PID slot is still there (not reclaimed - // unnecessarily) and that a new group takes an empty slot first. - let new_cursor = header.find_or_insert_group("app[1].new.broadcast"); - assert_ne!(cursor, new_cursor); - assert_eq!(header.active_groups().len(), 2); + assert!(header.active_groups().is_empty()); + + let new_cursor = header.find_or_insert_group("app.new.broadcast"); + assert_eq!(cursor, new_cursor); + + let groups = header.active_groups(); + assert_eq!(groups.len(), 1); + assert_eq!(groups[0].0, "app.new.broadcast"); +} + +#[test] +fn max_writable_msgs_matches_safe_writes_before_overwrite() { + for typ in [QueueType::SPMC, QueueType::MPMC] { + let q = Queue::new(8, typ); + let mut p = Producer::from(q); + let mut c = ConsumerBare::new_broadcast_test(q); + let mut m = 0; + + for i in 0..5 { + p.produce(&i); + } + for i in 0..2 { + c.try_consume(&mut m).unwrap(); + assert_eq!(m, i); + } + + let writable = p.max_writable_msgs_without_speeding_past(); + assert_eq!(writable, 5); + + for i in 5..5 + writable { + p.produce(&i); + } + assert_eq!(p.max_writable_msgs_without_speeding_past(), 0); + + for i in 2..10 { + c.try_consume(&mut m).unwrap(); + assert_eq!(m, i); + } + + assert!(matches!(c.try_consume(&mut m), Err(ReadError::Empty))); + } } From f91a1f38a3f368c97c90f94b0b7fa5f6182301b4 Mon Sep 17 00:00:00 2001 From: Louis Ponet Date: Sun, 7 Jun 2026 11:43:18 +0100 Subject: [PATCH 4/4] don't drain backlogs forcibly --- crates/flux-network/src/tcp/connector.rs | 44 +----------------------- crates/flux-network/src/tcp/stream.rs | 11 ------ 2 files changed, 1 insertion(+), 54 deletions(-) diff --git a/crates/flux-network/src/tcp/connector.rs b/crates/flux-network/src/tcp/connector.rs index b2152e7..8001abb 100644 --- a/crates/flux-network/src/tcp/connector.rs +++ b/crates/flux-network/src/tcp/connector.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use flux::spine::{SpineProducerWithDCache, SpineProducers}; -use flux_timing::{Duration, Instant, Nanos, Repeater}; +use flux_timing::{Duration, Nanos, Repeater}; use flux_utils::{DCachePtr, safe_panic}; use mio::{Events, Interest, Poll, Token, event::Event, net::TcpListener}; use tracing::{debug, error, warn}; @@ -213,46 +213,6 @@ impl ConnectionManager { } } - fn flush_backlogs(&mut self) { - let now = Instant::now(); - let mut i = self.conns.len(); - while i != 0 { - i -= 1; - let (token, ref mut variant) = self.conns[i]; - let stream = match variant { - ConnectionVariant::Outbound(s) | ConnectionVariant::Inbound(s) => s, - ConnectionVariant::Listener(_) => continue, - }; - if stream.has_backlog() { - if stream.drain_backlog(self.poll.registry()) == ConnState::Disconnected { - self.disconnect_at_index_pending(i); - continue; - } - if let Some((max, timeout)) = self.max_backlog { - let len = stream.backlog_len(); - if len > max { - // Start or continue the exceeded-since timer. - let since = *stream.backlog_exceeded_since.get_or_insert(now); - let elapsed = now.saturating_sub(since); - if elapsed >= timeout { - warn!( - ?token, - backlog = len, - max, - ?elapsed, - "backlog exceeded limit for too long, disconnecting" - ); - self.disconnect_at_index_pending(i); - } - } else { - // Back below threshold — reset the timer. - stream.backlog_exceeded_since = None; - } - } - } - } - } - fn connect(&mut self, addr: SocketAddr) -> Option { let o = Token(self.next_token); if let Some(stream) = self.try_connect(o, addr) { @@ -713,7 +673,6 @@ impl TcpConnector { o = true; self.conn_mgr.handle_event(e, &mut handler); } - self.conn_mgr.flush_backlogs(); o |= self.conn_mgr.drain_pending_disconnects(&mut handler); o } @@ -747,7 +706,6 @@ impl TcpConnector { o = true; self.conn_mgr.handle_event_produce(e, produce, &mut on_msg); } - self.conn_mgr.flush_backlogs(); o |= self.conn_mgr.drain_pending_disconnects(&mut |event| { let _ = on_msg(event); }); diff --git a/crates/flux-network/src/tcp/stream.rs b/crates/flux-network/src/tcp/stream.rs index 48a57c9..d7124b7 100644 --- a/crates/flux-network/src/tcp/stream.rs +++ b/crates/flux-network/src/tcp/stream.rs @@ -464,17 +464,6 @@ impl TcpStream { } } - #[inline] - pub(crate) fn has_backlog(&self) -> bool { - !self.send_backlog.is_empty() - } - - /// Number of framed messages waiting in the send backlog. - #[inline] - pub(crate) fn backlog_len(&self) -> usize { - self.send_backlog.len() - } - /// Flush queued data until kernel blocks, queue empty or we've written the /// max bytes per iter. /// returns connstate and whether it should be deregistered from writable