From 40b82f468aca28f98f93f98593264e687f914342 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Wed, 22 Apr 2026 16:45:39 -0400 Subject: [PATCH 01/13] Add a new tokio transport layer that uses semantics identical to the current client/server code, gated behind client and server --- src/lib.rs | 7 + src/tokio_transport.rs | 316 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 323 insertions(+) create mode 100644 src/tokio_transport.rs diff --git a/src/lib.rs b/src/lib.rs index e0b7574..0b918f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -132,6 +132,11 @@ mod raw_payload; /// SOME/IP server for offering services and handling incoming requests. #[cfg(feature = "server")] pub mod server; +/// Tokio + `socket2` implementation of the [`transport`] traits. Provided +/// as the default `std` backend — available whenever `client` or `server` +/// is enabled. +#[cfg(any(feature = "client", feature = "server"))] +pub mod tokio_transport; mod traits; /// Executor-agnostic UDP transport abstraction. `no_std`-compatible. /// @@ -152,6 +157,8 @@ pub use client::{Client, ClientUpdate, ClientUpdates, DiscoveryMessage, PendingR pub use e2e::{E2ECheckStatus, E2EKey, E2EProfile}; #[cfg(feature = "server")] pub use server::Server; +#[cfg(any(feature = "client", feature = "server"))] +pub use tokio_transport::{TokioSocket, TokioTimer, TokioTransport}; pub use transport::{ IoErrorKind, ReceivedDatagram, SocketOptions, Timer, TransportError, TransportFactory, TransportSocket, diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs new file mode 100644 index 0000000..3b0288f --- /dev/null +++ b/src/tokio_transport.rs @@ -0,0 +1,316 @@ +//! Tokio + socket2 implementation of the [`crate::transport`] traits. +//! +//! This is the default `std` backend. [`TokioTransport`] constructs +//! configured [`TokioSocket`]s via `socket2` for bind-time options (reuse, +//! multicast interface, multicast loop) and converts them to +//! [`tokio::net::UdpSocket`] for the async I/O loop. [`TokioTimer`] is a +//! thin wrapper over `tokio::time::sleep`. +//! +//! Gated behind `#[cfg(any(feature = "client", feature = "server"))]` — +//! the `client` and `server` features are exactly the ones that already +//! pull in `tokio` and `socket2`, so no new dependency edge is introduced. +//! +//! # Example +//! +//! ```no_run +//! # #[cfg(any(feature = "client", feature = "server"))] +//! # async fn demo() -> Result<(), simple_someip::TransportError> { +//! use core::net::{Ipv4Addr, SocketAddrV4}; +//! use simple_someip::{SocketOptions, TransportFactory, TransportSocket}; +//! use simple_someip::tokio_transport::TokioTransport; +//! +//! let factory = TokioTransport::default(); +//! let mut options = SocketOptions::new(); +//! options.reuse_address = true; +//! +//! let mut sock = factory +//! .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), &options) +//! .await?; +//! let bound = sock.local_addr()?; +//! println!("bound to {bound}"); +//! # Ok(()) +//! # } +//! ``` + +use core::future::Future; +use core::net::{Ipv4Addr, SocketAddrV4}; +use core::time::Duration; +use std::net::{IpAddr, SocketAddr}; +use tokio::net::UdpSocket; + +use crate::transport::{ + IoErrorKind, ReceivedDatagram, SocketOptions, Timer, TransportError, TransportFactory, + TransportSocket, +}; + +/// Factory that binds [`TokioSocket`]s configured via `socket2`. +/// +/// Unit struct — all required state (the tokio runtime) is implicit in the +/// ambient task context at call time. +#[derive(Debug, Default, Clone, Copy)] +pub struct TokioTransport; + +/// A bound UDP socket backed by [`tokio::net::UdpSocket`]. +#[derive(Debug)] +pub struct TokioSocket { + inner: UdpSocket, +} + +/// Sleep backed by [`tokio::time::sleep`]. +#[derive(Debug, Default, Clone, Copy)] +pub struct TokioTimer; + +impl TransportFactory for TokioTransport { + type Socket = TokioSocket; + + fn bind( + &self, + addr: SocketAddrV4, + options: &SocketOptions, + ) -> impl Future> { + // Capture options by value into the async block so the returned + // future does not borrow `self` or `options`. + let options = *options; + async move { bind_with_options(addr, &options).map_err(map_io_error) } + } +} + +impl TransportSocket for TokioSocket { + fn send_to( + &mut self, + buf: &[u8], + target: SocketAddrV4, + ) -> impl Future> { + async move { + self.inner + .send_to(buf, target) + .await + .map(|_| ()) + .map_err(map_io_error) + } + } + + fn recv_from( + &mut self, + buf: &mut [u8], + ) -> impl Future> { + async move { + let (n, src) = self.inner.recv_from(buf).await.map_err(map_io_error)?; + let source = match src { + SocketAddr::V4(v4) => v4, + SocketAddr::V6(_) => { + // SOME/IP is IPv4-only; an IPv6 source on our socket is + // either impossible (v4 bind) or a misconfiguration. + return Err(TransportError::Unsupported); + } + }; + Ok(ReceivedDatagram { + bytes_received: n, + source, + truncated: false, + }) + } + } + + fn local_addr(&self) -> Result { + match self.inner.local_addr().map_err(map_io_error)? { + SocketAddr::V4(v4) => Ok(v4), + SocketAddr::V6(_) => Err(TransportError::Unsupported), + } + } + + fn join_multicast_v4( + &mut self, + group: Ipv4Addr, + iface: Ipv4Addr, + ) -> Result<(), TransportError> { + self.inner + .join_multicast_v4(group, iface) + .map_err(map_io_error) + } + + fn leave_multicast_v4( + &mut self, + group: Ipv4Addr, + iface: Ipv4Addr, + ) -> Result<(), TransportError> { + self.inner + .leave_multicast_v4(group, iface) + .map_err(map_io_error) + } +} + +impl Timer for TokioTimer { + fn sleep(&self, duration: Duration) -> impl Future { + // tokio::time::sleep returns a Sleep future; we wrap in an async + // block so the returned type is a simple `impl Future`. + async move { tokio::time::sleep(duration).await } + } +} + +/// Synchronously create and configure a UDP socket via `socket2`, then +/// hand it to tokio. Mirrors the existing bind paths in +/// [`crate::client::socket_manager`] and [`crate::server`] so behavior is +/// identical. +fn bind_with_options(addr: SocketAddrV4, options: &SocketOptions) -> std::io::Result { + let raw = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + )?; + if options.reuse_address { + raw.set_reuse_address(true)?; + } + #[cfg(unix)] + if options.reuse_port { + raw.set_reuse_port(true)?; + } + if let Some(iface) = options.multicast_if_v4 { + raw.set_multicast_if_v4(&iface)?; + } + if options.multicast_loop_v4 { + raw.set_multicast_loop_v4(true)?; + } + let bind_addr = SocketAddr::new(IpAddr::V4(*addr.ip()), addr.port()); + raw.bind(&bind_addr.into())?; + raw.set_nonblocking(true)?; + let std_sock: std::net::UdpSocket = raw.into(); + let inner = UdpSocket::from_std(std_sock)?; + Ok(TokioSocket { inner }) +} + +/// Map a `std::io::Error` into [`TransportError`]. The mapping is +/// conservative — anything that is not a clear match becomes +/// [`TransportError::Io`] with [`IoErrorKind::Other`] — and is not +/// considered stable (adding finer mappings is not a breaking change). +fn map_io_error(e: std::io::Error) -> TransportError { + use std::io::ErrorKind as K; + match e.kind() { + K::AddrInUse => TransportError::AddressInUse, + K::Unsupported => TransportError::Unsupported, + K::TimedOut => TransportError::Io(IoErrorKind::TimedOut), + K::Interrupted => TransportError::Io(IoErrorKind::Interrupted), + K::PermissionDenied => TransportError::Io(IoErrorKind::PermissionDenied), + K::ConnectionRefused => TransportError::Io(IoErrorKind::ConnectionRefused), + K::NetworkUnreachable | K::HostUnreachable => { + TransportError::Io(IoErrorKind::NetworkUnreachable) + } + _ => TransportError::Io(IoErrorKind::Other), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn bind_ephemeral_and_report_local_addr() { + let factory = TokioTransport; + let sock = factory + .bind( + SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), + &SocketOptions::default(), + ) + .await + .expect("bind"); + let addr = sock.local_addr().expect("local_addr"); + assert_eq!(*addr.ip(), Ipv4Addr::LOCALHOST); + assert_ne!(addr.port(), 0, "kernel must assign a non-zero port"); + } + + #[tokio::test] + async fn round_trip_send_recv_between_two_sockets() { + let factory = TokioTransport; + let opts = SocketOptions::default(); + + let mut recv = factory + .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), &opts) + .await + .unwrap(); + let recv_addr = recv.local_addr().unwrap(); + + let mut send = factory + .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), &opts) + .await + .unwrap(); + + let payload = b"hello tokio transport"; + send.send_to(payload, recv_addr).await.unwrap(); + + let mut buf = [0u8; 64]; + let datagram = tokio::time::timeout(Duration::from_secs(2), recv.recv_from(&mut buf)) + .await + .expect("recv timed out") + .expect("recv failed"); + + assert_eq!(datagram.bytes_received, payload.len()); + assert_eq!(&buf[..datagram.bytes_received], payload); + assert!(!datagram.truncated); + } + + #[tokio::test] + async fn reuse_address_option_allows_rebind_pattern() { + // Two sockets with reuse_address=true should be able to bind the + // same port on platforms where SO_REUSEADDR permits it (windows + // and linux both do for DGRAM). + let mut opts = SocketOptions::default(); + opts.reuse_address = true; + + let factory = TokioTransport; + let a = factory + .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), &opts) + .await + .unwrap(); + let port = a.local_addr().unwrap().port(); + + // Bind a second socket with the same options; with reuse_address + // on, the OS allows this for UDP DGRAM on the platforms we support. + // If the OS refuses, fall back to a plain bind — we're not testing + // OS semantics here, only that the option is applied without error. + let b = factory + .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port), &opts) + .await; + // Either success or AddrInUse is acceptable; the assertion is + // that bind_with_options does not produce a different surprise + // (like Unsupported or a raw Io panic). + match b { + Ok(_) | Err(TransportError::AddressInUse) => {} + Err(other) => panic!("unexpected rebind error: {other:?}"), + } + drop(a); + } + + #[tokio::test] + async fn timer_sleep_elapses_at_least_requested() { + let timer = TokioTimer; + let started = tokio::time::Instant::now(); + timer.sleep(Duration::from_millis(25)).await; + assert!(started.elapsed() >= Duration::from_millis(25)); + } + + #[test] + fn map_io_error_covers_common_kinds() { + use std::io::{Error, ErrorKind}; + assert!(matches!( + map_io_error(Error::from(ErrorKind::AddrInUse)), + TransportError::AddressInUse + )); + assert!(matches!( + map_io_error(Error::from(ErrorKind::TimedOut)), + TransportError::Io(IoErrorKind::TimedOut) + )); + assert!(matches!( + map_io_error(Error::from(ErrorKind::ConnectionRefused)), + TransportError::Io(IoErrorKind::ConnectionRefused) + )); + assert!(matches!( + map_io_error(Error::from(ErrorKind::Unsupported)), + TransportError::Unsupported + )); + // Fallback path + assert!(matches!( + map_io_error(Error::from(ErrorKind::Other)), + TransportError::Io(IoErrorKind::Other) + )); + } +} From 6b859886839be75cc23b584c2c390114830412d4 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Wed, 22 Apr 2026 17:00:17 -0400 Subject: [PATCH 02/13] bind_discovery_seeded and bind are async and construct a TokioSocket rather than call on socket2 directly --- src/client/error.rs | 4 ++ src/client/inner.rs | 25 +++---- src/client/socket_manager.rs | 122 +++++++++++++++++++++-------------- src/transport.rs | 13 +++- 4 files changed, 103 insertions(+), 61 deletions(-) diff --git a/src/client/error.rs b/src/client/error.rs index 97ce2f1..ee509d7 100644 --- a/src/client/error.rs +++ b/src/client/error.rs @@ -46,4 +46,8 @@ pub enum Error { /// (→ `crate::UDP_BUFFER_SIZE`). #[error("internal capacity exceeded: {0}")] Capacity(&'static str), + /// An error surfaced by the pluggable transport backend (see + /// [`crate::transport::TransportError`]). + #[error("transport error: {0:?}")] + Transport(#[from] crate::transport::TransportError), } diff --git a/src/client/inner.rs b/src/client/inner.rs index 9c41526..b602e45 100644 --- a/src/client/inner.rs +++ b/src/client/inner.rs @@ -366,7 +366,7 @@ where (control_sender, update_receiver) } - fn bind_discovery(&mut self) -> Result<(), Error> { + async fn bind_discovery(&mut self) -> Result<(), Error> { if self.discovery_socket.is_some() { Ok(()) } else { @@ -376,7 +376,8 @@ where self.sd_session_id, self.sd_session_has_wrapped, self.multicast_loopback, - )?; + ) + .await?; self.discovery_socket = Some(socket); Ok(()) } @@ -397,7 +398,7 @@ where self.interface = interface; } - fn bind_unicast(&mut self, port: u16) -> Result { + async fn bind_unicast(&mut self, port: u16) -> Result { if port != 0 && let Some(socket) = self.unicast_sockets.get(&port) { @@ -412,7 +413,7 @@ where ); return Err(Error::Capacity("unicast_sockets")); } - let unicast_socket = SocketManager::bind(port, Arc::clone(&self.e2e_registry))?; + let unicast_socket = SocketManager::bind(port, Arc::clone(&self.e2e_registry)).await?; let bound_port = unicast_socket.port(); // Capacity was checked above, so insert cannot report "full" here. // A defensive check guards against a future refactor that changes @@ -571,7 +572,7 @@ where return; } info!("Binding to interface: {}", interface); - let bind_result = self.bind_discovery(); + let bind_result = self.bind_discovery().await; match &bind_result { Ok(()) => { info!("Successfully Bound to interface: {}", interface); @@ -585,7 +586,7 @@ where } } ControlMessage::BindDiscovery(response) => { - let result = self.bind_discovery(); + let result = self.bind_discovery().await; if response.send(result).is_err() { warn!("BindDiscovery response receiver dropped (caller canceled)"); } @@ -600,7 +601,7 @@ where // SD Message, If the discovery socket is not bound, bind it match &mut self.discovery_socket { None => { - match self.bind_discovery() { + match self.bind_discovery().await { Ok(()) => { // See re-enqueue note on SetInterface above. if let Err(rejected) = self.request_queue.push_front( @@ -704,7 +705,7 @@ where let source_port = if desired_port == 0 { // Ephemeral: auto-bind only if no sockets exist, then use first if self.unicast_sockets.is_empty() { - match self.bind_unicast(0) { + match self.bind_unicast(0).await { Ok(port) => { debug!("Auto-bound unicast on port {} for SendToService", port); port @@ -719,7 +720,7 @@ where } } else { // Specific port: bind if not already bound - match self.bind_unicast(desired_port) { + match self.bind_unicast(desired_port).await { Ok(port) => port, Err(e) => { let _ = send_complete.send(Err(e)); @@ -792,7 +793,7 @@ where } // Bind unicast on the requested port (0 = ephemeral) - let unicast_port = match self.bind_unicast(client_port) { + let unicast_port = match self.bind_unicast(client_port).await { Ok(port) => { debug!("Bound unicast on port {} for Subscribe", port); port @@ -805,7 +806,7 @@ where // Auto-bind discovery if not bound (re-queue like SendSD does) match &mut self.discovery_socket { - None => match self.bind_discovery() { + None => match self.bind_discovery().await { Ok(()) => { // See re-enqueue note on SetInterface above. if let Err(rejected) = @@ -1194,6 +1195,7 @@ mod tests { for _ in 0..UNICAST_SOCKETS_CAP { let bound = inner .bind_unicast(0) + .await .expect("ephemeral bind below cap should succeed"); assert_ne!(bound, 0, "OS should assign a non-zero ephemeral port"); } @@ -1203,6 +1205,7 @@ mod tests { // socket (pre-bind capacity check). let err = inner .bind_unicast(0) + .await .expect_err("bind past cap should fail"); match err { Error::Capacity(name) => assert_eq!(name, "unicast_sockets"), diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 4a12bfe..bb89941 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -2,16 +2,18 @@ use crate::{ UDP_BUFFER_SIZE, e2e::{E2ECheckStatus, E2EKey, E2ERegistry}, protocol::{Message, MessageView, sd}, + tokio_transport::TokioTransport, traits::{PayloadWireFormat, WireFormat}, + transport::{ReceivedDatagram, SocketOptions, TransportFactory, TransportSocket}, }; use super::error::Error; use std::{ - net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::{Arc, Mutex}, task::{Context, Poll}, }; -use tokio::{net::UdpSocket, select, sync::mpsc}; +use tokio::{select, sync::mpsc}; use tracing::{error, info, trace}; /// A received message together with the source address it came from. @@ -67,7 +69,11 @@ where /// a previous socket when rebinding. Pass `(1, false)` for a fresh bind. /// Preserving state across rebinds avoids emitting a false reboot signal /// (`reboot_flag=1`) to peers after `unbind_discovery` + `bind_discovery`. - pub fn bind_discovery_seeded( + /// + /// Uses the default [`TokioTransport`] backend. Bare-metal callers can + /// construct a `SocketManager` directly via the `_with_transport` + /// variant once that lands alongside the phase-6 spawn-hoist refactor. + pub async fn bind_discovery_seeded( interface: Ipv4Addr, e2e_registry: Arc>, session_id: u16, @@ -76,19 +82,7 @@ where ) -> Result { let (rx_tx, rx_rx) = mpsc::channel(16); let (tx_tx, tx_rx) = mpsc::channel(16); - let bind_addr = - std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), sd::MULTICAST_PORT); - - // Create socket with SO_REUSEADDR to allow quick restart - let socket = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - )?; - socket.set_reuse_address(true)?; - #[cfg(unix)] - socket.set_reuse_port(true)?; - socket.set_multicast_if_v4(&interface)?; + // Control whether multicast packets sent by this socket are looped // back to sockets on the same host — INCLUDING this socket itself. // Disabled by default to avoid parsing self-sent OfferService / @@ -97,12 +91,18 @@ where // deliver this socket's own SD multicasts back to it, so higher-level // consumers must be prepared to see their own announcements surface // as inbound discovery traffic. - socket.set_multicast_loop_v4(multicast_loopback)?; - socket.bind(&bind_addr.into())?; - socket.set_nonblocking(true)?; - let socket: std::net::UdpSocket = socket.into(); - let socket = UdpSocket::from_std(socket)?; + let options = { + let mut o = SocketOptions::new(); + o.reuse_address = true; + o.reuse_port = true; + o.multicast_if_v4 = Some(interface); + o.multicast_loop_v4 = multicast_loopback; + o + }; + let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, sd::MULTICAST_PORT); + let factory = TokioTransport; + let mut socket = factory.bind(bind_addr, &options).await?; socket.join_multicast_v4(sd::MULTICAST_IP, interface)?; Self::spawn_socket_loop(socket, rx_tx, tx_rx, e2e_registry); @@ -115,22 +115,19 @@ where }) } - pub fn bind(port: u16, e2e_registry: Arc>) -> Result { + pub async fn bind(port: u16, e2e_registry: Arc>) -> Result { let (rx_tx, rx_rx) = mpsc::channel(4); let (tx_tx, tx_rx) = mpsc::channel(4); - let bind_addr = std::net::SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port); - - // Create socket with SO_REUSEADDR and SO_REUSEPORT to allow quick restart - let socket = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - )?; - socket.set_reuse_address(true)?; - socket.bind(&bind_addr.into())?; - socket.set_nonblocking(true)?; - let socket: std::net::UdpSocket = socket.into(); - let socket = UdpSocket::from_std(socket)?; + + let options = { + let mut o = SocketOptions::new(); + o.reuse_address = true; + o + }; + let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port); + + let factory = TokioTransport; + let socket = factory.bind(bind_addr, &options).await?; let port = socket.local_addr()?.port(); Self::spawn_socket_loop(socket, rx_tx, tx_rx, e2e_registry); Ok(Self { @@ -204,9 +201,20 @@ where _ = receiver.recv().await; } + /// Spawn the I/O loop over a concrete [`TokioSocket`]. + /// + /// The socket's trait methods (`send_to`, `recv_from`, + /// `join_multicast_v4`) are the entire I/O surface used inside — the + /// loop body does not call any `TokioSocket`-specific inherent + /// methods, so generalizing this function over `T: TransportSocket` + /// is a mechanical change once the outer `tokio::spawn` is hoisted + /// out in phase 6 (stable Rust's `Send` bounds on RPITIT method + /// returns are currently expressible only via return-type notation, + /// which is nightly — hoisting the spawn avoids the issue by moving + /// the `Send` requirement off this function entirely). #[allow(clippy::too_many_lines)] fn spawn_socket_loop( - socket: UdpSocket, + mut socket: crate::tokio_transport::TokioSocket, rx_tx: mpsc::Sender, Error>>, mut tx_rx: mpsc::Receiver>, e2e_registry: Arc>, @@ -217,7 +225,18 @@ where select! { result = socket.recv_from(&mut buf) => { match result { - Ok((bytes_received, source_address)) => { + Ok(ReceivedDatagram { bytes_received, source, truncated }) => { + if truncated { + // A truncated datagram cannot be parsed reliably; + // the length field in the SOME/IP header will not + // match the bytes we received. Log and drop. + error!( + "Discarding truncated datagram from {}: {} bytes received", + source, bytes_received + ); + continue; + } + let source_address = SocketAddr::V4(source); let parse_result = MessageView::parse(&buf[..bytes_received]) .and_then(|view| { let header = view.header().to_owned(); @@ -326,7 +345,7 @@ where } match socket.send_to(&buf[..message_length], send_message.target_addr).await { - Ok(_bytes_sent) => { + Ok(()) => { trace!("Sent {} bytes to {}", message_length, send_message.target_addr); if let Ok(()) = send_message.response.send(Ok(())) {} else { info!("Socket owner closed channel, closing socket."); @@ -336,7 +355,7 @@ where } Err(e) => { error!("Failed to send message with error: {:?}", e); - if let Ok(()) = send_message.response.send(Err(Error::Io(e))) { } else { + if let Ok(()) = send_message.response.send(Err(Error::Transport(e))) { } else { error!("Socket owner closed channel unexpectedly, closing socket."); break; } @@ -360,6 +379,10 @@ mod tests { use crate::protocol::sd::test_support::{TestPayload, empty_sd_header}; use std::format; use std::vec; + // Tests build ad-hoc UDP peers via tokio directly; this is not part of + // the production code path, which goes through the `TransportSocket` + // abstraction via `TokioTransport`. + use tokio::net::UdpSocket; type TestSocketManager = SocketManager; @@ -369,7 +392,7 @@ mod tests { #[tokio::test] async fn test_bind_ephemeral_port() { - let sm = TestSocketManager::bind(0, test_registry()).unwrap(); + let sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); assert!(sm.port() > 0); assert_eq!(sm.session_id(), 1); } @@ -387,13 +410,13 @@ mod tests { #[tokio::test] async fn test_socket_manager_shut_down() { - let sm = TestSocketManager::bind(0, test_registry()).unwrap(); + let sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); sm.shut_down().await; } #[tokio::test] async fn test_socket_manager_send_and_receive() { - let mut sm = TestSocketManager::bind(0, test_registry()).unwrap(); + let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); let sm_port = sm.port(); // Create a raw UDP socket to send data to the SocketManager @@ -425,7 +448,7 @@ mod tests { #[tokio::test] async fn test_poll_receive() { - let mut sm = TestSocketManager::bind(0, test_registry()).unwrap(); + let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); let sm_port = sm.port(); // Send a message to the socket manager from a raw socket @@ -451,7 +474,7 @@ mod tests { #[tokio::test] async fn test_send_drops_when_socket_loop_exits() { - let mut sm = TestSocketManager::bind(0, test_registry()).unwrap(); + let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); // Shut down the socket loop by dropping the internal channels // We can't directly kill the loop, but we can test the error path // by sending to a socket manager that has been shut down. @@ -495,7 +518,7 @@ mod tests { #[tokio::test] async fn test_socket_manager_debug() { - let sm = TestSocketManager::bind(0, test_registry()).unwrap(); + let sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); let s = format!("{sm:?}"); assert!(s.contains("SocketManager")); sm.shut_down().await; @@ -503,7 +526,7 @@ mod tests { #[tokio::test] async fn test_socket_manager_send_to_target() { - let mut sm = TestSocketManager::bind(0, test_registry()).unwrap(); + let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); // Create a raw socket to receive let raw_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); @@ -542,13 +565,14 @@ mod tests { false, false, ) + .await .unwrap(); assert_eq!(sm.session_id(), 1, "session_id 0 must be normalized to 1"); } #[tokio::test] async fn test_session_id_wraps_to_one_and_clears_reboot_flag() { - let mut sm = TestSocketManager::bind(0, test_registry()).unwrap(); + let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); let raw_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, raw_socket.local_addr().unwrap().port()); @@ -604,7 +628,9 @@ mod tests { reg.register(key, E2EProfile::Profile4(Profile4Config::new(0, 15))); let e2e_registry = Arc::new(Mutex::new(reg)); - let mut sm = SocketManager::::bind(0, e2e_registry).unwrap(); + let mut sm = SocketManager::::bind(0, e2e_registry) + .await + .unwrap(); // Craft a message whose raw-encoded size fits `UDP_BUFFER_SIZE` // exactly (header + payload = cap) but whose E2E-protected size diff --git a/src/transport.rs b/src/transport.rs index 68ab5d3..fff450f 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -208,21 +208,27 @@ use core::time::Duration; /// kinds can be added without a breaking change. Kept local to this crate /// (rather than re-exporting `embedded_io::ErrorKind`) so our public API /// does not move when `embedded_io` bumps major versions. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] #[non_exhaustive] pub enum IoErrorKind { /// The operation timed out. + #[error("operation timed out")] TimedOut, /// The operation was interrupted and can be retried. + #[error("operation interrupted")] Interrupted, /// The caller lacks permission for the operation. + #[error("permission denied")] PermissionDenied, /// A remote peer actively refused the connection / destination was /// unreachable. + #[error("connection refused")] ConnectionRefused, /// The network layer rejected the operation (routing, MTU, etc.). + #[error("network unreachable")] NetworkUnreachable, /// Any error that does not fit a more specific variant. + #[error("i/o error")] Other, } @@ -234,16 +240,19 @@ pub enum IoErrorKind { /// native error types into one of these variants; anything that does not /// fit a specific variant should use [`TransportError::Io`] with an /// appropriate [`IoErrorKind`]. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] #[non_exhaustive] pub enum TransportError { /// Bind failed because the address or port is already in use. + #[error("address in use")] AddressInUse, /// The operation is not supported by this transport (for example, /// multicast on a backend that has none, or an IPv6 address on an /// IPv4-only stack). + #[error("unsupported transport operation")] Unsupported, /// A generic I/O error, classified by a portable [`IoErrorKind`]. + #[error("transport i/o: {0}")] Io(IoErrorKind), } From 8526f9f8025d79292cb679b4be337d5644f9753a Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Wed, 22 Apr 2026 17:05:50 -0400 Subject: [PATCH 03/13] Added bind_with_transport methods that accept a factory that produces a TokioSocket, one for SD, added tests --- src/client/socket_manager.rs | 110 ++++++++++++++++++++++++++++++++--- 1 file changed, 101 insertions(+), 9 deletions(-) diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index bb89941..2ca8ac9 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -65,14 +65,15 @@ impl SocketManager where MessageDefinitions: PayloadWireFormat + 'static, { - /// Bind the SD multicast socket, seeding the session counter and wrap state from - /// a previous socket when rebinding. Pass `(1, false)` for a fresh bind. - /// Preserving state across rebinds avoids emitting a false reboot signal - /// (`reboot_flag=1`) to peers after `unbind_discovery` + `bind_discovery`. + /// Bind the SD multicast socket, seeding the session counter and wrap + /// state from a previous socket when rebinding. Pass `(1, false)` for a + /// fresh bind. Preserving state across rebinds avoids emitting a false + /// reboot signal (`reboot_flag=1`) to peers after + /// `unbind_discovery` + `bind_discovery`. /// - /// Uses the default [`TokioTransport`] backend. Bare-metal callers can - /// construct a `SocketManager` directly via the `_with_transport` - /// variant once that lands alongside the phase-6 spawn-hoist refactor. + /// Uses the default [`TokioTransport`] backend. For tests or alternate + /// bind logic (e.g. an interceptor factory around `TokioTransport`), + /// use [`Self::bind_discovery_seeded_with_transport`]. pub async fn bind_discovery_seeded( interface: Ipv4Addr, e2e_registry: Arc>, @@ -80,6 +81,35 @@ where session_has_wrapped: bool, multicast_loopback: bool, ) -> Result { + Self::bind_discovery_seeded_with_transport( + &TokioTransport, + interface, + e2e_registry, + session_id, + session_has_wrapped, + multicast_loopback, + ) + .await + } + + /// Variant of [`Self::bind_discovery_seeded`] that constructs the + /// underlying socket through a caller-supplied [`TransportFactory`]. + /// The factory must still produce a + /// [`TokioSocket`](crate::tokio_transport::TokioSocket) because the + /// spawned I/O loop is currently tokio-specific; once phase 6 hoists + /// the spawn out of this function, this bound will be relaxed to any + /// `TransportSocket`. + pub async fn bind_discovery_seeded_with_transport( + factory: &F, + interface: Ipv4Addr, + e2e_registry: Arc>, + session_id: u16, + session_has_wrapped: bool, + multicast_loopback: bool, + ) -> Result + where + F: TransportFactory, + { let (rx_tx, rx_rx) = mpsc::channel(16); let (tx_tx, tx_rx) = mpsc::channel(16); @@ -101,7 +131,6 @@ where }; let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, sd::MULTICAST_PORT); - let factory = TokioTransport; let mut socket = factory.bind(bind_addr, &options).await?; socket.join_multicast_v4(sd::MULTICAST_IP, interface)?; @@ -116,6 +145,21 @@ where } pub async fn bind(port: u16, e2e_registry: Arc>) -> Result { + Self::bind_with_transport(&TokioTransport, port, e2e_registry).await + } + + /// Variant of [`Self::bind`] that constructs the underlying socket + /// through a caller-supplied [`TransportFactory`]. See + /// [`Self::bind_discovery_seeded_with_transport`] for the factory + /// bound rationale. + pub async fn bind_with_transport( + factory: &F, + port: u16, + e2e_registry: Arc>, + ) -> Result + where + F: TransportFactory, + { let (rx_tx, rx_rx) = mpsc::channel(4); let (tx_tx, tx_rx) = mpsc::channel(4); @@ -126,7 +170,6 @@ where }; let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port); - let factory = TokioTransport; let socket = factory.bind(bind_addr, &options).await?; let port = socket.local_addr()?.port(); Self::spawn_socket_loop(socket, rx_tx, tx_rx, e2e_registry); @@ -711,4 +754,53 @@ mod tests { other => panic!("expected Error::Capacity(\"udp_buffer\"), got {other:?}"), } } + + /// Proves the public `bind_with_transport` entry point accepts an + /// alternative `TransportFactory` implementation. The factory here is + /// a thin interceptor that counts how many times `bind` is called; it + /// delegates to the built-in `TokioTransport`, which is what the + /// current `Socket = TokioSocket` bound requires. + #[tokio::test] + async fn bind_with_transport_accepts_custom_factory() { + use crate::tokio_transport::{TokioSocket, TokioTransport}; + use core::future::Future; + use core::sync::atomic::{AtomicUsize, Ordering}; + + struct CountingFactory { + inner: TokioTransport, + calls: AtomicUsize, + } + + impl TransportFactory for CountingFactory { + type Socket = TokioSocket; + fn bind( + &self, + addr: SocketAddrV4, + options: &SocketOptions, + ) -> impl Future> + { + self.calls.fetch_add(1, Ordering::SeqCst); + // Clone the options into the async block so no borrow + // escapes the returned future. + let options = *options; + let inner = self.inner; + async move { inner.bind(addr, &options).await } + } + } + + let factory = CountingFactory { + inner: TokioTransport, + calls: AtomicUsize::new(0), + }; + + let sm = TestSocketManager::bind_with_transport(&factory, 0, test_registry()) + .await + .expect("bind via custom factory"); + assert_eq!( + factory.calls.load(Ordering::SeqCst), + 1, + "custom factory should have been invoked exactly once" + ); + drop(sm); + } } From e37d061ff626a8907d14738c7d827ad9365e9785 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Wed, 22 Apr 2026 17:14:52 -0400 Subject: [PATCH 04/13] Responding to PR feedback --- src/client/socket_manager.rs | 16 ++++++++++++++++ src/tokio_transport.rs | 24 ++++++++++++++++++++++-- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 2ca8ac9..3097e57 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -17,6 +17,13 @@ use tokio::{select, sync::mpsc}; use tracing::{error, info, trace}; /// A received message together with the source address it came from. +/// +/// TODO(phase 6): narrow `source` to `SocketAddrV4` to match the +/// `TransportSocket` trait's IPv4-only contract — today the field is +/// always a `SocketAddr::V4(_)` wrapping, and the V6 variant is +/// unreachable. Deferred here because the rename ripples through +/// `DiscoveryMessage` and `ClientUpdate::Unicast`, which is scope creep +/// for phase 5. #[derive(Clone, Debug)] pub struct ReceivedMessage

