From be292bb063ed6cb2b71fb81e76c0fb0493c8116a Mon Sep 17 00:00:00 2001 From: Feliciano Angulo Date: Sun, 31 May 2026 21:32:27 -0400 Subject: [PATCH 1/3] feat(server): make client+server+bare_metal alloc-free, add NonSdRequestCallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related changes to land the no-alloc bare-metal server path: 1. drop _alloc from the server feature and gate every alloc usage (use alloc::sync::Arc, Wrappable handles, SocketOptions, sd-protocol import, run_inner's 64 KiB vec! and new_with_deps/new_passive_with_deps' Arc-wrap constructors) so client+server+bare_metal builds with zero __rust_alloc/__rg_alloc symbol references. The Server struct's started latch is now a feature-gated StartedLatch type alias (Arc under _alloc, &'static AtomicBool on bare metal) passed through ServerStorage; the H/Hsd/Hep default generics use cfg'd DefaultSocketHandle/DefaultSdStateHandle/DefaultEventPublisherHandle aliases so Arc names don't need to resolve under no-alloc. StaticSubscriptionHandle returns core::future::Ready instead of alloc::boxed::Box::pin (the lock closures are synchronous). 2. Add NonSdRequestCallback fn-pointer on ServerStorage/Server, threaded through run_with_buffers/run_combined/recv_loop, invoked in place of the historical 'non-SD ignored' branch — surfaces method requests / fire-and-forget calls (e.g. halo's HWP1* requests) to the consumer FFI without requiring a ChannelFactory-backed ServerUpdates channel. --- Cargo.toml | 9 ++- src/lib.rs | 4 +- src/server/mod.rs | 97 ++++++++++++++++++++++++++---- src/server/runtime.rs | 13 +++- src/server/subscription_manager.rs | 61 ++++++++----------- 5 files changed, 135 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e0a529a..d07f67e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,7 +109,14 @@ _alloc = [] # bringing `Arc>` / `Arc>` / # / `TokioTransport` / `TokioTimer` defaults into scope, and forces # `std`. -server = ["dep:futures-util", "_alloc"] +# +# `server` itself is alloc-free: the no-alloc path is +# `Server::new_with_handles` + `run_with_buffers` with static handles and +# a `&'static AtomicBool` run latch. The allocator-backed conveniences +# (`new_with_deps`/`new_passive_with_deps`, `run`/`run_inner`'s 64 KiB +# buffers, the `Arc` `StartedLatch`) are gated behind `_alloc`, which +# `std` / `server-tokio` / `embassy_channels` still pull in. +server = ["dep:futures-util"] server-tokio = ["server", "std", "dep:tokio", "dep:socket2"] # Marks a build as intended for bare-metal / no_std consumption. # Activates embassy-sync as the channel backend, the `static_channels` diff --git a/src/lib.rs b/src/lib.rs index 0cff68d..cfef6f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -225,7 +225,9 @@ pub use client::{ // `ClientChannelTypes` elaboration limit at the wrong call site. pub use e2e::{E2ECheckStatus, E2EKey, E2EProfile}; #[cfg(feature = "server")] -pub use server::{Server, ServerDeps, ServerHandles, ServerStorage, SubscriptionHandle}; +pub use server::{ + NonSdRequestCallback, Server, ServerDeps, ServerHandles, ServerStorage, SubscriptionHandle, +}; #[cfg(any(feature = "client-tokio", feature = "server-tokio"))] pub use tokio_transport::{TokioChannels, TokioSocket, TokioSpawner, TokioTimer, TokioTransport}; #[cfg(feature = "bare_metal")] diff --git a/src/server/mod.rs b/src/server/mod.rs index ac6d61d..646d756 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -28,15 +28,22 @@ use core::sync::atomic::{AtomicBool, Ordering}; use crate::Timer; use crate::e2e::{E2EKey, E2EProfile}; +#[cfg(feature = "_alloc")] use crate::protocol::sd; #[cfg(test)] use crate::protocol::sd::{Entry, Flags, ServiceEntry}; use crate::transport::{ - E2ERegistryHandle, SharedHandle, SocketOptions, TransportFactory, TransportSocket, - WrappableSharedHandle, + E2ERegistryHandle, SharedHandle, TransportFactory, TransportSocket, }; +#[cfg(feature = "_alloc")] +use crate::transport::SocketOptions; +#[cfg(feature = "_alloc")] +use crate::transport::WrappableSharedHandle; +#[cfg(feature = "_alloc")] use alloc::sync::Arc; -use core::net::{Ipv4Addr, SocketAddrV4}; +use core::net::Ipv4Addr; +#[cfg(feature = "_alloc")] +use core::net::SocketAddrV4; #[cfg(test)] use std::vec::Vec; @@ -483,6 +490,15 @@ where /// e2e)>`; for no-alloc, a `&'static EventPublisher<...>` /// declared externally. pub publisher: Hep, + /// First-poll run latch. On alloc builds, pass + /// `Arc::new(AtomicBool::new(false))`; on no-alloc bare metal, pass + /// a `&'static AtomicBool` (declared as a `static`). Prevents two + /// run-futures built from the same `Server` from racing the sockets + /// and SD session counter. + pub started: StartedLatch, + /// Optional callback for non-SD unicast datagrams (method requests). + /// `None` reproduces the default "non-SD ignored" behavior. + pub non_sd_observer: Option, } /// SOME/IP Server that can offer services and publish events. @@ -504,14 +520,36 @@ where /// these as `TokioTransport` / `TokioTimer` / `Arc>` /// / `Arc>`. Bare-metal callers use /// [`Self::new_with_deps`] (under `server`) and supply their own. +/// Default shared-handle types for the `Server`'s `H` / `Hsd` / `Hep` +/// generic parameters. `Arc` when an allocator is present; +/// `&'static T` on no-alloc bare metal (where the caller supplies the +/// statics). Both satisfy `SharedHandle`. These defaults are only +/// materialized for callers that omit the handle parameters (the +/// allocator-backed convenience constructors); no-alloc callers spell +/// the handle types explicitly via `new_with_handles`. +#[cfg(feature = "_alloc")] +type DefaultSocketHandle = Arc<::Socket>; +#[cfg(not(feature = "_alloc"))] +type DefaultSocketHandle = &'static ::Socket; + +#[cfg(feature = "_alloc")] +type DefaultSdStateHandle = Arc; +#[cfg(not(feature = "_alloc"))] +type DefaultSdStateHandle = &'static SdStateManager; + +#[cfg(feature = "_alloc")] +type DefaultEventPublisherHandle = Arc>; +#[cfg(not(feature = "_alloc"))] +type DefaultEventPublisherHandle = &'static EventPublisher; + pub struct Server< F, Tm, R, Sub, - H = Arc<::Socket>, - Hsd = Arc, - Hep = Arc::Socket>>, + H = DefaultSocketHandle, + Hsd = DefaultSdStateHandle, + Hep = DefaultEventPublisherHandle::Socket>, > where F: TransportFactory + 'static, F::Socket: 'static, @@ -563,11 +601,34 @@ pub struct Server< /// constructor's tuple, [`Self::run`], or [`Self::run_with_buffers`]) /// short-circuit with `Err(Error::InvalidUsage("server_already_running"))` /// rather than racing on the same SD/unicast sockets and session - /// counter. `Arc` because the run-future captures a - /// clone independent of `&self`'s lifetime. - started: Arc, + /// counter. Held behind [`StartedLatch`] — `Arc` when an + /// allocator is present, `&'static AtomicBool` on no-alloc bare metal + /// — because the run-future captures an owned copy independent of + /// `&self`'s lifetime, and both alternatives are `Clone + 'static`. + started: StartedLatch, + /// Optional callback invoked for non-SD unicast datagrams received + /// on the service's port (method requests / fire-and-forget calls). + /// `None` preserves the historical "ignore non-SD" behavior; `Some` + /// surfaces those datagrams to the consumer (used by halo's FFI to + /// dispatch HWP1 method requests). + non_sd_observer: Option, } +/// Callback invoked by the server's `recv_loop` for every non-SD +/// unicast datagram received on the service's port (i.e. method +/// requests / fire-and-forget calls to the offered services). The +/// payload is the full raw datagram bytes; the caller is responsible +/// for re-parsing the SOME/IP header (and applying any E2E check) on +/// the consumer side. `fn` pointers are `Copy + Send + Sync + 'static`, +/// so they can be stored on the `Server` and captured by the +/// run-future without adding a new generic. +pub type NonSdRequestCallback = fn(data: &[u8], source: core::net::SocketAddrV4); + +#[cfg(feature = "_alloc")] +type StartedLatch = Arc; +#[cfg(not(feature = "_alloc"))] +type StartedLatch = &'static AtomicBool; + /// `Hep` resolved against the `server-tokio` convenience constructors' /// concrete defaults — the `EventPublisher` shape with all four /// publisher type parameters bound to their tokio impls. Lets the @@ -720,6 +781,7 @@ impl } } +#[cfg(feature = "_alloc")] impl Server where F: TransportFactory + 'static, @@ -823,6 +885,7 @@ where timer, is_passive: false, started: Arc::new(AtomicBool::new(false)), + non_sd_observer: None, }; let handles = ServerHandles { publisher: server.publisher(), @@ -905,6 +968,7 @@ where timer, is_passive: true, started: Arc::new(AtomicBool::new(false)), + non_sd_observer: None, }; let handles = ServerHandles { publisher: server.publisher(), @@ -988,7 +1052,8 @@ where factory: deps.factory, timer: deps.timer, is_passive: false, - started: Arc::new(AtomicBool::new(false)), + started: deps.started, + non_sd_observer: deps.non_sd_observer, }) } @@ -1053,7 +1118,8 @@ where factory: deps.factory, timer: deps.timer, is_passive: true, - started: Arc::new(AtomicBool::new(false)), + started: deps.started, + non_sd_observer: deps.non_sd_observer, }) } @@ -1159,6 +1225,8 @@ where let sd_state = self.sd_state.clone(); let timer = self.timer.clone(); let is_passive = self.is_passive; + let non_sd_observer = self.non_sd_observer; + #[allow(noop_method_call)] let started = self.started.clone(); async move { @@ -1187,6 +1255,7 @@ where is_passive, unicast_buf, sd_buf, + non_sd_observer, ) .await } @@ -1212,6 +1281,7 @@ where /// # Errors /// /// Same as [`Self::run_with_buffers`]. + #[cfg(feature = "_alloc")] pub fn run( &self, ) -> impl core::future::Future> @@ -1242,6 +1312,7 @@ where /// declared bound — callers should prefer `run` (Send-checked at /// the API boundary) or `run_with_buffers` (explicitly no `Send` /// requirement). + #[cfg(feature = "_alloc")] fn run_inner( &self, ) -> impl core::future::Future> @@ -1254,6 +1325,7 @@ where let sd_state = self.sd_state.clone(); let timer = self.timer.clone(); let is_passive = self.is_passive; + let non_sd_observer = self.non_sd_observer; let started = self.started.clone(); async move { @@ -1288,6 +1360,7 @@ where is_passive, &mut unicast_buf, &mut sd_buf, + non_sd_observer, ) .await } @@ -1470,6 +1543,8 @@ mod tests { sd_socket, sd_state: Arc::new(SdStateManager::new()), publisher, + started: Arc::new(AtomicBool::new(false)), + non_sd_observer: None, }; (handles, bound_port) } diff --git a/src/server/runtime.rs b/src/server/runtime.rs index 4a022ed..f1b5af9 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -441,6 +441,7 @@ async fn recv_loop( subscriptions: &Sub, unicast_buf: &mut [u8], sd_buf: &mut [u8], + non_sd_observer: Option, ) -> Result<(), Error> where T: TransportSocket, @@ -535,6 +536,14 @@ where crate::log::warn!("Failed to parse SD message: {:?}", e); } } + } else if let Some(cb) = non_sd_observer { + // Surface non-SD unicast (method requests / fire-and-forget + // calls to offered services) via the registered callback. + // The full raw datagram is forwarded; the consumer is + // responsible for re-parsing and any E2E check. + if let core::net::SocketAddr::V4(src_v4) = addr { + cb(data, src_v4); + } } else { crate::log::trace!("Non-SD SOME/IP message, ignoring"); } @@ -560,6 +569,7 @@ where /// and only the receive loop drives — used by the dispatcher topology /// where a co-located `Client` emits `OfferService` on the server's /// behalf. +#[allow(clippy::too_many_arguments)] pub(super) async fn run_combined( config: ServerConfig, unicast_socket: H, @@ -570,6 +580,7 @@ pub(super) async fn run_combined( is_passive: bool, unicast_buf: &mut [u8], sd_buf: &mut [u8], + non_sd_observer: Option, ) -> Result<(), Error> where H: SharedHandle, @@ -593,7 +604,7 @@ where let sd = sd_socket.get(); let sd_state_ref = sd_state.get(); - let recv_fut = recv_loop(&config, unicast, sd, sd_state_ref, &subscriptions, unicast_buf, sd_buf); + let recv_fut = recv_loop(&config, unicast, sd, sd_state_ref, &subscriptions, unicast_buf, sd_buf, non_sd_observer); if config.announce { let announce_fut = announce_loop(&config, sd, sd_state_ref, &timer); diff --git a/src/server/subscription_manager.rs b/src/server/subscription_manager.rs index ade5b21..7c3a59e 100644 --- a/src/server/subscription_manager.rs +++ b/src/server/subscription_manager.rs @@ -492,21 +492,15 @@ pub mod bare_metal_subscription_impl { } impl SubscriptionHandle for StaticSubscriptionHandle { - // Futures are `Send` even though `SubscriptionManager` itself - // isn't `Sync` — the `embassy-sync` - // `CriticalSectionRawMutex` wrapping it IS `Sync`, and the - // future bodies have no `.await` points inside the lock - // closure (they capture only the `&'static` storage handle - // and the by-value args, all `Send`). Boxing with `+ Send` - // lets `Server::run`'s `Send` bound be satisfied. The - // `server` feature (required for `SubscriptionHandle` to be - // in scope) implies `_alloc`, so `Box::pin` is always - // available here. - type SubscribeFuture<'a> = core::pin::Pin< - alloc::boxed::Box> + Send + 'a>, - >; - type UnsubscribeFuture<'a> = - core::pin::Pin + Send + 'a>>; + // The lock closure is fully synchronous (no `.await` inside the + // critical section), so each operation completes immediately and + // the returned future is a concrete `core::future::Ready` — no + // heap, no `Box::pin`. This keeps the no-alloc bare-metal path + // free of `alloc` (the `server` feature no longer implies + // `_alloc`). `Ready` is `Send` when `T: Send`, satisfying any + // `Send`-checked run path. + type SubscribeFuture<'a> = core::future::Ready>; + type UnsubscribeFuture<'a> = core::future::Ready<()>; fn subscribe( &self, @@ -516,16 +510,14 @@ pub mod bare_metal_subscription_impl { subscriber_addr: SocketAddrV4, ) -> Self::SubscribeFuture<'_> { let storage = self.0; - alloc::boxed::Box::pin(async move { - storage.lock(|cell| { - cell.borrow_mut().subscribe( - service_id, - instance_id, - event_group_id, - subscriber_addr, - ) - }) - }) + core::future::ready(storage.lock(|cell| { + cell.borrow_mut().subscribe( + service_id, + instance_id, + event_group_id, + subscriber_addr, + ) + })) } fn unsubscribe( @@ -536,16 +528,15 @@ pub mod bare_metal_subscription_impl { subscriber_addr: SocketAddrV4, ) -> Self::UnsubscribeFuture<'_> { let storage = self.0; - alloc::boxed::Box::pin(async move { - storage.lock(|cell| { - cell.borrow_mut().unsubscribe( - service_id, - instance_id, - event_group_id, - subscriber_addr, - ); - }); - }) + storage.lock(|cell| { + cell.borrow_mut().unsubscribe( + service_id, + instance_id, + event_group_id, + subscriber_addr, + ); + }); + core::future::ready(()) } fn for_each_subscriber<'a, F>( From 3dafd3e57dc2eb557243eeda0c795f6a052b1136 Mon Sep 17 00:00:00 2001 From: Feliciano Angulo <91559562+FelicianoAngulo2@users.noreply.github.com> Date: Mon, 1 Jun 2026 11:48:55 -0400 Subject: [PATCH 2/3] Apply suggestions from code review add non-SD observer support and no-alloc witness tests Co-authored-by: Justin Kovacich <32140377+JustinKovacich@users.noreply.github.com> --- Cargo.toml | 5 + examples/bare_metal_server/src/main.rs | 1 + examples/embassy_net_client/src/main.rs | 1 + src/server/mod.rs | 31 +++- src/server/runtime.rs | 13 +- tests/bare_metal_e2e.rs | 2 + tests/bare_metal_server.rs | 222 ++++++++++++++++++++++++ tests/no_alloc_server_witness.rs | 199 +++++++++++++++++++++ 8 files changed, 468 insertions(+), 6 deletions(-) create mode 100644 tests/no_alloc_server_witness.rs diff --git a/Cargo.toml b/Cargo.toml index d07f67e..cb2e77c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -176,6 +176,11 @@ harness = false name = "bare_metal_server" required-features = ["server", "bare_metal"] +[[test]] +name = "no_alloc_server_witness" +required-features = ["server", "bare_metal"] +harness = false + [[test]] name = "bare_metal_e2e" required-features = ["client", "server", "bare_metal"] diff --git a/examples/bare_metal_server/src/main.rs b/examples/bare_metal_server/src/main.rs index 65082bf..7bbab5b 100644 --- a/examples/bare_metal_server/src/main.rs +++ b/examples/bare_metal_server/src/main.rs @@ -260,6 +260,7 @@ async fn main() { timer: MockTimer, e2e_registry: e2e, subscriptions: subs, + non_sd_observer: None, }, config, false, // multicast_loopback diff --git a/examples/embassy_net_client/src/main.rs b/examples/embassy_net_client/src/main.rs index fbf2726..9f00c67 100644 --- a/examples/embassy_net_client/src/main.rs +++ b/examples/embassy_net_client/src/main.rs @@ -374,6 +374,7 @@ async fn main() { timer: LocalTimer, e2e_registry: server_e2e, subscriptions: InMemorySubscriptions::default(), + non_sd_observer: None, }; // Default `H = Arc`. Annotation is explicit diff --git a/src/server/mod.rs b/src/server/mod.rs index 646d756..890b1e6 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -286,6 +286,13 @@ where /// `Arc>` for this; bare-metal callers /// supply their own [`SubscriptionHandle`] impl. pub subscriptions: Sub, + /// Optional callback invoked from the server's receive loop for every + /// non-SD **unicast** datagram (method requests / fire-and-forget calls + /// to offered services). `None` reproduces the historical + /// "non-SD ignored" behavior. The callback receives the full raw + /// datagram bytes and the source `SocketAddrV4`; the consumer is + /// responsible for re-parsing the SOME/IP header and any E2E check. + pub non_sd_observer: Option, } /// Tokio-defaulted constructor. @@ -330,6 +337,7 @@ impl timer: crate::tokio_transport::TokioTimer, e2e_registry: Arc::new(Mutex::new(E2ERegistry::new())), subscriptions: Arc::new(RwLock::new(SubscriptionManager::new())), + non_sd_observer: None, } } } @@ -354,6 +362,7 @@ where timer: self.timer, e2e_registry: self.e2e_registry, subscriptions: self.subscriptions, + non_sd_observer: self.non_sd_observer, } } @@ -365,6 +374,7 @@ where timer, e2e_registry: self.e2e_registry, subscriptions: self.subscriptions, + non_sd_observer: self.non_sd_observer, } } @@ -379,6 +389,7 @@ where timer: self.timer, e2e_registry, subscriptions: self.subscriptions, + non_sd_observer: self.non_sd_observer, } } @@ -393,8 +404,19 @@ where timer: self.timer, e2e_registry: self.e2e_registry, subscriptions, + non_sd_observer: self.non_sd_observer, } } + + /// Register a callback invoked for every non-SD unicast datagram + /// (method requests / fire-and-forget calls to offered services). + /// Passing `None` (the default if unset) preserves the historical + /// "ignore non-SD" behavior. + #[must_use] + pub fn with_non_sd_observer(mut self, observer: Option) -> Self { + self.non_sd_observer = observer; + self + } } /// Post-construction accessor bundle returned from `Server::new` (and @@ -736,6 +758,7 @@ impl timer: crate::tokio_transport::TokioTimer, e2e_registry: Arc::new(Mutex::new(E2ERegistry::new())), subscriptions: Arc::new(RwLock::new(SubscriptionManager::new())), + non_sd_observer: None, }; Self::new_with_deps(deps, config, multicast_loopback).await } @@ -776,6 +799,7 @@ impl timer: crate::tokio_transport::TokioTimer, e2e_registry: Arc::new(Mutex::new(E2ERegistry::new())), subscriptions: Arc::new(RwLock::new(SubscriptionManager::new())), + non_sd_observer: None, }; Self::new_passive_with_deps(deps, config).await } @@ -829,6 +853,7 @@ where timer, e2e_registry, subscriptions, + non_sd_observer: deps_non_sd_observer, } = deps; // Bind unicast socket for receiving subscriptions, then wrap @@ -885,7 +910,7 @@ where timer, is_passive: false, started: Arc::new(AtomicBool::new(false)), - non_sd_observer: None, + non_sd_observer: deps_non_sd_observer, }; let handles = ServerHandles { publisher: server.publisher(), @@ -921,6 +946,7 @@ where timer, e2e_registry, subscriptions, + non_sd_observer: deps_non_sd_observer, } = deps; // Bind unicast socket at the configured local_port. @@ -968,7 +994,7 @@ where timer, is_passive: true, started: Arc::new(AtomicBool::new(false)), - non_sd_observer: None, + non_sd_observer: deps_non_sd_observer, }; let handles = ServerHandles { publisher: server.publisher(), @@ -1777,6 +1803,7 @@ mod tests { timer: TokioTimer, e2e_registry: Arc::new(Mutex::new(E2ERegistry::new())), subscriptions: subscriptions.clone(), + non_sd_observer: None, }; let config = ServerConfig::new(0x5B, 1) .with_interface(Ipv4Addr::LOCALHOST) diff --git a/src/server/runtime.rs b/src/server/runtime.rs index f1b5af9..ed56df7 100644 --- a/src/server/runtime.rs +++ b/src/server/runtime.rs @@ -433,6 +433,7 @@ pub(super) async fn announce_loop( /// Receive loop body — drives `recv_from` on both the unicast and SD /// sockets, dispatches SD messages to [`handle_sd_message`]. +#[allow(clippy::too_many_arguments)] async fn recv_loop( config: &ServerConfig, unicast_socket: &T, @@ -536,16 +537,20 @@ where crate::log::warn!("Failed to parse SD message: {:?}", e); } } - } else if let Some(cb) = non_sd_observer { + } else if from_unicast { // Surface non-SD unicast (method requests / fire-and-forget // calls to offered services) via the registered callback. // The full raw datagram is forwarded; the consumer is // responsible for re-parsing and any E2E check. - if let core::net::SocketAddr::V4(src_v4) = addr { - cb(data, src_v4); + if let Some(cb) = non_sd_observer { + if let core::net::SocketAddr::V4(src_v4) = addr { + cb(data, src_v4); + } + } else { + crate::log::trace!("Non-SD unicast SOME/IP message, no observer registered — ignoring"); } } else { - crate::log::trace!("Non-SD SOME/IP message, ignoring"); + crate::log::trace!("Non-SD multicast SOME/IP message, ignoring"); } } Err(e) => { diff --git a/tests/bare_metal_e2e.rs b/tests/bare_metal_e2e.rs index 7127eaa..12f1091 100644 --- a/tests/bare_metal_e2e.rs +++ b/tests/bare_metal_e2e.rs @@ -363,6 +363,7 @@ async fn client_receives_server_sd_announcement() { timer: MockTimer, e2e_registry: server_e2e, subscriptions: server_subs, + non_sd_observer: None, }; let (_server, _handles, run): ( @@ -459,6 +460,7 @@ async fn client_send_request_server_runloop_stable() { timer: MockTimer, e2e_registry: server_e2e, subscriptions: server_subs, + non_sd_observer: None, }; let (_server, _handles, run): ( diff --git a/tests/bare_metal_server.rs b/tests/bare_metal_server.rs index b487a99..ba96752 100644 --- a/tests/bare_metal_server.rs +++ b/tests/bare_metal_server.rs @@ -37,6 +37,7 @@ use simple_someip::server::{SubscribeError, Subscriber, SubscriptionHandle}; use simple_someip::transport::{ ReceivedDatagram, SocketOptions, Timer, TransportError, TransportFactory, TransportSocket, }; +use simple_someip::server::NonSdRequestCallback; use simple_someip::{Server, ServerDeps}; // ── Mock transport ───────────────────────────────────────────────────── @@ -289,6 +290,7 @@ async fn server_constructible_without_server_tokio_feature() { timer: MockTimer, e2e_registry: e2e_handle, subscriptions: subs, + non_sd_observer: None, }; let (_server, _handles, run): ( @@ -336,6 +338,7 @@ async fn passive_server_constructible_without_server_tokio_feature() { timer: MockTimer, e2e_registry: e2e_handle, subscriptions: subs, + non_sd_observer: None, }; let (_server, _handles, _run): ( @@ -346,3 +349,222 @@ async fn passive_server_constructible_without_server_tokio_feature() { .await .expect("Server::new_passive_with_deps must succeed with no-tokio mocks"); } + +// ── NonSdRequestCallback witness ────────────────────────────────────── +// +// Drives a non-SD unicast datagram through the server's `recv_loop` +// and verifies the registered callback receives the right bytes + source. +// The companion test confirms `None` preserves the historical +// "ignore non-SD" behavior. + +// `NonSdRequestCallback` is `fn(&[u8], SocketAddrV4)` — a plain function +// pointer, so it can't capture environment. Each test parks its +// observation in a dedicated static so the callback can write into it +// without interfering with sibling tests (cargo runs tests in parallel +// within a test binary). + +use std::sync::OnceLock; + +static OBSERVED_SOME: OnceLock, SocketAddrV4)>>> = OnceLock::new(); +static OBSERVED_NONE: OnceLock, SocketAddrV4)>>> = OnceLock::new(); + +fn record_some(data: &[u8], source: SocketAddrV4) { + let slot = OBSERVED_SOME.get_or_init(|| Mutex::new(None)); + *slot.lock().unwrap() = Some((data.to_vec(), source)); +} + +fn record_none(data: &[u8], source: SocketAddrV4) { + let slot = OBSERVED_NONE.get_or_init(|| Mutex::new(None)); + *slot.lock().unwrap() = Some((data.to_vec(), source)); +} + +/// Build a minimal SOME/IP method-request datagram (16-byte header, +/// no payload, message_type = Request). The exact byte layout matches +/// the on-wire SOME/IP header format documented in +/// AUTOSAR_SWS_SOMEIPProtocol §4 — checked-by-encode against +/// `simple_someip::protocol::Header::encode` would be cleaner but the +/// header is small enough to spell out by hand and avoids dragging +/// the encoder dep into the test. +fn build_method_request(service_id: u16, method_id: u16) -> Vec { + let mut buf = Vec::with_capacity(16); + buf.extend_from_slice(&service_id.to_be_bytes()); // message_id (high) + buf.extend_from_slice(&method_id.to_be_bytes()); // message_id (low) + buf.extend_from_slice(&8u32.to_be_bytes()); // length = header(8) + payload(0) + buf.extend_from_slice(&0u32.to_be_bytes()); // request_id + buf.push(1); // protocol_version + buf.push(1); // interface_version + buf.push(0); // message_type = Request (0x00) + buf.push(0); // return_code = OK + buf +} + +async fn drive_until bool>(mut check: F) { + // Yield enough times for the spawned run-future to pick up the + // queued inbound datagram from the mock pipe. The mock socket's + // `recv_from` resolves immediately when a datagram is queued, so a + // few yields are sufficient on the multi-threaded runtime; we + // bound it at 200 yields (~ms) to keep the test snappy. + for _ in 0..200 { + if check() { + return; + } + tokio::task::yield_now().await; + } + panic!("timed out waiting for condition (callback never fired or assertion never held)"); +} + +#[tokio::test] +async fn non_sd_observer_some_receives_unicast_method_request() { + let pipe = Arc::new(MockPipe::default()); + let factory = MockFactory { + pipe: Arc::clone(&pipe), + next_port: Arc::new(Mutex::new(0)), + }; + + let e2e_handle: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); + let subs = MockSubscriptions::default(); + + let config = ServerConfig::new(0x1234, 1) + .with_interface(Ipv4Addr::LOCALHOST) + .with_local_port(30700); + + let deps: ServerDeps>, MockSubscriptions> = + ServerDeps { + factory, + timer: MockTimer, + e2e_registry: e2e_handle, + subscriptions: subs, + non_sd_observer: Some(record_some as NonSdRequestCallback), + }; + + let (_server, _handles, run): ( + Server>, MockSubscriptions>, + _, + _, + ) = Server::new_with_deps(deps, config, false) + .await + .expect("Server::new_with_deps must succeed"); + + let handle = tokio::spawn(run); + + // Queue a non-SD unicast method-request datagram. `from_unicast` + // distinguishes which socket received it: the first socket bound + // by `MockFactory` (port 30700) is the unicast socket; the second + // (port 30490) is the SD socket. Since the mock pipe is shared + // across both sockets, we drive the datagram into the unicast + // arm by tagging it with a non-SD message_id and relying on the + // `select_biased!`'s prefer-unicast tick to pick the unicast + // future first. + let payload = build_method_request(0x1234, 0x0001); + let src = SocketAddrV4::new(Ipv4Addr::new(192, 0, 2, 100), 40000); + pipe.inbound + .lock() + .unwrap() + .push_back((payload.clone(), src)); + if let Some(w) = pipe.inbound_waker.lock().unwrap().take() { + w.wake(); + } + + drive_until(|| { + OBSERVED_SOME + .get() + .and_then(|m| m.lock().unwrap().clone()) + .is_some() + }) + .await; + + let (got_data, got_src) = OBSERVED_SOME + .get() + .unwrap() + .lock() + .unwrap() + .clone() + .expect("callback fired"); + assert_eq!( + got_data, payload, + "callback must receive the full raw datagram bytes" + ); + assert_eq!(got_src, src, "callback must receive the original source"); + + handle.abort(); + let _ = handle.await; +} + +#[tokio::test] +async fn non_sd_observer_none_preserves_ignore_behavior() { + let pipe = Arc::new(MockPipe::default()); + let factory = MockFactory { + pipe: Arc::clone(&pipe), + next_port: Arc::new(Mutex::new(0)), + }; + + let e2e_handle: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); + let subs = MockSubscriptions::default(); + + let config = ServerConfig::new(0x1234, 1) + .with_interface(Ipv4Addr::LOCALHOST) + .with_local_port(30701); + + // Same `record_none` is installed as a witness that the path the + // observer would have taken is NOT walked when the field is None. + // The pipe still receives the datagram and the recv_loop still + // parses it — but the `else if from_unicast` arm with `None` + // falls through to the trace log, never invoking the callback. + // We don't actually wire the function pointer into the server + // (deps.non_sd_observer = None), but we keep a `record_none` so a + // future regression that accidentally fires *any* observer would + // populate OBSERVED_NONE and trip the assertion below. + let _kept_alive_to_witness_no_invocation: NonSdRequestCallback = record_none; + + let deps: ServerDeps>, MockSubscriptions> = + ServerDeps { + factory, + timer: MockTimer, + e2e_registry: e2e_handle, + subscriptions: subs, + non_sd_observer: None, + }; + + let (_server, _handles, run): ( + Server>, MockSubscriptions>, + _, + _, + ) = Server::new_with_deps(deps, config, false) + .await + .expect("Server::new_with_deps must succeed"); + + let handle = tokio::spawn(run); + + let payload = build_method_request(0x1234, 0x0001); + let src = SocketAddrV4::new(Ipv4Addr::new(192, 0, 2, 101), 40001); + pipe.inbound + .lock() + .unwrap() + .push_back((payload.clone(), src)); + if let Some(w) = pipe.inbound_waker.lock().unwrap().take() { + w.wake(); + } + + // Let the run-future poll the inbound queue enough times to dequeue + // and process the datagram. Since the path doesn't invoke any + // callback, there's no positive signal — we just need to give the + // recv_loop a window to act, then confirm OBSERVED_NONE stayed empty. + for _ in 0..50 { + tokio::task::yield_now().await; + } + // Belt-and-braces: also wait a real tick so the unicast `select_biased!` + // arm cycles at least once. + tokio::time::sleep(Duration::from_millis(10)).await; + + let observed = OBSERVED_NONE + .get() + .and_then(|m| m.lock().unwrap().clone()); + assert!( + observed.is_none(), + "callback must NOT fire when non_sd_observer is None; got {:?}", + observed + ); + + handle.abort(); + let _ = handle.await; +} diff --git a/tests/no_alloc_server_witness.rs b/tests/no_alloc_server_witness.rs new file mode 100644 index 0000000..7761493 --- /dev/null +++ b/tests/no_alloc_server_witness.rs @@ -0,0 +1,199 @@ +//! No-alloc CI gate (server side): proves the `server + bare_metal` +//! configuration's static handle types — specifically +//! [`StaticSubscriptionHandle`] — execute their hot paths without +//! invoking the global allocator. +//! +//! Sibling to [`tests/no_alloc_witness.rs`] (which gates the +//! `client + bare_metal` side). The harness shape is identical: a +//! [`PanicAllocator`] replaces the global allocator and is armed only +//! around the witnessed closures, turning any forbidden allocation +//! into a [`std::process::abort`] (and a hard CI failure). +//! +//! # Why `harness = false` +//! +//! `libtest` allocates during process startup — see the lengthy comment +//! in `no_alloc_witness.rs` for the rationale. The end result: this +//! file defines its own `main()` and reports a single pseudo-test name +//! to cargo-nextest's `--list` probe. +//! +//! # What is witnessed +//! +//! 1. [`StaticSubscriptionHandle::subscribe`] and `unsubscribe` now +//! return concrete [`core::future::Ready`] futures (the +//! [`feat/no-alloc-bare-metal`] fork's no-alloc rewrite of the +//! previously-`Box::pin`'d versions). Constructing the future and +//! polling it to `Ready` must not allocate. +//! 2. [`StaticSubscriptionHandle::for_each_subscriber`] iterates the +//! backing storage without allocating — the closure is invoked +//! per subscriber from inside the critical-section mutex. +//! +//! Together these are the load-bearing additions that let the +//! `server` Cargo feature drop its `_alloc` implication. The +//! upstream `nm` audit on `client,server,bare_metal` is the static +//! complement; this file is the dynamic complement. +//! +//! # What this does not witness +//! +//! - The full `Server::run_with_buffers` loop (requires a no-alloc +//! `TransportFactory`, `Timer`, and `EventPublisher` — out of scope +//! for the witness; the `examples/bare_metal_server` workspace +//! member is the compile-witness for that surface). +//! - Construction-time allocations inside `SubscriptionManager`'s +//! `heapless::Vec` backing storage. Those are by-design alloc-free +//! but happen outside the armed window. + +#![cfg(all(feature = "server", feature = "bare_metal"))] + +use core::cell::{Cell, RefCell}; +use core::future::Future; +use core::net::{Ipv4Addr, SocketAddrV4}; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, Ordering}; +use core::task::{Context, Poll, Waker}; +use std::alloc::{GlobalAlloc, Layout, System}; +use std::process; + +use embassy_sync::blocking_mutex::Mutex as BlockingMutex; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; + +use simple_someip::server::{ + StaticSubscriptionHandle, StaticSubscriptionStorage, SubscriptionHandle, SubscriptionManager, +}; + +// ── Panic allocator ─────────────────────────────────────────────────────── + +static ARMED: AtomicBool = AtomicBool::new(false); + +struct PanicAllocator; + +fn diagnose_and_abort(kind: &str, size: usize, align_or_new: usize) -> ! { + ARMED.store(false, Ordering::SeqCst); + eprintln!( + "no_alloc_server_witness: forbidden allocation ({kind}): {size} bytes / {align_or_new}" + ); + process::abort(); +} + +unsafe impl GlobalAlloc for PanicAllocator { + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + if ARMED.load(Ordering::Acquire) { + diagnose_and_abort("alloc", layout.size(), layout.align()); + } + unsafe { System.alloc(layout) } + } + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + unsafe { System.dealloc(ptr, layout) } + } + unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { + if ARMED.load(Ordering::Acquire) { + diagnose_and_abort("alloc_zeroed", layout.size(), layout.align()); + } + unsafe { System.alloc_zeroed(layout) } + } + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + if ARMED.load(Ordering::Acquire) { + diagnose_and_abort("realloc", layout.size(), new_size); + } + unsafe { System.realloc(ptr, layout, new_size) } + } +} + +#[global_allocator] +static GLOBAL: PanicAllocator = PanicAllocator; + +fn assert_no_alloc(label: &str, f: impl FnOnce() -> T) -> T { + ARMED.store(true, Ordering::SeqCst); + let result = f(); + ARMED.store(false, Ordering::SeqCst); + println!(" [pass] {label}"); + result +} + +/// Drive a future to completion with a no-op waker on the main thread. +/// All `StaticSubscriptionHandle` futures are synchronous (`Ready`), so +/// a single poll suffices; we panic otherwise to surface any future +/// regression that re-introduces a yield point. +fn poll_once_to_ready(mut fut: Pin<&mut F>) -> F::Output { + let waker = Waker::noop(); + let mut cx = Context::from_waker(waker); + match fut.as_mut().poll(&mut cx) { + Poll::Ready(v) => v, + Poll::Pending => panic!( + "StaticSubscriptionHandle future returned Pending; \ + the no-alloc contract requires synchronous completion \ + (no .await inside the critical-section lock)" + ), + } +} + +// ── Backing storage ─────────────────────────────────────────────────────── +// +// `SubscriptionManager::new()` is `const`, so the backing storage can +// live in a plain `static` — no `Box::leak` needed. + +static SUBS: StaticSubscriptionStorage = + BlockingMutex::>::new(RefCell::new( + SubscriptionManager::new(), + )); + +// ── Witnesses ───────────────────────────────────────────────────────────── + +fn witness_static_subscription_handle() { + let handle = StaticSubscriptionHandle::new(&SUBS); + let a1 = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 1), 8001); + let a2 = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 2), 8002); + + assert_no_alloc("StaticSubscriptionHandle::subscribe", || { + let mut fut = core::pin::pin!(handle.subscribe(0x5B, 1, 0x01, a1)); + poll_once_to_ready(fut.as_mut()).expect("subscribe must succeed"); + let mut fut = core::pin::pin!(handle.subscribe(0x5B, 1, 0x01, a2)); + poll_once_to_ready(fut.as_mut()).expect("subscribe must succeed"); + }); + + assert_no_alloc("StaticSubscriptionHandle::for_each_subscriber", || { + let count = Cell::new(0usize); + let mut fut = core::pin::pin!( + handle.for_each_subscriber(0x5B, 1, 0x01, |_s| count.set(count.get() + 1)) + ); + let visited = poll_once_to_ready(fut.as_mut()); + assert_eq!(visited, 2); + assert_eq!(count.get(), 2); + }); + + assert_no_alloc("StaticSubscriptionHandle::unsubscribe", || { + let mut fut = core::pin::pin!(handle.unsubscribe(0x5B, 1, 0x01, a1)); + poll_once_to_ready(fut.as_mut()); + }); + + assert_no_alloc( + "StaticSubscriptionHandle::for_each_subscriber (post-unsub)", + || { + let count = Cell::new(0usize); + let mut fut = core::pin::pin!( + handle.for_each_subscriber(0x5B, 1, 0x01, |_s| count.set(count.get() + 1)) + ); + let visited = poll_once_to_ready(fut.as_mut()); + assert_eq!(visited, 1); + assert_eq!(count.get(), 1); + }, + ); +} + +// ── Entry point ─────────────────────────────────────────────────────────── + +fn main() { + // Mirror `no_alloc_witness.rs`'s nextest discovery hook. + let args: Vec = std::env::args().collect(); + if args.iter().any(|a| a == "--list") { + if !args.iter().any(|a| a == "--ignored") { + println!("no_alloc_server_witness: test"); + } + return; + } + + println!("no-alloc server witness:"); + + witness_static_subscription_handle(); + + println!("all witnesses passed"); +} From 64fcc082e29dd26b1e25f40eee7df907e4c534be Mon Sep 17 00:00:00 2001 From: Feliciano Angulo Date: Wed, 3 Jun 2026 00:53:19 -0400 Subject: [PATCH 3/3] feat(server): add announce_only_future method for SD service announcements --- src/server/mod.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/server/mod.rs b/src/server/mod.rs index 890b1e6..5264a1d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1287,6 +1287,34 @@ where } } + /// Run *only* the SD `OfferService` announcement loop, without + /// driving the receive path. Use this on supplementary Servers + /// that share a `sd_socket` / `unicast_socket` handle (via + /// [`Self::new_with_handles`]) with a primary Server already + /// running [`Self::run_with_buffers`]: the primary owns the + /// inbound recv loops, supplementary Servers add their own + /// `OfferService` to the same SD multicast group without + /// competing for inbound datagrams. + /// + /// The returned future loops forever (1 s tick between + /// announcements); spawn it on your executor. + pub fn announce_only_future<'a>( + &self, + ) -> impl core::future::Future + 'a + use<'a, F, Tm, R, Sub, H, Hsd, Hep> + where + Tm: 'a, + Hsd: 'a, + H: 'a, + { + let config = self.config.clone(); + let sd_socket = self.sd_socket.clone(); + let sd_state = self.sd_state.clone(); + let timer = self.timer.clone(); + async move { + runtime::announce_loop(&config, sd_socket.get(), sd_state.get(), &timer).await; + } + } + /// Run the server event loop with heap-allocated 64 KiB receive /// buffers — the convenience entry point for std and alloc-using /// bare-metal builds. Drives both the receive loop and (unless