Skip to content

Select biased timer#81

Closed
JustinKovacich wants to merge 1 commit into
feature/hoist_tokio_spawnfrom
feature/phase7_select_biased_timer
Closed

Select biased timer#81
JustinKovacich wants to merge 1 commit into
feature/hoist_tokio_spawnfrom
feature/phase7_select_biased_timer

Conversation

@JustinKovacich

@JustinKovacich JustinKovacich commented Apr 23, 2026

Copy link
Copy Markdown
Contributor

This PR is part of a chain. See Prev: #80; See Next: #82

@JustinKovacich JustinKovacich requested a review from Copilot April 23, 2026 14:52
@JustinKovacich JustinKovacich mentioned this pull request Apr 23, 2026
@JustinKovacich JustinKovacich mentioned this pull request Apr 23, 2026
@JustinKovacich JustinKovacich self-assigned this Apr 23, 2026
@JustinKovacich JustinKovacich added documentation Improvements or additions to documentation enhancement New feature or request labels Apr 23, 2026

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the client/server async event loops to use executor-agnostic futures selection and routes periodic sleeps through the crate’s Timer trait (via TokioTimer in the tokio backend), supporting eventual non-tokio/bare-metal integrations.

Changes:

  • Replace tokio::select! usage with futures::select! + .fuse()/pin_mut! to ensure unbiased selection and cleaner borrow lifetimes.
  • Route periodic ticks (client idle tick, server announcement tick, client SD announcement loop) through Timer::sleep (TokioTimer).
  • Update the client SD announcements API to return a caller-driven future and add end-to-end + error-propagation tests for custom TransportFactory.

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
src/tokio_transport.rs Updates TokioTimer docs to reflect internal usage at periodic tick sites.
src/server/mod.rs Uses TokioTimer for announcement sleep; replaces socket receive tokio::select! with futures::select!.
src/client/socket_manager.rs Reworks socket loop selection via futures::select!; adds tests for custom transport factories.
src/client/mod.rs Renames/reworks SD announcements into a caller-driven future; adds cadence + Send/'static tests.
src/client/inner.rs Replaces tokio::select! with futures::select! and routes the 125ms idle tick via TokioTimer.
Cargo.toml Adds optional futures dep and enables it under client/server features.
Cargo.lock Locks new futures transitive dependencies.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread Cargo.toml Outdated
Comment thread src/client/socket_manager.rs Outdated
Comment thread src/client/mod.rs
JustinKovacich added a commit that referenced this pull request Apr 23, 2026
Three fixes addressing Copilot review comments: (1) correct the
`futures` dependency comment in Cargo.toml to describe actual usage
(`select!` + `FuturesExt::fuse`/`pin_mut!`) rather than the stale
`select_biased!`/`FuturesUnordered` claim; (2) retitle the
`socket_manager` recv error log from "Error decoding message" to
"Transport recv failed" and rename the binding to `recv_err`, since the
error originates from `socket.recv_from`, not a parse step; (3) remove
the pre-loop sleep in `sd_announcements_loop` so the first announcement
lands at ~1× interval instead of ~2× — the in-loop sleep now carries
the initial-delay role. A new regression test,
`sd_announcements_loop_first_emit_within_one_interval`, pins
first-emit latency below 250ms at a 100ms interval.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
JustinKovacich added a commit that referenced this pull request Apr 23, 2026
Three fixes addressing Copilot review comments: (1) correct the
`futures` dependency comment in Cargo.toml to describe actual usage
(`select!` + `FuturesExt::fuse`/`pin_mut!`) rather than the stale
`select_biased!`/`FuturesUnordered` claim; (2) retitle the
`socket_manager` recv error log from "Error decoding message" to
"Transport recv failed" and rename the binding to `recv_err`, since the
error originates from `socket.recv_from`, not a parse step; (3) remove
the pre-loop sleep in `sd_announcements_loop` so the first announcement
lands at ~1× interval instead of ~2× — the in-loop sleep now carries
the initial-delay role. A new regression test,
`sd_announcements_loop_first_emit_within_one_interval`, pins
first-emit latency below 250ms at a 100ms interval.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@JustinKovacich JustinKovacich force-pushed the feature/phase7_select_biased_timer branch from c95e19a to 27f9e67 Compare April 23, 2026 17:20
@JustinKovacich JustinKovacich requested a review from Copilot April 23, 2026 17:41

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR continues the “executor-hoisting / bare-metal readiness” chain by replacing tokio::select!-based loops with executor-agnostic futures::select! (with .fuse()/pin_mut!), and by routing periodic sleeps through the crate’s Timer trait (currently backed by TokioTimer).

