From 0cc8b25ad814b063fc7f4d12fa1bd459aeba72ce Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Tue, 28 Apr 2026 08:14:05 -0400 Subject: [PATCH 1/6] phase 13.6a: ChannelFactory bounded const-N quirk fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prep work for phase 13.6 (static-pool ChannelFactory). Fixes a trait-shape bug uncovered during 13.6 design: `ChannelFactory::bounded` declares `` but the associated-type GAT didn't carry N, so backends that need N at the storage level (`embassy-sync`) silently hardcoded a single capacity (16) regardless of the call-site request. The const-generic on the method was advisory only; the storage shape never honored it. # Trait change (breaking) Before: type BoundedSender: MpscSend; type BoundedReceiver: MpscRecv; fn bounded() -> (Self::BoundedSender, Self::BoundedReceiver); After: type BoundedSender: MpscSend; type BoundedReceiver: MpscRecv; fn bounded() -> (Self::BoundedSender, Self::BoundedReceiver); # Backend impls - `tokio_transport::TokioChannels`: passes N through; ignored at the storage level (tokio mpsc stores capacity at runtime). - `embassy_channels::EmbassySyncChannels`: now actually uses the call-site N for the embassy `Channel<_, T, N>` storage. Previously hardcoded to 16. # Storage-site standardization `SocketManager`'s bounded sender/receiver fields now spell out `BoundedReceiver<_, 16>` / `BoundedSender<_, 16>` — both bind paths (discovery + unicast) standardized to N=16. Unicast was historically N=4, a tokio-conservative choice with no semantic requirement; bumping to 16 matches what embassy already used and what discovery already asked for. `Inner`'s control channel stays at N=4 (it's a separate channel) — its storage type is now `BoundedReceiver<_, 4>` / `BoundedSender<_, 4>`. # Why this is its own commit Phase 13.6's main work is the static-pool `ChannelFactory` impl (`StaticChannels` with per-T monomorphization via a `static_channels!` macro, atomic free-list reclamation, and close semantics for graceful run-loop shutdown). The const-N fix is genuinely independent of that work and benefits any future ChannelFactory impl that cares about per-channel capacity. Landing it separately keeps the 13.6-main commit focused on the static-pool design. # Verification - `cargo test --all-features --lib`: 457 / 457 pass. - `cargo clippy --all-features --all-targets`: clean. - `tests/bare_metal_client` witness still passes. # What this leaves for 13.6 (main) The static-pool `ChannelFactory` itself: `StaticChannels` with per-T pool storage, atomic free-list, poison-flag close semantics, and a `static_channels!` macro that consumers invoke with their distinct payload types. Plus rewriting the `bare_metal_client` witness to use StaticChannels (dropping the `JoinHandle::abort()` workaround the EmbassySyncChannels impl forced). Co-Authored-By: Claude Opus 4.7 (1M context) --- .gitignore | 8 +++----- src/client/inner.rs | 4 ++-- src/client/mod.rs | 2 +- src/client/socket_manager.rs | 20 ++++++++++++++------ src/embassy_channels.rs | 17 +++++++++-------- src/tokio_transport.rs | 10 +++++++--- src/transport.rs | 15 ++++++++++----- 7 files changed, 46 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index 93edde4..86362e6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,5 @@ -.DS_Store - -/target - +.claude/ CLAUDE.md - +.DS_Store lcov.info +/target diff --git a/src/client/inner.rs b/src/client/inner.rs index 3931292..dab0e85 100644 --- a/src/client/inner.rs +++ b/src/client/inner.rs @@ -299,7 +299,7 @@ pub(super) struct Inner< C: ChannelFactory, > { /// MPSC Receiver used to receive control messages from outer client - control_receiver: C::BoundedReceiver>, + control_receiver: C::BoundedReceiver, 4>, /// Queue of pending control messages to process request_queue: Deque, REQUEST_QUEUE_CAP>, /// Pending request-responses keyed by `request_id` (`client_id` << 16 | `session_counter`). @@ -398,7 +398,7 @@ where spawner: S, timer: Tm, ) -> ( - C::BoundedSender>, + C::BoundedSender, 4>, C::UnboundedReceiver>, impl core::future::Future + Send + 'static, ) { diff --git a/src/client/mod.rs b/src/client/mod.rs index 5ee7ed8..c6285b6 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -220,7 +220,7 @@ pub struct Client< C: ChannelFactory, > { interface: I, - control_sender: C::BoundedSender>, + control_sender: C::BoundedSender, 4>, e2e_registry: R, } diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index 847eb7a..a22a2d5 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -127,8 +127,8 @@ impl } pub struct SocketManager { - receiver: C::BoundedReceiver, Error>>, - sender: C::BoundedSender>, + receiver: C::BoundedReceiver, Error>, 16>, + sender: C::BoundedSender, 16>, local_port: u16, session_id: u16, /// Set to true once `session_id` has wrapped from 0xFFFF → 1. @@ -324,9 +324,17 @@ where S: Spawner, R: E2ERegistryHandle, { + // Standardized to N=16 across both discovery and unicast bind + // paths (was N=4 here historically — a tokio-conservative + // choice). The trait's const-N now propagates to the GAT, so + // the stored receiver/sender types must commit to a single N; + // 16 matches what embassy-sync hardcodes and what discovery + // already used. Bumping the unicast capacity from 4 to 16 has + // no semantic effect — it just lets the channels absorb a + // brief burst before backpressure kicks in. let (rx_tx, rx_rx) = - C::bounded::, Error>, 4>(); - let (tx_tx, tx_rx) = C::bounded::, 4>(); + C::bounded::, Error>, 16>(); + let (tx_tx, tx_rx) = C::bounded::, 16>(); let options = { let mut o = SocketOptions::new(); @@ -452,8 +460,8 @@ where #[allow(clippy::too_many_lines)] async fn socket_loop_future( socket: T, - rx_tx: C::BoundedSender, Error>>, - mut tx_rx: C::BoundedReceiver>, + rx_tx: C::BoundedSender, Error>, 16>, + mut tx_rx: C::BoundedReceiver, 16>, e2e_registry: R, ) where diff --git a/src/embassy_channels.rs b/src/embassy_channels.rs index d570361..8909a57 100644 --- a/src/embassy_channels.rs +++ b/src/embassy_channels.rs @@ -173,15 +173,16 @@ impl ChannelFactory for EmbassySyncChannels { ) } - type BoundedSender = EmbassySyncBoundedSender; - type BoundedReceiver = EmbassySyncBoundedReceiver; + // Phase 13.6: the const-N quirk is fixed. The `N` from the trait + // call site now propagates into the embassy `Channel<_, T, N>` + // storage, so callers asking for capacity 16 actually get 16, and + // callers asking for 4 actually get 4. (Previously this impl + // hardcoded 16 regardless of the requested N.) + type BoundedSender = EmbassySyncBoundedSender; + type BoundedReceiver = EmbassySyncBoundedReceiver; fn bounded( - ) -> (Self::BoundedSender, Self::BoundedReceiver) { - // The const N from the trait call site is ignored here — embassy-sync - // requires the capacity to be known at the impl level, not the call - // site. All bounded channels use capacity 16, which covers the - // worst case (discovery socket, which uses 16). - let chan: Arc> = Arc::new(Channel::new()); + ) -> (Self::BoundedSender, Self::BoundedReceiver) { + let chan: Arc> = Arc::new(Channel::new()); ( EmbassySyncBoundedSender(chan.clone()), EmbassySyncBoundedReceiver(chan), diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index 1c113a8..bbe0e47 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -398,10 +398,14 @@ impl ChannelFactory for TokioChannels { (tx, TokioOneshotReceiver(rx)) } - type BoundedSender = tokio::sync::mpsc::Sender; - type BoundedReceiver = tokio::sync::mpsc::Receiver; + // Tokio's `mpsc` channels store capacity at runtime, so the + // const-generic `N` is informational only — it does not affect + // the stored type. Embassy-sync's impl uses `N` differently (see + // `embassy_channels`). + type BoundedSender = tokio::sync::mpsc::Sender; + type BoundedReceiver = tokio::sync::mpsc::Receiver; fn bounded( - ) -> (Self::BoundedSender, Self::BoundedReceiver) { + ) -> (Self::BoundedSender, Self::BoundedReceiver) { tokio::sync::mpsc::channel(N) } diff --git a/src/transport.rs b/src/transport.rs index 3cb83cf..933fc52 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -875,13 +875,18 @@ pub trait ChannelFactory: Clone + Send + Sync + 'static { /// Create a oneshot channel pair. fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver); - /// Bounded-channel sender type. - type BoundedSender: MpscSend; - /// Bounded-channel receiver type. - type BoundedReceiver: MpscRecv; + /// Bounded-channel sender type. The `const N: usize` parameter is + /// the channel capacity; it must match the `N` passed to + /// [`Self::bounded`]. Backends that store the capacity at + /// construction time (`tokio::sync::mpsc`) ignore it for storage + /// purposes; backends that bake it into the type (`embassy-sync`) + /// use it directly. + type BoundedSender: MpscSend; + /// Bounded-channel receiver type. See [`Self::BoundedSender`]. + type BoundedReceiver: MpscRecv; /// Create a bounded channel with capacity `N`. fn bounded( - ) -> (Self::BoundedSender, Self::BoundedReceiver); + ) -> (Self::BoundedSender, Self::BoundedReceiver); /// Unbounded-channel sender type. type UnboundedSender: UnboundedSend; From fd01d5ca4b1e023dbb45c4761a145d16d8e4393b Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Tue, 28 Apr 2026 09:22:13 -0400 Subject: [PATCH 2/6] phase 13.6b: ChannelFactory per-T Pooled bounds MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reshape `ChannelFactory` so the three constructor methods (`oneshot`, `bounded`, `unbounded`) gain `where T: *Pooled` bounds and dispatch to that trait's pair-builder by default, instead of being direct constructors. Three new traits in `transport.rs`: - `OneshotPooled: Send + Sized + 'static` - `BoundedPooled: Send + Sized + 'static` - `UnboundedPooled: Send + Sized + 'static` Each carries a `*_pair() -> (C::Sender, C::Receiver)` constructor. `TokioChannels` and `EmbassySyncChannels` publish blanket `impl *Pooled for T` (TokioChannels also blankets bounded over `const N`), so existing user code is unaffected. A static-pool `ChannelFactory` (phase 13.6c+) instead publishes per-`T` `*Pooled` impls — typically generated by a macro — each pointing at a declared `static` pool. Calling `C::oneshot::()` against such a backend fails at the call site with `OneshotPooled is not implemented for NotDeclared`, turning "forgot to declare a pool" from a runtime panic into a compile error. What this leaves for 13.6c: - `src/static_channels/` module with pool primitives (slot, pool, free-list, send/recv handle types) and atomic ordering. - The `static_channels!` macro (13.6d) that generates per-`T` `*Pooled` impls from a user pool-layout declaration. - The alloc-panicking witness test (13.6e). Bound propagation: - The 7-bound bundle (3 oneshot, 3 bounded, 1 unbounded) is repeated inline at each impl block that constructs channels through `C`: `ControlMessage`, `Inner`, `SendMessage`, `SocketManager`, `Client`. A doc comment in `client::mod` explains why a single `C: ClientChannels

` trait alias does not work today (stable Rust does not elaborate where-clause bounds, and macros do not expand inside `where` clauses) and points at the implied-bounds RFC that would let it collapse. - `ControlMessage`, `SendMessage`, `ReceivedMessage` go from `pub(super)` to `pub` and are re-exported from `client` so the forthcoming `static_channels!` macro can name them when generating per-`T` `*Pooled` impls. - `MessageDefinitions` gains a `Send` bound on every impl that bundles the channeled types — `Result: OneshotPooled` requires `P: Send`. Already present on `Client`; added on `Inner` and the `SendMessage`/`ControlMessage` factory impls. Verification: - `cargo build` clean across `client-tokio`, `client+bare_metal+std`, `server`, all-features. - `cargo test --all-features -- --test-threads=1`: 479 tests pass (457 lib + 11 client_server + 1 bare_metal_client + 1 bare_metal workspace member + 9 doctests). - `cargo clippy --all-targets --all-features` clean. - `cargo fmt -- --check` clean. Collateral: `cargo fmt` swept up pre-existing baseline format drift in `examples/`, `src/lib.rs`, `src/server/`, and one inner-test match arm — included in this commit rather than split off. Co-Authored-By: Claude Opus 4.7 (1M context) --- examples/bare_metal/src/main.rs | 6 +- examples/client_server/src/main.rs | 3 +- examples/discovery_client/src/main.rs | 3 +- src/client/inner.rs | 79 +++++++++++----- src/client/mod.rs | 59 ++++++++++-- src/client/socket_manager.rs | 36 ++++---- src/embassy_channels.rs | 67 +++++++++----- src/lib.rs | 4 +- src/server/mod.rs | 5 +- src/server/subscription_manager.rs | 9 +- src/tokio_transport.rs | 49 +++++++--- src/transport.rs | 125 ++++++++++++++++++++++---- tests/bare_metal_client.rs | 18 +--- 13 files changed, 337 insertions(+), 126 deletions(-) diff --git a/examples/bare_metal/src/main.rs b/examples/bare_metal/src/main.rs index da84428..455a7b7 100644 --- a/examples/bare_metal/src/main.rs +++ b/examples/bare_metal/src/main.rs @@ -177,7 +177,11 @@ impl Future for MockSendFut { fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { let me = self.get_mut(); if let Some(bytes) = me.bytes.take() { - me.pipe.send_queue.lock().unwrap().push_back((bytes, me.target)); + me.pipe + .send_queue + .lock() + .unwrap() + .push_back((bytes, me.target)); } Poll::Ready(Ok(())) } diff --git a/examples/client_server/src/main.rs b/examples/client_server/src/main.rs index 516e064..c3eb7f0 100644 --- a/examples/client_server/src/main.rs +++ b/examples/client_server/src/main.rs @@ -106,8 +106,7 @@ async fn main() -> Result<(), Box> { // ── Create the client (handles discovery, subscriptions, SD socket) ── - let (client, mut updates, run_fut) = - simple_someip::Client::::new(interface); + let (client, mut updates, run_fut) = simple_someip::Client::::new(interface); let _run_handle = tokio::spawn(run_fut); client.bind_discovery().await?; info!("Client discovery bound"); diff --git a/examples/discovery_client/src/main.rs b/examples/discovery_client/src/main.rs index e4efd3b..4c3fcb0 100644 --- a/examples/discovery_client/src/main.rs +++ b/examples/discovery_client/src/main.rs @@ -287,8 +287,7 @@ async fn main() -> Result<(), Error> { info!("Starting discovery client on interface {interface}"); - let (client, mut updates, run_fut) = - simple_someip::Client::::new(interface); + let (client, mut updates, run_fut) = simple_someip::Client::::new(interface); let _run_handle = tokio::spawn(run_fut); client.bind_discovery().await.unwrap(); diff --git a/src/client/inner.rs b/src/client/inner.rs index dab0e85..2a77da8 100644 --- a/src/client/inner.rs +++ b/src/client/inner.rs @@ -8,6 +8,10 @@ use std::borrow::ToOwned; use std::sync::{Arc, Mutex}; use tracing::{debug, error, info, trace, warn}; +#[cfg(all(test, feature = "client-tokio"))] +use crate::e2e::E2ERegistry; +#[cfg(all(test, feature = "client-tokio"))] +use crate::tokio_transport::{TokioChannels, TokioSpawner, TokioTimer, TokioTransport}; use crate::{ Timer, client::{ @@ -23,10 +27,6 @@ use crate::{ TransportSocket, UnboundedSend, }, }; -#[cfg(all(test, feature = "client-tokio"))] -use crate::e2e::E2ERegistry; -#[cfg(all(test, feature = "client-tokio"))] -use crate::tokio_transport::{TokioChannels, TokioSpawner, TokioTimer, TokioTransport}; use super::error::Error; @@ -43,7 +43,7 @@ const PENDING_RESPONSES_CAP: usize = 64; /// two. const UNICAST_SOCKETS_CAP: usize = 8; -pub(super) enum ControlMessage { +pub enum ControlMessage { SetInterface(Ipv4Addr, C::OneshotSender>), BindDiscovery(C::OneshotSender>), UnbindDiscovery(C::OneshotSender>), @@ -140,20 +140,31 @@ impl std::fmt::Debug for Cont } } -impl ControlMessage { +impl ControlMessage +where + P: PayloadWireFormat + Send + 'static, + C: ChannelFactory, + Result<(), Error>: crate::transport::OneshotPooled, + Result: crate::transport::OneshotPooled, + Result: crate::transport::OneshotPooled, +{ + #[must_use] pub fn set_interface(interface: Ipv4Addr) -> (C::OneshotReceiver>, Self) { let (sender, receiver) = C::oneshot(); (receiver, Self::SetInterface(interface, sender)) } + #[must_use] pub fn bind_discovery() -> (C::OneshotReceiver>, Self) { let (sender, receiver) = C::oneshot(); (receiver, Self::BindDiscovery(sender)) } + #[must_use] pub fn unbind_discovery() -> (C::OneshotReceiver>, Self) { let (sender, receiver) = C::oneshot(); (receiver, Self::UnbindDiscovery(sender)) } + #[must_use] pub fn send_sd( socket_addr: SocketAddrV4, header: P::SdHeader, @@ -161,6 +172,7 @@ impl ControlMessage { let (sender, receiver) = C::oneshot(); (receiver, Self::SendSD(socket_addr, header, sender)) } + #[must_use] pub fn add_endpoint( service_id: u16, instance_id: u16, @@ -174,6 +186,7 @@ impl ControlMessage { ) } + #[must_use] pub fn remove_endpoint( service_id: u16, instance_id: u16, @@ -186,6 +199,7 @@ impl ControlMessage { } #[allow(clippy::type_complexity)] + #[must_use] pub fn send_to_service( service_id: u16, instance_id: u16, @@ -210,6 +224,7 @@ impl ControlMessage { ) } + #[must_use] pub fn subscribe( service_id: u16, instance_id: u16, @@ -233,6 +248,7 @@ impl ControlMessage { ) } + #[must_use] pub fn query_reboot_flag() -> ( C::OneshotReceiver>, Self, @@ -242,6 +258,7 @@ impl ControlMessage { } #[cfg(all(test, feature = "client-tokio"))] + #[must_use] pub fn force_sd_session_wrapped_for_test( wrapped: bool, ) -> (C::OneshotReceiver>, Self) { @@ -304,8 +321,11 @@ pub(super) struct Inner< request_queue: Deque, REQUEST_QUEUE_CAP>, /// Pending request-responses keyed by `request_id` (`client_id` << 16 | `session_counter`). /// Set by `SendToService`, cleared when a matching unicast arrives. - pending_responses: - FnvIndexMap>, PENDING_RESPONSES_CAP>, + pending_responses: FnvIndexMap< + u32, + C::OneshotSender>, + PENDING_RESPONSES_CAP, + >, /// Unbounded sender used to send updates to outer client update_sender: C::UnboundedSender>, /// Target interface for sockets @@ -370,7 +390,7 @@ impl< impl Inner where - PayloadDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static, + PayloadDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + Send + 'static, F: TransportFactory + Send + Sync + 'static, F::Socket: Send + Sync + 'static, for<'a> ::SendFuture<'a>: Send, @@ -379,6 +399,16 @@ where Tm: Timer + Send + Sync + 'static, R: E2ERegistryHandle, C: ChannelFactory, + // Channel-bound bundle (see comment in `client::mod`). + Result<(), Error>: crate::transport::OneshotPooled, + Result: crate::transport::OneshotPooled, + Result: crate::transport::OneshotPooled, + ControlMessage: crate::transport::BoundedPooled, + super::socket_manager::SendMessage: + crate::transport::BoundedPooled, + Result, Error>: + crate::transport::BoundedPooled, + super::ClientUpdate: crate::transport::UnboundedPooled, { /// Construct an `Inner` and return the control/update channels plus /// the run-loop future. The caller drives the future on its @@ -1012,9 +1042,7 @@ where // `TokioTimer` (wrapping `tokio::time::sleep`); bare-metal // builds plug in their own (e.g. an `embassy_time` shim). let control_fut = control_receiver.recv().fuse(); - let sleep_fut = timer - .sleep(core::time::Duration::from_millis(125)) - .fuse(); + let sleep_fut = timer.sleep(core::time::Duration::from_millis(125)).fuse(); let discovery_fut = Self::receive_discovery(discovery_socket).fuse(); let unicast_fut = Self::receive_any_unicast(unicast_sockets).fuse(); pin_mut!(control_fut, sleep_fut, discovery_fut, unicast_fut); @@ -1177,8 +1205,8 @@ mod tests { use crate::protocol::sd::test_support::{TestPayload, empty_sd_header}; use crate::transport::{OneshotRecv, UnboundedRecv}; use std::format; - use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc::Sender; + use tokio::sync::{mpsc, oneshot}; type TestControl = ControlMessage; /// Type alias for the fully-spelled `Inner` flavor used throughout @@ -1233,8 +1261,8 @@ mod tests { /// the resulting `RecvError`, which is exactly what Copilot flagged. #[test] fn reject_with_capacity_notifies_every_sender() { - use futures::FutureExt; use crate::transport::OneshotCancelled; + use futures::FutureExt; fn expect_capacity(rx: F, label: &str) where @@ -1286,8 +1314,12 @@ mod tests { expect_capacity(send_rx.recv(), "SendToService.send_complete"); // resp_rx has type Result — check it separately match resp_rx.recv().now_or_never() { - Some(Ok(Err(Error::Capacity(s)))) => assert_eq!(s, "request_queue", "SendToService.response"), - other => panic!("SendToService.response: expected Some(Ok(Err(Capacity))), got {other:?}"), + Some(Ok(Err(Error::Capacity(s)))) => { + assert_eq!(s, "request_queue", "SendToService.response"); + } + other => { + panic!("SendToService.response: expected Some(Ok(Err(Capacity))), got {other:?}") + } } } @@ -1520,7 +1552,9 @@ mod tests { ); match displaced_result { Err(Error::Capacity(tag)) => assert_eq!(tag, "pending_responses"), - other => panic!("expected Err(Error::Capacity(\\\"pending_responses\\\")), got {other:?}"), + other => { + panic!("expected Err(Error::Capacity(\\\"pending_responses\\\")), got {other:?}") + } } // The new sender is still live and pending. @@ -1629,15 +1663,20 @@ mod tests { // Drop control sender to trigger loop exit drop(control_sender); // The update receiver should eventually return None when the inner loop exits - let result = - tokio::time::timeout(std::time::Duration::from_secs(2), UnboundedRecv::recv(&mut update_receiver)).await; + let result = tokio::time::timeout( + std::time::Duration::from_secs(2), + UnboundedRecv::recv(&mut update_receiver), + ) + .await; assert!(result.is_ok()); assert!(result.unwrap().is_none()); } /// Helper: verify inner loop is still alive by sending an `AddEndpoint` and /// checking that a response arrives within 2 seconds. - async fn assert_inner_alive(control_sender: &Sender>) { + async fn assert_inner_alive( + control_sender: &Sender>, + ) { let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, 9999); let (rx, msg) = TestControl::add_endpoint(0xFFFE, 0xFFFE, addr, 0); control_sender.send(msg).await.unwrap(); diff --git a/src/client/mod.rs b/src/client/mod.rs index c6285b6..2bd2c38 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -35,24 +35,56 @@ mod session; mod socket_manager; pub use error::Error; +/// Internal control message exchanged between [`Client`] handles and +/// the run-loop. Exposed (rather than `pub(super)`) so callers can +/// declare static channel pools for it via +/// `crate::transport::BoundedPooled`. End users typically do not +/// reference this type directly — the +/// `crate::static_channels::static_channels!` macro names it for them. +pub use inner::ControlMessage; +/// Per-socket message types exposed for the same reason as +/// [`ControlMessage`] — see its docstring. +pub use socket_manager::{ReceivedMessage, SendMessage}; use crate::Timer; -use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile}; #[cfg(feature = "client-tokio")] use crate::e2e::E2ERegistry; +use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile}; #[cfg(feature = "client-tokio")] use crate::tokio_transport::{TokioChannels, TokioSpawner, TokioTimer}; use crate::transport::{ - ChannelFactory, E2ERegistryHandle, InterfaceHandle, MpscSend, OneshotRecv, Spawner, - TransportFactory, TransportSocket, UnboundedRecv, + BoundedPooled, ChannelFactory, E2ERegistryHandle, InterfaceHandle, MpscSend, OneshotPooled, + OneshotRecv, Spawner, TransportFactory, TransportSocket, UnboundedPooled, UnboundedRecv, }; use crate::{protocol, protocol::Message, traits::PayloadWireFormat}; use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use inner::{ControlMessage, Inner}; +use inner::Inner; #[cfg(feature = "client-tokio")] use std::sync::{Arc, Mutex, RwLock}; use tracing::info; +// Bound bundle the client's internals demand from any +// `C: ChannelFactory` they channel through. Stable Rust does not +// elaborate where-clause bounds on a trait alias, and macros do not +// expand inside `where` clauses, so the bundle is repeated inline at +// each impl block that constructs channels. The list is authored once +// here as documentation and copy-pasted; mismatch surfaces as a +// trait-bound compile error pointing at the missing `OneshotPooled` / +// `BoundedPooled` / `UnboundedPooled` impl. +// +// ```ignore +// Result<(), Error>: OneshotPooled, +// Result: OneshotPooled, +// Result: OneshotPooled, +// ControlMessage: BoundedPooled, +// SendMessage: BoundedPooled, +// Result, Error>: BoundedPooled, +// ClientUpdate

: UnboundedPooled, +// ``` +// +// When stable Rust gains implied bounds for trait where-clauses, this +// collapses back to a single `C: ClientChannels

` supertrait. + /// Handle to a pending SOME/IP request-response transaction. /// Resolves when the inner loop receives a matching unicast reply. /// Does not borrow `Client`. @@ -160,7 +192,9 @@ impl std::fm } } -impl ClientUpdates { +impl + ClientUpdates +{ /// Waits for the next update from the client event loop. /// /// Returns `None` when the inner loop has exited (all `Client` handles @@ -381,10 +415,17 @@ where /// Methods available on all `Client` regardless of handle types. impl Client where - MessageDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + 'static, + MessageDefinitions: PayloadWireFormat + Clone + std::fmt::Debug + Send + 'static, R: E2ERegistryHandle, I: InterfaceHandle, C: ChannelFactory, + Result<(), Error>: OneshotPooled, + Result: OneshotPooled, + Result: OneshotPooled, + ControlMessage: BoundedPooled, + SendMessage: BoundedPooled, + Result, Error>: BoundedPooled, + ClientUpdate: UnboundedPooled, { /// Bare-metal-friendly constructor that takes every dependency /// explicitly via a [`ClientDeps`] bundle: a [`TransportFactory`], a @@ -927,7 +968,8 @@ where loop { timer.sleep(interval).await; - let (flag_rx, flag_msg) = ControlMessage::::query_reboot_flag(); + let (flag_rx, flag_msg) = + ControlMessage::::query_reboot_flag(); let Some(sender) = weak_sender.upgrade() else { tracing::info!("Client shut down, stopping SD announcements"); break; @@ -955,7 +997,8 @@ where let mut header = sd_header.clone(); MessageDefinitions::set_reboot_flag(&mut header, reboot); - let (response, message) = ControlMessage::::send_sd(target, header); + let (response, message) = + ControlMessage::::send_sd(target, header); let Some(sender) = weak_sender.upgrade() else { tracing::info!("Client shut down, stopping SD announcements"); diff --git a/src/client/socket_manager.rs b/src/client/socket_manager.rs index a22a2d5..ed92268 100644 --- a/src/client/socket_manager.rs +++ b/src/client/socket_manager.rs @@ -89,7 +89,9 @@ pub struct SendMessage { response: C::OneshotSender>, } -impl std::fmt::Debug for SendMessage { +impl std::fmt::Debug + for SendMessage +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SendMessage") .field("target_addr", &self.target_addr) @@ -107,8 +109,11 @@ enum Outcome { Recv(Result), } -impl - SendMessage +impl SendMessage +where + PayloadDefinitions: PayloadWireFormat + Send + 'static, + C: ChannelFactory, + Result<(), Error>: crate::transport::OneshotPooled, { pub fn new( target_addr: SocketAddrV4, @@ -137,7 +142,9 @@ pub struct SocketManager session_has_wrapped: bool, } -impl std::fmt::Debug for SocketManager { +impl std::fmt::Debug + for SocketManager +{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SocketManager") .field("local_port", &self.local_port) @@ -150,6 +157,9 @@ impl SocketManager where MessageDefinitions: PayloadWireFormat + Send + 'static, C: ChannelFactory, + Result<(), Error>: crate::transport::OneshotPooled, + SendMessage: crate::transport::BoundedPooled, + Result, Error>: crate::transport::BoundedPooled, { /// Bind the SD multicast socket, seeding the session counter and wrap /// state from a previous socket when rebinding. Pass `(1, false)` for a @@ -245,8 +255,7 @@ where S: Spawner, R: E2ERegistryHandle, { - let (rx_tx, rx_rx) = - C::bounded::, Error>, 16>(); + let (rx_tx, rx_rx) = C::bounded::, Error>, 16>(); let (tx_tx, tx_rx) = C::bounded::, 16>(); // Control whether multicast packets sent by this socket are looped @@ -332,8 +341,7 @@ where // already used. Bumping the unicast capacity from 4 to 16 has // no semantic effect — it just lets the channels absorb a // brief burst before backpressure kicks in. - let (rx_tx, rx_rx) = - C::bounded::, Error>, 16>(); + let (rx_tx, rx_rx) = C::bounded::, Error>, 16>(); let (tx_tx, tx_rx) = C::bounded::, 16>(); let options = { @@ -374,7 +382,8 @@ where ); return Err(Error::Capacity("udp_buffer")); } - let (result_channel, message) = SendMessage::::new(target_addr, message); + let (result_channel, message) = + SendMessage::::new(target_addr, message); self.sender.send(message).await.map_err(|()| { error!("Socket error when attempting to send message"); Error::SocketClosedUnexpectedly @@ -463,8 +472,7 @@ where rx_tx: C::BoundedSender, Error>, 16>, mut tx_rx: C::BoundedReceiver, 16>, e2e_registry: R, - ) - where + ) where T: TransportSocket + Send + Sync + 'static, for<'a> T::SendFuture<'a>: Send, for<'a> T::RecvFuture<'a>: Send, @@ -1116,11 +1124,7 @@ mod tests { type SendFuture<'a> = ::SendFuture<'a>; type RecvFuture<'a> = ::RecvFuture<'a>; - fn send_to<'a>( - &'a self, - buf: &'a [u8], - target: SocketAddrV4, - ) -> Self::SendFuture<'a> { + fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddrV4) -> Self::SendFuture<'a> { self.0.send_to(buf, target) } fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> Self::RecvFuture<'a> { diff --git a/src/embassy_channels.rs b/src/embassy_channels.rs index 8909a57..c1574ce 100644 --- a/src/embassy_channels.rs +++ b/src/embassy_channels.rs @@ -36,15 +36,13 @@ use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::channel::Channel; use crate::transport::{ - ChannelFactory, MpscRecv, MpscSend, OneshotCancelled, OneshotRecv, OneshotSend, UnboundedRecv, - UnboundedSend, + BoundedPooled, ChannelFactory, MpscRecv, MpscSend, OneshotCancelled, OneshotPooled, + OneshotRecv, OneshotSend, UnboundedPooled, UnboundedRecv, UnboundedSend, }; // ── Oneshot (capacity-1 Channel) ────────────────────────────────────── -pub struct EmbassySyncOneshotSender( - Arc>, -); +pub struct EmbassySyncOneshotSender(Arc>); pub struct EmbassySyncOneshotReceiver( Arc>, @@ -97,10 +95,7 @@ impl MpscRecv for EmbassySyncBoundedReceiv async move { Some(chan.receive().await) } } - fn poll_recv( - &mut self, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll> { + fn poll_recv(&mut self, cx: &mut core::task::Context<'_>) -> core::task::Poll> { use core::pin::Pin; // Try non-blocking receive first. if let Ok(val) = self.0.try_receive() { @@ -165,34 +160,60 @@ pub struct EmbassySyncChannels; impl ChannelFactory for EmbassySyncChannels { type OneshotSender = EmbassySyncOneshotSender; type OneshotReceiver = EmbassySyncOneshotReceiver; - fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) { + + // Phase 13.6a: the const-N quirk is fixed. The `N` from the trait + // call site now propagates into the embassy `Channel<_, T, N>` + // storage, so callers asking for capacity 16 actually get 16, and + // callers asking for 4 actually get 4. + type BoundedSender = EmbassySyncBoundedSender; + type BoundedReceiver = EmbassySyncBoundedReceiver; + + type UnboundedSender = EmbassySyncUnboundedSender; + type UnboundedReceiver = EmbassySyncUnboundedReceiver; + + // The three constructor methods use the trait's default bodies, + // which delegate to the per-`T` `*Pooled` + // blanket impls below. Embassy-sync still allocates per call + // (`Arc>`); the no-alloc story lives in + // `crate::static_channels` (phase 13.6c+) which publishes per-`T` + // `*Pooled` impls instead of a blanket. +} + +// Blanket `*Pooled` impls. Embassy-sync still heap-allocates per call +// (one `Arc>` per pair); the goal of these blanket impls +// is API parity with `TokioChannels`, not zero-alloc — that's the +// `static_channels` job. +impl OneshotPooled for T { + fn oneshot_pair() -> ( + ::OneshotSender, + ::OneshotReceiver, + ) { let chan = Arc::new(Channel::new()); ( EmbassySyncOneshotSender(chan.clone()), EmbassySyncOneshotReceiver(chan), ) } +} - // Phase 13.6: the const-N quirk is fixed. The `N` from the trait - // call site now propagates into the embassy `Channel<_, T, N>` - // storage, so callers asking for capacity 16 actually get 16, and - // callers asking for 4 actually get 4. (Previously this impl - // hardcoded 16 regardless of the requested N.) - type BoundedSender = EmbassySyncBoundedSender; - type BoundedReceiver = EmbassySyncBoundedReceiver; - fn bounded( - ) -> (Self::BoundedSender, Self::BoundedReceiver) { +impl BoundedPooled for T { + fn bounded_pair() -> ( + ::BoundedSender, + ::BoundedReceiver, + ) { let chan: Arc> = Arc::new(Channel::new()); ( EmbassySyncBoundedSender(chan.clone()), EmbassySyncBoundedReceiver(chan), ) } +} - type UnboundedSender = EmbassySyncUnboundedSender; - type UnboundedReceiver = EmbassySyncUnboundedReceiver; - fn unbounded( - ) -> (Self::UnboundedSender, Self::UnboundedReceiver) { +impl UnboundedPooled for T { + fn unbounded_pair() -> ( + ::UnboundedSender, + ::UnboundedReceiver, + ) { let chan = Arc::new(Channel::new()); ( EmbassySyncUnboundedSender(chan.clone()), diff --git a/src/lib.rs b/src/lib.rs index 6534b59..b2beea5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -187,6 +187,8 @@ pub use client::{ pub use e2e::{E2ECheckStatus, E2EKey, E2EProfile}; #[cfg(feature = "server")] pub use server::Server; +#[cfg(feature = "server")] +pub use server::SubscriptionHandle; #[cfg(any(feature = "client-tokio", feature = "server"))] pub use tokio_transport::{TokioChannels, TokioSocket, TokioSpawner, TokioTimer, TokioTransport}; pub use transport::{ @@ -194,5 +196,3 @@ pub use transport::{ OneshotCancelled, OneshotRecv, OneshotSend, ReceivedDatagram, SocketOptions, Spawner, Timer, TransportError, TransportFactory, TransportSocket, UnboundedRecv, UnboundedSend, }; -#[cfg(feature = "server")] -pub use server::SubscriptionHandle; diff --git a/src/server/mod.rs b/src/server/mod.rs index f871764..5a880cb 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2240,10 +2240,7 @@ mod tests { with_default(subscriber, || { // 0 endpoints → warn! "No IPv4 endpoint" branch. let iter_empty = sd::OptionIter::new(&[]); - assert_eq!( - extract_subscriber_endpoint(&iter_empty, 0, 0, 0, 0), - None - ); + assert_eq!(extract_subscriber_endpoint(&iter_empty, 0, 0, 0, 0), None); // 1 endpoint → trace! "Found IPv4 endpoint" branch. let mut buf_one = [0u8; 32]; diff --git a/src/server/subscription_manager.rs b/src/server/subscription_manager.rs index d561b83..7f2cbe5 100644 --- a/src/server/subscription_manager.rs +++ b/src/server/subscription_manager.rs @@ -325,9 +325,12 @@ impl SubscriptionHandle for Arc> { ) -> impl Future + Send + '_ { let this = self.clone(); async move { - this.write() - .await - .unsubscribe(service_id, instance_id, event_group_id, subscriber_addr); + this.write().await.unsubscribe( + service_id, + instance_id, + event_group_id, + subscriber_addr, + ); } } diff --git a/src/tokio_transport.rs b/src/tokio_transport.rs index bbe0e47..e170598 100644 --- a/src/tokio_transport.rs +++ b/src/tokio_transport.rs @@ -364,7 +364,9 @@ impl OneshotRecv for TokioOneshotReceiver { impl MpscSend for tokio::sync::mpsc::Sender { async fn send(&self, value: T) -> Result<(), ()> { - tokio::sync::mpsc::Sender::send(self, value).await.map_err(|_| ()) + tokio::sync::mpsc::Sender::send(self, value) + .await + .map_err(|_| ()) } } @@ -393,10 +395,6 @@ impl UnboundedRecv for TokioUnboundedReceiver { impl ChannelFactory for TokioChannels { type OneshotSender = tokio::sync::oneshot::Sender; type OneshotReceiver = TokioOneshotReceiver; - fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) { - let (tx, rx) = tokio::sync::oneshot::channel(); - (tx, TokioOneshotReceiver(rx)) - } // Tokio's `mpsc` channels store capacity at runtime, so the // const-generic `N` is informational only — it does not affect @@ -404,14 +402,44 @@ impl ChannelFactory for TokioChannels { // `embassy_channels`). type BoundedSender = tokio::sync::mpsc::Sender; type BoundedReceiver = tokio::sync::mpsc::Receiver; - fn bounded( - ) -> (Self::BoundedSender, Self::BoundedReceiver) { - tokio::sync::mpsc::channel(N) - } type UnboundedSender = tokio::sync::mpsc::UnboundedSender; type UnboundedReceiver = TokioUnboundedReceiver; - fn unbounded() -> (Self::UnboundedSender, Self::UnboundedReceiver) { + + // The three constructor methods (`oneshot`, `bounded`, `unbounded`) + // use the trait's default bodies, which delegate to the per-`T` + // `*Pooled` blanket impls below. Tokio has a single + // shared allocator, so every `T: Send + 'static` is poolable; the + // blanket impls capture that. +} + +// Blanket `*Pooled` impls for every `T: Send + 'static` against +// `TokioChannels`. Tokio has a single shared allocator and so does not +// need per-`T` storage — each call constructs a fresh channel. +impl crate::transport::OneshotPooled for T { + fn oneshot_pair() -> ( + ::OneshotSender, + ::OneshotReceiver, + ) { + let (tx, rx) = tokio::sync::oneshot::channel(); + (tx, TokioOneshotReceiver(rx)) + } +} + +impl crate::transport::BoundedPooled for T { + fn bounded_pair() -> ( + ::BoundedSender, + ::BoundedReceiver, + ) { + tokio::sync::mpsc::channel(N) + } +} + +impl crate::transport::UnboundedPooled for T { + fn unbounded_pair() -> ( + ::UnboundedSender, + ::UnboundedReceiver, + ) { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); (tx, TokioUnboundedReceiver(rx)) } @@ -426,7 +454,6 @@ impl ChannelFactory for TokioChannels { // has been moved to `crate::embassy_channels` (gated only by // `feature = "bare_metal"`) so it is reachable from any client build. - #[cfg(test)] mod tests { use super::*; diff --git a/src/transport.rs b/src/transport.rs index 933fc52..9c1e172 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -222,8 +222,8 @@ use core::future::Future; use core::net::{Ipv4Addr, SocketAddrV4}; use core::time::Duration; -use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile}; use crate::e2e::Error as E2EError; +use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile}; /// Portable I/O error kinds surfaced by transport implementations. /// @@ -714,22 +714,28 @@ pub trait InterfaceHandle: Clone + Send + Sync + 'static { #[cfg(feature = "std")] mod std_handle_impls { use super::{E2ERegistryHandle, InterfaceHandle}; - use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile, E2ERegistry}; use crate::e2e::Error as E2EError; + use crate::e2e::{E2ECheckStatus, E2EKey, E2EProfile, E2ERegistry}; use core::net::Ipv4Addr; use std::sync::{Arc, Mutex, RwLock}; impl E2ERegistryHandle for Arc> { fn register(&self, key: E2EKey, profile: E2EProfile) { - self.lock().expect("e2e registry lock poisoned").register(key, profile); + self.lock() + .expect("e2e registry lock poisoned") + .register(key, profile); } fn unregister(&self, key: &E2EKey) { - self.lock().expect("e2e registry lock poisoned").unregister(key); + self.lock() + .expect("e2e registry lock poisoned") + .unregister(key); } fn contains_key(&self, key: &E2EKey) -> bool { - self.lock().expect("e2e registry lock poisoned").contains_key(key) + self.lock() + .expect("e2e registry lock poisoned") + .contains_key(key) } fn protect( @@ -739,9 +745,12 @@ mod std_handle_impls { upper_header: [u8; 8], output: &mut [u8], ) -> Option> { - self.lock() - .expect("e2e registry lock poisoned") - .protect(key, payload, upper_header, output) + self.lock().expect("e2e registry lock poisoned").protect( + key, + payload, + upper_header, + output, + ) } fn check<'a>( @@ -824,10 +833,7 @@ pub trait MpscRecv: Send + 'static { /// Poll the channel without blocking. Used by `receive_any_unicast` to /// multiplex across several socket channels in a single `poll_fn` pass. - fn poll_recv( - &mut self, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll>; + fn poll_recv(&mut self, cx: &mut core::task::Context<'_>) -> core::task::Poll>; } /// The send half of an unbounded MPSC channel. @@ -867,13 +873,46 @@ pub trait UnboundedRecv: Send + 'static { /// - **unbounded** — notionally unbounded MPSC queue (embassy-sync /// implementations use a large-capacity channel). Used for the /// `ClientUpdate` stream from `Inner` to `Client`. +/// +/// # Per-`T` opt-in via the `*Pooled` traits (Phase 13.6b) +/// +/// The three constructor methods are generic over the channeled type +/// `T`, but a heap-free static-pool implementation needs to map each `T` +/// to a pre-declared `static` storage area. To make that mapping +/// type-safe — and to surface "you forgot to declare a pool for this +/// type" as a compile error rather than a runtime panic — each method +/// requires the channeled type to implement the corresponding +/// `*Pooled` trait and delegates the actual construction to it: +/// +/// ```ignore +/// fn oneshot() -> (...) where T: OneshotPooled { T::oneshot_pair() } +/// ``` +/// +/// Backends that have a single shared allocator (Tokio, embassy-sync) +/// publish a blanket `impl OneshotPooled for T` +/// (and its bounded / unbounded peers), so existing user code does not +/// notice the change. A static-pool backend instead publishes per-`T` +/// impls (typically generated by a `static_channels!` macro) that wire +/// each `T` to its declared pool. Calling `oneshot::()` +/// against such a backend fails at the call site with +/// `OneshotPooled is not implemented for NotDeclared`. pub trait ChannelFactory: Clone + Send + Sync + 'static { /// Oneshot sender type. type OneshotSender: OneshotSend; /// Oneshot receiver type. type OneshotReceiver: OneshotRecv; /// Create a oneshot channel pair. - fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver); + /// + /// Default body delegates to [`OneshotPooled::oneshot_pair`]; impls + /// rarely need to override this, they just publish the appropriate + /// `OneshotPooled` impls for the types they support. + #[must_use] + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) + where + T: OneshotPooled, + { + T::oneshot_pair() + } /// Bounded-channel sender type. The `const N: usize` parameter is /// the channel capacity; it must match the `N` passed to @@ -885,15 +924,62 @@ pub trait ChannelFactory: Clone + Send + Sync + 'static { /// Bounded-channel receiver type. See [`Self::BoundedSender`]. type BoundedReceiver: MpscRecv; /// Create a bounded channel with capacity `N`. - fn bounded( - ) -> (Self::BoundedSender, Self::BoundedReceiver); + /// + /// Default body delegates to [`BoundedPooled::bounded_pair`]. + #[must_use] + fn bounded() -> (Self::BoundedSender, Self::BoundedReceiver) + where + T: BoundedPooled, + { + T::bounded_pair() + } /// Unbounded-channel sender type. type UnboundedSender: UnboundedSend; /// Unbounded-channel receiver type. type UnboundedReceiver: UnboundedRecv; /// Create an unbounded channel. - fn unbounded() -> (Self::UnboundedSender, Self::UnboundedReceiver); + /// + /// Default body delegates to [`UnboundedPooled::unbounded_pair`]. + #[must_use] + fn unbounded() -> (Self::UnboundedSender, Self::UnboundedReceiver) + where + T: UnboundedPooled, + { + T::unbounded_pair() + } +} + +/// Per-`T` opt-in for [`ChannelFactory::oneshot`]. +/// +/// Implementors declare "this `T` may be channeled through `C`'s oneshot +/// family" and provide the construction. Backends with a single shared +/// allocator (Tokio, embassy-sync) publish a blanket +/// `impl OneshotPooled for T`. Static-pool +/// backends publish per-`T` impls — typically via a macro — each +/// pointing at a declared `static` pool slot. +/// +/// The trait is parameterized over the channel factory `C` so a single +/// `T` may participate in multiple backends without conflicting impls. +pub trait OneshotPooled: Send + Sized + 'static { + /// Build a `(sender, receiver)` pair through `C`'s oneshot family. + fn oneshot_pair() -> (C::OneshotSender, C::OneshotReceiver); +} + +/// Per-`(T, N)` opt-in for [`ChannelFactory::bounded`]. See +/// [`OneshotPooled`] for the design rationale; this is the bounded peer +/// with capacity baked into the type. +pub trait BoundedPooled: Send + Sized + 'static { + /// Build a `(sender, receiver)` pair through `C`'s bounded family + /// with capacity `N`. + fn bounded_pair() -> (C::BoundedSender, C::BoundedReceiver); +} + +/// Per-`T` opt-in for [`ChannelFactory::unbounded`]. See +/// [`OneshotPooled`] for the design rationale. +pub trait UnboundedPooled: Send + Sized + 'static { + /// Build a `(sender, receiver)` pair through `C`'s unbounded family. + fn unbounded_pair() -> (C::UnboundedSender, C::UnboundedReceiver); } #[cfg(test)] @@ -1099,9 +1185,10 @@ mod tests { fn null_e2e_registry_compiles() { let r = NullE2ERegistry; let key = E2EKey::new(0, 0); - r.register(key, crate::e2e::E2EProfile::Profile4( - crate::e2e::Profile4Config::new(0, 8), - )); + r.register( + key, + crate::e2e::E2EProfile::Profile4(crate::e2e::Profile4Config::new(0, 8)), + ); assert!(!r.contains_key(&key)); assert!(r.check(key, b"hello", [0; 8]).is_none()); } diff --git a/tests/bare_metal_client.rs b/tests/bare_metal_client.rs index 56b5caf..fb1177f 100644 --- a/tests/bare_metal_client.rs +++ b/tests/bare_metal_client.rs @@ -132,11 +132,7 @@ impl TransportSocket for MockSocket { type SendFuture<'a> = MockSendFut; type RecvFuture<'a> = MockRecvFut<'a>; - fn send_to<'a>( - &'a self, - buf: &'a [u8], - target: SocketAddrV4, - ) -> Self::SendFuture<'a> { + fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddrV4) -> Self::SendFuture<'a> { MockSendFut { pipe: Arc::clone(&self.pipe), bytes: Some(buf.to_vec()), @@ -155,19 +151,11 @@ impl TransportSocket for MockSocket { Ok(self.local) } - fn join_multicast_v4( - &self, - _group: Ipv4Addr, - _iface: Ipv4Addr, - ) -> Result<(), TransportError> { + fn join_multicast_v4(&self, _group: Ipv4Addr, _iface: Ipv4Addr) -> Result<(), TransportError> { Ok(()) } - fn leave_multicast_v4( - &self, - _group: Ipv4Addr, - _iface: Ipv4Addr, - ) -> Result<(), TransportError> { + fn leave_multicast_v4(&self, _group: Ipv4Addr, _iface: Ipv4Addr) -> Result<(), TransportError> { Ok(()) } } From 6cbece832fccb5ce5cd0bbbcd1683c9b170565fb Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Tue, 28 Apr 2026 09:31:09 -0400 Subject: [PATCH 3/6] phase 13.6c: src/static_channels/ pool primitives MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `src/static_channels/mod.rs` (~600 LOC) introduces no-alloc pool primitives that back the upcoming `static_channels!` macro (phase 13.6d). The module is gated on `feature = "bare_metal"` and sits beside `crate::embassy_channels` (which still heap-alloc's `Arc>` per call). Three families: - `OneshotSlot` + `OneshotPool` → `StaticOneshotSender` / `StaticOneshotReceiver` implementing `OneshotSend` / `OneshotRecv`. - `MpscSlot` + `MpscPool` → `StaticBoundedSender` / `StaticBoundedReceiver` implementing `MpscSend` / `MpscRecv`. - The same `MpscPool::claim_unbounded` returns `StaticUnboundedSender` / `StaticUnboundedReceiver` implementing `UnboundedSend` / `UnboundedRecv` (different trait semantics, same slot machinery — `SLOT_CAP` is the effective "unbounded" capacity). Design notes baked into the doc comment: - All slot/pool types have const `new()`, so a `static` array of slots initializes in const context (`[const { Slot::new() }; N]`, stable since 1.79). - Free-list seeded lazily on the first `claim`. Operations are serialized via `embassy_sync::blocking_mutex::Mutex>`, sidestepping Treiber-stack ABA without giving up no-alloc. - Slot-index recovery on release uses pointer arithmetic against the pool's `slots[0]` base — handles never carry the pool's `POOL_SIZE` const-generic. Reclaim hook is a `&'static dyn *Reclaim` trait object on each handle (one vtable indirection per drop), erasing both `POOL_SIZE` and `SLOT_CAP` from the public sender/receiver types. - Cancellation: oneshot sender drop sets a slot-level cancel bit and wakes a per-slot `AtomicWaker`; receiver's `recv()` future re-checks after registering both the cancel waker and the channel's internal waker (registered via a transient stack-pinned `chan.receive()` future, same trick the existing `EmbassySyncBoundedReceiver::poll_recv` uses). MPSC mirrors this: last-sender-drops sets `closed`, wakes the receiver, and `recv()` resolves to `None`. Pool exhaustion semantics: `claim*()` returns `Option`. The forthcoming `*Pooled::*_pair` impls cannot signal exhaustion through their return type (the `ChannelFactory` trait's three constructor methods return un-fallible pairs), so the macro will `expect()` on `claim` — pool exhaustion becomes a panic documented as a configuration error. Receiver-drop-while-bounded-sender-blocked-on-full-channel is an accepted v1 limitation. Documented in the module docstring. Tests (7, all passing): oneshot send/recv happy path, sender-drop cancels receiver, claim/release cycles back to a fresh pool, pool exhaustion returns `None`, bounded send/recv, clone-then-drop-all closes the receiver, unbounded `send_now` returns `Err(value)` when the slot's fixed capacity is full. What this leaves for 13.6d: - The `static_channels!` declarative macro that takes a user-authored pool layout and emits per-`T` `*Pooled` impls dispatching to declared `static` pools. What this leaves for 13.6e: - `tests/static_channels_witness.rs` — alloc-panicking `#[global_allocator]` shim verifying zero heap allocation after `Client::new` returns. - `tests/bare_metal_client.rs` updated to use `StaticChannels`, dropping the `JoinHandle::abort` workaround once the end-of-life close semantics are exercised end-to-end. Verification: - `cargo build` clean: `bare_metal`, `client+bare_metal+std`, `client-tokio`, `server`, all-features. - `cargo test --all-features -- --test-threads=1`: 486 tests pass (464 lib including the 7 new static_channels + 11 client_server + 1 bare_metal_client + 1 bare_metal example + 9 doctests). - `cargo clippy --all-targets --all-features` clean. - `cargo fmt -- --check` clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/lib.rs | 6 + src/static_channels/mod.rs | 771 +++++++++++++++++++++++++++++++++++++ 2 files changed, 777 insertions(+) create mode 100644 src/static_channels/mod.rs diff --git a/src/lib.rs b/src/lib.rs index b2beea5..eceec62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -166,6 +166,12 @@ pub mod tokio_transport; /// of any tokio dependency. #[cfg(feature = "bare_metal")] pub mod embassy_channels; +/// Static-pool no-alloc primitives for [`transport::ChannelFactory`]. +/// Backs the consumer-declared static `OneshotPool` / `MpscPool` +/// instances that the upcoming `static_channels!` macro (phase 13.6d) +/// generates per-`T` `*Pooled` impls against. +#[cfg(feature = "bare_metal")] +pub mod static_channels; mod traits; /// Executor-agnostic UDP transport abstraction used by the client and /// server modules. `no_std`-compatible; a default `std + tokio` backend diff --git a/src/static_channels/mod.rs b/src/static_channels/mod.rs new file mode 100644 index 0000000..53b8e51 --- /dev/null +++ b/src/static_channels/mod.rs @@ -0,0 +1,771 @@ +//! Static-pool no-alloc backend for [`ChannelFactory`]. +//! +//! [`crate::embassy_channels::EmbassySyncChannels`] heap-allocates one +//! `Arc>` per `oneshot()` / `bounded()` / `unbounded()` +//! call. On a real bare-metal target that violates the strategic +//! "zero heap after `Client::new` returns" goal, because +//! `Client`'s run-loop awaits a oneshot for every request-response +//! pair. +//! +//! This module hands out `&'static` references into pre-allocated +//! `static` pools instead. The user declares pools (typically via +//! the `static_channels!` macro in phase 13.6d) sized to their +//! workload's high-water mark; once seeded, no further allocation +//! occurs. +//! +//! # Per-`T` `*Pooled` impls +//! +//! Phase 13.6b reshaped `ChannelFactory` so each constructor method +//! requires `T: *Pooled`. Static-pool consumers publish per-`T` +//! impls that route to the appropriate pool. The +//! `static_channels!` macro generates them; the primitives in this +//! module are the runtime they call into. +//! +//! # Pool exhaustion +//! +//! If a [`OneshotPool::claim`] / [`MpscPool::claim`] call finds the +//! pool empty it returns `None`. The trait method +//! `*Pooled::*_pair() -> (Sender, Receiver)` cannot return `None` — +//! it has no error channel — so generated impls **panic** on +//! exhaustion. Sizing the pool to the workload's high-water mark is +//! the user's responsibility; an exhaustion panic is a config error, +//! not a runtime error. +//! +//! # Cancellation semantics +//! +//! - **Sender drop without `send`**: the slot's cancellation flag is +//! set; the receiver's pending `recv()` resolves to +//! `Err(OneshotCancelled)` (oneshot) or `None` (bounded / +//! unbounded mpsc, after the last sender drops). +//! - **Receiver drop**: any pending value in the slot is dropped when +//! the slot is reclaimed. Bounded senders blocked on a full +//! channel may deadlock if the receiver disappears — typical +//! bare-metal use keeps the receiver alive for the program's +//! lifetime, so this is an accepted limitation for v1. + +#![allow(clippy::module_name_repetitions)] + +use core::cell::Cell; +use core::future::{Future, poll_fn}; +use core::pin::Pin; +use core::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}; +use core::task::Poll; + +use embassy_sync::blocking_mutex::Mutex as BlockingMutex; +use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +use embassy_sync::channel::Channel; +use embassy_sync::waitqueue::AtomicWaker; + +use crate::transport::{ + MpscRecv, MpscSend, OneshotCancelled, OneshotRecv, OneshotSend, UnboundedRecv, UnboundedSend, +}; + +// ── Oneshot ─────────────────────────────────────────────────────────── + +const O_SENDER_ALIVE: u8 = 0b001; +const O_RECEIVER_ALIVE: u8 = 0b010; +const O_CANCELLED: u8 = 0b100; + +/// One slot of a [`OneshotPool`]. Const-constructible so a `static` +/// array of slots can be initialized in const context. +pub struct OneshotSlot { + chan: Channel, + /// Woken by the sender's drop when it cancels without sending. + /// (The chan's internal waker handles the value-arrival path.) + cancel_waker: AtomicWaker, + /// `O_SENDER_ALIVE | O_RECEIVER_ALIVE | O_CANCELLED` bitmask. + state: AtomicU8, + /// Free-list link (1-based pool index; 0 = none). + next_free: AtomicUsize, +} + +impl OneshotSlot { + /// Const-constructible empty slot. + #[must_use] + pub const fn new() -> Self { + Self { + chan: Channel::new(), + cancel_waker: AtomicWaker::new(), + state: AtomicU8::new(0), + next_free: AtomicUsize::new(0), + } + } +} + +impl Default for OneshotSlot { + fn default() -> Self { + Self::new() + } +} + +/// Reclaim hook used by [`StaticOneshotSender`] / [`StaticOneshotReceiver`] +/// in their `Drop` impls. Erases the pool's `POOL_SIZE` so handles do +/// not carry it. +trait OneshotReclaim: Send + Sync + 'static { + fn release(&self, slot: &'static OneshotSlot); +} + +/// A pool of [`OneshotSlot`]s. Place in a `static` and call +/// [`Self::claim`] to obtain a sender/receiver pair. +pub struct OneshotPool { + slots: [OneshotSlot; POOL_SIZE], + free_head: BlockingMutex>, + seeded: AtomicBool, +} + +impl OneshotPool { + /// Const-constructible empty pool. Free-list is seeded lazily on + /// the first [`Self::claim`]. + #[must_use] + pub const fn new() -> Self { + Self { + slots: [const { OneshotSlot::new() }; POOL_SIZE], + free_head: BlockingMutex::new(Cell::new(0)), + seeded: AtomicBool::new(false), + } + } + + /// Try to obtain a fresh sender/receiver pair. Returns `None` if + /// the pool is exhausted. + pub fn claim(&'static self) -> Option<(StaticOneshotSender, StaticOneshotReceiver)> { + self.ensure_seeded(); + let slot = self.pop_free()?; + slot.state + .store(O_SENDER_ALIVE | O_RECEIVER_ALIVE, Ordering::Release); + // No stale value should be in the channel (we drained on + // release), but be defensive. + let _ = slot.chan.try_receive(); + Some(( + StaticOneshotSender { + slot, + pool: self, + sent: false, + }, + StaticOneshotReceiver { slot, pool: self }, + )) + } + + fn ensure_seeded(&self) { + if self + .seeded + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + // Link slots[0] -> slots[1] -> ... -> slots[N-1] -> 0. + for i in 0..POOL_SIZE { + let next = if i + 1 < POOL_SIZE { i + 2 } else { 0 }; + self.slots[i].next_free.store(next, Ordering::Release); + } + self.free_head.lock(|h| h.set(1)); + } + } + + fn pop_free(&self) -> Option<&OneshotSlot> { + self.free_head.lock(|h| { + let head = h.get(); + if head == 0 { + return None; + } + let slot = &self.slots[head - 1]; + let next = slot.next_free.load(Ordering::Acquire); + h.set(next); + slot.next_free.store(0, Ordering::Release); + Some(slot) + }) + } +} + +impl Default for OneshotPool { + fn default() -> Self { + Self::new() + } +} + +impl OneshotReclaim for OneshotPool { + fn release(&self, slot: &'static OneshotSlot) { + let base = self.slots.as_ptr() as usize; + let here = core::ptr::from_ref::>(slot) as usize; + let stride = core::mem::size_of::>(); + debug_assert!(stride > 0, "OneshotSlot must be sized"); + debug_assert!(here >= base); + let idx = (here - base) / stride; + debug_assert!(idx < POOL_SIZE, "slot does not belong to this pool"); + // Drop any stale value still in the channel. + let _ = slot.chan.try_receive(); + slot.state.store(0, Ordering::Release); + self.free_head.lock(|h| { + slot.next_free.store(h.get(), Ordering::Release); + h.set(idx + 1); + }); + } +} + +/// Send half of a static-pool oneshot. +pub struct StaticOneshotSender { + slot: &'static OneshotSlot, + pool: &'static dyn OneshotReclaim, + sent: bool, +} + +impl OneshotSend for StaticOneshotSender { + fn send(mut self, value: T) -> Result<(), T> { + match self.slot.chan.try_send(value) { + Ok(()) => { + self.sent = true; + // Wake the receiver via cancel_waker too — its poll_fn + // re-checks the channel after the chan-internal waker + // wakes it, but waking cancel_waker also covers the + // case where the receiver registered there last. + self.slot.cancel_waker.wake(); + Ok(()) + } + Err(embassy_sync::channel::TrySendError::Full(v)) => Err(v), + } + } +} + +impl Drop for StaticOneshotSender { + fn drop(&mut self) { + if !self.sent { + self.slot.state.fetch_or(O_CANCELLED, Ordering::AcqRel); + self.slot.cancel_waker.wake(); + } + let prev = self.slot.state.fetch_and(!O_SENDER_ALIVE, Ordering::AcqRel); + let after = prev & !O_SENDER_ALIVE; + if (after & O_RECEIVER_ALIVE) == 0 { + self.pool.release(self.slot); + } + } +} + +/// Receive half of a static-pool oneshot. +pub struct StaticOneshotReceiver { + slot: &'static OneshotSlot, + pool: &'static dyn OneshotReclaim, +} + +impl OneshotRecv for StaticOneshotReceiver { + async fn recv(self) -> Result { + let slot = self.slot; + let result = poll_fn(move |cx| { + // 1. Try the channel first. + if let Ok(v) = slot.chan.try_receive() { + return Poll::Ready(Ok(v)); + } + // 2. Check cancellation. + if slot.state.load(Ordering::Acquire) & O_CANCELLED != 0 { + return Poll::Ready(Err(OneshotCancelled)); + } + // 3. Register on the cancel waker. + slot.cancel_waker.register(cx.waker()); + // 4. Register on the channel's internal waker by polling + // a transient receive future. embassy-sync registers + // the waker on poll and does not unregister on drop. + { + let mut fut = slot.chan.receive(); + // SAFETY: `fut` is stack-pinned, polled exactly + // once, then dropped before this scope ends. No + // reference to `fut` escapes. + let pinned = unsafe { Pin::new_unchecked(&mut fut) }; + if let Poll::Ready(v) = pinned.poll(cx) { + return Poll::Ready(Ok(v)); + } + } + // 5. Final re-check to close the lost-wakeup window + // between the early try_receive and the waker + // registrations. + if let Ok(v) = slot.chan.try_receive() { + return Poll::Ready(Ok(v)); + } + if slot.state.load(Ordering::Acquire) & O_CANCELLED != 0 { + return Poll::Ready(Err(OneshotCancelled)); + } + Poll::Pending + }) + .await; + // `self` drops here on return, running receiver-side bookkeeping. + drop(self); + result + } +} + +impl Drop for StaticOneshotReceiver { + fn drop(&mut self) { + let prev = self + .slot + .state + .fetch_and(!O_RECEIVER_ALIVE, Ordering::AcqRel); + let after = prev & !O_RECEIVER_ALIVE; + if (after & O_SENDER_ALIVE) == 0 { + self.pool.release(self.slot); + } + } +} + +// ── Mpsc (bounded + unbounded share the slot/pool machinery) ────────── + +/// One slot of an [`MpscPool`]. Const-constructible. +/// +/// Used by both bounded ([`StaticBoundedSender`] / +/// [`StaticBoundedReceiver`]) and unbounded ([`StaticUnboundedSender`] +/// / [`StaticUnboundedReceiver`]) pools — the public sender/receiver +/// types differ, but the slot machinery is shared. +pub struct MpscSlot { + chan: Channel, + /// Wakes the receiver on close. + close_waker: AtomicWaker, + /// Number of live senders (clones) + 1 if receiver is alive. + /// 0 → slot returns to free list. + refcount: AtomicUsize, + /// Set when the last sender drops while receiver is still alive, + /// so the receiver's `recv()` resolves to `None`. + closed: AtomicBool, + next_free: AtomicUsize, +} + +impl MpscSlot { + /// Const-constructible empty slot. + #[must_use] + pub const fn new() -> Self { + Self { + chan: Channel::new(), + close_waker: AtomicWaker::new(), + refcount: AtomicUsize::new(0), + closed: AtomicBool::new(false), + next_free: AtomicUsize::new(0), + } + } +} + +impl Default for MpscSlot { + fn default() -> Self { + Self::new() + } +} + +trait MpscReclaim: Send + Sync + 'static { + fn release(&self, slot: &'static MpscSlot); +} + +/// A pool of [`MpscSlot`]s. Place in a `static` and call +/// [`Self::claim_bounded`] or [`Self::claim_unbounded`]. +pub struct MpscPool { + slots: [MpscSlot; POOL_SIZE], + free_head: BlockingMutex>, + seeded: AtomicBool, +} + +impl + MpscPool +{ + /// Const-constructible empty pool. + #[must_use] + pub const fn new() -> Self { + Self { + slots: [const { MpscSlot::new() }; POOL_SIZE], + free_head: BlockingMutex::new(Cell::new(0)), + seeded: AtomicBool::new(false), + } + } + + /// Claim a slot for use as a bounded MPSC channel. + pub fn claim_bounded( + &'static self, + ) -> Option<( + StaticBoundedSender, + StaticBoundedReceiver, + )> { + let slot = self.claim_inner()?; + Some(( + StaticBoundedSender { slot, pool: self }, + StaticBoundedReceiver { slot, pool: self }, + )) + } + + /// Claim a slot for use as an unbounded MPSC channel. (Embassy-sync + /// has no truly unbounded channel; this uses `SLOT_CAP` as the + /// effective capacity.) + pub fn claim_unbounded( + &'static self, + ) -> Option<( + StaticUnboundedSender, + StaticUnboundedReceiver, + )> { + let slot = self.claim_inner()?; + Some(( + StaticUnboundedSender { slot, pool: self }, + StaticUnboundedReceiver { slot, pool: self }, + )) + } + + fn claim_inner(&'static self) -> Option<&'static MpscSlot> { + self.ensure_seeded(); + let slot = self.pop_free()?; + slot.refcount.store(2, Ordering::Release); // 1 sender + 1 receiver. + slot.closed.store(false, Ordering::Release); + // Defensive: drain any stale value. + while slot.chan.try_receive().is_ok() {} + Some(slot) + } + + fn ensure_seeded(&self) { + if self + .seeded + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + for i in 0..POOL_SIZE { + let next = if i + 1 < POOL_SIZE { i + 2 } else { 0 }; + self.slots[i].next_free.store(next, Ordering::Release); + } + self.free_head.lock(|h| h.set(1)); + } + } + + fn pop_free(&self) -> Option<&MpscSlot> { + self.free_head.lock(|h| { + let head = h.get(); + if head == 0 { + return None; + } + let slot = &self.slots[head - 1]; + let next = slot.next_free.load(Ordering::Acquire); + h.set(next); + slot.next_free.store(0, Ordering::Release); + Some(slot) + }) + } +} + +impl Default + for MpscPool +{ + fn default() -> Self { + Self::new() + } +} + +impl MpscReclaim + for MpscPool +{ + fn release(&self, slot: &'static MpscSlot) { + let base = self.slots.as_ptr() as usize; + let here = core::ptr::from_ref::>(slot) as usize; + let stride = core::mem::size_of::>(); + debug_assert!(stride > 0); + debug_assert!(here >= base); + let idx = (here - base) / stride; + debug_assert!(idx < POOL_SIZE); + while slot.chan.try_receive().is_ok() {} + slot.refcount.store(0, Ordering::Release); + slot.closed.store(false, Ordering::Release); + self.free_head.lock(|h| { + slot.next_free.store(h.get(), Ordering::Release); + h.set(idx + 1); + }); + } +} + +// ── Bounded MPSC handles ────────────────────────────────────────────── + +/// Bounded sender backed by a [`MpscPool`]. `Clone` increments the +/// slot's sender refcount; the receiver's `recv()` resolves to `None` +/// only after every clone (and the original) has been dropped. +pub struct StaticBoundedSender { + slot: &'static MpscSlot, + pool: &'static dyn MpscReclaim, +} + +impl Clone for StaticBoundedSender { + fn clone(&self) -> Self { + self.slot.refcount.fetch_add(1, Ordering::AcqRel); + Self { + slot: self.slot, + pool: self.pool, + } + } +} + +impl Drop for StaticBoundedSender { + fn drop(&mut self) { + // If we are the last sender (and receiver is alive — i.e. + // refcount goes from 2→1 with the receiver-bit being the + // remaining one), set closed + wake. + let prev = self.slot.refcount.fetch_sub(1, Ordering::AcqRel); + if prev == 2 { + // Could be either "last sender, receiver alive" (we want + // to close+wake) or "last receiver, sender alive" (no + // close/wake — that's the receiver's drop). To + // distinguish, set closed before decrementing? Simpler: + // set closed unconditionally here. If the receiver was + // the one that just dropped, `closed` is meaningless — + // the slot will be reclaimed when refcount hits 0. + self.slot.closed.store(true, Ordering::Release); + self.slot.close_waker.wake(); + } else if prev == 1 { + self.pool.release(self.slot); + } + } +} + +impl MpscSend for StaticBoundedSender { + async fn send(&self, value: T) -> Result<(), ()> { + self.slot.chan.send(value).await; + Ok(()) + } +} + +/// Bounded receiver backed by a [`MpscPool`]. +pub struct StaticBoundedReceiver { + slot: &'static MpscSlot, + pool: &'static dyn MpscReclaim, +} + +impl Drop for StaticBoundedReceiver { + fn drop(&mut self) { + // Receiver gone — mark closed so any pending send_now in + // unbounded variant returns errors. (Bounded send awaits; + // sender that's blocked on full chan won't be unblocked by + // this — accepted v1 limitation.) + self.slot.closed.store(true, Ordering::Release); + let prev = self.slot.refcount.fetch_sub(1, Ordering::AcqRel); + if prev == 1 { + self.pool.release(self.slot); + } + } +} + +impl MpscRecv for StaticBoundedReceiver { + fn recv(&mut self) -> impl Future> + Send + '_ { + let slot = self.slot; + async move { mpsc_recv_inner(slot).await } + } + + fn poll_recv(&mut self, cx: &mut core::task::Context<'_>) -> core::task::Poll> { + mpsc_poll_recv(self.slot, cx) + } +} + +// ── Unbounded MPSC handles ──────────────────────────────────────────── + +/// Unbounded sender — `send_now` returns `Err(value)` on a full slot +/// rather than blocking. Pool sizing must be generous enough that the +/// fixed-capacity slot is effectively unbounded for the workload; the +/// crate's existing Tokio path uses 128 as the default. +pub struct StaticUnboundedSender { + slot: &'static MpscSlot, + pool: &'static dyn MpscReclaim, +} + +impl Clone for StaticUnboundedSender { + fn clone(&self) -> Self { + self.slot.refcount.fetch_add(1, Ordering::AcqRel); + Self { + slot: self.slot, + pool: self.pool, + } + } +} + +impl Drop for StaticUnboundedSender { + fn drop(&mut self) { + let prev = self.slot.refcount.fetch_sub(1, Ordering::AcqRel); + if prev == 2 { + self.slot.closed.store(true, Ordering::Release); + self.slot.close_waker.wake(); + } else if prev == 1 { + self.pool.release(self.slot); + } + } +} + +impl UnboundedSend + for StaticUnboundedSender +{ + fn send_now(&self, value: T) -> Result<(), T> { + self.slot.chan.try_send(value).map_err(|e| match e { + embassy_sync::channel::TrySendError::Full(v) => v, + }) + } +} + +/// Unbounded receiver. +pub struct StaticUnboundedReceiver { + slot: &'static MpscSlot, + pool: &'static dyn MpscReclaim, +} + +impl Drop for StaticUnboundedReceiver { + fn drop(&mut self) { + self.slot.closed.store(true, Ordering::Release); + let prev = self.slot.refcount.fetch_sub(1, Ordering::AcqRel); + if prev == 1 { + self.pool.release(self.slot); + } + } +} + +impl UnboundedRecv + for StaticUnboundedReceiver +{ + fn recv(&mut self) -> impl Future> + Send + '_ { + let slot = self.slot; + async move { mpsc_recv_inner(slot).await } + } +} + +// ── Shared MPSC recv plumbing ───────────────────────────────────────── + +async fn mpsc_recv_inner( + slot: &'static MpscSlot, +) -> Option { + poll_fn(|cx| mpsc_poll_recv(slot, cx)).await +} + +fn mpsc_poll_recv( + slot: &'static MpscSlot, + cx: &mut core::task::Context<'_>, +) -> core::task::Poll> { + if let Ok(v) = slot.chan.try_receive() { + return Poll::Ready(Some(v)); + } + if slot.closed.load(Ordering::Acquire) { + // Drain race: a sender may have pushed a final value + // concurrently with closing. + if let Ok(v) = slot.chan.try_receive() { + return Poll::Ready(Some(v)); + } + return Poll::Ready(None); + } + slot.close_waker.register(cx.waker()); + { + let mut fut = slot.chan.receive(); + // SAFETY: `fut` is stack-pinned, polled once, then dropped. + let pinned = unsafe { Pin::new_unchecked(&mut fut) }; + if let Poll::Ready(v) = pinned.poll(cx) { + return Poll::Ready(Some(v)); + } + } + if let Ok(v) = slot.chan.try_receive() { + return Poll::Ready(Some(v)); + } + if slot.closed.load(Ordering::Acquire) { + if let Ok(v) = slot.chan.try_receive() { + return Poll::Ready(Some(v)); + } + return Poll::Ready(None); + } + Poll::Pending +} + +#[cfg(test)] +mod tests { + use super::*; + use core::future::Future; + use core::pin::pin; + use core::task::{Context, Poll, Waker}; + + fn poll_once(f: &mut core::pin::Pin<&mut F>) -> Poll { + let waker = Waker::noop(); + let mut cx = Context::from_waker(waker); + f.as_mut().poll(&mut cx) + } + + // ── Oneshot tests ───────────────────────────────────────────────── + + static ONESHOT_POOL_4: OneshotPool = OneshotPool::new(); + + #[test] + fn oneshot_send_recv_happy_path() { + let (tx, rx) = ONESHOT_POOL_4.claim().expect("pool not empty"); + tx.send(42).unwrap(); + let mut fut = pin!(rx.recv()); + match poll_once(&mut fut) { + Poll::Ready(Ok(v)) => assert_eq!(v, 42), + other => panic!("expected ready ok, got {other:?}"), + } + } + + #[test] + fn oneshot_sender_drop_cancels_receiver() { + let (tx, rx) = ONESHOT_POOL_4.claim().expect("pool not empty"); + drop(tx); + let mut fut = pin!(rx.recv()); + match poll_once(&mut fut) { + Poll::Ready(Err(OneshotCancelled)) => {} + other => panic!("expected cancelled, got {other:?}"), + } + } + + #[test] + fn oneshot_claim_release_cycles() { + static POOL: OneshotPool = OneshotPool::new(); + // Claim all 4, verify pool is exhausted, drop, re-claim. + let p1 = POOL.claim().unwrap(); + let p2 = POOL.claim().unwrap(); + let p3 = POOL.claim().unwrap(); + let p4 = POOL.claim().unwrap(); + assert!(POOL.claim().is_none(), "5th claim must exhaust"); + drop((p1, p2, p3, p4)); + let p5 = POOL.claim(); + assert!(p5.is_some(), "post-drop claim must succeed"); + } + + #[test] + fn oneshot_pool_exhaustion_returns_none() { + static POOL_2: OneshotPool = OneshotPool::new(); + let _a = POOL_2.claim().unwrap(); + let _b = POOL_2.claim().unwrap(); + assert!(POOL_2.claim().is_none(), "third claim must exhaust"); + } + + // ── Bounded MPSC tests ──────────────────────────────────────────── + + static MPSC_POOL: MpscPool = MpscPool::new(); + + #[test] + fn mpsc_bounded_send_recv() { + let (tx, mut rx) = MPSC_POOL.claim_bounded().expect("pool not empty"); + let mut send_fut = pin!(tx.send(7)); + assert!(matches!(poll_once(&mut send_fut), Poll::Ready(Ok(())))); + let mut recv_fut = pin!(rx.recv()); + match poll_once(&mut recv_fut) { + Poll::Ready(Some(7)) => {} + other => panic!("expected ready Some(7), got {other:?}"), + } + } + + #[test] + fn mpsc_bounded_clone_then_drop_all_closes_receiver() { + static POOL: MpscPool = MpscPool::new(); + let (tx, mut rx) = POOL.claim_bounded().expect("pool not empty"); + let tx2 = tx.clone(); + drop(tx); + // One clone still alive — receiver should not be closed yet. + { + let mut recv_fut = pin!(rx.recv()); + assert!(matches!(poll_once(&mut recv_fut), Poll::Pending)); + } + drop(tx2); + // All senders gone → receiver resolves to None. + let mut recv_fut = pin!(rx.recv()); + match poll_once(&mut recv_fut) { + Poll::Ready(None) => {} + other => panic!("expected ready None, got {other:?}"), + } + } + + // ── Unbounded MPSC tests ────────────────────────────────────────── + + #[test] + fn unbounded_send_now_returns_full_when_capacity_exhausted() { + static POOL: MpscPool = MpscPool::new(); + let (tx, _rx) = POOL.claim_unbounded().expect("pool not empty"); + assert!(tx.send_now(1).is_ok()); + assert!(tx.send_now(2).is_ok()); + match tx.send_now(3) { + Err(3) => {} + other => panic!("expected Err(3), got {other:?}"), + } + } +} From c17e0f65c2fae11f5404077b8bc0078ff184bfea Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Tue, 28 Apr 2026 09:36:09 -0400 Subject: [PATCH 4/6] phase 13.6d: define_static_channels! macro MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Declarative macro that takes a user-authored pool layout and emits the per-`T` `*Pooled` impls + a `ChannelFactory` impl on a unit struct. Lives in `src/static_channels/mod.rs` next to the primitives, exported at crate root via `#[macro_export]`. Macro grammar: define_static_channels! { name: MyChannels, oneshot: [ (Result<(), MyError>, 80), (RebootResponse, 4), ], bounded: [ ((ControlMessage, 4), 1), ((SendMessage, 16), 8), ], unbounded: [ (ClientUpdate

