From e4ab4148f748e62c3e08fbae379e83303c4c447d Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Fri, 24 Apr 2026 21:03:36 -0400 Subject: [PATCH] phase 8: bare_metal example as trait-surface canary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Final state of #82, squashed from 14 commits to keep the rebase onto the rewritten #81 tractable. Several intermediate commits reshape the same areas (bare_metal example reframing, test hygiene, doc updates, clippy fixes), so a single rebase against the final shape avoids fighting transient mid-PR states. Original commits, oldest first: - 287b47a Removed warns on dropped callers, removed tokio::spawn in subscribe_no_wait, response oneshot is now silent. - 6b8dfc3 subscribe_no_wait orphan cleanup. - c519d4f Added a bare_metal feature to feature flags and Cargo.toml. - cff87a8 Added a bare_metal example with no tokio, no socket2, no std::net — exercises the TransportSocket / TransportFactory / Timer / SpawnFuture trait surface end-to-end on host so that breakage of the abstraction is caught before any real bare-metal port. - e2465ef Added integration tests, gave examples of running bare_metal, added warnings against using demo code as implementation prototypes. - 5cd838b (same subject — second commit of the same change set). - d4cb3b1 Merge remote-tracking branch 'origin/feature/phase8_bare_metal' into feature/phase8_bare_metal. - 7e46c3a phase 8: reframe bare_metal example as host-side trait-surface canary. Doc comments and test descriptions clarify that the example is a trait-surface canary, not a real bare-metal target. - 16e3b84 round-2: docs + test-hygiene fixes for phase 8. - c99a9ee chore(clippy): tidy new warnings in phase8 PR. - 99eff06 docs: update stale function-name references in comments. - a45bd5b test(bare_metal_example_builds): drop runtime CARGO_MANIFEST_DIR assert. The runtime check was redundant with the at-compile-time path resolution and noisy when run outside cargo. - f770bf9 Update examples/bare_metal/src/main.rs. - 1e92f43 PR #82 round: workspace-command wording + spelling fixes. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 7 + Cargo.toml | 22 +- examples/bare_metal/Cargo.toml | 12 + examples/bare_metal/src/main.rs | 288 +++++++++++++++++++ examples/client_server/src/main.rs | 13 +- examples/discovery_client/src/main.rs | 4 +- src/client/inner.rs | 44 ++- src/client/mod.rs | 63 ++++- src/client/socket_manager.rs | 388 ++++++++++++++------------ src/lib.rs | 18 +- tests/bare_metal_example_builds.rs | 22 ++ 11 files changed, 666 insertions(+), 215 deletions(-) create mode 100644 examples/bare_metal/Cargo.toml create mode 100644 examples/bare_metal/src/main.rs create mode 100644 tests/bare_metal_example_builds.rs diff --git a/Cargo.lock b/Cargo.lock index a5f2c35..7a2c94d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,13 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "bare_metal" +version = "0.0.0" +dependencies = [ + "simple-someip", +] + [[package]] name = "byteorder" version = "1.5.0" diff --git a/Cargo.toml b/Cargo.toml index 8ee0290..8c33e00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,10 @@ [workspace] -members = [".", "examples/discovery_client", "examples/client_server"] +members = [ + ".", + "examples/bare_metal", + "examples/client_server", + "examples/discovery_client", +] [package] name = "simple-someip" @@ -41,6 +46,21 @@ default = ["std"] std = ["embedded-io/std", "thiserror/std", "tracing/std"] client = ["std", "dep:tokio", "dep:socket2", "dep:futures"] server = ["std", "dep:tokio", "dep:socket2", "dep:futures"] +# Marks a build as intended for bare-metal / no_std consumption. +# Currently a pure marker — enables no crate code on its own. Reserved +# for future phases to gate no_std-specific helper types. +# +# **To demonstrate the bare-metal trait surface, use the +# `examples/bare_metal` workspace member directly:** `cargo run -p +# bare_metal`. That workspace member depends on `simple-someip` with +# `default-features = false, features = ["bare_metal"]`, so it +# exercises the actual bare-metal configuration. +# +# Enabling `bare_metal` on its own does NOT make the crate +# bare-metal-complete: the `client` and `server` feature paths still +# spawn per-socket I/O loops on `tokio::spawn`, and a fully tokio-free +# build additionally needs a user-provided `Spawner` impl (phase 9). +bare_metal = [] [[test]] name = "client_server" diff --git a/examples/bare_metal/Cargo.toml b/examples/bare_metal/Cargo.toml new file mode 100644 index 0000000..6350105 --- /dev/null +++ b/examples/bare_metal/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "bare_metal" +version = "0.0.0" +edition = "2024" +publish = false + +# The whole point of this example: depend on `simple-someip` with +# `default-features = false` (no `std` feature) and `bare_metal` on. +# This exercises the `transport` trait surface in the same minimal +# configuration a real firmware build would use. +[dependencies] +simple-someip = { path = "../..", default-features = false, features = ["bare_metal"] } diff --git a/examples/bare_metal/src/main.rs b/examples/bare_metal/src/main.rs new file mode 100644 index 0000000..33782ac --- /dev/null +++ b/examples/bare_metal/src/main.rs @@ -0,0 +1,288 @@ +//! Host-side canary for the bare-metal trait surface. +//! +//! # What this example actually is +//! +//! A workspace-member binary that exercises `simple-someip`'s +//! `TransportSocket` / `TransportFactory` / `Timer` traits against a +//! hand-rolled mock backend. The `Cargo.toml` in this directory +//! depends on `simple-someip` with +//! `default-features = false, features = ["bare_metal"]`, so building +//! or running this example proves **that the trait surface compiles +//! under exactly the feature set a firmware consumer would use** — +//! no `std`-feature paths from `simple-someip`, no tokio, no socket2. +//! `cargo build --workspace` catches any regression that breaks this +//! surface even without running the binary. +//! +//! # How to run +//! +//! ```text +//! cargo run -p bare_metal +//! ``` +//! +//! # What this is NOT +//! +//! This is **not** a runtime `no_std` demonstration. The host-side +//! mock uses `std::collections::VecDeque`, `std::sync::{Arc, Mutex}`, +//! `std::time::Instant`, and `println!` — all of which an actual +//! firmware build would replace with embedded equivalents +//! (`heapless::Deque`, `spin::Mutex`, a platform clock, `defmt!` or +//! similar). Using `std` in the *host-side driver code* is fine +//! because the purpose of this example is to verify **the +//! `simple-someip` crate itself** compiles with `default-features = +//! false` and exposes a trait surface that embedded consumers can +//! target. A true runtime-`no_std` example belongs with the phase +//! 10+ bare-metal refactor, once `Client` / `Server` can consume a +//! user-supplied transport and spawner without pulling in tokio. +//! +//! # Known gaps in the bare-metal story (independent of this example) +//! +//! `SocketManager::bind*` today still pins `F::Socket = TokioSocket`, +//! so the trait impls below — while correct — cannot be plugged into +//! the crate's `Client` / `Server` event loops yet. Two upstream +//! blockers must land first: +//! +//! 1. Relax the `F::Socket = TokioSocket` bound to +//! `F::Socket: TransportSocket` (requires stable Return-Type +//! Notation or a GAT-based parallel trait). +//! 2. Extract a `Spawner` trait so `SocketManager::bind*` can submit +//! per-socket loops to the user's executor instead of calling +//! `tokio::spawn` directly. See phase 9 in the refactor plan. +//! +//! Until (1) and (2) land, bare-metal users CAN implement the traits +//! below, but they CANNOT route their implementations through +//! `Client` / `Server`. + +use core::future::Future; +use core::net::{Ipv4Addr, SocketAddrV4}; +use core::task::{Context, Poll, Waker}; +use core::time::Duration; + +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; + +use simple_someip::transport::{ + IoErrorKind, ReceivedDatagram, SocketOptions, Timer, TransportError, TransportFactory, + TransportSocket, +}; + +/// Shared in-memory pipe. A `MockFactory` built around one of these +/// hands out sockets whose `send_to` pushes to `send_queue` and whose +/// `recv_from` pops from `recv_queue`. Two factories swapped queue- +/// ends give you a bidirectional pipe. +#[derive(Default)] +struct MockPipe { + /// `(bytes, dest_addr)` pairs sent by the local socket. + send_queue: Mutex, SocketAddrV4)>>, + /// `(bytes, src_addr)` pairs the local socket will read next. + recv_queue: Mutex, SocketAddrV4)>>, +} + +#[derive(Clone)] +struct MockFactory { + pipe: Arc, + local_addr: SocketAddrV4, +} + +struct MockSocket { + pipe: Arc, + local_addr: SocketAddrV4, +} + +impl TransportFactory for MockFactory { + type Socket = MockSocket; + + fn bind( + &self, + _addr: SocketAddrV4, + _options: &SocketOptions, + ) -> impl Future> { + let pipe = Arc::clone(&self.pipe); + let local_addr = self.local_addr; + core::future::ready(Ok(MockSocket { pipe, local_addr })) + } +} + +impl TransportSocket for MockSocket { + fn send_to( + &mut self, + buf: &[u8], + target: SocketAddrV4, + ) -> impl Future> { + let bytes = buf.to_vec(); + let pipe = Arc::clone(&self.pipe); + async move { + pipe.send_queue.lock().unwrap().push_back((bytes, target)); + Ok(()) + } + } + + fn recv_from( + &mut self, + buf: &mut [u8], + ) -> impl Future> { + let pipe = Arc::clone(&self.pipe); + // Copy directly into `buf` by stealing its slice lifetime out + // of the async block via a raw-pointer round-trip would be + // unsafe; instead, poll the queue on first call and fill buf + // synchronously if a datagram is ready. If the queue is empty, + // this mock returns a ready + // `Err(TransportError::Io(IoErrorKind::TimedOut))` rather than + // a pending future. In this single-threaded example we always + // send first then recv, so the timeout branch is unreachable + // here. + // + // The mock borrow-dance is awkward compared to a real UDP + // socket's recv_from; a production bare-metal impl would copy + // bytes out of its driver's receive slab directly into `buf`. + let result = { + let mut q = pipe.recv_queue.lock().unwrap(); + q.pop_front() + }; + match result { + Some((bytes, source)) => { + let n = bytes.len().min(buf.len()); + buf[..n].copy_from_slice(&bytes[..n]); + core::future::ready(Ok(ReceivedDatagram { + bytes_received: n, + source, + truncated: n < bytes.len(), + })) + } + None => core::future::ready(Err(TransportError::Io(IoErrorKind::TimedOut))), + } + } + + fn local_addr(&self) -> Result { + Ok(self.local_addr) + } + + fn join_multicast_v4( + &mut self, + _group: Ipv4Addr, + _iface: Ipv4Addr, + ) -> Result<(), TransportError> { + // Bare-metal stacks without multicast would return + // Unsupported; our mock is happy to no-op. + Ok(()) + } + + fn leave_multicast_v4( + &mut self, + _group: Ipv4Addr, + _iface: Ipv4Addr, + ) -> Result<(), TransportError> { + Ok(()) + } +} + +/// Timer that sleeps by busy-waiting on a monotonic clock. +/// +/// **ANTI-PATTERN — DO NOT USE IN PRODUCTION.** Busy-waiting burns a +/// core and starves other tasks. A real bare-metal impl would park +/// the task on its hardware timer ISR (e.g. `embassy_time::Timer::after`, +/// or a custom `Future` that registers itself with the MCU's timer +/// peripheral). The `Timer` trait signature is identical; only the +/// body changes. +struct MockTimer; + +impl Timer for MockTimer { + fn sleep(&self, duration: Duration) -> impl Future { + // ANTI-PATTERN: busy-wait. See struct docstring. + let deadline = std::time::Instant::now() + duration; + async move { + while std::time::Instant::now() < deadline { + std::hint::spin_loop(); + } + } + } +} + +/// Single-step `block_on` for the demo. +/// +/// **ANTI-PATTERN — DO NOT USE IN PRODUCTION.** `Waker::noop()` means +/// no wake-up signal is ever registered; a future that yields +/// `Pending` waiting on real I/O would never get polled again. The +/// loop-and-`spin_loop()` fallback here masks that by busy-spinning, +/// which is worse than useless on bare metal. Production executors +/// use proper `Waker` plumbing + a task queue driven by hardware +/// interrupts. This helper exists only to drive the demo's +/// synchronous mock futures (which resolve on the first poll). +fn block_on(fut: F) -> F::Output { + let waker = Waker::noop(); + let mut cx = Context::from_waker(&waker); + let mut fut = Box::pin(fut); + loop { + match fut.as_mut().poll(&mut cx) { + Poll::Ready(v) => return v, + Poll::Pending => { + // ANTI-PATTERN: busy-spin. See fn docstring. + std::hint::spin_loop(); + } + } + } +} + +fn main() { + // Each socket owns its own pipe; the "network" is us manually + // moving bytes from A's send queue into B's recv queue below. For + // a single send/recv demo this is enough; a more realistic mock + // would wire the two queues into a cross-connected pair at bind + // time. + let pipe_a = Arc::new(MockPipe::default()); + let pipe_b = Arc::new(MockPipe::default()); + + let factory_a = MockFactory { + pipe: Arc::clone(&pipe_a), + local_addr: SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 1), 30500), + }; + let factory_b = MockFactory { + pipe: Arc::clone(&pipe_b), + local_addr: SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 2), 30500), + }; + let options = SocketOptions::new(); + + let mut sock_a = block_on(factory_a.bind(factory_a.local_addr, &options)).expect("bind A"); + let mut sock_b = block_on(factory_b.bind(factory_b.local_addr, &options)).expect("bind B"); + + let payload = b"hello bare-metal"; + block_on(sock_a.send_to(payload, sock_b.local_addr().unwrap())).expect("send_to"); + + // DEMO-ONLY: hand-drain A's send queue into B's recv queue to + // simulate "the network carried the datagram." A real bare-metal + // integration would have its network driver (lwIP, smoltcp, a + // custom Ethernet ISR, etc.) write directly into the receiving + // socket's recv buffer — no user code touches the queues. This + // drain pattern is not a template; it exists to keep the example + // self-contained. + let sent = std::mem::take(&mut *pipe_a.send_queue.lock().unwrap()); + for (bytes, _dst) in sent { + pipe_b + .recv_queue + .lock() + .unwrap() + .push_back((bytes, sock_a.local_addr().unwrap())); + } + + let mut buf = [0u8; 64]; + let datagram = block_on(sock_b.recv_from(&mut buf)).expect("recv_from"); + + assert_eq!(datagram.bytes_received, payload.len()); + assert_eq!(datagram.source, sock_a.local_addr().unwrap()); + assert!(!datagram.truncated); + assert_eq!(&buf[..datagram.bytes_received], payload); + + // Demonstrate the Timer trait briefly. + let timer = MockTimer; + block_on(timer.sleep(Duration::from_millis(1))); + + println!( + "bare-metal example: sent {} bytes from {} to {}, received cleanly.", + datagram.bytes_received, + sock_a.local_addr().unwrap(), + sock_b.local_addr().unwrap(), + ); + println!( + "note: this only exercises the trait layer — see source comments \ + for the Client/Server + Spawner gap (phase 9 work)." + ); +} diff --git a/examples/client_server/src/main.rs b/examples/client_server/src/main.rs index 1cc716d..f97c706 100644 --- a/examples/client_server/src/main.rs +++ b/examples/client_server/src/main.rs @@ -1,4 +1,4 @@ -//! Client+Server hybrid example using `start_sd_announcements`. +//! Client+Server hybrid example using `Client::sd_announcements_loop`. //! //! Demonstrates how to run a SOME/IP application that is simultaneously: //! - A **client** subscribing to a remote service's events @@ -11,7 +11,7 @@ //! multicast announcements. //! //! The server's built-in `announcement_loop()` is NOT used — instead, the -//! client's `start_sd_announcements()` handles periodic multicast +//! client's `sd_announcements_loop()` handles periodic multicast //! announcements. The server's `run()` loop still handles unicast SD //! traffic (e.g. `SubscribeAck`/`SubscribeNack` responses) on its own //! socket, which is necessary for subscription management. @@ -106,8 +106,8 @@ async fn main() -> Result<(), Box> { // ── Create the client (handles discovery, subscriptions, SD socket) ── - let (client, mut updates, run) = simple_someip::Client::::new(interface); - let _run_handle = tokio::spawn(run); + let (client, mut updates, run_fut) = simple_someip::Client::::new(interface); + tokio::spawn(run_fut); client.bind_discovery().await?; info!("Client discovery bound"); @@ -127,7 +127,7 @@ async fn main() -> Result<(), Box> { info!("Server bound on port {MY_SERVER_PORT}"); // NOTE: We intentionally do NOT spawn server.announcement_loop(). - // The client's start_sd_announcements handles all SD traffic. + // The client's sd_announcements_loop handles all SD traffic. let _publisher = server.publisher(); @@ -141,7 +141,8 @@ async fn main() -> Result<(), Box> { // ── Start combined SD announcements from the client socket ─────────── let sd_header = build_sd_header(interface); - let _announce_handle = client.start_sd_announcements(sd_header, Duration::from_secs(1)); + let _announce_handle = + tokio::spawn(client.sd_announcements_loop(sd_header, Duration::from_secs(1))); info!("Started combined Find+Offer SD announcements (1s interval)"); // ── Main event loop ───────────────────────────────────────────────── diff --git a/examples/discovery_client/src/main.rs b/examples/discovery_client/src/main.rs index ae866bf..0500536 100644 --- a/examples/discovery_client/src/main.rs +++ b/examples/discovery_client/src/main.rs @@ -287,8 +287,8 @@ async fn main() -> Result<(), Error> { info!("Starting discovery client on interface {interface}"); - let (client, mut updates, run) = simple_someip::Client::::new(interface); - let _run_handle = tokio::spawn(run); + let (client, mut updates, run_fut) = simple_someip::Client::::new(interface); + tokio::spawn(run_fut); client.bind_discovery().await.unwrap(); let mut state = DiscoveryState::new(); diff --git a/src/client/inner.rs b/src/client/inner.rs index 6eb1a06..586f29a 100644 --- a/src/client/inner.rs +++ b/src/client/inner.rs @@ -603,20 +603,26 @@ where warn!("Failed to bind to interface: {}. Error: {:?}", interface, e); } } + // A dropped receiver is legitimate control flow + // (cancellation, `_no_wait` variants, panic + // recovery). `debug!` instead of `warn!` keeps + // observability for the "this shouldn't happen" + // case without cluttering production warn logs + // when callers deliberately drop. if response.send(bind_result).is_err() { - warn!("SetInterface response receiver dropped (caller canceled)"); + debug!("SetInterface: caller dropped the response receiver"); } } ControlMessage::BindDiscovery(response) => { let result = self.bind_discovery().await; if response.send(result).is_err() { - warn!("BindDiscovery response receiver dropped (caller canceled)"); + debug!("BindDiscovery: caller dropped the response receiver"); } } ControlMessage::UnbindDiscovery(response) => { self.unbind_discovery().await; if response.send(Ok(())).is_err() { - warn!("UnbindDiscovery response receiver dropped (caller canceled)"); + debug!("UnbindDiscovery: caller dropped the response receiver"); } } ControlMessage::SendSD(target, header, response) => { @@ -641,8 +647,8 @@ where e ); if response.send(Err(e)).is_err() { - warn!( - "SendSD error response receiver dropped (caller canceled)" + debug!( + "SendSD (bind-err path): caller dropped the response receiver" ); } } @@ -661,7 +667,7 @@ where .send(target, message) .await; if response.send(send_result).is_err() { - warn!("SendSD response receiver dropped (caller canceled)"); + debug!("SendSD: caller dropped the response receiver"); } } } @@ -690,7 +696,7 @@ where service_id, instance_id, addr, ); if response.send(Ok(())).is_err() { - warn!("AddEndpoint response receiver dropped (caller canceled)"); + debug!("AddEndpoint: caller dropped the response receiver"); } } ControlMessage::RemoveEndpoint(service_id, instance_id, response) => { @@ -703,7 +709,7 @@ where service_id, instance_id, ); if response.send(Ok(())).is_err() { - warn!("RemoveEndpoint response receiver dropped (caller canceled)"); + debug!("RemoveEndpoint: caller dropped the response receiver"); } } ControlMessage::SendToService { @@ -810,7 +816,11 @@ where instance_id, }; if self.service_registry.get(id).is_none() { - let _ = response.send(Err(Error::ServiceNotFound)); + if response.send(Err(Error::ServiceNotFound)).is_err() { + debug!( + "Subscribe (ServiceNotFound): caller dropped the response receiver (expected for subscribe_no_wait)" + ); + } return; } @@ -821,7 +831,11 @@ where port } Err(e) => { - let _ = response.send(Err(e)); + if response.send(Err(e)).is_err() { + debug!( + "Subscribe (bind-err): caller dropped the response receiver" + ); + } return; } }; @@ -849,7 +863,11 @@ where } } Err(e) => { - let _ = response.send(Err(e)); + if response.send(Err(e)).is_err() { + debug!( + "Subscribe (discovery-bind-err): caller dropped the response receiver" + ); + } } }, Some(discovery_socket) => { @@ -878,7 +896,9 @@ where .send(target, message) .await; if response.send(send_result).is_err() { - warn!("Subscribe response receiver dropped (caller canceled)"); + debug!( + "Subscribe: caller dropped the response receiver (expected for subscribe_no_wait)" + ); } } } diff --git a/src/client/mod.rs b/src/client/mod.rs index f360c6f..fef0d03 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -378,6 +378,22 @@ where /// channel, so it may block if that bounded channel is full. Useful for /// periodic renewals where waiting for subscription processing is /// unnecessary. + /// + /// The response oneshot is simply dropped at the end of this call. + /// The inner loop's send-to-dropped-receiver path is not logged at + /// `warn!`; at most it is logged at `debug!`, so fire-and-forget usage + /// remains low-noise. + /// + /// # Silent drop on a closed channel + /// + /// Unlike the other `Client` methods (which `.unwrap()` the `send` + /// result and therefore panic if the run-loop has exited and closed + /// the receiver), `subscribe_no_wait` deliberately discards the + /// `send` result. If the client run-loop has exited, the request is + /// silently dropped — there is no error surface and no panic. This + /// matches the fire-and-forget contract: callers that need to know + /// whether the subscription was actually dispatched should use + /// [`subscribe`](Self::subscribe) instead. pub async fn subscribe_no_wait( &self, service_id: u16, @@ -387,7 +403,7 @@ where event_group_id: u16, client_port: u16, ) { - let (response, message) = ControlMessage::subscribe( + let (_response, message) = ControlMessage::subscribe( service_id, instance_id, major_version, @@ -396,11 +412,6 @@ where client_port, ); let _ = self.control_sender.send(message).await; - // Consume the response in the background so the inner loop doesn't - // warn about a dropped receiver. - tokio::spawn(async move { - let _ = response.await; - }); } /// Returns the current SD reboot flag tracked by the client. @@ -866,6 +877,46 @@ mod tests { client.shut_down(); } + /// Stress test: 200 back-to-back `subscribe_no_wait` calls, each of + /// which drops its response oneshot. Phase 8(a) removed the + /// `tokio::spawn(drain-the-oneshot)` wrapper this function used to + /// have, and dropped the `warn!("...response receiver dropped")` + /// sites in the inner loop. Regressions that re-introduce either + /// would show up as either (a) hundreds of orphan spawned tasks + /// (not directly testable without instrumentation) or (b) log-noise + /// pollution / a hung inner loop (directly testable — asserted by + /// `assert_inner_alive` at the end). + #[tokio::test] + async fn test_subscribe_no_wait_fire_and_forget_stress() { + let (client, _updates, run_fut) = TestClient::new(Ipv4Addr::LOCALHOST); + tokio::spawn(run_fut); + + // Unknown service so the inner loop's ServiceNotFound branch + // fires on every iteration — that's the path where the + // response oneshot is dropped and the (removed) warn used to + // fire. 200 iterations is well above the control-channel + // buffer size (4) to also exercise backpressure. + for _ in 0..200 { + client + .subscribe_no_wait(0xFFFF, 0xFFFF, 1, 3, 0x01, 0) + .await; + } + + // Inner loop must still be responsive after the stress. + let msg = crate::protocol::Message::new_sd(1, &empty_sd_header()); + let result = tokio::time::timeout( + std::time::Duration::from_secs(2), + client.request(0xFFFF, 0xFFFF, msg), + ) + .await + .expect("inner loop unresponsive after 200 subscribe_no_wait calls"); + assert!( + matches!(result, Err(Error::ServiceNotFound)), + "expected ServiceNotFound, got {result:?}" + ); + client.shut_down(); + } + #[tokio::test] async fn test_bind_discovery_and_unbind() { let (client, _updates, run_fut) = TestClient::new(Ipv4Addr::LOCALHOST); diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 4bb2fde..79d6499 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -1,3 +1,30 @@ +//! Client-side UDP socket management. +//! +//! Each bound socket is backed by a `TokioSocket` (concrete, phase-5 +//! compromise) with its I/O loop running on a `tokio::spawn`'d task. +//! That spawn is the last `tokio::spawn` call inside the library +//! critical path — every other spawn was hoisted out to the caller in +//! phase 6 / 7. This one can't be hoisted until a `Spawner` trait is +//! introduced (planned for phase 9). +//! +//! # Why the `tokio::spawn` in `bind_*` is still there +//! +//! Briefly experimented with having `Inner` drive per-socket futures +//! via `FuturesUnordered` (phase 8 attempt, reverted). That deadlocks: +//! `Inner::handle_control_message` awaits `SocketManager::send`, +//! which internally awaits an mpsc→oneshot round-trip that requires +//! the socket loop to make progress. But `Inner::run_future` is +//! parked inside the handler, so nothing polls the socket loop. +//! Concurrency between the two is mandatory; on tokio we get it via +//! `tokio::spawn` giving each socket its own task. +//! +//! The fix is either (a) a `Spawner` trait that `Inner::bind_*` uses +//! instead of calling `tokio::spawn` directly (small, contradicts the +//! phase-4 "no `ExecutorAdapter`" decision but warranted given concrete +//! evidence), or (b) a non-await `SocketManager::send` that defers +//! completion to a later `select!` iteration (invasive). Phase 9 +//! picks (a). + use crate::{ UDP_BUFFER_SIZE, e2e::{E2ECheckStatus, E2EKey, E2ERegistry}, @@ -40,7 +67,7 @@ pub struct SendMessage { response: tokio::sync::oneshot::Sender>, } -/// One iteration's select-outcome in `spawn_socket_loop`. The inner +/// One iteration's select-outcome in `socket_loop_future`. The inner /// block returns this scalar so the pinned per-iteration `send_fut` / /// `recv_fut` futures drop before the processing body — releasing their /// `&mut buf` / `&mut socket` borrows. @@ -114,9 +141,9 @@ where /// The factory must still produce a /// [`TokioSocket`](crate::tokio_transport::TokioSocket) because the /// spawned I/O loop is currently tokio-specific; the bound will be - /// relaxed to any `TransportSocket` once `spawn_socket_loop`'s outer - /// `tokio::spawn` is hoisted (planned for phase 8 alongside the - /// bare-metal example). + /// relaxed to any `TransportSocket` once the `tokio::spawn` that + /// drives `socket_loop_future` is hoisted out of `bind_discovery_*` + /// (tracked separately; phase 9+ spawner-trait work). pub async fn bind_discovery_seeded_with_transport( factory: &F, interface: Ipv4Addr, @@ -152,7 +179,8 @@ where let socket = factory.bind(bind_addr, &options).await?; socket.join_multicast_v4(sd::MULTICAST_IP, interface)?; - Self::spawn_socket_loop(socket, rx_tx, tx_rx, e2e_registry); + let fut = Self::socket_loop_future(socket, rx_tx, tx_rx, e2e_registry); + tokio::spawn(fut); Ok(Self { receiver: rx_rx, sender: tx_tx, @@ -190,7 +218,8 @@ where let socket = factory.bind(bind_addr, &options).await?; let port = socket.local_addr()?.port(); - Self::spawn_socket_loop(socket, rx_tx, tx_rx, e2e_registry); + let fut = Self::socket_loop_future(socket, rx_tx, tx_rx, e2e_registry); + tokio::spawn(fut); Ok(Self { receiver: rx_rx, sender: tx_tx, @@ -262,57 +291,53 @@ where _ = receiver.recv().await; } - /// Spawn the I/O loop over a concrete [`TokioSocket`]. + /// Build the I/O loop over a concrete [`TokioSocket`] as a future. + /// Callers are expected to `tokio::spawn` this future alongside + /// [`Self`]; the socket loop runs concurrently with its owner so + /// `SocketManager::send`'s internal oneshot wait can complete. + /// The reasoning for why the spawn hasn't been hoisted is in the + /// module-level docs. /// - /// The socket's trait methods (`send_to`, `recv_from`, - /// `join_multicast_v4`) are the entire I/O surface used inside — the - /// loop body does not call any `TokioSocket`-specific inherent - /// methods, so generalizing this function over `T: TransportSocket` - /// is a mechanical change once the outer `tokio::spawn` is hoisted - /// out (planned for phase 8 alongside the bare-metal example — - /// hoisting the spawn moves the `Send` requirement off this - /// function, sidestepping stable Rust's current inability to - /// express `Send` bounds on RPITIT method returns without nightly - /// return-type notation). + /// The function remains tied to `TokioSocket` concretely because + /// generalizing it to `T: TransportSocket` needs stable-Rust + /// return-type notation to express `Send` bounds on the trait's + /// RPITIT methods — still nightly as of this writing. #[allow(clippy::too_many_lines)] - fn spawn_socket_loop( + async fn socket_loop_future( socket: crate::tokio_transport::TokioSocket, rx_tx: mpsc::Sender, Error>>, mut tx_rx: mpsc::Receiver>, e2e_registry: Arc>, ) { - tokio::spawn(async move { - let mut buf = [0u8; UDP_BUFFER_SIZE]; - - loop { - // `select!` (not `select_biased!`) gives pseudo-random - // fairness across ready arms — matches prior - // `tokio::select!` behavior and avoids starving either - // the send or recv arm under sustained one-sided load. - // - // The fresh `.fuse()`'d per-iteration futures are pinned - // on the stack (required: `Fuse<_>` is not `Unpin`). - // Returning an `Outcome