Changes:

  • Replace tokio::select! with futures::select! in client/server hot loops to keep fairness while improving borrow scoping.
  • Route periodic ticks (client idle tick, server announcement tick, client SD announcement interval) through Timer::sleep.
  • Rename client SD announcement API to return a caller-driven future (sd_announcements_loop) and add/expand tests around transport customization and announcement cadence.

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
src/tokio_transport.rs Updates TokioTimer docs to reflect new internal usage across periodic tick sites.
src/server/mod.rs Switches server receive loop to futures::select! and routes announcement sleep through TokioTimer via Timer.
src/client/socket_manager.rs Reworks the socket I/O loop to use futures::select! with a per-iteration outcome to drop borrows cleanly; adds end-to-end/negative tests for custom factories.
src/client/mod.rs Renames start_sd_announcementssd_announcements_loop and changes it to return an impl Future; adds cadence-related tests.
src/client/inner.rs Moves client event loop selection to futures::select! and routes the 125ms idle tick through Timer.
Cargo.toml Adds optional futures dependency and wires it into client/server features.
Cargo.lock Locks new futures transitive dependencies.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/server/mod.rs
@JustinKovacich JustinKovacich force-pushed the feature/hoist_tokio_spawn branch from d0043ad to 8b2ff9a Compare April 23, 2026 17:58
JustinKovacich added a commit that referenced this pull request Apr 23, 2026
Round-2 Copilot feedback on PR #81: the passive-server error
message in `announcement_loop` still referred to
`Client::start_sd_announcements`, but this PR renamed that API to
`Client::sd_announcements_loop`. Updated the error message to
point at the correct name so users hitting the "called on passive
server" error aren't sent to a non-existent function.

Docs/string-literal only.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@JustinKovacich JustinKovacich requested a review from Copilot April 24, 2026 21:10

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR continues the “executor-agnostic” refactor by switching client/server async loops from tokio::select! / direct tokio::time calls to futures::select! and routing periodic sleeps through the crate’s Timer abstraction (currently TokioTimer).

Changes:

  • Replace tokio::select! usage in client/server loops with futures::select! + .fuse()/pin_mut! to manage borrows and preserve fairness.
  • Route periodic tick sleeps (client idle tick, server announcement tick, SD announcement cadence) through Timer::sleep (TokioTimer).
  • Rename Client::start_sd_announcements to Client::sd_announcements_loop (now returns a future) and add tests around custom transport + announcement cadence.

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
src/tokio_transport.rs Update TokioTimer docs to reflect internal usage at periodic tick sites.
src/server/mod.rs Use TokioTimer.sleep for announcement tick; replace tokio::select! with futures::select! in receive loop.
src/client/socket_manager.rs Switch socket loop from tokio::select! to futures::select!; add end-to-end transport-factory tests.
src/client/mod.rs Rename SD announcement API to a caller-driven future; add cadence/latency tests; route sleeps via TokioTimer.
src/client/inner.rs Replace tokio::select! with futures::select! and route idle tick via TokioTimer.sleep.
Cargo.toml Add optional futures dep and enable it for client/server features.
Cargo.lock Lockfile updates for futures and transitive deps.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/client/mod.rs
Comment thread src/client/mod.rs Outdated

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR continues the executor-agnostic refactor (per PR chain) by replacing tokio::select!/direct tokio::time::sleep usage in core loops with futures::select! and routing periodic waits through the crate’s Timer trait (TokioTimer in the tokio backend), improving fairness/borrow scoping and easing future bare-metal timer swaps.

Changes:

  • Replace tokio::select! with futures::select! (+ .fuse()/pin_mut!) in client/server socket/event loops to preserve fairness and tighten borrow lifetimes.
  • Route periodic ticks through Timer::sleep (TokioTimer) in the server announcement loop and client inner/SD announcement loops.
  • Add futures as an optional dependency enabled by client/server, plus new tests around custom TransportFactory behavior and SD announcement timing.

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated no comments.