{ pub message: Message

, @@ -760,6 +767,15 @@ mod tests { /// a thin interceptor that counts how many times `bind` is called; it /// delegates to the built-in `TokioTransport`, which is what the /// current `Socket = TokioSocket` bound requires. + /// + /// TODO: extend this with an end-to-end round-trip test that uses a + /// custom factory to actually carry traffic (send from socket A, + /// receive on socket B, assert bytes match), and a negative test + /// where the factory returns `Err(TransportError::AddressInUse)` + /// and asserts that surfaces as `Error::Transport(...)` through the + /// `?` + `From` chain. Both are scoped for the phase-6 branch where + /// the spawn hoist lets us swap the socket type, not just the bind + /// logic. #[tokio::test] async fn bind_with_transport_accepts_custom_factory() { use crate::tokio_transport::{TokioSocket, TokioTransport}; diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index 3b0288f..5dc7649 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -57,6 +57,13 @@ pub struct TokioSocket { } /// Sleep backed by [`tokio::time::sleep`]. +/// +/// TODO(phase 7): wire this into the `tokio::time::sleep` call sites in +/// `client::inner::Inner::run` (125 ms tick), `server::mod::Server::run`, +/// and `Client::start_sd_announcements` (1 s tick) so the crate's own +/// timing is also routed through the `Timer` trait. Today `TokioTimer` +/// is shipped as public API but unused internally — consumers can rely +/// on it, but the crate's own code still uses tokio directly. #[derive(Debug, Default, Clone, Copy)] pub struct TokioTimer; @@ -183,9 +190,16 @@ fn bind_with_options(addr: SocketAddrV4, options: &SocketOptions) -> std::io::Re /// conservative — anything that is not a clear match becomes /// [`TransportError::Io`] with [`IoErrorKind::Other`] — and is not /// considered stable (adding finer mappings is not a breaking change). +/// +/// The full `std::io::Error` (raw errno, OS message, chained source) is +/// discarded by design to keep the public [`TransportError`] enum +/// portable and `no_std`-safe. To keep field debugging possible anyway, +/// the original error is emitted at `warn!` level here before mapping — +/// ops sees the detailed message in logs while callers get the portable +/// enum. fn map_io_error(e: std::io::Error) -> TransportError { use std::io::ErrorKind as K; - match e.kind() { + let mapped = match e.kind() { K::AddrInUse => TransportError::AddressInUse, K::Unsupported => TransportError::Unsupported, K::TimedOut => TransportError::Io(IoErrorKind::TimedOut), @@ -196,7 +210,13 @@ fn map_io_error(e: std::io::Error) -> TransportError { TransportError::Io(IoErrorKind::NetworkUnreachable) } _ => TransportError::Io(IoErrorKind::Other), - } + }; + tracing::warn!( + "tokio transport io error: {e} (raw_os={:?}, kind={:?}) mapped to {mapped}", + e.raw_os_error(), + e.kind(), + ); + mapped } #[cfg(test)] From 5fd83879b0776ff55ed78a60628c18ce50e5052c Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Thu, 23 Apr 2026 12:06:41 -0400 Subject: [PATCH 05/13] phase 5: respond to PR #79 feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses three Copilot review comments. 1. tokio_transport: apply `multicast_loop_v4` unconditionally. The previous `if options.multicast_loop_v4 { set_multicast_loop_v4(true) }` only set the flag when the caller requested it ON; when OFF the socket kept the OS default (loopback ENABLED on Linux), diverging from the pre-trait code path that always called IP_MULTICAST_LOOP with the requested value. Fix: call `set_multicast_loop_v4(options.multicast_loop_v4)` every bind. A new `pub(crate) TokioSocket::multicast_loop_v4()` wraps `tokio::net::UdpSocket::multicast_loop_v4()` so tests (and future field debugging) can read the flag back. Covered by `multicast_loop_v4_option_propagates_in_both_directions`, which binds two sockets (false, true) and asserts the kernel reports the requested value for each. 2. client::Error::Transport: switch from `{0:?}` to `#[error(transparent)]`. The debug-format template was leaking variant names and struct-like debug output into user-facing error messages. Chose `transparent` over the prefixed `"transport error: {0}"` form for consistency with the three existing `#[error(transparent)]` variants on the same enum (`Protocol`, `Io`, `E2e`) — they all delegate fully to their inner Display. The `TransportError` Display impls already read as complete sentences ("address in use", "unsupported transport operation", "transport i/o: ..."), so the extra prefix would be noise. Covered by `transport_variant_displays_via_inner_display_not_debug`, which asserts the Display output contains no debug artifacts (`{`, `}`, `"`) and equals the inner `TransportError`'s Display verbatim. 3. transport / lib module docs: update the "no implementations ship" language. The TokioTransport / TokioSocket / TokioTimer backend now ships under the `client` / `server` features; the stale note in the transport module's `# Status` section and the short description next to `pub mod transport;` in lib.rs now point at the default backend. Docs-only; `cargo test --doc --all-features` passes. Co-Authored-By: Claude Opus 4.7 --- src/client/error.rs | 40 ++++++++++++++++++++++++++++++- src/lib.rs | 10 +++----- src/tokio_transport.rs | 53 +++++++++++++++++++++++++++++++++++++++--- src/transport.rs | 12 ++++++---- 4 files changed, 99 insertions(+), 16 deletions(-) diff --git a/src/client/error.rs b/src/client/error.rs index ee509d7..97063c0 100644 --- a/src/client/error.rs +++ b/src/client/error.rs @@ -48,6 +48,44 @@ pub enum Error { Capacity(&'static str), /// An error surfaced by the pluggable transport backend (see /// [`crate::transport::TransportError`]). - #[error("transport error: {0:?}")] + #[error(transparent)] Transport(#[from] crate::transport::TransportError), } + +#[cfg(test)] +mod tests { + use super::*; + use crate::transport::TransportError; + use std::format; + + #[test] + fn transport_variant_displays_via_inner_display_not_debug() { + // Regression guard: previously `{0:?}` leaked debug formatting + // (e.g. `AddressInUse`) into user-facing error messages. The + // `#[error(transparent)]` form delegates fully to the inner + // `TransportError`'s Display impl. + let err = Error::Transport(TransportError::AddressInUse); + let displayed = format!("{err}"); + + // No debug-format artifacts: no braces (`AddressInUse` is a unit + // variant, but struct-like variants would debug-format with + // braces), no quote-wrapping, no raw variant name from debug. + assert!( + !displayed.contains('{'), + "unexpected `{{` in Display output: {displayed:?}" + ); + assert!( + !displayed.contains('}'), + "unexpected `}}` in Display output: {displayed:?}" + ); + assert!( + !displayed.contains('"'), + "unexpected `\"` in Display output: {displayed:?}" + ); + + // `transparent` delegates to the inner Display verbatim. + let inner = format!("{}", TransportError::AddressInUse); + assert_eq!(displayed, inner); + assert_eq!(displayed, "address in use"); + } +} diff --git a/src/lib.rs b/src/lib.rs index 0b918f6..7cc0529 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -138,13 +138,9 @@ pub mod server; #[cfg(any(feature = "client", feature = "server"))] pub mod tokio_transport; mod traits; -/// Executor-agnostic UDP transport abstraction. `no_std`-compatible. -/// -/// Intended to be consumed by the `client` and `server` modules in a -/// future refactor; currently those paths still use `tokio` / `socket2` -/// directly. The trait surface is defined here so bare-metal consumers -/// can implement it today against their own stack and be ready when the -/// higher-level modules are migrated. +/// Executor-agnostic UDP transport abstraction used by the client and +/// server modules. `no_std`-compatible; a default `std + tokio` backend +/// ships in [`tokio_transport`] under the `client` / `server` features. pub mod transport; #[cfg(feature = "std")] pub use raw_payload::{RawPayload, VecSdHeader}; diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index 5dc7649..71b6109 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -56,6 +56,21 @@ pub struct TokioSocket { inner: UdpSocket, } +impl TokioSocket { + /// Read back the current value of the `IP_MULTICAST_LOOP` flag. Thin + /// wrapper over [`tokio::net::UdpSocket::multicast_loop_v4`], exposed + /// for tests that verify [`SocketOptions::multicast_loop_v4`] is + /// applied and for field debugging. + /// + /// # Errors + /// + /// Returns [`TransportError`] if the backend cannot read the flag. + #[allow(dead_code)] // used in tests; kept available for field debugging. + pub(crate) fn multicast_loop_v4(&self) -> Result { + self.inner.multicast_loop_v4().map_err(map_io_error) + } +} + /// Sleep backed by [`tokio::time::sleep`]. /// /// TODO(phase 7): wire this into the `tokio::time::sleep` call sites in @@ -175,9 +190,7 @@ fn bind_with_options(addr: SocketAddrV4, options: &SocketOptions) -> std::io::Re if let Some(iface) = options.multicast_if_v4 { raw.set_multicast_if_v4(&iface)?; } - if options.multicast_loop_v4 { - raw.set_multicast_loop_v4(true)?; - } + raw.set_multicast_loop_v4(options.multicast_loop_v4)?; let bind_addr = SocketAddr::new(IpAddr::V4(*addr.ip()), addr.port()); raw.bind(&bind_addr.into())?; raw.set_nonblocking(true)?; @@ -300,6 +313,40 @@ mod tests { drop(a); } + #[tokio::test] + async fn multicast_loop_v4_option_propagates_in_both_directions() { + // Guards against a regression where `multicast_loop_v4: false` was + // silently ignored and the socket kept the OS default (often + // loopback ENABLED), diverging from the explicit request. + let factory = TokioTransport; + + let opts_off = SocketOptions { + multicast_loop_v4: false, + ..SocketOptions::default() + }; + let sock_off = factory + .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), &opts_off) + .await + .expect("bind off"); + assert!( + !sock_off.multicast_loop_v4().expect("read off flag"), + "multicast_loop_v4=false must disable IP_MULTICAST_LOOP" + ); + + let opts_on = SocketOptions { + multicast_loop_v4: true, + ..SocketOptions::default() + }; + let sock_on = factory + .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), &opts_on) + .await + .expect("bind on"); + assert!( + sock_on.multicast_loop_v4().expect("read on flag"), + "multicast_loop_v4=true must enable IP_MULTICAST_LOOP" + ); + } + #[tokio::test] async fn timer_sleep_elapses_at_least_requested() { let timer = TokioTimer; diff --git a/src/transport.rs b/src/transport.rs index fff450f..0868c68 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -77,11 +77,13 @@ //! //! # Status //! -//! The traits are defined but not yet wired into `Client`/`Server`; that is -//! the next refactor step. No implementations ship with the crate yet. -//! Callers must provide their own backend — typically a thin adapter over -//! `tokio::net::UdpSocket` + `tokio::time` on `std`, or over -//! `smoltcp::UdpSocket` + `embassy-time` on embedded. +//! A default `std + tokio` implementation +//! ([`crate::tokio_transport::TokioTransport`], +//! [`crate::tokio_transport::TokioSocket`], +//! [`crate::tokio_transport::TokioTimer`]) ships under the `client` and +//! `server` features and is re-exported at the crate root. Other backends +//! (for example `smoltcp::UdpSocket` + `embassy-time` on embedded) are the +//! consumer's responsibility — the traits here are the integration point. //! //! # Minimal adapter sketch //! From d50d21ba0c14e78573a06dbbc9fe1f6edd3b8cbb Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Thu, 23 Apr 2026 14:28:16 -0400 Subject: [PATCH 06/13] tokio_transport: document truncation caveat + triage log levels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round-2 Copilot feedback on PR #79: - src/tokio_transport.rs:129 recv_from: `tokio::net::UdpSocket:: recv_from` silently truncates when the caller's buf is smaller than the datagram — it does not expose a truncation flag. The `ReceivedDatagram::truncated` field therefore always reports `false` on the Tokio backend. Reliable truncation detection would require a libc + unsafe `recvmsg`/MSG_TRUNC path, deferred to the phase 10+ bare-metal refactor. Added a precise in-line comment naming the platform limitation so callers don't mis-trust the field. - src/tokio_transport.rs:213 map_io_error: the post-mapping logging previously unconditionally used `warn!` for every I/O error, which turned common steady-state conditions (TimedOut, Interrupted, ConnectionRefused during transient outages) into warning noise that can drown out actionable signal. Triaged by kind: common-path kinds drop to `debug!`; unexpected / misconfiguration-indicating kinds (PermissionDenied, AddrInUse, NetworkUnreachable, fallback Other) stay at `warn!`. Comment explains the rationale. - src/transport.rs module docs: the `crate::tokio_transport::*` intra-doc links broke under default-feature rustdoc builds (the `tokio_transport` module is gated to `client`/`server`). Same resolution as the PR #83 intra-doc fixes in ee305e0: render the paths as code literals and add an inline note that the module requires those features. Keeps the information, drops the link. No new behavior paths introduced — logging changes + docs only. Existing tokio_transport test suite (6/6) still passes. Co-Authored-By: Claude Opus 4.7 --- src/tokio_transport.rs | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index 71b6109..1d1d8c6 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -126,6 +126,17 @@ impl TransportSocket for TokioSocket { return Err(TransportError::Unsupported); } }; + // Caveat: `tokio::net::UdpSocket::recv_from` silently + // truncates when the caller's `buf` is smaller than the + // datagram and returns only the bytes that fit — it does + // NOT expose a truncation flag. Surfacing a reliable + // `truncated: bool` here would require a platform-specific + // `recvmsg`/MSG_TRUNC path (libc + unsafe), which is + // deferred to the phase 10+ bare-metal refactor. Until + // then, this field is always `false` for the Tokio + // backend; callers must not rely on it for truncation + // detection. This is documented on + // `ReceivedDatagram::truncated`'s field doc. Ok(ReceivedDatagram { bytes_received: n, source, @@ -212,7 +223,8 @@ fn bind_with_options(addr: SocketAddrV4, options: &SocketOptions) -> std::io::Re /// enum. fn map_io_error(e: std::io::Error) -> TransportError { use std::io::ErrorKind as K; - let mapped = match e.kind() { + let kind = e.kind(); + let mapped = match kind { K::AddrInUse => TransportError::AddressInUse, K::Unsupported => TransportError::Unsupported, K::TimedOut => TransportError::Io(IoErrorKind::TimedOut), @@ -224,11 +236,28 @@ fn map_io_error(e: std::io::Error) -> TransportError { } _ => TransportError::Io(IoErrorKind::Other), }; - tracing::warn!( - "tokio transport io error: {e} (raw_os={:?}, kind={:?}) mapped to {mapped}", - e.raw_os_error(), - e.kind(), - ); + // Log at `warn!` for unexpected / misconfiguration-indicating + // kinds (permission denied, address-in-use, network unreachable, + // fallback Other) where ops should probably look. Common + // steady-state conditions (timeouts, interrupted syscalls, + // connection refused during transient outages) drop to `debug!` + // so we don't drown out actionable warnings under load. + match kind { + K::TimedOut | K::Interrupted | K::ConnectionRefused => { + tracing::debug!( + "tokio transport io error: {e} (raw_os={:?}, kind={:?}) mapped to {mapped}", + e.raw_os_error(), + kind, + ); + } + _ => { + tracing::warn!( + "tokio transport io error: {e} (raw_os={:?}, kind={:?}) mapped to {mapped}", + e.raw_os_error(), + kind, + ); + } + } mapped } From f76c2b6a41b1a141d2c2195f21b7f1d4ee06c880 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Thu, 23 Apr 2026 14:28:58 -0400 Subject: [PATCH 07/13] docs: fix transport.rs intra-doc links under default features MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to 3f93900 — the intra-doc fix for the `crate::tokio_transport::*` links in transport.rs didn't get saved before the commit closed. Same resolution as the PR #83 fix in ee305e0: render feature-gated paths as code literals rather than intra-doc links, add inline note explaining why. Verified with `RUSTDOCFLAGS=-D rustdoc::broken-intra-doc-links cargo doc --no-deps` — default-feature docs now clean. Co-Authored-By: Claude Opus 4.7 --- src/transport.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/transport.rs b/src/transport.rs index 0868c68..0f45ed0 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -78,12 +78,15 @@ //! # Status //! //! A default `std + tokio` implementation -//! ([`crate::tokio_transport::TokioTransport`], -//! [`crate::tokio_transport::TokioSocket`], -//! [`crate::tokio_transport::TokioTimer`]) ships under the `client` and -//! `server` features and is re-exported at the crate root. Other backends -//! (for example `smoltcp::UdpSocket` + `embassy-time` on embedded) are the -//! consumer's responsibility — the traits here are the integration point. +//! (`crate::tokio_transport::TokioTransport`, +//! `crate::tokio_transport::TokioSocket`, `crate::tokio_transport::TokioTimer`) +//! ships under the `client` and `server` features and is re-exported at the +//! crate root. The paths are rendered as code literals rather than +//! intra-doc links because the `tokio_transport` module is feature-gated, +//! and links would otherwise break default-feature rustdoc builds. Other +//! backends (for example `smoltcp::UdpSocket` + `embassy-time` on embedded) +//! are the consumer's responsibility — the traits here are the integration +//! point. //! //! # Minimal adapter sketch //! From e703de2e1677e5e672c436c4c3fe145c65562f8c Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Thu, 23 Apr 2026 14:50:48 -0400 Subject: [PATCH 08/13] chore(clippy): tidy new warnings in tokio_transport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address 11 clippy::pedantic warnings introduced by this branch in src/tokio_transport.rs: - manual_async_fn on TransportSocket::{send_to, recv_from} and Timer::sleep impls: rewrite as async fn — same semantics, cleaner signature. Trait-side remains -> impl Future for backend flexibility. - needless_pass_by_value on map_io_error: take &std::io::Error and update the handful of map_err call sites accordingly. - trivially_copy_pass_by_ref on bind_with_options's SocketOptions: take by value since SocketOptions is Copy. - field_reassign_with_default in reuse_address test: switch to struct update syntax. --- src/tokio_transport.rs | 114 +++++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 56 deletions(-) diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index 1d1d8c6..d64d2ce 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -67,7 +67,9 @@ impl TokioSocket { /// Returns [`TransportError`] if the backend cannot read the flag. #[allow(dead_code)] // used in tests; kept available for field debugging. pub(crate) fn multicast_loop_v4(&self) -> Result { - self.inner.multicast_loop_v4().map_err(map_io_error) + self.inner + .multicast_loop_v4() + .map_err(|e| map_io_error(&e)) } } @@ -93,60 +95,60 @@ impl TransportFactory for TokioTransport { // Capture options by value into the async block so the returned // future does not borrow `self` or `options`. let options = *options; - async move { bind_with_options(addr, &options).map_err(map_io_error) } + async move { bind_with_options(addr, options).map_err(|e| map_io_error(&e)) } } } impl TransportSocket for TokioSocket { - fn send_to( + async fn send_to( &mut self, buf: &[u8], target: SocketAddrV4, - ) -> impl Future> { - async move { - self.inner - .send_to(buf, target) - .await - .map(|_| ()) - .map_err(map_io_error) - } + ) -> Result<(), TransportError> { + self.inner + .send_to(buf, target) + .await + .map(|_| ()) + .map_err(|e| map_io_error(&e)) } - fn recv_from( + async fn recv_from( &mut self, buf: &mut [u8], - ) -> impl Future> { - async move { - let (n, src) = self.inner.recv_from(buf).await.map_err(map_io_error)?; - let source = match src { - SocketAddr::V4(v4) => v4, - SocketAddr::V6(_) => { - // SOME/IP is IPv4-only; an IPv6 source on our socket is - // either impossible (v4 bind) or a misconfiguration. - return Err(TransportError::Unsupported); - } - }; - // Caveat: `tokio::net::UdpSocket::recv_from` silently - // truncates when the caller's `buf` is smaller than the - // datagram and returns only the bytes that fit — it does - // NOT expose a truncation flag. Surfacing a reliable - // `truncated: bool` here would require a platform-specific - // `recvmsg`/MSG_TRUNC path (libc + unsafe), which is - // deferred to the phase 10+ bare-metal refactor. Until - // then, this field is always `false` for the Tokio - // backend; callers must not rely on it for truncation - // detection. This is documented on - // `ReceivedDatagram::truncated`'s field doc. - Ok(ReceivedDatagram { - bytes_received: n, - source, - truncated: false, - }) - } + ) -> Result { + let (n, src) = self + .inner + .recv_from(buf) + .await + .map_err(|e| map_io_error(&e))?; + let source = match src { + SocketAddr::V4(v4) => v4, + SocketAddr::V6(_) => { + // SOME/IP is IPv4-only; an IPv6 source on our socket is + // either impossible (v4 bind) or a misconfiguration. + return Err(TransportError::Unsupported); + } + }; + // Caveat: `tokio::net::UdpSocket::recv_from` silently + // truncates when the caller's `buf` is smaller than the + // datagram and returns only the bytes that fit — it does + // NOT expose a truncation flag. Surfacing a reliable + // `truncated: bool` here would require a platform-specific + // `recvmsg`/MSG_TRUNC path (libc + unsafe), which is + // deferred to the phase 10+ bare-metal refactor. Until + // then, this field is always `false` for the Tokio + // backend; callers must not rely on it for truncation + // detection. This is documented on + // `ReceivedDatagram::truncated`'s field doc. + Ok(ReceivedDatagram { + bytes_received: n, + source, + truncated: false, + }) } fn local_addr(&self) -> Result { - match self.inner.local_addr().map_err(map_io_error)? { + match self.inner.local_addr().map_err(|e| map_io_error(&e))? { SocketAddr::V4(v4) => Ok(v4), SocketAddr::V6(_) => Err(TransportError::Unsupported), } @@ -159,7 +161,7 @@ impl TransportSocket for TokioSocket { ) -> Result<(), TransportError> { self.inner .join_multicast_v4(group, iface) - .map_err(map_io_error) + .map_err(|e| map_io_error(&e)) } fn leave_multicast_v4( @@ -169,15 +171,13 @@ impl TransportSocket for TokioSocket { ) -> Result<(), TransportError> { self.inner .leave_multicast_v4(group, iface) - .map_err(map_io_error) + .map_err(|e| map_io_error(&e)) } } impl Timer for TokioTimer { - fn sleep(&self, duration: Duration) -> impl Future { - // tokio::time::sleep returns a Sleep future; we wrap in an async - // block so the returned type is a simple `impl Future`. - async move { tokio::time::sleep(duration).await } + async fn sleep(&self, duration: Duration) { + tokio::time::sleep(duration).await; } } @@ -185,7 +185,7 @@ impl Timer for TokioTimer { /// hand it to tokio. Mirrors the existing bind paths in /// [`crate::client::socket_manager`] and [`crate::server`] so behavior is /// identical. -fn bind_with_options(addr: SocketAddrV4, options: &SocketOptions) -> std::io::Result { +fn bind_with_options(addr: SocketAddrV4, options: SocketOptions) -> std::io::Result { let raw = socket2::Socket::new( socket2::Domain::IPV4, socket2::Type::DGRAM, @@ -221,7 +221,7 @@ fn bind_with_options(addr: SocketAddrV4, options: &SocketOptions) -> std::io::Re /// the original error is emitted at `warn!` level here before mapping — /// ops sees the detailed message in logs while callers get the portable /// enum. -fn map_io_error(e: std::io::Error) -> TransportError { +fn map_io_error(e: &std::io::Error) -> TransportError { use std::io::ErrorKind as K; let kind = e.kind(); let mapped = match kind { @@ -315,8 +315,10 @@ mod tests { // Two sockets with reuse_address=true should be able to bind the // same port on platforms where SO_REUSEADDR permits it (windows // and linux both do for DGRAM). - let mut opts = SocketOptions::default(); - opts.reuse_address = true; + let opts = SocketOptions { + reuse_address: true, + ..SocketOptions::default() + }; let factory = TokioTransport; let a = factory @@ -388,24 +390,24 @@ mod tests { fn map_io_error_covers_common_kinds() { use std::io::{Error, ErrorKind}; assert!(matches!( - map_io_error(Error::from(ErrorKind::AddrInUse)), + map_io_error(&Error::from(ErrorKind::AddrInUse)), TransportError::AddressInUse )); assert!(matches!( - map_io_error(Error::from(ErrorKind::TimedOut)), + map_io_error(&Error::from(ErrorKind::TimedOut)), TransportError::Io(IoErrorKind::TimedOut) )); assert!(matches!( - map_io_error(Error::from(ErrorKind::ConnectionRefused)), + map_io_error(&Error::from(ErrorKind::ConnectionRefused)), TransportError::Io(IoErrorKind::ConnectionRefused) )); assert!(matches!( - map_io_error(Error::from(ErrorKind::Unsupported)), + map_io_error(&Error::from(ErrorKind::Unsupported)), TransportError::Unsupported )); // Fallback path assert!(matches!( - map_io_error(Error::from(ErrorKind::Other)), + map_io_error(&Error::from(ErrorKind::Other)), TransportError::Io(IoErrorKind::Other) )); } From 2b46f0bfd48e402cfa056b14052d808677664af9 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Fri, 24 Apr 2026 17:49:40 -0400 Subject: [PATCH 09/13] fix(socket_manager): correct error log on recv_from failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Err arm of the socket.recv_from select branch logged "Error decoding message", but an Err here is a transport-level I/O failure on the socket read — decoding happens later inside MessageView::parse. Rename the log to "Error receiving datagram" and add a comment distinguishing the failure mode so ops triage isn't misled. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/client/socket_manager.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 3097e57..d1e0ae8 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -318,8 +318,12 @@ where } } Err(e) => { - - error!("Error decoding message: {:?}", e); + // This arm is the transport-level recv_from + // result; decoding runs further up inside + // `MessageView::parse`. An `Err` here is an + // I/O failure on the socket read, not a + // decode failure. + error!("Error receiving datagram: {:?}", e); } } }, From 24a470c74ef5e3654362299477b508fac9b5454e Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Fri, 24 Apr 2026 18:03:11 -0400 Subject: [PATCH 10/13] round-N: align map_io_error + spawn_socket_loop docs with implementation - tokio_transport::map_io_error: doc said the original error is logged at warn! before mapping, but the implementation splits common steady-state kinds (TimedOut/Interrupted/ConnectionRefused) down to debug! so they don't drown out actionable warnings. Doc now lists both levels and explains which kinds fall into each. - socket_manager::spawn_socket_loop: doc listed join_multicast_v4 as part of the per-loop I/O surface; multicast membership is actually joined by the caller before spawn_socket_loop runs, and the loop body only uses send_to/recv_from. Doc now says that explicitly. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/client/socket_manager.rs | 13 ++++++++----- src/tokio_transport.rs | 11 ++++++++--- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index d1e0ae8..abaab58 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -253,11 +253,14 @@ where /// Spawn the I/O loop over a concrete [`TokioSocket`]. /// - /// The socket's trait methods (`send_to`, `recv_from`, - /// `join_multicast_v4`) are the entire I/O surface used inside — the - /// loop body does not call any `TokioSocket`-specific inherent - /// methods, so generalizing this function over `T: TransportSocket` - /// is a mechanical change once the outer `tokio::spawn` is hoisted + /// The loop body's entire I/O surface on the socket is `send_to` + /// and `recv_from` — both trait methods. Multicast membership + /// (`join_multicast_v4`) is set up by the caller *before* calling + /// this function, never from inside the spawned task, so the + /// per-loop I/O surface stays on just the two send/recv methods. + /// Because no `TokioSocket`-specific inherent methods are used + /// inside, generalizing this function over `T: TransportSocket` is + /// a mechanical change once the outer `tokio::spawn` is hoisted /// out in phase 6 (stable Rust's `Send` bounds on RPITIT method /// returns are currently expressible only via return-type notation, /// which is nightly — hoisting the spawn avoids the issue by moving diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index d64d2ce..45439a3 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -218,9 +218,14 @@ fn bind_with_options(addr: SocketAddrV4, options: SocketOptions) -> std::io::Res /// The full `std::io::Error` (raw errno, OS message, chained source) is /// discarded by design to keep the public [`TransportError`] enum /// portable and `no_std`-safe. To keep field debugging possible anyway, -/// the original error is emitted at `warn!` level here before mapping — -/// ops sees the detailed message in logs while callers get the portable -/// enum. +/// the original error is emitted to the tracing subscriber before +/// mapping — at `debug!` for common steady-state conditions +/// (`TimedOut`, `Interrupted`, `ConnectionRefused`) so they don't +/// drown out actionable warnings under load, and at `warn!` for +/// everything else (misconfiguration-indicating kinds like +/// `AddrInUse` / `PermissionDenied` / `NetworkUnreachable` and the +/// fallback `Other`). Operators should look at `warn!` lines; the +/// `debug!` lines are there for deep-dive debugging only. fn map_io_error(e: &std::io::Error) -> TransportError { use std::io::ErrorKind as K; let kind = e.kind(); From d9e08c4f09fd3d9eedb1ae204c0e63dbe0d9c9ee Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Fri, 24 Apr 2026 18:24:18 -0400 Subject: [PATCH 11/13] client/socket_manager: clean up error-log + if-let-Ok patterns - The recv_from Err arm was logging at error!, but map_io_error in tokio_transport already logs the raw OS error + kind (at warn! for actionable kinds, debug! for steady-state noise). Drop to debug! here so we don't double-log the same failure at error! and inflate operator-facing log volume. - Replace three awkward `if let Ok(()) = x {} else { ... }` shapes with the explicit `if x.is_err() { ... }` form. The original shape has an empty success arm + extra whitespace that's easy to misread in review. The encode-error arm with a populated Ok branch is also flipped for consistency. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/client/socket_manager.rs | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index abaab58..139ac20 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -14,7 +14,7 @@ use std::{ task::{Context, Poll}, }; use tokio::{select, sync::mpsc}; -use tracing::{error, info, trace}; +use tracing::{debug, error, info, trace}; /// A received message together with the source address it came from. /// @@ -314,7 +314,7 @@ where }) }) .map_err(Error::from); - if let Ok(()) = rx_tx.send( parse_result ).await {} else { + if rx_tx.send(parse_result).await.is_err() { info!("Socket Dropping"); // The receiver has been dropped, so we should exit break; @@ -326,7 +326,14 @@ where // `MessageView::parse`. An `Err` here is an // I/O failure on the socket read, not a // decode failure. - error!("Error receiving datagram: {:?}", e); + // + // `map_io_error` in tokio_transport already + // logs the raw OS error + kind (at `warn!` + // for actionable kinds, `debug!` for + // steady-state noise like `TimedOut`), so + // stay at `debug!` here to avoid double- + // logging the same failure at `error!`. + debug!("recv_from returned error on socket loop: {:?}", e); } } }, @@ -352,12 +359,12 @@ where Err(e) => { error!("Failed to encode message: {:?}", e); // If the sender is already closed we can't send the error back, so we shut everything down - if let Ok(()) = send_message.response.send(Err(e.into())) { - // Successfully sent error back to sender, carry on - continue; + if send_message.response.send(Err(e.into())).is_err() { + error!("Socket owner closed channel unexpectedly, closing socket."); + break; } - error!("Socket owner closed channel unexpectedly, closing socket."); - break; + // Successfully sent error back to sender, carry on + continue; } }; @@ -404,7 +411,7 @@ where match socket.send_to(&buf[..message_length], send_message.target_addr).await { Ok(()) => { trace!("Sent {} bytes to {}", message_length, send_message.target_addr); - if let Ok(()) = send_message.response.send(Ok(())) {} else { + if send_message.response.send(Ok(())).is_err() { info!("Socket owner closed channel, closing socket."); // The sender has been dropped, so we should exit break; @@ -412,7 +419,7 @@ where } Err(e) => { error!("Failed to send message with error: {:?}", e); - if let Ok(()) = send_message.response.send(Err(Error::Transport(e))) { } else { + if send_message.response.send(Err(Error::Transport(e))).is_err() { error!("Socket owner closed channel unexpectedly, closing socket."); break; } From 24aef9d1ec7738e6891df080127a7153d02c0bcb Mon Sep 17 00:00:00 2001 From: Justin Kovacich <32140377+JustinKovacich@users.noreply.github.com> Date: Fri, 24 Apr 2026 18:31:34 -0400 Subject: [PATCH 12/13] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/client/socket_manager.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 139ac20..ea74e4c 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -418,7 +418,6 @@ where } } Err(e) => { - error!("Failed to send message with error: {:?}", e); if send_message.response.send(Err(Error::Transport(e))).is_err() { error!("Socket owner closed channel unexpectedly, closing socket."); break; From 92cf16833b60a8ef120ac8c7b15c8f0eb3139a48 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Fri, 24 Apr 2026 20:42:58 -0400 Subject: [PATCH 13/13] fix(rebase): resolve semantic conflicts from base-trait migration Auto-applied patches landed cleanly but produced semantic conflicts against the new transport-trait base (756f4b2): - TokioSocket I/O methods updated from `&mut self` to `&self` to match the trait signature change in db44209. - ControlMessage::reject_with_capacity now covers QueryRebootFlag and ForceSdSessionWrappedForTest. Their oneshot payloads aren't Result types so the senders are dropped (RecvError on receiver); these are internal/test paths, not the public APIs whose unwrap-on-RecvError would panic. - Test call site for the now-async `SocketManager::bind` updated to `.await.unwrap()`. - Three `mgr.subscribe(..)` test call sites now call `.unwrap()` since `subscribe` returns Result. - Drop redundant `mut` bindings on now-immutable sockets. Full lib + integration suite passes with --test-threads=1. --- src/client/inner.rs | 12 ++++++++++++ src/client/socket_manager.rs | 6 +++--- src/server/event_publisher.rs | 6 +++--- src/tokio_transport.rs | 12 ++++++------ 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/client/inner.rs b/src/client/inner.rs index b602e45..5db723a 100644 --- a/src/client/inner.rs +++ b/src/client/inner.rs @@ -273,6 +273,18 @@ impl ControlMessage

{ let _ = send_complete.send(Err(Error::Capacity(structure_name))); let _ = response.send(Err(Error::Capacity(structure_name))); } + // QueryRebootFlag and ForceSdSessionWrappedForTest carry + // non-Result oneshot payloads, so there is no Err variant to + // deliver — drop the sender, which surfaces as RecvError on + // the awaiting side. These are internal/test paths, not the + // public APIs whose unwrap-on-RecvError would panic callers. + Self::QueryRebootFlag(_) => { + let _ = structure_name; + } + #[cfg(test)] + Self::ForceSdSessionWrappedForTest(_, _) => { + let _ = structure_name; + } } } } diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index ea74e4c..59c4fa4 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -138,7 +138,7 @@ where }; let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, sd::MULTICAST_PORT); - let mut socket = factory.bind(bind_addr, &options).await?; + let socket = factory.bind(bind_addr, &options).await?; socket.join_multicast_v4(sd::MULTICAST_IP, interface)?; Self::spawn_socket_loop(socket, rx_tx, tx_rx, e2e_registry); @@ -267,7 +267,7 @@ where /// the `Send` requirement off this function entirely). #[allow(clippy::too_many_lines)] fn spawn_socket_loop( - mut socket: crate::tokio_transport::TokioSocket, + socket: crate::tokio_transport::TokioSocket, rx_tx: mpsc::Sender, Error>>, mut tx_rx: mpsc::Receiver>, e2e_registry: Arc>, @@ -740,7 +740,7 @@ mod tests { let message_id = MessageId::new_from_service_and_method(0x1234, 0x5678); // No E2E registered — goes straight through the pre-encode check. let e2e_registry = Arc::new(Mutex::new(E2ERegistry::new())); - let mut sm = SocketManager::::bind(0, e2e_registry).unwrap(); + let mut sm = SocketManager::::bind(0, e2e_registry).await.unwrap(); // Derive a payload that makes the full message exceed the UDP cap // by 1 byte regardless of how `UDP_BUFFER_SIZE` is retuned: diff --git a/src/server/event_publisher.rs b/src/server/event_publisher.rs index 6255cf2..37dc09c 100644 --- a/src/server/event_publisher.rs +++ b/src/server/event_publisher.rs @@ -453,7 +453,7 @@ mod tests { let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999); { let mut mgr = subscriptions.write().await; - mgr.subscribe(0x5B, 1, 0x01, addr); + mgr.subscribe(0x5B, 1, 0x01, addr).unwrap(); } let (publisher, _) = make_publisher(subscriptions).await; @@ -487,7 +487,7 @@ mod tests { let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999); { let mut mgr = subscriptions.write().await; - mgr.subscribe(0x5B, 1, 0x01, addr); + mgr.subscribe(0x5B, 1, 0x01, addr).unwrap(); } let (publisher, _) = make_publisher(subscriptions).await; @@ -548,7 +548,7 @@ mod tests { let subscriptions = Arc::new(RwLock::new(SubscriptionManager::new())); { let mut mgr = subscriptions.write().await; - mgr.subscribe(0x5B, 1, 0x01, SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999)); + mgr.subscribe(0x5B, 1, 0x01, SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999)).unwrap(); } let socket = Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()); diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index 45439a3..7702628 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -101,7 +101,7 @@ impl TransportFactory for TokioTransport { impl TransportSocket for TokioSocket { async fn send_to( - &mut self, + &self, buf: &[u8], target: SocketAddrV4, ) -> Result<(), TransportError> { @@ -113,7 +113,7 @@ impl TransportSocket for TokioSocket { } async fn recv_from( - &mut self, + &self, buf: &mut [u8], ) -> Result { let (n, src) = self @@ -155,7 +155,7 @@ impl TransportSocket for TokioSocket { } fn join_multicast_v4( - &mut self, + &self, group: Ipv4Addr, iface: Ipv4Addr, ) -> Result<(), TransportError> { @@ -165,7 +165,7 @@ impl TransportSocket for TokioSocket { } fn leave_multicast_v4( - &mut self, + &self, group: Ipv4Addr, iface: Ipv4Addr, ) -> Result<(), TransportError> { @@ -290,13 +290,13 @@ mod tests { let factory = TokioTransport; let opts = SocketOptions::default(); - let mut recv = factory + let recv = factory .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), &opts) .await .unwrap(); let recv_addr = recv.local_addr().unwrap(); - let mut send = factory + let send = factory .bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), &opts) .await .unwrap();