Skip to content

phase 11: channel replacement (tokio::sync → ChannelFactory trait)#87

Closed
JustinKovacich wants to merge 1 commit into
feature/phase10_lock_handlefrom
feature/phase11_channel_replacement
Closed

phase 11: channel replacement (tokio::sync → ChannelFactory trait)#87
JustinKovacich wants to merge 1 commit into
feature/phase10_lock_handlefrom
feature/phase11_channel_replacement

Conversation

@JustinKovacich

@JustinKovacich JustinKovacich commented Apr 27, 2026

Copy link
Copy Markdown
Contributor

This is a chained PR. Prev: #86 Next: #88

Replace direct tokio::sync::mpsc and tokio::sync::oneshot usage in the client with a trait-abstracted ChannelFactory. This enables alternative channel backends for bare-metal / no-tokio builds.

Changes:

  • Add ChannelFactory trait to transport.rs with associated OneshotSend/Recv, MpscSend/Recv, UnboundedSend/Recv traits
  • Add TokioChannels impl wrapping tokio::sync::mpsc/oneshot
  • Add EmbassySyncChannels impl (behind bare_metal feature) wrapping embassy-sync::channel::Channel
  • Generify Inner<P, S, R, C>, ControlMessage<P, C>, SocketManager<P, C>, Client<M, R, I, C>, PendingResponse<P, C>, ClientUpdates<M, C> over C: ChannelFactory
  • Add embassy-sync dependency under bare_metal feature
  • Update bare_metal example and socket_manager.rs documentation

No tokio::sync imports remain in client production code; test code still uses tokio channels directly for test fixture construction.

Replace direct `tokio::sync::mpsc` and `tokio::sync::oneshot` usage in
the client with a trait-abstracted `ChannelFactory`. This enables
alternative channel backends for bare-metal / no-tokio builds.

Changes:
- Add `ChannelFactory` trait to `transport.rs` with associated
  `OneshotSend/Recv`, `MpscSend/Recv`, `UnboundedSend/Recv` traits
- Add `TokioChannels` impl wrapping `tokio::sync::mpsc`/`oneshot`
- Add `EmbassySyncChannels` impl (behind `bare_metal` feature)
  wrapping `embassy-sync::channel::Channel`
- Generify `Inner<P, S, R, C>`, `ControlMessage<P, C>`,
  `SocketManager<P, C>`, `Client<M, R, I, C>`, `PendingResponse<P, C>`,
  `ClientUpdates<M, C>` over `C: ChannelFactory`
- Add `embassy-sync` dependency under `bare_metal` feature
- Update bare_metal example and socket_manager.rs documentation

No `tokio::sync` imports remain in client production code; test code
still uses tokio channels directly for test fixture construction.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a channel abstraction layer (ChannelFactory + sender/receiver traits) to remove direct tokio::sync::{mpsc, oneshot} usage from the client implementation, enabling alternate channel backends for future bare-metal/no-tokio paths.

Changes:

  • Added ChannelFactory and associated channel handle traits (Oneshot*, Mpsc*, Unbounded*) to transport.
  • Implemented TokioChannels (tokio-backed) and added an initial EmbassySyncChannels (embassy-sync-backed) implementation.
  • Generified client internals (Client, Inner, SocketManager, control messages, pending responses, updates) over C: ChannelFactory, and updated bare-metal docs/deps.

Reviewed changes

Copilot reviewed 8 out of 9 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
src/transport.rs Introduces the ChannelFactory API surface and channel handle traits.
src/tokio_transport.rs Adds TokioChannels + initial EmbassySyncChannels implementations.
src/lib.rs Re-exports the channel abstraction types and TokioChannels.
src/client/socket_manager.rs Replaces tokio channel types with ChannelFactory-generic equivalents; updates docs/tests accordingly.
src/client/mod.rs Makes Client/ClientUpdates/PendingResponse generic over C: ChannelFactory; restricts sd_announcements_loop to tokio backend.
src/client/inner.rs Replaces tokio channels with ChannelFactory types inside the main client loop + updates tests.
examples/bare_metal/src/main.rs Updates bare-metal readiness docs to include Phase 11 channel abstraction.
Cargo.toml Adds optional embassy-sync dep and wires it into bare_metal feature.
Cargo.lock Locks new transitive dependencies from embassy-sync.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/transport.rs
Comment on lines +667 to +669
// `TokioChannels` (in `tokio_transport`) is the default for `std + tokio`
// builds; `EmbassySyncChannels` (in `tokio_transport`, gated behind
// `bare_metal`) is the alternative for no-tokio / no_std builds.

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc comment claims EmbassySyncChannels (in tokio_transport) is an alternative for no-tokio / no_std builds, but tokio_transport itself is only compiled under #[cfg(any(feature = "client", feature = "server"))] (see src/lib.rs:151-152). As written, a --no-default-features --features bare_metal build cannot access this channel backend. Consider moving the embassy-backed ChannelFactory impl into a module that is compiled with bare_metal (and doesn’t depend on tokio), or adjust the docs/feature-gating to match reality.

