From 895a7c736f11d414941a1ab5b0d78b9d9e3acb71 Mon Sep 17 00:00:00 2001 From: Louis Ponet Date: Tue, 31 Mar 2026 04:50:05 +0100 Subject: [PATCH] fix(tcp): auto-grow epoll event buffer, user_timeout on inbound, configurable nodelay, EOF logging Four fixes/improvements for TcpConnector: 1. Auto-grow epoll event buffer to prevent connection starvation. 2. Apply TCP_USER_TIMEOUT to accepted inbound connections (was only set on outbound). 3. Make TCP_NODELAY configurable via with_nodelay(bool). Default remains true (Nagle disabled). 4. Add debug logging to silent EOF paths in read_frame. Previously, Ok(0) during header/payload reads returned Disconnected with no log, making it impossible to distinguish peer-closed from I/O errors. Now logs peer address and read progress. --- crates/flux-network/src/tcp/connector.rs | 65 +++++++++++++++++++----- crates/flux-network/src/tcp/stream.rs | 19 ++++++- 2 files changed, 69 insertions(+), 15 deletions(-) diff --git a/crates/flux-network/src/tcp/connector.rs b/crates/flux-network/src/tcp/connector.rs index ea0f4a7..2544ef5 100644 --- a/crates/flux-network/src/tcp/connector.rs +++ b/crates/flux-network/src/tcp/connector.rs @@ -4,7 +4,7 @@ use flux::spine::{SpineProducerWithDCache, SpineProducers}; use flux_timing::{Duration, Instant, Nanos, Repeater}; use flux_utils::{DCachePtr, safe_panic}; use mio::{Events, Interest, Poll, Token, event::Event, net::TcpListener}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use crate::tcp::{ ConnState, TcpStream, TcpTelemetry, @@ -61,6 +61,7 @@ struct ConnectionManager { telemetry: TcpTelemetry, socket_buf_size: Option, user_timeout_ms: u32, + nodelay: bool, dcache: Option, /// When set, connections whose send backlog exceeds `max` messages for /// longer than `timeout` are disconnected (outbound scheduled for @@ -82,6 +83,7 @@ impl Default for ConnectionManager { telemetry: TcpTelemetry::Disabled, socket_buf_size: None, user_timeout_ms: DEFAULT_TCP_USER_TIMEOUT_MS, 
+ nodelay: true, dcache: None, max_backlog: None, to_be_reconnected: Vec::with_capacity(10), @@ -305,7 +307,7 @@ impl ConnectionManager { return None; }; new_stream - .set_nodelay(true) + .set_nodelay(self.nodelay) .inspect_err(|e| { error!("couldn't setup nodelay for tcp stream for {addr}: {e}"); }) @@ -385,6 +387,7 @@ impl ConnectionManager { if let Some(size) = self.socket_buf_size { set_socket_buf_size(&stream, size); } + set_user_timeout(&stream, self.user_timeout_ms); let token = Token(self.next_token); if let Err(e) = self.poll.registry().register(&mut stream, token, Interest::READABLE) @@ -393,7 +396,7 @@ impl ConnectionManager { let _ = stream.shutdown(std::net::Shutdown::Both); continue; }; - if let Err(e) = stream.set_nodelay(true) { + if let Err(e) = stream.set_nodelay(self.nodelay) { error!("couldn't set nodelay on stream to {addr}: {e}"); continue; } @@ -470,6 +473,7 @@ impl ConnectionManager { if let Some(size) = self.socket_buf_size { set_socket_buf_size(&stream, size); } + set_user_timeout(&stream, self.user_timeout_ms); let token = Token(self.next_token); if let Err(e) = self.poll.registry().register(&mut stream, token, Interest::READABLE) @@ -478,7 +482,7 @@ impl ConnectionManager { let _ = stream.shutdown(std::net::Shutdown::Both); continue; }; - if let Err(e) = stream.set_nodelay(true) { + if let Err(e) = stream.set_nodelay(self.nodelay) { error!("couldn't set nodelay on stream to {addr}: {e}"); continue; } @@ -549,10 +553,11 @@ impl ConnectionManager { pub struct TcpConnector { events: Events, conn_mgr: ConnectionManager, + event_capacity: usize, } impl Default for TcpConnector { fn default() -> Self { - Self { events: Events::with_capacity(128), conn_mgr: ConnectionManager::default() } + Self { events: Events::with_capacity(128), conn_mgr: ConnectionManager::default(), event_capacity: 128 } } } impl TcpConnector { @@ -598,13 +603,22 @@ impl TcpConnector { self } - /// Overrides the TCP_USER_TIMEOUT socket option applied to - /// outbound 
connections.
+    /// Overrides the TCP_USER_TIMEOUT socket option applied to all
+    /// connections (outbound, reconnected, and accepted inbound).
     pub fn with_user_timeout(mut self, timeout_ms: u32) -> Self {
         self.conn_mgr.user_timeout_ms = timeout_ms;
         self
     }
 
+    /// Sets TCP_NODELAY on all connections. Default: `true` (Nagle disabled).
+    ///
+    /// Set to `false` to allow Nagle's algorithm to coalesce small writes,
+    /// which can improve throughput at the cost of latency.
+    pub fn with_nodelay(mut self, nodelay: bool) -> Self {
+        self.conn_mgr.nodelay = nodelay;
+        self
+    }
+
     /// Sets the maximum send backlog (in framed messages) and how long it must
     /// stay exceeded before a connection is automatically disconnected.
     ///
@@ -619,6 +633,29 @@ impl TcpConnector {
         self
     }
 
+    /// Sets the initial epoll event buffer capacity (clamped to at least 1).
+    ///
+    /// The buffer auto-grows (doubles) whenever a `poll()` fills the current
+    /// capacity, so this only affects the starting size. Default: 128.
+    pub fn with_event_capacity(mut self, capacity: usize) -> Self {
+        self.event_capacity = capacity.max(1);
+        self.events = Events::with_capacity(self.event_capacity);
+        self
+    }
+
+    /// If the last `poll()` filled the buffer (`n_events >= event_capacity`),
+    /// double it to avoid starving connections that didn't fit.
+    fn maybe_grow_events(&mut self, n_events: usize) {
+        // mio returns at most `capacity` events, so a full buffer likely means
+        // more fds were ready. Saturating doubling can never wrap to zero.
+        if n_events >= self.event_capacity {
+            let new_cap = self.event_capacity.saturating_mul(2);
+            info!(old = self.event_capacity, new = new_cap, "epoll event buffer full, growing");
+            self.event_capacity = new_cap;
+            self.events = Events::with_capacity(new_cap);
+        }
+    }
+
     /// Polls sockets once (non-blocking) and dispatches events via
     /// [`PollEvent`].
/// @@ -640,14 +677,15 @@ impl TcpConnector { safe_panic!("got error polling {e}"); return false; } - let mut o = false; + let mut n_events = 0usize; for e in self.events.iter() { - o = true; + n_events += 1; self.conn_mgr.handle_event(e, &mut handler); } + self.maybe_grow_events(n_events); self.conn_mgr.flush_backlogs(); - o + n_events > 0 } /// Like [`poll_with`] but for dcache-backed streams. The handler receives @@ -671,13 +709,14 @@ impl TcpConnector { safe_panic!("got error polling {e}"); return false; } - let mut o = false; + let mut n_events = 0usize; for e in self.events.iter() { - o = true; + n_events += 1; self.conn_mgr.handle_event_produce(e, produce, &mut on_msg); } + self.maybe_grow_events(n_events); self.conn_mgr.flush_backlogs(); - o + n_events > 0 } /// Writes immediately or enqueues bytes for later sending. diff --git a/crates/flux-network/src/tcp/stream.rs b/crates/flux-network/src/tcp/stream.rs index ef20f08..3da0a00 100644 --- a/crates/flux-network/src/tcp/stream.rs +++ b/crates/flux-network/src/tcp/stream.rs @@ -430,7 +430,14 @@ impl TcpStream { RxState::ReadingHeader { mut buf, mut have } => { while have < FRAME_HEADER_SIZE { match self.stream.read(&mut buf[have..]) { - Ok(0) => return ReadOutcome::Disconnected, + Ok(0) => { + debug!( + peer = %self.peer_addr, + have, + "tcp: connection closed by peer (reading header)", + ); + return ReadOutcome::Disconnected; + } Ok(n) => { have += n; @@ -511,7 +518,15 @@ impl TcpStream { self.stream.read(&mut buf[offset..msg_len]) }; match result { - Ok(0) => return ReadOutcome::Disconnected, + Ok(0) => { + debug!( + peer = %self.peer_addr, + msg_len, + offset, + "tcp: connection closed by peer (reading payload)", + ); + return ReadOutcome::Disconnected; + } Ok(n) => { offset += n; if offset == msg_len {