phase 11: channel replacement (tokio::sync → ChannelFactory trait)#87
phase 11: channel replacement (tokio::sync → ChannelFactory trait)#87JustinKovacich wants to merge 1 commit into
Conversation
Replace direct `tokio::sync::mpsc` and `tokio::sync::oneshot` usage in the client with a trait-abstracted `ChannelFactory`. This enables alternative channel backends for bare-metal / no-tokio builds. Changes: - Add `ChannelFactory` trait to `transport.rs` with associated `OneshotSend/Recv`, `MpscSend/Recv`, `UnboundedSend/Recv` traits - Add `TokioChannels` impl wrapping `tokio::sync::mpsc`/`oneshot` - Add `EmbassySyncChannels` impl (behind `bare_metal` feature) wrapping `embassy-sync::channel::Channel` - Generify `Inner<P, S, R, C>`, `ControlMessage<P, C>`, `SocketManager<P, C>`, `Client<M, R, I, C>`, `PendingResponse<P, C>`, `ClientUpdates<M, C>` over `C: ChannelFactory` - Add `embassy-sync` dependency under `bare_metal` feature - Update bare_metal example and socket_manager.rs documentation No `tokio::sync` imports remain in client production code; test code still uses tokio channels directly for test fixture construction.
There was a problem hiding this comment.
Pull request overview
This PR introduces a channel abstraction layer (ChannelFactory + sender/receiver traits) to remove direct tokio::sync::{mpsc, oneshot} usage from the client implementation, enabling alternate channel backends for future bare-metal/no-tokio paths.
Changes:
- Added
ChannelFactoryand associated channel handle traits (Oneshot*,Mpsc*,Unbounded*) totransport. - Implemented
TokioChannels(tokio-backed) and added an initialEmbassySyncChannels(embassy-sync-backed) implementation. - Generified client internals (
Client,Inner,SocketManager, control messages, pending responses, updates) overC: ChannelFactory, and updated bare-metal docs/deps.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
src/transport.rs |
Introduces the ChannelFactory API surface and channel handle traits. |
src/tokio_transport.rs |
Adds TokioChannels + initial EmbassySyncChannels implementations. |
src/lib.rs |
Re-exports the channel abstraction types and TokioChannels. |
src/client/socket_manager.rs |
Replaces tokio channel types with ChannelFactory-generic equivalents; updates docs/tests accordingly. |
src/client/mod.rs |
Makes Client/ClientUpdates/PendingResponse generic over C: ChannelFactory; restricts sd_announcements_loop to tokio backend. |
src/client/inner.rs |
Replaces tokio channels with ChannelFactory types inside the main client loop + updates tests. |
examples/bare_metal/src/main.rs |
Updates bare-metal readiness docs to include Phase 11 channel abstraction. |
Cargo.toml |
Adds optional embassy-sync dep and wires it into bare_metal feature. |
Cargo.lock |
Locks new transitive dependencies from embassy-sync. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // `TokioChannels` (in `tokio_transport`) is the default for `std + tokio` | ||
| // builds; `EmbassySyncChannels` (in `tokio_transport`, gated behind | ||
| // `bare_metal`) is the alternative for no-tokio / no_std builds. |
There was a problem hiding this comment.
The doc comment claims EmbassySyncChannels (in tokio_transport) is an alternative for no-tokio / no_std builds, but tokio_transport itself is only compiled under #[cfg(any(feature = "client", feature = "server"))] (see src/lib.rs:151-152). As written, a --no-default-features --features bare_metal build cannot access this channel backend. Consider moving the embassy-backed ChannelFactory impl into a module that is compiled with bare_metal (and doesn’t depend on tokio), or adjust the docs/feature-gating to match reality.
| // `TokioChannels` (in `tokio_transport`) is the default for `std + tokio` | |
| // builds; `EmbassySyncChannels` (in `tokio_transport`, gated behind | |
| // `bare_metal`) is the alternative for no-tokio / no_std builds. | |
| // `TokioChannels` (in `tokio_transport`) is the default backend for | |
| // `std + tokio` builds. This abstraction also allows a separate | |
| // bare-metal / no-std backend to be provided from a module that is compiled | |
| // for those configurations, rather than depending on `tokio_transport`. |
| impl<T: Send + 'static> OneshotSend<T> for EmbassySyncOneshotSender<T> { | ||
| fn send(self, value: T) -> Result<(), T> { | ||
| self.0.try_send(value).map_err(|e| match e { | ||
| embassy_sync::channel::TrySendError::Full(v) => v, | ||
| }) | ||
| } |
There was a problem hiding this comment.
EmbassySyncOneshotSender::send maps try_send errors as if the only failure mode is Full, but the OneshotSend contract says this must return Err(value) when the receiver was dropped. With an Arc<Channel<..>>-backed design there is no close semantics, so send() can succeed even if nobody will ever receive (value can be leaked in the channel). Consider adding explicit close/cancel state (e.g., a shared atomic flag updated in Drop for the receiver) so send can reliably fail when the receiver is gone.
| impl<T: Send + 'static> OneshotRecv<T> for EmbassySyncOneshotReceiver<T> { | ||
| fn recv(self) -> impl Future<Output = Result<T, OneshotCancelled>> + Send { | ||
| let chan = self.0; | ||
| async move { Ok(chan.receive().await) } | ||
| } |
There was a problem hiding this comment.
EmbassySyncOneshotReceiver::recv always returns Ok(..) and never produces OneshotCancelled, but OneshotRecv explicitly requires signalling cancellation when the sender is dropped before sending. Without a cancellation path, callers will block forever if the sender is dropped and no value is sent. Consider implementing sender-drop detection (or a separate cancellation signal) so recv() can resolve with Err(OneshotCancelled) as documented.
| impl<T: Send + 'static, const N: usize> MpscSend<T> for EmbassySyncBoundedSender<T, N> { | ||
| fn send(&self, value: T) -> impl Future<Output = Result<(), ()>> + Send + '_ { | ||
| let chan = self.0.clone(); | ||
| async move { | ||
| chan.send(value).await; | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
EmbassySyncBoundedSender::send unconditionally returns Ok(()) after awaiting chan.send(value). This violates the MpscSend contract (“Returns Err(()) if the receiver was dropped”) and can hang forever if the receiver is dropped while producers keep sending (channel fills and no task drains it). The embassy backend needs an explicit close mechanism so sends can fail fast when the receiver side is gone.
| // The const N from the trait call site is ignored here — embassy-sync | ||
| // requires the capacity to be known at the impl level, not the call | ||
| // site. All bounded channels use capacity 16, which covers the | ||
| // worst case (discovery socket, which uses 16). |
There was a problem hiding this comment.
EmbassySyncChannels::bounded ignores the const generic N passed by the caller and always allocates a capacity-16 channel. That makes ChannelFactory::bounded::<T, N>()’s capacity parameter misleading and can materially change memory usage for call sites requesting smaller queues (e.g. N=4). If embassy-sync can’t support call-site-chosen capacities, consider redesigning the trait (no N const) or adding a const-assert/compile-time check that N <= 16 to avoid silently violating caller expectations.
| // The const N from the trait call site is ignored here — embassy-sync | |
| // requires the capacity to be known at the impl level, not the call | |
| // site. All bounded channels use capacity 16, which covers the | |
| // worst case (discovery socket, which uses 16). | |
| // embassy-sync requires the capacity to be encoded in the concrete | |
| // channel type used by this impl. This backend only provides | |
| // capacity-16 bounded channels, so reject any other requested | |
| // capacity at compile time rather than silently ignoring it. | |
| let _: [(); 16] = [(); N]; |
| # `bare_metal` activates embassy-sync as the channel backend. The feature | ||
| # is a prerequisite for the Phase 11 channel-handle abstraction: with | ||
| # `bare_metal` enabled, `EmbassySyncChannels` is available as the | ||
| # `ChannelFactory` impl that does not depend on tokio. | ||
| bare_metal = ["dep:embassy-sync"] |
There was a problem hiding this comment.
The new bare_metal feature docs here state it “activates embassy-sync as the channel backend”, but earlier in this same feature block the comments still describe bare_metal as a “pure marker” (no code enabled). Please reconcile the documentation so bare_metal’s behavior is described consistently.
| use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | ||
| use embassy_sync::channel::Channel; | ||
| use std::sync::Arc; |
There was a problem hiding this comment.
EmbassySyncChannels is implemented inside tokio_transport and uses std::sync::Arc, which makes it unusable for the advertised no_std bare-metal configuration (and also ties the type to the tokio_transport module). If the intent is to support no_std (with alloc) or no-tokio builds, this should use alloc::sync::Arc and live in a bare-metal-specific module that is not gated behind client/server.
| use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | |
| use embassy_sync::channel::Channel; | |
| use std::sync::Arc; | |
| use alloc::sync::Arc; | |
| use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; | |
| use embassy_sync::channel::Channel; |
| use core::pin::Pin; | ||
| // Try non-blocking receive first. | ||
| if let Ok(val) = self.0.try_receive() { | ||
| return core::task::Poll::Ready(Some(val)); | ||
| } | ||
| // Channel is empty. Poll a ReceiveFuture to register the waker. | ||
| // SAFETY: `fut` is created, pinned (stack-only), polled once, then | ||
| // dropped immediately. No references to `fut` escape this scope. | ||
| let mut fut = self.0.receive(); | ||
| // SAFETY: ReceiveFuture borrows self.0 (via Arc) — not self — and | ||
| // is not moved after this pin. The Arc ensures the channel outlives | ||
| // the future. | ||
| let pinned = unsafe { Pin::new_unchecked(&mut fut) }; | ||
| match pinned.poll(cx) { |
There was a problem hiding this comment.
poll_recv uses unsafe { Pin::new_unchecked(&mut fut) } to pin the receive future. This should be avoidable with safe pinning (core::pin::pin! / pin_mut!) since the future is stack-local and never moved after being pinned. Replacing the unsafe block with safe pinning would reduce the risk of accidental unsoundness if the code is modified later.
| use core::pin::Pin; | |
| // Try non-blocking receive first. | |
| if let Ok(val) = self.0.try_receive() { | |
| return core::task::Poll::Ready(Some(val)); | |
| } | |
| // Channel is empty. Poll a ReceiveFuture to register the waker. | |
| // SAFETY: `fut` is created, pinned (stack-only), polled once, then | |
| // dropped immediately. No references to `fut` escape this scope. | |
| let mut fut = self.0.receive(); | |
| // SAFETY: ReceiveFuture borrows self.0 (via Arc) — not self — and | |
| // is not moved after this pin. The Arc ensures the channel outlives | |
| // the future. | |
| let pinned = unsafe { Pin::new_unchecked(&mut fut) }; | |
| match pinned.poll(cx) { | |
| // Try non-blocking receive first. | |
| if let Ok(val) = self.0.try_receive() { | |
| return core::task::Poll::Ready(Some(val)); | |
| } | |
| // Channel is empty. Poll a ReceiveFuture to register the waker. | |
| let fut = self.0.receive(); | |
| core::pin::pin!(fut); | |
| match fut.as_mut().poll(cx) { |
| impl<T: Send + 'static> UnboundedSend<T> for EmbassySyncUnboundedSender<T> { | ||
| fn send_now(&self, value: T) -> Result<(), T> { | ||
| self.0.try_send(value).map_err(|e| match e { | ||
| embassy_sync::channel::TrySendError::Full(v) => v, | ||
| }) | ||
| } |
There was a problem hiding this comment.
EmbassySyncUnboundedSender::send_now maps TrySendError::Full(v) to Err(v), but UnboundedSend::send_now documents Err(value) as “receiver was dropped”. With a finite-capacity embassy channel, Err can mean backpressure/full rather than closed, which breaks the trait’s semantics and can mislead higher layers. Consider changing the trait to return a richer error (e.g., enum { Closed(T), Full(T) }) or update the docs/contract to allow Err on saturation for “effectively unbounded” backends.
|
Closing without merge to declutter the stack: this phase's changes are carried in full by the consolidated lineage under PR #114 (phase 21), which the next development stack builds on. Branch is retained. |
This is a chained PR. Prev: #86 Next: #88
Replace direct
tokio::sync::mpscandtokio::sync::oneshotusage in the client with a trait-abstractedChannelFactory. This enables alternative channel backends for bare-metal / no-tokio builds.Changes:
ChannelFactorytrait totransport.rswith associatedOneshotSend/Recv,MpscSend/Recv,UnboundedSend/RecvtraitsTokioChannelsimpl wrappingtokio::sync::mpsc/oneshotEmbassySyncChannelsimpl (behindbare_metalfeature) wrappingembassy-sync::channel::ChannelInner<P, S, R, C>,ControlMessage<P, C>,SocketManager<P, C>,Client<M, R, I, C>,PendingResponse<P, C>,ClientUpdates<M, C>overC: ChannelFactoryembassy-syncdependency underbare_metalfeatureNo
tokio::syncimports remain in client production code; test code still uses tokio channels directly for test fixture construction.