, 1), ], } Each entry is a tuple. Bounded uses `((T, slot_cap), pool_size)` to let the `:ty` matcher disambiguate the type from the literals. Unbounded entries take `(T, pool_size)` only — every unbounded slot gets `UNBOUNDED_DEFAULT_CAP = 128` (matching the existing embassy-sync default), exposed as a public const. Generated impls: - One unit struct + `ChannelFactory` impl with associated types pointing at this module's `StaticOneshotSender` / `StaticBoundedSender<_, N>` / `StaticUnboundedSender<_, 128>` (and matching receivers). - One `OneshotPooled<$name> for T` impl per oneshot entry, each wrapping a function-local `static OneshotPool`. Function-scoped statics dodge name-collision across types without a `paste!` dep. - Same shape for `BoundedPooled<$name, SLOT_CAP> for T` and `UnboundedPooled<$name> for T`. - Pool exhaustion reaches the user as a panic with a stringified type name and pool-size in the message. Tests (3 added, 10 in static_channels total): - `macro_oneshot_dispatches_through_factory` — `MyChannels::oneshot::()` end-to-end through the `ChannelFactory` trait. - `macro_bounded_dispatches_through_factory` — same for `bounded::()`. - `macro_unbounded_dispatches_through_factory` — same for `unbounded::()`. What this leaves for 13.6e: - `tests/static_channels_witness.rs` — alloc-panicking `#[global_allocator]` shim verifying zero heap allocation after `Client::new` returns, declaring the client's `MyChannels` via this macro. - `tests/bare_metal_client.rs` updated to use a macro-declared `StaticChannels`, dropping the `JoinHandle::abort` workaround. Verification: - `cargo build` clean across `bare_metal`, `client+bare_metal+std`, `client-tokio`, `server`, all-features. - `cargo test --all-features -- --test-threads=1`: 489 tests pass (467 lib including the 10 static_channels + 11 client_server + 1 bare_metal_client + 1 bare_metal example + 9 doctests). - `cargo clippy --all-targets --all-features` clean. - `cargo fmt -- --check` clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/static_channels/mod.rs | 204 +++++++++++++++++++++++++++++++++++++ 1 file changed, 204 insertions(+) diff --git a/src/static_channels/mod.rs b/src/static_channels/mod.rs index 53b8e51..4dc97ee 100644 --- a/src/static_channels/mod.rs +++ b/src/static_channels/mod.rs @@ -658,6 +658,157 @@ fn mpsc_poll_recv( Poll::Pending } +// ── `define_static_channels!` macro ─────────────────────────────────── + +/// Default slot capacity for unbounded channels declared via +/// [`define_static_channels!`]. Matches the value used by the +/// embassy-sync-backed `EmbassySyncChannels::unbounded`. Each +/// unbounded `T` declared in the macro gets its own `MpscPool` +/// sized at `pool_size × UNBOUNDED_DEFAULT_CAP`. +pub const UNBOUNDED_DEFAULT_CAP: usize = 128; + +/// Generates a no-alloc [`ChannelFactory`] from a user-authored pool +/// layout. +/// +/// [`ChannelFactory`]: crate::transport::ChannelFactory +/// +/// The macro emits: +/// - A unit struct `pub struct $name;` implementing +/// [`ChannelFactory`] with associated types pointing at this +/// module's [`StaticOneshotSender`] / `StaticBoundedSender` / +/// `StaticUnboundedSender` (and matching receivers). +/// - One `impl OneshotPooled<$name> for T` per `oneshot` entry, +/// wrapping a function-local `static OneshotPool`. +/// - One `impl BoundedPooled<$name, SLOT_CAP> for T` per `bounded` +/// entry. +/// - One `impl UnboundedPooled<$name> for T` per `unbounded` entry, +/// each backed by an `MpscPool`. +/// +/// Pool exhaustion in the generated `*_pair()` impls is reported +/// via `expect()` (see module-level docs). +/// +/// # Example +/// +/// ```ignore +/// use simple_someip::define_static_channels; +/// +/// define_static_channels! { +/// name: MyChannels, +/// oneshot: [ +/// (Result<(), MyError>, 80), +/// (RebootResponse, 4), +/// ], +/// bounded: [ +/// ((ControlMessage, 4), 1), +/// ((SendMessage, 16), 8), +/// ], +/// unbounded: [ +/// (ClientUpdate