Show a summary per file
File Description
src/tokio_transport.rs Updates TokioTimer docs to reflect that it’s now used internally at periodic tick sites.
src/server/mod.rs Replaces tokio::select! with futures::select! in the run loop and routes the 1s announcement sleep through TokioTimer/Timer.
src/client/socket_manager.rs Reworks the per-socket I/O loop to use futures::select! with scoped borrows; adds end-to-end custom TransportFactory tests and error-propagation test.
src/client/mod.rs Makes SD announcements loop executor-driven (impl Future) and routes sleep via Timer; updates/expands tests (including ignored multicast-dependent cadence/first-emit tests).
src/client/inner.rs Reworks the client processing loop to use futures::select! and routes the 125ms idle tick through TokioTimer/Timer with scoped borrows.
Cargo.toml Adds optional futures dependency and enables it under client/server features.
Cargo.lock Locks newly introduced futures dependency tree.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@JustinKovacich JustinKovacich marked this pull request as ready for review April 24, 2026 22:02
@JustinKovacich JustinKovacich force-pushed the feature/hoist_tokio_spawn branch from c162474 to 82df573 Compare April 25, 2026 00:48
Final state of #81, squashed from 8 commits to keep the rebase onto the
rewritten #80 tractable. The intermediate history includes a transient
select_biased! step that was reverted to plain select! for fairness
within the same PR, which would have produced a contradictory mid-rebase
state if replayed individually.

Original commits, oldest first:

- 888bcfa  Add futures dep behind client/server features for upcoming
           phase 7 work. Enables futures::FutureExt / pin_mut / select
           macros in the event loops; no code paths use it yet.

- be42199  Dropped tokio::select in favor of futures::select_biased!
           (with FutureExt::fuse + pin_mut!) in socket_manager and
           client::inner; removed tokio::spawn(...) from Client::new
           (the run-future is now returned to the caller); migrated
           client.start_sd_announcements -> sd_announcements_loop
           returning a future for the caller to spawn.

- 2f90058  Utilize TokioTimer for the loop sleeps, one step closer to
           a bare-metal replacement.

- 0df42f2  Address adversarial review for phase 7 — fairness,
           readability, coverage:
           * select_biased! -> select! at the 3 event-loop sites
             (socket_manager, inner::run_future, server::run). Restores
             random per-poll fairness and eliminates the arm-starvation
             risk introduced by biased ordering.
           * Imports Timer + TokioTimer at each Timer call site so UFCS
             reduces to method syntax (TokioTimer.sleep(d)).
           * Hoists socket_manager's Outcome<P> enum out of
             spawn_socket_loop's async block to module scope.
           * Updates stale "phase 6" doc references to point at the
             actual planned hoist phase (8, alongside the bare-metal
             example).
           * New tests:
               sd_announcements_loop_cadence_stays_close_to_requested
               bind_with_transport_carries_traffic_end_to_end
               bind_with_transport_propagates_factory_error
               client_new_run_future_is_send_static

- 27f9e67  phase 7: respond to PR #81 feedback (Copilot):
           (1) correct futures dep comment in Cargo.toml to describe
               actual usage (select! + FutureExt::fuse / pin_mut!);
           (2) rename recv error log to "Transport recv failed" with
               binding `recv_err` since the error is from
               socket.recv_from, not a parse step;
           (3) drop the pre-loop sleep in sd_announcements_loop so the
               first announcement lands at ~1x interval instead of ~2x;
               in-loop sleep carries the initial-delay role.
           New test sd_announcements_loop_first_emit_within_one_interval
           pins first-emit latency below 250ms at 100ms interval.

- 4f7f9d5  docs: fix stale Client API name in passive-server error
           message (start_sd_announcements -> sd_announcements_loop).

- f5c674a  chore(clippy): tidy new warnings in phase7 PR:
           * match_wildcard_for_single_variants on SocketAddr match —
             make the wildcard explicit as `other @ SocketAddr::V6(_)`.
           * manual_async_fn on AlwaysBusyFactory test-impl of
             TransportFactory::bind: rewrite as async fn.

