From 398289b30d081d512d1a5d73975390e8d79188ba Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Fri, 24 Apr 2026 20:52:56 -0400 Subject: [PATCH] phase 7: select-fairness, FusedFuture migration, Timer trait wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Final state of #81, squashed from 8 commits to keep the rebase onto the rewritten #80 tractable. The intermediate history includes a transient select_biased! step that was reverted to plain select! for fairness within the same PR, which would have produced a contradictory mid-rebase state if replayed individually. Original commits, oldest first: - 888bcfa Add futures dep behind client/server features for upcoming phase 7 work. Enables futures::FutureExt / pin_mut / select macros in the event loops; no code paths use it yet. - be42199 Dropped tokio::select in favor of futures::select_biased! (with FutureExt::fuse + pin_mut!) in socket_manager and client::inner; removed tokio::spawn(...) from Client::new (the run-future is now returned to the caller); migrated client.start_sd_announcements -> sd_announcements_loop returning a future for the caller to spawn. - 2f90058 Utilize TokioTimer for the loop sleeps, one step closer to a bare-metal replacement. - 0df42f2 Address adversarial review for phase 7 — fairness, readability, coverage: * select_biased! -> select! at the 3 event-loop sites (socket_manager, inner::run_future, server::run). Restores random per-poll fairness and eliminates the arm-starvation risk introduced by biased ordering. * Imports Timer + TokioTimer at each Timer call site so UFCS reduces to method syntax (TokioTimer.sleep(d)). * Hoists socket_manager's Outcome