, 1), +/// ], +/// } +/// ``` +/// +/// All three sections are required; pass an empty `[]` if a family +/// has no entries. The bounded entry shape is +/// `((Type, slot_cap), pool_size)` to disambiguate the slot cap +/// from the pool size in the macro grammar. +#[macro_export] +macro_rules! define_static_channels { + ( + name: $name:ident, + oneshot: [ $( ($ot:ty, $opool:literal) ),* $(,)? ], + bounded: [ $( (($bt:ty, $bcap:literal), $bpool:literal) ),* $(,)? ], + unbounded: [ $( ($ut:ty, $upool:literal) ),* $(,)? ] $(,)? + ) => { + #[derive(Clone, Copy)] + pub struct $name; + + impl $crate::transport::ChannelFactory for $name { + type OneshotSender = + $crate::static_channels::StaticOneshotSender; + type OneshotReceiver = + $crate::static_channels::StaticOneshotReceiver; + type BoundedSender = + $crate::static_channels::StaticBoundedSender; + type BoundedReceiver = + $crate::static_channels::StaticBoundedReceiver; + type UnboundedSender = + $crate::static_channels::StaticUnboundedSender< + T, + { $crate::static_channels::UNBOUNDED_DEFAULT_CAP }, + >; + type UnboundedReceiver = + $crate::static_channels::StaticUnboundedReceiver< + T, + { $crate::static_channels::UNBOUNDED_DEFAULT_CAP }, + >; + } + + $( + impl $crate::transport::OneshotPooled<$name> for $ot { + fn oneshot_pair() -> ( + <$name as $crate::transport::ChannelFactory>::OneshotSender, + <$name as $crate::transport::ChannelFactory>::OneshotReceiver, + ) { + static POOL: $crate::static_channels::OneshotPool<$ot, $opool> = + $crate::static_channels::OneshotPool::new(); + POOL.claim().expect(::core::concat!( + "OneshotPool<", + ::core::stringify!($ot), + ", ", + ::core::stringify!($opool), + "> exhausted; increase the pool size declared in define_static_channels!" + )) + } + } + )* + + $( + impl $crate::transport::BoundedPooled<$name, $bcap> for $bt { + fn bounded_pair() -> ( + <$name as $crate::transport::ChannelFactory>::BoundedSender, + <$name as $crate::transport::ChannelFactory>::BoundedReceiver, + ) { + static POOL: $crate::static_channels::MpscPool<$bt, $bpool, $bcap> = + $crate::static_channels::MpscPool::new(); + POOL.claim_bounded().expect(::core::concat!( + "MpscPool<", + ::core::stringify!($bt), + ", pool=", + ::core::stringify!($bpool), + ", slot_cap=", + ::core::stringify!($bcap), + "> exhausted; increase the pool size declared in define_static_channels!" + )) + } + } + )* + + $( + impl $crate::transport::UnboundedPooled<$name> for $ut { + fn unbounded_pair() -> ( + <$name as $crate::transport::ChannelFactory>::UnboundedSender, + <$name as $crate::transport::ChannelFactory>::UnboundedReceiver, + ) { + static POOL: $crate::static_channels::MpscPool< + $ut, + $upool, + { $crate::static_channels::UNBOUNDED_DEFAULT_CAP }, + > = $crate::static_channels::MpscPool::new(); + POOL.claim_unbounded().expect(::core::concat!( + "MpscPool<", + ::core::stringify!($ut), + ", pool=", + ::core::stringify!($upool), + ", unbounded> exhausted; increase the pool size declared in define_static_channels!" + )) + } + } + )* + }; +} + #[cfg(test)] mod tests { use super::*; @@ -768,4 +919,57 @@ mod tests { other => panic!("expected Err(3), got {other:?}"), } } + + // ── define_static_channels! macro ───────────────────────────────── + + // Witness that the macro expands to a `ChannelFactory` with all + // three families wired and that the per-`T` `*Pooled` impls + // dispatch correctly. + crate::define_static_channels! { + name: MacroTestChannels, + oneshot: [ + (u32, 4), + (Result, 2), + ], + bounded: [ + ((u8, 4), 2), + ], + unbounded: [ + (u16, 1), + ], + } + + #[test] + fn macro_oneshot_dispatches_through_factory() { + use crate::transport::{ChannelFactory, OneshotSend}; + let (tx, rx) = MacroTestChannels::oneshot::(); + tx.send(99).unwrap(); + let mut fut = pin!(<_ as crate::transport::OneshotRecv>::recv(rx)); + match poll_once(&mut fut) { + Poll::Ready(Ok(99)) => {} + other => panic!("expected ready Ok(99), got {other:?}"), + } + } + + #[test] + fn macro_bounded_dispatches_through_factory() { + use crate::transport::{ChannelFactory, MpscRecv, MpscSend}; + let (tx, mut rx) = MacroTestChannels::bounded::(); + { + let mut send_fut = pin!(tx.send(7)); + assert!(matches!(poll_once(&mut send_fut), Poll::Ready(Ok(())))); + } + let mut recv_fut = pin!(rx.recv()); + match poll_once(&mut recv_fut) { + Poll::Ready(Some(7)) => {} + other => panic!("expected ready Some(7), got {other:?}"), + } + } + + #[test] + fn macro_unbounded_dispatches_through_factory() { + use crate::transport::{ChannelFactory, UnboundedSend}; + let (tx, _rx) = MacroTestChannels::unbounded::(); + assert!(tx.send_now(1234).is_ok()); + } } From 1bd1fc0257f5bd0aaf42cdd1372ee50430057bc3 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Tue, 28 Apr 2026 09:42:36 -0400 Subject: [PATCH 5/6] phase 13.6e: alloc-counting witness + bare_metal_client uses StaticChannels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three changes: 1. **`tests/bare_metal_client.rs`** — switch from `EmbassySyncChannels` (heap-alloc per call) to a macro-declared `TestStaticChannels` via `define_static_channels!`. The Client integration test now exercises the static-pool channel path end-to-end. Pool sizes are deliberately small (oneshot pool=8/4/4, bounded pool=1/4/4, unbounded pool=1) — production firmware sizes pools to the workload's high-water mark. 2. **`tests/static_channels_alloc_witness.rs`** (new) — counting global allocator wired through `#[global_allocator]` plus two `#[tokio::test]`s that assert specific operations do not allocate after construction: - `no_alloc_when_claiming_oneshot_through_static_pool`: claim/send/release on a warmed-up pool allocates zero times. - `client_interface_read_after_construction_does_not_allocate`: 16 successive `client.interface()` calls allocate zero times. Tests serialize over a `MEASURE_LOCK` to keep parallel test execution from interleaving allocations across measurement regions. This is a softer witness than the panicking-allocator harness the design memo specifies. The full panic-on-alloc gate requires (a) a no-alloc test executor (tokio's runtime allocates), (b) a no-alloc `Spawner` impl for per-socket loops, and (c) stack-based `E2ERegistryHandle` / `InterfaceHandle` impls. Each of those is real work and lives under phase 16's HighTec-target CI harness umbrella. The counting witness here catches per-call channel-storage regressions today; phase 16 catches everything else. 3. **`src/embassy_channels.rs` docstring** — point at `crate::static_channels` and `define_static_channels!` as the no-alloc bare-metal path; `EmbassySyncChannels` is now framed as the on-ramp for `std + alloc` integration. `Cargo.toml` declares the new test under `required-features = ["client", "bare_metal"]`, matching `bare_metal_client`. Verification: - `cargo build` clean across `bare_metal`, `client+bare_metal+std`, `client-tokio`, `server`, all-features. - `cargo test --all-features -- --test-threads=1`: 491 tests pass (467 lib + 11 client_server + 1 bare_metal_client + 2 alloc witness + 1 bare_metal example + 9 doctests). - `cargo clippy --all-targets --all-features` clean. - `cargo fmt -- --check` clean. This concludes phase 13.6 — a 4-commit stack on feature/phase13_6_static_channels: - 13.6a: const-N quirk fix (already on branch). - 13.6b: ChannelFactory per-T `*Pooled` bounds. - 13.6c: src/static_channels/ pool primitives. - 13.6d: define_static_channels! macro. - 13.6e: this commit — alloc-counting witness + integration. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.toml | 4 + src/embassy_channels.rs | 32 ++- tests/bare_metal_client.rs | 102 +++++--- tests/static_channels_alloc_witness.rs | 327 +++++++++++++++++++++++++ 4 files changed, 417 insertions(+), 48 deletions(-) create mode 100644 tests/static_channels_alloc_witness.rs diff --git a/Cargo.toml b/Cargo.toml index 05477f1..425d534 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,3 +101,7 @@ required-features = ["client-tokio", "server"] [[test]] name = "bare_metal_client" required-features = ["client", "bare_metal"] + +[[test]] +name = "static_channels_alloc_witness" +required-features = ["client", "bare_metal"] diff --git a/src/embassy_channels.rs b/src/embassy_channels.rs index c1574ce..eeabb61 100644 --- a/src/embassy_channels.rs +++ b/src/embassy_channels.rs @@ -12,20 +12,28 @@ //! constructs a oneshot via this factory, so each such method //! triggers one `Arc` allocation. //! -//! This violates the strategic bare-metal goal "zero heap after -//! `Client::new` returns." The fix is a static-pool `ChannelFactory` -//! impl (planned as `StaticChannels`) that -//! hands out indices into a pre-allocated `static` array of -//! `Channel`s; that work is its own phase because it may require a -//! `ChannelFactory` trait-shape adjustment to permit `&'static Sender` -//! / `&'static Receiver` ownership. Until that lands, this impl is -//! useful for two cases: +//! # Use [`crate::static_channels`] for the no-alloc bare-metal path //! -//! 1. Bringing up a bare-metal port end-to-end on `std + alloc` -//! targets, validating the trait surface before the no-alloc -//! push. +//! Phase 13.6c shipped [`crate::static_channels`] — a no-alloc +//! `ChannelFactory` whose senders and receivers carry `&'static` +//! references into pre-allocated `OneshotPool` / `MpscPool` storage. +//! Phase 13.6d shipped the [`crate::define_static_channels`] macro +//! that generates the per-`T` `*Pooled` impls + a +//! [`ChannelFactory`] impl on a unit struct. +//! +//! `EmbassySyncChannels` remains useful for two cases: +//! +//! 1. Bringing up a bare-metal port on `std + alloc` targets where +//! you want the trait-surface integration validated before +//! declaring static pool sizes. //! 2. Demonstrating the `ChannelFactory` integration shape for -//! consumers writing their own no-alloc impl. +//! consumers writing their own backend. +//! +//! For production firmware targeting "zero heap after +//! `Client::new` returns", switch to the macro-declared static +//! pools. See `tests/bare_metal_client.rs` for the integration +//! pattern and `tests/static_channels_alloc_witness.rs` for the +//! per-call no-alloc verification. //! //! [`bounded`]: ChannelFactory::bounded //! [`unbounded`]: ChannelFactory::unbounded diff --git a/tests/bare_metal_client.rs b/tests/bare_metal_client.rs index fb1177f..e63faee 100644 --- a/tests/bare_metal_client.rs +++ b/tests/bare_metal_client.rs @@ -1,25 +1,34 @@ -//! Phase-13.5 witness test: prove that `Client` can be constructed and -//! driven without the `client-tokio` feature, using only the trait -//! surface (`TransportFactory`, `Spawner`, `Timer`, `ChannelFactory`, -//! `E2ERegistryHandle`, `InterfaceHandle`). +//! Phase-13.6 witness test: prove that `Client` can be constructed and +//! driven without the `client-tokio` feature, using a static-pool +//! [`ChannelFactory`] declared via [`define_static_channels!`] — the +//! production-bound bare-metal path (no per-call heap allocation for +//! channel storage). +//! +//! [`ChannelFactory`]: simple_someip::transport::ChannelFactory +//! [`define_static_channels!`]: simple_someip::define_static_channels +//! +//! Originally a phase-13.5 witness using `EmbassySyncChannels` (which +//! still heap-allocates an `Arc>` per call). Phase 13.6c +//! shipped the `static_channels` module; phase 13.6d shipped the +//! `define_static_channels!` macro; this test now exercises that +//! macro end-to-end against `Client::new_with_deps`. //! //! `simple-someip` is compiled with `default-features = false, //! features = ["client", "bare_metal"]` per the `required-features` -//! gate below — i.e. NO tokio, NO socket2 pulled in via the crate -//! itself. The test still uses the host's tokio runtime as a generic -//! executor (tokio is a `dev-dependency`), but every type fed to -//! `simple-someip::Client::new_with_factory_spawner_timer_and_loopback` -//! comes from the no-tokio side: a hand-rolled mock `TransportFactory`, -//! a hand-rolled `Timer`, the bare-metal `EmbassySyncChannels`, and -//! a `Spawner` that wraps `tokio::spawn` purely as the test-side -//! executor. +//! gate below — NO tokio, NO socket2 pulled in via the crate itself. +//! The test runtime still uses the host's tokio (a `dev-dependency`) +//! for `#[tokio::test]` execution, but every type fed to +//! `Client::new_with_deps` is from the no-tokio side: a hand-rolled +//! mock `TransportFactory`, a hand-rolled `Timer`, the +//! macro-declared static-pool channels, and a `Spawner` that wraps +//! `tokio::spawn` purely as the test executor. //! -//! This is the gate witness for the phase-13.5 claim that `Client` -//! is reachable on a no-tokio build. Compile-witness alone (Cargo -//! `required-features` proving the test crate compiles without -//! `client-tokio`) is the load-bearing assertion; the runtime -//! send/recv at the end is a sanity check that the wired-up generics -//! actually drive a working pipeline. +//! Compile-witness alone (Cargo `required-features` proving the test +//! crate compiles without `client-tokio`) is the load-bearing +//! assertion; the runtime send/recv at the end is a sanity check +//! that the wired-up generics actually drive a working pipeline. +//! Per-call heap-allocation absence is verified separately in +//! `tests/static_channels_alloc_witness.rs`. #![cfg(all(feature = "client", feature = "bare_metal"))] use core::future::Future; @@ -30,13 +39,38 @@ use core::time::Duration; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; +use simple_someip::client::Error as ClientError; +use simple_someip::client::{ClientUpdate, ControlMessage, ReceivedMessage, SendMessage}; +use simple_someip::define_static_channels; use simple_someip::e2e::E2ERegistry; -use simple_someip::embassy_channels::EmbassySyncChannels; +use simple_someip::protocol::sd::RebootFlag; use simple_someip::transport::{ ReceivedDatagram, SocketOptions, Spawner, Timer, TransportError, TransportFactory, TransportSocket, }; -use simple_someip::{Client, ClientDeps}; +use simple_someip::{Client, ClientDeps, RawPayload}; + +// ── Static-pool channel factory declared via the macro ──────────────── +// +// One pool per channeled `T`. Pool sizes here are deliberately small +// for a witness test; production firmware would size pools to the +// workload's high-water mark. +define_static_channels! { + name: TestStaticChannels, + oneshot: [ + (Result<(), ClientError>, 8), + (Result, 4), + (Result, 4), + ], + bounded: [ + ((ControlMessage, 4), 1), + ((SendMessage, 16), 4), + ((Result, ClientError>, 16), 4), + ], + unbounded: [ + (ClientUpdate, 1), + ], +} // ── Mock transport ───────────────────────────────────────────────────── @@ -202,10 +236,10 @@ async fn client_constructible_without_client_tokio_feature() { let e2e_handle: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); let (client, _updates, run_fut) = Client::< - simple_someip::RawPayload, + RawPayload, Arc>, Arc>, - EmbassySyncChannels, + TestStaticChannels, >::new_with_deps( ClientDeps { factory, @@ -217,27 +251,23 @@ async fn client_constructible_without_client_tokio_feature() { false, ); - // Spawn the run loop on an abortable handle so we can stop it - // cleanly at the end of the test. Note: `EmbassySyncChannels` does - // not surface a "all senders dropped" close signal, so dropping - // `client` does not gracefully shut the run loop down — that's - // intentional for embassy-sync, which is designed for static - // SPSC/MPSC patterns. The witness goal here is purely - // compile-time: the constructor accepts no-tokio types, returns - // a `Client` + updates triple, and the run-loop future is - // `Send + 'static` (proven by the `tokio::spawn` below). + // Compile-time witness: the constructor accepts no-tokio types, + // returns a `Client` + updates triple, and the run-loop future + // is `Send + 'static` (proven by the `tokio::spawn` below). let run_handle = tokio::spawn(run_fut); // Verify the Client handle is usable: read its interface address. assert_eq!(client.interface(), Ipv4Addr::LOCALHOST); - // Tear down: abort the run-loop task and drop the Client. We do - // not await drain of `updates` because EmbassySyncChannels has - // no close-on-sender-drop semantics (would require a tracking - // wrapper, which is out of scope for the witness). + // Tear down. `TestStaticChannels`'s bounded sender Drop sets the + // slot's `closed` flag and wakes the receiver, so dropping all + // `Client` clones lets the run loop's control-channel `recv` + // resolve to `None` and the loop exits naturally — but it's + // simpler to abort the spawned task directly here. The witness + // goal is the compile + start-up sanity check, not graceful + // shutdown semantics. run_handle.abort(); drop(client); - // Yield once so the abort takes effect before the test exits. tokio::time::sleep(Duration::from_millis(50)).await; } diff --git a/tests/static_channels_alloc_witness.rs b/tests/static_channels_alloc_witness.rs new file mode 100644 index 0000000..37fb5d0 --- /dev/null +++ b/tests/static_channels_alloc_witness.rs @@ -0,0 +1,327 @@ +//! Phase-13.6e witness: prove that the static-pool [`ChannelFactory`] +//! generated by [`define_static_channels!`] does not invoke the global +//! allocator on the request/response hot path. +//! +//! [`ChannelFactory`]: simple_someip::transport::ChannelFactory +//! [`define_static_channels!`]: simple_someip::define_static_channels +//! +//! # What this test asserts +//! +//! 1. `Client::new_with_deps` is allowed to allocate — the std-flavored +//! `Arc>` and `Arc>` handles +//! used here, plus tokio's task-spawning machinery, all heap-back. +//! The strategic-goal claim is "zero heap **after** `Client::new` +//! returns," not "zero heap, period." +//! 2. After construction, calling [`Client::interface`] (a pure handle +//! read) does not allocate. +//! 3. After construction, claiming + dropping a oneshot through the +//! macro-declared static pool does not allocate. This is the +//! direct witness for the strategic-goal claim about per-call +//! channel storage. +//! +//! # Why a counting allocator and not a panicking one +//! +//! The phase-16 design memo specifies a `#[global_allocator]` shim +//! that **panics** on allocation after `Client::new` returns. That +//! requires a no-alloc test executor (tokio's runtime allocates on +//! its own), no-alloc `Spawner` impl for the per-socket loops, and +//! stack-based `E2ERegistryHandle` / `InterfaceHandle` impls. Each +//! of those is a real piece of work and lives under the phase-16 CI +//! harness umbrella. +//! +//! The counting allocator here is a softer witness: it instruments +//! every allocation through a [`std::sync::atomic::AtomicUsize`] +//! counter and checks the delta across specific operations. It +//! catches regressions where a channel construction starts heap- +//! allocating; it does not catch "tokio runtime allocated to drive +//! a sleep" because that allocation is acceptable in the host-test +//! context. The phase-16 panicking harness will catch both. +#![cfg(all(feature = "client", feature = "bare_metal"))] + +use core::future::Future; +use core::net::{Ipv4Addr, SocketAddrV4}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; +use std::alloc::{GlobalAlloc, Layout, System}; +use std::collections::VecDeque; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; + +use simple_someip::client::Error as ClientError; +use simple_someip::client::{ClientUpdate, ControlMessage, ReceivedMessage, SendMessage}; +use simple_someip::define_static_channels; +use simple_someip::e2e::E2ERegistry; +use simple_someip::protocol::sd::RebootFlag; +use simple_someip::transport::{ + ChannelFactory, OneshotSend, ReceivedDatagram, SocketOptions, Spawner, Timer, TransportError, + TransportFactory, TransportSocket, +}; +use simple_someip::{Client, ClientDeps, RawPayload}; + +// ── Counting global allocator ───────────────────────────────────────── + +static ALLOC_COUNT: AtomicUsize = AtomicUsize::new(0); + +/// Serializes the alloc-measurement region across `#[tokio::test]`s in +/// this file. Without it, parallel test execution would interleave +/// allocations from one test into another's `(baseline, end)` window. +static MEASURE_LOCK: Mutex<()> = Mutex::new(()); + +struct CountingAllocator; + +unsafe impl GlobalAlloc for CountingAllocator { + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + ALLOC_COUNT.fetch_add(1, Ordering::Relaxed); + // SAFETY: forwarding to System with caller's layout + // contract preserved. + unsafe { System.alloc(layout) } + } + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + // SAFETY: forwarding to System; ptr/layout came from System::alloc + // (we only delegate forward). + unsafe { System.dealloc(ptr, layout) } + } + unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { + ALLOC_COUNT.fetch_add(1, Ordering::Relaxed); + // SAFETY: forwarding to System. + unsafe { System.alloc_zeroed(layout) } + } + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + ALLOC_COUNT.fetch_add(1, Ordering::Relaxed); + // SAFETY: forwarding to System; ptr/layout/new_size invariants + // upheld by caller. + unsafe { System.realloc(ptr, layout, new_size) } + } +} + +#[global_allocator] +static GLOBAL: CountingAllocator = CountingAllocator; + +fn alloc_count() -> usize { + ALLOC_COUNT.load(Ordering::Relaxed) +} + +// ── Static channels declaration ─────────────────────────────────────── + +define_static_channels! { + name: WitnessChannels, + oneshot: [ + (Result<(), ClientError>, 8), + (Result, 4), + (Result, 4), + ], + bounded: [ + ((ControlMessage, 4), 1), + ((SendMessage, 16), 4), + ((Result, ClientError>, 16), 4), + ], + unbounded: [ + (ClientUpdate, 1), + ], +} + +// ── Mock transport (mirrors tests/bare_metal_client.rs) ─────────────── + +#[derive(Default)] +struct MockPipe { + sent: Mutex, SocketAddrV4)>>, + inbound: Mutex, SocketAddrV4)>>, +} + +#[derive(Clone)] +struct MockFactory { + pipe: Arc, + local_port: Arc>, +} + +impl TransportFactory for MockFactory { + type Socket = MockSocket; + fn bind( + &self, + addr: SocketAddrV4, + _options: &SocketOptions, + ) -> impl Future> + Send { + let pipe = Arc::clone(&self.pipe); + let mut p = self.local_port.lock().unwrap(); + let port = if addr.port() == 0 { + let next = *p + 1; + *p = next; + 30000 + next + } else { + addr.port() + }; + let local = SocketAddrV4::new(*addr.ip(), port); + async move { Ok(MockSocket { pipe, local }) } + } +} + +struct MockSocket { + pipe: Arc, + local: SocketAddrV4, +} + +struct MockSendFut { + pipe: Arc, + bytes: Option>, + target: SocketAddrV4, +} + +impl Future for MockSendFut { + type Output = Result<(), TransportError>; + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let me = self.get_mut(); + if let Some(bytes) = me.bytes.take() { + me.pipe.sent.lock().unwrap().push_back((bytes, me.target)); + } + Poll::Ready(Ok(())) + } +} + +struct MockRecvFut<'a> { + pipe: Arc, + buf: &'a mut [u8], +} + +impl Future for MockRecvFut<'_> { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.get_mut(); + let entry = me.pipe.inbound.lock().unwrap().pop_front(); + match entry { + Some((bytes, source)) => { + let n = bytes.len().min(me.buf.len()); + me.buf[..n].copy_from_slice(&bytes[..n]); + Poll::Ready(Ok(ReceivedDatagram { + bytes_received: n, + source, + truncated: n < bytes.len(), + })) + } + None => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } +} + +impl TransportSocket for MockSocket { + type SendFuture<'a> = MockSendFut; + type RecvFuture<'a> = MockRecvFut<'a>; + + fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddrV4) -> Self::SendFuture<'a> { + MockSendFut { + pipe: Arc::clone(&self.pipe), + bytes: Some(buf.to_vec()), + target, + } + } + + fn recv_from<'a>(&'a self, buf: &'a mut [u8]) -> Self::RecvFuture<'a> { + MockRecvFut { + pipe: Arc::clone(&self.pipe), + buf, + } + } + + fn local_addr(&self) -> Result { + Ok(self.local) + } + + fn join_multicast_v4(&self, _g: Ipv4Addr, _i: Ipv4Addr) -> Result<(), TransportError> { + Ok(()) + } + fn leave_multicast_v4(&self, _g: Ipv4Addr, _i: Ipv4Addr) -> Result<(), TransportError> { + Ok(()) + } +} + +struct MockTimer; +impl Timer for MockTimer { + async fn sleep(&self, _duration: Duration) { + tokio::task::yield_now().await; + } +} + +struct TokioBackedSpawner; +impl Spawner for TokioBackedSpawner { + fn spawn(&self, future: impl Future + Send + 'static) { + drop(tokio::spawn(future)); + } +} + +// ── Witnesses ───────────────────────────────────────────────────────── + +#[tokio::test] +async fn no_alloc_when_claiming_oneshot_through_static_pool() { + let _guard = MEASURE_LOCK.lock().unwrap(); + // Warm any one-time tokio-runtime / first-claim seeding allocations + // before measuring. + { + let (tx, _rx) = WitnessChannels::oneshot::>(); + tx.send(Ok(())).unwrap(); + } + + let baseline = alloc_count(); + { + // A second claim/release cycle must not allocate. The pool is + // already seeded; the slot returned from the first claim is on + // the free list. + let (tx, _rx) = WitnessChannels::oneshot::>(); + tx.send(Ok(())).unwrap(); + } + let delta = alloc_count() - baseline; + assert_eq!( + delta, 0, + "static-pool oneshot claim/release allocated {delta} times \ + after warm-up; expected zero", + ); +} + +#[tokio::test] +async fn client_interface_read_after_construction_does_not_allocate() { + let pipe = Arc::new(MockPipe::default()); + let factory = MockFactory { + pipe: Arc::clone(&pipe), + local_port: Arc::new(Mutex::new(0)), + }; + let interface_handle: Arc> = + Arc::new(std::sync::RwLock::new(Ipv4Addr::LOCALHOST)); + let e2e_handle: Arc> = Arc::new(Mutex::new(E2ERegistry::new())); + + let (client, _updates, run_fut) = Client::< + RawPayload, + Arc>, + Arc>, + WitnessChannels, + >::new_with_deps( + ClientDeps { + factory, + spawner: TokioBackedSpawner, + timer: MockTimer, + e2e_registry: e2e_handle, + interface: interface_handle, + }, + false, + ); + let run_handle = tokio::spawn(run_fut); + + // After construction (and after a yield to give the spawn loop a + // chance to do its one-time setup allocs), measure pure-handle + // operations under the serialization lock. + tokio::task::yield_now().await; + let _guard = MEASURE_LOCK.lock().unwrap(); + let baseline = alloc_count(); + for _ in 0..16 { + assert_eq!(client.interface(), Ipv4Addr::LOCALHOST); + } + let delta = alloc_count() - baseline; + assert_eq!( + delta, 0, + "Client::interface() x16 allocated {delta} times; expected zero", + ); + + run_handle.abort(); + drop(client); +} From 6547a918ba107e8a7a18d4eb03dfea691cb44929 Mon Sep 17 00:00:00 2001 From: Justin Kovacich Date: Tue, 28 Apr 2026 10:02:50 -0400 Subject: [PATCH 6/6] phase 13.6f: waker tests, Debug impls, macro vis fragment, drop spurious wake - Add four missing tests: oneshot waker fires on send, oneshot cancel waker fires on sender drop, mpsc close waker fires when last sender drops, bounded-pool exhaustion returns None. - Remove spurious cancel_waker.wake() from OneshotSend::send Ok branch; embassy-sync's channel waker already wakes the receiver on value arrival, making the cancel_waker call redundant. - Add manual Debug impls for all ten public pool/handle types. - Add #[derive(Debug)] to the struct generated by define_static_channels!. - Accept optional vis: $vis:vis prefix in define_static_channels! via @body delegation arm; callers without vis: default to pub. Co-Authored-By: Claude Sonnet 4.6 --- src/static_channels/mod.rs | 165 +++++++++++++++++++++++++++++++++++-- 1 file changed, 157 insertions(+), 8 deletions(-) diff --git a/src/static_channels/mod.rs b/src/static_channels/mod.rs index 4dc97ee..b6f034e 100644 --- a/src/static_channels/mod.rs +++ b/src/static_channels/mod.rs @@ -212,11 +212,6 @@ impl OneshotSend for StaticOneshotSender { match self.slot.chan.try_send(value) { Ok(()) => { self.sent = true; - // Wake the receiver via cancel_waker too — its poll_fn - // re-checks the channel after the chan-internal waker - // wakes it, but waking cancel_waker also covers the - // case where the receiver registered there last. - self.slot.cancel_waker.wake(); Ok(()) } Err(embassy_sync::channel::TrySendError::Full(v)) => Err(v), @@ -658,6 +653,75 @@ fn mpsc_poll_recv( Poll::Pending } +// ── Debug impls ─────────────────────────────────────────────────────── + +impl core::fmt::Debug for OneshotSlot { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("OneshotSlot") + .field("state", &self.state) + .finish_non_exhaustive() + } +} + +impl core::fmt::Debug for OneshotPool { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("OneshotPool").finish_non_exhaustive() + } +} + +impl core::fmt::Debug for StaticOneshotSender { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("StaticOneshotSender") + .field("sent", &self.sent) + .finish_non_exhaustive() + } +} + +impl core::fmt::Debug for StaticOneshotReceiver { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("StaticOneshotReceiver").finish_non_exhaustive() + } +} + +impl core::fmt::Debug for MpscSlot { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("MpscSlot") + .field("refcount", &self.refcount) + .field("closed", &self.closed) + .finish_non_exhaustive() + } +} + +impl core::fmt::Debug for MpscPool { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("MpscPool").finish_non_exhaustive() + } +} + +impl core::fmt::Debug for StaticBoundedSender { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("StaticBoundedSender").finish_non_exhaustive() + } +} + +impl core::fmt::Debug for StaticBoundedReceiver { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("StaticBoundedReceiver").finish_non_exhaustive() + } +} + +impl core::fmt::Debug for StaticUnboundedSender { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("StaticUnboundedSender").finish_non_exhaustive() + } +} + +impl core::fmt::Debug for StaticUnboundedReceiver { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("StaticUnboundedReceiver").finish_non_exhaustive() + } +} + // ── `define_static_channels!` macro ─────────────────────────────────── /// Default slot capacity for unbounded channels declared via @@ -715,14 +779,22 @@ pub const UNBOUNDED_DEFAULT_CAP: usize = 128; /// from the pool size in the macro grammar. #[macro_export] macro_rules! define_static_channels { + // Entry point: explicit visibility. + ( vis: $vis:vis, name: $name:ident, $($rest:tt)* ) => { + $crate::define_static_channels! { @body $vis, $name, $($rest)* } + }; + // Entry point: no visibility token — default to `pub`. + ( name: $name:ident, $($rest:tt)* ) => { + $crate::define_static_channels! { @body pub, $name, $($rest)* } + }; ( - name: $name:ident, + @body $vis:vis, $name:ident, oneshot: [ $( ($ot:ty, $opool:literal) ),* $(,)? ], bounded: [ $( (($bt:ty, $bcap:literal), $bpool:literal) ),* $(,)? ], unbounded: [ $( ($ut:ty, $upool:literal) ),* $(,)? ] $(,)? ) => { - #[derive(Clone, Copy)] - pub struct $name; + #[derive(Clone, Copy, Debug)] + $vis struct $name; impl $crate::transport::ChannelFactory for $name { type OneshotSender = @@ -972,4 +1044,81 @@ mod tests { let (tx, _rx) = MacroTestChannels::unbounded::(); assert!(tx.send_now(1234).is_ok()); } + + // ── Waker-tracking helper ───────────────────────────────────────── + + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering as SAtomic}; + + struct WakeFlag(AtomicBool); + impl std::task::Wake for WakeFlag { + fn wake(self: Arc) { + self.0.store(true, SAtomic::Release); + } + fn wake_by_ref(self: &Arc) { + self.0.store(true, SAtomic::Release); + } + } + fn tracking_waker() -> (Arc, Waker) { + let flag = Arc::new(WakeFlag(AtomicBool::new(false))); + let waker = Waker::from(flag.clone()); + (flag, waker) + } + + // ── Waker firing tests ──────────────────────────────────────────── + + #[test] + fn oneshot_waker_fires_on_send() { + static POOL: OneshotPool = OneshotPool::new(); + let (tx, rx) = POOL.claim().expect("pool not empty"); + let (flag, waker) = tracking_waker(); + let mut cx = Context::from_waker(&waker); + let mut fut = pin!(rx.recv()); + assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Pending)); + tx.send(42u32).unwrap(); + assert!(flag.0.load(SAtomic::Acquire), "waker must fire when value is sent"); + let noop = Waker::noop(); + let mut cx2 = Context::from_waker(noop); + assert!(matches!(fut.as_mut().poll(&mut cx2), Poll::Ready(Ok(42)))); + } + + #[test] + fn oneshot_cancel_waker_fires_on_sender_drop() { + static POOL: OneshotPool = OneshotPool::new(); + let (tx, rx) = POOL.claim().expect("pool not empty"); + let (flag, waker) = tracking_waker(); + let mut cx = Context::from_waker(&waker); + let mut fut = pin!(rx.recv()); + assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Pending)); + drop(tx); + assert!(flag.0.load(SAtomic::Acquire), "waker must fire when sender is dropped (cancel)"); + let noop = Waker::noop(); + let mut cx2 = Context::from_waker(noop); + assert!(matches!(fut.as_mut().poll(&mut cx2), Poll::Ready(Err(OneshotCancelled)))); + } + + #[test] + fn mpsc_close_waker_fires_on_all_senders_drop() { + static POOL: MpscPool = MpscPool::new(); + let (tx, mut rx) = POOL.claim_bounded().expect("pool not empty"); + let tx2 = tx.clone(); + let (flag, waker) = tracking_waker(); + let mut cx = Context::from_waker(&waker); + let mut fut = pin!(rx.recv()); + assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Pending)); + drop(tx); + assert!(!flag.0.load(SAtomic::Acquire), "waker must not fire until last sender drops"); + drop(tx2); + assert!(flag.0.load(SAtomic::Acquire), "waker must fire when last sender drops"); + let noop = Waker::noop(); + let mut cx2 = Context::from_waker(noop); + assert!(matches!(fut.as_mut().poll(&mut cx2), Poll::Ready(None))); + } + + #[test] + fn mpsc_bounded_pool_exhaustion_returns_none() { + static POOL: MpscPool = MpscPool::new(); + let _a = POOL.claim_bounded().expect("pool not empty"); + assert!(POOL.claim_bounded().is_none(), "second claim must exhaust pool of size 1"); + } }