- 33ce551  round-3: fix #[ignore] reasons on sd_announcements_loop
           tests. Align both tests under #[ignore] with an accurate
           reason referencing the real constraint (MULTICAST on the
           loopback interface); drop the stale sd_state.rs reference.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
JustinKovacich added a commit that referenced this pull request Apr 25, 2026
Final state of #82, squashed from 14 commits to keep the rebase onto
the rewritten #81 tractable. Several intermediate commits reshape the
same areas (bare_metal example reframing, test hygiene, doc updates,
clippy fixes), so a single rebase against the final shape avoids
fighting transient mid-PR states.

Original commits, oldest first:

- 287b47a  Removed warns on dropped callers, removed tokio::spawn in
           subscribe_no_wait, response oneshot is now silent.

- 6b8dfc3  subscribe_no_wait orphan cleanup.

- c519d4f  Added a bare_metal feature to feature flags and Cargo.toml.

- cff87a8  Added a bare_metal example with no tokio, no socket2, no
           std::net — exercises the TransportSocket / TransportFactory
           / Timer / SpawnFuture trait surface end-to-end on host so
           that breakage of the abstraction is caught before any real
           bare-metal port.

- e2465ef  Added integration tests, gave examples of running
           bare_metal, added warnings against using demo code as
           implementation prototypes.

- 5cd838b  (same subject — second commit of the same change set).

- d4cb3b1  Merge remote-tracking branch
           'origin/feature/phase8_bare_metal' into
           feature/phase8_bare_metal.

- 7e46c3a  phase 8: reframe bare_metal example as host-side
           trait-surface canary. Doc comments and test descriptions
           clarify that the example is a trait-surface canary, not a
           real bare-metal target.

- 16e3b84  round-2: docs + test-hygiene fixes for phase 8.

- c99a9ee  chore(clippy): tidy new warnings in phase8 PR.

- 99eff06  docs: update stale function-name references in comments.

- a45bd5b  test(bare_metal_example_builds): drop runtime
           CARGO_MANIFEST_DIR assert. The runtime check was redundant
           with the at-compile-time path resolution and noisy when run
           outside cargo.

- f770bf9  Update examples/bare_metal/src/main.rs.

- 1e92f43  PR #82 round: workspace-command wording + spelling fixes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR continues the executor-agnostic refactor by routing periodic timing through the Timer trait and replacing tokio::select! call sites with futures::select! (+ fuse/pin_mut) to improve fairness and make future non-tokio backends easier to integrate (per the PR chain around #80 / #82).

