diff --git a/examples/bare_metal/src/main.rs b/examples/bare_metal/src/main.rs index 77ec950..ff88e97 100644 --- a/examples/bare_metal/src/main.rs +++ b/examples/bare_metal/src/main.rs @@ -45,29 +45,29 @@ //! //! The example exercises the **trait layer** (`TransportSocket`, //! `TransportFactory`, `Timer`, `Spawner`, `ChannelFactory`) — and -//! that is all. It does NOT demonstrate a no_alloc integration with +//! that is all. It does NOT demonstrate a `no_alloc` integration with //! `simple_someip::Client` / `simple_someip::Server`, because those -//! are not yet no_alloc-compatible. +//! are not yet `no_alloc`-compatible. //! //! **Completed abstractions:** //! - Phase 9: `Spawner` trait (task submission) //! - Phase 10: `E2ERegistryHandle` / `InterfaceHandle` (lock handles) //! - Phase 11: `ChannelFactory` trait with `TokioChannels` (std) and -//! `EmbassySyncChannels` (bare_metal) backends — replaces direct +//! `EmbassySyncChannels` (`bare_metal`) backends — replaces direct //! `tokio::sync::mpsc` / `oneshot` usage +//! - Phase 12: `TransportSocket` GATs — `SendFuture` / `RecvFuture` +//! express `Send` bounds without RTN; `Socket = TokioSocket` pin +//! removed from `bind_*` functions //! //! **Remaining gaps:** -//! 1. **`F::Socket = TokioSocket`** bound on `bind_*`: a phase-5 -//! compromise because stable Rust Return-Type Notation is still -//! nightly. Phase 12 relaxes this via GATs. -//! 2. **Feature-flag split** (Phase 13): `client` / `server` still +//! 1. **Feature-flag split** (Phase 13): `client` / `server` still //! pull in tokio + socket2. A future split (`client` vs -//! `client-tokio`) will make the core types no_std-compatible. +//! `client-tokio`) will make the core types `no_std`-compatible. //! //! Until those are closed, `feature = "client"` / `feature = "server"` //! pull in `std + tokio + socket2`. //! -//! # Recommendation for no_alloc consumers today +//! # Recommendation for `no_alloc` consumers today //! //! Do NOT route through `Client::new_with_spawner_and_loopback`. //! Instead, depend on `simple-someip` with `default-features = false, @@ -87,7 +87,7 @@ //! `TransportSocket::recv_from` / `Timer::sleep` directly. That is //! the shape the trait layer was designed for; the `Client` / //! `Server` types are a std+tokio convenience layer on top that -//! happens not to suit no_alloc targets yet. +//! happens not to suit `no_alloc` targets yet. use core::future::Future; use core::net::{Ipv4Addr, SocketAddrV4}; @@ -140,49 +140,81 @@ impl TransportFactory for MockFactory { } } -impl TransportSocket for MockSocket { - fn send_to( - &self, - buf: &[u8], - target: SocketAddrV4, - ) -> impl Future> { - let bytes = buf.to_vec(); - let pipe = Arc::clone(&self.pipe); - async move { - pipe.send_queue.lock().unwrap().push_back((bytes, target)); - Ok(()) +/// Future returned by [`MockSocket::send_to`]. Defers the queue push +/// to poll-time so the side effect happens when the future is awaited, +/// not when `send_to` is called — matching what a real bare-metal +/// `TransportSocket` impl would do (the network driver only sees the +/// datagram when the executor polls the future). +struct MockSendFut { + pipe: Arc, + bytes: Option>, + target: SocketAddrV4, +} + +impl Future for MockSendFut { + type Output = Result<(), TransportError>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let me = self.get_mut(); + if let Some(bytes) = me.bytes.take() { + me.pipe.send_queue.lock().unwrap().push_back((bytes, me.target)); } + Poll::Ready(Ok(())) } +} - fn recv_from( - &self, - buf: &mut [u8], - ) -> impl Future> { - // Read synchronously before the async block so we don't have to - // capture `buf` across the `.await` boundary. If the queue is - // empty, return a ready `Err(TimedOut)` rather than a pending - // future. A production bare-metal impl would instead register - // the `Context`'s `Waker` on the network driver's RX-ready - // signal and return `Poll::Pending` so the executor can park - // the task — see e.g. `embassy_net::UdpSocket` or smoltcp's - // socket polling model. In this single-threaded example we - // always send first then recv, so the timeout branch is - // unreachable here. - let result = { - let mut q = self.pipe.recv_queue.lock().unwrap(); - q.pop_front() - }; - match result { +/// Future returned by [`MockSocket::recv_from`]. Reads from the queue +/// on poll. A production bare-metal impl would instead register the +/// `Context`'s `Waker` on the network driver's RX-ready signal and +/// return `Poll::Pending` when the queue is empty — see e.g. +/// `embassy_net::UdpSocket` or smoltcp's socket polling model. This +/// mock returns `Err(TimedOut)` on empty for simplicity; the demo +/// always sends before recv-ing so the empty branch is unreachable. +struct MockRecvFut<'a> { + pipe: Arc, + buf: &'a mut [u8], +} + +impl Future for MockRecvFut<'_> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let me = self.get_mut(); + let entry = me.pipe.recv_queue.lock().unwrap().pop_front(); + Poll::Ready(match entry { Some((bytes, source)) => { - let n = bytes.len().min(buf.len()); - buf[..n].copy_from_slice(&bytes[..n]); - core::future::ready(Ok(ReceivedDatagram { + let n = bytes.len().min(me.buf.len()); + me.buf[..n].copy_from_slice(&bytes[..n]); + Ok(ReceivedDatagram { bytes_received: n, source, truncated: n < bytes.len(), - })) + }) } - None => core::future::ready(Err(TransportError::Io(IoErrorKind::TimedOut))), + None => Err(TransportError::Io(IoErrorKind::TimedOut)), + }) + } +} + +impl TransportSocket for MockSocket { + type SendFuture<'a> = MockSendFut; + type RecvFuture<'a> = MockRecvFut<'a>; + + fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddrV4) -> Self::SendFuture<'a> { + // `buf` cannot be borrowed past this call (its lifetime is + // bounded by the borrow checker, not the future), so we copy + // here. The push to the shared queue is deferred to `poll`. + MockSendFut { + pipe: Arc::clone(&self.pipe), + bytes: Some(buf.to_vec()), + target, + } + } + + fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> Self::RecvFuture<'a> { + MockRecvFut { + pipe: Arc::clone(&self.pipe), + buf, } } @@ -278,7 +310,7 @@ impl simple_someip::transport::Spawner for WorkingSpawner { /// interrupts; this helper exists only to drive the demo's /// synchronous mock futures (which resolve on the first poll). /// -/// For a real no_alloc `block_on`, see e.g. `embassy_executor::block_on`, +/// For a real `no_alloc` `block_on`, see e.g. `embassy_executor::block_on`, /// the `cassette` crate, or roll your own around a hardware-timer-driven /// `Waker`. The `Future::poll` loop body below is the part that stays /// the same; only the `Waker` plumbing and yield strategy change. @@ -374,11 +406,8 @@ fn main() { ); println!( "note: trait layer (TransportSocket + TransportFactory + Timer + \ - Spawner) exercised end-to-end. For a no_alloc SOME/IP client \ - today, build your own orchestrator on `protocol` + `e2e` + these \ - traits — do NOT route through `Client::new_with_spawner_and_loopback`: \ - the Client internals still depend on tokio::sync::mpsc/oneshot, \ - Arc>, and an F::Socket=TokioSocket bound (RTN). \ - See top-of-file docblock for the full blocker list." + Spawner + ChannelFactory) exercised end-to-end. Phases 9-12 \ + complete. Remaining gap: client/server feature flags still pull \ + in tokio + socket2 (Phase 13). See top-of-file docblock." ); } diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 6239cb6..f79c2b6 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -22,16 +22,17 @@ //! //! # Bare-metal readiness status //! -//! **Completed abstractions (Phases 9-11):** +//! **Completed abstractions (Phases 9-12):** //! - `Spawner` trait (Phase 9): task submission is pluggable. //! - `E2ERegistryHandle` / `InterfaceHandle` (Phase 10): lock handles //! abstracted away from `Arc>` / `Arc>`. //! - `ChannelFactory` (Phase 11): channel primitives abstracted via -//! `TokioChannels` (std) and `EmbassySyncChannels` (bare_metal). +//! `TokioChannels` (std) and `EmbassySyncChannels` (`bare_metal`). +//! - `TransportSocket` GATs (Phase 12): `Socket = TokioSocket` pin +//! removed; `SendFuture` / `RecvFuture` associated types express +//! `Send` bounds for spawnable socket loops. //! //! **Remaining gaps:** -//! - **`F::Socket = TokioSocket`** bound on `bind_*` (Phase 12): -//! RTN-gap, see `bind_discovery_seeded_with_transport` docstring. //! - **Feature-flag split** (Phase 13): `client` / `server` still //! pull in tokio + socket2 dependencies. //! @@ -190,27 +191,35 @@ where /// and submits the socket's I/O loop through a caller-supplied /// [`Spawner`]. /// - /// # Why `F::Socket` is still pinned to `TokioSocket` + /// # Socket bounds /// - /// The factory must still produce a - /// [`TokioSocket`](crate::tokio_transport::TokioSocket). Generalizing - /// to any `TransportSocket` requires stable-Rust Return-Type Notation - /// (RFC 3654) to express `Send` bounds on the trait's RPITIT methods - /// at this call site. RTN is nightly-only as of this writing; the - /// alternatives (GATs on `TransportSocket`, or boxed-future - /// type-erasure) each carry costs bigger than waiting — see the - /// module docstring for the full analysis. + /// Phase 12 relaxed the previous `F::Socket = TokioSocket` pin by + /// switching [`TransportSocket`] to GATs. The factory's socket type + /// must now satisfy: + /// + /// - `Send + Sync + 'static` — so the socket loop future can be + /// spawned on a multithreaded executor and outlive its owner. + /// - `for<'a> SendFuture<'a>: Send` and `for<'a> RecvFuture<'a>: Send` + /// — the named GAT futures must themselves be `Send` so the + /// spawned loop crosses thread boundaries cleanly. The `for<'a>` + /// higher-ranked bound expresses "for any borrow lifetime" without + /// needing nightly-only Return-Type Notation (RFC 3654). + /// + /// Stable Rust cannot express `Send` bounds on the anonymous future + /// types of `async fn` trait methods at use sites, which is why + /// Phase 12 chose named associated types over RPITIT. See + /// [`TransportSocket::SendFuture`](crate::transport::TransportSocket::SendFuture). /// /// # Bare-metal path /// /// Phase 11 abstracted the channel primitives behind /// [`ChannelFactory`](crate::transport::ChannelFactory). The /// `bare_metal` feature activates `EmbassySyncChannels` as an - /// alternative to `TokioChannels`. However, this function still - /// requires the `F::Socket = TokioSocket` bound (Phase 12 gap). - /// Once Phase 12 relaxes that bound via GATs and Phase 13 splits - /// the feature flags, a bare-metal consumer can use - /// `SocketManager` directly with a custom socket backend. + /// alternative to `TokioChannels`. With Phase 12's relaxed socket + /// bound, a bare-metal consumer can now supply their own + /// `TransportSocket` impl (e.g. wrapping `embassy_net::udp::UdpSocket`) + /// as long as it is `Send + Sync + 'static` and its `SendFuture` / + /// `RecvFuture` GAT projections are `Send` for every borrow lifetime. pub async fn bind_discovery_seeded_with_transport( factory: &F, spawner: &S, @@ -221,7 +230,10 @@ where multicast_loopback: bool, ) -> Result where - F: TransportFactory, + F: TransportFactory, + F::Socket: Send + Sync + 'static, + for<'a> ::SendFuture<'a>: Send, + for<'a> ::RecvFuture<'a>: Send, S: Spawner, R: E2ERegistryHandle, { @@ -279,9 +291,15 @@ where /// Variant of [`Self::bind`] that constructs the underlying socket /// through a caller-supplied [`TransportFactory`] and submits the - /// socket's I/O loop through a caller-supplied [`Spawner`]. See - /// [`Self::bind_discovery_seeded_with_transport`] for the factory - /// bound rationale. + /// socket's I/O loop through a caller-supplied [`Spawner`]. + /// + /// # Generic bounds + /// + /// The factory's socket must be `Send + Sync + 'static` and its async + /// methods must return `Send` futures so the socket loop can be + /// spawned onto a multithreaded executor. See + /// [`TransportSocket::SendFuture`](crate::transport::TransportSocket::SendFuture) + /// for background on the GAT approach. pub async fn bind_with_transport( factory: &F, spawner: &S, @@ -289,7 +307,10 @@ where e2e_registry: R, ) -> Result where - F: TransportFactory, + F: TransportFactory, + F::Socket: Send + Sync + 'static, + for<'a> ::SendFuture<'a>: Send, + for<'a> ::RecvFuture<'a>: Send, S: Spawner, R: E2ERegistryHandle, { @@ -397,24 +418,40 @@ where _ = MpscRecv::recv(&mut receiver).await; } - /// Build the I/O loop over a concrete [`TokioSocket`] as a future. - /// Callers are expected to `tokio::spawn` this future alongside - /// [`Self`]; the socket loop runs concurrently with its owner so + /// Build the I/O loop over any [`TransportSocket`] as a future. + /// Callers are expected to spawn this future alongside [`Self`]; + /// the socket loop runs concurrently with its owner so /// `SocketManager::send`'s internal oneshot wait can complete. /// The reasoning for why the spawn hasn't been hoisted is in the /// module-level docs. /// - /// The function remains tied to `TokioSocket` concretely because - /// generalizing it to `T: TransportSocket` needs stable-Rust - /// return-type notation to express `Send` bounds on the trait's - /// RPITIT methods — still nightly as of this writing. + /// # `Send` bounds + /// + /// The returned future must be `Send + 'static` for `Spawner::spawn`. + /// This works on stable Rust (no RTN required) because: + /// - `T: Send + Sync + 'static` makes the captured socket `Send`. + /// - The HRTBs `for<'a> T::SendFuture<'a>: Send` and + /// `for<'a> T::RecvFuture<'a>: Send` make the GAT-projected futures + /// `Send` for every borrow lifetime, which is what propagates + /// `Send` to the enclosing `async` block. + /// - All other captured state (`buf`, channels, registry) is `Send`. + /// + /// Bare-metal `TransportSocket` impls must ensure their `SendFuture` + /// and `RecvFuture` associated types are `Send` (e.g. by avoiding + /// `Rc` / `RefCell` in the future state) for this to compile. #[allow(clippy::too_many_lines)] - async fn socket_loop_future( - socket: crate::tokio_transport::TokioSocket, + async fn socket_loop_future( + socket: T, rx_tx: C::BoundedSender, Error>>, mut tx_rx: C::BoundedReceiver>, e2e_registry: R, - ) { + ) + where + T: TransportSocket + Send + Sync + 'static, + for<'a> T::SendFuture<'a>: Send, + for<'a> T::RecvFuture<'a>: Send, + R: E2ERegistryHandle, + { // Maximum number of consecutive `recv_from` errors tolerated before // the socket loop gives up. A single failure (transient I/O, peer // RST, ICMP port-unreachable amplified into `ConnectionRefused`) @@ -1032,6 +1069,115 @@ mod tests { assert_eq!(view.header().message_id(), crate::protocol::MessageId::SD); } + /// Phase 12 witness: proves `bind_with_transport` accepts a factory + /// whose `Socket` type is **not** `TokioSocket`. The Phase 12 gate + /// (no `F::Socket = TokioSocket` pin) is a type-system claim, and + /// without this test the trait surface could regress to a Tokio + /// pin in a future phase without any test catching it. The + /// existing `bind_with_transport_*` tests both hardcode + /// `type Socket = TokioSocket`, which only covers the previous + /// pinned-bound shape. + /// + /// `WrappedSocket` is a transparent newtype around `TokioSocket` + /// with its own `TransportSocket` impl — the *type identity* is + /// what matters for this test, not the behavior. The end-to-end + /// send-and-verify confirms the spawned I/O loop also carries + /// through the wrapper, not just the bind call. + #[tokio::test] + async fn bind_with_transport_accepts_non_tokio_socket_type() { + use crate::tokio_transport::{TokioSocket, TokioTransport}; + use crate::transport::TransportError; + use core::future::Future; + + struct WrappedSocket(TokioSocket); + + impl TransportSocket for WrappedSocket { + // Borrow the inner socket's named GAT futures; this keeps + // the wrapper zero-overhead while still exercising a + // distinct `Self::Socket` type at the bind call site. + type SendFuture<'a> = ::SendFuture<'a>; + type RecvFuture<'a> = ::RecvFuture<'a>; + + fn send_to<'a>( + &'a self, + buf: &'a [u8], + target: SocketAddrV4, + ) -> Self::SendFuture<'a> { + self.0.send_to(buf, target) + } + fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> Self::RecvFuture<'a> { + self.0.recv_from(buf) + } + fn local_addr(&self) -> Result { + self.0.local_addr() + } + fn join_multicast_v4( + &self, + group: Ipv4Addr, + iface: Ipv4Addr, + ) -> Result<(), TransportError> { + self.0.join_multicast_v4(group, iface) + } + fn leave_multicast_v4( + &self, + group: Ipv4Addr, + iface: Ipv4Addr, + ) -> Result<(), TransportError> { + self.0.leave_multicast_v4(group, iface) + } + } + + struct WrappingFactory; + impl TransportFactory for WrappingFactory { + type Socket = WrappedSocket; + fn bind( + &self, + addr: SocketAddrV4, + options: &SocketOptions, + ) -> impl Future> { + let opts = *options; + async move { + let inner = TokioTransport.bind(addr, &opts).await?; + Ok(WrappedSocket(inner)) + } + } + } + + // Compile-time witness: this `let` binding only typechecks if + // `bind_with_transport` accepts `F::Socket = WrappedSocket` — + // i.e. the previous `F::Socket = TokioSocket` pin is gone. + let mut sm = SocketManager::::bind_with_transport( + &WrappingFactory, + &TokioSpawner, + 0, + test_registry(), + ) + .await + .expect("bind via wrapping factory"); + let sm_port = sm.port(); + + // Runtime witness: traffic flows through the wrapper's + // `send_to` and the spawned I/O loop's `recv_from`. + let recv = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let recv_port = recv.local_addr().unwrap().port(); + + let msg = Message::::new_sd(1, &empty_sd_header()); + sm.send(SocketAddrV4::new(Ipv4Addr::LOCALHOST, recv_port), msg) + .await + .expect("send via wrapping factory"); + + let mut buf = [0u8; UDP_BUFFER_SIZE]; + let (len, _from) = + tokio::time::timeout(std::time::Duration::from_secs(2), recv.recv_from(&mut buf)) + .await + .expect("timed out waiting for datagram") + .expect("recv failed"); + assert!(len > 0, "empty datagram"); + let view = MessageView::parse(&buf[..len]).unwrap(); + assert_eq!(view.header().message_id(), crate::protocol::MessageId::SD); + let _ = sm_port; + } + /// Negative test: a factory that returns /// `Err(TransportError::AddressInUse)` must surface as /// `Err(Error::Transport(TransportError::AddressInUse))` through diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index 7a764af..f2523e1 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -34,9 +34,12 @@ use core::future::Future; use core::net::{Ipv4Addr, SocketAddrV4}; +use core::pin::Pin; +use core::task::{Context, Poll}; use core::time::Duration; use std::net::{IpAddr, SocketAddr}; use std::sync::{Arc, Mutex, RwLock}; +use tokio::io::ReadBuf; use tokio::net::UdpSocket; use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile}; @@ -114,45 +117,97 @@ impl TransportFactory for TokioTransport { } } -impl TransportSocket for TokioSocket { - async fn send_to(&self, buf: &[u8], target: SocketAddrV4) -> Result<(), TransportError> { - self.inner - .send_to(buf, target) - .await - .map(|_| ()) - .map_err(|e| map_io_error(&e)) +/// Named future returned by [`TokioSocket::send_to`]. +/// +/// Drives [`tokio::net::UdpSocket::poll_send_to`] directly so the GAT +/// associated type ([`TransportSocket::SendFuture`]) can be named on +/// stable Rust without heap-allocating a [`futures::future::BoxFuture`] +/// per datagram. Auto-derives `Send`. +pub struct SendTo<'a> { + socket: &'a UdpSocket, + buf: &'a [u8], + target: SocketAddr, +} + +impl Future for SendTo<'_> { + type Output = Result<(), TransportError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.socket.poll_send_to(cx, self.buf, self.target) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(_n)) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(map_io_error(&e))), + } } +} - async fn recv_from(&self, buf: &mut [u8]) -> Result { - let (n, src) = self - .inner - .recv_from(buf) - .await - .map_err(|e| map_io_error(&e))?; - let source = match src { - SocketAddr::V4(v4) => v4, - SocketAddr::V6(_) => { - // SOME/IP is IPv4-only; an IPv6 source on our socket is - // either impossible (v4 bind) or a misconfiguration. - return Err(TransportError::Unsupported); +/// Named future returned by [`TokioSocket::recv_from`]. +/// +/// Drives [`tokio::net::UdpSocket::poll_recv_from`] directly so the GAT +/// associated type ([`TransportSocket::RecvFuture`]) can be named on +/// stable Rust without heap-allocating a [`futures::future::BoxFuture`] +/// per datagram. Auto-derives `Send`. +pub struct RecvFrom<'a> { + socket: &'a UdpSocket, + buf: &'a mut [u8], +} + +impl Future for RecvFrom<'_> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // No self-references; safe to project to &mut Self. + let me = self.get_mut(); + let mut read_buf = ReadBuf::new(me.buf); + match me.socket.poll_recv_from(cx, &mut read_buf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(map_io_error(&e))), + Poll::Ready(Ok(src)) => { + let n = read_buf.filled().len(); + let source = match src { + SocketAddr::V4(v4) => v4, + // SOME/IP is IPv4-only; an IPv6 source on our socket is + // either impossible (v4 bind) or a misconfiguration. + SocketAddr::V6(_) => return Poll::Ready(Err(TransportError::Unsupported)), + }; + // Caveat: `tokio::net::UdpSocket::poll_recv_from` silently + // truncates when the caller's `buf` is smaller than the + // datagram and returns only the bytes that fit — it does + // NOT expose a truncation flag. Surfacing a reliable + // `truncated: bool` here would require a platform-specific + // `recvmsg`/MSG_TRUNC path (libc + unsafe), which is + // deferred to the phase 10+ bare-metal refactor. Until + // then, this field is always `false` for the Tokio + // backend; callers must not rely on it for truncation + // detection. This is documented on + // `ReceivedDatagram::truncated`'s field doc. + Poll::Ready(Ok(ReceivedDatagram { + bytes_received: n, + source, + truncated: false, + })) } - }; - // Caveat: `tokio::net::UdpSocket::recv_from` silently - // truncates when the caller's `buf` is smaller than the - // datagram and returns only the bytes that fit — it does - // NOT expose a truncation flag. Surfacing a reliable - // `truncated: bool` here would require a platform-specific - // `recvmsg`/MSG_TRUNC path (libc + unsafe), which is - // deferred to the phase 10+ bare-metal refactor. Until - // then, this field is always `false` for the Tokio - // backend; callers must not rely on it for truncation - // detection. This is documented on - // `ReceivedDatagram::truncated`'s field doc. - Ok(ReceivedDatagram { - bytes_received: n, - source, - truncated: false, - }) + } + } +} + +impl TransportSocket for TokioSocket { + type SendFuture<'a> = SendTo<'a>; + type RecvFuture<'a> = RecvFrom<'a>; + + fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddrV4) -> Self::SendFuture<'a> { + SendTo { + socket: &self.inner, + buf, + target: SocketAddr::V4(target), + } + } + + fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> Self::RecvFuture<'a> { + RecvFrom { + socket: &self.inner, + buf, + } } fn local_addr(&self) -> Result { @@ -427,10 +482,14 @@ pub use embassy_channels::{ #[cfg(feature = "bare_metal")] mod embassy_channels { - use super::*; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::channel::Channel; use std::sync::Arc; + use core::future::Future; + use crate::transport::{ + ChannelFactory, MpscRecv, MpscSend, OneshotCancelled, OneshotRecv, OneshotSend, + UnboundedRecv, UnboundedSend, + }; // ── Oneshot (capacity-1 Channel) ────────────────────────────────────── @@ -553,7 +612,7 @@ mod embassy_channels { /// [`ChannelFactory`] backed by `embassy-sync::channel::Channel`. /// /// The `Arc>` allocation makes this suitable for - /// `std + alloc` bare-metal builds. A future no_alloc port stores the + /// `std + alloc` bare-metal builds. A future `no_alloc` port stores the /// channel in a `static` and works with borrowed handles. #[derive(Clone, Copy)] pub struct EmbassySyncChannels; diff --git a/src/transport.rs b/src/transport.rs index 6693c28..bcc6b4e 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -104,6 +104,7 @@ //! use core::future::Future; //! use core::net::{Ipv4Addr, SocketAddrV4}; //! use core::time::Duration; +//! use futures::future::BoxFuture; //! use simple_someip::transport::{ //! IoErrorKind, ReceivedDatagram, SocketOptions, Timer, TransportError, //! TransportFactory, TransportSocket, @@ -132,24 +133,31 @@ //! } //! //! impl TransportSocket for TokioSocket { -//! fn send_to( -//! &self, -//! buf: &[u8], +//! // `BoxFuture` keeps this sketch short. The real `TokioSocket` +//! // shipped under the `client` / `server` features uses named +//! // future structs that wrap `poll_send_to` / `poll_recv_from` +//! // for zero-allocation per datagram — see `tokio_transport.rs`. +//! type SendFuture<'a> = BoxFuture<'a, Result<(), TransportError>>; +//! type RecvFuture<'a> = BoxFuture<'a, Result>; +//! +//! fn send_to<'a>( +//! &'a self, +//! buf: &'a [u8], //! target: SocketAddrV4, -//! ) -> impl Future> { -//! async move { +//! ) -> Self::SendFuture<'a> { +//! Box::pin(async move { //! self.inner //! .send_to(buf, target) //! .await //! .map(|_| ()) //! .map_err(|_| TransportError::Io(IoErrorKind::Other)) -//! } +//! }) //! } -//! fn recv_from( -//! &self, -//! buf: &mut [u8], -//! ) -> impl Future> { -//! async move { +//! fn recv_from<'a>( +//! &'a self, +//! buf: &'a mut [u8], +//! ) -> Self::RecvFuture<'a> { +//! Box::pin(async move { //! let (n, src) = self //! .inner //! .recv_from(buf) @@ -164,7 +172,7 @@ //! source, //! truncated: false, //! }) -//! } +//! }) //! } //! fn local_addr(&self) -> Result { //! match self.inner.local_addr() { @@ -348,9 +356,9 @@ pub struct ReceivedDatagram { /// A bound, configured UDP socket usable for SOME/IP message exchange. /// /// Implementations are obtained via [`TransportFactory::bind`]. The -/// send/receive methods return `impl Future` so the trait is -/// executor-agnostic; the caller awaits them on whatever runtime it -/// owns. The smaller socket-level queries ([`Self::local_addr`], +/// send/receive methods return associated future types so callers can +/// require `Send` bounds when spawning socket loops on multithreaded +/// executors. The smaller socket-level queries ([`Self::local_addr`], /// [`Self::join_multicast_v4`], [`Self::leave_multicast_v4`]) are /// synchronous because they are typically O(1) lookups on a backend's /// internal handle and do not benefit from yielding to the executor. @@ -359,7 +367,39 @@ pub struct ReceivedDatagram { /// [`TransportSocket::join_multicast_v4`]; the bind-time /// [`SocketOptions::multicast_if_v4`] only selects the *outbound* /// multicast interface. +/// +/// # Associated future types (Phase 12) +/// +/// The [`SendFuture`](Self::SendFuture) and [`RecvFuture`](Self::RecvFuture) +/// associated types let consumers express `Send` bounds on the futures +/// returned by `send_to` and `recv_from` without requiring nightly-only +/// Return-Type Notation (RTN, RFC 3654). This enables: +/// +/// ```ignore +/// fn spawn_loop(sock: T, spawner: impl Spawner) +/// where +/// T: Send + Sync + 'static, +/// for<'a> T::SendFuture<'a>: Send, +/// for<'a> T::RecvFuture<'a>: Send, +/// { +/// spawner.spawn(async move { /* use sock */ }); +/// } +/// ``` +/// +/// `TokioSocket` implements these with `Send` futures; bare-metal +/// implementations must do the same if they want to be used with +/// multithreaded spawners. pub trait TransportSocket { + /// Future returned by [`Self::send_to`]. + type SendFuture<'a>: Future> + where + Self: 'a; + + /// Future returned by [`Self::recv_from`]. + type RecvFuture<'a>: Future> + where + Self: 'a; + /// Send `buf` to `target`. UDP is atomic — either the whole datagram /// is transmitted or an error is returned; there is no short-write /// case, which is why this method returns `()` on success rather than @@ -385,11 +425,7 @@ pub trait TransportSocket { /// - [`TransportError::Unsupported`] if `target` is not representable /// on a backend that only speaks a subset of IPv4 (rare; most /// backends surface addressing issues as [`TransportError::Io`]). - fn send_to( - &self, - buf: &[u8], - target: SocketAddrV4, - ) -> impl Future>; + fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddrV4) -> Self::SendFuture<'a>; /// Receive the next datagram into `buf`, returning a /// [`ReceivedDatagram`] carrying byte count, source, and a truncation @@ -413,10 +449,7 @@ pub trait TransportSocket { /// A datagram whose payload exceeds `buf` is **not** an error; it is /// returned with [`ReceivedDatagram::truncated`] set to `true`. The /// caller decides whether to treat truncation as fatal. - fn recv_from( - &self, - buf: &mut [u8], - ) -> impl Future>; + fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> Self::RecvFuture<'a>; /// Return the local address this socket is bound to. Useful for /// discovering the ephemeral port chosen by `bind(port: 0, ..)`. @@ -836,18 +869,14 @@ mod tests { } impl TransportSocket for NullSocket { - fn send_to( - &self, - _buf: &[u8], - _target: SocketAddrV4, - ) -> impl Future> { + type SendFuture<'a> = core::future::Ready>; + type RecvFuture<'a> = core::future::Ready>; + + fn send_to<'a>(&'a self, _buf: &'a [u8], _target: SocketAddrV4) -> Self::SendFuture<'a> { core::future::ready(Err(TransportError::Unsupported)) } - fn recv_from( - &self, - _buf: &mut [u8], - ) -> impl Future> { + fn recv_from<'a>(&'a self, _buf: &'a mut [u8]) -> Self::RecvFuture<'a> { core::future::ready(Err(TransportError::Unsupported)) }