Suggested change
// `TokioChannels` (in `tokio_transport`) is the default for `std + tokio`
// builds; `EmbassySyncChannels` (in `tokio_transport`, gated behind
// `bare_metal`) is the alternative for no-tokio / no_std builds.
// `TokioChannels` (in `tokio_transport`) is the default backend for
// `std + tokio` builds. This abstraction also allows a separate
// bare-metal / no-std backend to be provided from a module that is compiled
// for those configurations, rather than depending on `tokio_transport`.

Copilot uses AI. Check for mistakes.
Comment thread src/tokio_transport.rs
Comment on lines +445 to +450
impl<T: Send + 'static> OneshotSend<T> for EmbassySyncOneshotSender<T> {
fn send(self, value: T) -> Result<(), T> {
self.0.try_send(value).map_err(|e| match e {
embassy_sync::channel::TrySendError::Full(v) => v,
})
}

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EmbassySyncOneshotSender::send maps try_send errors as if the only failure mode is Full, but the OneshotSend contract says this must return Err(value) when the receiver was dropped. With an Arc<Channel<..>>-backed design there is no close semantics, so send() can succeed even if nobody will ever receive (value can be leaked in the channel). Consider adding explicit close/cancel state (e.g., a shared atomic flag updated in Drop for the receiver) so send can reliably fail when the receiver is gone.

Copilot uses AI. Check for mistakes.
Comment thread src/tokio_transport.rs
Comment on lines +453 to +457
impl<T: Send + 'static> OneshotRecv<T> for EmbassySyncOneshotReceiver<T> {
fn recv(self) -> impl Future<Output = Result<T, OneshotCancelled>> + Send {
let chan = self.0;
async move { Ok(chan.receive().await) }
}

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EmbassySyncOneshotReceiver::recv always returns Ok(..) and never produces OneshotCancelled, but OneshotRecv explicitly requires signalling cancellation when the sender is dropped before sending. Without a cancellation path, callers will block forever if the sender is dropped and no value is sent. Consider implementing sender-drop detection (or a separate cancellation signal) so recv() can resolve with Err(OneshotCancelled) as documented.

Copilot uses AI. Check for mistakes.
Comment thread src/tokio_transport.rs
Comment on lines +476 to +482
impl<T: Send + 'static, const N: usize> MpscSend<T> for EmbassySyncBoundedSender<T, N> {
fn send(&self, value: T) -> impl Future<Output = Result<(), ()>> + Send + '_ {
let chan = self.0.clone();
async move {
chan.send(value).await;
Ok(())
}

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EmbassySyncBoundedSender::send unconditionally returns Ok(()) after awaiting chan.send(value). This violates the MpscSend contract (“Returns Err(()) if the receiver was dropped”) and can hang forever if the receiver is dropped while producers keep sending (channel fills and no task drains it). The embassy backend needs an explicit close mechanism so sends can fail fast when the receiver side is gone.

Copilot uses AI. Check for mistakes.
Comment thread src/tokio_transport.rs
Comment on lines +573 to +576
// The const N from the trait call site is ignored here — embassy-sync
// requires the capacity to be known at the impl level, not the call
// site. All bounded channels use capacity 16, which covers the
// worst case (discovery socket, which uses 16).

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EmbassySyncChannels::bounded ignores the const generic N passed by the caller and always allocates a capacity-16 channel. That makes ChannelFactory::bounded::<T, N>()’s capacity parameter misleading and can materially change memory usage for call sites requesting smaller queues (e.g. N=4). If embassy-sync can’t support call-site-chosen capacities, consider redesigning the trait (no N const) or adding a const-assert/compile-time check that N <= 16 to avoid silently violating caller expectations.

Suggested change
// The const N from the trait call site is ignored here — embassy-sync
// requires the capacity to be known at the impl level, not the call
// site. All bounded channels use capacity 16, which covers the
// worst case (discovery socket, which uses 16).
// embassy-sync requires the capacity to be encoded in the concrete
// channel type used by this impl. This backend only provides
// capacity-16 bounded channels, so reject any other requested
// capacity at compile time rather than silently ignoring it.
let _: [(); 16] = [(); N];

