Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions crates/flux-network/src/tcp/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use tracing::{debug, error, warn};

use crate::tcp::{
ConnState, TcpStream, TcpTelemetry,
stream::{DEFAULT_TCP_USER_TIMEOUT_MS, set_socket_buf_size, set_user_timeout},
stream::{
DEFAULT_TCP_USER_TIMEOUT_MS, FRAME_HEADER_SIZE, set_socket_buf_size, set_user_timeout,
write_frame_header,
},
};

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -74,6 +77,12 @@ struct ConnectionManager {
// Outbound connections that completed during maybe_reconnect, drained in poll_with.
reconnected_to: Vec<Token>,
next_token: usize,

/// Scratch buffers for [`SendBehavior::Broadcast`]: the frame is serialised
/// once per broadcast and the identical bytes are written to every
/// connection.
bcast_header: [u8; FRAME_HEADER_SIZE],
bcast_payload: Vec<u8>,
}
impl Default for ConnectionManager {
fn default() -> Self {
Expand All @@ -91,6 +100,8 @@ impl Default for ConnectionManager {
reconnected_to: Vec::with_capacity(10),
poll: Poll::new().expect("couldn't set up a poll for tcp connector"),
next_token: 0,
bcast_header: [0; FRAME_HEADER_SIZE],
bcast_payload: Vec::with_capacity(TcpStream::SEND_BUF_SIZE),
}
}
}
Expand Down Expand Up @@ -133,14 +144,27 @@ impl ConnectionManager {
where
F: Fn(&mut Vec<u8>),
{
// Serialise the frame ONCE for the whole fan-out, then hand the
// identical bytes to every connection. A single send-ts is shared
// across all connections for the broadcast.
self.bcast_payload.clear();
serialise(&mut self.bcast_payload);
if self.bcast_payload.is_empty() {
return;
}
write_frame_header(&mut self.bcast_header, self.bcast_payload.len(), Nanos::now());

let mut i = self.conns.len();
while i != 0 {
i -= 1;
match &mut self.conns[i].1 {
ConnectionVariant::Outbound(tcp_connection) |
ConnectionVariant::Inbound(tcp_connection) => {
if tcp_connection.write_or_enqueue_with(self.poll.registry(), serialise) ==
ConnState::Disconnected
if tcp_connection.write_or_enqueue_shared(
self.poll.registry(),
&self.bcast_header,
&self.bcast_payload,
) == ConnState::Disconnected
{
self.disconnect_at_index(i);
}
Expand Down
96 changes: 90 additions & 6 deletions crates/flux-network/src/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,35 @@ const LEN_HEADER_SIZE: usize = core::mem::size_of::<u32>();
/// Nanos timestamp when the sender finished serialising and handed bytes to
/// kernel or enqueued in backlog.
const TS_HEADER_SIZE: usize = core::mem::size_of::<Nanos>();
const FRAME_HEADER_SIZE: usize = LEN_HEADER_SIZE + TS_HEADER_SIZE;
pub(crate) const FRAME_HEADER_SIZE: usize = LEN_HEADER_SIZE + TS_HEADER_SIZE;
// TODO: might need to tweak these
const RX_BUF_SIZE: usize = 32 * 1024;

/// Write the `[len][ts]` frame header for a `payload_len`-byte payload.
///
/// Shared by the per-stream serialise path and the broadcast path
#[inline]
pub(crate) fn write_frame_header(
header: &mut [u8; FRAME_HEADER_SIZE],
payload_len: usize,
ts: Nanos,
) {
header[..LEN_HEADER_SIZE].copy_from_slice(&(payload_len as u32).to_le_bytes());
header[LEN_HEADER_SIZE..FRAME_HEADER_SIZE].copy_from_slice(&ts.0.to_le_bytes());
}

/// Allocate a contiguous `header + payload` frame for the send backlog.
///
/// Only hit when a socket blocks; the happy path writes `header` and `payload`
/// as separate `IoSlice`s and never concatenates.
#[inline]
fn build_frame_vec(header: &[u8; FRAME_HEADER_SIZE], payload: &[u8]) -> Vec<u8> {
let mut v = Vec::with_capacity(FRAME_HEADER_SIZE + payload.len());
v.extend_from_slice(header);
v.extend_from_slice(payload);
v
}

/// Response type for all external calls.
///
/// `Alive` means the connection is still usable.
Expand Down Expand Up @@ -353,6 +378,69 @@ impl TcpStream {
}
}

/// Like [`write_or_enqueue_with`] but writes a frame that was already
/// serialised by the caller (`header` + `payload`), instead of running a
/// per-stream serialise closure.
///
/// Used by the broadcast path: the frame is built once for the whole
/// fan-out and the identical bytes are handed to every connection. The
/// happy path writes `header` and `payload` as two `IoSlice`s with no
/// per-connection copy; only a blocked socket pays for a backlog copy.
#[inline]
pub(crate) fn write_or_enqueue_shared(
&mut self,
registry: &Registry,
header: &[u8; FRAME_HEADER_SIZE],
payload: &[u8],
) -> ConnState {
if !self.send_backlog.is_empty() {
if self.drain_backlog(registry) == ConnState::Disconnected {
return ConnState::Disconnected;
}
if !self.send_backlog.is_empty() {
let data = self.alloc_shared_vec(header, payload);
return self.enqueue_back(registry, data);
}
// backlog drained, fall through to direct write
}

match self.stream.write_vectored(&[IoSlice::new(header.as_slice()), IoSlice::new(payload)])
{
Ok(0) => {
warn!("tcp: stream failed to write, disconnecting");
ConnState::Disconnected
}
Ok(n) if n == payload.len() + FRAME_HEADER_SIZE => ConnState::Alive,
Ok(n) => {
let data = self.alloc_shared_vec(header, payload);
self.send_cursor = n;
self.enqueue_back(registry, data)
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
let data = self.alloc_shared_vec(header, payload);
self.enqueue_back(registry, data)
}
Err(err) => {
warn!(?err, "tcp: stream write fail");
ConnState::Disconnected
}
}
}

/// Backlog allocation for the shared-frame path. Mirrors [`alloc_vec`] but
/// sources the frame from borrowed slices rather than `self.send_buf`.
#[inline]
fn alloc_shared_vec(&mut self, header: &[u8; FRAME_HEADER_SIZE], payload: &[u8]) -> Vec<u8> {
if self.timers.is_some() {
let t0 = Nanos::now();
let v = build_frame_vec(header, payload);
self.timers.as_mut().unwrap().alloc.emit_latency_from_nanos(t0, Nanos::now());
v
} else {
build_frame_vec(header, payload)
}
}

/// Allocate `send_buf[start..end]` to vec. Times the alloc if telemetry
/// enabled.
#[inline]
Expand Down Expand Up @@ -597,11 +685,7 @@ impl TcpStream {
{
self.send_buf.clear();
serialise(&mut self.send_buf);
// write frame header
self.header_buf[..LEN_HEADER_SIZE]
.copy_from_slice(&(self.send_buf.len() as u32).to_le_bytes());
self.header_buf[LEN_HEADER_SIZE..FRAME_HEADER_SIZE]
.copy_from_slice(&Nanos::now().0.to_le_bytes());
write_frame_header(&mut self.header_buf, self.send_buf.len(), Nanos::now());
}

pub fn close(&mut self, registry: &Registry) {
Expand Down
Loading