Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions crates/flux-communication/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,26 +169,22 @@ impl QueueHeader {
return &raw const self.group_cursors[i].cursor;
}
}

// 2. Empty slot
for i in 0..MAX_GROUPS {
// 2. Empty slot
if self.group_labels[i] == ArrayStr::<GROUP_LABEL_LEN>::new() {
self.group_labels[i] = key;
self.group_cursors[i].cursor.store(0, Ordering::Relaxed);
self.release_group_lock();
return &raw const self.group_cursors[i].cursor;
}
}

// 3. Slot owned by a dead process — reclaim it
for i in 0..MAX_GROUPS {
if let Some(pid) = pid_from_label(self.group_labels[i].as_str()) {
if !is_pid_alive(pid) {
self.group_labels[i] = key;
self.group_cursors[i].cursor.store(0, Ordering::Relaxed);
self.release_group_lock();
return &raw const self.group_cursors[i].cursor;
}
// 3. Slot owned by a dead process — reclaim it
if let Some(pid) = pid_from_label(self.group_labels[i].as_str()) &&
!is_pid_alive(pid)
{
self.group_labels[i] = key;
self.group_cursors[i].cursor.store(0, Ordering::Relaxed);
self.release_group_lock();
return &raw const self.group_cursors[i].cursor;
}
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
}
Comment thread
louisponet marked this conversation as resolved.

Expand All @@ -214,14 +210,37 @@ impl QueueHeader {
return out;
}

if !label.is_empty() {
if !label.is_empty() &&
pid_from_label(self.group_labels[i].as_str()).is_none_or(is_pid_alive)
{
Comment thread
louisponet marked this conversation as resolved.
let label = label.as_str();
let cursor = self.group_cursors[i].cursor.load(Ordering::Relaxed);
out.push((label, cursor));
}
}
out
}

pub fn max_writable_msgs_without_speeding_past(&self) -> usize {
let mut min_cursor = self.count.load(Ordering::Relaxed);
for i in 0..MAX_GROUPS {
let label = &self.group_labels[i];

// Guard: if the stored length exceeds the fixed-size buffer the
// memory is uninitialised / from an incompatible header layout.
if label.len() > GROUP_LABEL_LEN {
return self.len();
}

if !label.is_empty() &&
pid_from_label(self.group_labels[i].as_str()).is_none_or(is_pid_alive)
{
min_cursor = min_cursor
.min(self.group_cursors[i].cursor.load(Ordering::Relaxed).saturating_sub(1));
Comment thread
louisponet marked this conversation as resolved.
}
}
self.len().saturating_sub(self.count.load(Ordering::Relaxed).saturating_sub(min_cursor))
}
}

pub(crate) fn shmem_map_create_or_open(flink_path: &Path, size: usize) -> (*mut u8, bool, usize) {
Expand Down Expand Up @@ -508,6 +527,9 @@ impl<T: Copy> InnerQueue<T> {
unsafe { (&*v).validate(len) }?;
Ok(v)
}
pub fn max_writable_msgs_without_speeding_past(&self) -> usize {
self.header.max_writable_msgs_without_speeding_past()
}
}

#[allow(clippy::non_send_fields_in_send_ty)]
Expand Down Expand Up @@ -596,6 +618,10 @@ impl<T: Copy> Queue<T> {
let head = self.count();
unsafe { (*cursor).fetch_max(head, Ordering::Relaxed) };
}

pub fn active_groups(&self) -> Vec<(&str, usize)> {
self.header.active_groups()
}
}

unsafe impl<T> Send for Queue<T> {}
Expand Down Expand Up @@ -644,6 +670,9 @@ impl<T: Copy> Producer<T> {
self.queue.produce(msg)
}
}
pub fn max_writable_msgs_without_speeding_past(&self) -> usize {
self.queue.max_writable_msgs_without_speeding_past()
}

#[inline]
pub fn produce_without_first(&self, msg: &T) -> usize {
Expand Down
52 changes: 41 additions & 11 deletions crates/flux-communication/src/queue/tests_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,45 @@ fn dead_pid_slot_reclaimed() {
let cursor = header.find_or_insert_group(dead_pid_label);
unsafe { &*cursor }.store(42, Ordering::Relaxed);

assert_eq!(header.active_groups().len(), 1);

// Requesting a new group should reclaim the dead-PID slot once all empty
// slots are exhausted. But because the first 255 slots are empty, the
// new label will simply take one of those. To actually test reclamation,
// fill ALL slots first with dead-PID labels, then insert a new key.
// Instead, verify that the dead-PID slot is still there (not reclaimed
// unnecessarily) and that a new group takes an empty slot first.
let new_cursor = header.find_or_insert_group("app[1].new.broadcast");
assert_ne!(cursor, new_cursor);
assert_eq!(header.active_groups().len(), 2);
assert!(header.active_groups().is_empty());

let new_cursor = header.find_or_insert_group("app.new.broadcast");
assert_eq!(cursor, new_cursor);

let groups = header.active_groups();
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].0, "app.new.broadcast");
}

