From 54e489f5352bbba2ac6ee66c59d8abd74768609d Mon Sep 17 00:00:00 2001 From: Caio Gondim Date: Wed, 27 May 2026 21:31:30 -0400 Subject: [PATCH] fix: bound Connection::abandoned_paths (prune on discard) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Connection::abandoned_paths` (a `FxHashSet`) grew without bound: when a path is fully discarded, `discard_path` reclaims `paths`, `number_spaces` and `path_stats` (and the `PathDrained` timer clears `local_cid_state`), but the id was never removed from `abandoned_paths`. The set was kept only as the path-id monotonicity guard — `open_path` computed the next id from `abandoned_paths.iter().max()` to avoid reusing a discarded id — which is also what the existing `TODO(flub)` on the field anticipated ("a set together with a minimum"). Replace the lifetime-growing set with a `max_abandoned_path_id` scalar (updated where ids are inserted into the set), read that in `open_path`, and prune the set in `discard_path`. The set is now bounded by the number of concurrently abandoned-but-not-yet-drained paths, not by the lifetime abandon count, while path-id allocation stays strictly monotonic. Adds a deterministic regression test (`abandon_cycle_bounded`) that open/abandons a path each cycle while keeping one live anchor and asserts every per-path collection stays bounded, plus `connection_freed_after_peer_vanishes` showing a connection is fully reclaimed when the peer vanishes without a graceful close. Test-only `path_collection_sizes` / `PathStatsMap::len` accessors back the gauge. --- noq-proto/src/connection/mod.rs | 48 +++++++++++++++--- noq-proto/src/connection/stats.rs | 6 +++ noq-proto/src/tests/multipath.rs | 81 +++++++++++++++++++++++++++++++ noq/src/tests.rs | 56 +++++++++++++++++++++ 4 files changed, 183 insertions(+), 8 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 9f4f2cf1d..6129a86f5 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -300,14 +300,23 @@ pub struct Connection { /// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued /// since they are issued asynchronously by the endpoint driver. max_path_id_with_cids: PathId, - /// The paths already abandoned - /// - /// They may still have some state left in [`Connection::paths`] or - /// [`Connection::local_cid_state`] since some of this has to be kept around for some - /// time after a path is abandoned. - // TODO(flub): Make this a more efficient data structure. Like ranges of abandoned - // paths. Or a set together with a minimum. Or something. + /// Paths that have been abandoned but not yet fully discarded. + /// + /// An abandoned path keeps some state in [`Connection::paths`] / + /// [`Connection::local_cid_state`] until it drains, so it stays here until + /// [`Connection::discard_path`] reclaims it — at which point its id is + /// removed (it is no longer a known path). This set is therefore bounded by + /// the number of concurrently-live paths, not by the lifetime abandon count. + /// The lifetime high-water needed to keep path ids monotonic is tracked + /// separately in [`Connection::max_abandoned_path_id`]. abandoned_paths: FxHashSet, + /// Greatest [`PathId`] ever abandoned (monotonic; never decreases). + /// + /// Path ids must never be reused, so [`Connection::open_path`] allocates + /// above this. It is kept as a scalar — rather than read off + /// [`Connection::abandoned_paths`] — so the set can be pruned on discard + /// without losing the reuse guard. `None` until the first abandon. + max_abandoned_path_id: Option, /// State for n0's () nat traversal protocol. n0_nat_traversal: n0_nat_traversal::State, @@ -429,6 +438,7 @@ impl Connection { remote_max_path_id: PathId::ZERO, max_path_id_with_cids: PathId::ZERO, abandoned_paths: Default::default(), + max_abandoned_path_id: None, n0_nat_traversal: Default::default(), qlog, @@ -568,7 +578,7 @@ impl Connection { return Err(PathError::ServerSideNotAllowed); } - let max_abandoned = self.abandoned_paths.iter().max().copied(); + let max_abandoned = self.max_abandoned_path_id; let max_used = self.paths.keys().last().copied(); let path_id = max_abandoned .max(max_used) @@ -717,6 +727,7 @@ impl Connection { .push_back(EndpointEventInner::RetireResetToken(path_id)); self.abandoned_paths.insert(path_id); + self.max_abandoned_path_id = self.max_abandoned_path_id.max(Some(path_id)); for timer in timer::PathTimer::VALUES { // match for completeness @@ -806,6 +817,23 @@ impl Connection { self.paths.keys().copied().collect() } + /// Sizes of the per-path collections, for leak gauging in tests. + /// + /// Order: `paths`, `abandoned_paths`, `remote_cids`, `local_cid_state`, + /// `path_stats`, data-space `number_spaces`. Each must stay bounded under + /// path churn — any value that grows with the churn count is a leak. + #[cfg(test)] + pub(crate) fn path_collection_sizes(&self) -> [usize; 6] { + [ + self.paths.len(), + self.abandoned_paths.len(), + self.remote_cids.len(), + self.local_cid_state.len(), + self.path_stats.len(), + self.spaces[SpaceId::Data].number_spaces.len(), + ] + } + /// Gets the local [`PathStatus`] for a known [`PathId`] pub fn path_status(&self, path_id: PathId) -> Result { self.path(path_id) @@ -3391,6 +3419,10 @@ impl Connection { self.partial_stats += path_stats; self.paths.remove(&path_id); self.spaces[SpaceId::Data].number_spaces.remove(&path_id); + // The id is no longer a known path; its monotonicity is preserved by + // `max_abandoned_path_id`, so drop it from the live set to keep it + // bounded by concurrent paths rather than the lifetime abandon count. + self.abandoned_paths.remove(&path_id); self.events.push_back( PathEvent::Discarded { diff --git a/noq-proto/src/connection/stats.rs b/noq-proto/src/connection/stats.rs index dfaf02748..3dc00ec75 100644 --- a/noq-proto/src/connection/stats.rs +++ b/noq-proto/src/connection/stats.rs @@ -364,4 +364,10 @@ impl PathStatsMap { pub(super) fn discard(&mut self, path_id: &PathId) -> PathStats { self.0.remove(path_id).unwrap_or_default() } + + /// Number of paths with retained stats (leak gauge in tests). + #[cfg(test)] + pub(super) fn len(&self) -> usize { + self.0.len() + } } diff --git a/noq-proto/src/tests/multipath.rs b/noq-proto/src/tests/multipath.rs index 1f6e8d125..ed49ecaf1 100644 --- a/noq-proto/src/tests/multipath.rs +++ b/noq-proto/src/tests/multipath.rs @@ -1648,6 +1648,87 @@ fn abandon_cycle() -> TestResult { Ok(()) } +/// Leak regression: open+abandon a path many times keeping one path always +/// live (so PATH_ABANDON is always sendable and the path drains the happy way). +/// Both `self.paths` (heavy `PathData`) and `self.abandoned_paths` (path ids) +/// must stay bounded across the churn — a value that grows with the cycle count +/// is a memory leak (matches the fleet soak's churn-proportional RSS slope). +#[test] +fn abandon_cycle_bounded() -> TestResult { + let _guard = subscribe(); + + let mut cfg = TransportConfig::default(); + cfg.max_concurrent_multipath_paths(6); + cfg.initial_rtt(Duration::from_millis(10)); + + let mut pair = ConnPair::with_transport_cfg(cfg.clone(), cfg); + pair.drive(); + + const CYCLES: u16 = 40; + + // One fresh remote address per cycle so no 4-tuple is ever reused. + let routing = pair.routes.as_basic(); + let mut addrs_client = vec![routing.client_addr]; + let mut addrs_server = vec![routing.server_addr]; + for i in 1..=CYCLES { + let mut ca = routing.client_addr; + ca.set_port(ca.port() + i); + addrs_client.push(ca); + let mut sa = routing.server_addr; + sa.set_port(sa.port() + i); + addrs_server.push(sa); + } + pair.routes = + ManyToManyRouting::simple_symmetric(addrs_client.clone(), addrs_server.clone()).into(); + + // Always keep the just-opened path as the live anchor, abandon the previous. + let mut current_path = PathId::ZERO; + for cycle in 0..CYCLES { + let addr_idx = (cycle as usize) + 1; + let new_path_net = FourTuple { + local_ip: Some(addrs_client[addr_idx].ip()), + remote: addrs_server[addr_idx], + }; + + let new_path = pair.open_path(Client, new_path_net, PathStatus::Available)?; + pair.drive(); + while pair.poll(Client).is_some() {} + while pair.poll(Server).is_some() {} + + pair.close_path(Client, current_path, 0u8.into())?; + pair.drive(); + while pair.poll(Client).is_some() {} + while pair.poll(Server).is_some() {} + + current_path = new_path; + + // All per-path collections. At most ~2 paths are ever live, and a + // drained path must leave every collection; none may grow with `cycle`. + let [paths, abandoned, remote_cids, local_cids, path_stats, number_spaces] = + pair.conn(Client).path_collection_sizes(); + info!( + cycle, + paths, abandoned, remote_cids, local_cids, path_stats, number_spaces, + "post-cycle path collection sizes" + ); + for (name, len) in [ + ("paths", paths), + ("abandoned_paths", abandoned), + ("remote_cids", remote_cids), + ("local_cid_state", local_cids), + ("path_stats", path_stats), + ("number_spaces", number_spaces), + ] { + assert!( + len <= 6, + "cycle {cycle}: {name} grew to {len} — unbounded under path churn (leak)" + ); + } + } + + Ok(()) +} + /// NAT traversal round revalidates an existing path via new PATH_CHALLENGE. #[test] fn nat_traversal_revalidates_existing_path() -> TestResult { diff --git a/noq/src/tests.rs b/noq/src/tests.rs index 74c760c3d..a9d1cc83e 100755 --- a/noq/src/tests.rs +++ b/noq/src/tests.rs @@ -1263,6 +1263,62 @@ async fn dropped_connection_cleans_up() { endpoint.wait_all_draining().await; } +/// Leak regression (fleet churn): when a peer vanishes without a graceful +/// CONNECTION_CLOSE — the gossip re-dial / NAT-flap pattern that drove the soak +/// RSS slope — the local connection must still be fully reclaimed. The per-conn +/// driver task holds the only remaining `ConnectionInner` ref after the handle +/// is dropped, so if it never terminates the whole connection (~hundreds of KB) +/// leaks for every churned peer. The weak handle must go dead within a bounded +/// time once the idle timeout reclaims the silent connection. +#[tokio::test(start_paused = true)] +async fn connection_freed_after_peer_vanishes() { + let _guard = subscribe(); + + const IDLE_TIMEOUT: Duration = Duration::from_secs(2); + let mut transport_config = TransportConfig::default(); + transport_config + .max_idle_timeout(Some(IDLE_TIMEOUT.try_into().unwrap())) + .initial_rtt(Duration::from_millis(10)); + + let factory = EndpointFactory::new(); + let server = factory.endpoint_with_config("server", transport_config.clone()); + let client = factory.endpoint_with_config("client", transport_config); + let server_addr = server.local_addr().unwrap(); + + let (client_conn, server_conn) = tokio::join!( + async { + client + .connect(server_addr, "localhost") + .unwrap() + .await + .unwrap() + }, + async { server.accept().await.unwrap().await.unwrap() } + ); + + let weak = client_conn.weak_handle(); + assert!(weak.is_alive(), "connection alive while established"); + + // Peer vanishes: drop the whole server so the client gets no close frame and + // must reclaim the connection via its idle timeout. + drop(server_conn); + drop(server); + + // Drop the local handle: only the spawned driver task references the + // connection now. It must drive to drained and terminate. + drop(client_conn); + + // Advance well past the idle timeout; the paused runtime polls the driver as + // its timers fire. + tokio::time::sleep(IDLE_TIMEOUT * 3).await; + tokio::task::yield_now().await; + + assert!( + !weak.is_alive(), + "ConnectionInner leaked: driver task never terminated after peer vanished" + ); +} + /// Test that accessing stats from `Path` works as expected. #[tokio::test] async fn path_clone_stats_after_abandon() {