` scalar from the inner block - // drops both pinned futures — and their `&mut buf` / - // `&mut socket` borrows — before the processing body - // below runs, so the body can re-borrow `buf` freely. - let outcome: Outcome = { - let send_fut = tx_rx.recv().fuse(); - let recv_fut = socket.recv_from(&mut buf).fuse(); - pin_mut!(send_fut, recv_fut); - select! { - message = send_fut => Outcome::Send(message), - result = recv_fut => Outcome::Recv(result), - } - }; - - match outcome { - Outcome::Send(Some(send_message)) => { - trace!("Sending: {:?}", &send_message); - let mut message_length = match send_message - .message - .encode(&mut buf.as_mut_slice()) - { + let mut buf = [0u8; UDP_BUFFER_SIZE]; + + loop { + // `select!` (not `select_biased!`) gives pseudo-random + // fairness across ready arms — matches prior + // `tokio::select!` behavior and avoids starving either + // the send or recv arm under sustained one-sided load. + // + // The fresh `.fuse()`'d per-iteration futures are pinned + // on the stack (required: `Fuse<_>` is not `Unpin`). + // Returning an `Outcome

` scalar from the inner block + // drops both pinned futures — and their `&mut buf` / + // `&mut socket` borrows — before the processing body + // below runs, so the body can re-borrow `buf` freely. + let outcome: Outcome = { + let send_fut = tx_rx.recv().fuse(); + let recv_fut = socket.recv_from(&mut buf).fuse(); + pin_mut!(send_fut, recv_fut); + select! { + message = send_fut => Outcome::Send(message), + result = recv_fut => Outcome::Recv(result), + } + }; + + match outcome { + Outcome::Send(Some(send_message)) => { + trace!("Sending: {:?}", &send_message); + let mut message_length = + match send_message.message.encode(&mut buf.as_mut_slice()) { Ok(length) => length, Err(e) => { error!("Failed to encode message: {:?}", e); @@ -326,144 +351,139 @@ where } }; - // Apply E2E protect if configured. `protected` - // is a disjoint stack buffer, so the input can - // be borrowed directly out of `buf[16..]` with - // no intermediate copy. - { - let key = - E2EKey::from_message_id(send_message.message.header().message_id()); - let mut registry = - e2e_registry.lock().expect("e2e registry lock poisoned"); - if registry.contains_key(&key) { - let upper_header: [u8; 8] = - buf[8..16].try_into().expect("upper header slice"); - let mut protected = [0u8; UDP_BUFFER_SIZE]; - let result = registry.protect( - key, - &buf[16..message_length], - upper_header, - &mut protected, - ); - match result { - Some(Ok(protected_len)) => { - if 16 + protected_len > UDP_BUFFER_SIZE { - error!( - "E2E-protected payload ({} bytes) exceeds UDP_BUFFER_SIZE ({}); dropping send", - 16 + protected_len, - UDP_BUFFER_SIZE - ); - let _ = send_message - .response - .send(Err(Error::Capacity("udp_buffer"))); - continue; - } - #[allow(clippy::cast_possible_truncation)] - let new_length: u32 = 8 + protected_len as u32; - buf[4..8].copy_from_slice(&new_length.to_be_bytes()); - buf[16..16 + protected_len] - .copy_from_slice(&protected[..protected_len]); - message_length = 16 + protected_len; - } - Some(Err(e)) => { - error!("E2E protect error: {:?}", e); + // Apply E2E protect if configured. `protected` + // is a disjoint stack buffer, so the input can + // be borrowed directly out of `buf[16..]` with + // no intermediate copy. + { + let key = + E2EKey::from_message_id(send_message.message.header().message_id()); + let mut registry = e2e_registry.lock().expect("e2e registry lock poisoned"); + if registry.contains_key(&key) { + let upper_header: [u8; 8] = + buf[8..16].try_into().expect("upper header slice"); + let mut protected = [0u8; UDP_BUFFER_SIZE]; + let result = registry.protect( + key, + &buf[16..message_length], + upper_header, + &mut protected, + ); + match result { + Some(Ok(protected_len)) => { + if 16 + protected_len > UDP_BUFFER_SIZE { + error!( + "E2E-protected payload ({} bytes) exceeds UDP_BUFFER_SIZE ({}); dropping send", + 16 + protected_len, + UDP_BUFFER_SIZE + ); + let _ = send_message + .response + .send(Err(Error::Capacity("udp_buffer"))); + continue; } - None => unreachable!("contains_key was true"), + #[allow(clippy::cast_possible_truncation)] + let new_length: u32 = 8 + protected_len as u32; + buf[4..8].copy_from_slice(&new_length.to_be_bytes()); + buf[16..16 + protected_len] + .copy_from_slice(&protected[..protected_len]); + message_length = 16 + protected_len; } + Some(Err(e)) => { + error!("E2E protect error: {:?}", e); + } + None => unreachable!("contains_key was true"), } } + } - match socket - .send_to(&buf[..message_length], send_message.target_addr) - .await - { - Ok(()) => { - trace!( - "Sent {} bytes to {}", - message_length, send_message.target_addr - ); - if let Ok(()) = send_message.response.send(Ok(())) { - } else { - info!("Socket owner closed channel, closing socket."); - // The sender has been dropped, so we should exit - break; - } + match socket + .send_to(&buf[..message_length], send_message.target_addr) + .await + { + Ok(()) => { + trace!( + "Sent {} bytes to {}", + message_length, send_message.target_addr + ); + if let Ok(()) = send_message.response.send(Ok(())) { + } else { + info!("Socket owner closed channel, closing socket."); + // The sender has been dropped, so we should exit + break; } - Err(e) => { - error!("Failed to send message with error: {:?}", e); - if let Ok(()) = send_message.response.send(Err(Error::Transport(e))) - { - } else { - error!( - "Socket owner closed channel unexpectedly, closing socket." - ); - break; - } + } + Err(e) => { + error!("Failed to send message with error: {:?}", e); + if let Ok(()) = send_message.response.send(Err(Error::Transport(e))) { + } else { + error!("Socket owner closed channel unexpectedly, closing socket."); + break; } } } - Outcome::Send(None) => { - info!("Send channel closed, closing socket."); - // The sender has been dropped, so we should exit - break; + } + Outcome::Send(None) => { + info!("Send channel closed, closing socket."); + // The sender has been dropped, so we should exit + break; + } + Outcome::Recv(Ok(ReceivedDatagram { + bytes_received, + source, + truncated, + })) => { + if truncated { + // A truncated datagram cannot be parsed reliably; + // the length field in the SOME/IP header will not + // match the bytes we received. Log and drop. + error!( + "Discarding truncated datagram from {}: {} bytes received", + source, bytes_received + ); + continue; } - Outcome::Recv(Ok(ReceivedDatagram { - bytes_received, - source, - truncated, - })) => { - if truncated { - // A truncated datagram cannot be parsed reliably; - // the length field in the SOME/IP header will not - // match the bytes we received. Log and drop. - error!( - "Discarding truncated datagram from {}: {} bytes received", - source, bytes_received - ); - continue; - } - let source_address = SocketAddr::V4(source); - let parse_result = MessageView::parse(&buf[..bytes_received]) - .and_then(|view| { - let header = view.header().to_owned(); - let upper_header = header.upper_header_bytes(); - let key = E2EKey::from_message_id(header.message_id()); - let payload_bytes = view.payload_bytes(); - - // Apply E2E check if configured - let (e2e_status, effective_payload) = { - let mut registry = - e2e_registry.lock().expect("e2e registry lock poisoned"); - match registry.check(key, payload_bytes, upper_header) { - Some((status, stripped)) => (Some(status), stripped), - None => (None, payload_bytes), - } - }; - - let payload = MessageDefinitions::from_payload_bytes( - header.message_id(), - effective_payload, - )?; - Ok(ReceivedMessage { - message: Message::new(header, payload), - source: source_address, - e2e_status, - }) + let source_address = SocketAddr::V4(source); + let parse_result = MessageView::parse(&buf[..bytes_received]) + .and_then(|view| { + let header = view.header().to_owned(); + let upper_header = header.upper_header_bytes(); + let key = E2EKey::from_message_id(header.message_id()); + let payload_bytes = view.payload_bytes(); + + // Apply E2E check if configured + let (e2e_status, effective_payload) = { + let mut registry = + e2e_registry.lock().expect("e2e registry lock poisoned"); + match registry.check(key, payload_bytes, upper_header) { + Some((status, stripped)) => (Some(status), stripped), + None => (None, payload_bytes), + } + }; + + let payload = MessageDefinitions::from_payload_bytes( + header.message_id(), + effective_payload, + )?; + Ok(ReceivedMessage { + message: Message::new(header, payload), + source: source_address, + e2e_status, }) - .map_err(Error::from); - if let Ok(()) = rx_tx.send(parse_result).await { - } else { - info!("Socket Dropping"); - // The receiver has been dropped, so we should exit - break; - } - } - Outcome::Recv(Err(recv_err)) => { - error!("Transport recv failed: {:?}", recv_err); + }) + .map_err(Error::from); + if let Ok(()) = rx_tx.send(parse_result).await { + } else { + info!("Socket Dropping"); + // The receiver has been dropped, so we should exit + break; } } + Outcome::Recv(Err(recv_err)) => { + error!("Transport recv failed: {:?}", recv_err); + } } - }); + } } } @@ -484,9 +504,13 @@ mod tests { Arc::new(Mutex::new(E2ERegistry::new())) } + async fn bind_ephemeral_spawned() -> TestSocketManager { + TestSocketManager::bind(0, test_registry()).await.unwrap() + } + #[tokio::test] async fn test_bind_ephemeral_port() { - let sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); + let sm = bind_ephemeral_spawned().await; assert!(sm.port() > 0); assert_eq!(sm.session_id(), 1); } @@ -504,13 +528,13 @@ mod tests { #[tokio::test] async fn test_socket_manager_shut_down() { - let sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); + let sm = bind_ephemeral_spawned().await; sm.shut_down().await; } #[tokio::test] async fn test_socket_manager_send_and_receive() { - let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); + let mut sm = bind_ephemeral_spawned().await; let sm_port = sm.port(); // Create a raw UDP socket to send data to the SocketManager @@ -542,7 +566,7 @@ mod tests { #[tokio::test] async fn test_poll_receive() { - let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); + let mut sm = bind_ephemeral_spawned().await; let sm_port = sm.port(); // Send a message to the socket manager from a raw socket @@ -568,7 +592,7 @@ mod tests { #[tokio::test] async fn test_send_drops_when_socket_loop_exits() { - let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); + let mut sm = bind_ephemeral_spawned().await; // Shut down the socket loop by dropping the internal channels // We can't directly kill the loop, but we can test the error path // by sending to a socket manager that has been shut down. @@ -612,7 +636,7 @@ mod tests { #[tokio::test] async fn test_socket_manager_debug() { - let sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); + let sm = bind_ephemeral_spawned().await; let s = format!("{sm:?}"); assert!(s.contains("SocketManager")); sm.shut_down().await; @@ -620,7 +644,7 @@ mod tests { #[tokio::test] async fn test_socket_manager_send_to_target() { - let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); + let mut sm = bind_ephemeral_spawned().await; // Create a raw socket to receive let raw_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); @@ -666,7 +690,7 @@ mod tests { #[tokio::test] async fn test_session_id_wraps_to_one_and_clears_reboot_flag() { - let mut sm = TestSocketManager::bind(0, test_registry()).await.unwrap(); + let mut sm = bind_ephemeral_spawned().await; let raw_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, raw_socket.local_addr().unwrap().port()); diff --git a/src/lib.rs b/src/lib.rs index 96c3d15..8a632c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,12 +26,18 @@ //! //! | Feature | Default | Description | //! |---------|---------|-------------| -//! | `client` | no | Async tokio client; implies `std` + tokio + socket2 | -//! | `server` | no | Async tokio server; implies `std` + tokio + socket2 | -//! | `std` | no | Enables std-dependent helpers | -//! -//! By default only the `protocol`, trait, and `e2e` modules are compiled, and the crate -//! builds in `no_std` mode with no allocator requirement. +//! | `std` | yes | Enables std-dependent helpers (`RawPayload`, `VecSdHeader`, `OfferedEndpoint`) | +//! | `client` | no | Async tokio client; implies `std` + tokio + socket2 + futures | +//! | `server` | no | Async tokio server; implies `std` + tokio + socket2 + futures | +//! | `bare_metal` | no | Pure marker feature — enables no crate code. Reserved for future phases to gate `no_std` helper types. To exercise the bare-metal trait surface today, use the `examples/bare_metal` workspace member (`cargo run -p bare_metal`). **Does not make the crate fully bare-metal-complete**: the `client`/`server` feature paths still rely on `tokio::spawn` to drive per-socket I/O loops. A fully tokio-free build additionally requires a user-provided `Spawner` impl, planned as a trait alongside `TransportSocket` and `Timer`. | +//! +//! The default feature set is `["std"]`, which links `std` and enables +//! the `RawPayload` / `VecSdHeader` helpers. For a minimal build with +//! no allocator requirement — the `protocol`, trait, `transport`, and +//! `e2e` modules only — pass `--no-default-features`. The +//! trait-surface canary at `examples/bare_metal/` depends on the crate +//! with `default-features = false, features = ["bare_metal"]` and +//! proves the no-default-features build compiles. //! //! ## Examples //! diff --git a/tests/bare_metal_example_builds.rs b/tests/bare_metal_example_builds.rs new file mode 100644 index 0000000..ec992bf --- /dev/null +++ b/tests/bare_metal_example_builds.rs @@ -0,0 +1,22 @@ +//! Integration test: documents the intent that the `bare_metal` example +//! workspace member must compile cleanly. Guards against regressions in +//! the `transport`/`tokio_transport`/`Timer` trait surface that would +//! break bare-metal consumers. +//! +//! Compilation of the `bare_metal` example is already covered by +//! workspace-wide Cargo commands such as `cargo build --workspace`, +//! `cargo test --workspace`, or CI's `cargo clippy --workspace`, so +//! this file does not spawn a nested `cargo build` — nested cargo +//! invocations are redundant and flaky under lock contention. The test +//! body below is a minimal sanity check that the test harness ran at +//! all; the real coverage comes from those outer workspace-wide +//! checks. Keep this file so the regression's intent stays documented. + +#[test] +fn bare_metal_workspace_member_compiles() { + // Minimal canary: the test harness executed this test. Compilation of + // the `bare_metal` example itself is enforced by explicit + // workspace-wide checks (for example `cargo build --workspace`), + // not by spawning a nested `cargo build` here — so an empty body is + // sufficient. +}