enum out of spawn_socket_loop's async block to module scope. * Updates stale "phase 6" doc references to point at the actual planned hoist phase (8, alongside the bare-metal example). * New tests: sd_announcements_loop_cadence_stays_close_to_requested bind_with_transport_carries_traffic_end_to_end bind_with_transport_propagates_factory_error client_new_run_future_is_send_static - 27f9e67 phase 7: respond to PR #81 feedback (Copilot): (1) correct futures dep comment in Cargo.toml to describe actual usage (select! + FutureExt::fuse / pin_mut!); (2) rename recv error log to "Transport recv failed" with binding `recv_err` since the error is from socket.recv_from, not a parse step; (3) drop the pre-loop sleep in sd_announcements_loop so the first announcement lands at ~1x interval instead of ~2x; in-loop sleep carries the initial-delay role. New test sd_announcements_loop_first_emit_within_one_interval pins first-emit latency below 250ms at 100ms interval. - 4f7f9d5 docs: fix stale Client API name in passive-server error message (start_sd_announcements -> sd_announcements_loop). - f5c674a chore(clippy): tidy new warnings in phase7 PR: * match_wildcard_for_single_variants on SocketAddr match — make the wildcard explicit as `other @ SocketAddr::V6(_)`. * manual_async_fn on AlwaysBusyFactory test-impl of TransportFactory::bind: rewrite as async fn. - 33ce551 round-3: fix #[ignore] reasons on sd_announcements_loop tests. Align both tests under #[ignore] with an accurate reason referencing the real constraint (MULTICAST on the loopback interface); drop the stale sd_state.rs reference. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 89 ++++++ Cargo.toml | 12 +- src/client/inner.rs | 289 +++++++++++--------- src/client/mod.rs | 263 +++++++++++++++--- src/client/socket_manager.rs | 514 ++++++++++++++++++++--------------- src/server/mod.rs | 48 +++- src/tokio_transport.rs | 13 +- 7 files changed, 824 insertions(+), 404 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3fd7af0..a5f2c35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,6 +56,82 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9eb1aa714776b75c7e67e1da744b81a129b3ff919c8712b5e1b32252c1f07cc7" +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + [[package]] name = "hash32" version = "0.3.1" @@ -93,6 +169,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + [[package]] name = "mio" version = "1.2.0" @@ -158,6 +240,7 @@ version = "0.7.0" dependencies = [ "crc", "embedded-io", + "futures", "heapless", "socket2 0.5.10", "thiserror", @@ -166,6 +249,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" diff --git a/Cargo.toml b/Cargo.toml index 45bde7f..8ee0290 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,14 @@ repository = "https://github.com/luminartech/simple_someip" [dependencies] crc = "3.4" embedded-io = { version = "0.7" } +# `futures` pulls in `futures-util` which provides the executor-agnostic +# `select!` macro and `FutureExt::fuse` / `pin_mut!` helpers — used by +# the client/server event loops in place of `tokio::select!`. Default +# features disabled so we only pull in the parts we use. +futures = { version = "0.3", default-features = false, features = [ + "async-await", + "std", +], optional = true } heapless = "0.9" socket2 = { version = "0.5", optional = true, features = ["all"] } thiserror = { version = "2", default-features = false } @@ -31,8 +39,8 @@ tracing-subscriber = "0.3" [features] default = ["std"] std = ["embedded-io/std", "thiserror/std", "tracing/std"] -client = ["std", "dep:tokio", "dep:socket2"] -server = ["std", "dep:tokio", "dep:socket2"] +client = ["std", "dep:tokio", "dep:socket2", "dep:futures"] +server = ["std", "dep:tokio", "dep:socket2", "dep:futures"] [[test]] name = "client_server" diff --git a/src/client/inner.rs b/src/client/inner.rs index 8d35a84..6eb1a06 100644 --- a/src/client/inner.rs +++ b/src/client/inner.rs @@ -1,3 +1,4 @@ +use futures::{FutureExt, pin_mut, select}; use heapless::{Deque, index_map::FnvIndexMap}; use std::{ borrow::ToOwned, @@ -6,16 +7,14 @@ use std::{ sync::{Arc, Mutex}, task::Poll, }; -use tokio::{ - select, - sync::{ - mpsc::{self, Receiver, Sender}, - oneshot, - }, +use tokio::sync::{ + mpsc::{self, Receiver, Sender}, + oneshot, }; use tracing::{debug, error, info, trace, warn}; use crate::{ + Timer, client::{ ClientUpdate, DiscoveryMessage, service_registry::{ServiceEndpointInfo, ServiceInstanceId, ServiceRegistry}, @@ -24,6 +23,7 @@ use crate::{ }, e2e::E2ERegistry, protocol::{self, Message}, + tokio_transport::TokioTimer, traits::PayloadWireFormat, }; @@ -892,149 +892,182 @@ where async move { info!("SOME/IP Client processing loop started"); loop { - let Self { - control_receiver, - pending_responses, - discovery_socket, - unicast_sockets, - update_sender, - request_queue, - session_tracker, - service_registry, - run, - .. - } = &mut self; - select! { - () = tokio::time::sleep(std::time::Duration::from_millis(125)) => {} - // Receive a control message only when the request queue - // has spare capacity, so we apply backpressure on the - // control channel instead of dropping the message — - // which would cancel any embedded oneshot senders and - // surface to callers as `RecvError` (mapped to - // `Error::Shutdown`), conflating overload with shutdown. - ctrl = control_receiver.recv(), if request_queue.len() < REQUEST_QUEUE_CAP => { + // Scope the `&mut self` destructure + pinned per-iteration + // futures so all borrows of `self` drop before we call + // `self.handle_control_message().await` below. `pin_mut!` + // creates stack-pinned locals that outlive the select + // macro, so the inner block is required to release those + // borrows. + let should_break = { + let Self { + control_receiver, + pending_responses, + discovery_socket, + unicast_sockets, + update_sender, + request_queue, + session_tracker, + service_registry, + run, + .. + } = &mut self; + // Build fresh per-iteration futures and fuse them for + // `select!`'s `FusedFuture + Unpin` bound. + // `receive_discovery` / `receive_any_unicast` are + // async fns that are not `Unpin`; the `Timer::sleep` + // future likewise. Stack-pinning via `pin_mut!` + // satisfies both. + // + // The 125ms idle tick goes through the `Timer` trait + // rather than `tokio::time::sleep` directly so a + // bare-metal swap to `embassy_time` (or any other + // `Timer` impl) is a one-line change here. Today it + // resolves to `TokioTimer`. + let control_fut = control_receiver.recv().fuse(); + let sleep_fut = TokioTimer + .sleep(std::time::Duration::from_millis(125)) + .fuse(); + let discovery_fut = Inner::receive_discovery(discovery_socket).fuse(); + let unicast_fut = Inner::receive_any_unicast(unicast_sockets).fuse(); + pin_mut!(control_fut, sleep_fut, discovery_fut, unicast_fut); + + // `select!` (not `select_biased!`) randomizes the + // arm check order each poll so no single arm can + // starve the others under sustained load. Matches + // the original `tokio::select!` fairness behavior. + select! { + // Receive a control message + ctrl = control_fut => { if let Some(ctrl) = ctrl { debug!("Received control message: {:?}", ctrl); - let push_result = request_queue.push_back(ctrl); - debug_assert!( - push_result.is_ok(), - "request_queue had capacity before recv but push_back failed" - ); + if request_queue.push_back(ctrl).is_err() { + // Queue full: the rejected ControlMessage is + // dropped, so any oneshot senders inside it + // cancel — callers awaiting those receivers + // will observe `RecvError`. + warn!( + "request_queue at capacity ({}); dropping control message", + REQUEST_QUEUE_CAP + ); + } } else { // The sender has been dropped, so we should exit *run = false; } } + () = sleep_fut => {} // Receive a discovery message - discovery = Inner::receive_discovery(discovery_socket) => { - trace!("Received discovery message: {:?}", discovery); - match discovery { - Ok((source, someip_header, sd_header)) => { - // Extract session ID from SOME/IP request_id (lower 16 bits) - let session_id = (someip_header.request_id() & 0xFFFF) as u16; - let sd_payload = PayloadDefinitions::new_sd_payload(&sd_header); - // Extract reboot flag from the SD payload flags - let reboot_flag = sd_payload - .sd_flags() - .map_or(crate::protocol::sd::RebootFlag::Continuous, |f| { - f.reboot() - }); - - // Track sender session/reboot state for every SD entry - // that identifies a service instance, not only - // offer/stop-offer entries. This ensures reboot - // detection works for all SD traffic (FindService, - // Subscribe, SubscribeAck, etc.). - let mut rebooted = false; - for (svc_id, inst_id) in sd_payload.service_instances() { - let verdict = session_tracker.check( - source, - TransportKind::Multicast, - svc_id, - inst_id, - session_id, - reboot_flag, - ); - if verdict == SessionVerdict::Reboot { - rebooted = true; + discovery = discovery_fut => { + trace!("Received discovery message: {:?}", discovery); + match discovery { + Ok((source, someip_header, sd_header)) => { + // Extract session ID from SOME/IP request_id (lower 16 bits) + let session_id = (someip_header.request_id() & 0xFFFF) as u16; + let sd_payload = PayloadDefinitions::new_sd_payload(&sd_header); + // Extract reboot flag from the SD payload flags + let reboot_flag = sd_payload + .sd_flags() + .map_or(crate::protocol::sd::RebootFlag::Continuous, |f| { + f.reboot() + }); + + // Track sender session/reboot state for every SD entry + // that identifies a service instance, not only + // offer/stop-offer entries. This ensures reboot + // detection works for all SD traffic (FindService, + // Subscribe, SubscribeAck, etc.). + let mut rebooted = false; + for (svc_id, inst_id) in sd_payload.service_instances() { + let verdict = session_tracker.check( + source, + TransportKind::Multicast, + svc_id, + inst_id, + session_id, + reboot_flag, + ); + if verdict == SessionVerdict::Reboot { + rebooted = true; + } } - } - // Auto-populate service registry from offer/stop-offer - // SD entries. - for ep in sd_payload.offered_endpoints() { - let id = ServiceInstanceId { - service_id: ep.service_id, - instance_id: ep.instance_id, - }; - if ep.is_offer { - if let Some(addr) = ep.addr { - service_registry.insert( - id, - ServiceEndpointInfo { - addr, - local_port: 0, - major_version: ep.major_version, - minor_version: ep.minor_version, - }, - ); + // Auto-populate service registry from offer/stop-offer + // SD entries. + for ep in sd_payload.offered_endpoints() { + let id = ServiceInstanceId { + service_id: ep.service_id, + instance_id: ep.instance_id, + }; + if ep.is_offer { + if let Some(addr) = ep.addr { + service_registry.insert( + id, + ServiceEndpointInfo { + addr, + local_port: 0, + major_version: ep.major_version, + minor_version: ep.minor_version, + }, + ); + trace!( + "Registry: added 0x{:04X}.0x{:04X} -> {}", + ep.service_id, ep.instance_id, addr, + ); + } + } else { + service_registry.remove(id); trace!( - "Registry: added 0x{:04X}.0x{:04X} -> {}", - ep.service_id, ep.instance_id, addr, + "Registry: removed 0x{:04X}.0x{:04X}", + ep.service_id, ep.instance_id, ); } - } else { - service_registry.remove(id); - trace!( - "Registry: removed 0x{:04X}.0x{:04X}", - ep.service_id, ep.instance_id, - ); } - } - if rebooted { - let _ = update_sender.send(ClientUpdate::SenderRebooted(source)); - } + if rebooted { + let _ = update_sender.send(ClientUpdate::SenderRebooted(source)); + } - let discovery_msg = DiscoveryMessage { - source, - someip_header, - sd_header, - }; - let _ = update_sender.send(ClientUpdate::DiscoveryUpdated(discovery_msg)); - } - Err(err) => { - error!("Error receiving discovery message: {:?}", err); - let _ = update_sender.send(ClientUpdate::Error(err)); + let discovery_msg = DiscoveryMessage { + source, + someip_header, + sd_header, + }; + let _ = update_sender.send(ClientUpdate::DiscoveryUpdated(discovery_msg)); + } + Err(err) => { + error!("Error receiving discovery message: {:?}", err); + let _ = update_sender.send(ClientUpdate::Error(err)); + } } - } - } - unicast = Inner::receive_any_unicast(unicast_sockets) => { - trace!("Received unicast message: {:?}", unicast); - match unicast { - Ok(received) => { - let ReceivedMessage { message: received_message, e2e_status, .. } = received; - // Check if this matches a pending request-response by request_id - let request_id = received_message.header().request_id(); - if let Some(sender) = pending_responses.remove(&request_id) { - let _ = sender.send(Ok(received_message.payload().clone())); - continue; + } + unicast = unicast_fut => { + trace!("Received unicast message: {:?}", unicast); + match unicast { + Ok(received) => { + let ReceivedMessage { message: received_message, e2e_status, .. } = received; + // Check if this matches a pending request-response by request_id + let request_id = received_message.header().request_id(); + if let Some(sender) = pending_responses.remove(&request_id) { + let _ = sender.send(Ok(received_message.payload().clone())); + continue; + } + // Not a response — forward as ClientUpdate::Unicast + let _ = update_sender.send(ClientUpdate::Unicast { message: received_message, e2e_status }); + } + Err(err) => { + let _ = update_sender.send(ClientUpdate::Error(err)); } - // Not a response — forward as ClientUpdate::Unicast - let _ = update_sender.send(ClientUpdate::Unicast { message: received_message, e2e_status }); - } - Err(err) => { - let _ = update_sender.send(ClientUpdate::Error(err)); } } - } - } - if !*run { - info!("SOME/IP Client processing loop exiting"); - break; + } + !*run + }; + if should_break { + info!("SOME/IP Client processing loop exiting"); + break; + } + self.handle_control_message().await; } - self.handle_control_message().await; - } } } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 88fb2bb..f360c6f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -36,7 +36,9 @@ mod socket_manager; pub use error::Error; +use crate::Timer; use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile, E2ERegistry}; +use crate::tokio_transport::TokioTimer; use crate::{protocol, protocol::Message, traits::PayloadWireFormat}; use inner::{ControlMessage, Inner}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; @@ -232,7 +234,7 @@ where /// With loopback enabled, the client's own discovery socket also receives /// the multicast SD traffic this client sends (e.g. `FindService` probes /// and periodic `OfferService` announcements driven by - /// [`Self::start_sd_announcements`]). Those self-sent messages are parsed + /// [`Self::sd_announcements_loop`]). Those self-sent messages are parsed /// the same as any other inbound SD traffic, so callers may observe: /// /// - [`ClientUpdate::DiscoveryUpdated`] events originating from this @@ -422,7 +424,7 @@ where /// Call this before manually building an SD header (e.g. one passed to /// [`send_sd_message`](Self::send_sd_message)) so the reboot flag reflects /// the current tracked state instead of a stale value baked at call time. - /// Headers passed to [`start_sd_announcements`](Self::start_sd_announcements) + /// Headers passed to [`sd_announcements_loop`](Self::sd_announcements_loop) /// are refreshed automatically per-tick and do not need this call. /// /// # Panics @@ -484,42 +486,69 @@ where /// counter wraps past `0xFFFF`, rather than staying stuck on whatever /// value was baked at call time. /// - /// Returns a [`tokio::task::JoinHandle`] that can be used to abort the - /// background task. The task uses a weak reference to the client's - /// control channel, so it exits automatically when all `Client` handles - /// are dropped (via `shut_down()` or going out of scope). + /// Returns an `impl Future + Send + 'static` that the + /// caller drives on their executor (typically via `tokio::spawn`). + /// The loop uses a weak reference to the client's control channel, + /// so it exits automatically when all `Client` handles are dropped + /// (via `shut_down()` or going out of scope). + /// + /// ```no_run + /// # use simple_someip::{Client, RawPayload, VecSdHeader}; + /// # use simple_someip::protocol::sd::{self, RebootFlag, Flags}; + /// # async fn demo(client: Client) { + /// let header = VecSdHeader { + /// flags: Flags::new_sd(RebootFlag::RecentlyRebooted), + /// entries: vec![], + /// options: vec![], + /// }; + /// let handle = tokio::spawn( + /// client.sd_announcements_loop(header, std::time::Duration::from_secs(1)) + /// ); + /// // ...later: handle.abort() to stop, or let the Client drop naturally. + /// # } + /// ``` /// /// # Arguments /// /// * `sd_header` — The SD header to send (entries + options). /// * `interval` — How often to send (e.g. every 1 second). Values below /// 100ms are clamped to 100ms to prevent tight loops. - pub fn start_sd_announcements( + pub fn sd_announcements_loop( &self, sd_header: ::SdHeader, interval: std::time::Duration, - ) -> tokio::task::JoinHandle<()> + ) -> impl core::future::Future + Send + 'static where ::SdHeader: Send + 'static, { use crate::protocol::sd; - // Use a WeakSender so this task does NOT keep the control channel + // Use a WeakSender so this future does NOT keep the control channel // alive. When all strong Client handles are dropped (shut_down), - // the weak sender will fail to upgrade and the task exits cleanly. + // the weak sender will fail to upgrade and the loop exits cleanly. let weak_sender = self.control_sender.downgrade(); let target = SocketAddrV4::new(sd::MULTICAST_IP, sd::MULTICAST_PORT); let interval = interval.max(std::time::Duration::from_millis(100)); - tokio::spawn(async move { - let mut tick = tokio::time::interval(interval); - tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - // Consume the immediate first tick so we don't send before - // the caller has finished setting up (e.g. subscribing). - tick.tick().await; + async move { + // Sleep goes through the `Timer` trait so bare-metal + // consumers can swap in `embassy_time` or similar; today it + // resolves to `TokioTimer`. Note: we use `Timer::sleep` + // repeatedly instead of `tokio::time::interval` because the + // trait has no equivalent of `interval`. The resulting + // cadence is "interval + body time" rather than "interval + // aligned to wall clock"; for SD announcements (a + // best-effort periodic heartbeat) this difference is + // immaterial. A regression test pins the cadence at + // approximately `interval` tolerance. + // + // The first iteration's `sleep` also serves as the initial + // delay so the caller has a chance to finish setup (e.g. + // subscribing) before the first announcement goes out. + let timer = TokioTimer; let mut count = 0u64; loop { - tick.tick().await; + timer.sleep(interval).await; // Refresh the reboot flag from the client's tracked state // so long-running announcers transition from RecentlyRebooted @@ -578,7 +607,7 @@ where } } } - }) + } } /// Registers a service endpoint in the client's endpoint registry. @@ -1001,14 +1030,15 @@ mod tests { } #[tokio::test] - async fn test_start_sd_announcements_does_not_panic() { + async fn test_sd_announcements_loop_does_not_panic() { let (client, _updates, run_fut) = TestClient::new(Ipv4Addr::LOCALHOST); let _ = tokio::spawn(run_fut); client.bind_discovery().await.unwrap(); let sd_header = empty_sd_header(); - let handle = - client.start_sd_announcements(sd_header, std::time::Duration::from_millis(100)); + let handle = tokio::spawn( + client.sd_announcements_loop(sd_header, std::time::Duration::from_millis(100)), + ); // Let the task fire at least once (may fail to send on loopback, that's OK). tokio::time::sleep(std::time::Duration::from_millis(250)).await; @@ -1025,13 +1055,14 @@ mod tests { } #[tokio::test] - async fn test_start_sd_announcements_without_discovery_bound() { + async fn test_sd_announcements_loop_without_discovery_bound() { let (client, _updates, run_fut) = TestClient::new(Ipv4Addr::LOCALHOST); let _ = tokio::spawn(run_fut); // Don't bind discovery — the task should handle the error gracefully. let sd_header = empty_sd_header(); - let handle = - client.start_sd_announcements(sd_header, std::time::Duration::from_millis(100)); + let handle = tokio::spawn( + client.sd_announcements_loop(sd_header, std::time::Duration::from_millis(100)), + ); tokio::time::sleep(std::time::Duration::from_millis(250)).await; @@ -1047,14 +1078,15 @@ mod tests { } #[tokio::test] - async fn test_start_sd_announcements_abort_stops_task() { + async fn test_sd_announcements_loop_abort_stops_task() { let (client, _updates, run_fut) = TestClient::new(Ipv4Addr::LOCALHOST); let _ = tokio::spawn(run_fut); client.bind_discovery().await.unwrap(); let sd_header = empty_sd_header(); - let handle = - client.start_sd_announcements(sd_header, std::time::Duration::from_millis(100)); + let handle = tokio::spawn( + client.sd_announcements_loop(sd_header, std::time::Duration::from_millis(100)), + ); handle.abort(); let result = handle.await; @@ -1068,7 +1100,7 @@ mod tests { } #[tokio::test] - async fn test_start_sd_announcements_overrides_caller_reboot_flag() { + async fn test_sd_announcements_loop_overrides_caller_reboot_flag() { // Regression test for the auto-refresh behavior: a caller who bakes // `Continuous` into `sd_header.flags` must still observe the client's // tracked flag on the wire (here, `RecentlyRebooted`, because the @@ -1085,8 +1117,9 @@ mod tests { sd_header.flags = crate::protocol::sd::Flags::new_sd(crate::protocol::sd::RebootFlag::Continuous); - let handle = - client.start_sd_announcements(sd_header, std::time::Duration::from_millis(100)); + let handle = tokio::spawn( + client.sd_announcements_loop(sd_header, std::time::Duration::from_millis(100)), + ); // Loopback delivers our own SD announcements back as DiscoveryUpdated. // Drain updates until we see one (tokio::time::interval skips the @@ -1177,14 +1210,15 @@ mod tests { } #[tokio::test] - async fn test_start_sd_announcements_stops_on_shutdown() { + async fn test_sd_announcements_loop_stops_on_shutdown() { let (client, _updates, run_fut) = TestClient::new(Ipv4Addr::LOCALHOST); - let _ = tokio::spawn(run_fut); + tokio::spawn(run_fut); client.bind_discovery().await.unwrap(); let sd_header = empty_sd_header(); - let handle = - client.start_sd_announcements(sd_header, std::time::Duration::from_millis(100)); + let handle = tokio::spawn( + client.sd_announcements_loop(sd_header, std::time::Duration::from_millis(100)), + ); // Shut down the client — the weak sender should fail to upgrade // and the task should exit cleanly without needing abort(). @@ -1254,4 +1288,165 @@ mod tests { "expected Error::Shutdown after run-loop cancel, got {err:?}", ); } + + /// Pins the cadence of `sd_announcements_loop` under a healthy + /// (non-backpressured) control channel by counting how many + /// announcements land on the `Inner` loop's discovery socket + /// within a bounded window. + /// + /// Phase 7.5 replaced `tokio::time::interval` (wall-clock aligned, + /// catches up after slow bodies) with repeated `Timer::sleep` + /// calls (interval + body time, no catch-up). For a healthy event + /// loop the body is microseconds, so the observed cadence is very + /// close to the requested interval. If a future change regresses + /// this to "2 * interval" or worse, this test fires. + /// + /// The test creates a multicast receiver on the SD port/address + /// with loopback enabled, then runs a client with + /// `new_with_loopback(true)` and counts received announcements + /// over a 550ms window with an interval of 100ms. Expected: the + /// first announcement lands at t≈100ms, then ~every 100ms after, + /// so we expect 4-5 announcements in the window. Asserting `>= 3` + /// gives tolerance for scheduler jitter but still catches a 2x+ + /// cadence regression. + #[ignore = "requires MULTICAST on the loopback interface; dev \ + machines where `lo` lacks the MULTICAST flag will not \ + deliver loopback multicast and this test will fail. \ + Runs in any environment where loopback multicast is \ + available (e.g. CI)."] + #[tokio::test] + async fn sd_announcements_loop_cadence_stays_close_to_requested() { + use crate::protocol::sd; + use socket2::{Domain, Protocol, Socket, Type}; + + let iface = Ipv4Addr::LOCALHOST; + + // Build a loopback multicast receiver on the SD port. + let recv = { + let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap(); + s.set_reuse_address(true).unwrap(); + #[cfg(unix)] + s.set_reuse_port(true).unwrap(); + s.bind(&std::net::SocketAddr::from((iface, sd::MULTICAST_PORT)).into()) + .unwrap(); + s.set_nonblocking(true).unwrap(); + let std_s: std::net::UdpSocket = s.into(); + let rs = tokio::net::UdpSocket::from_std(std_s).unwrap(); + rs.join_multicast_v4(sd::MULTICAST_IP, iface).unwrap(); + rs + }; + + let (client, _updates, run_fut) = TestClient::new_with_loopback(iface, true); + tokio::spawn(run_fut); + client.bind_discovery().await.unwrap(); + + let interval = std::time::Duration::from_millis(100); + let loop_handle = tokio::spawn(client.sd_announcements_loop(empty_sd_header(), interval)); + + // Collect announcements over a 550ms window. First send fires + // at ~100ms, subsequent at ~100ms intervals; expect 4-5 packets. + let start = std::time::Instant::now(); + let mut count = 0u32; + let mut buf = [0u8; 1500]; + while start.elapsed() < std::time::Duration::from_millis(550) { + if tokio::time::timeout( + std::time::Duration::from_millis(200), + recv.recv_from(&mut buf), + ) + .await + .map(|r| r.is_ok()) + .unwrap_or(false) + { + count += 1; + } + } + + loop_handle.abort(); + client.shut_down(); + + assert!( + count >= 3, + "expected >= 3 announcements in 550ms at 100ms interval, got {count} — \ + cadence may have regressed" + ); + } + + /// Pins the first-announcement latency of `sd_announcements_loop` + /// to a single interval. A prior revision slept once before the + /// loop AND at the top of each iteration, so the first packet + /// landed at ~2× interval. This test catches that regression by + /// measuring the time from loop start to the first received + /// announcement and requiring it to be well under 2× interval. + /// + /// Uses the same loopback-multicast catch pattern as + /// `sd_announcements_loop_cadence_stays_close_to_requested`. + #[ignore = "requires MULTICAST on the loopback interface; same \ + constraint as `sd_announcements_loop_cadence_stays_close_to_requested`. \ + Runs in any environment where loopback multicast is \ + available (e.g. CI)."] + #[tokio::test] + async fn sd_announcements_loop_first_emit_within_one_interval() { + use crate::protocol::sd; + use socket2::{Domain, Protocol, Socket, Type}; + + let iface = Ipv4Addr::LOCALHOST; + + let recv = { + let s = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)).unwrap(); + s.set_reuse_address(true).unwrap(); + #[cfg(unix)] + s.set_reuse_port(true).unwrap(); + s.bind(&std::net::SocketAddr::from((iface, sd::MULTICAST_PORT)).into()) + .unwrap(); + s.set_nonblocking(true).unwrap(); + let std_s: std::net::UdpSocket = s.into(); + let rs = tokio::net::UdpSocket::from_std(std_s).unwrap(); + rs.join_multicast_v4(sd::MULTICAST_IP, iface).unwrap(); + rs + }; + + let (client, _updates, run_fut) = TestClient::new_with_loopback(iface, true); + tokio::spawn(run_fut); + client.bind_discovery().await.unwrap(); + + let interval = std::time::Duration::from_millis(100); + let start = std::time::Instant::now(); + let loop_handle = tokio::spawn(client.sd_announcements_loop(empty_sd_header(), interval)); + + let mut buf = [0u8; 1500]; + let first = tokio::time::timeout( + std::time::Duration::from_millis(500), + recv.recv_from(&mut buf), + ) + .await + .expect("first SD announcement did not arrive within 500ms") + .expect("recv_from errored"); + let first_emit_elapsed = start.elapsed(); + let _ = first; + + loop_handle.abort(); + client.shut_down(); + + assert!( + first_emit_elapsed < std::time::Duration::from_millis(250), + "first announcement took {first_emit_elapsed:?}, expected < 250ms at 100ms interval — \ + likely double-sleep regression" + ); + } + + /// Compile-time-ish assertion that `Client::new`'s returned run + /// future is `Send + 'static`. If a future refactor captures a + /// `!Send` or borrowed type in `Inner::run_future`, `thread::spawn` + /// rejects the move and this test fails to compile — surfacing the + /// regression at the site that introduced it rather than at a + /// distant `tokio::spawn` call site. + /// + /// The test doesn't actually need to drive the future; it's a + /// type-level check that happens to execute a no-op thread. + #[test] + fn client_new_run_future_is_send_static() { + let (_client, _updates, run_fut) = TestClient::new(Ipv4Addr::LOCALHOST); + let handle = std::thread::spawn(move || drop(run_fut)); + handle.join().unwrap(); + } } diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 59c4fa4..4bb2fde 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -8,13 +8,14 @@ use crate::{ }; use super::error::Error; +use futures::{FutureExt, pin_mut, select}; use std::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::{Arc, Mutex}, task::{Context, Poll}, }; -use tokio::{select, sync::mpsc}; -use tracing::{debug, error, info, trace}; +use tokio::sync::mpsc; +use tracing::{error, info, trace}; /// A received message together with the source address it came from. /// @@ -39,6 +40,15 @@ pub struct SendMessage { response: tokio::sync::oneshot::Sender>, } +/// One iteration's select-outcome in `spawn_socket_loop`. The inner +/// block returns this scalar so the pinned per-iteration `send_fut` / +/// `recv_fut` futures drop before the processing body — releasing their +/// `&mut buf` / `&mut socket` borrows. +enum Outcome { + Send(Option>), + Recv(Result), +} + impl SendMessage { pub fn new( target_addr: SocketAddrV4, @@ -103,9 +113,10 @@ where /// underlying socket through a caller-supplied [`TransportFactory`]. /// The factory must still produce a /// [`TokioSocket`](crate::tokio_transport::TokioSocket) because the - /// spawned I/O loop is currently tokio-specific; once phase 6 hoists - /// the spawn out of this function, this bound will be relaxed to any - /// `TransportSocket`. + /// spawned I/O loop is currently tokio-specific; the bound will be + /// relaxed to any `TransportSocket` once `spawn_socket_loop`'s outer + /// `tokio::spawn` is hoisted (planned for phase 8 alongside the + /// bare-metal example). pub async fn bind_discovery_seeded_with_transport( factory: &F, interface: Ipv4Addr, @@ -253,18 +264,16 @@ where /// Spawn the I/O loop over a concrete [`TokioSocket`]. /// - /// The loop body's entire I/O surface on the socket is `send_to` - /// and `recv_from` — both trait methods. Multicast membership - /// (`join_multicast_v4`) is set up by the caller *before* calling - /// this function, never from inside the spawned task, so the - /// per-loop I/O surface stays on just the two send/recv methods. - /// Because no `TokioSocket`-specific inherent methods are used - /// inside, generalizing this function over `T: TransportSocket` is - /// a mechanical change once the outer `tokio::spawn` is hoisted - /// out in phase 6 (stable Rust's `Send` bounds on RPITIT method - /// returns are currently expressible only via return-type notation, - /// which is nightly — hoisting the spawn avoids the issue by moving - /// the `Send` requirement off this function entirely). + /// The socket's trait methods (`send_to`, `recv_from`, + /// `join_multicast_v4`) are the entire I/O surface used inside — the + /// loop body does not call any `TokioSocket`-specific inherent + /// methods, so generalizing this function over `T: TransportSocket` + /// is a mechanical change once the outer `tokio::spawn` is hoisted + /// out (planned for phase 8 alongside the bare-metal example — + /// hoisting the spawn moves the `Send` requirement off this + /// function, sidestepping stable Rust's current inability to + /// express `Send` bounds on RPITIT method returns without nightly + /// return-type notation). #[allow(clippy::too_many_lines)] fn spawn_socket_loop( socket: crate::tokio_transport::TokioSocket, @@ -274,162 +283,184 @@ where ) { tokio::spawn(async move { let mut buf = [0u8; UDP_BUFFER_SIZE]; + loop { - select! { - result = socket.recv_from(&mut buf) => { - match result { - Ok(ReceivedDatagram { bytes_received, source, truncated }) => { - if truncated { - // A truncated datagram cannot be parsed reliably; - // the length field in the SOME/IP header will not - // match the bytes we received. Log and drop. - error!( - "Discarding truncated datagram from {}: {} bytes received", - source, bytes_received - ); - continue; - } - let source_address = SocketAddr::V4(source); - let parse_result = MessageView::parse(&buf[..bytes_received]) - .and_then(|view| { - let header = view.header().to_owned(); - let upper_header = header.upper_header_bytes(); - let key = E2EKey::from_message_id(header.message_id()); - let payload_bytes = view.payload_bytes(); - - // Apply E2E check if configured - let (e2e_status, effective_payload) = { - let mut registry = e2e_registry.lock().expect("e2e registry lock poisoned"); - match registry.check(key, payload_bytes, upper_header) { - Some((status, stripped)) => (Some(status), stripped), - None => (None, payload_bytes), - } - }; - - let payload = MessageDefinitions::from_payload_bytes(header.message_id(), effective_payload)?; - Ok(ReceivedMessage { - message: Message::new(header, payload), - source: source_address, - e2e_status, - }) - }) - .map_err(Error::from); - if rx_tx.send(parse_result).await.is_err() { - info!("Socket Dropping"); - // The receiver has been dropped, so we should exit - break; - } - } + // `select!` (not `select_biased!`) gives pseudo-random + // fairness across ready arms — matches prior + // `tokio::select!` behavior and avoids starving either + // the send or recv arm under sustained one-sided load. + // + // The fresh `.fuse()`'d per-iteration futures are pinned + // on the stack (required: `Fuse<_>` is not `Unpin`). + // Returning an `Outcome

