From 36500fc7fbf50fd8ad576e2e3fff131657cb0d8f Mon Sep 17 00:00:00 2001 From: Dr Luke Angel Date: Fri, 29 May 2026 08:11:21 -0400 Subject: [PATCH 1/3] fix: implement Drop for Connecting to reap aborted handshakes A Connecting dropped before handshake completion left its connection task, packet spaces, and channels stranded in EndpointInner forever (before handshake completion max_idle_timeout is not yet enabled and there is no handshake timeout). Drop now takes the inner connection and calls implicit_close() (guarded by !is_closed) to drain and release. The connected oneshot Receiver is wrapped in Option so poll/into_0rtt can take() it without moving out of a type implementing Drop (E0509). --- noq/src/connection.rs | 49 ++++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/noq/src/connection.rs b/noq/src/connection.rs index 9466ccb0d..53cb4b4ab 100644 --- a/noq/src/connection.rs +++ b/noq/src/connection.rs @@ -40,7 +40,7 @@ use proto::{ #[derive(Debug)] pub struct Connecting { conn: Option, - connected: oneshot::Receiver, + connected: Option>, handshake_data_ready: Option>, } @@ -82,7 +82,7 @@ impl Connecting { Self { conn: Some(conn), - connected: on_connected_recv, + connected: Some(on_connected_recv), handshake_data_ready: Some(on_handshake_data_recv), } } @@ -141,7 +141,8 @@ impl Connecting { if is_ok { let conn = self.conn.take().unwrap(); - Ok((Connection(conn), ZeroRttAccepted(self.connected))) + let connected = self.connected.take().unwrap(); + Ok((Connection(conn), ZeroRttAccepted(connected))) } else { Err(self) } @@ -213,19 +214,37 @@ impl Connecting { impl Future for Connecting { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.connected).poll(cx).map(|_| { - let conn = self.conn.take().unwrap(); - let inner = conn.lock_without_waking("connecting"); - if inner.connected { - drop(inner); - Ok(Connection(conn)) - } else { - Err(inner - .error - .clone() - .expect("connected signaled without connection success or error")) + let mut connected = self.connected.take().expect("polled after yielding Ready"); + match Pin::new(&mut connected).poll(cx) { + Poll::Ready(_) => { + let conn = self.conn.take().unwrap(); + let inner = conn.lock_without_waking("connecting"); + if inner.connected { + drop(inner); + Poll::Ready(Ok(Connection(conn))) + } else { + Poll::Ready(Err(inner + .error + .clone() + .expect("connected signaled without connection success or error"))) + } } - }) + Poll::Pending => { + self.connected = Some(connected); + Poll::Pending + } + } + } +} + +impl Drop for Connecting { + fn drop(&mut self) { + if let Some(conn) = self.conn.take() { + let mut state = conn.lock_without_waking("connecting_drop"); + if !state.inner.is_closed() { + state.implicit_close(&conn.shared); + } + } } } From b49aa337d8521dbd1d09dae47d7c6b00bfa7f6ef Mon Sep 17 00:00:00 2001 From: Dr Luke Angel Date: Mon, 1 Jun 2026 11:20:26 -0400 Subject: [PATCH 2/3] fix: address PR feedback for noq --- noq-proto/src/connection/mod.rs | 54 +++++++++++++-- noq/src/connection.rs | 114 +++++++++++++++++++------------- noq/src/tests.rs | 9 +++ 3 files changed, 127 insertions(+), 50 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 9f4f2cf1d..2394cdd8c 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -12,7 +12,7 @@ use bytes::{Bytes, BytesMut}; use frame::StreamMetaVec; use rand::{RngExt, SeedableRng, rngs::StdRng}; -use rustc_hash::{FxHashMap, FxHashSet}; +use rustc_hash::FxHashMap; use thiserror::Error; use tracing::{debug, error, trace, trace_span, warn}; @@ -307,7 +307,7 @@ pub struct Connection { /// time after a path is abandoned. // TODO(flub): Make this a more efficient data structure. Like ranges of abandoned // paths. Or a set together with a minimum. Or something. - abandoned_paths: FxHashSet, + abandoned_paths: AbandonedPaths, /// State for n0's () nat traversal protocol. n0_nat_traversal: n0_nat_traversal::State, @@ -416,7 +416,7 @@ impl Connection { config.stream_receive_window, ), datagrams: DatagramState::default(), - config, + config: config.clone(), remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]), rng, path_stats: Default::default(), @@ -428,7 +428,7 @@ impl Connection { local_max_path_id: PathId::ZERO, remote_max_path_id: PathId::ZERO, max_path_id_with_cids: PathId::ZERO, - abandoned_paths: Default::default(), + abandoned_paths: AbandonedPaths::new(config.max_concurrent_multipath_paths.map_or(1, |v| v.get()) as usize), n0_nat_traversal: Default::default(), qlog, @@ -568,7 +568,7 @@ impl Connection { return Err(PathError::ServerSideNotAllowed); } - let max_abandoned = self.abandoned_paths.iter().max().copied(); + let max_abandoned = self.abandoned_paths.max_path_id(); let max_used = self.paths.keys().last().copied(); let path_id = max_abandoned .max(max_used) @@ -7726,3 +7726,47 @@ mod tests { } } } + + +/// Bounded storage of the most recently abandoned paths +#[derive(Debug, Clone)] +pub(crate) struct AbandonedPaths { + paths: VecDeque, + capacity: usize, + total_abandoned: u32, + max_abandoned: Option, +} + +impl AbandonedPaths { + pub(crate) fn new(capacity: usize) -> Self { + Self { + paths: VecDeque::with_capacity(capacity), + capacity, + total_abandoned: 0, + max_abandoned: None, + } + } + + pub(crate) fn insert(&mut self, id: PathId) { + if !self.paths.contains(&id) { + self.paths.push_back(id); + self.total_abandoned += 1; + self.max_abandoned = Some(self.max_abandoned.unwrap_or(PathId::ZERO).max(id)); + if self.paths.len() > self.capacity { + self.paths.pop_front(); + } + } + } + + pub(crate) fn contains(&self, id: &PathId) -> bool { + self.paths.contains(id) + } + + pub(crate) fn len(&self) -> usize { + self.total_abandoned as usize + } + + pub(crate) fn max_path_id(&self) -> Option { + self.max_abandoned + } +} diff --git a/noq/src/connection.rs b/noq/src/connection.rs index 53cb4b4ab..9fc4d056c 100644 --- a/noq/src/connection.rs +++ b/noq/src/connection.rs @@ -39,9 +39,28 @@ use proto::{ /// In-progress connection attempt future #[derive(Debug)] pub struct Connecting { - conn: Option, - connected: Option>, - handshake_data_ready: Option>, + state: ConnectingState, +} + +#[derive(Debug)] +enum ConnectingState { + Active { + conn: ConnectionRef, + connected: oneshot::Receiver, + handshake_data_ready: Option>, + }, + Consumed, +} + +impl Drop for Connecting { + fn drop(&mut self) { + if let ConnectingState::Active { conn, .. } = &mut self.state { + let mut state = conn.lock_without_waking("connecting_drop"); + if !state.inner.is_closed() { + state.implicit_close(&conn.shared); + } + } + } } impl Connecting { @@ -81,9 +100,11 @@ impl Connecting { )); Self { - conn: Some(conn), - connected: Some(on_connected_recv), - handshake_data_ready: Some(on_handshake_data_recv), + state: ConnectingState::Active { + conn, + connected: on_connected_recv, + handshake_data_ready: Some(on_handshake_data_recv), + }, } } @@ -132,17 +153,20 @@ impl Connecting { /// before TLS client authentication has occurred, and should therefore not be used to send /// data for which client authentication is being used. pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> { - // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll - // have to release it explicitly before returning `self` by value. - let conn = (self.conn.as_mut().unwrap()).lock_without_waking("into_0rtt"); - - let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server(); - drop(conn); + let is_ok = match &self.state { + ConnectingState::Active { conn, .. } => { + let inner = conn.lock_without_waking("into_0rtt"); + inner.inner.has_0rtt() || inner.inner.side().is_server() + } + ConnectingState::Consumed => false, + }; if is_ok { - let conn = self.conn.take().unwrap(); - let connected = self.connected.take().unwrap(); - Ok((Connection(conn), ZeroRttAccepted(connected))) + if let ConnectingState::Active { conn, connected, .. } = std::mem::replace(&mut self.state, ConnectingState::Consumed) { + Ok((Connection(conn), ZeroRttAccepted(connected))) + } else { + unreachable!() + } } else { Err(self) } @@ -158,10 +182,13 @@ impl Connecting { // Taking &mut self allows us to use a single oneshot channel rather than dealing with // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things // simple. - if let Some(x) = self.handshake_data_ready.take() { + let ConnectingState::Active { conn, handshake_data_ready, .. } = &mut self.state else { + panic!("used after yielding Ready"); + }; + + if let Some(x) = handshake_data_ready.take() { let _ = x.await; } - let conn = self.conn.as_ref().unwrap(); let inner = conn.lock_without_waking("handshake"); inner .inner @@ -187,7 +214,9 @@ impl Connecting { /// /// Will panic if called after `poll` has returned `Ready`. pub fn local_ip(&self) -> Option { - let conn = self.conn.as_ref().expect("used after yielding Ready"); + let ConnectingState::Active { conn, .. } = &self.state else { + panic!("used after yielding Ready"); + }; let inner = conn.lock_without_waking("local_ip"); inner @@ -201,8 +230,10 @@ impl Connecting { /// /// Will panic if called after `poll` has returned `Ready`. pub fn remote_address(&self) -> SocketAddr { - let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready"); - conn_ref + let ConnectingState::Active { conn, .. } = &self.state else { + panic!("used after yielding Ready"); + }; + conn .lock_without_waking("remote_address") .inner .network_path(PathId::ZERO) @@ -214,36 +245,29 @@ impl Connecting { impl Future for Connecting { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut connected = self.connected.take().expect("polled after yielding Ready"); - match Pin::new(&mut connected).poll(cx) { - Poll::Ready(_) => { - let conn = self.conn.take().unwrap(); - let inner = conn.lock_without_waking("connecting"); - if inner.connected { - drop(inner); - Poll::Ready(Ok(Connection(conn))) - } else { - Poll::Ready(Err(inner - .error - .clone() - .expect("connected signaled without connection success or error"))) + match &mut self.state { + ConnectingState::Active { connected, .. } => { + match Pin::new(connected).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(_) => {} } } - Poll::Pending => { - self.connected = Some(connected); - Poll::Pending - } + ConnectingState::Consumed => panic!("polled after yielding Ready"), } - } -} -impl Drop for Connecting { - fn drop(&mut self) { - if let Some(conn) = self.conn.take() { - let mut state = conn.lock_without_waking("connecting_drop"); - if !state.inner.is_closed() { - state.implicit_close(&conn.shared); + if let ConnectingState::Active { conn, .. } = std::mem::replace(&mut self.state, ConnectingState::Consumed) { + let inner = conn.lock_without_waking("connecting"); + if inner.connected { + drop(inner); + Poll::Ready(Ok(Connection(conn))) + } else { + Poll::Ready(Err(inner + .error + .clone() + .expect("connected signaled without connection success or error"))) } + } else { + unreachable!() } } } diff --git a/noq/src/tests.rs b/noq/src/tests.rs index 74c760c3d..37765e2ee 100755 --- a/noq/src/tests.rs +++ b/noq/src/tests.rs @@ -1762,3 +1762,12 @@ unsafe fn wake_by_ref_waker(data: *const ()) { unsafe fn drop_waker(data: *const ()) { drop(unsafe { Arc::::from_raw(data as *const WakeCounter) }); } + +#[tokio::test] +async fn drop_connecting_cleans_up() { + let ep = endpoint(); + let addr = "127.0.0.1:1234".parse().unwrap(); + let connecting = ep.connect(addr, "localhost").unwrap(); + drop(connecting); + ep.wait_idle().await; +} From 923a7f706d76b7f86a8893c4058ff805a0c78e30 Mon Sep 17 00:00:00 2001 From: Dr Luke Angel Date: Tue, 2 Jun 2026 05:59:24 -0400 Subject: [PATCH 3/3] fix: address PR feedback for noq state refactor --- noq-proto/src/connection/mod.rs | 54 +++------------------------------ noq/src/connection.rs | 4 ++- 2 files changed, 8 insertions(+), 50 deletions(-) diff --git a/noq-proto/src/connection/mod.rs b/noq-proto/src/connection/mod.rs index 2394cdd8c..9f4f2cf1d 100644 --- a/noq-proto/src/connection/mod.rs +++ b/noq-proto/src/connection/mod.rs @@ -12,7 +12,7 @@ use bytes::{Bytes, BytesMut}; use frame::StreamMetaVec; use rand::{RngExt, SeedableRng, rngs::StdRng}; -use rustc_hash::FxHashMap; +use rustc_hash::{FxHashMap, FxHashSet}; use thiserror::Error; use tracing::{debug, error, trace, trace_span, warn}; @@ -307,7 +307,7 @@ pub struct Connection { /// time after a path is abandoned. // TODO(flub): Make this a more efficient data structure. Like ranges of abandoned // paths. Or a set together with a minimum. Or something. - abandoned_paths: AbandonedPaths, + abandoned_paths: FxHashSet, /// State for n0's () nat traversal protocol. n0_nat_traversal: n0_nat_traversal::State, @@ -416,7 +416,7 @@ impl Connection { config.stream_receive_window, ), datagrams: DatagramState::default(), - config: config.clone(), + config, remote_cids: FxHashMap::from_iter([(PathId::ZERO, CidQueue::new(remote_cid))]), rng, path_stats: Default::default(), @@ -428,7 +428,7 @@ impl Connection { local_max_path_id: PathId::ZERO, remote_max_path_id: PathId::ZERO, max_path_id_with_cids: PathId::ZERO, - abandoned_paths: AbandonedPaths::new(config.max_concurrent_multipath_paths.map_or(1, |v| v.get()) as usize), + abandoned_paths: Default::default(), n0_nat_traversal: Default::default(), qlog, @@ -568,7 +568,7 @@ impl Connection { return Err(PathError::ServerSideNotAllowed); } - let max_abandoned = self.abandoned_paths.max_path_id(); + let max_abandoned = self.abandoned_paths.iter().max().copied(); let max_used = self.paths.keys().last().copied(); let path_id = max_abandoned .max(max_used) @@ -7726,47 +7726,3 @@ mod tests { } } } - - -/// Bounded storage of the most recently abandoned paths -#[derive(Debug, Clone)] -pub(crate) struct AbandonedPaths { - paths: VecDeque, - capacity: usize, - total_abandoned: u32, - max_abandoned: Option, -} - -impl AbandonedPaths { - pub(crate) fn new(capacity: usize) -> Self { - Self { - paths: VecDeque::with_capacity(capacity), - capacity, - total_abandoned: 0, - max_abandoned: None, - } - } - - pub(crate) fn insert(&mut self, id: PathId) { - if !self.paths.contains(&id) { - self.paths.push_back(id); - self.total_abandoned += 1; - self.max_abandoned = Some(self.max_abandoned.unwrap_or(PathId::ZERO).max(id)); - if self.paths.len() > self.capacity { - self.paths.pop_front(); - } - } - } - - pub(crate) fn contains(&self, id: &PathId) -> bool { - self.paths.contains(id) - } - - pub(crate) fn len(&self) -> usize { - self.total_abandoned as usize - } - - pub(crate) fn max_path_id(&self) -> Option { - self.max_abandoned - } -} diff --git a/noq/src/connection.rs b/noq/src/connection.rs index 9fc4d056c..8262f37b9 100644 --- a/noq/src/connection.rs +++ b/noq/src/connection.rs @@ -165,7 +165,7 @@ impl Connecting { if let ConnectingState::Active { conn, connected, .. } = std::mem::replace(&mut self.state, ConnectingState::Consumed) { Ok((Connection(conn), ZeroRttAccepted(connected))) } else { - unreachable!() + unreachable!("state must be Active since is_ok is true") } } else { Err(self) @@ -178,6 +178,8 @@ impl Connecting { /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can /// be [`downcast`](Box::downcast) to a /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData). + /// + /// Will panic if called after `poll` has returned `Ready`. pub async fn handshake_data(&mut self) -> Result, ConnectionError> { // Taking &mut self allows us to use a single oneshot channel rather than dealing with // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things