#[test]
fn max_writable_msgs_matches_safe_writes_before_overwrite() {
for typ in [QueueType::SPMC, QueueType::MPMC] {
let q = Queue::new(8, typ);
let mut p = Producer::from(q);
let mut c = ConsumerBare::new_broadcast_test(q);
let mut m = 0;

for i in 0..5 {
p.produce(&i);
}
for i in 0..2 {
c.try_consume(&mut m).unwrap();
assert_eq!(m, i);
}

let writable = p.max_writable_msgs_without_speeding_past();
assert_eq!(writable, 5);

for i in 5..5 + writable {
p.produce(&i);
}
assert_eq!(p.max_writable_msgs_without_speeding_past(), 0);

for i in 2..10 {
c.try_consume(&mut m).unwrap();
assert_eq!(m, i);
}

assert!(matches!(c.try_consume(&mut m), Err(ReadError::Empty)));
}
}
44 changes: 1 addition & 43 deletions crates/flux-network/src/tcp/connector.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::net::SocketAddr;

use flux::spine::{SpineProducerWithDCache, SpineProducers};
use flux_timing::{Duration, Instant, Nanos, Repeater};
use flux_timing::{Duration, Nanos, Repeater};
use flux_utils::{DCachePtr, safe_panic};
use mio::{Events, Interest, Poll, Token, event::Event, net::TcpListener};
use tracing::{debug, error, warn};
Expand Down Expand Up @@ -213,46 +213,6 @@ impl ConnectionManager {
}
}

fn flush_backlogs(&mut self) {
let now = Instant::now();
let mut i = self.conns.len();
while i != 0 {
i -= 1;
let (token, ref mut variant) = self.conns[i];
let stream = match variant {
ConnectionVariant::Outbound(s) | ConnectionVariant::Inbound(s) => s,
ConnectionVariant::Listener(_) => continue,
};
if stream.has_backlog() {
if stream.drain_backlog(self.poll.registry()) == ConnState::Disconnected {
self.disconnect_at_index_pending(i);
continue;
}
if let Some((max, timeout)) = self.max_backlog {
let len = stream.backlog_len();
if len > max {
// Start or continue the exceeded-since timer.
let since = *stream.backlog_exceeded_since.get_or_insert(now);
let elapsed = now.saturating_sub(since);
if elapsed >= timeout {
warn!(
?token,
backlog = len,
max,
?elapsed,
"backlog exceeded limit for too long, disconnecting"
);
self.disconnect_at_index_pending(i);
}
} else {
// Back below threshold — reset the timer.
stream.backlog_exceeded_since = None;
}
}
}
}
}

fn connect(&mut self, addr: SocketAddr) -> Option<Token> {
let o = Token(self.next_token);
if let Some(stream) = self.try_connect(o, addr) {
Expand Down Expand Up @@ -713,7 +673,6 @@ impl TcpConnector {
o = true;
self.conn_mgr.handle_event(e, &mut handler);
}
self.conn_mgr.flush_backlogs();
o |= self.conn_mgr.drain_pending_disconnects(&mut handler);
o
}
Expand Down Expand Up @@ -747,7 +706,6 @@ impl TcpConnector {
o = true;
self.conn_mgr.handle_event_produce(e, produce, &mut on_msg);
}
self.conn_mgr.flush_backlogs();
o |= self.conn_mgr.drain_pending_disconnects(&mut |event| {
let _ = on_msg(event);
});
Expand Down
11 changes: 0 additions & 11 deletions crates/flux-network/src/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,6 @@ impl TcpStream {
}
}

#[inline]
pub(crate) fn has_backlog(&self) -> bool {
!self.send_backlog.is_empty()
}

/// Number of framed messages waiting in the send backlog.
#[inline]
pub(crate) fn backlog_len(&self) -> usize {
self.send_backlog.len()
}

/// Flush queued data until kernel blocks, queue empty or we've written the
/// max bytes per iter.
/// returns connstate and whether it should be deregistered from writable
Expand Down
Loading