Copilot uses AI. Check for mistakes.
Comment thread Cargo.toml
Comment on lines +69 to +73
# `bare_metal` activates embassy-sync as the channel backend. The feature
# is a prerequisite for the Phase 11 channel-handle abstraction: with
# `bare_metal` enabled, `EmbassySyncChannels` is available as the
# `ChannelFactory` impl that does not depend on tokio.
bare_metal = ["dep:embassy-sync"]

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new bare_metal feature docs here state it “activates embassy-sync as the channel backend”, but earlier in this same feature block the comments still describe bare_metal as a “pure marker” (no code enabled). Please reconcile the documentation so bare_metal’s behavior is described consistently.

Copilot uses AI. Check for mistakes.
Comment thread src/tokio_transport.rs
Comment on lines +431 to +433
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::channel::Channel;
use std::sync::Arc;

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EmbassySyncChannels is implemented inside tokio_transport and uses std::sync::Arc, which makes it unusable for the advertised no_std bare-metal configuration (and also ties the type to the tokio_transport module). If the intent is to support no_std (with alloc) or no-tokio builds, this should use alloc::sync::Arc and live in a bare-metal-specific module that is not gated behind client/server.

Suggested change
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::channel::Channel;
use std::sync::Arc;
use alloc::sync::Arc;
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::channel::Channel;

Copilot uses AI. Check for mistakes.
Comment thread src/tokio_transport.rs
Comment on lines +496 to +509
use core::pin::Pin;
// Try non-blocking receive first.
if let Ok(val) = self.0.try_receive() {
return core::task::Poll::Ready(Some(val));
}
// Channel is empty. Poll a ReceiveFuture to register the waker.
// SAFETY: `fut` is created, pinned (stack-only), polled once, then
// dropped immediately. No references to `fut` escape this scope.
let mut fut = self.0.receive();
// SAFETY: ReceiveFuture borrows self.0 (via Arc) — not self — and
// is not moved after this pin. The Arc ensures the channel outlives
// the future.
let pinned = unsafe { Pin::new_unchecked(&mut fut) };
match pinned.poll(cx) {

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poll_recv uses unsafe { Pin::new_unchecked(&mut fut) } to pin the receive future. This should be avoidable with safe pinning (core::pin::pin! / pin_mut!) since the future is stack-local and never moved after being pinned. Replacing the unsafe block with safe pinning would reduce the risk of accidental unsoundness if the code is modified later.

Suggested change
use core::pin::Pin;
// Try non-blocking receive first.
if let Ok(val) = self.0.try_receive() {
return core::task::Poll::Ready(Some(val));
}
// Channel is empty. Poll a ReceiveFuture to register the waker.
// SAFETY: `fut` is created, pinned (stack-only), polled once, then
// dropped immediately. No references to `fut` escape this scope.
let mut fut = self.0.receive();
// SAFETY: ReceiveFuture borrows self.0 (via Arc) — not self — and
// is not moved after this pin. The Arc ensures the channel outlives
// the future.
let pinned = unsafe { Pin::new_unchecked(&mut fut) };
match pinned.poll(cx) {
// Try non-blocking receive first.
if let Ok(val) = self.0.try_receive() {
return core::task::Poll::Ready(Some(val));
}
// Channel is empty. Poll a ReceiveFuture to register the waker.
let fut = self.0.receive();
core::pin::pin!(fut);
match fut.as_mut().poll(cx) {

Copilot uses AI. Check for mistakes.
Comment thread src/tokio_transport.rs
Comment on lines +536 to +541
impl<T: Send + 'static> UnboundedSend<T> for EmbassySyncUnboundedSender<T> {
fn send_now(&self, value: T) -> Result<(), T> {
self.0.try_send(value).map_err(|e| match e {
embassy_sync::channel::TrySendError::Full(v) => v,
})
}

Copilot AI Apr 27, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EmbassySyncUnboundedSender::send_now maps TrySendError::Full(v) to Err(v), but UnboundedSend::send_now documents Err(value) as “receiver was dropped”. With a finite-capacity embassy channel, Err can mean backpressure/full rather than closed, which breaks the trait’s semantics and can mislead higher layers. Consider changing the trait to return a richer error (e.g., enum { Closed(T), Full(T) }) or update the docs/contract to allow Err on saturation for “effectively unbounded” backends.

Copilot uses AI. Check for mistakes.
@JustinKovacich

Copy link
Copy Markdown
Contributor Author

Closing without merge to declutter the stack: this phase's changes are carried in full by the consolidated lineage under PR #114 (phase 21), which the next development stack builds on. Branch is retained.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants