From 75828239919441f1903550bc1f81748a251fd42f Mon Sep 17 00:00:00 2001 From: Max Holman Date: Wed, 18 Mar 2026 21:05:40 +0700 Subject: [PATCH 1/5] refactor: align internal Rust names with proto/CLI naming convention NodeApi trait methods, handler structs, and REST/MCP function names now follow the same noun-verb pattern used by the management protocol and CLI/REPL (e.g. hint_set, route_del, peer_disconnect). OpenAPI operationIds updated to match (e.g. peersList, peerDisconnect, infoGet). Stale comments referencing old names cleaned up. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/api/src/handlers.rs | 22 ++++++++++---------- crates/api/src/lib.rs | 6 +++--- crates/core/src/control/handler.rs | 32 +++++++++++++++--------------- crates/core/src/ipc.rs | 14 ++++++------- crates/core/src/node_api.rs | 24 +++++++++++----------- crates/daemon/src/mode/mod.rs | 2 +- crates/mcp/src/tools.rs | 6 +++--- website/src/data/openapi.json | 14 ++++++------- 8 files changed, 60 insertions(+), 60 deletions(-) diff --git a/crates/api/src/handlers.rs b/crates/api/src/handlers.rs index c618140..14b7319 100644 --- a/crates/api/src/handlers.rs +++ b/crates/api/src/handlers.rs @@ -29,9 +29,9 @@ pub struct StatsResponse { pub active_flows: u64, } -/// Node status response. +/// Node info response. #[derive(Debug, Serialize)] -pub struct StatusResponse { +pub struct InfoResponse { pub name: String, pub version: String, pub role: String, @@ -133,9 +133,9 @@ pub struct PingResponseBody { pub role: String, } -/// Set hint request body. +/// Hint set request body. #[derive(Debug, Deserialize)] -pub struct SetHintRequestBody { +pub struct HintSetRequestBody { pub level: String, pub role: String, } @@ -183,7 +183,7 @@ pub async fn events( ) } -pub async fn info(State(state): State) -> Result, StatusCode> { +pub async fn info(State(state): State) -> Result, StatusCode> { let resp = state .ipc .lock() @@ -195,7 +195,7 @@ pub async fn info(State(state): State) -> Result, match resp.response { Some(management_response::Response::Info(s)) => { let role = s.role().to_string(); - Ok(Json(StatusResponse { + Ok(Json(InfoResponse { name: s.package_name, version: s.version, role, @@ -280,7 +280,7 @@ pub async fn peers(State(state): State) -> Result, } } -pub async fn disconnect_peer( +pub async fn peer_disconnect( State(state): State, Path(name): Path, ) -> (StatusCode, Json) { @@ -644,7 +644,7 @@ pub async fn ping(State(state): State) -> Result, Path(peer): Path, ) -> Result, StatusCode> { @@ -704,9 +704,9 @@ pub async fn shutdown(State(state): State) -> (StatusCode, Json, - Json(req): Json, + Json(req): Json, ) -> (StatusCode, Json) { let level = match req.level.as_str() { "prefer" => HintLevel::Prefer, @@ -787,7 +787,7 @@ pub async fn set_hint( } } -pub async fn clear_hints(State(state): State) -> (StatusCode, Json) { +pub async fn hint_set_auto(State(state): State) -> (StatusCode, Json) { let resp = state .ipc .lock() diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 4c603f9..bfa7323 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -65,7 +65,7 @@ pub fn router(state: State) -> Router { .route("/info", get(handlers::info)) .route("/stats", get(handlers::stats)) .route("/peers", get(handlers::peers)) - .route("/peers/{name}", delete(handlers::disconnect_peer)) + .route("/peers/{name}", delete(handlers::peer_disconnect)) .route( "/routes", get(handlers::list_routes).post(handlers::add_route), @@ -76,11 +76,11 @@ pub fn router(state: State) -> Router { .route("/listen", post(handlers::listen)) .route("/disconnect", post(handlers::disconnect)) .route("/ping", get(handlers::ping)) - .route("/ping/{peer}", get(handlers::ping_peer)) + .route("/ping/{peer}", get(handlers::peer_ping)) .route("/shutdown", post(handlers::shutdown)) .route( "/hints", - put(handlers::set_hint).delete(handlers::clear_hints), + put(handlers::hint_set).delete(handlers::hint_set_auto), ) .layer(middleware::from_fn(move |req, next| { let auth = auth.clone(); diff --git a/crates/core/src/control/handler.rs b/crates/core/src/control/handler.rs index 46054c8..f6bd930 100644 --- a/crates/core/src/control/handler.rs +++ b/crates/core/src/control/handler.rs @@ -33,8 +33,8 @@ struct NodeState { /// Shared node state handle, cloneable and cheaply updatable. /// /// Consumers call [`SharedNodeState::update_role`], [`SharedNodeState::update_capabilities`], -/// etc. after negotiation or listening starts so that `wallhack info` / -/// `wallhack_status` reflects the real state of the daemon. +/// etc. after negotiation or listening starts so that `wallhack info` +/// reflects the real state of the daemon. #[derive(Clone, Debug)] pub struct SharedNodeState(Arc>); @@ -148,7 +148,7 @@ impl Handler { /// Returns a handle to the shared node state. /// /// Callers (daemon modes) use this to update role, capabilities, and - /// listen/connect state after negotiation so that `status()` reports + /// listen/connect state after negotiation so that `info()` reports /// accurate information. #[must_use] pub fn node_state(&self) -> SharedNodeState { @@ -396,9 +396,9 @@ impl crate::node_api::NodeApi for Handler { self.metrics.snapshot() } - fn status(&self) -> crate::node_api::NodeStatus { + fn info(&self) -> crate::node_api::NodeInfo { let state = self.state.load(); - crate::node_api::NodeStatus { + crate::node_api::NodeInfo { role: state.role, peer_addr: state.peer_addr.clone(), capabilities: state.capabilities, @@ -441,7 +441,7 @@ impl crate::node_api::NodeApi for Handler { Ok(()) } - fn remove_route(&self, cidr: &crate::Cidr) -> crate::node_api::Result<()> { + fn route_del(&self, cidr: &crate::Cidr) -> crate::node_api::Result<()> { if let Some(entry) = self.routes.remove(cidr) { let _ = self .route_updates @@ -452,7 +452,7 @@ impl crate::node_api::NodeApi for Handler { } } - fn disconnect_peer(&self, peer: String) -> crate::node_api::Result<()> { + fn peer_disconnect(&self, peer: String) -> crate::node_api::Result<()> { // Try name prefix first, then fall back to exact address match. // Used by REPL/CLI where prefix matching is convenient. let peer_info = self.peers.find_by_prefix(&peer).or_else(|e| { @@ -467,7 +467,7 @@ impl crate::node_api::NodeApi for Handler { Ok(()) } - fn disconnect_peer_by_id(&self, id: String) -> crate::node_api::Result<()> { + fn peer_disconnect_by_id(&self, id: String) -> crate::node_api::Result<()> { // Exact match on registry key. Used by REST API where the id // is taken directly from the peers list. if self.peers.get(&id).is_none() { @@ -481,12 +481,12 @@ impl crate::node_api::NodeApi for Handler { self.state.load().role } - fn set_hint(&self, hint: RoleHint) -> crate::node_api::Result<()> { + fn hint_set(&self, hint: RoleHint) -> crate::node_api::Result<()> { self.hint_tx.send_replace(Some(hint)); Ok(()) } - fn clear_hints(&self) -> crate::node_api::Result<()> { + fn hint_set_auto(&self) -> crate::node_api::Result<()> { self.hint_tx.send_replace(None); Ok(()) } @@ -738,7 +738,7 @@ mod tests { } #[test] - fn test_status_indeterminate_role() { + fn test_info_indeterminate_role() { let metrics = Arc::new(Metrics::default()); let peers = Arc::new(Registry::new()); let routes = RouteTable::shared(); @@ -754,7 +754,7 @@ mod tests { tokio::sync::broadcast::channel(16).0, ); - let status = crate::node_api::NodeApi::status(&handler); + let status = crate::node_api::NodeApi::info(&handler); assert_eq!(status.role, NodeRole::Indeterminate); } @@ -795,7 +795,7 @@ mod tests { } #[test] - fn test_status_reflects_node_state_updates() { + fn test_info_reflects_node_state_updates() { let handler = Handler::new( HandlerConfig::new( NodeRole::Indeterminate, @@ -809,7 +809,7 @@ mod tests { ); // Initially indeterminate with no capabilities. - let status = crate::node_api::NodeApi::status(&handler); + let status = crate::node_api::NodeApi::info(&handler); assert_eq!(status.role, NodeRole::Indeterminate); assert!(!status.capabilities.tun_capable); assert!(!status.capabilities.listening); @@ -825,7 +825,7 @@ mod tests { interactive: false, }); - let status = crate::node_api::NodeApi::status(&handler); + let status = crate::node_api::NodeApi::info(&handler); assert_eq!(status.role, NodeRole::Entry); assert!(status.capabilities.tun_capable); @@ -833,7 +833,7 @@ mod tests { let addr: SocketAddr = "0.0.0.0:4433".parse().unwrap(); state.set_listen_addr(addr); - let status = crate::node_api::NodeApi::status(&handler); + let status = crate::node_api::NodeApi::info(&handler); assert_eq!(status.listen_addr, Some(addr)); assert!(status.capabilities.listening); } diff --git a/crates/core/src/ipc.rs b/crates/core/src/ipc.rs index 30c009c..ec6851f 100644 --- a/crates/core/src/ipc.rs +++ b/crates/core/src/ipc.rs @@ -290,7 +290,7 @@ fn dispatch_request(request: &ManagementRequest, api: &dyn NodeApi) -> Managemen Some(management_request::Request::Ping(req)) => { if req.peer.is_empty() { // Ping the daemon itself - let status = api.status(); + let status = api.info(); management_response::Response::Ping(PingResponse { uptime_ms: status.uptime_ms, version: status.version, @@ -309,7 +309,7 @@ fn dispatch_request(request: &ManagementRequest, api: &dyn NodeApi) -> Managemen } Some(management_request::Request::Info(_)) => { - let s = api.status(); + let s = api.info(); management_response::Response::Info(InfoResponse { role: management::NodeRole::from(s.role).into(), connected: false, // deprecated — derive from peer count instead @@ -366,7 +366,7 @@ fn dispatch_request(request: &ManagementRequest, api: &dyn NodeApi) -> Managemen }, Some(management_request::Request::RouteDel(req)) => match req.cidr.parse() { - Ok(cidr) => match api.remove_route(&cidr) { + Ok(cidr) => match api.route_del(&cidr) { Ok(()) => management_response::Response::Ok(OkResponse {}), Err(e) => error_response(&e), }, @@ -378,9 +378,9 @@ fn dispatch_request(request: &ManagementRequest, api: &dyn NodeApi) -> Managemen Some(management_request::Request::PeerDisconnect(req)) => { let result = if req.exact { - api.disconnect_peer_by_id(req.peer.clone()) + api.peer_disconnect_by_id(req.peer.clone()) } else { - api.disconnect_peer(req.peer.clone()) + api.peer_disconnect(req.peer.clone()) }; match result { Ok(()) => management_response::Response::Ok(OkResponse {}), @@ -430,13 +430,13 @@ fn dispatch_request(request: &ManagementRequest, api: &dyn NodeApi) -> Managemen level: level.into(), target: target.into(), }; - match api.set_hint(hint) { + match api.hint_set(hint) { Ok(()) => management_response::Response::Ok(OkResponse {}), Err(e) => error_response(&e), } } - Some(management_request::Request::HintSetAuto(_)) => match api.clear_hints() { + Some(management_request::Request::HintSetAuto(_)) => match api.hint_set_auto() { Ok(()) => management_response::Response::Ok(OkResponse {}), Err(e) => error_response(&e), }, diff --git a/crates/core/src/node_api.rs b/crates/core/src/node_api.rs index 29f22eb..d927180 100644 --- a/crates/core/src/node_api.rs +++ b/crates/core/src/node_api.rs @@ -80,9 +80,9 @@ pub struct Metrics { pub packets_dropped: u64, } -/// Overall node status information. +/// Overall node info. #[derive(Debug, Clone)] -pub struct NodeStatus { +pub struct NodeInfo { /// Node's role. pub role: NodeRole, /// Peer address (if connected). @@ -153,7 +153,7 @@ pub trait NodeApi: Send + Sync { /// Get list of directly connected peers. /// /// For entry nodes: returns all connected exit/relay nodes. - /// For exit nodes with relay capability: returns downstream connected nodes. + /// For exit nodes with relay capability: returns accepted peer connections. /// For standard exit nodes: returns empty (no peers). fn peers(&self) -> Vec; @@ -165,8 +165,8 @@ pub trait NodeApi: Send + Sync { /// Get traffic and connection metrics. fn metrics(&self) -> Metrics; - /// Get overall node status. - fn status(&self) -> NodeStatus; + /// Get overall node info. + fn info(&self) -> NodeInfo; /// Connect to a peer. /// @@ -197,20 +197,20 @@ pub trait NodeApi: Send + Sync { /// Peer must be directly connected. fn add_route(&self, cidr: Cidr, peer: String) -> Result<()>; - /// Remove a route by CIDR. + /// Delete a route by CIDR. /// /// Only supported on entry nodes. Returns error for exit/relay nodes. - fn remove_route(&self, cidr: &Cidr) -> Result<()>; + fn route_del(&self, cidr: &Cidr) -> Result<()>; /// Disconnect a specific peer by name prefix or address. /// /// Supports prefix matching for REPL/CLI convenience. - fn disconnect_peer(&self, peer: String) -> Result<()>; + fn peer_disconnect(&self, peer: String) -> Result<()>; /// Disconnect a specific peer by exact registry id. /// /// Used by the REST API where the id comes directly from the peers list. - fn disconnect_peer_by_id(&self, id: String) -> Result<()>; + fn peer_disconnect_by_id(&self, id: String) -> Result<()>; /// Get the current negotiated role. fn current_role(&self) -> NodeRole; @@ -218,9 +218,9 @@ pub trait NodeApi: Send + Sync { /// Apply a role hint at runtime. /// /// Triggers re-negotiation if the node is in auto mode. - /// `role ` in the REPL is shorthand for `set_hint(Fixed, target)`. - fn set_hint(&self, hint: RoleHint) -> Result<()>; + /// `role ` in the REPL is shorthand for `hint_set(Fixed, target)`. + fn hint_set(&self, hint: RoleHint) -> Result<()>; /// Remove all hints (both startup and runtime). - fn clear_hints(&self) -> Result<()>; + fn hint_set_auto(&self) -> Result<()>; } diff --git a/crates/daemon/src/mode/mod.rs b/crates/daemon/src/mode/mod.rs index 7a8e100..d258420 100644 --- a/crates/daemon/src/mode/mod.rs +++ b/crates/daemon/src/mode/mod.rs @@ -115,7 +115,7 @@ pub(crate) fn spawn_heartbeat( peer_name: String, peers: Arc, ) -> tokio::task::JoinHandle<()> { - // Register control channel so disconnect_peer can send messages to this peer. + // Register control channel so peer_disconnect can send messages to this peer. peers.register_control(&peer_name, &control_tx); tokio::spawn(async move { diff --git a/crates/mcp/src/tools.rs b/crates/mcp/src/tools.rs index ecaaab9..c9e85b1 100644 --- a/crates/mcp/src/tools.rs +++ b/crates/mcp/src/tools.rs @@ -42,7 +42,7 @@ pub struct AddrParams { } #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] -pub struct SetHintParams { +pub struct HintSetParams { /// Hint level: "prefer", "exclude", or "fixed" pub level: String, /// Target role: "entry", "exit", or "relay" @@ -117,7 +117,7 @@ impl WallhackServer { .await } - #[tool(description = "Remove a route by CIDR")] + #[tool(description = "Delete a route by CIDR")] async fn route_del( &self, Parameters(params): Parameters, @@ -183,7 +183,7 @@ impl WallhackServer { )] async fn hint_set( &self, - Parameters(params): Parameters, + Parameters(params): Parameters, ) -> Result { let level = match params.level.as_str() { "prefer" => HintLevel::Prefer, diff --git a/website/src/data/openapi.json b/website/src/data/openapi.json index cd0f723..57a7c5b 100644 --- a/website/src/data/openapi.json +++ b/website/src/data/openapi.json @@ -329,14 +329,14 @@ "get": { "summary": "Node info", "description": "Retrieves node identity, role, capabilities, and uptime.", - "operationId": "getInfo", + "operationId": "infoGet", "security": [{ "basicAuth": [] }], "responses": { "200": { "description": "Info retrieved.", "content": { "application/json": { - "schema": { "$ref": "#/components/schemas/StatusResponse" } + "schema": { "$ref": "#/components/schemas/InfoResponse" } } } }, @@ -367,7 +367,7 @@ "get": { "summary": "List connections", "description": "Retrieves a list of all currently connected peers.", - "operationId": "listPeers", + "operationId": "peersList", "security": [{ "basicAuth": [] }], "responses": { "200": { @@ -386,7 +386,7 @@ "delete": { "summary": "Disconnect peer", "description": "Terminates the connection with a specific peer. Accepts the peer's unique id (from GET /peers) or an unambiguous name prefix.", - "operationId": "disconnectPeer", + "operationId": "peerDisconnect", "security": [{ "basicAuth": [] }], "parameters": [ { @@ -416,7 +416,7 @@ "get": { "summary": "List routing table", "description": "Retrieves all active routing entries established through this node.", - "operationId": "listRoutes", + "operationId": "routesList", "security": [{ "basicAuth": [] }], "responses": { "200": { @@ -607,7 +607,7 @@ "get": { "summary": "Ping peer", "description": "Pings a specific connected peer by name or prefix.", - "operationId": "pingPeer", + "operationId": "peerPing", "security": [{ "basicAuth": [] }], "parameters": [ { @@ -662,7 +662,7 @@ "required": true, "content": { "application/json": { - "schema": { "$ref": "#/components/schemas/SetHintRequest" } + "schema": { "$ref": "#/components/schemas/HintSetRequest" } } } }, From 97b41a118077d86879e846a3bacab22c308fde23 Mon Sep 17 00:00:00 2001 From: Max Holman Date: Wed, 18 Mar 2026 21:10:24 +0700 Subject: [PATCH 2/5] refactor(transport): remove unused role_transition channel The role_transition_tx field was always None at every construction site. The RoleTransition message is still decoded and logged but no longer forwarded through a channel that nothing reads. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/client/quic/mod.rs | 1 - crates/core/src/client/ws/mod.rs | 1 - crates/core/src/server/quic/mod.rs | 1 - crates/core/src/server/ws/mod.rs | 1 - crates/core/src/transport/protocol.rs | 12 ------------ 5 files changed, 16 deletions(-) diff --git a/crates/core/src/client/quic/mod.rs b/crates/core/src/client/quic/mod.rs index 65daa74..bb2f118 100644 --- a/crates/core/src/client/quic/mod.rs +++ b/crates/core/src/client/quic/mod.rs @@ -181,7 +181,6 @@ impl Client for QuicClient { handshake_tx: Some(handshake_tx), latency_tx: Some(latency_tx), control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; match protocol::run_control_stream_initiator( diff --git a/crates/core/src/client/ws/mod.rs b/crates/core/src/client/ws/mod.rs index 630adb9..1527d99 100644 --- a/crates/core/src/client/ws/mod.rs +++ b/crates/core/src/client/ws/mod.rs @@ -398,7 +398,6 @@ impl WsClient { handshake_tx: Some(handshake_tx), // receive server's Handshake latency_tx: Some(latency_tx), control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; match protocol::run_control_stream_initiator( diff --git a/crates/core/src/server/quic/mod.rs b/crates/core/src/server/quic/mod.rs index 0dde26b..6ed396b 100644 --- a/crates/core/src/server/quic/mod.rs +++ b/crates/core/src/server/quic/mod.rs @@ -228,7 +228,6 @@ impl Server for QuicServer { handshake_tx: None, // Handshake already read above latency_tx: Some(latency_tx), control_response_tx: None, // server doesn't issue ControlRequests - role_transition_tx: None, peer_registry: Some(peer_registry), }; let mut control_stream = diff --git a/crates/core/src/server/ws/mod.rs b/crates/core/src/server/ws/mod.rs index 290a830..e6a9ad4 100644 --- a/crates/core/src/server/ws/mod.rs +++ b/crates/core/src/server/ws/mod.rs @@ -309,7 +309,6 @@ impl Server for WebSocketServer { handshake_tx: None, // Handshake already read above latency_tx: Some(latency_tx), control_response_tx: None, // server doesn't issue ControlRequests - role_transition_tx: None, peer_registry: Some(peer_registry), }; let mut control_stream = diff --git a/crates/core/src/transport/protocol.rs b/crates/core/src/transport/protocol.rs index f7303dd..cdd9470 100644 --- a/crates/core/src/transport/protocol.rs +++ b/crates/core/src/transport/protocol.rs @@ -141,8 +141,6 @@ pub struct ControlChannels { pub latency_tx: Option>, /// `ControlResponse` forwarding (client side, for correlating requests). pub control_response_tx: Option>, - /// `RoleTransition` forwarding to the mode task for re-evaluation. - pub role_transition_tx: Option>, /// Peer registry for handling relay `PeerAnnouncement` messages. /// Announced peers are registered/unregistered directly in the registry. pub peer_registry: Option>, @@ -292,9 +290,6 @@ impl ControlChannels { } Some(control_message::Message::RoleTransition(rt)) => { tracing::info!("Control: received RoleTransition: {:?}", rt.new_role()); - if let Some(ref tx) = self.role_transition_tx { - let _ = tx.send(rt).await; - } } Some(control_message::Message::PeerAnnouncement(announcement)) => { use wallhack_wire::control::peer_announcement; @@ -708,7 +703,6 @@ mod tests { handshake_tx: Some(a_hs_tx), latency_tx: None, control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; let mut stream_a = BoxBiStream::new(stream_a); @@ -723,7 +717,6 @@ mod tests { handshake_tx: Some(b_hs_tx), latency_tx: None, control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; let mut stream_b = BoxBiStream::new(stream_b); @@ -778,7 +771,6 @@ mod tests { handshake_tx: Some(hs_tx), latency_tx: None, control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; @@ -806,7 +798,6 @@ mod tests { handshake_tx: None, latency_tx: Some(latency_tx), control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; @@ -888,7 +879,6 @@ mod tests { handshake_tx: None, latency_tx: None, control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; @@ -962,7 +952,6 @@ mod tests { handshake_tx: None, latency_tx: None, control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; @@ -1094,7 +1083,6 @@ mod tests { handshake_tx: None, latency_tx: None, control_response_tx: None, - role_transition_tx: None, peer_registry: None, }; From d83adaab5d12cd2544ce0d2ea92a5c8f7e021aff Mon Sep 17 00:00:00 2001 From: Max Holman Date: Wed, 18 Mar 2026 21:35:22 +0700 Subject: [PATCH 3/5] =?UTF-8?q?refactor(transport):=20eliminate=20latency?= =?UTF-8?q?=20channel=20=E2=80=94=20update=20registry=20directly?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Pong handler in the control loop now calls registry.update_latency() directly instead of forwarding RTT measurements through a dedicated mpsc channel. This removes 4 channel pair creations (one per transport impl), the latency_rx field from ConnectResult/AcceptResult, and the latency_rx parameter from spawn_heartbeat. The peer name for registry lookups is extracted from the Handshake message — on the server side at construction, on the client side when the first Handshake arrives through the control stream. QuicClient and WsClient gain a peer_registry field that daemon modes set before calling connect(). Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/client/client.rs | 7 --- crates/core/src/client/quic/mod.rs | 12 +++-- crates/core/src/client/ws/mod.rs | 12 +++-- crates/core/src/server/quic/mod.rs | 9 ++-- crates/core/src/server/server.rs | 13 ----- crates/core/src/server/ws/mod.rs | 9 ++-- crates/core/src/transport/protocol.rs | 58 ++++++++++++-------- crates/daemon/src/mode/auto.rs | 43 ++++++--------- crates/daemon/src/mode/entry.rs | 78 +++++++++++---------------- crates/daemon/src/mode/exit.rs | 22 +++----- crates/daemon/src/mode/mod.rs | 22 +++----- crates/daemon/src/mode/relay.rs | 17 +++--- 12 files changed, 123 insertions(+), 179 deletions(-) diff --git a/crates/core/src/client/client.rs b/crates/core/src/client/client.rs index 1f5be62..93745cb 100644 --- a/crates/core/src/client/client.rs +++ b/crates/core/src/client/client.rs @@ -44,8 +44,6 @@ pub struct ConnectResult { control_tx: mpsc::Sender, /// Receiver for the server's `Handshake` (delivered via the control loop). peer_handshake_rx: Option>, - /// Pong-derived latency measurements from the control loop (milliseconds). - latency_rx: Option>, } impl ConnectResult { @@ -57,7 +55,6 @@ impl ConnectResult { tasks: ConnectionTasks, control_tx: mpsc::Sender, peer_handshake_rx: Option>, - latency_rx: Option>, ) -> Self { Self { channels, @@ -66,7 +63,6 @@ impl ConnectResult { transport, control_tx, peer_handshake_rx, - latency_rx, } } @@ -110,8 +106,6 @@ pub struct ErasedConnectResult { pub control_tx: mpsc::Sender, pub peer_handshake_rx: Option>, pub peer_addr: String, - /// Pong-derived latency measurements from the control loop (milliseconds). - pub latency_rx: Option>, } impl ConnectResult @@ -135,7 +129,6 @@ where channels: self.channels, tasks: self.tasks, control_tx: self.control_tx, - latency_rx: self.latency_rx, } } } diff --git a/crates/core/src/client/quic/mod.rs b/crates/core/src/client/quic/mod.rs index bb2f118..6c5c18d 100644 --- a/crates/core/src/client/quic/mod.rs +++ b/crates/core/src/client/quic/mod.rs @@ -64,6 +64,9 @@ pub struct QuicClient { name: Option, psk: Option>, local_handshake: Option, + /// Peer registry for direct latency updates in the control loop. + /// Set by the daemon mode before calling `connect()`. + pub peer_registry: Option>, } impl Client for QuicClient { @@ -101,6 +104,7 @@ impl Client for QuicClient { name: config.name, psk: config.psk, local_handshake: config.local_handshake, + peer_registry: None, }) } @@ -170,18 +174,17 @@ impl Client for QuicClient { // Create oneshot for receiving server's Handshake via the control loop. let (handshake_tx, handshake_rx) = tokio::sync::oneshot::channel::(); - let (latency_tx, latency_rx) = tokio::sync::mpsc::channel::(4); - // Spawn control stream task let control_handle = { let transport = Arc::clone(&transport); + let peer_registry = self.peer_registry.clone(); tokio::spawn(async move { let mut channels = protocol::ControlChannels { outgoing_rx: control_rx, handshake_tx: Some(handshake_tx), - latency_tx: Some(latency_tx), control_response_tx: None, - peer_registry: None, + peer_registry, + peer_name: None, }; match protocol::run_control_stream_initiator( &*transport, @@ -235,7 +238,6 @@ impl Client for QuicClient { tasks, control_tx, Some(handshake_rx), - Some(latency_rx), )) } diff --git a/crates/core/src/client/ws/mod.rs b/crates/core/src/client/ws/mod.rs index 1527d99..34141cf 100644 --- a/crates/core/src/client/ws/mod.rs +++ b/crates/core/src/client/ws/mod.rs @@ -196,6 +196,9 @@ impl Default for WsClientConfig { pub struct WsClient { config: WsClientConfig, tls_connector: Option, + /// Peer registry for direct latency updates in the control loop. + /// Set by the daemon mode before calling `connect()`. + pub peer_registry: Option>, } impl WsClient { @@ -223,6 +226,7 @@ impl WsClient { Ok(Self { config, tls_connector, + peer_registry: None, }) } @@ -387,18 +391,17 @@ impl WsClient { // Create oneshot for receiving server's Handshake via the control loop. let (handshake_tx, handshake_rx) = tokio::sync::oneshot::channel::(); - let (latency_tx, latency_rx) = tokio::sync::mpsc::channel::(4); - // Spawn control stream task let control_handle = { let transport = Arc::clone(&transport); + let peer_registry = self.peer_registry.clone(); tokio::spawn(async move { let mut channels = protocol::ControlChannels { outgoing_rx: control_rx, handshake_tx: Some(handshake_tx), // receive server's Handshake - latency_tx: Some(latency_tx), control_response_tx: None, - peer_registry: None, + peer_registry, + peer_name: None, }; match protocol::run_control_stream_initiator( &*transport, @@ -452,7 +455,6 @@ impl WsClient { tasks, control_tx, Some(handshake_rx), - Some(latency_rx), )) } } diff --git a/crates/core/src/server/quic/mod.rs b/crates/core/src/server/quic/mod.rs index 6ed396b..2ec9dbd 100644 --- a/crates/core/src/server/quic/mod.rs +++ b/crates/core/src/server/quic/mod.rs @@ -210,9 +210,7 @@ impl Server for QuicServer { .clone() .unwrap_or_else(RouteTable::shared); - // Create latency channel so pong-derived RTT measurements are available - // to the caller (e.g. for registry updates and one-shot ping responses). - let (latency_tx, latency_rx) = tokio::sync::mpsc::channel::(4); + let peer_name = peer_handshake.as_ref().map(|hs| hs.name.clone()); let route_updates = self.options.route_updates.clone().unwrap_or_else(|| { let (tx, _) = tokio::sync::broadcast::channel(16); tx @@ -225,10 +223,10 @@ impl Server for QuicServer { let handler = Handler::new(handler_config, metrics, peers, routes, route_updates); let mut channels = protocol::ControlChannels { outgoing_rx: control_rx, - handshake_tx: None, // Handshake already read above - latency_tx: Some(latency_tx), + handshake_tx: None, // Handshake already read above control_response_tx: None, // server doesn't issue ControlRequests peer_registry: Some(peer_registry), + peer_name, }; let mut control_stream = wallhack_transport::erased::BoxBiStream::new(control_stream); @@ -247,7 +245,6 @@ impl Server for QuicServer { metrics, peer_handshake, control_tx, - latency_rx, channel_binding, ))) } diff --git a/crates/core/src/server/server.rs b/crates/core/src/server/server.rs index 437842d..37d5570 100644 --- a/crates/core/src/server/server.rs +++ b/crates/core/src/server/server.rs @@ -65,9 +65,6 @@ pub struct AcceptResult { transport: Arc, /// Channel for injecting messages into the control stream. control_tx: mpsc::Sender, - /// Receiver for pong-derived latency measurements (milliseconds) from the - /// control loop. Used by one-shot ping callers. - latency_rx: Option>, /// TLS channel binding bytes for PSK proof verification. channel_binding: Option<[u8; crate::psk::CHANNEL_BINDING_LEN]>, } @@ -80,7 +77,6 @@ pub struct ErasedAcceptResult { pub peer_handshake: Option, pub transport: Arc, pub control_tx: mpsc::Sender, - pub latency_rx: Option>, pub channel_binding: Option<[u8; crate::psk::CHANNEL_BINDING_LEN]>, } @@ -100,7 +96,6 @@ where peer_handshake: self.peer_handshake.take(), transport: self.transport as Arc, control_tx: self.control_tx, - latency_rx: self.latency_rx.take(), channel_binding: self.channel_binding, } } @@ -110,7 +105,6 @@ impl AcceptResult { /// Creates a new accept result with an already-received peer `Handshake` /// and a latency receiver for pong-derived RTT measurements. #[must_use] - #[allow(clippy::too_many_arguments)] // accept result construction; will be simplified when builder pattern is adopted pub fn with_handshake( transport: Arc, channels: DataChannels, @@ -118,7 +112,6 @@ impl AcceptResult { metrics: SharedMetrics, peer_handshake: Option, control_tx: mpsc::Sender, - latency_rx: mpsc::Receiver, channel_binding: Option<[u8; crate::psk::CHANNEL_BINDING_LEN]>, ) -> Self { Self { @@ -128,7 +121,6 @@ impl AcceptResult { peer_handshake, transport, control_tx, - latency_rx: Some(latency_rx), channel_binding, } } @@ -178,11 +170,6 @@ impl AcceptResult { &self.control_tx } - /// Takes the latency receiver for pong-derived RTT measurements. - pub fn take_latency_rx(&mut self) -> Option> { - self.latency_rx.take() - } - /// Returns the TLS channel binding bytes for this connection. #[must_use] pub fn channel_binding(&self) -> Option<&[u8; crate::psk::CHANNEL_BINDING_LEN]> { diff --git a/crates/core/src/server/ws/mod.rs b/crates/core/src/server/ws/mod.rs index e6a9ad4..0a3da04 100644 --- a/crates/core/src/server/ws/mod.rs +++ b/crates/core/src/server/ws/mod.rs @@ -291,9 +291,7 @@ impl Server for WebSocketServer { .clone() .unwrap_or_else(RouteTable::shared); - // Create latency channel so pong-derived RTT measurements are available - // to the caller (e.g. for registry updates and one-shot ping responses). - let (latency_tx, latency_rx) = tokio::sync::mpsc::channel::(4); + let peer_name = peer_handshake.as_ref().map(|hs| hs.name.clone()); let route_updates = self.options.route_updates.clone().unwrap_or_else(|| { let (tx, _) = tokio::sync::broadcast::channel(16); tx @@ -306,10 +304,10 @@ impl Server for WebSocketServer { let handler = Handler::new(handler_config, metrics, peers, routes, route_updates); let mut channels = protocol::ControlChannels { outgoing_rx: control_rx, - handshake_tx: None, // Handshake already read above - latency_tx: Some(latency_tx), + handshake_tx: None, // Handshake already read above control_response_tx: None, // server doesn't issue ControlRequests peer_registry: Some(peer_registry), + peer_name, }; let mut control_stream = wallhack_transport::erased::BoxBiStream::new(control_stream); @@ -328,7 +326,6 @@ impl Server for WebSocketServer { metrics, peer_handshake, control_tx, - latency_rx, channel_binding, ))) } diff --git a/crates/core/src/transport/protocol.rs b/crates/core/src/transport/protocol.rs index cdd9470..8287f66 100644 --- a/crates/core/src/transport/protocol.rs +++ b/crates/core/src/transport/protocol.rs @@ -137,13 +137,15 @@ pub struct ControlChannels { pub outgoing_rx: mpsc::Receiver, /// One-shot for the first `Handshake` received from the peer. pub handshake_tx: Option>, - /// Pong-derived latency measurements (milliseconds). - pub latency_tx: Option>, /// `ControlResponse` forwarding (client side, for correlating requests). pub control_response_tx: Option>, - /// Peer registry for handling relay `PeerAnnouncement` messages. - /// Announced peers are registered/unregistered directly in the registry. + /// Peer registry for latency updates (Pong) and relay `PeerAnnouncement` + /// handling. Set on both client and server sides when available. pub peer_registry: Option>, + /// Peer name for latency updates. On the server side this is set at + /// construction (from the already-read handshake). On the client side + /// it is populated from the first received `Handshake` message. + pub peer_name: Option, } impl ControlChannels { @@ -235,6 +237,10 @@ impl ControlChannels { match msg.message { Some(control_message::Message::Handshake(hs)) => { tracing::info!("Handshake from {} ({})", hs.name, hs.version); + // Store peer name for Pong-driven latency updates. + if self.peer_name.is_none() { + self.peer_name = Some(hs.name.clone()); + } if let Some(tx) = self.handshake_tx.take() { let _ = tx.send(hs); } @@ -261,8 +267,10 @@ impl ControlChannels { // ms-resolution latency; f64 mantissa exceeds plausible RTT range let latency_ms = now_ms.saturating_sub(pong.timestamp_ms) as f64; tracing::trace!(latency_ms, "Control: received Pong"); - if let Some(ref tx) = self.latency_tx { - let _ = tx.send(latency_ms).await; + if let Some(ref registry) = self.peer_registry + && let Some(ref name) = self.peer_name + { + registry.update_latency(name, latency_ms); } } Some(control_message::Message::ControlRequest(req)) => { @@ -701,9 +709,9 @@ mod tests { let mut channels = ControlChannels { outgoing_rx: a_ctrl_rx, handshake_tx: Some(a_hs_tx), - latency_tx: None, control_response_tx: None, peer_registry: None, + peer_name: None, }; let mut stream_a = BoxBiStream::new(stream_a); channels @@ -715,9 +723,9 @@ mod tests { let mut channels = ControlChannels { outgoing_rx: b_ctrl_rx, handshake_tx: Some(b_hs_tx), - latency_tx: None, control_response_tx: None, peer_registry: None, + peer_name: None, }; let mut stream_b = BoxBiStream::new(stream_b); channels @@ -769,9 +777,9 @@ mod tests { let mut channels = ControlChannels { outgoing_rx: ctrl_rx, handshake_tx: Some(hs_tx), - latency_tx: None, control_response_tx: None, peer_registry: None, + peer_name: None, }; let mut stream_b = BoxBiStream::new(stream_b); @@ -785,20 +793,27 @@ mod tests { assert!(hs_rx.try_recv().is_err()); } - /// Pong latency is computed and forwarded via `latency_tx`. + /// Pong latency is computed and written to the peer registry. #[tokio::test] async fn test_ping_latency() { let (mut stream_a, stream_b) = bidi_pair(); - let (latency_tx, mut latency_rx) = tokio::sync::mpsc::channel::(4); + let registry = std::sync::Arc::new(crate::control::peers::Registry::new()); + registry.register( + "test-peer".to_string(), + "127.0.0.1:9999".to_string(), + crate::NodeRole::Exit, + wallhack_wire::data::Capabilities::default(), + crate::control::peers::ConnectionSide::Connect, + ); let (_ctrl_tx, ctrl_rx) = tokio::sync::mpsc::channel::(16); let mut channels = ControlChannels { outgoing_rx: ctrl_rx, handshake_tx: None, - latency_tx: Some(latency_tx), control_response_tx: None, - peer_registry: None, + peer_registry: Some(std::sync::Arc::clone(®istry)), + peer_name: Some("test-peer".to_string()), }; // Spawn the control loop on side B (will read from stream_b). @@ -856,11 +871,12 @@ mod tests { .await .unwrap(); - // The control loop should forward the latency via latency_tx. - let ms = tokio::time::timeout(std::time::Duration::from_secs(2), latency_rx.recv()) - .await - .expect("timed out") - .expect("channel closed"); + // Give the control loop a moment to process the Pong. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // The control loop should update latency directly in the registry. + let peer = registry.get("test-peer").expect("peer should exist"); + let ms = peer.latency_ms.expect("latency should be set"); // Latency should be approximately 100ms (within ±50ms tolerance for CI). assert!((50.0..=200.0).contains(&ms), "expected ~100ms, got {ms}ms"); @@ -877,9 +893,9 @@ mod tests { let mut channels = ControlChannels { outgoing_rx: ctrl_rx, handshake_tx: None, - latency_tx: None, control_response_tx: None, peer_registry: None, + peer_name: None, }; // Control loop with 1-second ping interval. @@ -950,9 +966,9 @@ mod tests { let mut channels = ControlChannels { outgoing_rx: ctrl_rx, handshake_tx: None, - latency_tx: None, control_response_tx: None, peer_registry: None, + peer_name: None, }; let server_handle = tokio::spawn(async move { @@ -1081,9 +1097,9 @@ mod tests { let mut channels = ControlChannels { outgoing_rx: ctrl_rx, handshake_tx: None, - latency_tx: None, control_response_tx: None, peer_registry: None, + peer_name: None, }; let server_handle = tokio::spawn(async move { diff --git a/crates/daemon/src/mode/auto.rs b/crates/daemon/src/mode/auto.rs index 8555f27..0ad2424 100644 --- a/crates/daemon/src/mode/auto.rs +++ b/crates/daemon/src/mode/auto.rs @@ -254,13 +254,17 @@ async fn run_auto_connector( Some(local_hs.clone()), ); let route_updates = route_updates.resubscribe(); + // Clone peers for the factory closure; the session closure moves the original. + let peers_for_factory = Arc::clone(&peers); crate::transport::connect_loop( || { let client_config = client_config.clone(); + let peers = Arc::clone(&peers_for_factory); async move { use wallhack_core::client::client::Client; let mut client = wallhack_core::client::quic::QuicClient::try_new(client_config)?; + client.peer_registry = Some(peers); client.connect(NodeRole::Indeterminate).await } }, @@ -306,12 +310,16 @@ async fn run_auto_connector( Some(local_hs.clone()), ); let route_updates = route_updates.resubscribe(); + // Clone peers for the factory closure; the session closure moves the original. + let peers_for_factory = Arc::clone(&peers); crate::transport::connect_loop( || { let client_config = client_config.clone(); + let peers = Arc::clone(&peers_for_factory); async move { let mut client = wallhack_core::client::ws::WsClient::new(client_config)?; + client.peer_registry = Some(peers); client.connect(NodeRole::Indeterminate).await } }, @@ -371,7 +379,6 @@ async fn run_auto_connect_session_dispatch( tasks, control_tx, peer_addr: _, - latency_rx, } = connect_result; let DataChannels { @@ -444,7 +451,6 @@ async fn run_auto_connect_session_dispatch( peer_addr, Some(peer_name), Some(Arc::clone(&peers)), - latency_rx, routes.clone(), route_updates, ) @@ -500,12 +506,8 @@ async fn run_auto_connect_session_dispatch( }); } drop(tasks); - let heartbeat = super::spawn_heartbeat( - control_tx, - latency_rx, - peer_name.clone(), - Arc::clone(&peers), - ); + let heartbeat = + super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&peers)); run_auto_exit_session_inner( transport, instructions_rx, @@ -548,8 +550,7 @@ async fn run_auto_connect_session_dispatch( peer_caps, wallhack_core::control::peers::ConnectionSide::Connect, ); - let _heartbeat = - super::spawn_heartbeat(control_tx, latency_rx, name.clone(), Arc::clone(&peers)); + let _heartbeat = super::spawn_heartbeat(control_tx, name.clone(), Arc::clone(&peers)); hold_until_disconnect(tasks).await; peers.unregister(&name); tracing::info!("Peer disconnected: {name}"); @@ -782,7 +783,6 @@ where // so the spawned future is non-generic. let peer_hs = accept_result.take_peer_handshake(); let transport: Arc = accept_result.transport(); - let latency_rx = accept_result.take_latency_rx(); let ( DataChannels { instructions_tx, @@ -816,7 +816,6 @@ where Some(route_updates), peer_addr, node_state, - latency_rx, ) .await { @@ -860,7 +859,6 @@ async fn run_auto_accept_session_inner( >, peer_addr: String, node_state: SharedNodeState, - latency_rx: Option>, ) -> Result<(), NodeError> { let Some(peer_hs) = peer_hs else { tracing::warn!("No peer handshake from {peer_addr}; cannot negotiate"); @@ -1011,12 +1009,8 @@ async fn run_auto_accept_session_inner( ConnectionSide::Accept, ); - let _heartbeat = super::spawn_heartbeat( - control_tx, - latency_rx, - peer_name.clone(), - Arc::clone(&peers), - ); + let _heartbeat = + super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&peers)); let handle = tokio::spawn(async move { manager.run().await }); match handle.await { @@ -1105,12 +1099,8 @@ async fn run_auto_accept_session_inner( ConnectionSide::Accept, ); - let _heartbeat = super::spawn_heartbeat( - control_tx, - latency_rx, - peer_name.clone(), - Arc::clone(&peers), - ); + let _heartbeat = + super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&peers)); let adapter = SyscallExitAdapter::new(); let _reaper = adapter.start_reaper( @@ -1162,8 +1152,7 @@ async fn run_auto_accept_session_inner( peer_caps, wallhack_core::control::peers::ConnectionSide::Accept, ); - let _heartbeat = - super::spawn_heartbeat(control_tx, latency_rx, name.clone(), Arc::clone(&peers)); + let _heartbeat = super::spawn_heartbeat(control_tx, name.clone(), Arc::clone(&peers)); // Hold transport alive; wait for the peer to disconnect // by draining the instructions channel (closes when transport dies). let _keep_transport = transport; diff --git a/crates/daemon/src/mode/entry.rs b/crates/daemon/src/mode/entry.rs index 3c88114..cb53bce 100644 --- a/crates/daemon/src/mode/entry.rs +++ b/crates/daemon/src/mode/entry.rs @@ -368,13 +368,17 @@ pub(crate) async fn run_entry_connect( Some(entry_handshake), ); let route_updates = res.route_updates.resubscribe(); + // Clone peers for the factory closure; the session closure moves the original. + let peers_for_factory = Arc::clone(&peers); crate::transport::connect_loop( || { let client_config = client_config.clone(); + let peers = Arc::clone(&peers_for_factory); async move { use wallhack_core::client::client::Client; let mut client = wallhack_core::client::quic::QuicClient::try_new(client_config)?; + client.peer_registry = Some(peers); client.connect(NodeRole::Entry).await } }, @@ -415,12 +419,16 @@ pub(crate) async fn run_entry_connect( Some(entry_handshake), ); let route_updates = res.route_updates.resubscribe(); + // Clone peers for the factory closure; the session closure moves the original. + let peers_for_factory = Arc::clone(&peers); crate::transport::connect_loop( || { let client_config = client_config.clone(); + let peers = Arc::clone(&peers_for_factory); async move { let mut client = wallhack_core::client::ws::WsClient::new(client_config)?; + client.peer_registry = Some(peers); client.connect(NodeRole::Entry).await } }, @@ -493,7 +501,6 @@ pub(crate) async fn run_entry_connected_erased( tasks: _tasks, control_tx, peer_addr: _, - latency_rx, } = connect_result; // Wait for the server's handshake to get the peer name @@ -531,7 +538,6 @@ pub(crate) async fn run_entry_connected_erased( peer_addr, peer_name.as_deref(), peers, - latency_rx, routes, route_updates, ) @@ -551,7 +557,6 @@ pub(crate) async fn run_entry_connected_inner( peer_addr: &str, peer_name: Option<&str>, peers: Option>, - latency_rx: Option>, routes: Option, route_updates: Option< tokio::sync::broadcast::Receiver, @@ -632,7 +637,6 @@ pub(crate) async fn run_entry_connected_inner( let _heartbeat = if let Some(pn) = peer_name { Some(super::spawn_heartbeat( control_tx, - latency_rx, pn.to_string(), peers.unwrap_or_else(|| Arc::new(Registry::new())), )) @@ -745,9 +749,6 @@ where // Extract transport and channels from the generic AcceptResult before // spawning so the spawned future is non-generic. let transport: Arc = accept_result.transport(); - let latency_rx = accept_result - .take_latency_rx() - .unwrap_or_else(|| tokio::sync::mpsc::channel(1).1); let (channels, control_tx) = accept_result.into_channels(); // Spawn non-generic handler @@ -766,7 +767,7 @@ where peer: identity.name, peer_addr: peer_addr.clone(), }; - let result = params.run(latency_rx).await; + let result = params.run().await; // Unregister peer — connection ID check prevents evicting a // newer connection that re-registered under the same name. @@ -939,33 +940,19 @@ pub(crate) fn spawn_data_tasks( } } -/// Run the connection manager alongside ping/latency handling. +/// Run the connection manager alongside route update handling. /// /// On exit (normal or error), the manager task is aborted and joined so /// the `ConnectionManager` (and its TUN fd Arcs) are dropped before the /// caller runs `delete_tun`. -// REASON: threading manager_handle, control, latency, route_updates, peer info, peers, tun_name +// REASON: threading manager_handle, control, route_updates, peer info, peers, tun_name #[allow(clippy::too_many_arguments)] async fn run_connection_loop( mut manager_handle: tokio::task::JoinHandle>, - control_tx: tokio::sync::mpsc::Sender, - mut latency_rx: tokio::sync::mpsc::Receiver, mut route_updates: tokio::sync::broadcast::Receiver, peer: Option<&str>, - peers: &Arc, tun_name: &str, ) -> Result<(), NodeError> { - let mut heartbeat = tokio::time::interval(std::time::Duration::from_secs(30)); - heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - // Fire an initial ping so latency is populated immediately after connect. - if let Err(e) = super::send_ping(&control_tx).await { - tracing::debug!("Initial ping failed: {e}"); - } - // Consume the first tick (fires immediately) so the heartbeat starts - // 30s after the initial ping, not immediately. - heartbeat.tick().await; - let mut manager_result = Ok(()); loop { tokio::select! { @@ -977,16 +964,6 @@ async fn run_connection_loop( }; break; } - Some(ms) = latency_rx.recv() => { - if let Some(id) = peer { - peers.update_latency(id, ms); - } - } - _ = heartbeat.tick() => { - if let Err(e) = super::send_ping(&control_tx).await { - tracing::debug!("Heartbeat ping failed: {e}"); - } - } update = route_updates.recv() => { match update { Ok(RouteUpdate::Add(entry)) => { @@ -1023,10 +1000,7 @@ async fn run_connection_loop( impl ConnectionParams { /// Main entry point for the non-generic connection handler. - pub async fn run( - self, - latency_rx: tokio::sync::mpsc::Receiver, - ) -> Result { + pub async fn run(self) -> Result { use wallhack_core::server::server::DataChannels; let ConnectionParams { @@ -1084,20 +1058,28 @@ impl ConnectionParams { responses_rx, ); + // Spawn heartbeat: sends periodic pings; latency updates happen directly + // in the control loop Pong handler via the peer registry. + // When there is no peer name, keep control_tx alive for the duration + // of this connection without spawning a heartbeat. + let mut _control_tx_keep = None; + let _heartbeat = if let Some(ref peer_name) = peer { + Some(super::spawn_heartbeat( + control_tx, + peer_name.clone(), + Arc::clone(&peers), + )) + } else { + _control_tx_keep = Some(control_tx); + None + }; + // Panic safety: guard calls delete_tun if we unwind. let mut tun_guard = TunDropGuard::new(name.clone()); let manager_handle = tokio::spawn(async move { manager.run().await }); - let result = run_connection_loop( - manager_handle, - control_tx, - latency_rx, - route_updates, - peer.as_deref(), - &peers, - &name, - ) - .await; + let result = + run_connection_loop(manager_handle, route_updates, peer.as_deref(), &name).await; // run_connection_loop aborts and joins the manager task, so all // TUN fd Arcs are dropped. Delete the TUN NOW — before returning — diff --git a/crates/daemon/src/mode/exit.rs b/crates/daemon/src/mode/exit.rs index e9f83ca..c3a05f3 100644 --- a/crates/daemon/src/mode/exit.rs +++ b/crates/daemon/src/mode/exit.rs @@ -129,10 +129,12 @@ async fn run_exit_connector( crate::transport::connect_loop( || { let client_config = client_config.clone(); + let ctx = Arc::clone(&ctx); async move { use wallhack_core::client::client::Client; let mut client = wallhack_core::client::quic::QuicClient::try_new(client_config)?; + client.peer_registry = Some(Arc::clone(&ctx.peers)); client.connect(NodeRole::Exit).await } }, @@ -149,7 +151,6 @@ async fn run_exit_connector( erased.control_tx, erased.tasks, erased.peer_handshake_rx, - erased.latency_rx, &peer_addr, &ctx, ) @@ -180,9 +181,11 @@ async fn run_exit_connector( crate::transport::connect_loop( || { let client_config = client_config.clone(); + let ctx = Arc::clone(&ctx); async move { let mut client = wallhack_core::client::ws::WsClient::new(client_config)?; + client.peer_registry = Some(Arc::clone(&ctx.peers)); client.connect(NodeRole::Exit).await } }, @@ -199,7 +202,6 @@ async fn run_exit_connector( erased.control_tx, erased.tasks, erased.peer_handshake_rx, - erased.latency_rx, &peer_addr, &ctx, ) @@ -314,7 +316,7 @@ where loop { match server.accept(NodeRole::Exit).await { - Ok(Some(mut accept_result)) => { + Ok(Some(accept_result)) => { let peer_addr = accept_result.peer_addr().to_string(); // Register the connecting peer using handshake name and capabilities. @@ -334,9 +336,6 @@ where ); let transport: Arc = accept_result.transport(); - let latency_rx = accept_result - .take_latency_rx() - .unwrap_or_else(|| tokio::sync::mpsc::channel(1).1); let adapter = SyscallExitAdapter::new(); let _reaper = adapter.start_reaper( std::time::Duration::from_mins(1), @@ -393,7 +392,6 @@ where tokio::spawn(async move { let _heartbeat = super::spawn_heartbeat( control_tx, - Some(latency_rx), peer_name.clone(), Arc::clone(&ctx.peers), ); @@ -430,7 +428,7 @@ where } /// Non-generic exit loop: monomorphized once regardless of transport type. -// REASON: threading transport, instructions, responses, control, tasks, handshake, latency, peer_addr, ctx +// REASON: threading transport, instructions, responses, control, tasks, handshake, peer_addr, ctx #[allow(clippy::too_many_arguments)] async fn run_exit_loop_inner( transport: Arc, @@ -440,7 +438,6 @@ async fn run_exit_loop_inner( control_tx: tokio::sync::mpsc::Sender, mut tasks: wallhack_core::client::client::ConnectionTasks, peer_handshake_rx: Option>, - latency_rx: Option>, peer_addr: &str, ctx: &ExitContext, ) -> Result<(), NodeError> { @@ -492,12 +489,7 @@ async fn run_exit_loop_inner( ); let orchestrator = Orchestrator::new(Arc::new(adapter), Arc::clone(&ctx.metrics)); - let _heartbeat = super::spawn_heartbeat( - control_tx, - latency_rx, - peer_name.clone(), - Arc::clone(&ctx.peers), - ); + let _heartbeat = super::spawn_heartbeat(control_tx, peer_name.clone(), Arc::clone(&ctx.peers)); let stream_fut = run_stream_listener(transport); tokio::pin!(stream_fut); diff --git a/crates/daemon/src/mode/mod.rs b/crates/daemon/src/mode/mod.rs index d258420..0ee2cce 100644 --- a/crates/daemon/src/mode/mod.rs +++ b/crates/daemon/src/mode/mod.rs @@ -106,12 +106,11 @@ pub(crate) async fn send_ping( /// Spawn a background heartbeat task for any connection. /// /// Fires an initial ping immediately, then pings every 30 seconds. -/// Consumes latency measurements from the transport control loop and -/// updates the peer registry. Runs until the control channel closes -/// or the returned handle is dropped/aborted. +/// Latency is now updated directly by the control loop Pong handler via +/// the peer registry. Runs until the control channel closes or the +/// returned handle is dropped/aborted. pub(crate) fn spawn_heartbeat( control_tx: tokio::sync::mpsc::Sender, - latency_rx: Option>, peer_name: String, peers: Arc, ) -> tokio::task::JoinHandle<()> { @@ -125,22 +124,15 @@ pub(crate) fn spawn_heartbeat( return; } - let mut latency_rx = latency_rx.unwrap_or_else(|| tokio::sync::mpsc::channel(1).1); let mut heartbeat = tokio::time::interval(std::time::Duration::from_secs(30)); heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); heartbeat.tick().await; // consume first immediate tick loop { - tokio::select! { - Some(ms) = latency_rx.recv() => { - peers.update_latency(&peer_name, ms); - } - _ = heartbeat.tick() => { - if let Err(e) = send_ping(&control_tx).await { - tracing::debug!("Heartbeat ping failed: {e}"); - break; - } - } + heartbeat.tick().await; + if let Err(e) = send_ping(&control_tx).await { + tracing::debug!("Heartbeat ping failed: {e}"); + break; } } diff --git a/crates/daemon/src/mode/relay.rs b/crates/daemon/src/mode/relay.rs index 1f48b9e..34a839f 100644 --- a/crates/daemon/src/mode/relay.rs +++ b/crates/daemon/src/mode/relay.rs @@ -152,10 +152,12 @@ pub async fn run( crate::transport::connect_loop( || { let client_config = client_config.clone(); + let peers = Arc::clone(&peers); async move { use wallhack_core::client::client::Client; let mut client = wallhack_core::client::quic::QuicClient::try_new(client_config)?; + client.peer_registry = Some(peers); client.connect(NodeRole::Relay).await } }, @@ -173,7 +175,6 @@ pub async fn run( erased.tasks, erased.control_tx, erased.peer_handshake_rx, - erased.latency_rx, &global, &listen_spec, addr, @@ -208,9 +209,11 @@ pub async fn run( crate::transport::connect_loop( || { let client_config = client_config.clone(); + let peers = Arc::clone(&peers); async move { let mut client = wallhack_core::client::ws::WsClient::new(client_config)?; + client.peer_registry = Some(peers); client.connect(NodeRole::Relay).await } }, @@ -228,7 +231,6 @@ pub async fn run( erased.tasks, erased.control_tx, erased.peer_handshake_rx, - erased.latency_rx, &global, &listen_spec, addr, @@ -264,7 +266,6 @@ async fn run_relay_loop_inner( mut tasks: wallhack_core::client::client::ConnectionTasks, source_control_tx: tokio::sync::mpsc::Sender, peer_handshake_rx: Option>, - latency_rx: Option>, global: &GlobalConfig, listen_spec: &AddressSpec, addr: std::net::SocketAddr, @@ -302,7 +303,6 @@ async fn run_relay_loop_inner( let _source_heartbeat = super::spawn_heartbeat( source_control_tx.clone(), - latency_rx, peer_name.clone(), Arc::clone(&peers), ); @@ -682,7 +682,6 @@ fn handle_relay_connection( let peer_addr = erased.peer_addr; let transport = erased.transport; let peer_handshake = erased.peer_handshake; - let latency_rx = erased.latency_rx; let (channels, control_tx) = (erased.channels, erased.control_tx); let DataChannels { instructions_tx, @@ -705,12 +704,8 @@ fn handle_relay_connection( ConnectionSide::Accept, ); - let _accepted_heartbeat = super::spawn_heartbeat( - control_tx.clone(), - latency_rx, - peer_name.clone(), - Arc::clone(peers), - ); + let _accepted_heartbeat = + super::spawn_heartbeat(control_tx.clone(), peer_name.clone(), Arc::clone(peers)); // Incoming: accept uni stream from exit peer, dispatch data messages. // Exit peers send ExitNodeResponses which are dispatched via responses_tx. From 346d390199831441a8c4c2a5d28ba26f3d7b024a Mon Sep 17 00:00:00 2001 From: Max Holman Date: Wed, 18 Mar 2026 21:57:56 +0700 Subject: [PATCH 4/5] refactor(transport): deduplicate QUIC/WS client task setup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract spawn_client_tasks() in client.rs — shared by both QUIC and WS connect paths. Handles the oneshot handshake channel, control stream task, data-in task, and ConnectResult construction that was previously copy-pasted across both transports. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/client/client.rs | 81 ++++++++++++++++++++++++++++++ crates/core/src/client/quic/mod.rs | 79 +++-------------------------- crates/core/src/client/ws/mod.rs | 78 +++------------------------- 3 files changed, 95 insertions(+), 143 deletions(-) diff --git a/crates/core/src/client/client.rs b/crates/core/src/client/client.rs index 93745cb..3dd5f18 100644 --- a/crates/core/src/client/client.rs +++ b/crates/core/src/client/client.rs @@ -133,6 +133,87 @@ where } } +/// Spawn the control and data-in tasks shared by all client transports. +/// +/// Called after the transport is established and the handshake has been +/// queued on `control_tx`. Creates the handshake oneshot, control loop +/// task, incoming data task, and returns a fully wired `ConnectResult`. +pub fn spawn_client_tasks( + transport: Arc, + control_tx: mpsc::Sender, + control_rx: mpsc::Receiver, + peer_registry: Option>, + remote_addr: String, +) -> ConnectResult +where + T::SendStream: 'static, + T::RecvStream: 'static, + T::BiStream: Send + 'static, +{ + use crate::transport::protocol; + + let (handshake_tx, handshake_rx) = oneshot::channel::(); + + let control_handle = { + let transport = Arc::clone(&transport); + tokio::spawn(async move { + let mut channels = protocol::ControlChannels { + outgoing_rx: control_rx, + handshake_tx: Some(handshake_tx), + control_response_tx: None, + peer_registry, + peer_name: None, + }; + match protocol::run_control_stream_initiator( + &*transport, + &mut channels, + None, + std::time::Duration::from_secs(30), + ) + .await + { + Ok(exit) => tracing::debug!("Control stream finished: {exit:?}"), + Err(e) => tracing::debug!("Control stream error: {e}"), + } + }) + }; + + let channels = DataChannels::new(); + + let incoming_handle = { + let transport = Arc::clone(&transport); + let instructions_tx = channels.instructions_tx.clone(); + let responses_tx = channels.responses_tx.clone(); + tokio::spawn(async move { + match transport.accept_uni().await { + Ok(Some(mut recv)) => { + if let Err(e) = + protocol::run_data_in(&mut recv, &instructions_tx, &responses_tx).await + { + tracing::debug!("Data-in handler finished: {e}"); + } + } + Ok(None) => tracing::debug!("Transport closed before data-in stream accepted"), + Err(e) => tracing::debug!("Failed to accept data-in stream: {e}"), + } + }) + }; + + let tasks = ConnectionTasks { + incoming: incoming_handle, + control: control_handle, + }; + + ConnectResult::new( + transport, + channels, + remote_addr, + tasks, + control_tx, + Some(handshake_rx), + ) +} + pub trait Client { type Error: std::error::Error + std::fmt::Debug + Send + Sync + 'static; type Transport: wallhack_transport::Transport; diff --git a/crates/core/src/client/quic/mod.rs b/crates/core/src/client/quic/mod.rs index 6c5c18d..5d4fa0c 100644 --- a/crates/core/src/client/quic/mod.rs +++ b/crates/core/src/client/quic/mod.rs @@ -2,21 +2,16 @@ use std::sync::Arc; use quinn::{IdleTimeout, VarInt, crypto::rustls::QuicClientConfig}; use tokio::time::Instant; -use wallhack_transport::Transport; use crate::{ - ClientConfig, NodeRole, - client::tls_config, - psk::HandshakeExt, - server::server::DataChannels, - transport::{protocol, quic::QuicTransport}, + ClientConfig, NodeRole, client::tls_config, psk::HandshakeExt, transport::quic::QuicTransport, }; use wallhack_wire::{ control::{ControlMessage, control_message}, data::Handshake, }; -use super::client::{Client, ConnectResult, ConnectionTasks}; +use super::client::{Client, ConnectResult}; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -172,72 +167,12 @@ impl Client for QuicClient { })?; } - // Create oneshot for receiving server's Handshake via the control loop. - let (handshake_tx, handshake_rx) = tokio::sync::oneshot::channel::(); - // Spawn control stream task - let control_handle = { - let transport = Arc::clone(&transport); - let peer_registry = self.peer_registry.clone(); - tokio::spawn(async move { - let mut channels = protocol::ControlChannels { - outgoing_rx: control_rx, - handshake_tx: Some(handshake_tx), - control_response_tx: None, - peer_registry, - peer_name: None, - }; - match protocol::run_control_stream_initiator( - &*transport, - &mut channels, - None, // client doesn't handle ControlRequests - std::time::Duration::from_secs(30), - ) - .await - { - Ok(exit) => tracing::debug!("Control stream finished: {exit:?}"), - Err(e) => tracing::debug!("Control stream error: {e}"), - } - }) - }; - - let channels = DataChannels::new(); - - // Incoming data task: accept uni stream from peer, dispatch messages. - let incoming_handle = { - let transport = Arc::clone(&transport); - let instructions_tx = channels.instructions_tx.clone(); - let responses_tx = channels.responses_tx.clone(); - tokio::spawn(async move { - match transport.accept_uni().await { - Ok(Some(mut recv)) => { - if let Err(e) = - protocol::run_data_in(&mut recv, &instructions_tx, &responses_tx).await - { - tracing::debug!("Data-in handler finished: {e}"); - } - } - Ok(None) => tracing::debug!("Transport closed before data-in stream accepted"), - Err(e) => tracing::debug!("Failed to accept data-in stream: {e}"), - } - }) - }; - - // Outgoing data task is NOT spawned here; the caller opens the uni stream - // and drives run_send_instructions / run_send_responses as appropriate for - // its role, consuming the receiver from DataChannels. - - let tasks = ConnectionTasks { - incoming: incoming_handle, - control: control_handle, - }; - - Ok(ConnectResult::new( - Arc::clone(&transport), - channels, - remote_addr, - tasks, + Ok(super::client::spawn_client_tasks( + transport, control_tx, - Some(handshake_rx), + control_rx, + self.peer_registry.clone(), + remote_addr, )) } diff --git a/crates/core/src/client/ws/mod.rs b/crates/core/src/client/ws/mod.rs index 34141cf..e7d8d1c 100644 --- a/crates/core/src/client/ws/mod.rs +++ b/crates/core/src/client/ws/mod.rs @@ -27,14 +27,10 @@ use crate::{ NodeRole, client::config::ClientConfig, psk::HandshakeExt, - server::server::DataChannels, - transport::{ - Transport, protocol, - websocket::{WebSocketByteStream, WebSocketTransport, WebSocketTransportConfig}, - }, + transport::websocket::{WebSocketByteStream, WebSocketTransport, WebSocketTransportConfig}, }; -use super::client::{ConnectResult, ConnectionTasks}; +use super::client::ConnectResult; /// Errors that can occur in the WebSocket client. #[derive(Debug, thiserror::Error)] @@ -389,72 +385,12 @@ impl WsClient { let _ = control_tx.send(msg).await; } - // Create oneshot for receiving server's Handshake via the control loop. - let (handshake_tx, handshake_rx) = tokio::sync::oneshot::channel::(); - // Spawn control stream task - let control_handle = { - let transport = Arc::clone(&transport); - let peer_registry = self.peer_registry.clone(); - tokio::spawn(async move { - let mut channels = protocol::ControlChannels { - outgoing_rx: control_rx, - handshake_tx: Some(handshake_tx), // receive server's Handshake - control_response_tx: None, - peer_registry, - peer_name: None, - }; - match protocol::run_control_stream_initiator( - &*transport, - &mut channels, - None, // client doesn't handle ControlRequests - std::time::Duration::from_secs(30), - ) - .await - { - Ok(exit) => tracing::debug!("Control stream finished: {exit:?}"), - Err(e) => tracing::debug!("Control stream error: {e}"), - } - }) - }; - - let channels = DataChannels::new(); - - // Incoming data task: accept uni stream from peer, dispatch messages. - let incoming_handle = { - let transport = Arc::clone(&transport); - let instructions_tx = channels.instructions_tx.clone(); - let responses_tx = channels.responses_tx.clone(); - tokio::spawn(async move { - match transport.accept_uni().await { - Ok(Some(mut recv)) => { - if let Err(e) = - protocol::run_data_in(&mut recv, &instructions_tx, &responses_tx).await - { - tracing::debug!("Data-in handler finished: {e}"); - } - } - Ok(None) => tracing::debug!("Transport closed before data-in stream accepted"), - Err(e) => tracing::debug!("Failed to accept data-in stream: {e}"), - } - }) - }; - - // Outgoing data task is NOT spawned here; the caller opens the uni stream - // and drives run_send_instructions / run_send_responses as appropriate for - // its role, consuming the receiver from DataChannels. - - let tasks = ConnectionTasks { - incoming: incoming_handle, - control: control_handle, - }; - - Ok(ConnectResult::new( - Arc::clone(&transport), - channels, - remote_addr_str, - tasks, + Ok(super::client::spawn_client_tasks( + transport, control_tx, - Some(handshake_rx), + control_rx, + self.peer_registry.clone(), + remote_addr_str, )) } } From bd21131f463bbdd7ec068a7d16874d71b5f9f978 Mon Sep 17 00:00:00 2001 From: Max Holman Date: Wed, 18 Mar 2026 22:02:01 +0700 Subject: [PATCH 5/5] refactor(transport): deduplicate QUIC/WS server task setup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract spawn_server_tasks() in server.rs — resolves shared resources from ServerOptions, spawns the control loop with a Handler, and builds the AcceptResult. Replaces ~35 lines of identical setup in both QUIC and WS accept paths. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/server/quic/mod.rs | 66 +++---------------------- crates/core/src/server/server.rs | 79 ++++++++++++++++++++++++++++++ crates/core/src/server/ws/mod.rs | 66 +++---------------------- 3 files changed, 91 insertions(+), 120 deletions(-) diff --git a/crates/core/src/server/quic/mod.rs b/crates/core/src/server/quic/mod.rs index 2ec9dbd..70d1c57 100644 --- a/crates/core/src/server/quic/mod.rs +++ b/crates/core/src/server/quic/mod.rs @@ -9,7 +9,6 @@ use wallhack_wire::{ use crate::{ NodeRole, SocketAddrExt as _, - control::{handler::Handler, metrics::Metrics, peers::Registry, routes::RouteTable}, psk::HandshakeExt, server::tls::{ALPN_QUIC_HTTP, configure_crypto}, transport::{ @@ -185,66 +184,13 @@ impl Server for QuicServer { } } - // Get or create shared metrics - let metrics = self - .options - .metrics - .clone() - .unwrap_or_else(|| Arc::new(Metrics::default())); - - let channels = super::server::DataChannels::new(); - - // Create control channel for injecting outgoing control messages - let (control_tx, control_rx) = tokio::sync::mpsc::channel::(64); - - // Spawn control stream task with handler - let handler_config = self.options.handler_config.clone(); - let peers = self - .options - .peers - .clone() - .unwrap_or_else(|| Arc::new(Registry::new())); - let routes = self - .options - .routes - .clone() - .unwrap_or_else(RouteTable::shared); - - let peer_name = peer_handshake.as_ref().map(|hs| hs.name.clone()); - let route_updates = self.options.route_updates.clone().unwrap_or_else(|| { - let (tx, _) = tokio::sync::broadcast::channel(16); - tx - }); - - { - let metrics = Arc::clone(&metrics); - let peer_registry = Arc::clone(&peers); - tokio::spawn(async move { - let handler = Handler::new(handler_config, metrics, peers, routes, route_updates); - let mut channels = protocol::ControlChannels { - outgoing_rx: control_rx, - handshake_tx: None, // Handshake already read above - control_response_tx: None, // server doesn't issue ControlRequests - peer_registry: Some(peer_registry), - peer_name, - }; - let mut control_stream = - wallhack_transport::erased::BoxBiStream::new(control_stream); - let exit = channels - .run(&mut control_stream, Some(&handler), Duration::from_secs(30)) - .await; - tracing::debug!("Control stream finished: {exit:?}"); - }); - } - - // Data tasks are NOT spawned here — the caller does that after PSK validation. - Ok(Some(AcceptResult::with_handshake( - Arc::clone(&transport), - channels, - remote_addr, - metrics, + let control_stream = wallhack_transport::erased::BoxBiStream::new(control_stream); + Ok(Some(super::server::spawn_server_tasks( + transport, + control_stream, + &self.options, peer_handshake, - control_tx, + remote_addr, channel_binding, ))) } diff --git a/crates/core/src/server/server.rs b/crates/core/src/server/server.rs index 37d5570..93bfc6c 100644 --- a/crates/core/src/server/server.rs +++ b/crates/core/src/server/server.rs @@ -194,6 +194,85 @@ pub struct ServerOptions { pub local_handshake: Option, } +/// Spawn the control task and build an `AcceptResult` — shared by all +/// server transports. +/// +/// Called after the transport handshake exchange. Resolves shared +/// resources from `options`, spawns the control loop with a `Handler`, +/// and returns a fully wired `AcceptResult`. +pub fn spawn_server_tasks( + transport: Arc, + control_stream: wallhack_transport::erased::BoxBiStream, + options: &ServerOptions, + peer_handshake: Option, + remote_addr: String, + channel_binding: Option<[u8; crate::psk::CHANNEL_BINDING_LEN]>, +) -> AcceptResult +where + T::SendStream: 'static, + T::RecvStream: 'static, + T::BiStream: Send + 'static, +{ + use crate::{ + control::{handler::Handler, metrics::Metrics, peers::Registry, routes::RouteTable}, + transport::protocol, + }; + + let metrics = options + .metrics + .clone() + .unwrap_or_else(|| Arc::new(Metrics::default())); + + let channels = DataChannels::new(); + let (control_tx, control_rx) = mpsc::channel::(64); + + let handler_config = options.handler_config.clone(); + let peers = options + .peers + .clone() + .unwrap_or_else(|| Arc::new(Registry::new())); + let routes = options.routes.clone().unwrap_or_else(RouteTable::shared); + let peer_name = peer_handshake.as_ref().map(|hs| hs.name.clone()); + let route_updates = options.route_updates.clone().unwrap_or_else(|| { + let (tx, _) = tokio::sync::broadcast::channel(16); + tx + }); + + { + let metrics = Arc::clone(&metrics); + let peer_registry = Arc::clone(&peers); + tokio::spawn(async move { + let handler = Handler::new(handler_config, metrics, peers, routes, route_updates); + let mut channels = protocol::ControlChannels { + outgoing_rx: control_rx, + handshake_tx: None, + control_response_tx: None, + peer_registry: Some(peer_registry), + peer_name, + }; + let mut control_stream = control_stream; + let exit = channels + .run( + &mut control_stream, + Some(&handler), + std::time::Duration::from_secs(30), + ) + .await; + tracing::debug!("Control stream finished: {exit:?}"); + }); + } + + AcceptResult::with_handshake( + transport, + channels, + remote_addr, + metrics, + peer_handshake, + control_tx, + channel_binding, + ) +} + pub trait Server { type Error: std::error::Error + std::fmt::Debug + Send + Sync + 'static; type Transport: Transport; diff --git a/crates/core/src/server/ws/mod.rs b/crates/core/src/server/ws/mod.rs index 0a3da04..c600039 100644 --- a/crates/core/src/server/ws/mod.rs +++ b/crates/core/src/server/ws/mod.rs @@ -25,7 +25,6 @@ use yamux::Mode; use crate::{ NodeRole, SocketAddrExt as _, - control::{handler::Handler, metrics::Metrics, peers::Registry, routes::RouteTable}, psk::HandshakeExt, transport::{ protocol, @@ -266,66 +265,13 @@ impl Server for WebSocketServer { } } - // Get or create shared metrics - let metrics = self - .options - .metrics - .clone() - .unwrap_or_else(|| Arc::new(Metrics::default())); - - let channels = super::server::DataChannels::new(); - - // Create control channel for injecting outgoing control messages - let (control_tx, control_rx) = tokio::sync::mpsc::channel::(64); - - // Spawn control stream task with handler - let handler_config = self.options.handler_config.clone(); - let peers = self - .options - .peers - .clone() - .unwrap_or_else(|| Arc::new(Registry::new())); - let routes = self - .options - .routes - .clone() - .unwrap_or_else(RouteTable::shared); - - let peer_name = peer_handshake.as_ref().map(|hs| hs.name.clone()); - let route_updates = self.options.route_updates.clone().unwrap_or_else(|| { - let (tx, _) = tokio::sync::broadcast::channel(16); - tx - }); - - { - let metrics = Arc::clone(&metrics); - let peer_registry = Arc::clone(&peers); - tokio::spawn(async move { - let handler = Handler::new(handler_config, metrics, peers, routes, route_updates); - let mut channels = protocol::ControlChannels { - outgoing_rx: control_rx, - handshake_tx: None, // Handshake already read above - control_response_tx: None, // server doesn't issue ControlRequests - peer_registry: Some(peer_registry), - peer_name, - }; - let mut control_stream = - wallhack_transport::erased::BoxBiStream::new(control_stream); - let exit = channels - .run(&mut control_stream, Some(&handler), Duration::from_secs(30)) - .await; - tracing::debug!("Control stream finished: {exit:?}"); - }); - } - - // Data tasks are NOT spawned here — the caller does that after PSK validation. - Ok(Some(AcceptResult::with_handshake( - Arc::clone(&transport), - channels, - peer_addr.to_string(), - metrics, + let control_stream = wallhack_transport::erased::BoxBiStream::new(control_stream); + Ok(Some(super::server::spawn_server_tasks( + transport, + control_stream, + &self.options, peer_handshake, - control_tx, + peer_addr.to_string(), channel_binding, ))) }