diff --git a/Cargo.lock b/Cargo.lock index 7a2c94d..cd38b00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,23 +46,58 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "discovery_client" version = "0.0.0" dependencies = [ - "embedded-io", + "embedded-io 0.7.1", "simple-someip", "tokio", "tracing", "tracing-subscriber", ] +[[package]] +name = "embassy-sync" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d2c8cdff05a7a51ba0087489ea44b0b1d97a296ca6b1d6d1a33ea7423d34049" +dependencies = [ + "cfg-if", + "critical-section", + "embedded-io-async", + "futures-sink", + "futures-util", + "heapless 0.8.0", +] + +[[package]] +name = "embedded-io" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edd0f118536f44f5ccd48bcb8b111bdc3de888b58c74639dfb034a357d0f206d" + [[package]] name = "embedded-io" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9eb1aa714776b75c7e67e1da744b81a129b3ff919c8712b5e1b32252c1f07cc7" +[[package]] +name = "embedded-io-async" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff09972d4073aa8c299395be75161d582e7629cd663171d62af73c8d50dba3f" +dependencies = [ + "embedded-io 0.6.1", +] + [[package]] name = "futures" version = "0.3.32" @@ -148,6 +183,16 @@ dependencies = [ "byteorder", ] +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + [[package]] name = "heapless" version = "0.9.2" @@ -246,9 +291,10 @@ name = "simple-someip" version = "0.7.0" dependencies = [ "crc", - "embedded-io", + "embassy-sync", + "embedded-io 0.7.1", "futures", - "heapless", + "heapless 0.9.2", "socket2 0.5.10", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 8c33e00..cf519b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,12 @@ repository = "https://github.com/luminartech/simple_someip" [dependencies] crc = "3.4" +# embassy-sync provides no_std-compatible bounded channels used as the +# channel backend when the `bare_metal` feature is active. The +# `critical-section` and `portable-atomic` deps ship with embassy-sync and +# are satisfiable on the Infineon AURIX TriCore target (HighTec toolchain) +# per the bare_metal_plan_v2 TriCore delta. +embassy-sync = { version = "0.6", optional = true } embedded-io = { version = "0.7" } # `futures` pulls in `futures-util` which provides the executor-agnostic # `select!` macro and `FutureExt::fuse` / `pin_mut!` helpers — used by @@ -60,7 +66,11 @@ server = ["std", "dep:tokio", "dep:socket2", "dep:futures"] # bare-metal-complete: the `client` and `server` feature paths still # spawn per-socket I/O loops on `tokio::spawn`, and a fully tokio-free # build additionally needs a user-provided `Spawner` impl (phase 9). -bare_metal = [] +# `bare_metal` activates embassy-sync as the channel backend. The feature +# is a prerequisite for the Phase 11 channel-handle abstraction: with +# `bare_metal` enabled, `EmbassySyncChannels` is available as the +# `ChannelFactory` impl that does not depend on tokio. +bare_metal = ["dep:embassy-sync"] [[test]] name = "client_server" diff --git a/examples/bare_metal/src/main.rs b/examples/bare_metal/src/main.rs index 1ef8844..77ec950 100644 --- a/examples/bare_metal/src/main.rs +++ b/examples/bare_metal/src/main.rs @@ -44,29 +44,28 @@ //! # Known gaps in the bare-metal story (independent of this example) //! //! The example exercises the **trait layer** (`TransportSocket`, -//! `TransportFactory`, `Timer`, `Spawner`) — and that is all. It does -//! NOT demonstrate a no_alloc integration with +//! `TransportFactory`, `Timer`, `Spawner`, `ChannelFactory`) — and +//! that is all. It does NOT demonstrate a no_alloc integration with //! `simple_someip::Client` / `simple_someip::Server`, because those -//! are not yet no_alloc-compatible. Phase 9 landed `Spawner`, which -//! abstracts ONE runtime primitive (task submission). Four others -//! remain before a no_alloc consumer can use `Client`: +//! are not yet no_alloc-compatible. //! -//! 1. **`tokio::sync::mpsc` channels** inside `SocketManager` -//! (capacities 4 and 16 per socket): heap-allocated AND -//! tokio-runtime-coupled (the `Waker` plumbing only works on a -//! tokio task). -//! 2. **`tokio::sync::oneshot`** used for send-ack round-trips: same -//! allocation + runtime-coupling issue. -//! 3. **`Arc>`** shared between the client's -//! control path and every per-socket loop: requires `alloc` + -//! `std::sync`. -//! 4. **`F::Socket = TokioSocket`** bound on `bind_*`: a phase-5 +//! **Completed abstractions:** +//! - Phase 9: `Spawner` trait (task submission) +//! - Phase 10: `E2ERegistryHandle` / `InterfaceHandle` (lock handles) +//! - Phase 11: `ChannelFactory` trait with `TokioChannels` (std) and +//! `EmbassySyncChannels` (bare_metal) backends — replaces direct +//! `tokio::sync::mpsc` / `oneshot` usage +//! +//! **Remaining gaps:** +//! 1. **`F::Socket = TokioSocket`** bound on `bind_*`: a phase-5 //! compromise because stable Rust Return-Type Notation is still -//! nightly. +//! nightly. Phase 12 relaxes this via GATs. +//! 2. **Feature-flag split** (Phase 13): `client` / `server` still +//! pull in tokio + socket2. A future split (`client` vs +//! `client-tokio`) will make the core types no_std-compatible. //! -//! Closing those four is additional phased work (roughly the same -//! scope again as phases 1–9 combined). Until then, `feature = "client"` -//! / `feature = "server"` pull in `std + tokio + socket2`. +//! Until those are closed, `feature = "client"` / `feature = "server"` +//! pull in `std + tokio + socket2`. //! //! # Recommendation for no_alloc consumers today //! diff --git a/src/client/inner.rs b/src/client/inner.rs index e822a1c..d6e5cb6 100644 --- a/src/client/inner.rs +++ b/src/client/inner.rs @@ -7,10 +7,6 @@ use std::{ sync::{Arc, Mutex}, task::Poll, }; -use tokio::sync::{ - mpsc::{self, Receiver, Sender}, - oneshot, -}; use tracing::{debug, error, info, trace, warn}; use crate::{ @@ -23,9 +19,12 @@ use crate::{ }, e2e::E2ERegistry, protocol::{self, Message}, - tokio_transport::{TokioSpawner, TokioTimer, TokioTransport}, + tokio_transport::{TokioChannels, TokioSpawner, TokioTimer, TokioTransport}, traits::PayloadWireFormat, - transport::{E2ERegistryHandle, Spawner}, + transport::{ + ChannelFactory, E2ERegistryHandle, MpscRecv, OneshotSend, Spawner, + UnboundedSend, + }, }; use super::error::Error; @@ -43,31 +42,31 @@ const PENDING_RESPONSES_CAP: usize = 64; /// two. const UNICAST_SOCKETS_CAP: usize = 8; -pub(super) enum ControlMessage { - SetInterface(Ipv4Addr, oneshot::Sender>), - BindDiscovery(oneshot::Sender>), - UnbindDiscovery(oneshot::Sender>), +pub(super) enum ControlMessage { + SetInterface(Ipv4Addr, C::OneshotSender>), + BindDiscovery(C::OneshotSender>), + UnbindDiscovery(C::OneshotSender>), SendSD( SocketAddrV4, P::SdHeader, - oneshot::Sender>, + C::OneshotSender>, ), AddEndpoint( u16, u16, SocketAddrV4, u16, - oneshot::Sender>, + C::OneshotSender>, ), - RemoveEndpoint(u16, u16, oneshot::Sender>), + RemoveEndpoint(u16, u16, C::OneshotSender>), SendToService { service_id: u16, instance_id: u16, message: Message

, /// Fires when the UDP send completes (or errors on lookup/bind). - send_complete: oneshot::Sender>, + send_complete: C::OneshotSender>, /// Fires when a matching unicast response arrives. - response: oneshot::Sender>, + response: C::OneshotSender>, }, Subscribe { service_id: u16, @@ -76,18 +75,18 @@ pub(super) enum ControlMessage { ttl: u32, event_group_id: u16, client_port: u16, - response: oneshot::Sender>, + response: C::OneshotSender>, }, - QueryRebootFlag(oneshot::Sender>), + QueryRebootFlag(C::OneshotSender>), /// Test-only: force `sd_session_has_wrapped` to simulate the state a /// long-running client reaches after its SD session counter wraps past /// `0xFFFF`, without actually sending 65k SD messages. Fires the /// accompanying oneshot once the mutation is applied. #[cfg(test)] - ForceSdSessionWrappedForTest(bool, oneshot::Sender>), + ForceSdSessionWrappedForTest(bool, C::OneshotSender>), } -impl std::fmt::Debug for ControlMessage

{ +impl std::fmt::Debug for ControlMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::SetInterface(addr, _) => f.debug_tuple("SetInterface").field(addr).finish(), @@ -140,25 +139,25 @@ impl std::fmt::Debug for ControlMessage

{ } } -impl ControlMessage

{ - pub fn set_interface(interface: Ipv4Addr) -> (oneshot::Receiver>, Self) { - let (sender, receiver) = oneshot::channel(); +impl ControlMessage { + pub fn set_interface(interface: Ipv4Addr) -> (C::OneshotReceiver>, Self) { + let (sender, receiver) = C::oneshot(); (receiver, Self::SetInterface(interface, sender)) } - pub fn bind_discovery() -> (oneshot::Receiver>, Self) { - let (sender, receiver) = oneshot::channel(); + pub fn bind_discovery() -> (C::OneshotReceiver>, Self) { + let (sender, receiver) = C::oneshot(); (receiver, Self::BindDiscovery(sender)) } - pub fn unbind_discovery() -> (oneshot::Receiver>, Self) { - let (sender, receiver) = oneshot::channel(); + pub fn unbind_discovery() -> (C::OneshotReceiver>, Self) { + let (sender, receiver) = C::oneshot(); (receiver, Self::UnbindDiscovery(sender)) } pub fn send_sd( socket_addr: SocketAddrV4, header: P::SdHeader, - ) -> (oneshot::Receiver>, Self) { - let (sender, receiver) = oneshot::channel(); + ) -> (C::OneshotReceiver>, Self) { + let (sender, receiver) = C::oneshot(); (receiver, Self::SendSD(socket_addr, header, sender)) } pub fn add_endpoint( @@ -166,8 +165,8 @@ impl ControlMessage

{ instance_id: u16, addr: SocketAddrV4, local_port: u16, - ) -> (oneshot::Receiver>, Self) { - let (sender, receiver) = oneshot::channel(); + ) -> (C::OneshotReceiver>, Self) { + let (sender, receiver) = C::oneshot(); ( receiver, Self::AddEndpoint(service_id, instance_id, addr, local_port, sender), @@ -177,8 +176,8 @@ impl ControlMessage

{ pub fn remove_endpoint( service_id: u16, instance_id: u16, - ) -> (oneshot::Receiver>, Self) { - let (sender, receiver) = oneshot::channel(); + ) -> (C::OneshotReceiver>, Self) { + let (sender, receiver) = C::oneshot(); ( receiver, Self::RemoveEndpoint(service_id, instance_id, sender), @@ -191,12 +190,12 @@ impl ControlMessage

{ instance_id: u16, message: Message

, ) -> ( - oneshot::Receiver>, - oneshot::Receiver>, + C::OneshotReceiver>, + C::OneshotReceiver>, Self, ) { - let (send_complete_tx, send_complete_rx) = oneshot::channel(); - let (response_tx, response_rx) = oneshot::channel(); + let (send_complete_tx, send_complete_rx) = C::oneshot(); + let (response_tx, response_rx) = C::oneshot(); ( send_complete_rx, response_rx, @@ -217,8 +216,8 @@ impl ControlMessage

{ ttl: u32, event_group_id: u16, client_port: u16, - ) -> (oneshot::Receiver>, Self) { - let (sender, receiver) = oneshot::channel(); + ) -> (C::OneshotReceiver>, Self) { + let (sender, receiver) = C::oneshot(); ( receiver, Self::Subscribe { @@ -234,18 +233,18 @@ impl ControlMessage

{ } pub fn query_reboot_flag() -> ( - oneshot::Receiver>, + C::OneshotReceiver>, Self, ) { - let (sender, receiver) = oneshot::channel(); + let (sender, receiver) = C::oneshot(); (receiver, Self::QueryRebootFlag(sender)) } #[cfg(test)] pub fn force_sd_session_wrapped_for_test( wrapped: bool, - ) -> (oneshot::Receiver>, Self) { - let (sender, receiver) = oneshot::channel(); + ) -> (C::OneshotReceiver>, Self) { + let (sender, receiver) = C::oneshot(); ( receiver, Self::ForceSdSessionWrappedForTest(wrapped, sender), @@ -291,20 +290,21 @@ impl ControlMessage

{ } pub(super) struct Inner< - PayloadDefinitions: PayloadWireFormat, + PayloadDefinitions: PayloadWireFormat + 'static, S: Spawner = TokioSpawner, R: E2ERegistryHandle = Arc>, + C: ChannelFactory = TokioChannels, > { /// MPSC Receiver used to receive control messages from outer client - control_receiver: Receiver>, + control_receiver: C::BoundedReceiver>, /// Queue of pending control messages to process - request_queue: Deque, REQUEST_QUEUE_CAP>, + request_queue: Deque, REQUEST_QUEUE_CAP>, /// Pending request-responses keyed by `request_id` (`client_id` << 16 | `session_counter`). /// Set by `SendToService`, cleared when a matching unicast arrives. pending_responses: - FnvIndexMap>, PENDING_RESPONSES_CAP>, + FnvIndexMap>, PENDING_RESPONSES_CAP>, /// Unbounded sender used to send updates to outer client - update_sender: mpsc::UnboundedSender>, + update_sender: C::UnboundedSender>, /// Target interface for sockets interface: Ipv4Addr, /// Socket manager for service discovery if bound @@ -337,7 +337,9 @@ pub(super) struct Inner< phantom: std::marker::PhantomData, } -impl std::fmt::Debug for Inner { +impl std::fmt::Debug + for Inner +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Inner") .field("interface", &self.interface) @@ -349,11 +351,12 @@ impl std::fmt::Debug for } } -impl Inner +impl Inner where PayloadDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static, S: Spawner + Send + Sync + 'static, R: E2ERegistryHandle, + C: ChannelFactory, { /// Construct an `Inner` and return the control/update channels plus /// the run-loop future. The caller must drive the future on a Tokio @@ -365,19 +368,20 @@ where /// is already Send. A bare-metal consumer whose transport produces /// `!Send` state needs a cfg-gated alternative constructor; none /// exists yet — it's planned alongside the bare-metal port. + #[allow(clippy::type_complexity)] pub fn build( interface: Ipv4Addr, e2e_registry: R, multicast_loopback: bool, spawner: S, ) -> ( - Sender>, - mpsc::UnboundedReceiver>, + C::BoundedSender>, + C::UnboundedReceiver>, impl core::future::Future + Send + 'static, ) { info!("Initializing SOME/IP Client"); - let (control_sender, control_receiver) = mpsc::channel(4); - let (update_sender, update_receiver) = mpsc::unbounded_channel(); + let (control_sender, control_receiver) = C::bounded::<_, 4>(); + let (update_sender, update_receiver) = C::unbounded(); let inner = Self { control_receiver, request_queue: Deque::new(), @@ -494,7 +498,7 @@ where fn track_or_reject_pending_response( &mut self, request_id: u32, - response: oneshot::Sender>, + response: C::OneshotSender>, ) { match self.pending_responses.insert(request_id, response) { Ok(None) => {} @@ -1075,7 +1079,7 @@ where } if rebooted { - let _ = update_sender.send(ClientUpdate::SenderRebooted(source)); + let _ = update_sender.send_now(ClientUpdate::SenderRebooted(source)); } let discovery_msg = DiscoveryMessage { @@ -1083,11 +1087,11 @@ where someip_header, sd_header, }; - let _ = update_sender.send(ClientUpdate::DiscoveryUpdated(discovery_msg)); + let _ = update_sender.send_now(ClientUpdate::DiscoveryUpdated(discovery_msg)); } Err(err) => { error!("Error receiving discovery message: {:?}", err); - let _ = update_sender.send(ClientUpdate::Error(err)); + let _ = update_sender.send_now(ClientUpdate::Error(err)); } } } @@ -1103,10 +1107,10 @@ where continue; } // Not a response — forward as ClientUpdate::Unicast - let _ = update_sender.send(ClientUpdate::Unicast { message: received_message, e2e_status }); + let _ = update_sender.send_now(ClientUpdate::Unicast { message: received_message, e2e_status }); } Err(err) => { - let _ = update_sender.send(ClientUpdate::Error(err)); + let _ = update_sender.send_now(ClientUpdate::Error(err)); } } } @@ -1126,7 +1130,10 @@ where mod tests { use super::*; use crate::protocol::sd::test_support::{TestPayload, empty_sd_header}; + use crate::transport::{OneshotRecv, UnboundedRecv}; use std::format; + use tokio::sync::{mpsc, oneshot}; + use tokio::sync::mpsc::Sender; type TestControl = ControlMessage; @@ -1170,55 +1177,62 @@ mod tests { /// the resulting `RecvError`, which is exactly what Copilot flagged. #[test] fn reject_with_capacity_notifies_every_sender() { - fn expect_capacity( - rx: &mut oneshot::Receiver>, - label: &str, - ) { - match rx.try_recv() { - Ok(Err(Error::Capacity(s))) => assert_eq!(s, "request_queue", "{label}"), - other => panic!("{label}: expected Err(Capacity), got {other:?}"), + use futures::FutureExt; + use crate::transport::OneshotCancelled; + + fn expect_capacity(rx: F, label: &str) + where + F: core::future::Future, OneshotCancelled>>, + { + match rx.now_or_never() { + Some(Ok(Err(Error::Capacity(s)))) => assert_eq!(s, "request_queue", "{label}"), + other => panic!("{label}: expected Some(Ok(Err(Capacity))), got {other:?}"), } } // Variants carrying a single Result<(), Error> response sender. - let (mut rx, msg) = TestControl::set_interface(Ipv4Addr::LOCALHOST); + let (rx, msg) = TestControl::set_interface(Ipv4Addr::LOCALHOST); msg.reject_with_capacity("request_queue"); - expect_capacity(&mut rx, "SetInterface"); + expect_capacity(rx.recv(), "SetInterface"); - let (mut rx, msg) = TestControl::bind_discovery(); + let (rx, msg) = TestControl::bind_discovery(); msg.reject_with_capacity("request_queue"); - expect_capacity(&mut rx, "BindDiscovery"); + expect_capacity(rx.recv(), "BindDiscovery"); - let (mut rx, msg) = TestControl::unbind_discovery(); + let (rx, msg) = TestControl::unbind_discovery(); msg.reject_with_capacity("request_queue"); - expect_capacity(&mut rx, "UnbindDiscovery"); + expect_capacity(rx.recv(), "UnbindDiscovery"); let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1234); - let (mut rx, msg) = TestControl::send_sd(target, empty_sd_header()); + let (rx, msg) = TestControl::send_sd(target, empty_sd_header()); msg.reject_with_capacity("request_queue"); - expect_capacity(&mut rx, "SendSD"); + expect_capacity(rx.recv(), "SendSD"); let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5000); - let (mut rx, msg) = TestControl::add_endpoint(0x1234, 0x0001, addr, 0); + let (rx, msg) = TestControl::add_endpoint(0x1234, 0x0001, addr, 0); msg.reject_with_capacity("request_queue"); - expect_capacity(&mut rx, "AddEndpoint"); + expect_capacity(rx.recv(), "AddEndpoint"); - let (mut rx, msg) = TestControl::remove_endpoint(0x1234, 0x0001); + let (rx, msg) = TestControl::remove_endpoint(0x1234, 0x0001); msg.reject_with_capacity("request_queue"); - expect_capacity(&mut rx, "RemoveEndpoint"); + expect_capacity(rx.recv(), "RemoveEndpoint"); - let (mut rx, msg) = TestControl::subscribe(0x1234, 0x0001, 1, 3, 0x01, 0); + let (rx, msg) = TestControl::subscribe(0x1234, 0x0001, 1, 3, 0x01, 0); msg.reject_with_capacity("request_queue"); - expect_capacity(&mut rx, "Subscribe"); + expect_capacity(rx.recv(), "Subscribe"); // SendToService carries two senders — both must be notified so that - // neither `send_rx.await.unwrap()?` nor `PendingResponse::response()` + // neither `send_rx.recv().await.unwrap()?` nor `PendingResponse::response()` // panics. let message = Message::::new_sd(1, &empty_sd_header()); - let (mut send_rx, mut resp_rx, msg) = TestControl::send_to_service(0x1234, 0x0001, message); + let (send_rx, resp_rx, msg) = TestControl::send_to_service(0x1234, 0x0001, message); msg.reject_with_capacity("request_queue"); - expect_capacity(&mut send_rx, "SendToService.send_complete"); - expect_capacity(&mut resp_rx, "SendToService.response"); + expect_capacity(send_rx.recv(), "SendToService.send_complete"); + // resp_rx has type Result — check it separately + match resp_rx.recv().now_or_never() { + Some(Ok(Err(Error::Capacity(s)))) => assert_eq!(s, "request_queue", "SendToService.response"), + other => panic!("SendToService.response: expected Some(Ok(Err(Capacity))), got {other:?}"), + } } #[test] @@ -1264,8 +1278,10 @@ mod tests { /// Build an [`Inner`] without spawning the run loop, for direct /// unit-testing of state-mutating methods. fn make_inner_for_test() -> Inner { - let (_control_sender, control_receiver) = mpsc::channel(4); - let (update_sender, _update_receiver) = mpsc::unbounded_channel(); + let (_control_sender, control_receiver) = + TokioChannels::bounded::, 4>(); + let (update_sender, _update_receiver) = + TokioChannels::unbounded::>(); Inner { control_receiver, request_queue: Deque::new(), @@ -1326,8 +1342,9 @@ mod tests { /// alive so a future unicast reply can resolve it. #[tokio::test] async fn track_or_reject_pending_response_inserts_when_room_available() { + use futures::FutureExt; let mut inner = make_inner_for_test(); - let (tx, mut rx) = oneshot::channel::>(); + let (tx, rx) = oneshot::channel::>(); inner.track_or_reject_pending_response(0xDEAD_BEEF, tx); @@ -1339,7 +1356,7 @@ mod tests { // Receiver is still waiting — helper did NOT pre-emptively // resolve it with a capacity error on the happy path. assert!( - matches!(rx.try_recv(), Err(oneshot::error::TryRecvError::Empty)), + rx.now_or_never().is_none(), "receiver must still be pending when the insert succeeds", ); } @@ -1419,6 +1436,8 @@ mod tests { /// caller gets a clean `Result` instead of a panicking `RecvError`. #[tokio::test] async fn track_or_reject_pending_response_completes_displaced_sender() { + use futures::FutureExt; + let mut inner = make_inner_for_test(); let key: u32 = 0xCAFE_F00D; @@ -1428,7 +1447,7 @@ mod tests { assert_eq!(inner.pending_responses.len(), 1); // Second tracking with the same key: displaces the first sender. - let (second_tx, mut second_rx) = oneshot::channel::>(); + let (second_tx, second_rx) = oneshot::channel::>(); inner.track_or_reject_pending_response(key, second_tx); // Map still has one entry — the second one replaced the first. @@ -1443,15 +1462,12 @@ mod tests { ); match displaced_result { Err(Error::Capacity(tag)) => assert_eq!(tag, "pending_responses"), - other => panic!("expected Err(Error::Capacity(\"pending_responses\")), got {other:?}"), + other => panic!("expected Err(Error::Capacity(\\\"pending_responses\\\")), got {other:?}"), } // The new sender is still live and pending. assert!( - matches!( - second_rx.try_recv(), - Err(oneshot::error::TryRecvError::Empty) - ), + second_rx.now_or_never().is_none(), "replacement sender must still be pending in the map", ); } @@ -1545,18 +1561,18 @@ mod tests { drop(control_sender); // The update receiver should eventually return None when the inner loop exits let result = - tokio::time::timeout(std::time::Duration::from_secs(2), update_receiver.recv()).await; + tokio::time::timeout(std::time::Duration::from_secs(2), UnboundedRecv::recv(&mut update_receiver)).await; assert!(result.is_ok()); assert!(result.unwrap().is_none()); } /// Helper: verify inner loop is still alive by sending an `AddEndpoint` and /// checking that a response arrives within 2 seconds. - async fn assert_inner_alive(control_sender: &Sender>) { + async fn assert_inner_alive(control_sender: &Sender>) { let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999); let (rx, msg) = TestControl::add_endpoint(0xFFFE, 0xFFFE, addr, 0); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("Timed out — inner loop appears dead") .expect("Oneshot closed — inner loop appears dead"); @@ -1637,7 +1653,7 @@ mod tests { // Bind discovery first so the SendSD path has a socket to use let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Send SD with a dropped receiver let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30490); @@ -1670,7 +1686,7 @@ mod tests { // iteration 2: interface matches, bind discovery, send response let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Queue both messages into the channel buffer before the inner loop // processes either. mpsc sends on a non-full buffer complete without @@ -1685,13 +1701,13 @@ mod tests { control_sender.send(msg_add).await.unwrap(); // Both should complete successfully - let set_result = tokio::time::timeout(std::time::Duration::from_secs(3), rx_set) + let set_result = tokio::time::timeout(std::time::Duration::from_secs(3), rx_set.recv()) .await .expect("Timed out waiting for SetInterface") .expect("SetInterface oneshot closed"); assert!(set_result.is_ok()); - let add_result = tokio::time::timeout(std::time::Duration::from_secs(3), rx_add) + let add_result = tokio::time::timeout(std::time::Duration::from_secs(3), rx_add.recv()) .await .expect("Timed out waiting for AddEndpoint") .expect("AddEndpoint oneshot closed"); @@ -1701,8 +1717,8 @@ mod tests { assert_inner_alive(&control_sender).await; } - #[test] - fn test_send_to_service_constructor_returns_two_receivers() { + #[tokio::test] + async fn test_send_to_service_constructor_returns_two_receivers() { let message = Message::::new_sd(1, &empty_sd_header()); let (send_rx, resp_rx, msg) = TestControl::send_to_service(0x1234, 0x0001, message); @@ -1715,13 +1731,13 @@ mod tests { { // Both channels are independent — sending on one doesn't affect the other send_complete.send(Ok(())).unwrap(); - assert!(send_rx.blocking_recv().unwrap().is_ok()); + assert!(send_rx.recv().await.unwrap().is_ok()); let payload = TestPayload { header: empty_sd_header(), }; response.send(Ok(payload.clone())).unwrap(); - assert_eq!(resp_rx.blocking_recv().unwrap().unwrap(), payload); + assert_eq!(resp_rx.recv().await.unwrap().unwrap(), payload); } else { panic!("expected SendToService variant"); } @@ -1778,7 +1794,7 @@ mod tests { let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5000); let (rx, msg) = TestControl::add_endpoint(0x1234, 0x0001, addr, 0); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Send SendToService with the send_complete receiver dropped let message = Message::::new_sd(1, &empty_sd_header()); @@ -1804,7 +1820,7 @@ mod tests { let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); } #[tokio::test] @@ -1820,12 +1836,12 @@ mod tests { let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Second bind should also succeed (idempotent path) let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); } #[tokio::test] @@ -1843,7 +1859,7 @@ mod tests { let sd_header = empty_sd_header(); let (rx, msg) = TestControl::send_sd(target, sd_header); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("Timed out waiting for SendSD") .expect("SendSD oneshot closed"); @@ -1864,12 +1880,12 @@ mod tests { let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5000); let (rx, msg) = TestControl::add_endpoint(0x1234, 0x0001, addr, 0); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); let message = Message::::new_sd(1, &empty_sd_header()); let (send_rx, _resp_rx, msg) = TestControl::send_to_service(0x1234, 0x0001, message); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), send_rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), send_rx.recv()) .await .expect("Timed out waiting for SendToService") .expect("SendToService oneshot closed"); @@ -1890,18 +1906,18 @@ mod tests { // Bind discovery first let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Add endpoint let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5000); let (rx, msg) = TestControl::add_endpoint(0x1234, 0x0001, addr, 0); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Subscribe let (rx, msg) = TestControl::subscribe(0x1234, 0x0001, 1, 3, 0x01, 0); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("Timed out waiting for Subscribe") .expect("Subscribe oneshot closed"); @@ -1923,12 +1939,12 @@ mod tests { let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5000); let (rx, msg) = TestControl::add_endpoint(0x1234, 0x0001, addr, 0); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Subscribe should auto-bind discovery let (rx, msg) = TestControl::subscribe(0x1234, 0x0001, 1, 3, 0x01, 0); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("Timed out waiting for Subscribe") .expect("Subscribe oneshot closed"); @@ -1947,7 +1963,7 @@ mod tests { let (rx, msg) = TestControl::subscribe(0xFFFF, 0xFFFF, 1, 3, 0x01, 0); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("Timed out") .expect("oneshot closed"); @@ -1968,19 +1984,19 @@ mod tests { let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5000); let (rx, msg) = TestControl::add_endpoint(0x1234, 0x0001, addr, 0); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // First send auto-binds unicast let message = Message::::new_sd(1, &empty_sd_header()); let (send_rx, _resp_rx, msg) = TestControl::send_to_service(0x1234, 0x0001, message); control_sender.send(msg).await.unwrap(); - send_rx.await.unwrap().unwrap(); + send_rx.recv().await.unwrap().unwrap(); // Second send reuses the existing socket (no auto-bind needed) let message = Message::::new_sd(1, &empty_sd_header()); let (send_rx, _resp_rx, msg) = TestControl::send_to_service(0x1234, 0x0001, message); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), send_rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), send_rx.recv()) .await .expect("Timed out") .expect("oneshot closed"); @@ -2024,7 +2040,7 @@ mod tests { // Binding discovery on 127.0.0.2 should succeed on most systems. let (rx, msg) = TestControl::set_interface(Ipv4Addr::new(127, 0, 0, 2)); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(3), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(3), rx.recv()) .await .expect("Timed out waiting for SetInterface") .expect("SetInterface oneshot closed"); @@ -2049,7 +2065,7 @@ mod tests { // Bind discovery on LOCALHOST first let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Change to 127.0.0.2 — this takes the multi-step path: // 1. unbind discovery, re-queue @@ -2057,7 +2073,7 @@ mod tests { // 3. interface == 127.0.0.2, bind discovery let (rx, msg) = TestControl::set_interface(Ipv4Addr::new(127, 0, 0, 2)); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(3), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(3), rx.recv()) .await .expect("Timed out waiting for SetInterface") .expect("SetInterface oneshot closed"); @@ -2081,17 +2097,17 @@ mod tests { // Add endpoint and bind discovery let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 5000); let (rx, msg) = TestControl::add_endpoint(0x1234, 0x0001, addr, 0); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // First subscribe with specific port — binds the port let (rx, msg) = TestControl::subscribe(0x1234, 0x0001, 1, 3, 0x01, 44444); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("Timed out") .expect("oneshot closed"); @@ -2100,7 +2116,7 @@ mod tests { // Second subscribe with the same port — reuses the existing socket let (rx, msg) = TestControl::subscribe(0x1234, 0x0001, 1, 3, 0x02, 44444); control_sender.send(msg).await.unwrap(); - let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx) + let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("Timed out") .expect("oneshot closed"); @@ -2132,11 +2148,11 @@ mod tests { // Bind and send one SD message to advance the session counter. let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); let (rx, msg) = TestControl::send_sd(target, empty_sd_header()); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); let mut buf = vec![0u8; 1400]; let (len, _) = @@ -2152,16 +2168,16 @@ mod tests { // Unbind, then rebind. let (rx, msg) = TestControl::unbind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); let (rx, msg) = TestControl::bind_discovery(); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); // Send a second SD message and verify both session counter and reboot flag persisted. let (rx, msg) = TestControl::send_sd(target, empty_sd_header()); control_sender.send(msg).await.unwrap(); - rx.await.unwrap().unwrap(); + rx.recv().await.unwrap().unwrap(); let (len, _) = tokio::time::timeout(std::time::Duration::from_secs(2), raw.recv_from(&mut buf)) diff --git a/src/client/mod.rs b/src/client/mod.rs index 9545603..e84825a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -38,29 +38,31 @@ pub use error::Error; use crate::Timer; use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile, E2ERegistry}; -use crate::tokio_transport::{TokioSpawner, TokioTimer}; -use crate::transport::{E2ERegistryHandle, InterfaceHandle, Spawner}; +use crate::tokio_transport::{TokioChannels, TokioSpawner, TokioTimer}; +use crate::transport::{ + ChannelFactory, E2ERegistryHandle, InterfaceHandle, MpscSend, OneshotRecv, Spawner, + UnboundedRecv, +}; use crate::{protocol, protocol::Message, traits::PayloadWireFormat}; use inner::{ControlMessage, Inner}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::{Arc, Mutex, RwLock}; -use tokio::sync::{mpsc, oneshot}; use tracing::info; /// Handle to a pending SOME/IP request-response transaction. /// Resolves when the inner loop receives a matching unicast reply. /// Does not borrow `Client`. -pub struct PendingResponse

{ - receiver: oneshot::Receiver>, +pub struct PendingResponse { + receiver: C::OneshotReceiver>, } -impl

std::fmt::Debug for PendingResponse

{ +impl std::fmt::Debug for PendingResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PendingResponse").finish_non_exhaustive() } } -impl

PendingResponse

{ +impl PendingResponse { /// Await the response payload. /// /// # Errors @@ -75,7 +77,7 @@ impl

PendingResponse

{ /// `PendingResponse` handle outlived its driver. Reserving `Shutdown` /// for actual lifecycle failure keeps `RecvError` unambiguous. pub async fn response(self) -> Result { - self.receiver.await.map_err(|_| Error::Shutdown)? + self.receiver.recv().await.map_err(|_| Error::Shutdown)? } } @@ -142,23 +144,25 @@ impl std::fmt::Debug for ClientUpdate

{ /// /// Returned by [`Client::new`]. Call [`recv`](Self::recv) to receive /// discovery, unicast, and error updates. -pub struct ClientUpdates { - update_receiver: mpsc::UnboundedReceiver>, +pub struct ClientUpdates { + update_receiver: C::UnboundedReceiver>, } -impl std::fmt::Debug for ClientUpdates { +impl std::fmt::Debug + for ClientUpdates +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ClientUpdates").finish_non_exhaustive() } } -impl ClientUpdates { +impl ClientUpdates { /// Waits for the next update from the client event loop. /// /// Returns `None` when the inner loop has exited (all `Client` handles /// dropped and the event loop finished draining). pub async fn recv(&mut self) -> Option> { - self.update_receiver.recv().await + UnboundedRecv::recv(&mut self.update_receiver).await } } @@ -176,20 +180,22 @@ impl ClientUpdates { /// [`Self::new_with_spawner_and_loopback`]. #[derive(Clone)] pub struct Client< - MessageDefinitions: PayloadWireFormat, + MessageDefinitions: PayloadWireFormat + Send + 'static, R: E2ERegistryHandle = Arc>, I: InterfaceHandle = Arc>, + C: ChannelFactory = TokioChannels, > { interface: I, - control_sender: mpsc::Sender>, + control_sender: C::BoundedSender>, e2e_registry: R, } -impl std::fmt::Debug for Client +impl std::fmt::Debug for Client where - MessageDefinitions: PayloadWireFormat, + MessageDefinitions: PayloadWireFormat + Send + 'static, R: E2ERegistryHandle, I: InterfaceHandle, + C: ChannelFactory, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Client") @@ -199,7 +205,8 @@ where } /// Constructors that create the default `Arc`-backed handles for `std + tokio`. -impl Client>, Arc>> +impl + Client>, Arc>, TokioChannels> where MessageDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static, { @@ -312,19 +319,20 @@ where spawner: S, ) -> ( Self, - ClientUpdates, + ClientUpdates, impl core::future::Future + Send + 'static, ) where S: Spawner + Send + Sync + 'static, { let e2e_registry = Arc::new(Mutex::new(E2ERegistry::new())); - let (control_sender, update_receiver, run_future) = Inner::build( - interface, - Arc::clone(&e2e_registry), - multicast_loopback, - spawner, - ); + let (control_sender, update_receiver, run_future) = + Inner::::build( + interface, + Arc::clone(&e2e_registry), + multicast_loopback, + spawner, + ); let client = Self { interface: Arc::new(RwLock::new(interface)), @@ -336,12 +344,13 @@ where } } -/// Methods available on all `Client` regardless of handle types. -impl Client +/// Methods available on all `Client` regardless of handle types. +impl Client where MessageDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static, R: E2ERegistryHandle, I: InterfaceHandle, + C: ChannelFactory, { /// Returns the current network interface address. #[must_use] @@ -363,8 +372,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)??; + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)??; self.interface.set(interface); Ok(()) } @@ -383,8 +392,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)? + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)? } /// Unbinds the SD multicast discovery socket. @@ -401,8 +410,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)? + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)? } /// Subscribes to an event group on a known service. @@ -434,8 +443,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)? + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)? } /// Like [`subscribe`](Self::subscribe) but does not wait for the @@ -524,8 +533,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)? + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)? } /// Test-only: force the inner loop's `sd_session_has_wrapped` so tests @@ -541,8 +550,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)? + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)? } /// Sends an SD message to a specific target address. @@ -563,176 +572,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)? - } - - /// Start periodic SD announcements on the client's discovery socket. - /// - /// Spawns a background task that sends the given SD header to the - /// multicast group at a regular interval. Use this to bundle - /// `FindService` + `OfferService` entries from a single SD identity - /// when the application acts as both client and server. - /// - /// The announcements are sent via the client's SD socket, ensuring - /// they share the same source address as the client's `Subscribe` and - /// `FindService` messages. - /// - /// **Reboot flag auto-refresh:** the SD header's reboot bit is overridden - /// at each tick with the client's currently tracked reboot flag (via - /// [`PayloadWireFormat::set_reboot_flag`]). The reboot bit the caller - /// supplies on `sd_header` is therefore ignored. This ensures the flag - /// transitions from `RecentlyRebooted` to `Continuous` once the session - /// counter wraps past `0xFFFF`, rather than staying stuck on whatever - /// value was baked at call time. - /// - /// Returns an `impl Future + Send + 'static` that the - /// caller drives on their executor (typically via `tokio::spawn`). - /// The loop uses a weak reference to the client's control channel, - /// so it exits automatically when all `Client` handles are dropped - /// (via `shut_down()` or going out of scope). - /// - /// ```no_run - /// # use simple_someip::{Client, RawPayload, VecSdHeader}; - /// # use simple_someip::protocol::sd::{self, RebootFlag, Flags}; - /// # async fn demo(client: Client) { - /// let header = VecSdHeader { - /// flags: Flags::new_sd(RebootFlag::RecentlyRebooted), - /// entries: vec![], - /// options: vec![], - /// }; - /// let handle = tokio::spawn( - /// client.sd_announcements_loop(header, std::time::Duration::from_secs(1)) - /// ); - /// // ...later: handle.abort() to stop, or let the Client drop naturally. - /// # } - /// ``` - /// - /// # Arguments - /// - /// * `sd_header` — The SD header to send (entries + options). - /// * `interval` — How often to send (e.g. every 1 second). Values below - /// 100ms are clamped to 100ms to prevent tight loops. - pub fn sd_announcements_loop( - &self, - sd_header: ::SdHeader, - interval: std::time::Duration, - ) -> impl core::future::Future + Send + 'static - where - ::SdHeader: Send + 'static, - { - use crate::protocol::sd; - - // Use a WeakSender so this future does NOT keep the control channel - // alive. When all strong Client handles are dropped (shut_down), - // the weak sender will fail to upgrade and the loop exits cleanly. - let weak_sender = self.control_sender.downgrade(); - let target = SocketAddrV4::new(sd::MULTICAST_IP, sd::MULTICAST_PORT); - let interval = interval.max(std::time::Duration::from_millis(100)); - - async move { - // Sleep goes through the `Timer` trait so bare-metal - // consumers can swap in `embassy_time` or similar; today it - // resolves to `TokioTimer`. Note: we use `Timer::sleep` - // repeatedly instead of `tokio::time::interval` because the - // trait has no equivalent of `interval`. The resulting - // cadence is "interval + body time" rather than "interval - // aligned to wall clock"; for SD announcements (a - // best-effort periodic heartbeat) this difference is - // immaterial. A regression test pins the cadence at - // approximately `interval` tolerance. - // - // The first iteration's `sleep` also serves as the initial - // delay so the caller has a chance to finish setup (e.g. - // subscribing) before the first announcement goes out. - let timer = TokioTimer; - let mut count = 0u64; - loop { - timer.sleep(interval).await; - - // Refresh the reboot flag from the client's tracked state - // so long-running announcers transition from RecentlyRebooted - // to Continuous once the session counter wraps. The weak - // sender is upgraded, used to enqueue a single control - // message, then dropped before we await — keeping the - // strong sender alive across awaits would defeat the - // weak-sender shutdown path. - // - // Note: this iteration upgrades the weak sender twice (once - // for `query_reboot_flag`, once for `send_sd`). The user - // could call `shut_down` between them, in which case the - // first upgrade succeeds, the reboot flag arrives, then - // the second upgrade fails — emitting "Client shut down" - // partway through what was logically a single tick. The - // alternative (holding the strong sender across the - // `flag_rx.await`) would defeat the weak-sender shutdown - // path. The mid-tick log is harmless and not worth a - // refactor. - let (flag_rx, flag_msg) = ControlMessage::query_reboot_flag(); - let Some(sender) = weak_sender.upgrade() else { - tracing::info!("Client shut down, stopping SD announcements"); - break; - }; - let enqueue_ok = sender.send(flag_msg).await.is_ok(); - drop(sender); - if !enqueue_ok { - tracing::warn!("SD announcement channel closed, stopping"); - break; - } - let reboot = match flag_rx.await { - Ok(Ok(flag)) => flag, - Ok(Err(e)) => { - // Run loop returned a typed error (e.g. - // `Error::Capacity("request_queue")`). Skip this - // tick and try again next interval — capacity - // pressure is transient. - tracing::warn!( - "SD announcement reboot-flag query returned error ({:?}), skipping tick", - e - ); - continue; - } - Err(_) => { - tracing::warn!("SD announcement reboot-flag query dropped, stopping"); - break; - } - }; - let mut header = sd_header.clone(); - MessageDefinitions::set_reboot_flag(&mut header, reboot); - - let (response, message) = ControlMessage::send_sd(target, header); - - let Some(sender) = weak_sender.upgrade() else { - tracing::info!("Client shut down, stopping SD announcements"); - break; - }; - let send_ok = sender.send(message).await.is_ok(); - drop(sender); - - if !send_ok { - tracing::warn!("SD announcement channel closed, stopping"); - break; - } - - match response.await { - Ok(Ok(())) => { - count += 1; - if count == 1 { - tracing::info!("Sent first client SD announcement"); - } else { - tracing::trace!("Sent {count} client SD announcements"); - } - } - Ok(Err(e)) => { - tracing::error!("Failed to send SD announcement: {e:?}"); - } - Err(_) => { - tracing::warn!("SD announcement response dropped, stopping"); - break; - } - } - } - } + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)? } /// Registers a service endpoint in the client's endpoint registry. @@ -765,8 +606,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)? + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)? } /// Removes a service endpoint from the client's endpoint registry. @@ -783,8 +624,8 @@ where self.control_sender .send(message) .await - .map_err(|_| Error::Shutdown)?; - response.await.map_err(|_| Error::Shutdown)? + .map_err(|()| Error::Shutdown)?; + response.recv().await.map_err(|_| Error::Shutdown)? } /// Sends a message to a service and returns a handle to await the response. @@ -816,14 +657,14 @@ where service_id: u16, instance_id: u16, message: crate::protocol::Message, - ) -> Result, Error> { + ) -> Result, Error> { let (send_rx, response_rx, ctrl_msg) = ControlMessage::send_to_service(service_id, instance_id, message); self.control_sender .send(ctrl_msg) .await - .map_err(|_| Error::Shutdown)?; - send_rx.await.map_err(|_| Error::Shutdown)??; + .map_err(|()| Error::Shutdown)?; + send_rx.recv().await.map_err(|_| Error::Shutdown)??; Ok(PendingResponse { receiver: response_rx, }) @@ -859,9 +700,9 @@ where self.control_sender .send(ctrl_msg) .await - .map_err(|_| Error::Shutdown)?; - send_rx.await.map_err(|_| Error::Shutdown)??; - response_rx.await.map_err(|_| Error::Shutdown)? + .map_err(|()| Error::Shutdown)?; + send_rx.recv().await.map_err(|_| Error::Shutdown)??; + response_rx.recv().await.map_err(|_| Error::Shutdown)? } /// Register an E2E profile for the given key. @@ -892,6 +733,150 @@ where } } +/// `sd_announcements_loop` is only available with the `TokioChannels` backend +/// because it requires `tokio::sync::mpsc::Sender::downgrade()` for the +/// weak-sender shutdown pattern. A bare-metal alternative would need a +/// different lifecycle mechanism (phase-future). +impl Client +where + MessageDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static, + R: E2ERegistryHandle, + I: InterfaceHandle, +{ + /// Start periodic SD announcements on the client's discovery socket. + /// + /// Spawns a background task that sends the given SD header to the + /// multicast group at a regular interval. Use this to bundle + /// `FindService` + `OfferService` entries from a single SD identity + /// when the application acts as both client and server. + /// + /// The announcements are sent via the client's SD socket, ensuring + /// they share the same source address as the client's `Subscribe` and + /// `FindService` messages. + /// + /// **Reboot flag auto-refresh:** the SD header's reboot bit is overridden + /// at each tick with the client's currently tracked reboot flag (via + /// [`PayloadWireFormat::set_reboot_flag`]). The reboot bit the caller + /// supplies on `sd_header` is therefore ignored. This ensures the flag + /// transitions from `RecentlyRebooted` to `Continuous` once the session + /// counter wraps past `0xFFFF`, rather than staying stuck on whatever + /// value was baked at call time. + /// + /// Returns an `impl Future + Send + 'static` that the + /// caller drives on their executor (typically via `tokio::spawn`). + /// The loop uses a weak reference to the client's control channel, + /// so it exits automatically when all `Client` handles are dropped + /// (via `shut_down()` or going out of scope). + /// + /// ```no_run + /// # use simple_someip::{Client, RawPayload, VecSdHeader}; + /// # use simple_someip::protocol::sd::{self, RebootFlag, Flags}; + /// # async fn demo(client: Client) { + /// let header = VecSdHeader { + /// flags: Flags::new_sd(RebootFlag::RecentlyRebooted), + /// entries: vec![], + /// options: vec![], + /// }; + /// let handle = tokio::spawn( + /// client.sd_announcements_loop(header, std::time::Duration::from_secs(1)) + /// ); + /// // ...later: handle.abort() to stop, or let the Client drop naturally. + /// # } + /// ``` + /// + /// # Arguments + /// + /// * `sd_header` — The SD header to send (entries + options). + /// * `interval` — How often to send (e.g. every 1 second). Values below + /// 100ms are clamped to 100ms to prevent tight loops. + pub fn sd_announcements_loop( + &self, + sd_header: ::SdHeader, + interval: std::time::Duration, + ) -> impl core::future::Future + Send + 'static + where + ::SdHeader: Send + 'static, + { + use crate::protocol::sd; + use crate::transport::OneshotRecv; + + // Use a WeakSender so this future does NOT keep the control channel + // alive. When all strong Client handles are dropped (shut_down), + // the weak sender will fail to upgrade and the loop exits cleanly. + let weak_sender = self.control_sender.downgrade(); + let target = SocketAddrV4::new(sd::MULTICAST_IP, sd::MULTICAST_PORT); + let interval = interval.max(std::time::Duration::from_millis(100)); + + async move { + let timer = TokioTimer; + let mut count = 0u64; + loop { + timer.sleep(interval).await; + + let (flag_rx, flag_msg) = ControlMessage::::query_reboot_flag(); + let Some(sender) = weak_sender.upgrade() else { + tracing::info!("Client shut down, stopping SD announcements"); + break; + }; + let enqueue_ok = sender.send(flag_msg).await.is_ok(); + drop(sender); + if !enqueue_ok { + tracing::warn!("SD announcement channel closed, stopping"); + break; + } + let reboot = match flag_rx.recv().await { + Ok(Ok(flag)) => flag, + Ok(Err(e)) => { + tracing::warn!( + "SD announcement reboot-flag query returned error ({:?}), skipping tick", + e + ); + continue; + } + Err(_) => { + tracing::warn!("SD announcement reboot-flag query dropped, stopping"); + break; + } + }; + let mut header = sd_header.clone(); + MessageDefinitions::set_reboot_flag(&mut header, reboot); + + let (response, message) = ControlMessage::::send_sd(target, header); + + let Some(sender) = weak_sender.upgrade() else { + tracing::info!("Client shut down, stopping SD announcements"); + break; + }; + let send_ok = sender.send(message).await.is_ok(); + drop(sender); + + if !send_ok { + tracing::warn!("SD announcement channel closed, stopping"); + break; + } + + match response.recv().await { + Ok(Ok(())) => { + count += 1; + if count == 1 { + tracing::info!("Sent first client SD announcement"); + } else { + tracing::trace!("Sent {count} client SD announcements"); + } + } + Ok(Err(e)) => { + tracing::error!("Failed to send SD announcement: {e:?}"); + } + Err(_) => { + tracing::warn!("SD announcement response dropped, stopping"); + break; + } + } + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -1074,16 +1059,16 @@ mod tests { #[test] fn test_pending_response_debug() { - let (_tx, rx) = oneshot::channel::>(); - let pending = PendingResponse { receiver: rx }; + let (_tx, rx) = TokioChannels::oneshot::>(); + let pending: PendingResponse = PendingResponse { receiver: rx }; let s = format!("{pending:?}"); assert!(s.contains("PendingResponse")); } #[tokio::test] async fn test_pending_response_resolves_ok() { - let (tx, rx) = oneshot::channel::>(); - let pending = PendingResponse { receiver: rx }; + let (tx, rx) = TokioChannels::oneshot::>(); + let pending: PendingResponse = PendingResponse { receiver: rx }; let payload = TestPayload { header: empty_sd_header(), }; @@ -1094,8 +1079,8 @@ mod tests { #[tokio::test] async fn test_pending_response_resolves_err() { - let (tx, rx) = oneshot::channel::>(); - let pending = PendingResponse { receiver: rx }; + let (tx, rx) = TokioChannels::oneshot::>(); + let pending: PendingResponse = PendingResponse { receiver: rx }; tx.send(Err(Error::ServiceNotFound)).unwrap(); let result = pending.response().await; assert!( diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index f06d625..6239cb6 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -20,43 +20,37 @@ //! Concurrency between the two is mandatory and cannot come from the //! same task — hence the `Spawner` hook. //! -//! # What phase 9's `Spawner` does NOT remove from the critical path +//! # Bare-metal readiness status //! -//! `Spawner` abstracts task submission, not runtime primitives. The -//! socket loop still `.await`s on runtime-coupled types every -//! iteration. `no_alloc` bare-metal consumers are still blocked by: +//! **Completed abstractions (Phases 9-11):** +//! - `Spawner` trait (Phase 9): task submission is pluggable. +//! - `E2ERegistryHandle` / `InterfaceHandle` (Phase 10): lock handles +//! abstracted away from `Arc>` / `Arc>`. +//! - `ChannelFactory` (Phase 11): channel primitives abstracted via +//! `TokioChannels` (std) and `EmbassySyncChannels` (bare_metal). //! -//! 1. **`tokio::sync::mpsc` channels** (per-socket: discovery uses -//! 16/16, unicast uses 4/4): heap-allocated + tokio-`Waker`- -//! specific. A `no_alloc` replacement needs a bounded inline-backed -//! channel with executor-agnostic waker registration (e.g. -//! `heapless::mpmc` + a hand-rolled `WakerRegistration`, or -//! `embassy-sync::Channel`). -//! 2. **`tokio::sync::oneshot` for send-acks** (see `SendMessage` -//! below): same problem at smaller scale; ownership restructure -//! is harder than the mpsc swap. -//! 3. **`Arc>`** shared between `Inner` and every -//! socket loop: requires `alloc` + `std::sync`. Collapses to -//! `&RefCell` on a single-task executor, but the -//! type change cascades through every call site. -//! 4. **`F::Socket = TokioSocket`** bound on `bind_*` (this module): -//! RTN-gap, see `bind_discovery_seeded_with_transport` docstring. +//! **Remaining gaps:** +//! - **`F::Socket = TokioSocket`** bound on `bind_*` (Phase 12): +//! RTN-gap, see `bind_discovery_seeded_with_transport` docstring. +//! - **Feature-flag split** (Phase 13): `client` / `server` still +//! pull in tokio + socket2 dependencies. //! -//! Until all four are addressed, enabling `feature = "client"` pulls -//! in `std + tokio + socket2`. The `bare_metal` feature flag is a -//! marker today; it does not make this module `no_alloc`. For `no_alloc` -//! SOME/IP usage today, consume `protocol`, `e2e`, and the `transport` -//! trait layer directly — the `bare_metal` example workspace member -//! demonstrates that surface. +//! Until Phase 13 completes, enabling `feature = "client"` pulls +//! in `std + tokio + socket2`. The `bare_metal` feature flag activates +//! `EmbassySyncChannels` but does not make this module `no_alloc` on +//! its own. For `no_alloc` SOME/IP usage today, consume `protocol`, +//! `e2e`, and the `transport` trait layer directly — the `bare_metal` +//! example workspace member demonstrates that surface. use crate::{ UDP_BUFFER_SIZE, e2e::{E2ECheckStatus, E2EKey}, protocol::{Message, MessageView, sd}, + tokio_transport::TokioChannels, traits::{PayloadWireFormat, WireFormat}, transport::{ - E2ERegistryHandle, ReceivedDatagram, SocketOptions, Spawner, TransportFactory, - TransportSocket, + ChannelFactory, E2ERegistryHandle, MpscRecv, MpscSend, OneshotRecv, OneshotSend, + ReceivedDatagram, SocketOptions, Spawner, TransportFactory, TransportSocket, }, }; @@ -66,7 +60,6 @@ use std::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, task::{Context, Poll}, }; -use tokio::sync::mpsc; use tracing::{debug, error, info, trace, warn}; /// A received message together with the source address it came from. @@ -85,28 +78,38 @@ pub struct ReceivedMessage

{ } /// Structure representing a request to send a message -#[derive(Debug)] -pub struct SendMessage { +pub struct SendMessage { pub target_addr: SocketAddrV4, pub message: Message, - response: tokio::sync::oneshot::Sender>, + response: C::OneshotSender>, +} + +impl std::fmt::Debug for SendMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SendMessage") + .field("target_addr", &self.target_addr) + .field("message", &self.message) + .finish_non_exhaustive() + } } /// One iteration's select-outcome in `socket_loop_future`. The inner /// block returns this scalar so the pinned per-iteration `send_fut` / /// `recv_fut` futures drop before the processing body — releasing their /// `&mut buf` / `&mut socket` borrows. -enum Outcome { - Send(Option>), +enum Outcome { + Send(Option>), Recv(Result), } -impl SendMessage { +impl + SendMessage +{ pub fn new( target_addr: SocketAddrV4, message: Message, - ) -> (tokio::sync::oneshot::Receiver>, Self) { - let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + ) -> (C::OneshotReceiver>, Self) { + let (response_tx, response_rx) = C::oneshot(); ( response_rx, Self { @@ -118,10 +121,9 @@ impl SendMessage { - receiver: mpsc::Receiver, Error>>, - sender: mpsc::Sender>, +pub struct SocketManager { + receiver: C::BoundedReceiver, Error>>, + sender: C::BoundedSender>, local_port: u16, session_id: u16, /// Set to true once `session_id` has wrapped from 0xFFFF → 1. @@ -130,9 +132,19 @@ pub struct SocketManager { session_has_wrapped: bool, } -impl SocketManager +impl std::fmt::Debug for SocketManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SocketManager") + .field("local_port", &self.local_port) + .field("session_id", &self.session_id) + .finish_non_exhaustive() + } +} + +impl SocketManager where - MessageDefinitions: PayloadWireFormat + 'static, + MessageDefinitions: PayloadWireFormat + Send + 'static, + C: ChannelFactory, { /// Bind the SD multicast socket, seeding the session counter and wrap /// state from a previous socket when rebinding. Pass `(1, false)` for a @@ -189,19 +201,16 @@ where /// type-erasure) each carry costs bigger than waiting — see the /// module docstring for the full analysis. /// - /// # Why relaxing this bound alone does NOT unblock `no_alloc` callers + /// # Bare-metal path /// - /// Even with a custom `F::Socket`, this function internally - /// allocates two `tokio::sync::mpsc` channels (capacities 16 and 16) - /// and constructs `tokio::sync::oneshot` instances per send. Both - /// are heap-backed AND tokio-runtime-coupled (their `Waker` - /// plumbing only works inside a tokio reactor task). A `no_alloc` - /// bare-metal consumer cannot use this entry point today regardless - /// of the `F::Socket` bound. The recommended path for `no_alloc` - /// consumers is to bypass `SocketManager` / `Client` entirely and - /// build a small orchestrator directly on top of `protocol`, `e2e`, - /// and the `transport` traits — the `bare_metal` example workspace - /// member demonstrates the trait layer in isolation. + /// Phase 11 abstracted the channel primitives behind + /// [`ChannelFactory`](crate::transport::ChannelFactory). The + /// `bare_metal` feature activates `EmbassySyncChannels` as an + /// alternative to `TokioChannels`. However, this function still + /// requires the `F::Socket = TokioSocket` bound (Phase 12 gap). + /// Once Phase 12 relaxes that bound via GATs and Phase 13 splits + /// the feature flags, a bare-metal consumer can use + /// `SocketManager` directly with a custom socket backend. pub async fn bind_discovery_seeded_with_transport( factory: &F, spawner: &S, @@ -216,8 +225,9 @@ where S: Spawner, R: E2ERegistryHandle, { - let (rx_tx, rx_rx) = mpsc::channel(16); - let (tx_tx, tx_rx) = mpsc::channel(16); + let (rx_tx, rx_rx) = + C::bounded::, Error>, 16>(); + let (tx_tx, tx_rx) = C::bounded::, 16>(); // Control whether multicast packets sent by this socket are looped // back to sockets on the same host — INCLUDING this socket itself. @@ -283,8 +293,9 @@ where S: Spawner, R: E2ERegistryHandle, { - let (rx_tx, rx_rx) = mpsc::channel(4); - let (tx_tx, tx_rx) = mpsc::channel(4); + let (rx_tx, rx_rx) = + C::bounded::, Error>, 4>(); + let (tx_tx, tx_rx) = C::bounded::, 4>(); let options = { let mut o = SocketOptions::new(); @@ -324,16 +335,16 @@ where ); return Err(Error::Capacity("udp_buffer")); } - let (result_channel, message) = SendMessage::new(target_addr, message); - self.sender.send(message).await.map_err(|e| { - error!("Socket error: {e} when attempting to send message"); + let (result_channel, message) = SendMessage::::new(target_addr, message); + self.sender.send(message).await.map_err(|()| { + error!("Socket error when attempting to send message"); Error::SocketClosedUnexpectedly })?; // The socket loop's response sender can be dropped without sending // (executor cancellation, bare-metal `Spawner` that drops futures, // or a panic in the loop). Surface that as a typed error rather // than `.expect`-panicking the caller. - result_channel.await.map_err(|_| { + result_channel.recv().await.map_err(|_| { debug!("send result channel dropped (socket loop gone)"); Error::SocketClosedUnexpectedly })??; @@ -356,7 +367,7 @@ where } pub async fn receive(&mut self) -> Option, Error>> { - self.receiver.recv().await + MpscRecv::recv(&mut self.receiver).await } /// Poll the receiver for a message without blocking. @@ -383,7 +394,7 @@ where .. } = self; drop(sender); - _ = receiver.recv().await; + _ = MpscRecv::recv(&mut receiver).await; } /// Build the I/O loop over a concrete [`TokioSocket`] as a future. @@ -400,8 +411,8 @@ where #[allow(clippy::too_many_lines)] async fn socket_loop_future( socket: crate::tokio_transport::TokioSocket, - rx_tx: mpsc::Sender, Error>>, - mut tx_rx: mpsc::Receiver>, + rx_tx: C::BoundedSender, Error>>, + mut tx_rx: C::BoundedReceiver>, e2e_registry: R, ) { // Maximum number of consecutive `recv_from` errors tolerated before @@ -427,8 +438,8 @@ where // drops both pinned futures — and their `&mut buf` / // `&mut socket` borrows — before the processing body // below runs, so the body can re-borrow `buf` freely. - let outcome: Outcome = { - let send_fut = tx_rx.recv().fuse(); + let outcome: Outcome = { + let send_fut = MpscRecv::recv(&mut tx_rx).fuse(); let recv_fut = socket.recv_from(&mut buf).fuse(); pin_mut!(send_fut, recv_fut); select! { @@ -573,7 +584,7 @@ where }) }) .map_err(Error::from); - if let Ok(()) = rx_tx.send(parse_result).await { + if rx_tx.send(parse_result).await.is_ok() { } else { info!("Socket Dropping"); // The receiver has been dropped, so we should exit @@ -637,13 +648,14 @@ mod tests { #[tokio::test] async fn test_send_message_new() { + use crate::transport::OneshotRecv; let target = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1234); let msg = Message::new_sd(1, &empty_sd_header()); let (rx, send_msg) = SendMessage::::new(target, msg); assert_eq!(send_msg.target_addr, target); // Verify the oneshot channel works send_msg.response.send(Ok(())).unwrap(); - assert!(rx.await.unwrap().is_ok()); + assert!(rx.recv().await.unwrap().is_ok()); } #[tokio::test] diff --git a/src/lib.rs b/src/lib.rs index 477e43c..199c10d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -170,10 +170,11 @@ pub use e2e::{E2ECheckStatus, E2EKey, E2EProfile}; #[cfg(feature = "server")] pub use server::Server; #[cfg(any(feature = "client", feature = "server"))] -pub use tokio_transport::{TokioSocket, TokioSpawner, TokioTimer, TokioTransport}; +pub use tokio_transport::{TokioChannels, TokioSocket, TokioSpawner, TokioTimer, TokioTransport}; pub use transport::{ - E2ERegistryHandle, InterfaceHandle, IoErrorKind, ReceivedDatagram, SocketOptions, Spawner, - Timer, TransportError, TransportFactory, TransportSocket, + ChannelFactory, E2ERegistryHandle, InterfaceHandle, IoErrorKind, MpscRecv, MpscSend, + OneshotCancelled, OneshotRecv, OneshotSend, ReceivedDatagram, SocketOptions, Spawner, Timer, + TransportError, TransportFactory, TransportSocket, UnboundedRecv, UnboundedSend, }; #[cfg(feature = "server")] pub use server::SubscriptionHandle; diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index c363f3c..7a764af 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -43,8 +43,9 @@ use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile}; use crate::e2e::Error as E2EError; use crate::e2e::E2ERegistry; use crate::transport::{ - E2ERegistryHandle, InterfaceHandle, IoErrorKind, ReceivedDatagram, SocketOptions, Timer, - TransportError, TransportFactory, TransportSocket, + ChannelFactory, E2ERegistryHandle, InterfaceHandle, IoErrorKind, MpscRecv, MpscSend, + OneshotCancelled, OneshotRecv, OneshotSend, ReceivedDatagram, SocketOptions, Timer, + TransportError, TransportFactory, TransportSocket, UnboundedRecv, UnboundedSend, }; /// Factory that binds [`TokioSocket`]s configured via `socket2`. @@ -325,6 +326,268 @@ fn map_io_error(e: &std::io::Error) -> TransportError { mapped } +// ── TokioChannels ───────────────────────────────────────────────────────── + +/// [`ChannelFactory`] implementation backed by `tokio::sync::mpsc` and +/// `tokio::sync::oneshot`. This is the default channel backend for `std + +/// tokio` builds (active when the `client` or `server` feature is enabled). +#[derive(Clone, Copy)] +pub struct TokioChannels; + +// Newtype wrappers are needed because Rust does not allow implementing a +// foreign trait on a foreign type (orphan rule). Wrapping the tokio receiver +// types lets us impl OneshotRecv / UnboundedRecv on them. + +/// Newtype wrapping `tokio::sync::oneshot::Receiver` to implement +/// [`OneshotRecv`]. +pub struct TokioOneshotReceiver(pub(crate) tokio::sync::oneshot::Receiver); + +/// Newtype wrapping `tokio::sync::mpsc::UnboundedReceiver` to implement +/// [`UnboundedRecv`]. +pub struct TokioUnboundedReceiver(pub(crate) tokio::sync::mpsc::UnboundedReceiver); + +impl OneshotSend for tokio::sync::oneshot::Sender { + fn send(self, value: T) -> Result<(), T> { + tokio::sync::oneshot::Sender::send(self, value) + } +} + +impl OneshotRecv for TokioOneshotReceiver { + async fn recv(self) -> Result { + self.0.await.map_err(|_| OneshotCancelled) + } +} + +impl MpscSend for tokio::sync::mpsc::Sender { + async fn send(&self, value: T) -> Result<(), ()> { + tokio::sync::mpsc::Sender::send(self, value).await.map_err(|_| ()) + } +} + +impl MpscRecv for tokio::sync::mpsc::Receiver { + fn recv(&mut self) -> impl Future> + Send + '_ { + self.recv() + } + + fn poll_recv(&mut self, cx: &mut core::task::Context<'_>) -> core::task::Poll> { + self.poll_recv(cx) + } +} + +impl UnboundedSend for tokio::sync::mpsc::UnboundedSender { + fn send_now(&self, value: T) -> Result<(), T> { + self.send(value).map_err(|e| e.0) + } +} + +impl UnboundedRecv for TokioUnboundedReceiver { + fn recv(&mut self) -> impl Future> + Send + '_ { + self.0.recv() + } +} + +impl ChannelFactory for TokioChannels { + type OneshotSender = tokio::sync::oneshot::Sender; + type OneshotReceiver = TokioOneshotReceiver; + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) { + let (tx, rx) = tokio::sync::oneshot::channel(); + (tx, TokioOneshotReceiver(rx)) + } + + type BoundedSender = tokio::sync::mpsc::Sender; + type BoundedReceiver = tokio::sync::mpsc::Receiver; + fn bounded( + ) -> (Self::BoundedSender, Self::BoundedReceiver) { + tokio::sync::mpsc::channel(N) + } + + type UnboundedSender = tokio::sync::mpsc::UnboundedSender; + type UnboundedReceiver = TokioUnboundedReceiver; + fn unbounded() -> (Self::UnboundedSender, Self::UnboundedReceiver) { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + (tx, TokioUnboundedReceiver(rx)) + } +} + +// ── EmbassySyncChannels ─────────────────────────────────────────────────── +// +// [`ChannelFactory`] backed by `embassy-sync::channel::Channel`. Active when +// the `bare_metal` feature is enabled. Both sender and receiver hold an +// `Arc>` so the channel state lives on the heap — this is +// the `std + alloc` path. A future no_alloc port (Phase 16) would store +// the channel in a `static` and use borrowed `Sender` / `Receiver` handles +// with `'static` lifetimes instead. + +#[cfg(feature = "bare_metal")] +pub use embassy_channels::{ + EmbassySyncBoundedReceiver, EmbassySyncBoundedSender, EmbassySyncChannels, + EmbassySyncOneshotReceiver, EmbassySyncOneshotSender, EmbassySyncUnboundedReceiver, + EmbassySyncUnboundedSender, +}; + +#[cfg(feature = "bare_metal")] +mod embassy_channels { + use super::*; + use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; + use embassy_sync::channel::Channel; + use std::sync::Arc; + + // ── Oneshot (capacity-1 Channel) ────────────────────────────────────── + + pub struct EmbassySyncOneshotSender( + Arc>, + ); + + pub struct EmbassySyncOneshotReceiver( + Arc>, + ); + + impl OneshotSend for EmbassySyncOneshotSender { + fn send(self, value: T) -> Result<(), T> { + self.0.try_send(value).map_err(|e| match e { + embassy_sync::channel::TrySendError::Full(v) => v, + }) + } + } + + impl OneshotRecv for EmbassySyncOneshotReceiver { + fn recv(self) -> impl Future> + Send { + let chan = self.0; + async move { Ok(chan.receive().await) } + } + } + + // ── Bounded MPSC ────────────────────────────────────────────────────── + + pub struct EmbassySyncBoundedSender( + Arc>, + ); + + pub struct EmbassySyncBoundedReceiver( + Arc>, + ); + + impl Clone for EmbassySyncBoundedSender { + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + impl MpscSend for EmbassySyncBoundedSender { + fn send(&self, value: T) -> impl Future> + Send + '_ { + let chan = self.0.clone(); + async move { + chan.send(value).await; + Ok(()) + } + } + } + + impl MpscRecv for EmbassySyncBoundedReceiver { + fn recv(&mut self) -> impl Future> + Send + '_ { + let chan = self.0.clone(); + async move { Some(chan.receive().await) } + } + + fn poll_recv( + &mut self, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + use core::pin::Pin; + // Try non-blocking receive first. + if let Ok(val) = self.0.try_receive() { + return core::task::Poll::Ready(Some(val)); + } + // Channel is empty. Poll a ReceiveFuture to register the waker. + // SAFETY: `fut` is created, pinned (stack-only), polled once, then + // dropped immediately. No references to `fut` escape this scope. + let mut fut = self.0.receive(); + // SAFETY: ReceiveFuture borrows self.0 (via Arc) — not self — and + // is not moved after this pin. The Arc ensures the channel outlives + // the future. + let pinned = unsafe { Pin::new_unchecked(&mut fut) }; + match pinned.poll(cx) { + core::task::Poll::Ready(val) => core::task::Poll::Ready(Some(val)), + core::task::Poll::Pending => core::task::Poll::Pending, + } + } + } + + // ── Unbounded (large-capacity) MPSC ────────────────────────────────── + + // Embassy-sync has no truly unbounded channel; we use a large capacity + // (128) as a practical substitute for the client's update channel. + const UNBOUNDED_CAP: usize = 128; + + pub struct EmbassySyncUnboundedSender( + Arc>, + ); + + pub struct EmbassySyncUnboundedReceiver( + Arc>, + ); + + impl Clone for EmbassySyncUnboundedSender { + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + impl UnboundedSend for EmbassySyncUnboundedSender { + fn send_now(&self, value: T) -> Result<(), T> { + self.0.try_send(value).map_err(|e| match e { + embassy_sync::channel::TrySendError::Full(v) => v, + }) + } + } + + impl UnboundedRecv for EmbassySyncUnboundedReceiver { + fn recv(&mut self) -> impl Future> + Send + '_ { + let chan = self.0.clone(); + async move { Some(chan.receive().await) } + } + } + + // ── ChannelFactory impl ─────────────────────────────────────────────── + + /// [`ChannelFactory`] backed by `embassy-sync::channel::Channel`. + /// + /// The `Arc>` allocation makes this suitable for + /// `std + alloc` bare-metal builds. A future no_alloc port stores the + /// channel in a `static` and works with borrowed handles. + #[derive(Clone, Copy)] + pub struct EmbassySyncChannels; + + impl ChannelFactory for EmbassySyncChannels { + type OneshotSender = EmbassySyncOneshotSender; + type OneshotReceiver = EmbassySyncOneshotReceiver; + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) { + let chan = Arc::new(Channel::new()); + (EmbassySyncOneshotSender(chan.clone()), EmbassySyncOneshotReceiver(chan)) + } + + type BoundedSender = EmbassySyncBoundedSender; + type BoundedReceiver = EmbassySyncBoundedReceiver; + fn bounded( + ) -> (Self::BoundedSender, Self::BoundedReceiver) { + // The const N from the trait call site is ignored here — embassy-sync + // requires the capacity to be known at the impl level, not the call + // site. All bounded channels use capacity 16, which covers the + // worst case (discovery socket, which uses 16). + let chan: Arc> = Arc::new(Channel::new()); + (EmbassySyncBoundedSender(chan.clone()), EmbassySyncBoundedReceiver(chan)) + } + + type UnboundedSender = EmbassySyncUnboundedSender; + type UnboundedReceiver = EmbassySyncUnboundedReceiver; + fn unbounded( + ) -> (Self::UnboundedSender, Self::UnboundedReceiver) { + let chan = Arc::new(Channel::new()); + (EmbassySyncUnboundedSender(chan.clone()), EmbassySyncUnboundedReceiver(chan)) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/transport.rs b/src/transport.rs index aa3ab67..6693c28 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -660,6 +660,130 @@ pub trait InterfaceHandle: Clone + Send + Sync + 'static { fn set(&self, addr: Ipv4Addr); } +// ── Channel-handle abstraction (Phase 11) ───────────────────────────────── +// +// `ChannelFactory` and its associated sender / receiver traits replace direct +// use of `tokio::sync::mpsc` and `tokio::sync::oneshot` in the client. +// `TokioChannels` (in `tokio_transport`) is the default for `std + tokio` +// builds; `EmbassySyncChannels` (in `tokio_transport`, gated behind +// `bare_metal`) is the alternative for no-tokio / no_std builds. + +/// Returned by [`OneshotRecv::recv`] when the sender was dropped before +/// sending a value. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct OneshotCancelled; + +impl core::fmt::Display for OneshotCancelled { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.write_str("oneshot sender dropped before sending a value") + } +} + +/// The send half of a oneshot channel. Consuming: a value can be sent exactly +/// once. +pub trait OneshotSend: Send + 'static { + /// Send `value` through the channel. + /// + /// # Errors + /// + /// Returns `Err(value)` if the receiver was already dropped. + fn send(self, value: T) -> Result<(), T>; +} + +/// The receive half of a oneshot channel. Resolves once the sender delivers a +/// value, or returns [`OneshotCancelled`] if the sender is dropped first. +pub trait OneshotRecv: Send + 'static { + /// Await the value. Consumes self — a oneshot receiver can only be awaited + /// once. + fn recv(self) -> impl core::future::Future> + Send; +} + +/// The send half of a bounded MPSC channel. +/// +/// Implementations must be [`Clone`] so that multiple producers can share the +/// same channel (e.g. the `Client` handle is `Clone` and every clone must be +/// able to send control messages to `Inner`). +pub trait MpscSend: Clone + Send + 'static { + /// Send `value`, waiting if the channel is full. Returns `Err(())` if the + /// receiver was dropped. + fn send(&self, value: T) -> impl core::future::Future> + Send + '_; +} + +/// The receive half of a bounded MPSC channel. +pub trait MpscRecv: Send + 'static { + /// Receive the next value, waiting if the channel is empty. Returns `None` + /// if all senders were dropped and the channel is empty. + fn recv(&mut self) -> impl core::future::Future> + Send + '_; + + /// Poll the channel without blocking. Used by `receive_any_unicast` to + /// multiplex across several socket channels in a single `poll_fn` pass. + fn poll_recv( + &mut self, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll>; +} + +/// The send half of an unbounded MPSC channel. +/// +/// Unlike [`MpscSend`], sending never blocks — the implementation must buffer +/// arbitrarily many values (or, for embassy-sync, use a large finite capacity +/// that is treated as effectively unbounded). +pub trait UnboundedSend: Clone + Send + 'static { + /// Send `value` without blocking. + /// + /// # Errors + /// + /// Returns `Err(value)` if the receiver was dropped. + fn send_now(&self, value: T) -> Result<(), T>; +} + +/// The receive half of an unbounded MPSC channel. +pub trait UnboundedRecv: Send + 'static { + /// Receive the next value, waiting if the channel is empty. Returns `None` + /// if all senders were dropped and the channel is empty. + fn recv(&mut self) -> impl core::future::Future> + Send + '_; +} + +/// A zero-sized factory that creates channel pairs used by the client's +/// internal transport. +/// +/// Abstracting over both `tokio::sync::mpsc` / `oneshot` (std path) and +/// `embassy-sync::channel::Channel` (bare-metal path) behind a single trait +/// lets `Client` / `Inner` / `SocketManager` compile without a tokio +/// dependency when `bare_metal` is active and `tokio` is not. +/// +/// The three channel families: +/// - **oneshot** — single-shot rendezvous, capacity 1. Used for command +/// completion callbacks inside [`ControlMessage`](crate::client). +/// - **bounded** — finite-capacity MPSC queue. Used for the control channel +/// and per-socket send / receive queues. +/// - **unbounded** — notionally unbounded MPSC queue (embassy-sync +/// implementations use a large-capacity channel). Used for the +/// `ClientUpdate` stream from `Inner` to `Client`. +pub trait ChannelFactory: Clone + Send + Sync + 'static { + /// Oneshot sender type. + type OneshotSender: OneshotSend; + /// Oneshot receiver type. + type OneshotReceiver: OneshotRecv; + /// Create a oneshot channel pair. + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver); + + /// Bounded-channel sender type. + type BoundedSender: MpscSend; + /// Bounded-channel receiver type. + type BoundedReceiver: MpscRecv; + /// Create a bounded channel with capacity `N`. + fn bounded( + ) -> (Self::BoundedSender, Self::BoundedReceiver); + + /// Unbounded-channel sender type. + type UnboundedSender: UnboundedSend; + /// Unbounded-channel receiver type. + type UnboundedReceiver: UnboundedRecv; + /// Create an unbounded channel. + fn unbounded() -> (Self::UnboundedSender, Self::UnboundedReceiver); +} + #[cfg(test)] mod tests { //! The traits are pure interfaces — these tests only verify that