Changes:

  • Route periodic sleeps through Timer (TokioTimer) in client/server loops (125ms idle tick, 1s server announcements, client SD announcement interval).
  • Replace tokio::select! usage with futures::select! (and per-iteration fused/pinned futures) in server receive loop and client socket loops.
  • Add optional futures dependency and enable it via client/server features; expand tests for custom TransportFactory usage.

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
src/tokio_transport.rs Updates TokioTimer docs to reflect internal use at periodic tick sites.
src/server/mod.rs Uses TokioTimer for announcement sleep; swaps recv tokio::select! for futures::select!.
src/client/socket_manager.rs Uses futures::select! in socket loop; adds custom-factory tests; adjusts E2E oversize test fixture.
src/client/mod.rs Makes SD announcement loop executor-driven (impl Future) and routes timing via Timer.
src/client/inner.rs Uses futures::select! in main run loop and routes idle tick through TokioTimer.
Cargo.toml Adds optional futures dependency and wires it into client/server features.
Cargo.lock Locks new futures transitive dependencies.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +461 to +462
Outcome::Recv(Err(recv_err)) => {
error!("Transport recv failed: {:?}", recv_err);
Comment on lines +729 to +733
// Craft a message whose raw-encoded size fits UDP_BUFFER_SIZE (16-byte
// header + 1480-byte payload = 1496 bytes) but whose E2E-protected
// size does not (payload grows by PROFILE4_HEADER_SIZE = 12, pushing
// the total to 1508 bytes, 8 over MTU).
let payload_bytes = [0u8; 1480];
.await
.expect("send_to via custom-factory-built socket");

let mut buf = [0u8; 1500];
Comment thread src/client/inner.rs
Comment on lines +943 to +951
if request_queue.push_back(ctrl).is_err() {
// Queue full: the rejected ControlMessage is
// dropped, so any oneshot senders inside it
// cancel — callers awaiting those receivers
// will observe `RecvError`.
warn!(
"request_queue at capacity ({}); dropping control message",
REQUEST_QUEUE_CAP
);
Comment on lines +310 to +327
Outcome::Send(Some(send_message)) => {
trace!("Sending: {:?}", &send_message);
let mut message_length = match send_message
.message
.encode(&mut buf.as_mut_slice())
{
Ok(length) => length,
Err(e) => {
// This arm is the transport-level recv_from
// result; decoding runs further up inside
// `MessageView::parse`. An `Err` here is an
// I/O failure on the socket read, not a
// decode failure.
//
// `map_io_error` in tokio_transport already
// logs the raw OS error + kind (at `warn!`
// for actionable kinds, `debug!` for
// steady-state noise like `TimedOut`), so
// stay at `debug!` here to avoid double-
// logging the same failure at `error!`.
debug!("recv_from returned error on socket loop: {:?}", e);
}
}
},
message = tx_rx.recv() => {
if let Some(send_message) = message {
trace!("Sending: {:?}", &send_message);
// Fail fast with the capacity error rather than
// letting `encode` report a less-actionable
// protocol I/O error when it runs out of
// buffer. Matches the E2E-overflow arm below
// and the server event_publisher path.
let required_size = send_message.message.required_size();
if required_size > UDP_BUFFER_SIZE {
error!(
"outgoing message ({} bytes) exceeds UDP_BUFFER_SIZE ({}); dropping send",
required_size, UDP_BUFFER_SIZE
);
let _ = send_message.response.send(Err(Error::Capacity("udp_buffer")));
continue;
}
let mut message_length = match send_message.message.encode(&mut buf.as_mut_slice()) {
Ok(length) => length,
Err(e) => {
error!("Failed to encode message: {:?}", e);
// If the sender is already closed we can't send the error back, so we shut everything down
if send_message.response.send(Err(e.into())).is_err() {
error!("Socket owner closed channel unexpectedly, closing socket.");
break;
}
error!("Failed to encode message: {:?}", e);
// If the sender is already closed we can't send the error back, so we shut everything down
if let Ok(()) = send_message.response.send(Err(e.into())) {
// Successfully sent error back to sender, carry on
continue;
}
};

// Apply E2E protect if configured. `protected`
// is a disjoint stack buffer, so the input can
// be borrowed directly out of `buf[16..]` with
// no intermediate copy.
{
let key = E2EKey::from_message_id(send_message.message.header().message_id());
let mut registry = e2e_registry.lock().expect("e2e registry lock poisoned");
if registry.contains_key(&key) {
let upper_header: [u8; 8] = buf[8..16].try_into().expect("upper header slice");
let mut protected = [0u8; UDP_BUFFER_SIZE];
let result = registry.protect(
key,
&buf[16..message_length],
upper_header,
&mut protected,
);
match result {
Some(Ok(protected_len)) => {
if 16 + protected_len > UDP_BUFFER_SIZE {
error!(
"E2E-protected datagram ({} bytes, header + protected payload) exceeds UDP_BUFFER_SIZE ({}); dropping send",
16 + protected_len, UDP_BUFFER_SIZE
);
let _ = send_message.response.send(Err(Error::Capacity("udp_buffer")));
continue;
}
#[allow(clippy::cast_possible_truncation)]
let new_length: u32 = 8 + protected_len as u32;
buf[4..8].copy_from_slice(&new_length.to_be_bytes());
buf[16..16 + protected_len].copy_from_slice(&protected[..protected_len]);
message_length = 16 + protected_len;
}
Some(Err(e)) => {
error!("E2E protect error: {:?}", e);
error!("Socket owner closed channel unexpectedly, closing socket.");
break;
}
};
@JustinKovacich

Copy link
Copy Markdown
Contributor Author

Closing without merge to declutter the stack: this phase's changes are carried in full by the consolidated lineage under PR #114 (phase 21), which the next development stack builds on. Branch is retained.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants