diff --git a/crates/flux-network/src/tcp/connector.rs b/crates/flux-network/src/tcp/connector.rs index 9a8ab10..a47f6a9 100644 --- a/crates/flux-network/src/tcp/connector.rs +++ b/crates/flux-network/src/tcp/connector.rs @@ -8,7 +8,10 @@ use tracing::{debug, error, warn}; use crate::tcp::{ ConnState, TcpStream, TcpTelemetry, - stream::{DEFAULT_TCP_USER_TIMEOUT_MS, set_socket_buf_size, set_user_timeout}, + stream::{ + DEFAULT_TCP_USER_TIMEOUT_MS, FRAME_HEADER_SIZE, set_socket_buf_size, set_user_timeout, + write_frame_header, + }, }; #[derive(Clone, Copy, Debug)] @@ -74,6 +77,12 @@ struct ConnectionManager { // Outbound connections that completed during maybe_reconnect, drained in poll_with. reconnected_to: Vec, next_token: usize, + + /// Scratch buffers for [`SendBehavior::Broadcast`]: the frame is serialised + /// once per broadcast and the identical bytes are written to every + /// connection. + bcast_header: [u8; FRAME_HEADER_SIZE], + bcast_payload: Vec, } impl Default for ConnectionManager { fn default() -> Self { @@ -91,6 +100,8 @@ impl Default for ConnectionManager { reconnected_to: Vec::with_capacity(10), poll: Poll::new().expect("couldn't set up a poll for tcp connector"), next_token: 0, + bcast_header: [0; FRAME_HEADER_SIZE], + bcast_payload: Vec::with_capacity(TcpStream::SEND_BUF_SIZE), } } } @@ -133,14 +144,27 @@ impl ConnectionManager { where F: Fn(&mut Vec), { + // Serialise the frame ONCE for the whole fan-out, then hand the + // identical bytes to every connection. A single send-ts is shared + // across all connections for the broadcast. + self.bcast_payload.clear(); + serialise(&mut self.bcast_payload); + if self.bcast_payload.is_empty() { + return; + } + write_frame_header(&mut self.bcast_header, self.bcast_payload.len(), Nanos::now()); + let mut i = self.conns.len(); while i != 0 { i -= 1; match &mut self.conns[i].1 { ConnectionVariant::Outbound(tcp_connection) | ConnectionVariant::Inbound(tcp_connection) => { - if tcp_connection.write_or_enqueue_with(self.poll.registry(), serialise) == - ConnState::Disconnected + if tcp_connection.write_or_enqueue_shared( + self.poll.registry(), + &self.bcast_header, + &self.bcast_payload, + ) == ConnState::Disconnected { self.disconnect_at_index(i); } diff --git a/crates/flux-network/src/tcp/stream.rs b/crates/flux-network/src/tcp/stream.rs index 7749320..48a57c9 100644 --- a/crates/flux-network/src/tcp/stream.rs +++ b/crates/flux-network/src/tcp/stream.rs @@ -54,10 +54,35 @@ const LEN_HEADER_SIZE: usize = core::mem::size_of::(); /// Nanos timestamp when the sender finished serialising and handed bytes to /// kernel or enqueued in backlog. const TS_HEADER_SIZE: usize = core::mem::size_of::(); -const FRAME_HEADER_SIZE: usize = LEN_HEADER_SIZE + TS_HEADER_SIZE; +pub(crate) const FRAME_HEADER_SIZE: usize = LEN_HEADER_SIZE + TS_HEADER_SIZE; // TODO: might need to tweak these const RX_BUF_SIZE: usize = 32 * 1024; +/// Write the `[len][ts]` frame header for a `payload_len`-byte payload. +/// +/// Shared by the per-stream serialise path and the broadcast path +#[inline] +pub(crate) fn write_frame_header( + header: &mut [u8; FRAME_HEADER_SIZE], + payload_len: usize, + ts: Nanos, +) { + header[..LEN_HEADER_SIZE].copy_from_slice(&(payload_len as u32).to_le_bytes()); + header[LEN_HEADER_SIZE..FRAME_HEADER_SIZE].copy_from_slice(&ts.0.to_le_bytes()); +} + +/// Allocate a contiguous `header + payload` frame for the send backlog. +/// +/// Only hit when a socket blocks; the happy path writes `header` and `payload` +/// as separate `IoSlice`s and never concatenates. +#[inline] +fn build_frame_vec(header: &[u8; FRAME_HEADER_SIZE], payload: &[u8]) -> Vec { + let mut v = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len()); + v.extend_from_slice(header); + v.extend_from_slice(payload); + v +} + /// Response type for all external calls. /// /// `Alive` means the connection is still usable. @@ -353,6 +378,69 @@ impl TcpStream { } } + /// Like [`write_or_enqueue_with`] but writes a frame that was already + /// serialised by the caller (`header` + `payload`), instead of running a + /// per-stream serialise closure. + /// + /// Used by the broadcast path: the frame is built once for the whole + /// fan-out and the identical bytes are handed to every connection. The + /// happy path writes `header` and `payload` as two `IoSlice`s with no + /// per-connection copy; only a blocked socket pays for a backlog copy. + #[inline] + pub(crate) fn write_or_enqueue_shared( + &mut self, + registry: &Registry, + header: &[u8; FRAME_HEADER_SIZE], + payload: &[u8], + ) -> ConnState { + if !self.send_backlog.is_empty() { + if self.drain_backlog(registry) == ConnState::Disconnected { + return ConnState::Disconnected; + } + if !self.send_backlog.is_empty() { + let data = self.alloc_shared_vec(header, payload); + return self.enqueue_back(registry, data); + } + // backlog drained, fall through to direct write + } + + match self.stream.write_vectored(&[IoSlice::new(header.as_slice()), IoSlice::new(payload)]) + { + Ok(0) => { + warn!("tcp: stream failed to write, disconnecting"); + ConnState::Disconnected + } + Ok(n) if n == payload.len() + FRAME_HEADER_SIZE => ConnState::Alive, + Ok(n) => { + let data = self.alloc_shared_vec(header, payload); + self.send_cursor = n; + self.enqueue_back(registry, data) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + let data = self.alloc_shared_vec(header, payload); + self.enqueue_back(registry, data) + } + Err(err) => { + warn!(?err, "tcp: stream write fail"); + ConnState::Disconnected + } + } + } + + /// Backlog allocation for the shared-frame path. Mirrors [`alloc_vec`] but + /// sources the frame from borrowed slices rather than `self.send_buf`. + #[inline] + fn alloc_shared_vec(&mut self, header: &[u8; FRAME_HEADER_SIZE], payload: &[u8]) -> Vec { + if self.timers.is_some() { + let t0 = Nanos::now(); + let v = build_frame_vec(header, payload); + self.timers.as_mut().unwrap().alloc.emit_latency_from_nanos(t0, Nanos::now()); + v + } else { + build_frame_vec(header, payload) + } + } + /// Allocate `send_buf[start..end]` to vec. Times the alloc if telemetry /// enabled. #[inline] @@ -597,11 +685,7 @@ impl TcpStream { { self.send_buf.clear(); serialise(&mut self.send_buf); - // write frame header - self.header_buf[..LEN_HEADER_SIZE] - .copy_from_slice(&(self.send_buf.len() as u32).to_le_bytes()); - self.header_buf[LEN_HEADER_SIZE..FRAME_HEADER_SIZE] - .copy_from_slice(&Nanos::now().0.to_le_bytes()); + write_frame_header(&mut self.header_buf, self.send_buf.len(), Nanos::now()); } pub fn close(&mut self, registry: &Registry) {