Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions noq-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,23 @@ pub struct Connection {
/// possible to use [`Connection::local_cid_state`] to know if CIDs have been issued
/// since they are issued asynchronously by the endpoint driver.
max_path_id_with_cids: PathId,
/// The paths already abandoned
///
/// They may still have some state left in [`Connection::paths`] or
/// [`Connection::local_cid_state`] since some of this has to be kept around for some
/// time after a path is abandoned.
// TODO(flub): Make this a more efficient data structure. Like ranges of abandoned
// paths. Or a set together with a minimum. Or something.
/// Paths that have been abandoned but not yet fully discarded.
///
/// An abandoned path keeps some state in [`Connection::paths`] /
/// [`Connection::local_cid_state`] until it drains, so it stays here until
/// [`Connection::discard_path`] reclaims it — at which point its id is
/// removed (it is no longer a known path). This set is therefore bounded by
/// the number of concurrently-live paths, not by the lifetime abandon count.
/// The lifetime high-water needed to keep path ids monotonic is tracked
/// separately in [`Connection::max_abandoned_path_id`].
abandoned_paths: FxHashSet<PathId>,
/// Greatest [`PathId`] ever abandoned (monotonic; never decreases).
///
/// Path ids must never be reused, so [`Connection::open_path`] allocates
/// above this. It is kept as a scalar — rather than read off
/// [`Connection::abandoned_paths`] — so the set can be pruned on discard
/// without losing the reuse guard. `None` until the first abandon.
max_abandoned_path_id: Option<PathId>,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing this by adding more state to the Connection struct is not the way to go. It spreads the logic of what is happening over the many places that touch it and increases the overall complexity. The logical behaviour of the FxHashSet should be preserved; that should be all that the rest of the application deals with.

So instead, the fix for this should look at replacing the type of abandoned_paths with a custom type that has a fixed amount of storage, directly proportional to the configured max_concurrent_multipath_paths. The external interface then stays the same, or can be equivalent but better, e.g. in the case of the .iter().max().copied(), which isn't exactly great.


/// State for n0's (<https://n0.computer>) nat traversal protocol.
n0_nat_traversal: n0_nat_traversal::State,
Expand Down Expand Up @@ -429,6 +438,7 @@ impl Connection {
remote_max_path_id: PathId::ZERO,
max_path_id_with_cids: PathId::ZERO,
abandoned_paths: Default::default(),
max_abandoned_path_id: None,

n0_nat_traversal: Default::default(),
qlog,
Expand Down Expand Up @@ -568,7 +578,7 @@ impl Connection {
return Err(PathError::ServerSideNotAllowed);
}

let max_abandoned = self.abandoned_paths.iter().max().copied();
let max_abandoned = self.max_abandoned_path_id;
let max_used = self.paths.keys().last().copied();
let path_id = max_abandoned
.max(max_used)
Expand Down Expand Up @@ -717,6 +727,7 @@ impl Connection {
.push_back(EndpointEventInner::RetireResetToken(path_id));

self.abandoned_paths.insert(path_id);
self.max_abandoned_path_id = self.max_abandoned_path_id.max(Some(path_id));

for timer in timer::PathTimer::VALUES {
// match for completeness
Expand Down Expand Up @@ -806,6 +817,23 @@ impl Connection {
self.paths.keys().copied().collect()
}

/// Reports the current length of every per-path collection, as a leak gauge
/// for tests.
///
/// The returned array holds, in order, the lengths of `paths`,
/// `abandoned_paths`, `remote_cids`, `local_cid_state`, `path_stats` and the
/// data-space `number_spaces`. All of them are expected to stay bounded under
/// path churn — a length that keeps growing with the churn count is a leak.
#[cfg(test)]
pub(crate) fn path_collection_sizes(&self) -> [usize; 6] {
    let data_space = &self.spaces[SpaceId::Data];

    let live_paths = self.paths.len();
    let abandoned = self.abandoned_paths.len();
    let remote_cids = self.remote_cids.len();
    let local_cids = self.local_cid_state.len();
    let stats = self.path_stats.len();
    let number_spaces = data_space.number_spaces.len();

    // The order is part of the contract: callers destructure by position.
    [
        live_paths,
        abandoned,
        remote_cids,
        local_cids,
        stats,
        number_spaces,
    ]
}

/// Gets the local [`PathStatus`] for a known [`PathId`]
pub fn path_status(&self, path_id: PathId) -> Result<PathStatus, ClosedPath> {
self.path(path_id)
Expand Down Expand Up @@ -3391,6 +3419,10 @@ impl Connection {
self.partial_stats += path_stats;
self.paths.remove(&path_id);
self.spaces[SpaceId::Data].number_spaces.remove(&path_id);
// The id is no longer a known path; its monotonicity is preserved by
// `max_abandoned_path_id`, so drop it from the live set to keep it
// bounded by concurrent paths rather than the lifetime abandon count.
self.abandoned_paths.remove(&path_id);

self.events.push_back(
PathEvent::Discarded {
Expand Down
6 changes: 6 additions & 0 deletions noq-proto/src/connection/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,4 +364,10 @@ impl PathStatsMap {
pub(super) fn discard(&mut self, path_id: &PathId) -> PathStats {
self.0.remove(path_id).unwrap_or_default()
}

/// Returns the number of entries currently held by the wrapped collection,
/// i.e. how many paths still have retained stats.
///
/// Compiled for tests only, where it serves as a leak gauge.
#[cfg(test)]
pub(super) fn len(&self) -> usize {
self.0.len()
}
}
81 changes: 81 additions & 0 deletions noq-proto/src/tests/multipath.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,87 @@ fn abandon_cycle() -> TestResult {
Ok(())
}

/// Leak regression: open and abandon a path many times while always keeping one
/// path live (so PATH_ABANDON is always sendable and the path drains the happy
/// way).
///
/// After every cycle each per-path collection reported by
/// `path_collection_sizes` — including `self.paths` (heavy `PathData`) and
/// `self.abandoned_paths` (path ids) — must hold at most `MAX_PATHS` entries.
/// A size that grows with the cycle count is a memory leak (matches the fleet
/// soak's churn-proportional RSS slope).
#[test]
fn abandon_cycle_bounded() -> TestResult {
    /// Number of open+abandon cycles to run.
    const CYCLES: u16 = 40;
    /// Concurrent-path limit passed to the transport config. It doubles as the
    /// upper bound every per-path collection is held to, so the configured
    /// limit and the asserted bound cannot drift apart.
    const MAX_PATHS: u8 = 6;

    let _guard = subscribe();

    let mut cfg = TransportConfig::default();
    cfg.max_concurrent_multipath_paths(MAX_PATHS.into());
    cfg.initial_rtt(Duration::from_millis(10));

    let mut pair = ConnPair::with_transport_cfg(cfg.clone(), cfg);
    pair.drive();

    // One fresh address per cycle on each side so no 4-tuple is ever reused.
    // Index 0 holds the addresses the connection was established on.
    let routing = pair.routes.as_basic();
    let mut addrs_client = vec![routing.client_addr];
    let mut addrs_server = vec![routing.server_addr];
    for i in 1..=CYCLES {
        let mut ca = routing.client_addr;
        ca.set_port(ca.port() + i);
        addrs_client.push(ca);
        let mut sa = routing.server_addr;
        sa.set_port(sa.port() + i);
        addrs_server.push(sa);
    }
    pair.routes =
        ManyToManyRouting::simple_symmetric(addrs_client.clone(), addrs_server.clone()).into();

    // Always keep the just-opened path as the live anchor, abandon the previous.
    let mut current_path = PathId::ZERO;
    for cycle in 0..CYCLES {
        // Lossless widening; index 0 is the initial address pair, so cycle `n`
        // uses entry `n + 1`.
        let addr_idx = usize::from(cycle) + 1;
        let new_path_net = FourTuple {
            local_ip: Some(addrs_client[addr_idx].ip()),
            remote: addrs_server[addr_idx],
        };

        let new_path = pair.open_path(Client, new_path_net, PathStatus::Available)?;
        pair.drive();
        // Drain pending events on both sides before the next step.
        while pair.poll(Client).is_some() {}
        while pair.poll(Server).is_some() {}

        pair.close_path(Client, current_path, 0u8.into())?;
        pair.drive();
        while pair.poll(Client).is_some() {}
        while pair.poll(Server).is_some() {}

        current_path = new_path;

        // All per-path collections. At most ~2 paths are ever live, and a
        // drained path must leave every collection; none may grow with `cycle`.
        let [paths, abandoned, remote_cids, local_cids, path_stats, number_spaces] =
            pair.conn(Client).path_collection_sizes();
        info!(
            cycle,
            paths, abandoned, remote_cids, local_cids, path_stats, number_spaces,
            "post-cycle path collection sizes"
        );
        for (name, len) in [
            ("paths", paths),
            ("abandoned_paths", abandoned),
            ("remote_cids", remote_cids),
            ("local_cid_state", local_cids),
            ("path_stats", path_stats),
            ("number_spaces", number_spaces),
        ] {
            assert!(
                len <= usize::from(MAX_PATHS),
                "cycle {cycle}: {name} grew to {len} — unbounded under path churn (leak)"
            );
        }
    }

    Ok(())
}

/// NAT traversal round revalidates an existing path via new PATH_CHALLENGE.
#[test]
fn nat_traversal_revalidates_existing_path() -> TestResult {
Expand Down
56 changes: 56 additions & 0 deletions noq/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,62 @@ async fn dropped_connection_cleans_up() {
endpoint.wait_all_draining().await;
}

/// Leak regression (fleet churn): a peer that disappears without sending a
/// graceful CONNECTION_CLOSE — the gossip re-dial / NAT-flap pattern that drove
/// the soak RSS slope — must not prevent the local connection from being fully
/// reclaimed.
///
/// Once the handle is dropped, the per-connection driver task holds the only
/// remaining `ConnectionInner` reference, so a driver that never terminates
/// leaks the whole connection (~hundreds of KB) for every churned peer. The
/// test therefore requires the weak handle to go dead within a bounded time
/// after the idle timeout reclaims the silent connection.
#[tokio::test(start_paused = true)]
async fn connection_freed_after_peer_vanishes() {
    const IDLE_TIMEOUT: Duration = Duration::from_secs(2);

    let _guard = subscribe();

    let mut transport_config = TransportConfig::default();
    transport_config.max_idle_timeout(Some(IDLE_TIMEOUT.try_into().unwrap()));
    transport_config.initial_rtt(Duration::from_millis(10));

    let factory = EndpointFactory::new();
    let server = factory.endpoint_with_config("server", transport_config.clone());
    let client = factory.endpoint_with_config("client", transport_config);
    let server_addr = server.local_addr().unwrap();

    // Run both halves of the handshake concurrently.
    let dial = async {
        let connecting = client.connect(server_addr, "localhost").unwrap();
        connecting.await.unwrap()
    };
    let accept = async {
        let incoming = server.accept().await.unwrap();
        incoming.await.unwrap()
    };
    let (client_conn, server_conn) = tokio::join!(dial, accept);

    let conn_weak = client_conn.weak_handle();
    assert!(conn_weak.is_alive(), "connection alive while established");

    // Peer vanishes: the whole server goes away, so the client never sees a
    // close frame and has to reclaim the connection via its idle timeout.
    drop(server_conn);
    drop(server);

    // With the local handle gone, only the spawned driver task still
    // references the connection. It must drive to drained and terminate.
    drop(client_conn);

    // Move well past the idle timeout; the paused runtime polls the driver as
    // its timers fire.
    tokio::time::sleep(IDLE_TIMEOUT * 3).await;
    tokio::task::yield_now().await;

    let still_alive = conn_weak.is_alive();
    assert!(
        !still_alive,
        "ConnectionInner leaked: driver task never terminated after peer vanished"
    );
}

/// Test that accessing stats from `Path` works as expected.
#[tokio::test]
async fn path_clone_stats_after_abandon() {
Expand Down
Loading