From 7c5759812488426f7d841d8e40b14963972b7cd4 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Mon, 27 Apr 2026 21:08:47 -0400 Subject: [PATCH 1/3] phase 14a: server feature-flag detangle (topology only) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Splits the `server` Cargo feature so the strategic-goal feature combo `features = ["bare_metal", "client", "server"]` builds without tokio. Phase 14b will retarget the server engine to the trait surface and expose a working server under the bare `server` feature; this commit is purely the topology change. # Cargo features Before: server = ["std", "dep:tokio", "dep:socket2", "dep:futures"] After: server = ["std"] # topology marker server-tokio = ["server", "dep:tokio", "dep:socket2", "dep:futures"] # Module gates flipped - `pub mod server;` feature = "server" → "server-tokio" - `pub use server::Server;` ditto - `pub use server::SubscriptionHandle;` ditto - `tokio_transport` mod gate `client-tokio or server` → `client-tokio or server-tokio` # Tests / examples - `[[test]] client_server` requires `["client-tokio", "server-tokio"]` - `examples/client_server` uses `["client-tokio", "server-tokio"]` - `examples/bare_metal/main.rs` status note + lib.rs feature-flag table updated # Verification - All 12 feature-matrix combos build clean, including the strategic combo `client,server,bare_metal`. - 457 lib + 11 + 1 + 1 + 9 doc tests pass with --all-features. - clippy clean with --all-features --all-targets. - bare_metal example runs end-to-end; bare_metal_client witness test passes. # What this leaves for 14b The bare `server` feature compiles to nothing useful today — every production code path in src/server/* still uses tokio internals. Phase 14b mirrors phase 13.5 on the server: introduces `ServerDeps`, makes `Server` and `EventPublisher` generic over the transport+timer, replaces the hand-rolled `socket2::Socket` SD bind with `factory.bind()`, and ungates the engine from `server-tokio`. Estimate per the phase 14 scoping report: ~1.5-2 ew. Per phase 13.5 lessons doc finding #5: introduce a `TestServer` type alias before any default-type-param drops in 14b. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.toml | 25 ++++++++++----------- examples/bare_metal/src/main.rs | 36 ++++++++++++++++++++----------- examples/client_server/Cargo.toml | 2 +- src/lib.rs | 32 ++++++++++++++++----------- 4 files changed, 56 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 425d534..a84bff2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,25 +55,26 @@ tracing-subscriber = "0.3" [features] default = ["std"] std = ["embedded-io/std", "thiserror/std", "tracing/std"] -# Phase 13 split: `client` exposes the protocol/trait-surface client +# Phase 13a split: `client` exposes the protocol/trait-surface client # (no tokio, no socket2); `client-tokio` layers the tokio + socket2 # convenience defaults on top. Consumers of the bare-metal trait surface # enable `client` only (and supply their own `Spawner` / `Timer` / # `ChannelFactory` / `TransportFactory` impls). Consumers who want the # `Client::new` shortcut (defaulting to `TokioSpawner` / `TokioTimer` / # `TokioChannels` / `TokioTransport`) enable `client-tokio`. -# -# `server` is **not** split in phase 13 — `src/server/sd_state.rs`, -# `src/server/subscription_manager.rs`, and `src/server/mod.rs` still -# reference `tokio::net::UdpSocket`, `tokio::sync::RwLock`, and -# `socket2::Socket` directly in production code. Phase 14 (server -# parallel) is the phase that retargets the server to the trait -# surface; once that lands, `server` will gain the same split into -# `server` + `server-tokio`. Until then, enabling `server` continues -# to pull tokio + socket2. client = ["std", "dep:futures"] client-tokio = ["client", "dep:tokio", "dep:socket2"] -server = ["std", "dep:tokio", "dep:socket2", "dep:futures"] +# Phase 14a split: `server` is currently an empty topology marker — +# the server engine still requires tokio internals (raw `tokio::net::UdpSocket` +# bind in `sd_state`, `tokio::sync::RwLock` in `subscription_manager`, +# direct `socket2::Socket` calls in `mod.rs`). Phase 14b retargets the +# engine to the trait surface (mirroring phases 9–13.5 on the client), +# at which point the bare `server` feature will expose the trait-surface +# `Server` and `server-tokio` will provide tokio convenience defaults. +# Until 14b lands, enabling `server` alone gives only the feature +# topology; consumers wanting a working server enable `server-tokio`. +server = ["std"] +server-tokio = ["server", "dep:tokio", "dep:socket2", "dep:futures"] # Marks a build as intended for bare-metal / no_std consumption. # Currently a pure marker — enables no crate code on its own. Reserved # for future phases to gate no_std-specific helper types. @@ -96,7 +97,7 @@ bare_metal = ["dep:embassy-sync"] [[test]] name = "client_server" -required-features = ["client-tokio", "server"] +required-features = ["client-tokio", "server-tokio"] [[test]] name = "bare_metal_client" diff --git a/examples/bare_metal/src/main.rs b/examples/bare_metal/src/main.rs index 455a7b7..b394c6e 100644 --- a/examples/bare_metal/src/main.rs +++ b/examples/bare_metal/src/main.rs @@ -72,19 +72,27 @@ //! `EmbassySyncChannels` extracted from `tokio_transport` to //! `crate::embassy_channels` so it is reachable from no-tokio builds. //! +//! - Phase 14a (server feature-flag detangle): `server` is now a +//! topology marker; `server-tokio` carries the working tokio-backed +//! server. The strategic-goal feature combo +//! `default-features = false, features = ["bare_metal", "client", "server"]` +//! now compiles, though the `server` half is empty until 14b +//! retargets the engine. +//! //! **Remaining gaps:** -//! 1. **Server-side feature-flag split** (deferred to Phase 14): -//! `feature = "server"` still pulls in tokio + socket2 because -//! `server::sd_state` and `server::subscription_manager` reference -//! `tokio::net::UdpSocket` / `tokio::sync::RwLock` / -//! `socket2::Socket` directly. Phase 14 retargets the server to -//! the trait surface; once that lands, `server` will gain the same -//! `server` + `server-tokio` split. +//! 1. **Server engine retargeting** (Phase 14b): the working server +//! still requires `server-tokio` because its bind path uses +//! `tokio::net::UdpSocket` directly and `subscription_manager` +//! holds `tokio::sync::RwLock`. Phase 14b adds `ServerDeps` and a generic `Server` analogous to +//! `ClientDeps` / `Client`, then drops the gates on the bare +//! `server` feature to expose the trait-surface server. //! 2. **No-alloc Client**: `Client` / `Inner` still depend on //! `alloc` (heapless internals are fine, but `EmbassySyncChannels` -//! uses `Arc`, and `e2e_registry` uses `Arc>`). Phase 16 -//! is the verification phase that lights up an alloc-panicking -//! harness; the no-alloc port itself is its own follow-on phase. +//! uses `Arc`, and `e2e_registry` uses `Arc>`). Phase +//! 13.6 (static-pool ChannelFactory) is the engine fix; phase 16 +//! is the CI verification that lights up an alloc-panicking +//! harness. //! //! # Recommendation for `no_alloc` consumers today //! @@ -432,8 +440,10 @@ fn main() { "note: trait layer (TransportSocket + TransportFactory + Timer + \ Spawner + ChannelFactory) exercised end-to-end. Phases 9-12 \ complete; phases 13a + 13.5 (client + Client engine generic) \ - complete. Remaining: phase 14 server-trait retargeting + \ - server-side `server-tokio` split, then phase 16 no-alloc \ - verification. See top-of-file docblock." + complete; phase 14a (server feature topology) complete. \ + Remaining: phase 14b server-engine retargeting (working \ + `server` without tokio) + phase 13.6 static-pool \ + ChannelFactory + phase 16 no-alloc CI verification. See \ + top-of-file docblock." ); } diff --git a/examples/client_server/Cargo.toml b/examples/client_server/Cargo.toml index b9bf2c2..d4f8aa5 100644 --- a/examples/client_server/Cargo.toml +++ b/examples/client_server/Cargo.toml @@ -5,7 +5,7 @@ edition = "2024" publish = false [dependencies] -simple-someip = { path = "../..", features = ["client-tokio", "server"] } +simple-someip = { path = "../..", features = ["client-tokio", "server-tokio"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } tracing = "0.1" tracing-subscriber = "0.3" diff --git a/src/lib.rs b/src/lib.rs index eceec62..62ff937 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,7 +29,8 @@ //! | `std` | yes | Enables std-dependent helpers (`RawPayload`, `VecSdHeader`, `OfferedEndpoint`) | //! | `client` | no | Trait-surface client; implies `std` + futures (no tokio) | //! | `client-tokio` | no | Adds the `Client::new` / `TokioSpawner` / `TokioTransport` convenience defaults; implies `client` + tokio + socket2 | -//! | `server` | no | Async tokio server; implies `std` + tokio + socket2 + futures (server-tokio split deferred to phase 14) | +//! | `server` | no | Empty topology marker today — phase 14b retargets the engine and `server` will then expose the trait-surface server. Until then, `server-tokio` is the working flavor. | +//! | `server-tokio` | no | Working tokio-backed server; implies `server` + tokio + socket2 + futures | //! | `bare_metal` | no | Pure marker — does not enable any crate code. See `examples/bare_metal/` (the trait-surface canary) for the full bare-metal-readiness story. | //! //! The default feature set is `["std"]`, which links `std` and enables @@ -151,14 +152,19 @@ pub mod protocol; #[cfg(feature = "std")] mod raw_payload; /// SOME/IP server for offering services and handling incoming requests. -#[cfg(feature = "server")] +/// +/// Phase 14a: gated to `server-tokio` because every method body in +/// `server::*` still uses tokio internals (raw `tokio::net::UdpSocket` +/// bind, `tokio::sync::RwLock`, `socket2::Socket`). Phase 14b +/// retargets the engine to the trait surface, after which the bare +/// `server` feature will expose a generic `Server` and +/// `server-tokio` will provide the tokio convenience defaults. +#[cfg(feature = "server-tokio")] pub mod server; /// Tokio + `socket2` implementation of the [`transport`] traits. Provided /// as the default `std` backend — available whenever `client-tokio` or -/// `server` is enabled. (Phase 13: `client` is now no-tokio; the tokio -/// backend lives behind `client-tokio`. `server` still pulls tokio -/// transitively until phase 14 retargets it to the trait surface.) -#[cfg(any(feature = "client-tokio", feature = "server"))] +/// `server-tokio` is enabled. +#[cfg(any(feature = "client-tokio", feature = "server-tokio"))] pub mod tokio_transport; /// `embassy-sync`-backed implementation of [`transport::ChannelFactory`]. @@ -176,9 +182,9 @@ mod traits; /// Executor-agnostic UDP transport abstraction used by the client and /// server modules. `no_std`-compatible; a default `std + tokio` backend /// ships in `tokio_transport` (available under the `client-tokio` / -/// `server` features) — the link is rendered as a code literal because -/// the target module is feature-gated and would break default-feature -/// rustdoc builds. +/// `server-tokio` features) — the link is rendered as a code literal +/// because the target module is feature-gated and would break +/// default-feature rustdoc builds. pub mod transport; #[cfg(feature = "std")] pub use raw_payload::{RawPayload, VecSdHeader}; @@ -191,14 +197,14 @@ pub use client::{ Client, ClientDeps, ClientUpdate, ClientUpdates, DiscoveryMessage, PendingResponse, }; pub use e2e::{E2ECheckStatus, E2EKey, E2EProfile}; -#[cfg(feature = "server")] +#[cfg(feature = "server-tokio")] pub use server::Server; -#[cfg(feature = "server")] -pub use server::SubscriptionHandle; -#[cfg(any(feature = "client-tokio", feature = "server"))] +#[cfg(any(feature = "client-tokio", feature = "server-tokio"))] pub use tokio_transport::{TokioChannels, TokioSocket, TokioSpawner, TokioTimer, TokioTransport}; pub use transport::{ ChannelFactory, E2ERegistryHandle, InterfaceHandle, IoErrorKind, MpscRecv, MpscSend, OneshotCancelled, OneshotRecv, OneshotSend, ReceivedDatagram, SocketOptions, Spawner, Timer, TransportError, TransportFactory, TransportSocket, UnboundedRecv, UnboundedSend, }; +#[cfg(feature = "server-tokio")] +pub use server::SubscriptionHandle; From e2e9a73ecfa861da70af46f75e064d7f63e056cb Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Tue, 28 Apr 2026 06:09:48 -0400 Subject: [PATCH 2/3] phase 14a fixup: refresh stale doc text after the server split MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three doc-text references still named the pre-split feature gates or the wrong phase number: - src/tokio_transport.rs:9 / :16 — "feature = client" / "server" → "client-tokio" / "server-tokio". The actual cfg attribute on the module declaration was already correct; this is the surrounding prose + an inline doctest cfg gate that mismatched the gate it was describing. - src/client/socket_manager.rs:43 — "deferred to Phase 14" → updated to reflect the 14a/14b split (14a topology landed in b7fc30f; the substantive engine-retargeting work is 14b). No code or feature behavior changes. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/client/socket_manager.rs | 9 +++++++-- src/tokio_transport.rs | 9 +++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index ed92268..287a2d1 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -40,9 +40,14 @@ //! socket2 on top of `client`. //! //! **Remaining gaps:** -//! - **Server-side split** (deferred to Phase 14): `feature = "server"` -//! still pulls tokio + socket2 because `server::sd_state` / +//! - **Working server without tokio** (Phase 14b): the bare `server` +//! feature is currently a topology marker only (Phase 14a, commit +//! `b7fc30f`). The actual server engine still requires +//! `server-tokio` because `server::sd_state` / //! `server::subscription_manager` reference tokio types directly. +//! Phase 14b retargets the engine to the trait surface (mirroring +//! phase 13.5 on the client) so a working server lives under just +//! `server`. //! //! For `no_alloc` SOME/IP usage today, consume `protocol`, `e2e`, and //! the `transport` trait layer directly — the `bare_metal` example diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index e170598..a1402b7 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -6,14 +6,15 @@ //! [`tokio::net::UdpSocket`] for the async I/O loop. [`TokioTimer`] is a //! thin wrapper over `tokio::time::sleep`. //! -//! Gated behind `#[cfg(any(feature = "client", feature = "server"))]` — -//! the `client` and `server` features are exactly the ones that already -//! pull in `tokio` and `socket2`, so no new dependency edge is introduced. +//! Gated behind `#[cfg(any(feature = "client-tokio", feature = "server-tokio"))]` — +//! the `client-tokio` and `server-tokio` features are exactly the ones +//! that pull in `tokio` and `socket2`, so no new dependency edge is +//! introduced. //! //! # Example //! //! ```no_run -//! # #[cfg(any(feature = "client", feature = "server"))] +//! # #[cfg(any(feature = "client-tokio", feature = "server-tokio"))] //! # async fn demo() -> Result<(), simple_someip::TransportError> { //! use core::net::{Ipv4Addr, SocketAddrV4}; //! use simple_someip::{SocketOptions, TransportFactory, TransportSocket}; From 22a173768c140a29f6e4fa82069da890ed388dd5 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Tue, 28 Apr 2026 07:50:29 -0400 Subject: [PATCH 3/3] phase 14b: server engine retargeting (Server reachable without tokio) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors phase 13.5 on the server side. `Server` is now generic over ``; the bare `server` feature exposes a working trait-surface server reachable via `Server::new_with_deps`, and `server-tokio` provides the `TokioTransport` / `TokioTimer` / `Arc>` / `Arc>` convenience defaults. # Public API New `pub struct ServerDeps` bundle (4 fields: factory, timer, e2e_registry, subscriptions). Mirrors `ClientDeps`. No `Spawner` (server has no internal task spawning), no `InterfaceHandle` (interface lives in `ServerConfig`). New constructors under just `feature = "server"`: - `Server::new_with_deps(deps, config, multicast_loopback)` — binds unicast + SD multicast via `factory.bind(...)`. - `Server::new_passive_with_deps(deps, config)` — binds unicast + ephemeral SD placeholder for external-SD-dispatcher integration. Tokio convenience constructors (`Server::new`, `new_with_loopback`, `new_passive`) are gated `server-tokio` and now delegate to `new_with_deps` / `new_passive_with_deps` after constructing a `ServerDeps` with tokio defaults. `ServerDeps` re-exported from the crate root as `simple_someip::ServerDeps`. `Subscriber` newly re-exported from `simple_someip::server` (it's the return type of `SubscriptionHandle::get_subscribers`; was implicitly part of the public trait surface but not nameable). # Engine refactor `Server` stores: - `unicast_socket: Arc`, `sd_socket: Arc` — was `Arc`. - `publisher: Arc>` — `EventPublisher` is now `` generic over its socket type. - `factory: F`, `timer: Tm` — both stored to support bare-metal factories carrying state and the announcement-loop's 1-second tick. `announcement_loop` replaced `TokioTimer.sleep(...)` with `self.timer.sleep(...)`. `sd_state::send_offer_service` now generic over `T: TransportSocket`. `subscription_manager`: `impl SubscriptionHandle for Arc>` and the `tokio::sync::RwLock` import gated to `server-tokio`. # Cargo features Before: server = ["std"] # topology marker server-tokio = ["server", "dep:tokio", "dep:socket2", "dep:futures"] After: server = ["std", "dep:futures"] # working trait-surface server server-tokio = ["server", "dep:tokio", "dep:socket2"] # tokio convenience defaults `futures` moves to `server` because the engine uses `futures::select!`. `tokio` and `socket2` stay only on the `server-tokio` flavor. # Bind path consolidation The hand-rolled `socket2::Socket::new(...)` SD-multicast bind in `Server::new_with_loopback` is gone. `new_with_deps` calls `factory.bind(sd_addr, &SocketOptions { reuse_address, reuse_port, multicast_if_v4: Some(interface), multicast_loop_v4 })` which routes through `TokioTransport::bind`'s already-existing socket2 path. No behavior change on the tokio side; bare-metal callers control the bind path entirely. # Tests - `tests/bare_metal_server.rs` (new): witness test gated on `["server", "bare_metal"]`. Builds `MockFactory` + `MockSocket` + `MockTimer` + `MockSubscriptions` (a hand-rolled `SubscriptionHandle` impl backed by `std::sync::Mutex>`) and proves `Server::new_with_deps` + `new_passive_with_deps` succeed and return a `Server` whose announcement-loop future is `Send + 'static`. Compile witness is the load-bearing assertion. - `tests/client_server.rs`: `TestServer` / `TestEventPublisher` type aliases introduced (per phase 13.5 lessons #5) so existing callers don't churn over the new generic params. - Server's internal `#[cfg(test)] mod tests` blocks tightened to `#[cfg(all(test, feature = "server-tokio"))]` since they use `tokio::test` / `tokio::net::UdpSocket` (per lesson #7). # `tokio_transport::bind_with_options` bug fix folded in `set_multicast_loop_v4` was called unconditionally regardless of whether the caller configured a multicast interface — this can fail on backends that error on the call for plain-unicast sockets. Now only called when `multicast_if_v4` is `Some`. Surfaced by the new SD-bind path; mirrors the same conditional in the client's discovery-bind path. # Verification - `cargo test --all-features -- --test-threads=1`: 457 lib + 1 + 1 + 2 (new bare_metal_server witness) + 11 + 9 doc. 0 failures. - `cargo clippy --all-features --all-targets`: clean. - Feature matrix `''`, `client,server`, `client,server,bare_metal`, `server`, `client-tokio,server-tokio` all build clean. - `bare_metal_client` witness still passes. # What this leaves for follow-on phases - Phase 13.6 (static-pool ChannelFactory): unaffected by 14b but still pending. The const-N quirk fix landed separately on the 13.6 branch. - Phase 16 (no-alloc CI): `Server::new_with_deps` still uses `Arc` and `Arc` internally, so a strict no_alloc build does not yet pass. Phase 13.6 (static channels) + follow-on `Arc` elimination will close this. Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 1 + Cargo.toml | 24 +- examples/bare_metal/src/main.rs | 41 +-- src/lib.rs | 27 +- src/server/error.rs | 4 + src/server/event_publisher.rs | 90 ++++-- src/server/mod.rs | 482 ++++++++++++++++++++--------- src/server/sd_state.rs | 57 ++-- src/server/service_info.rs | 1 + src/server/subscription_manager.rs | 6 +- src/tokio_transport.rs | 22 +- tests/bare_metal_server.rs | 328 ++++++++++++++++++++ tests/client_server.rs | 26 +- 13 files changed, 855 insertions(+), 254 deletions(-) create mode 100644 tests/bare_metal_server.rs diff --git a/.gitignore b/.gitignore index 86362e6..1daa9fa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ + .claude/ CLAUDE.md .DS_Store diff --git a/Cargo.toml b/Cargo.toml index a84bff2..fd8e1de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,17 +64,15 @@ std = ["embedded-io/std", "thiserror/std", "tracing/std"] # `TokioChannels` / `TokioTransport`) enable `client-tokio`. client = ["std", "dep:futures"] client-tokio = ["client", "dep:tokio", "dep:socket2"] -# Phase 14a split: `server` is currently an empty topology marker — -# the server engine still requires tokio internals (raw `tokio::net::UdpSocket` -# bind in `sd_state`, `tokio::sync::RwLock` in `subscription_manager`, -# direct `socket2::Socket` calls in `mod.rs`). Phase 14b retargets the -# engine to the trait surface (mirroring phases 9–13.5 on the client), -# at which point the bare `server` feature will expose the trait-surface -# `Server` and `server-tokio` will provide tokio convenience defaults. -# Until 14b lands, enabling `server` alone gives only the feature -# topology; consumers wanting a working server enable `server-tokio`. -server = ["std"] -server-tokio = ["server", "dep:tokio", "dep:socket2", "dep:futures"] +# Phase 14b split (matches phase 13a on the client side): `server` +# exposes the trait-surface server (no tokio, no socket2). The engine +# itself uses `futures::select!` so `dep:futures` lives here. +# `server-tokio` adds the tokio + socket2 convenience defaults +# (`Server::new`, `Server::new_with_loopback`, `Server::new_passive`), +# bringing `Arc>` / `Arc>` +# / `TokioTransport` / `TokioTimer` defaults into scope. +server = ["std", "dep:futures"] +server-tokio = ["server", "dep:tokio", "dep:socket2"] # Marks a build as intended for bare-metal / no_std consumption. # Currently a pure marker — enables no crate code on its own. Reserved # for future phases to gate no_std-specific helper types. @@ -106,3 +104,7 @@ required-features = ["client", "bare_metal"] [[test]] name = "static_channels_alloc_witness" required-features = ["client", "bare_metal"] + +[[test]] +name = "bare_metal_server" +required-features = ["server", "bare_metal"] diff --git a/examples/bare_metal/src/main.rs b/examples/bare_metal/src/main.rs index b394c6e..56cb542 100644 --- a/examples/bare_metal/src/main.rs +++ b/examples/bare_metal/src/main.rs @@ -78,21 +78,24 @@ //! `default-features = false, features = ["bare_metal", "client", "server"]` //! now compiles, though the `server` half is empty until 14b //! retargets the engine. +//! - Phase 14b: `Server` is now constructible without +//! `server-tokio`. The engine carries `F: TransportFactory`, +//! `Tm: Timer`, `R: E2ERegistryHandle`, and `S: SubscriptionHandle` +//! generics, and the new `Server::new_with_deps` / +//! `Server::new_passive_with_deps` constructors take everything +//! explicitly via a `ServerDeps` bundle. The tokio convenience +//! constructors (`Server::new`, `Server::new_with_loopback`, +//! `Server::new_passive`) live behind the `server-tokio` feature +//! and delegate to `new_with_deps`. Witness: +//! `tests/bare_metal_server.rs` (gated on `server + bare_metal`). //! //! **Remaining gaps:** -//! 1. **Server engine retargeting** (Phase 14b): the working server -//! still requires `server-tokio` because its bind path uses -//! `tokio::net::UdpSocket` directly and `subscription_manager` -//! holds `tokio::sync::RwLock`. Phase 14b adds `ServerDeps` and a generic `Server` analogous to -//! `ClientDeps` / `Client`, then drops the gates on the bare -//! `server` feature to expose the trait-surface server. -//! 2. **No-alloc Client**: `Client` / `Inner` still depend on -//! `alloc` (heapless internals are fine, but `EmbassySyncChannels` -//! uses `Arc`, and `e2e_registry` uses `Arc>`). Phase -//! 13.6 (static-pool ChannelFactory) is the engine fix; phase 16 -//! is the CI verification that lights up an alloc-panicking -//! harness. +//! 1. **No-alloc Client/Server**: `Client` / `Server` engines still +//! depend on `alloc` (heapless internals are fine, but +//! `EmbassySyncChannels` uses `Arc`, and `e2e_registry` uses +//! `Arc>`). Phase 13.6 (static-pool ChannelFactory) is +//! the engine fix; phase 16 is the CI verification that lights up +//! an alloc-panicking harness. //! //! # Recommendation for `no_alloc` consumers today //! @@ -440,10 +443,12 @@ fn main() { "note: trait layer (TransportSocket + TransportFactory + Timer + \ Spawner + ChannelFactory) exercised end-to-end. Phases 9-12 \ complete; phases 13a + 13.5 (client + Client engine generic) \ - complete; phase 14a (server feature topology) complete. \ - Remaining: phase 14b server-engine retargeting (working \ - `server` without tokio) + phase 13.6 static-pool \ - ChannelFactory + phase 16 no-alloc CI verification. See \ - top-of-file docblock." + complete; phase 14a (server feature topology) complete; \ + phase 14b (Server engine generic over TransportFactory + \ + Timer + E2ERegistryHandle + SubscriptionHandle, reachable \ + via Server::new_with_deps under just `server`) complete — see \ + tests/bare_metal_server.rs for the witness. Remaining: \ + phase 13.6 static-pool ChannelFactory + phase 16 no-alloc \ + CI verification. See top-of-file docblock." ); } diff --git a/src/lib.rs b/src/lib.rs index 62ff937..4842e2e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,8 +29,8 @@ //! | `std` | yes | Enables std-dependent helpers (`RawPayload`, `VecSdHeader`, `OfferedEndpoint`) | //! | `client` | no | Trait-surface client; implies `std` + futures (no tokio) | //! | `client-tokio` | no | Adds the `Client::new` / `TokioSpawner` / `TokioTransport` convenience defaults; implies `client` + tokio + socket2 | -//! | `server` | no | Empty topology marker today — phase 14b retargets the engine and `server` will then expose the trait-surface server. Until then, `server-tokio` is the working flavor. | -//! | `server-tokio` | no | Working tokio-backed server; implies `server` + tokio + socket2 + futures | +//! | `server` | no | Trait-surface server; implies `std` + futures (no tokio) | +//! | `server-tokio` | no | Adds the `Server::new` / `TokioTransport` / `TokioTimer` convenience defaults; implies `server` + tokio + socket2 | //! | `bare_metal` | no | Pure marker — does not enable any crate code. See `examples/bare_metal/` (the trait-surface canary) for the full bare-metal-readiness story. | //! //! The default feature set is `["std"]`, which links `std` and enables @@ -153,13 +153,16 @@ pub mod protocol; mod raw_payload; /// SOME/IP server for offering services and handling incoming requests. /// -/// Phase 14a: gated to `server-tokio` because every method body in -/// `server::*` still uses tokio internals (raw `tokio::net::UdpSocket` -/// bind, `tokio::sync::RwLock`, `socket2::Socket`). Phase 14b -/// retargets the engine to the trait surface, after which the bare -/// `server` feature will expose a generic `Server` and -/// `server-tokio` will provide the tokio convenience defaults. -#[cfg(feature = "server-tokio")] +/// Phase 14b: the engine is generic over [`transport::TransportFactory`] + +/// [`transport::Timer`] + [`transport::E2ERegistryHandle`] + +/// [`server::SubscriptionHandle`], so the bare `server` feature exposes the +/// trait-surface server. The `server-tokio` feature additionally provides +/// the tokio convenience constructors ([`server::Server::new`], +/// [`server::Server::new_with_loopback`], [`server::Server::new_passive`]) +/// that default the type parameters to +/// `Arc>` / `Arc>` / +/// `TokioTransport` / `TokioTimer`. +#[cfg(feature = "server")] pub mod server; /// Tokio + `socket2` implementation of the [`transport`] traits. Provided /// as the default `std` backend — available whenever `client-tokio` or @@ -197,8 +200,8 @@ pub use client::{ Client, ClientDeps, ClientUpdate, ClientUpdates, DiscoveryMessage, PendingResponse, }; pub use e2e::{E2ECheckStatus, E2EKey, E2EProfile}; -#[cfg(feature = "server-tokio")] -pub use server::Server; +#[cfg(feature = "server")] +pub use server::{Server, ServerDeps, SubscriptionHandle}; #[cfg(any(feature = "client-tokio", feature = "server-tokio"))] pub use tokio_transport::{TokioChannels, TokioSocket, TokioSpawner, TokioTimer, TokioTransport}; pub use transport::{ @@ -206,5 +209,3 @@ pub use transport::{ OneshotCancelled, OneshotRecv, OneshotSend, ReceivedDatagram, SocketOptions, Spawner, Timer, TransportError, TransportFactory, TransportSocket, UnboundedRecv, UnboundedSend, }; -#[cfg(feature = "server-tokio")] -pub use server::SubscriptionHandle; diff --git a/src/server/error.rs b/src/server/error.rs index be86edb..fb8f04a 100644 --- a/src/server/error.rs +++ b/src/server/error.rs @@ -14,6 +14,10 @@ pub enum Error { /// An I/O error from the underlying network transport. #[error(transparent)] Io(#[from] std::io::Error), + /// A transport-layer error from a [`crate::transport::TransportFactory`] + /// or [`crate::transport::TransportSocket`] operation. + #[error("transport error: {0}")] + Transport(#[from] crate::transport::TransportError), /// An E2E protection or checking error occurred. #[error(transparent)] E2e(#[from] crate::e2e::Error), diff --git a/src/server/event_publisher.rs b/src/server/event_publisher.rs index 2181f7d..fdb06de 100644 --- a/src/server/event_publisher.rs +++ b/src/server/event_publisher.rs @@ -1,29 +1,38 @@ //! Event publishing functionality use super::Error; -use super::subscription_manager::{SubscriptionHandle, SubscriptionManager}; +use super::subscription_manager::SubscriptionHandle; use crate::UDP_BUFFER_SIZE; -use crate::e2e::{E2EKey, E2ERegistry}; +use crate::e2e::E2EKey; use crate::protocol::{Header, Message}; use crate::traits::{PayloadWireFormat, WireFormat}; -use crate::transport::E2ERegistryHandle; -use std::sync::{Arc, Mutex}; -use tokio::net::UdpSocket; -use tokio::sync::RwLock; - -/// Publishes events to subscribers -pub struct EventPublisher< - R: E2ERegistryHandle = Arc>, - S: SubscriptionHandle = Arc>, -> { +use crate::transport::{E2ERegistryHandle, TransportSocket}; +use std::sync::Arc; + +/// Publishes events to subscribers. +/// +/// Generic over `T: TransportSocket` (the socket primitive — `TokioSocket` +/// in the std/tokio path, a bare-metal embassy / smoltcp wrapper on +/// firmware), `R: E2ERegistryHandle`, and `S: SubscriptionHandle`. +pub struct EventPublisher +where + R: E2ERegistryHandle, + S: SubscriptionHandle, + T: TransportSocket + Send + Sync + 'static, +{ subscriptions: S, - socket: Arc, + socket: Arc, e2e_registry: R, } -impl EventPublisher { +impl EventPublisher +where + R: E2ERegistryHandle, + S: SubscriptionHandle, + T: TransportSocket + Send + Sync + 'static, +{ /// Create a new event publisher - pub fn new(subscriptions: S, socket: Arc, e2e_registry: R) -> Self { + pub fn new(subscriptions: S, socket: Arc, e2e_registry: R) -> Self { Self { subscriptions, socket, @@ -144,7 +153,7 @@ impl EventPublisher { let mut sent_count = 0; for subscriber in &subscribers { match self.socket.send_to(datagram, subscriber.address).await { - Ok(_) => { + Ok(()) => { sent_count += 1; tracing::trace!( "Sent event to subscriber {} ({} bytes)", @@ -258,7 +267,7 @@ impl EventPublisher { let mut sent_count = 0; for subscriber in &subscribers { match self.socket.send_to(datagram, subscriber.address).await { - Ok(_) => { + Ok(()) => { sent_count += 1; } Err(e) => { @@ -385,22 +394,55 @@ impl EventPublisher { } } -#[cfg(test)] +#[cfg(all(test, feature = "server-tokio"))] mod tests { use super::*; + use crate::e2e::E2ERegistry; use crate::protocol::sd::test_support::{TestPayload, empty_sd_header}; + use crate::server::SubscriptionManager; + use crate::tokio_transport::TokioSocket; use std::net::{Ipv4Addr, SocketAddrV4}; + use std::sync::Mutex; use std::vec; use std::vec::Vec; + use tokio::net::UdpSocket; + use tokio::sync::RwLock; + + /// Type alias bringing the tokio-flavor concrete type parameters back + /// into scope so tests can spell `TestEventPublisher` without + /// chasing the three-type-parameter signature on every call site. + type TestEventPublisher = EventPublisher< + Arc>, + Arc>, + TokioSocket, + >; fn test_registry() -> Arc> { Arc::new(Mutex::new(E2ERegistry::new())) } + /// Bind a `TokioSocket` for tests. The publisher path under + /// `server-tokio` already depends on `tokio_transport`, so we use it + /// directly rather than constructing a `tokio::net::UdpSocket` and + /// adapting it. + async fn bind_tokio_socket() -> Arc { + use crate::transport::{SocketOptions, TransportFactory}; + let factory = crate::tokio_transport::TokioTransport; + Arc::new( + factory + .bind( + SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0), + &SocketOptions::new(), + ) + .await + .expect("bind tokio socket for test"), + ) + } + async fn make_publisher( subscriptions: Arc>, - ) -> (EventPublisher, Arc) { - let socket = Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()); + ) -> (TestEventPublisher, Arc) { + let socket = bind_tokio_socket().await; let publisher = EventPublisher::new(subscriptions, Arc::clone(&socket), test_registry()); (publisher, socket) } @@ -412,11 +454,7 @@ mod tests { #[tokio::test] async fn test_event_publisher_creation() { let subscriptions = Arc::new(RwLock::new(SubscriptionManager::new())); - let socket = Arc::new( - UdpSocket::bind("127.0.0.1:0") - .await - .expect("Failed to bind socket"), - ); + let socket = bind_tokio_socket().await; let publisher = EventPublisher::new(subscriptions, socket, test_registry()); assert!(std::mem::size_of_val(&publisher) > 0); @@ -579,7 +617,7 @@ mod tests { .unwrap(); } - let socket = Arc::new(UdpSocket::bind("127.0.0.1:0").await.unwrap()); + let socket = bind_tokio_socket().await; let publisher = EventPublisher::new(subscriptions, socket, e2e_registry); // Size the payload from `UDP_BUFFER_SIZE` and `PROFILE4_HEADER_SIZE` diff --git a/src/server/mod.rs b/src/server/mod.rs index 5a880cb..f7101bd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -14,25 +14,30 @@ mod subscription_manager; pub use error::Error; pub use event_publisher::EventPublisher; -pub use service_info::{EventGroupInfo, ServiceInfo}; +pub use service_info::{EventGroupInfo, ServiceInfo, Subscriber}; pub use subscription_manager::{SubscribeError, SubscriptionHandle, SubscriptionManager}; use sd_state::SdStateManager; use crate::Timer; -use crate::e2e::{E2EKey, E2EProfile, E2ERegistry}; +use crate::e2e::{E2EKey, E2EProfile}; use crate::protocol::sd::{self, Entry, Flags, OptionsCount, ServiceEntry, TransportProtocol}; -use crate::tokio_transport::TokioTimer; -use crate::transport::E2ERegistryHandle; +use crate::transport::{E2ERegistryHandle, SocketOptions, TransportFactory, TransportSocket}; use futures::{FutureExt, pin_mut, select}; use std::{ format, - net::{IpAddr, Ipv4Addr, SocketAddrV4}, - sync::{Arc, Mutex}, + net::{Ipv4Addr, SocketAddrV4}, + sync::Arc, vec, vec::Vec, }; -use tokio::{net::UdpSocket, sync::RwLock}; + +#[cfg(feature = "server-tokio")] +use crate::e2e::E2ERegistry; +#[cfg(feature = "server-tokio")] +use std::sync::Mutex; +#[cfg(feature = "server-tokio")] +use tokio::sync::RwLock; /// Configuration for a SOME/IP service provider #[derive(Debug, Clone)] @@ -69,24 +74,79 @@ impl ServerConfig { } } -/// SOME/IP Server that can offer services and publish events -pub struct Server< - R: E2ERegistryHandle = Arc>, - S: SubscriptionHandle = Arc>, -> { +/// Bundle of pluggable infrastructure passed to [`Server::new_with_deps`]. +/// Mirrors [`crate::ClientDeps`] but with the server's smaller surface +/// — no `Spawner` (server has no internal task spawning), no +/// `InterfaceHandle` (interface lives in [`ServerConfig`]). +/// +/// All four fields are public so callers can construct the struct +/// inline. +pub struct ServerDeps +where + F: TransportFactory, + Tm: Timer, + R: E2ERegistryHandle, + S: SubscriptionHandle, +{ + /// Transport factory used to bind the unicast and SD sockets. + pub factory: F, + /// Async sleep primitive used by the announcement loop's 1-second tick. + pub timer: Tm, + /// Shared E2E registry handle for runtime E2E configuration. + pub e2e_registry: R, + /// Shared subscription manager handle. The convenience constructor + /// [`Server::new`] (under `server-tokio`) builds an + /// `Arc>` for this; bare-metal callers + /// supply their own [`SubscriptionHandle`] impl. + pub subscriptions: S, +} + +/// SOME/IP Server that can offer services and publish events. +/// +/// Generic over the four pluggable infrastructure types bundled in +/// [`ServerDeps`]: +/// - `R: E2ERegistryHandle` — runtime E2E configuration registry +/// - `S: SubscriptionHandle` — event-group subscription state +/// - `F: TransportFactory` — socket primitive (carried as a stored +/// unit-struct in the tokio path; bare-metal impls may carry state) +/// - `Tm: Timer` — async sleep used by the announcement loop +/// +/// The convenience constructors [`Self::new`] / [`Self::new_with_loopback`] +/// / [`Self::new_passive`] (under the `server-tokio` feature) instantiate +/// these as `Arc>` / `Arc>` +/// / `TokioTransport` / `TokioTimer`. Bare-metal callers use +/// [`Self::new_with_deps`] (under `server`) and supply their own. +pub struct Server +where + R: E2ERegistryHandle, + S: SubscriptionHandle, + F: TransportFactory + Send + Sync + 'static, + F::Socket: Send + Sync + 'static, + Tm: Timer + Clone + Send + Sync + 'static, +{ config: ServerConfig, /// Socket for receiving subscription requests - unicast_socket: Arc, + unicast_socket: Arc, /// Socket for sending SD announcements - sd_socket: Arc, + sd_socket: Arc, /// Subscription manager subscriptions: S, /// Event publisher - publisher: Arc>, + publisher: Arc>, /// SD session-ID counter and announcement emitter sd_state: Arc, /// Shared E2E registry for runtime E2E configuration e2e_registry: R, + /// Transport factory. Used at construction time to bind sockets; + /// retained on the struct so bare-metal factories that carry state + /// (e.g. an embassy-net `Stack` handle) survive the constructor. + /// On `server-tokio` builds this is a zero-sized `TokioTransport`. + #[allow(dead_code)] + factory: F, + /// Async sleep primitive used by [`Self::announcement_loop`]'s + /// 1-second tick. On `server-tokio` builds this is `TokioTimer` + /// (wrapping `tokio::time::sleep`). + timer: Tm, /// `true` if this server was constructed via [`Server::new_passive`]. /// Passive servers have no real SD socket bound to port 30490; their /// SD handling is managed externally. Calling [`Self::announcement_loop`] @@ -95,7 +155,15 @@ pub struct Server< is_passive: bool, } -impl Server { +#[cfg(feature = "server-tokio")] +impl + Server< + Arc>, + Arc>, + crate::tokio_transport::TokioTransport, + crate::tokio_transport::TokioTimer, + > +{ /// Create a new SOME/IP server /// /// # Errors @@ -134,56 +202,105 @@ impl Server { config: ServerConfig, multicast_loopback: bool, ) -> Result { - // Bind unicast socket for receiving subscriptions + let deps = ServerDeps { + factory: crate::tokio_transport::TokioTransport, + timer: crate::tokio_transport::TokioTimer, + e2e_registry: Arc::new(Mutex::new(E2ERegistry::new())), + subscriptions: Arc::new(RwLock::new(SubscriptionManager::new())), + }; + Self::new_with_deps(deps, config, multicast_loopback).await + } + + /// Create a passive SOME/IP server. + /// + /// A passive server binds its unicast socket at `config.local_port` as + /// usual (so `publish_raw_event` has a real source port matching the + /// endpoint advertised in external `OfferService` messages), but binds + /// its SD socket to an ephemeral port instead of the SOME/IP SD port + /// (30490). The passive server is therefore **not** part of the + /// `SO_REUSEPORT` group at 30490, and the kernel will never deliver SD + /// traffic destined for 30490 to it. + /// + /// Passive servers are intended for use with an external SD dispatcher + /// (for example, a `Client` whose discovery socket receives all + /// incoming `SubscribeEventGroup` / `FindService` messages and routes + /// them to the right `EventPublisher` via + /// [`EventPublisher::register_subscriber`]). Do **not** call + /// [`Server::announcement_loop`] or spawn [`Server::run`] on a passive + /// server — the external dispatcher owns those responsibilities. + /// + /// # Errors + /// + /// Returns an error if binding either socket fails. + pub async fn new_passive(config: ServerConfig) -> Result { + let deps = ServerDeps { + factory: crate::tokio_transport::TokioTransport, + timer: crate::tokio_transport::TokioTimer, + e2e_registry: Arc::new(Mutex::new(E2ERegistry::new())), + subscriptions: Arc::new(RwLock::new(SubscriptionManager::new())), + }; + Self::new_passive_with_deps(deps, config).await + } +} + +impl Server +where + R: E2ERegistryHandle, + S: SubscriptionHandle, + F: TransportFactory + Send + Sync + 'static, + F::Socket: Send + Sync + 'static, + for<'a> ::SendFuture<'a>: Send, + for<'a> ::RecvFuture<'a>: Send, + Tm: Timer + Clone + Send + Sync + 'static, +{ + /// Bare-metal-friendly constructor that takes every dependency + /// explicitly via a [`ServerDeps`] bundle. The `server-tokio` + /// convenience constructors ([`Self::new`], [`Self::new_with_loopback`], + /// [`Self::new_passive`]) ultimately delegate here. + /// + /// # Errors + /// + /// Returns an error if binding the unicast or SD socket via + /// [`TransportFactory::bind`] fails, or if joining the SD multicast + /// group fails. + pub async fn new_with_deps( + deps: ServerDeps, + config: ServerConfig, + multicast_loopback: bool, + ) -> Result { + let ServerDeps { + factory, + timer, + e2e_registry, + subscriptions, + } = deps; + + // Bind unicast socket for receiving subscriptions. let unicast_addr = SocketAddrV4::new(config.interface, config.local_port); - let unicast_socket = Arc::new(UdpSocket::bind(unicast_addr).await?); + let unicast_socket = Arc::new(factory.bind(unicast_addr, &SocketOptions::new()).await?); tracing::info!( "Server bound to {} for service 0x{:04X}", unicast_addr, config.service_id ); - // Bind SD socket for sending/receiving SD messages (must use SD port 30490) - let expected_sd_port = sd::MULTICAST_PORT; - let sd_bind_addr = - std::net::SocketAddr::new(IpAddr::V4(config.interface), expected_sd_port); - let sd_raw_socket = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - )?; - sd_raw_socket.set_reuse_address(true)?; - #[cfg(unix)] - sd_raw_socket.set_reuse_port(true)?; - sd_raw_socket.set_multicast_if_v4(&config.interface)?; - sd_raw_socket.set_multicast_loop_v4(multicast_loopback)?; - sd_raw_socket.bind(&sd_bind_addr.into())?; - sd_raw_socket.set_nonblocking(true)?; - let sd_std_socket: std::net::UdpSocket = sd_raw_socket.into(); - let sd_socket = UdpSocket::from_std(sd_std_socket)?; - - // Join SD multicast group to receive FindService and SubscribeEventGroup + // Bind SD socket for sending/receiving SD messages (must use SD port 30490). + let mut sd_opts = SocketOptions::new(); + sd_opts.reuse_address = true; + sd_opts.reuse_port = true; + sd_opts.multicast_if_v4 = Some(config.interface); + sd_opts.multicast_loop_v4 = multicast_loopback; + let sd_addr = SocketAddrV4::new(config.interface, sd::MULTICAST_PORT); + let sd_socket = factory.bind(sd_addr, &sd_opts).await?; sd_socket.join_multicast_v4(sd::MULTICAST_IP, config.interface)?; - let actual_sd_addr = sd_socket.local_addr()?; + let sd_socket = Arc::new(sd_socket); tracing::info!( "Server SD socket bound to {} (expected port {}), joined multicast {}", - actual_sd_addr, - expected_sd_port, + sd_addr, + sd::MULTICAST_PORT, sd::MULTICAST_IP ); - if let std::net::SocketAddr::V4(v4) = actual_sd_addr - && v4.port() != expected_sd_port - { - tracing::error!( - "SD socket port mismatch! Expected {}, got {}. Offers will use wrong source port.", - expected_sd_port, - v4.port() - ); - } - let subscriptions: Arc> = - Arc::new(RwLock::new(SubscriptionManager::new())); - let e2e_registry: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); let publisher = Arc::new(EventPublisher::new( subscriptions.clone(), Arc::clone(&unicast_socket), @@ -193,67 +310,61 @@ impl Server { Ok(Self { config, unicast_socket, - sd_socket: Arc::new(sd_socket), + sd_socket, subscriptions, publisher, sd_state: Arc::new(SdStateManager::new()), e2e_registry, + factory, + timer, is_passive: false, }) } - /// Create a passive SOME/IP server. - /// - /// A passive server binds its unicast socket at `config.local_port` as - /// usual (so `publish_raw_event` has a real source port matching the - /// endpoint advertised in external `OfferService` messages), but binds - /// its SD socket to an ephemeral port instead of the SOME/IP SD port - /// (30490). The passive server is therefore **not** part of the - /// `SO_REUSEPORT` group at 30490, and the kernel will never deliver SD - /// traffic destined for 30490 to it. + /// Bare-metal-friendly passive-server constructor. /// - /// Passive servers are intended for use with an external SD dispatcher - /// (for example, a `Client` whose discovery socket receives all - /// incoming `SubscribeEventGroup` / `FindService` messages and routes - /// them to the right `EventPublisher` via - /// [`EventPublisher::register_subscriber`]). Do **not** call - /// [`Server::announcement_loop`] or spawn [`Server::run`] on a passive - /// server — the external dispatcher owns those responsibilities. + /// Passive servers bind a unicast socket as usual but bind their SD + /// socket to an ephemeral port (port 0) instead of the SOME/IP SD + /// port — see [`Server::new_passive`] under `server-tokio` for the + /// full explanation. Calling [`Self::announcement_loop`] or + /// [`Self::run`] on the result is a programming error. /// /// # Errors /// /// Returns an error if binding either socket fails. - pub async fn new_passive(config: ServerConfig) -> Result { - // Bind unicast socket at the configured local_port — the passive - // server still needs a real source port so published events appear - // to come from the endpoint advertised in the external OfferService. + pub async fn new_passive_with_deps( + deps: ServerDeps, + config: ServerConfig, + ) -> Result { + let ServerDeps { + factory, + timer, + e2e_registry, + subscriptions, + } = deps; + + // Bind unicast socket at the configured local_port. let unicast_addr = SocketAddrV4::new(config.interface, config.local_port); - let unicast_socket = Arc::new(UdpSocket::bind(unicast_addr).await?); + let unicast_socket = Arc::new(factory.bind(unicast_addr, &SocketOptions::new()).await?); tracing::info!( "Passive server bound to {} for service 0x{:04X}", unicast_addr, config.service_id ); - // Bind a placeholder SD socket on an ephemeral port. Nothing will - // route to it (neither multicast nor unicast on 30490), and neither - // `announcement_loop` nor `run` should be called for a passive - // server. We still allocate it so the `Server` struct shape is - // identical to the full-server path. - let sd_placeholder_addr = std::net::SocketAddr::new(IpAddr::V4(config.interface), 0); - let sd_socket = UdpSocket::bind(sd_placeholder_addr).await?; - // Log the bound address using `Debug` on the `Result` - // so a hypothetical `local_addr` failure does not propagate as a - // construction error and we do not introduce an unreachable Err - // arm purely for defensive logging. + // Placeholder SD socket on an ephemeral port — no multicast options, + // no group join. Nothing should route to it. + let sd_placeholder_addr = SocketAddrV4::new(config.interface, 0); + let sd_socket = Arc::new( + factory + .bind(sd_placeholder_addr, &SocketOptions::new()) + .await?, + ); tracing::info!( - "Passive server SD placeholder socket bound to {:?} (not in SD reuseport group)", - sd_socket.local_addr() + "Passive server SD placeholder socket bound near {} (not in SD reuseport group)", + sd_placeholder_addr ); - let subscriptions: Arc> = - Arc::new(RwLock::new(SubscriptionManager::new())); - let e2e_registry: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); let publisher = Arc::new(EventPublisher::new( subscriptions.clone(), Arc::clone(&unicast_socket), @@ -263,17 +374,28 @@ impl Server { Ok(Self { config, unicast_socket, - sd_socket: Arc::new(sd_socket), + sd_socket, subscriptions, publisher, sd_state: Arc::new(SdStateManager::new()), e2e_registry, + factory, + timer, is_passive: true, }) } } -impl Server { +impl Server +where + R: E2ERegistryHandle, + S: SubscriptionHandle, + F: TransportFactory + Send + Sync + 'static, + F::Socket: Send + Sync + 'static, + for<'a> ::SendFuture<'a>: Send, + for<'a> ::RecvFuture<'a>: Send, + Tm: Timer + Clone + Send + Sync + 'static, +{ /// Build the periodic-SD-announcement future. /// /// Returns a future that sends an `OfferService` message to the SD @@ -282,12 +404,17 @@ impl Server { /// function does no work on its own. /// /// ```no_run - /// # use simple_someip::server::Server; - /// # async fn demo(server: Server) -> Result<(), simple_someip::server::Error> { + /// # #[cfg(feature = "server-tokio")] { + /// # use simple_someip::server::{Server, ServerConfig}; + /// # use std::net::Ipv4Addr; + /// # async fn demo() -> Result<(), simple_someip::server::Error> { + /// # let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 30490, 0, 0); + /// # let server = Server::new(config).await?; /// let announce_fut = server.announcement_loop()?; /// tokio::spawn(announce_fut); /// # Ok(()) /// # } + /// # } /// ``` /// /// # Errors @@ -314,11 +441,12 @@ impl Server { let config = self.config.clone(); let sd_socket = Arc::clone(&self.sd_socket); let sd_state = Arc::clone(&self.sd_state); + let timer = self.timer.clone(); Ok(async move { let mut announcement_count = 0u32; loop { - match sd_state.send_offer_service(&config, &sd_socket).await { + match sd_state.send_offer_service(&config, &*sd_socket).await { Ok(()) => { announcement_count += 1; if announcement_count == 1 { @@ -342,8 +470,8 @@ impl Server { // Send announcements every 1 second. Sleep goes through // the `Timer` trait so bare-metal consumers can swap in // a different timer impl; today it resolves to - // `TokioTimer`. - TokioTimer.sleep(std::time::Duration::from_secs(1)).await; + // `TokioTimer` under the `server-tokio` feature. + timer.sleep(core::time::Duration::from_secs(1)).await; } }) } @@ -387,7 +515,8 @@ impl Server { someip_header.encode(&mut buffer)?; buffer.extend_from_slice(&sd_data); - self.sd_socket.send_to(&buffer, target).await?; + let target_v4 = socket_addr_v4(target)?; + self.sd_socket.send_to(&buffer, target_v4).await?; tracing::debug!( "Sent unicast OfferService to {} for service 0x{:04X}", target, @@ -399,7 +528,7 @@ impl Server { /// Get the event publisher for sending events #[must_use] - pub fn publisher(&self) -> Arc> { + pub fn publisher(&self) -> Arc> { Arc::clone(&self.publisher) } @@ -409,7 +538,12 @@ impl Server { /// /// Returns an error if the socket's local address cannot be retrieved. pub fn unicast_local_addr(&self) -> Result { - self.unicast_socket.local_addr() + match self.unicast_socket.local_addr() { + Ok(v4) => Ok(std::net::SocketAddr::V4(v4)), + Err(_) => Err(std::io::Error::other( + "transport: failed to read local_addr", + )), + } } /// Update the configured local port (useful after binding to ephemeral port 0). @@ -499,12 +633,22 @@ impl Server { pin_mut!(unicast_fut, sd_fut); select! { result = unicast_fut => { - let (len, addr) = result?; - (len, addr, "unicast", true) + let datagram = result?; + ( + datagram.bytes_received, + std::net::SocketAddr::V4(datagram.source), + "unicast", + true, + ) } result = sd_fut => { - let (len, addr) = result?; - (len, addr, "sd-multicast", false) + let datagram = result?; + ( + datagram.bytes_received, + std::net::SocketAddr::V4(datagram.source), + "sd-multicast", + false, + ) } } }; @@ -720,6 +864,23 @@ impl Server { } } +/// Convert a [`std::net::SocketAddr`] into a [`SocketAddrV4`] for the +/// transport layer. SOME/IP-SD is IPv4-only at this layer; if a V6 +/// address ever surfaces here it indicates a misconfiguration upstream +/// (a V6 socket binding the SD port, or a V6 source address surfaced +/// by a transport that should not produce one). Returns +/// [`std::io::ErrorKind::Unsupported`] in that case so the caller can +/// log and drop the message instead of panicking. +fn socket_addr_v4(addr: std::net::SocketAddr) -> Result { + match addr { + std::net::SocketAddr::V4(v4) => Ok(v4), + std::net::SocketAddr::V6(_) => Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "IPv6 SD address is not supported", + ))), + } +} + /// Extract a single subscriber endpoint from the options runs associated with /// an SD entry. Walks both option runs, returns the first `IpV4Endpoint` /// found, and logs a `warn!` if more than one is present. @@ -786,7 +947,16 @@ fn extract_subscriber_endpoint( } } -impl Server { +impl Server +where + R: E2ERegistryHandle, + S: SubscriptionHandle, + F: TransportFactory + Send + Sync + 'static, + F::Socket: Send + Sync + 'static, + for<'a> ::SendFuture<'a>: Send, + for<'a> ::RecvFuture<'a>: Send, + Tm: Timer + Clone + Send + Sync + 'static, +{ /// Send `SubscribeAck` from an entry view async fn send_subscribe_ack_from_view( &self, @@ -822,7 +992,8 @@ impl Server { someip_header.encode(&mut buffer)?; buffer.extend_from_slice(&sd_data); - self.sd_socket.send_to(&buffer, subscriber).await?; + let subscriber_v4 = socket_addr_v4(subscriber)?; + self.sd_socket.send_to(&buffer, subscriber_v4).await?; tracing::debug!( "Sent SubscribeAck to {} for service 0x{:04X}, eventgroup 0x{:04X}", @@ -870,7 +1041,8 @@ impl Server { someip_header.encode(&mut buffer)?; buffer.extend_from_slice(&sd_data); - self.sd_socket.send_to(&buffer, subscriber).await?; + let subscriber_v4 = socket_addr_v4(subscriber)?; + self.sd_socket.send_to(&buffer, subscriber_v4).await?; tracing::warn!( "Sent SubscribeNack to {} for service 0x{:04X}, eventgroup 0x{:04X} (reason: {})", @@ -884,20 +1056,34 @@ impl Server { } } -#[cfg(test)] +#[cfg(all(test, feature = "server-tokio"))] mod tests { use super::*; use crate::protocol::{ Header as SomeIpHeader, MessageType, MessageTypeField, MessageView, ReturnCode, }; + use crate::tokio_transport::{TokioTimer, TokioTransport}; use crate::traits::WireFormat; use std::format; + use std::net::IpAddr; + use tokio::net::UdpSocket; + + /// Type alias bringing the tokio-flavor concrete type parameters back + /// into scope so tests can spell `TestServer::new(...)` without + /// chasing the four-type-parameter signature on every call site. + /// Mirrors the `TestClient` pattern from `tests/client_server.rs`. + type TestServer = Server< + Arc>, + Arc>, + TokioTransport, + TokioTimer, + >; #[tokio::test] async fn test_server_creation() { let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 30682, 0x5B, 1); - let server: Result = Server::new(config).await; + let server: Result = TestServer::new(config).await; assert!(server.is_ok()); } @@ -909,16 +1095,16 @@ mod tests { // as `test_server_creation`. let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 30683, 0x5C, 1); - let server = Server::new_with_loopback(config, true) + let server = TestServer::new_with_loopback(config, true) .await .expect("new_with_loopback(true) should succeed on localhost"); // Confirm the SD socket was actually configured with IP_MULTICAST_LOOP // enabled — this is the behavior the new code path is supposed to // produce and is what makes same-host testing possible. - let sock_ref = socket2::SockRef::from(&*server.sd_socket); assert!( - sock_ref + server + .sd_socket .multicast_loop_v4() .expect("multicast_loop_v4 getter should succeed"), "multicast loopback should be enabled on the SD socket", @@ -953,10 +1139,10 @@ mod tests { } /// Helper: create a server on an ephemeral port and return (Server, port) - async fn create_test_server(service_id: u16, instance_id: u16) -> (Server, u16) { + async fn create_test_server(service_id: u16, instance_id: u16) -> (TestServer, u16) { // Use port 0 to get an ephemeral port let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 0, service_id, instance_id); - let mut server = Server::new(config).await.expect("Failed to create server"); + let mut server = TestServer::new(config).await.expect("Failed to create server"); let port = match server.unicast_local_addr().unwrap() { std::net::SocketAddr::V4(addr) => addr.port(), std::net::SocketAddr::V6(_) => panic!("expected IPv4 address"), @@ -1026,7 +1212,7 @@ mod tests { // Run server to process one message (with a timeout) let server_handle = tokio::spawn(async move { let mut buf = vec![0u8; 65535]; - let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap(); + let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; let addr = std::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1078,7 +1264,7 @@ mod tests { // Process the message let server_handle = tokio::spawn(async move { let mut buf = vec![0u8; 65535]; - let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap(); + let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; let addr = std::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1127,7 +1313,7 @@ mod tests { let server_handle = tokio::spawn(async move { let mut buf = vec![0u8; 65535]; - let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap(); + let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; let addr = std::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1174,7 +1360,7 @@ mod tests { // Process the message on the unicast socket let server_handle = tokio::spawn(async move { let mut buf = vec![0u8; 65535]; - let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap(); + let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; let addr = std::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1224,7 +1410,7 @@ mod tests { let server_handle = tokio::spawn(async move { let mut buf = vec![0u8; 65535]; - let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap(); + let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; let addr = std::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1271,7 +1457,7 @@ mod tests { let server_handle = tokio::spawn(async move { let mut buf = vec![0u8; 65535]; - let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap(); + let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; let addr = std::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1311,7 +1497,7 @@ mod tests { let server_handle = tokio::spawn(async move { let mut buf = vec![0u8; 65535]; - let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap(); + let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; let addr = std::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1563,7 +1749,7 @@ mod tests { let server_handle = tokio::spawn(async move { let mut buf = vec![0u8; 65535]; - let (len, addr) = server.unicast_socket.recv_from(&mut buf).await.unwrap(); + let datagram = server.unicast_socket.recv_from(&mut buf).await.unwrap(); let len = datagram.bytes_received; let addr = std::net::SocketAddr::V4(datagram.source); let data = &buf[..len]; let view = MessageView::parse(data).unwrap(); let sd_view = view.sd_header().unwrap(); @@ -1849,20 +2035,19 @@ mod tests { // datagram. We drive `handle_sd_message` directly rather than // `server.run()` so we can assert state after the call. let client_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); - let sd_addr = match server.sd_socket.local_addr().unwrap() { - std::net::SocketAddr::V4(v4) => v4, - std::net::SocketAddr::V6(_) => panic!("expected v4 sd socket"), - }; + let sd_addr = server.sd_socket.local_addr().unwrap(); client_socket.send_to(&message, sd_addr).await.unwrap(); let mut buf = vec![0u8; 65_535]; - let (len, sender) = tokio::time::timeout( + let datagram = tokio::time::timeout( std::time::Duration::from_secs(2), server.sd_socket.recv_from(&mut buf), ) .await .expect("timeout receiving combined SD packet") .unwrap(); + let len = datagram.bytes_received; + let sender = std::net::SocketAddr::V4(datagram.source); let view = MessageView::parse(&buf[..len]).unwrap(); let sd_view = view.sd_header().unwrap(); server.handle_sd_message(&sd_view, sender).await.unwrap(); @@ -1900,9 +2085,9 @@ mod tests { /// Construct a passive server on loopback with an ephemeral unicast /// port. Tests use this as a standard fixture. - async fn make_passive_server(service_id: u16, instance_id: u16) -> Server { + async fn make_passive_server(service_id: u16, instance_id: u16) -> TestServer { let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 0, service_id, instance_id); - Server::new_passive(config) + TestServer::new_passive(config) .await .expect("new_passive should succeed") } @@ -1931,16 +2116,11 @@ mod tests { // the same module. let server = make_passive_server(0x005C, 0x0001).await; let sd_addr = server.sd_socket.local_addr().unwrap(); - match sd_addr { - std::net::SocketAddr::V4(v4) => { - assert_ne!( - v4.port(), - 30490, - "passive SD socket must not bind the SOME/IP SD port" - ); - } - std::net::SocketAddr::V6(_) => panic!("expected IPv4 SD address"), - } + assert_ne!( + sd_addr.port(), + 30490, + "passive SD socket must not bind the SOME/IP SD port" + ); } #[tokio::test] @@ -2075,7 +2255,7 @@ mod tests { }; let config = ServerConfig::new(iface, 30501, SID, IID); - let server = Server::new_with_loopback(config, true).await.unwrap(); + let server = TestServer::new_with_loopback(config, true).await.unwrap(); let fut = server.announcement_loop().expect("build loop"); let handle = tokio::spawn(fut); @@ -2142,12 +2322,8 @@ mod tests { // Different placeholder ports. assert_ne!(addr_a, addr_b); // And neither is 30490. - if let std::net::SocketAddr::V4(v4) = addr_a { - assert_ne!(v4.port(), 30490); - } - if let std::net::SocketAddr::V4(v4) = addr_b { - assert_ne!(v4.port(), 30490); - } + assert_ne!(addr_a.port(), 30490); + assert_ne!(addr_b.port(), 30490); } #[tokio::test] @@ -2164,11 +2340,17 @@ mod tests { }; let config = ServerConfig::new(Ipv4Addr::LOCALHOST, blocker_port, 0x005C, 0x0001); - let result = Server::new_passive(config).await; + let result = TestServer::new_passive(config).await; let Err(err) = result else { panic!("new_passive must fail when the unicast port is taken"); }; match err { + // Phase 14b: the bind path now goes through the + // `TransportFactory` trait, so port collisions surface as + // `Error::Transport(TransportError::AddressInUse)` instead + // of `Error::Io`. Both variants are accepted to keep the + // test stable across future transport-error refactors. + Error::Transport(crate::transport::TransportError::AddressInUse) => {} Error::Io(io_err) => { assert!( matches!( @@ -2179,7 +2361,7 @@ mod tests { io_err.kind() ); } - other => panic!("expected Error::Io, got {other:?}"), + other => panic!("expected Error::Io or Error::Transport(AddressInUse), got {other:?}"), } drop(blocker); } @@ -2301,7 +2483,7 @@ mod tests { let rx: UdpSocket = UdpSocket::from_std(raw_rx.into()).unwrap(); rx.join_multicast_v4(sd::MULTICAST_IP, interface).unwrap(); - let server = Server::new_with_loopback(config, true) + let server = TestServer::new_with_loopback(config, true) .await .expect("server must bind with loopback enabled"); let announce_fut = server diff --git a/src/server/sd_state.rs b/src/server/sd_state.rs index 803f7bf..8bf12ed 100644 --- a/src/server/sd_state.rs +++ b/src/server/sd_state.rs @@ -12,11 +12,11 @@ use core::sync::atomic::{AtomicBool, AtomicU16, Ordering}; use std::{net::SocketAddrV4, vec::Vec}; -use tokio::net::UdpSocket; use crate::protocol::sd::{ self, Entry, Flags, OptionsCount, RebootFlag, ServiceEntry, TransportProtocol, }; +use crate::transport::TransportSocket; use super::{Error, ServerConfig}; @@ -123,10 +123,10 @@ impl SdStateManager { } /// Send a multicast `OfferService` announcement for the given config. - pub(super) async fn send_offer_service( + pub(super) async fn send_offer_service( &self, config: &ServerConfig, - socket: &UdpSocket, + socket: &T, ) -> Result<(), Error> { use crate::protocol::Header as SomeIpHeader; use crate::traits::WireFormat; @@ -187,12 +187,14 @@ impl SdStateManager { } } -#[cfg(test)] +#[cfg(all(test, feature = "server-tokio"))] mod tests { use super::{SdStateManager, ServerConfig}; use crate::protocol::sd::{self, EntryType, Flags, RebootFlag, TransportProtocol}; use crate::protocol::{MessageType, MessageView, ReturnCode}; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use crate::tokio_transport::TokioSocket; + use crate::transport::{SocketOptions, TransportFactory}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::time::Duration; use tokio::net::UdpSocket; @@ -326,23 +328,22 @@ mod tests { UdpSocket::from_std(raw.into()) } - /// Bind a sender socket on an ephemeral port with `multicast_if` pinned - /// to the loopback interface so emitted packets loop back to any - /// receiver joined to the same group on that interface. - fn build_mcast_sender(interface: Ipv4Addr) -> std::io::Result { - let raw = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - )?; - raw.set_reuse_address(true)?; - #[cfg(unix)] - raw.set_reuse_port(true)?; - raw.set_multicast_loop_v4(true)?; - raw.set_multicast_if_v4(&interface)?; - raw.bind(&SocketAddr::new(IpAddr::V4(interface), 0).into())?; - raw.set_nonblocking(true)?; - UdpSocket::from_std(raw.into()) + /// Bind a sender [`TokioSocket`] on an ephemeral port with + /// `multicast_if` pinned to the loopback interface so emitted + /// packets loop back to any receiver joined to the same group on + /// that interface. Uses the [`TransportFactory`] surface so the + /// resulting socket implements [`crate::transport::TransportSocket`] + /// — which is what the now-generic + /// [`SdStateManager::send_offer_service`] requires. + async fn build_mcast_sender(interface: Ipv4Addr) -> Result { + let mut opts = SocketOptions::new(); + opts.reuse_address = true; + opts.reuse_port = true; + opts.multicast_if_v4 = Some(interface); + opts.multicast_loop_v4 = true; + crate::tokio_transport::TokioTransport + .bind(SocketAddrV4::new(interface, 0), &opts) + .await } /// Fields extracted from a received SOME/IP-SD `OfferService` packet. @@ -477,12 +478,12 @@ mod tests { } /// Standard loopback receiver/sender pair used by the send-path tests. - fn mcast_rx_tx() -> (UdpSocket, UdpSocket) { + async fn mcast_rx_tx() -> (UdpSocket, TokioSocket) { let interface = Ipv4Addr::LOCALHOST; let rx = build_mcast_receiver(interface).expect("bind receiver"); rx.join_multicast_v4(sd::MULTICAST_IP, interface) .expect("join SD multicast group"); - let tx = build_mcast_sender(interface).expect("bind sender"); + let tx = build_mcast_sender(interface).await.expect("bind sender"); (rx, tx) } @@ -497,7 +498,7 @@ mod tests { TEST_SERVICE_ID, TEST_INSTANCE_ID, ); - let (rx, tx) = mcast_rx_tx(); + let (rx, tx) = mcast_rx_tx().await; // Seed with a recognisable value so on-wire session_id is exact. let sd_state = SdStateManager::with_initial(0x1233); @@ -527,7 +528,7 @@ mod tests { TEST_SERVICE_ID, TEST_INSTANCE_ID, ); - let (rx, tx) = mcast_rx_tx(); + let (rx, tx) = mcast_rx_tx().await; let sd_state = SdStateManager::with_initial(0x1233); sd_state.send_offer_service(&config, &tx).await.unwrap(); @@ -553,7 +554,7 @@ mod tests { TEST_SERVICE_ID, TEST_INSTANCE_ID, ); - let (rx, tx) = mcast_rx_tx(); + let (rx, tx) = mcast_rx_tx().await; let sd_state = SdStateManager::with_initial(0xFFFE); sd_state.send_offer_service(&config, &tx).await.unwrap(); @@ -598,7 +599,7 @@ mod tests { TEST_INSTANCE_ID, ); config.ttl = 0; - let (rx, tx) = mcast_rx_tx(); + let (rx, tx) = mcast_rx_tx().await; let sd_state = SdStateManager::with_initial(0x1233); sd_state.send_offer_service(&config, &tx).await.unwrap(); diff --git a/src/server/service_info.rs b/src/server/service_info.rs index 59bf38a..a702278 100644 --- a/src/server/service_info.rs +++ b/src/server/service_info.rs @@ -52,6 +52,7 @@ pub struct Subscriber { impl Subscriber { /// Create a new subscriber + #[must_use] pub fn new( address: SocketAddrV4, service_id: u16, diff --git a/src/server/subscription_manager.rs b/src/server/subscription_manager.rs index 7f2cbe5..af3c743 100644 --- a/src/server/subscription_manager.rs +++ b/src/server/subscription_manager.rs @@ -3,7 +3,10 @@ use super::service_info::Subscriber; use core::future::Future; use heapless::{Vec as HeaplessVec, index_map::FnvIndexMap}; -use std::{net::SocketAddrV4, sync::Arc, vec::Vec}; +use std::{net::SocketAddrV4, vec::Vec}; +#[cfg(feature = "server-tokio")] +use std::sync::Arc; +#[cfg(feature = "server-tokio")] use tokio::sync::RwLock; /// Max number of distinct `(service_id, instance_id, event_group_id)` event @@ -300,6 +303,7 @@ pub trait SubscriptionHandle: Clone + Send + Sync + 'static { ) -> impl Future> + Send + '_; } +#[cfg(feature = "server-tokio")] impl SubscriptionHandle for Arc> { fn subscribe( &self, diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index a1402b7..25c03f5 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -266,7 +266,15 @@ fn bind_with_options(addr: SocketAddrV4, options: SocketOptions) -> std::io::Res if let Some(iface) = options.multicast_if_v4 { raw.set_multicast_if_v4(&iface)?; } - raw.set_multicast_loop_v4(options.multicast_loop_v4)?; + // Only set the multicast-loop flag when the caller is doing + // multicast (i.e. they configured a multicast interface). Calling + // `set_multicast_loop_v4` on a plain-unicast socket on some + // backends can return EOPNOTSUPP / EINVAL; even on Linux where it + // succeeds, it's a meaningless syscall. Mirrors the behavior of + // the `client::SocketManager` discovery-bind path. + if options.multicast_if_v4.is_some() { + raw.set_multicast_loop_v4(options.multicast_loop_v4)?; + } let bind_addr = SocketAddr::new(IpAddr::V4(*addr.ip()), addr.port()); raw.bind(&bind_addr.into())?; raw.set_nonblocking(true)?; @@ -540,13 +548,18 @@ mod tests { #[tokio::test] async fn multicast_loop_v4_option_propagates_in_both_directions() { - // Guards against a regression where `multicast_loop_v4: false` was - // silently ignored and the socket kept the OS default (often - // loopback ENABLED), diverging from the explicit request. + // Guards against a regression where `multicast_loop_v4` was + // silently ignored on a multicast bind and the socket kept the + // OS default, diverging from the explicit request. Phase 14b: + // `bind_with_options` only applies `set_multicast_loop_v4` when + // `multicast_if_v4` is `Some` (a plain-unicast bind has no + // meaningful multicast-loop setting), so this test always pairs + // the loop flag with a multicast interface. let factory = TokioTransport; let opts_off = SocketOptions { multicast_loop_v4: false, + multicast_if_v4: Some(Ipv4Addr::LOCALHOST), ..SocketOptions::default() }; let sock_off = factory @@ -560,6 +573,7 @@ mod tests { let opts_on = SocketOptions { multicast_loop_v4: true, + multicast_if_v4: Some(Ipv4Addr::LOCALHOST), ..SocketOptions::default() }; let sock_on = factory diff --git a/tests/bare_metal_server.rs b/tests/bare_metal_server.rs new file mode 100644 index 0000000..9b2ff92 --- /dev/null +++ b/tests/bare_metal_server.rs @@ -0,0 +1,328 @@ +//! Phase-14b witness test: prove that `Server` can be constructed and +//! driven without the `server-tokio` feature, using only the trait +//! surface (`TransportFactory`, `Timer`, `E2ERegistryHandle`, +//! `SubscriptionHandle`). +//! +//! `simple-someip` is compiled with `default-features = false, +//! features = ["server", "bare_metal"]` per the `required-features` +//! gate below — i.e. NO tokio, NO socket2 pulled in via the crate +//! itself. The test still uses the host's tokio runtime as a generic +//! executor (tokio is a `dev-dependency`), but every type fed to +//! `simple-someip::Server::new_with_deps` comes from the no-tokio side: +//! a hand-rolled mock `TransportFactory`, a hand-rolled `Timer`, a +//! hand-rolled `SubscriptionHandle`, and the std-backed +//! `Arc>` impl that ships under the bare `transport` +//! module. +//! +//! This is the gate witness for the phase-14b claim that `Server` +//! is reachable on a no-tokio build. Compile-witness alone (Cargo +//! `required-features` proving the test crate compiles without +//! `server-tokio`) is the load-bearing assertion; the `tokio::spawn` +//! at the end is a sanity check that the announcement-loop future is +//! `Send + 'static` and the trait surface drives a working pipeline. +#![cfg(all(feature = "server", feature = "bare_metal"))] + +use core::future::Future; +use core::net::{Ipv4Addr, SocketAddrV4}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::vec::Vec; + +use simple_someip::e2e::E2ERegistry; +use simple_someip::server::{SubscribeError, Subscriber, SubscriptionHandle}; +use simple_someip::transport::{ + ReceivedDatagram, SocketOptions, Timer, TransportError, TransportFactory, TransportSocket, +}; +use simple_someip::{Server, ServerDeps}; +use simple_someip::server::ServerConfig; + +// ── Mock transport ───────────────────────────────────────────────────── + +#[derive(Default)] +struct MockPipe { + sent: Mutex, SocketAddrV4)>>, + inbound: Mutex, SocketAddrV4)>>, +} + +#[derive(Clone)] +struct MockFactory { + pipe: Arc, + next_port: Arc>, +} + +impl TransportFactory for MockFactory { + type Socket = MockSocket; + fn bind( + &self, + addr: SocketAddrV4, + _options: &SocketOptions, + ) -> impl Future> + Send { + let pipe = Arc::clone(&self.pipe); + // Mock: assign port deterministically. If caller asked for 0, + // hand out an incrementing fake ephemeral port. + let port = if addr.port() == 0 { + let mut p = self.next_port.lock().unwrap(); + let next = *p + 1; + *p = next; + 40000 + next + } else { + addr.port() + }; + let local = SocketAddrV4::new(*addr.ip(), port); + async move { Ok(MockSocket { pipe, local }) } + } +} + +struct MockSocket { + pipe: Arc, + local: SocketAddrV4, +} + +struct MockSendFut { + pipe: Arc, + bytes: Option>, + target: SocketAddrV4, +} + +impl Future for MockSendFut { + type Output = Result<(), TransportError>; + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let me = self.get_mut(); + if let Some(bytes) = me.bytes.take() { + me.pipe.sent.lock().unwrap().push_back((bytes, me.target)); + } + Poll::Ready(Ok(())) + } +} + +struct MockRecvFut<'a> { + pipe: Arc, + buf: &'a mut [u8], +} + +impl Future for MockRecvFut<'_> { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.get_mut(); + let entry = me.pipe.inbound.lock().unwrap().pop_front(); + match entry { + Some((bytes, source)) => { + let n = bytes.len().min(me.buf.len()); + me.buf[..n].copy_from_slice(&bytes[..n]); + Poll::Ready(Ok(ReceivedDatagram { + bytes_received: n, + source, + truncated: n < bytes.len(), + })) + } + None => { + // No data: return Pending and wake immediately to keep + // the run-loop ticking. Real bare-metal impls park the + // task on an interrupt-driven waker. + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } +} + +impl TransportSocket for MockSocket { + type SendFuture<'a> = MockSendFut; + type RecvFuture<'a> = MockRecvFut<'a>; + + fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddrV4) -> Self::SendFuture<'a> { + MockSendFut { + pipe: Arc::clone(&self.pipe), + bytes: Some(buf.to_vec()), + target, + } + } + + fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> Self::RecvFuture<'a> { + MockRecvFut { + pipe: Arc::clone(&self.pipe), + buf, + } + } + + fn local_addr(&self) -> Result { + Ok(self.local) + } + + fn join_multicast_v4(&self, _group: Ipv4Addr, _iface: Ipv4Addr) -> Result<(), TransportError> { + Ok(()) + } + + fn leave_multicast_v4(&self, _group: Ipv4Addr, _iface: Ipv4Addr) -> Result<(), TransportError> { + Ok(()) + } +} + +// ── Mock Timer ──────────────────────────────────────────────────────── + +#[derive(Clone)] +struct MockTimer; +impl Timer for MockTimer { + async fn sleep(&self, _duration: Duration) { + // The witness here is "the *crate* doesn't pull tokio under + // `--features server,bare_metal`," not "the test runs without + // tokio at all." The test runtime itself is `#[tokio::test]` + // (tokio is a `dev-dependency`), so using `tokio::task::yield_now` + // inside this mock is fine — it only proves the production + // crate's no-tokio path compiles. + tokio::task::yield_now().await; + } +} + +// ── Mock SubscriptionHandle ─────────────────────────────────────────── +// +// On `server-tokio`, `Arc>` is a built-in +// impl. Bare-metal callers supply their own. A real bare-metal impl +// would back this with a `critical_section::Mutex>` or a +// `spin::Mutex<...>` over a `heapless`-backed table; here we use +// `std::sync::Mutex` over a tiny inline table because the test runtime +// has `std`. The point is the *trait* impl, not the concurrency +// primitive. + +type SubKey = (u16, u16, u16, SocketAddrV4); + +#[derive(Clone, Default)] +#[allow(clippy::type_complexity)] +struct MockSubscriptions(Arc>>); + +impl SubscriptionHandle for MockSubscriptions { + fn subscribe( + &self, + service_id: u16, + instance_id: u16, + event_group_id: u16, + subscriber_addr: SocketAddrV4, + ) -> impl Future> + Send + '_ { + let this = self.0.clone(); + async move { + let mut guard = this.lock().unwrap(); + let key = (service_id, instance_id, event_group_id, subscriber_addr); + if !guard.contains(&key) { + guard.push(key); + } + Ok(()) + } + } + + fn unsubscribe( + &self, + service_id: u16, + instance_id: u16, + event_group_id: u16, + subscriber_addr: SocketAddrV4, + ) -> impl Future + Send + '_ { + let this = self.0.clone(); + async move { + let mut guard = this.lock().unwrap(); + guard.retain(|e| { + *e != (service_id, instance_id, event_group_id, subscriber_addr) + }); + } + } + + fn get_subscribers( + &self, + service_id: u16, + instance_id: u16, + event_group_id: u16, + ) -> impl Future> + Send + '_ { + let this = self.0.clone(); + async move { + let guard = this.lock().unwrap(); + guard + .iter() + .filter(|(s, i, e, _)| *s == service_id && *i == instance_id && *e == event_group_id) + .map(|(s, i, e, addr)| Subscriber::new(*addr, *s, *i, *e)) + .collect() + } + } +} + +// ── Test ────────────────────────────────────────────────────────────── + +#[tokio::test] +async fn server_constructible_without_server_tokio_feature() { + let pipe = Arc::new(MockPipe::default()); + let factory = MockFactory { + pipe: Arc::clone(&pipe), + next_port: Arc::new(Mutex::new(0)), + }; + + let e2e_handle: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); + let subs = MockSubscriptions::default(); + + let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 30490, 0x5B, 1); + + let deps: ServerDeps>, MockSubscriptions> = + ServerDeps { + factory, + timer: MockTimer, + e2e_registry: e2e_handle, + subscriptions: subs, + }; + + let server: Server< + Arc>, + MockSubscriptions, + MockFactory, + MockTimer, + > = Server::new_with_deps(deps, config, false) + .await + .expect("Server::new_with_deps must succeed with no-tokio mocks"); + + // Build the announcement-loop future and prove it's `Send + 'static` + // by spawning it on tokio. The witness is purely structural: if this + // line compiles, `Server` is reachable on a no-tokio build. + let announce_fut = server + .announcement_loop() + .expect("announcement_loop must build on a non-passive server"); + let handle = tokio::spawn(announce_fut); + + // Yield once so the spawned future has a chance to poll (its first + // tick fires `send_to` immediately, before the timer sleep). + tokio::task::yield_now().await; + tokio::task::yield_now().await; + + // Tear down: abort the announce loop. + handle.abort(); + let _ = handle.await; +} + +#[tokio::test] +async fn passive_server_constructible_without_server_tokio_feature() { + let pipe = Arc::new(MockPipe::default()); + let factory = MockFactory { + pipe: Arc::clone(&pipe), + next_port: Arc::new(Mutex::new(0)), + }; + + let e2e_handle: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); + let subs = MockSubscriptions::default(); + + let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 0, 0x5C, 2); + + let deps: ServerDeps>, MockSubscriptions> = + ServerDeps { + factory, + timer: MockTimer, + e2e_registry: e2e_handle, + subscriptions: subs, + }; + + let _server: Server< + Arc>, + MockSubscriptions, + MockFactory, + MockTimer, + > = Server::new_passive_with_deps(deps, config) + .await + .expect("Server::new_passive_with_deps must succeed with no-tokio mocks"); +} diff --git a/tests/client_server.rs b/tests/client_server.rs index 8b7a359..7a8ba9a 100644 --- a/tests/client_server.rs +++ b/tests/client_server.rs @@ -59,10 +59,30 @@ type TestClient = Client< TokioChannels, >; +/// Type alias bringing the tokio-flavor concrete type parameters back into +/// scope so callers can spell `TestServer::new(...)` without chasing the +/// four-type-parameter signature on every call site. +type TestServer = Server< + std::sync::Arc>, + std::sync::Arc>, + simple_someip::TokioTransport, + simple_someip::TokioTimer, +>; + +/// Type alias for the event publisher concrete type used by `TestServer`'s +/// publisher. Same shape rationale as [`TestServer`]. +type TestEventPublisher = simple_someip::server::EventPublisher< + std::sync::Arc>, + std::sync::Arc>, + simple_someip::TokioSocket, +>; + /// Create a server on an ephemeral unicast port, returning (Server, actual_port). -async fn create_server(service_id: u16, instance_id: u16) -> (Server, u16) { +async fn create_server(service_id: u16, instance_id: u16) -> (TestServer, u16) { let config = ServerConfig::new(Ipv4Addr::LOCALHOST, 0, service_id, instance_id); - let mut server: Server = Server::new(config).await.expect("Server::new failed"); + let mut server: TestServer = TestServer::new(config) + .await + .expect("Server::new failed"); let port = match server.unicast_local_addr().expect("local_addr failed") { std::net::SocketAddr::V4(a) => a.port(), _ => panic!("expected IPv4"), @@ -74,7 +94,7 @@ async fn create_server(service_id: u16, instance_id: u16) -> (Server, u16) { /// Poll `has_subscribers` with retries until the server has processed the /// subscription. Returns true if subscribers appeared within the deadline. async fn wait_for_subscribers( - publisher: &simple_someip::server::EventPublisher, + publisher: &TestEventPublisher, service_id: u16, instance_id: u16, event_group_id: u16,