From 6856143cec50a5e8138f6e90d5be533c6ebb93d2 Mon Sep 17 00:00:00 2001 From: eltitanb Date: Mon, 1 Jun 2026 16:18:07 +0100 Subject: [PATCH 1/2] broadcast --- crates/flux-network/src/tcp/connector.rs | 62 ++++++++++----- crates/flux-network/src/tcp/stream.rs | 96 ++++++++++++++++++++++-- 2 files changed, 133 insertions(+), 25 deletions(-) diff --git a/crates/flux-network/src/tcp/connector.rs b/crates/flux-network/src/tcp/connector.rs index 9a8ab10..0ba8992 100644 --- a/crates/flux-network/src/tcp/connector.rs +++ b/crates/flux-network/src/tcp/connector.rs @@ -8,7 +8,10 @@ use tracing::{debug, error, warn}; use crate::tcp::{ ConnState, TcpStream, TcpTelemetry, - stream::{DEFAULT_TCP_USER_TIMEOUT_MS, set_socket_buf_size, set_user_timeout}, + stream::{ + DEFAULT_TCP_USER_TIMEOUT_MS, FRAME_HEADER_SIZE, set_socket_buf_size, set_user_timeout, + write_frame_header, + }, }; #[derive(Clone, Copy, Debug)] @@ -74,6 +77,12 @@ struct ConnectionManager { // Outbound connections that completed during maybe_reconnect, drained in poll_with. reconnected_to: Vec, next_token: usize, + + /// Scratch buffers for [`SendBehavior::Broadcast`]: the frame is serialised + /// once per broadcast and the identical bytes are written to every + /// connection. + bcast_header: [u8; FRAME_HEADER_SIZE], + bcast_payload: Vec, } impl Default for ConnectionManager { fn default() -> Self { @@ -91,6 +100,8 @@ impl Default for ConnectionManager { reconnected_to: Vec::with_capacity(10), poll: Poll::new().expect("couldn't set up a poll for tcp connector"), next_token: 0, + bcast_header: [0; FRAME_HEADER_SIZE], + bcast_payload: Vec::with_capacity(TcpStream::SEND_BUF_SIZE), } } } @@ -133,14 +144,27 @@ impl ConnectionManager { where F: Fn(&mut Vec), { + // Serialise the frame ONCE for the whole fan-out, then hand the + // identical bytes to every connection. A single send-ts is shared + // across all connections for the broadcast. + self.bcast_payload.clear(); + serialise(&mut self.bcast_payload); + if self.bcast_payload.is_empty() { + return; + } + write_frame_header(&mut self.bcast_header, self.bcast_payload.len(), Nanos::now()); + let mut i = self.conns.len(); while i != 0 { i -= 1; match &mut self.conns[i].1 { - ConnectionVariant::Outbound(tcp_connection) | - ConnectionVariant::Inbound(tcp_connection) => { - if tcp_connection.write_or_enqueue_with(self.poll.registry(), serialise) == - ConnState::Disconnected + ConnectionVariant::Outbound(tcp_connection) + | ConnectionVariant::Inbound(tcp_connection) => { + if tcp_connection.write_or_enqueue_shared( + self.poll.registry(), + &self.bcast_header, + &self.bcast_payload, + ) == ConnState::Disconnected { self.disconnect_at_index(i); } @@ -160,10 +184,10 @@ impl ConnectionManager { SendBehavior::Single(token) => { if let Some(i) = self.conns.iter().position(|(t, _)| *t == token) { match &mut self.conns[i].1 { - ConnectionVariant::Outbound(tcp_connection) | - ConnectionVariant::Inbound(tcp_connection) => { - if tcp_connection.write_or_enqueue_with(self.poll.registry(), serialise) == - ConnState::Disconnected + ConnectionVariant::Outbound(tcp_connection) + | ConnectionVariant::Inbound(tcp_connection) => { + if tcp_connection.write_or_enqueue_with(self.poll.registry(), serialise) + == ConnState::Disconnected { tracing::warn!("issue when writing to {token:?} disconnecting"); self.disconnect_at_index(i); @@ -230,8 +254,8 @@ impl ConnectionManager { self.telemetry, self.dcache.is_some(), ); - if let Some(msg) = &self.on_connect_msg && - tcp_stream.write_or_enqueue_with(self.poll.registry(), |buf: &mut Vec| { + if let Some(msg) = &self.on_connect_msg + && tcp_stream.write_or_enqueue_with(self.poll.registry(), |buf: &mut Vec| { buf.extend_from_slice(msg); }) == ConnState::Disconnected { @@ -368,8 +392,8 @@ impl ConnectionManager { loop { match &mut self.conns[stream_id].1 { - ConnectionVariant::Outbound(tcp_connection) | - ConnectionVariant::Inbound(tcp_connection) => { + ConnectionVariant::Outbound(tcp_connection) + | ConnectionVariant::Inbound(tcp_connection) => { if tcp_connection.poll_with( self.poll.registry(), e, @@ -412,8 +436,8 @@ impl ConnectionManager { self.dcache.is_some(), ); - if let Some(msg) = &self.on_connect_msg && - conn.write_or_enqueue_with( + if let Some(msg) = &self.on_connect_msg + && conn.write_or_enqueue_with( self.poll.registry(), |buf: &mut Vec| { buf.extend_from_slice(msg); @@ -452,8 +476,8 @@ impl ConnectionManager { loop { match &mut self.conns[stream_id].1 { - ConnectionVariant::Outbound(tcp_connection) | - ConnectionVariant::Inbound(tcp_connection) => { + ConnectionVariant::Outbound(tcp_connection) + | ConnectionVariant::Inbound(tcp_connection) => { let dcache = self.dcache.as_deref().expect("dcache required for poll_with_produce"); if tcp_connection.poll_with_produce( @@ -498,8 +522,8 @@ impl ConnectionManager { self.telemetry, self.dcache.is_some(), ); - if let Some(msg) = &self.on_connect_msg && - conn.write_or_enqueue_with( + if let Some(msg) = &self.on_connect_msg + && conn.write_or_enqueue_with( self.poll.registry(), |buf: &mut Vec| { buf.extend_from_slice(msg); diff --git a/crates/flux-network/src/tcp/stream.rs b/crates/flux-network/src/tcp/stream.rs index 7749320..48a57c9 100644 --- a/crates/flux-network/src/tcp/stream.rs +++ b/crates/flux-network/src/tcp/stream.rs @@ -54,10 +54,35 @@ const LEN_HEADER_SIZE: usize = core::mem::size_of::(); /// Nanos timestamp when the sender finished serialising and handed bytes to /// kernel or enqueued in backlog. const TS_HEADER_SIZE: usize = core::mem::size_of::(); -const FRAME_HEADER_SIZE: usize = LEN_HEADER_SIZE + TS_HEADER_SIZE; +pub(crate) const FRAME_HEADER_SIZE: usize = LEN_HEADER_SIZE + TS_HEADER_SIZE; // TODO: might need to tweak these const RX_BUF_SIZE: usize = 32 * 1024; +/// Write the `[len][ts]` frame header for a `payload_len`-byte payload. +/// +/// Shared by the per-stream serialise path and the broadcast path +#[inline] +pub(crate) fn write_frame_header( + header: &mut [u8; FRAME_HEADER_SIZE], + payload_len: usize, + ts: Nanos, +) { + header[..LEN_HEADER_SIZE].copy_from_slice(&(payload_len as u32).to_le_bytes()); + header[LEN_HEADER_SIZE..FRAME_HEADER_SIZE].copy_from_slice(&ts.0.to_le_bytes()); +} + +/// Allocate a contiguous `header + payload` frame for the send backlog. +/// +/// Only hit when a socket blocks; the happy path writes `header` and `payload` +/// as separate `IoSlice`s and never concatenates. +#[inline] +fn build_frame_vec(header: &[u8; FRAME_HEADER_SIZE], payload: &[u8]) -> Vec { + let mut v = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len()); + v.extend_from_slice(header); + v.extend_from_slice(payload); + v +} + /// Response type for all external calls. /// /// `Alive` means the connection is still usable. @@ -353,6 +378,69 @@ impl TcpStream { } } + /// Like [`write_or_enqueue_with`] but writes a frame that was already + /// serialised by the caller (`header` + `payload`), instead of running a + /// per-stream serialise closure. + /// + /// Used by the broadcast path: the frame is built once for the whole + /// fan-out and the identical bytes are handed to every connection. The + /// happy path writes `header` and `payload` as two `IoSlice`s with no + /// per-connection copy; only a blocked socket pays for a backlog copy. + #[inline] + pub(crate) fn write_or_enqueue_shared( + &mut self, + registry: &Registry, + header: &[u8; FRAME_HEADER_SIZE], + payload: &[u8], + ) -> ConnState { + if !self.send_backlog.is_empty() { + if self.drain_backlog(registry) == ConnState::Disconnected { + return ConnState::Disconnected; + } + if !self.send_backlog.is_empty() { + let data = self.alloc_shared_vec(header, payload); + return self.enqueue_back(registry, data); + } + // backlog drained, fall through to direct write + } + + match self.stream.write_vectored(&[IoSlice::new(header.as_slice()), IoSlice::new(payload)]) + { + Ok(0) => { + warn!("tcp: stream failed to write, disconnecting"); + ConnState::Disconnected + } + Ok(n) if n == payload.len() + FRAME_HEADER_SIZE => ConnState::Alive, + Ok(n) => { + let data = self.alloc_shared_vec(header, payload); + self.send_cursor = n; + self.enqueue_back(registry, data) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + let data = self.alloc_shared_vec(header, payload); + self.enqueue_back(registry, data) + } + Err(err) => { + warn!(?err, "tcp: stream write fail"); + ConnState::Disconnected + } + } + } + + /// Backlog allocation for the shared-frame path. Mirrors [`alloc_vec`] but + /// sources the frame from borrowed slices rather than `self.send_buf`. + #[inline] + fn alloc_shared_vec(&mut self, header: &[u8; FRAME_HEADER_SIZE], payload: &[u8]) -> Vec { + if self.timers.is_some() { + let t0 = Nanos::now(); + let v = build_frame_vec(header, payload); + self.timers.as_mut().unwrap().alloc.emit_latency_from_nanos(t0, Nanos::now()); + v + } else { + build_frame_vec(header, payload) + } + } + /// Allocate `send_buf[start..end]` to vec. Times the alloc if telemetry /// enabled. #[inline] @@ -597,11 +685,7 @@ impl TcpStream { { self.send_buf.clear(); serialise(&mut self.send_buf); - // write frame header - self.header_buf[..LEN_HEADER_SIZE] - .copy_from_slice(&(self.send_buf.len() as u32).to_le_bytes()); - self.header_buf[LEN_HEADER_SIZE..FRAME_HEADER_SIZE] - .copy_from_slice(&Nanos::now().0.to_le_bytes()); + write_frame_header(&mut self.header_buf, self.send_buf.len(), Nanos::now()); } pub fn close(&mut self, registry: &Registry) { From 2a373bbcd768f6ec7d62990c6023ba6ad360247f Mon Sep 17 00:00:00 2001 From: eltitanb Date: Mon, 1 Jun 2026 16:29:19 +0100 Subject: [PATCH 2/2] fmt --- crates/flux-network/src/tcp/connector.rs | 32 ++++++++++++------------ 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/flux-network/src/tcp/connector.rs b/crates/flux-network/src/tcp/connector.rs index 0ba8992..a47f6a9 100644 --- a/crates/flux-network/src/tcp/connector.rs +++ b/crates/flux-network/src/tcp/connector.rs @@ -158,8 +158,8 @@ impl ConnectionManager { while i != 0 { i -= 1; match &mut self.conns[i].1 { - ConnectionVariant::Outbound(tcp_connection) - | ConnectionVariant::Inbound(tcp_connection) => { + ConnectionVariant::Outbound(tcp_connection) | + ConnectionVariant::Inbound(tcp_connection) => { if tcp_connection.write_or_enqueue_shared( self.poll.registry(), &self.bcast_header, @@ -184,10 +184,10 @@ impl ConnectionManager { SendBehavior::Single(token) => { if let Some(i) = self.conns.iter().position(|(t, _)| *t == token) { match &mut self.conns[i].1 { - ConnectionVariant::Outbound(tcp_connection) - | ConnectionVariant::Inbound(tcp_connection) => { - if tcp_connection.write_or_enqueue_with(self.poll.registry(), serialise) - == ConnState::Disconnected + ConnectionVariant::Outbound(tcp_connection) | + ConnectionVariant::Inbound(tcp_connection) => { + if tcp_connection.write_or_enqueue_with(self.poll.registry(), serialise) == + ConnState::Disconnected { tracing::warn!("issue when writing to {token:?} disconnecting"); self.disconnect_at_index(i); @@ -254,8 +254,8 @@ impl ConnectionManager { self.telemetry, self.dcache.is_some(), ); - if let Some(msg) = &self.on_connect_msg - && tcp_stream.write_or_enqueue_with(self.poll.registry(), |buf: &mut Vec| { + if let Some(msg) = &self.on_connect_msg && + tcp_stream.write_or_enqueue_with(self.poll.registry(), |buf: &mut Vec| { buf.extend_from_slice(msg); }) == ConnState::Disconnected { @@ -392,8 +392,8 @@ impl ConnectionManager { loop { match &mut self.conns[stream_id].1 { - ConnectionVariant::Outbound(tcp_connection) - | ConnectionVariant::Inbound(tcp_connection) => { + ConnectionVariant::Outbound(tcp_connection) | + ConnectionVariant::Inbound(tcp_connection) => { if tcp_connection.poll_with( self.poll.registry(), e, @@ -436,8 +436,8 @@ impl ConnectionManager { self.dcache.is_some(), ); - if let Some(msg) = &self.on_connect_msg - && conn.write_or_enqueue_with( + if let Some(msg) = &self.on_connect_msg && + conn.write_or_enqueue_with( self.poll.registry(), |buf: &mut Vec| { buf.extend_from_slice(msg); @@ -476,8 +476,8 @@ impl ConnectionManager { loop { match &mut self.conns[stream_id].1 { - ConnectionVariant::Outbound(tcp_connection) - | ConnectionVariant::Inbound(tcp_connection) => { + ConnectionVariant::Outbound(tcp_connection) | + ConnectionVariant::Inbound(tcp_connection) => { let dcache = self.dcache.as_deref().expect("dcache required for poll_with_produce"); if tcp_connection.poll_with_produce( @@ -522,8 +522,8 @@ impl ConnectionManager { self.telemetry, self.dcache.is_some(), ); - if let Some(msg) = &self.on_connect_msg - && conn.write_or_enqueue_with( + if let Some(msg) = &self.on_connect_msg && + conn.write_or_enqueue_with( self.poll.registry(), |buf: &mut Vec| { buf.extend_from_slice(msg);