` scalar from the inner block + // drops both pinned futures — and their `&mut buf` / + // `&mut socket` borrows — before the processing body + // below runs, so the body can re-borrow `buf` freely. + let outcome: Outcome = { + let send_fut = tx_rx.recv().fuse(); + let recv_fut = socket.recv_from(&mut buf).fuse(); + pin_mut!(send_fut, recv_fut); + select! { + message = send_fut => Outcome::Send(message), + result = recv_fut => Outcome::Recv(result), + } + }; + + match outcome { + Outcome::Send(Some(send_message)) => { + trace!("Sending: {:?}", &send_message); + let mut message_length = match send_message + .message + .encode(&mut buf.as_mut_slice()) + { + Ok(length) => length, Err(e) => { - // This arm is the transport-level recv_from - // result; decoding runs further up inside - // `MessageView::parse`. An `Err` here is an - // I/O failure on the socket read, not a - // decode failure. - // - // `map_io_error` in tokio_transport already - // logs the raw OS error + kind (at `warn!` - // for actionable kinds, `debug!` for - // steady-state noise like `TimedOut`), so - // stay at `debug!` here to avoid double- - // logging the same failure at `error!`. - debug!("recv_from returned error on socket loop: {:?}", e); - } - } - }, - message = tx_rx.recv() => { - if let Some(send_message) = message { - trace!("Sending: {:?}", &send_message); - // Fail fast with the capacity error rather than - // letting `encode` report a less-actionable - // protocol I/O error when it runs out of - // buffer. Matches the E2E-overflow arm below - // and the server event_publisher path. - let required_size = send_message.message.required_size(); - if required_size > UDP_BUFFER_SIZE { - error!( - "outgoing message ({} bytes) exceeds UDP_BUFFER_SIZE ({}); dropping send", - required_size, UDP_BUFFER_SIZE - ); - let _ = send_message.response.send(Err(Error::Capacity("udp_buffer"))); - continue; - } - let mut message_length = match send_message.message.encode(&mut buf.as_mut_slice()) { - Ok(length) => length, - Err(e) => { - error!("Failed to encode message: {:?}", e); - // If the sender is already closed we can't send the error back, so we shut everything down - if send_message.response.send(Err(e.into())).is_err() { - error!("Socket owner closed channel unexpectedly, closing socket."); - break; - } + error!("Failed to encode message: {:?}", e); + // If the sender is already closed we can't send the error back, so we shut everything down + if let Ok(()) = send_message.response.send(Err(e.into())) { // Successfully sent error back to sender, carry on continue; } - }; - - // Apply E2E protect if configured. `protected` - // is a disjoint stack buffer, so the input can - // be borrowed directly out of `buf[16..]` with - // no intermediate copy. - { - let key = E2EKey::from_message_id(send_message.message.header().message_id()); - let mut registry = e2e_registry.lock().expect("e2e registry lock poisoned"); - if registry.contains_key(&key) { - let upper_header: [u8; 8] = buf[8..16].try_into().expect("upper header slice"); - let mut protected = [0u8; UDP_BUFFER_SIZE]; - let result = registry.protect( - key, - &buf[16..message_length], - upper_header, - &mut protected, - ); - match result { - Some(Ok(protected_len)) => { - if 16 + protected_len > UDP_BUFFER_SIZE { - error!( - "E2E-protected datagram ({} bytes, header + protected payload) exceeds UDP_BUFFER_SIZE ({}); dropping send", - 16 + protected_len, UDP_BUFFER_SIZE - ); - let _ = send_message.response.send(Err(Error::Capacity("udp_buffer"))); - continue; - } - #[allow(clippy::cast_possible_truncation)] - let new_length: u32 = 8 + protected_len as u32; - buf[4..8].copy_from_slice(&new_length.to_be_bytes()); - buf[16..16 + protected_len].copy_from_slice(&protected[..protected_len]); - message_length = 16 + protected_len; - } - Some(Err(e)) => { - error!("E2E protect error: {:?}", e); + error!("Socket owner closed channel unexpectedly, closing socket."); + break; + } + }; + + // Apply E2E protect if configured. `protected` + // is a disjoint stack buffer, so the input can + // be borrowed directly out of `buf[16..]` with + // no intermediate copy. + { + let key = + E2EKey::from_message_id(send_message.message.header().message_id()); + let mut registry = + e2e_registry.lock().expect("e2e registry lock poisoned"); + if registry.contains_key(&key) { + let upper_header: [u8; 8] = + buf[8..16].try_into().expect("upper header slice"); + let mut protected = [0u8; UDP_BUFFER_SIZE]; + let result = registry.protect( + key, + &buf[16..message_length], + upper_header, + &mut protected, + ); + match result { + Some(Ok(protected_len)) => { + if 16 + protected_len > UDP_BUFFER_SIZE { + error!( + "E2E-protected payload ({} bytes) exceeds UDP_BUFFER_SIZE ({}); dropping send", + 16 + protected_len, + UDP_BUFFER_SIZE + ); + let _ = send_message + .response + .send(Err(Error::Capacity("udp_buffer"))); + continue; } - None => unreachable!("contains_key was true"), + #[allow(clippy::cast_possible_truncation)] + let new_length: u32 = 8 + protected_len as u32; + buf[4..8].copy_from_slice(&new_length.to_be_bytes()); + buf[16..16 + protected_len] + .copy_from_slice(&protected[..protected_len]); + message_length = 16 + protected_len; + } + Some(Err(e)) => { + error!("E2E protect error: {:?}", e); } + None => unreachable!("contains_key was true"), } } + } - match socket.send_to(&buf[..message_length], send_message.target_addr).await { - Ok(()) => { - trace!("Sent {} bytes to {}", message_length, send_message.target_addr); - if send_message.response.send(Ok(())).is_err() { - info!("Socket owner closed channel, closing socket."); - // The sender has been dropped, so we should exit - break; - } + match socket + .send_to(&buf[..message_length], send_message.target_addr) + .await + { + Ok(()) => { + trace!( + "Sent {} bytes to {}", + message_length, send_message.target_addr + ); + if let Ok(()) = send_message.response.send(Ok(())) { + } else { + info!("Socket owner closed channel, closing socket."); + // The sender has been dropped, so we should exit + break; } - Err(e) => { - if send_message.response.send(Err(Error::Transport(e))).is_err() { - error!("Socket owner closed channel unexpectedly, closing socket."); - break; - } + } + Err(e) => { + error!("Failed to send message with error: {:?}", e); + if let Ok(()) = send_message.response.send(Err(Error::Transport(e))) + { + } else { + error!( + "Socket owner closed channel unexpectedly, closing socket." + ); + break; } } + } + } + Outcome::Send(None) => { + info!("Send channel closed, closing socket."); + // The sender has been dropped, so we should exit + break; + } + Outcome::Recv(Ok(ReceivedDatagram { + bytes_received, + source, + truncated, + })) => { + if truncated { + // A truncated datagram cannot be parsed reliably; + // the length field in the SOME/IP header will not + // match the bytes we received. Log and drop. + error!( + "Discarding truncated datagram from {}: {} bytes received", + source, bytes_received + ); + continue; + } + let source_address = SocketAddr::V4(source); + let parse_result = MessageView::parse(&buf[..bytes_received]) + .and_then(|view| { + let header = view.header().to_owned(); + let upper_header = header.upper_header_bytes(); + let key = E2EKey::from_message_id(header.message_id()); + let payload_bytes = view.payload_bytes(); + + // Apply E2E check if configured + let (e2e_status, effective_payload) = { + let mut registry = + e2e_registry.lock().expect("e2e registry lock poisoned"); + match registry.check(key, payload_bytes, upper_header) { + Some((status, stripped)) => (Some(status), stripped), + None => (None, payload_bytes), + } + }; + + let payload = MessageDefinitions::from_payload_bytes( + header.message_id(), + effective_payload, + )?; + Ok(ReceivedMessage { + message: Message::new(header, payload), + source: source_address, + e2e_status, + }) + }) + .map_err(Error::from); + if let Ok(()) = rx_tx.send(parse_result).await { } else { - info!("Send channel closed, closing socket."); - // The sender has been dropped, so we should exit + info!("Socket Dropping"); + // The receiver has been dropped, so we should exit break; } } + Outcome::Recv(Err(recv_err)) => { + error!("Transport recv failed: {:?}", recv_err); + } } } }); @@ -695,15 +726,11 @@ mod tests { .await .unwrap(); - // Craft a message whose raw-encoded size fits `UDP_BUFFER_SIZE` - // exactly (header + payload = cap) but whose E2E-protected size - // does not — Profile4 adds `PROFILE4_HEADER_SIZE` bytes which - // pushes the protected total over the cap. Sizes derived from - // `UDP_BUFFER_SIZE` and `PROFILE4_HEADER_SIZE` so the fixture - // stays valid if the constant is retuned. - const SOMEIP_HEADER_SIZE: usize = 16; - let payload_len = UDP_BUFFER_SIZE - SOMEIP_HEADER_SIZE; // raw total == UDP_BUFFER_SIZE - let payload_bytes = vec![0u8; payload_len]; + // Craft a message whose raw-encoded size fits UDP_BUFFER_SIZE (16-byte + // header + 1480-byte payload = 1496 bytes) but whose E2E-protected + // size does not (payload grows by PROFILE4_HEADER_SIZE = 12, pushing + // the total to 1508 bytes, 8 over MTU). + let payload_bytes = [0u8; 1480]; let payload = RawPayload::from_payload_bytes(message_id, &payload_bytes).unwrap(); let header = Header::new( message_id, @@ -727,68 +754,11 @@ mod tests { } } - /// Messages whose raw encoded size already exceeds `UDP_BUFFER_SIZE` - /// — with no E2E in play — must be rejected up front with - /// `Error::Capacity("udp_buffer")` rather than bubbling out the - /// less-actionable protocol I/O error that `encode` would report - /// after running out of buffer. - #[tokio::test] - async fn send_raw_message_exceeding_udp_buffer_returns_capacity_error() { - use crate::RawPayload; - use crate::protocol::{Header, MessageId, MessageType, MessageTypeField, ReturnCode}; - - let message_id = MessageId::new_from_service_and_method(0x1234, 0x5678); - // No E2E registered — goes straight through the pre-encode check. - let e2e_registry = Arc::new(Mutex::new(E2ERegistry::new())); - let mut sm = SocketManager::::bind(0, e2e_registry).await.unwrap(); - - // Derive a payload that makes the full message exceed the UDP cap - // by 1 byte regardless of how `UDP_BUFFER_SIZE` is retuned: - // 16-byte header + payload_len = UDP_BUFFER_SIZE + 1. - const SOMEIP_HEADER_SIZE: usize = 16; - let payload_len = UDP_BUFFER_SIZE - SOMEIP_HEADER_SIZE + 1; - let payload_bytes = vec![0u8; payload_len]; - let payload = RawPayload::from_payload_bytes(message_id, &payload_bytes).unwrap(); - let header = Header::new( - message_id, - 0x0001_0001, - 0x01, - 0x01, - MessageTypeField::new(MessageType::Request, false), - ReturnCode::Ok, - payload_bytes.len(), - ); - let message = Message::new(header, payload); - assert!( - message.required_size() > UDP_BUFFER_SIZE, - "fixture must actually exceed the cap for this test to exercise the new path", - ); - - let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999); - let err = sm - .send(target, message) - .await - .expect_err("raw oversize message must error"); - match err { - Error::Capacity(tag) => assert_eq!(tag, "udp_buffer"), - other => panic!("expected Error::Capacity(\"udp_buffer\"), got {other:?}"), - } - } - /// Proves the public `bind_with_transport` entry point accepts an /// alternative `TransportFactory` implementation. The factory here is /// a thin interceptor that counts how many times `bind` is called; it /// delegates to the built-in `TokioTransport`, which is what the /// current `Socket = TokioSocket` bound requires. - /// - /// TODO: extend this with an end-to-end round-trip test that uses a - /// custom factory to actually carry traffic (send from socket A, - /// receive on socket B, assert bytes match), and a negative test - /// where the factory returns `Err(TransportError::AddressInUse)` - /// and asserts that surfaces as `Error::Transport(...)` through the - /// `?` + `From` chain. Both are scoped for the phase-6 branch where - /// the spawn hoist lets us swap the socket type, not just the bind - /// logic. #[tokio::test] async fn bind_with_transport_accepts_custom_factory() { use crate::tokio_transport::{TokioSocket, TokioTransport}; @@ -832,4 +802,104 @@ mod tests { ); drop(sm); } + + /// End-to-end proof that a custom `TransportFactory` actually + /// carries traffic through the full `SocketManager` path. Sends a + /// SOME/IP-SD message from one bound `SocketManager` to a raw tokio + /// socket, verifies the bytes arrive intact. Complements the lighter + /// `bind_with_transport_accepts_custom_factory` by exercising + /// `send_to` + the spawned I/O loop, not just the bind call. + #[tokio::test] + async fn bind_with_transport_carries_traffic_end_to_end() { + use crate::tokio_transport::{TokioSocket, TokioTransport}; + use core::future::Future; + + // Factory that overrides `SocketOptions` to force + // `reuse_address = true` regardless of caller-provided flags — + // proves the factory sits in the hot path. + struct ForceReuseFactory; + impl TransportFactory for ForceReuseFactory { + type Socket = TokioSocket; + fn bind( + &self, + addr: SocketAddrV4, + options: &SocketOptions, + ) -> impl Future> + { + let mut opts = *options; + opts.reuse_address = true; + async move { TokioTransport.bind(addr, &opts).await } + } + } + + let mut sm = SocketManager::::bind_with_transport( + &ForceReuseFactory, + 0, + test_registry(), + ) + .await + .expect("bind via custom factory"); + let sm_port = sm.port(); + + let recv = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let recv_port = recv.local_addr().unwrap().port(); + + let msg = Message::::new_sd(1, &empty_sd_header()); + sm.send(SocketAddrV4::new(Ipv4Addr::LOCALHOST, recv_port), msg) + .await + .expect("send_to via custom-factory-built socket"); + + let mut buf = [0u8; 1500]; + let (len, from) = + tokio::time::timeout(std::time::Duration::from_secs(2), recv.recv_from(&mut buf)) + .await + .expect("timed out waiting for datagram") + .expect("recv failed"); + + assert!(len > 0, "empty datagram"); + match from { + std::net::SocketAddr::V4(v4) => assert_eq!(v4.port(), sm_port), + other @ std::net::SocketAddr::V6(_) => { + panic!("unexpected source address family: {other:?}") + } + } + + // Parse and confirm it's a SOME/IP-SD message, not garbage. + let view = MessageView::parse(&buf[..len]).unwrap(); + assert_eq!(view.header().message_id(), crate::protocol::MessageId::SD); + } + + /// Negative test: a factory that returns + /// `Err(TransportError::AddressInUse)` must surface as + /// `Err(Error::Transport(TransportError::AddressInUse))` through + /// the `?` + `From` conversion chain in + /// `bind_with_transport`. Catches regressions in the `#[from]` + /// impl on `client::Error` or the return-type plumbing. + #[tokio::test] + async fn bind_with_transport_propagates_factory_error() { + use crate::tokio_transport::TokioSocket; + use crate::transport::TransportError; + + struct AlwaysBusyFactory; + impl TransportFactory for AlwaysBusyFactory { + type Socket = TokioSocket; + async fn bind( + &self, + _addr: SocketAddrV4, + _options: &SocketOptions, + ) -> Result { + Err(TransportError::AddressInUse) + } + } + + let err = TestSocketManager::bind_with_transport(&AlwaysBusyFactory, 0, test_registry()) + .await + .expect_err("factory returned Err, bind must surface it"); + match err { + Error::Transport(TransportError::AddressInUse) => {} + other => { + panic!("expected Error::Transport(TransportError::AddressInUse), got {other:?}") + } + } + } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 2d644aa..a5c33cc 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -19,8 +19,11 @@ pub use subscription_manager::{SubscribeError, SubscriptionManager}; use sd_state::SdStateManager; +use crate::Timer; use crate::e2e::{E2EKey, E2EProfile, E2ERegistry}; use crate::protocol::sd::{self, Entry, Flags, OptionsCount, ServiceEntry, TransportProtocol}; +use crate::tokio_transport::TokioTimer; +use futures::{FutureExt, pin_mut, select}; use std::{ format, net::{IpAddr, Ipv4Addr, SocketAddrV4}, @@ -295,7 +298,7 @@ impl Server { format!( "announcement_loop called on passive Server for service 0x{:04X}; \ announcements must be driven externally (e.g. via \ - `simple_someip::Client::start_sd_announcements`)", + `simple_someip::Client::sd_announcements_loop`)", self.config.service_id ), ))); @@ -328,8 +331,11 @@ impl Server { } } - // Send announcements every 1 second - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + // Send announcements every 1 second. Sleep goes through + // the `Timer` trait so bare-metal consumers can swap in + // a different timer impl; today it resolves to + // `TokioTimer`. + TokioTimer.sleep(std::time::Duration::from_secs(1)).await; } }) } @@ -470,17 +476,35 @@ impl Server { let mut sd_buf = vec![0u8; 65535]; loop { - let (data, len, addr, source) = tokio::select! { - result = self.unicast_socket.recv_from(&mut unicast_buf) => { - let (len, addr) = result?; - (&unicast_buf[..], len, addr, "unicast") - } - result = self.sd_socket.recv_from(&mut sd_buf) => { - let (len, addr) = result?; - (&sd_buf[..], len, addr, "sd-multicast") + // `select!` (not `select_biased!`) gives pseudo-random fairness + // across ready arms each poll — matches the prior + // `tokio::select!` behavior and avoids starving either the + // unicast or SD-multicast arm under sustained one-sided load. + // + // Fresh futures are constructed each iteration so the borrows + // of `unicast_buf` / `sd_buf` / the sockets end when the + // select macro returns, freeing the buffer we index into + // below. + let (len, addr, source, from_unicast) = { + let unicast_fut = self.unicast_socket.recv_from(&mut unicast_buf).fuse(); + let sd_fut = self.sd_socket.recv_from(&mut sd_buf).fuse(); + pin_mut!(unicast_fut, sd_fut); + select! { + result = unicast_fut => { + let (len, addr) = result?; + (len, addr, "unicast", true) + } + result = sd_fut => { + let (len, addr) = result?; + (len, addr, "sd-multicast", false) + } } }; - let data = &data[..len]; + let data = if from_unicast { + &unicast_buf[..len] + } else { + &sd_buf[..len] + }; // By default IP_MULTICAST_LOOP=false suppresses own multicast // messages on the SD socket, so no source-IP filtering is needed. diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index 7702628..58c7489 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -75,12 +75,13 @@ impl TokioSocket { /// Sleep backed by [`tokio::time::sleep`]. /// -/// TODO(phase 7): wire this into the `tokio::time::sleep` call sites in -/// `client::inner::Inner::run` (125 ms tick), `server::mod::Server::run`, -/// and `Client::start_sd_announcements` (1 s tick) so the crate's own -/// timing is also routed through the `Timer` trait. Today `TokioTimer` -/// is shipped as public API but unused internally — consumers can rely -/// on it, but the crate's own code still uses tokio directly. +/// Used internally at every periodic-tick site in the crate: the 125ms +/// idle tick in `Inner::run_future`, the 1s announcement tick in +/// `Server::announcement_loop`, and the user-supplied interval in +/// `Client::sd_announcements_loop`. A bare-metal consumer swapping this +/// out for `embassy_time` (or similar) needs to replace three references +/// to `TokioTimer` with their own `Timer` impl — no trait rewrite +/// required. #[derive(Debug, Default, Clone, Copy)] pub struct TokioTimer;