Select biased timer #81
Pull request overview
This PR refactors the client/server async event loops to use executor-agnostic futures selection and routes periodic sleeps through the crate’s Timer trait (via TokioTimer in the tokio backend), supporting eventual non-tokio/bare-metal integrations.
Changes:
- Replace `tokio::select!` usage with `futures::select!` + `.fuse()`/`pin_mut!` to ensure unbiased selection and cleaner borrow lifetimes.
- Route periodic ticks (client idle tick, server announcement tick, client SD announcement loop) through `Timer::sleep` (`TokioTimer`).
- Update the client SD announcements API to return a caller-driven future and add end-to-end + error-propagation tests for custom `TransportFactory`.
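For readers new to the idea of routing sleeps through a trait, here is a minimal std-only sketch. The `Timer` trait shape, `DeadlineTimer`, `block_on`, and `run_ticks` are all hypothetical names for illustration; the crate's real `Timer` signature and `TokioTimer` may look different.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the crate's `Timer` trait (real signature may differ).
pub trait Timer {
    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}

/// Illustrative std-only backend; the PR itself uses `TokioTimer`.
pub struct DeadlineTimer;

struct SleepUntil {
    deadline: Instant,
}

impl Future for SleepUntil {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            Poll::Ready(())
        } else {
            // Busy-wake: acceptable for a demo, wasteful in production.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

impl Timer for DeadlineTimer {
    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        Box::pin(SleepUntil { deadline: Instant::now() + duration })
    }
}

struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, park until woken, repeat.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// A periodic loop written only against `Timer`, so any backend can drive it.
pub fn run_ticks<T: Timer>(timer: &T, interval: Duration, n: u32) -> u32 {
    let mut ticks = 0;
    for _ in 0..n {
        block_on(timer.sleep(interval));
        ticks += 1;
    }
    ticks
}
```

The point of the indirection is that `run_ticks` never names tokio; swapping in a different `Timer` implementation changes the backend without touching the loop.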
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/tokio_transport.rs | Updates TokioTimer docs to reflect internal usage at periodic tick sites. |
| src/server/mod.rs | Uses TokioTimer for announcement sleep; replaces socket receive tokio::select! with futures::select!. |
| src/client/socket_manager.rs | Reworks socket loop selection via futures::select!; adds tests for custom transport factories. |
| src/client/mod.rs | Renames/reworks SD announcements into a caller-driven future; adds cadence + Send/'static tests. |
| src/client/inner.rs | Replaces tokio::select! with futures::select! and routes the 125ms idle tick via TokioTimer. |
| Cargo.toml | Adds optional futures dep and enables it under client/server features. |
| Cargo.lock | Locks new futures transitive dependencies. |
Three fixes addressing Copilot review comments: (1) correct the `futures` dependency comment in Cargo.toml to describe actual usage (`select!` + `FuturesExt::fuse`/`pin_mut!`) rather than the stale `select_biased!`/`FuturesUnordered` claim; (2) retitle the `socket_manager` recv error log from "Error decoding message" to "Transport recv failed" and rename the binding to `recv_err`, since the error originates from `socket.recv_from`, not a parse step; (3) remove the pre-loop sleep in `sd_announcements_loop` so the first announcement lands at ~1× interval instead of ~2× — the in-loop sleep now carries the initial-delay role. A new regression test, `sd_announcements_loop_first_emit_within_one_interval`, pins first-emit latency below 250ms at a 100ms interval. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
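The first-emit timing in fix (3) can be checked with plain arithmetic. The sketch below models a sleep-then-emit loop with and without an extra pre-loop sleep; `emit_offsets` is a hypothetical helper, not code from this PR, and it does no real sleeping.

```rust
use std::time::Duration;

/// Models when a sleep-then-emit loop fires, measured from the moment the
/// loop starts. Pure arithmetic; nothing actually sleeps.
pub fn emit_offsets(interval: Duration, pre_loop_sleep: bool, count: u32) -> Vec<Duration> {
    let mut now = Duration::from_millis(0);
    if pre_loop_sleep {
        // The extra sleep that fix (3) removes.
        now += interval;
    }
    let mut offsets = Vec::new();
    for _ in 0..count {
        // The in-loop sleep runs before each emit.
        now += interval;
        offsets.push(now);
    }
    offsets
}
```

At a 100ms interval the model puts the first emit at 200ms with the pre-loop sleep and at 100ms without it, which is consistent with the regression test's 250ms bound.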
c95e19a to 27f9e67
Pull request overview
This PR continues the “executor-hoisting / bare-metal readiness” chain by replacing tokio::select!-based loops with executor-agnostic futures::select! (with .fuse()/pin_mut!), and by routing periodic sleeps through the crate’s Timer trait (currently backed by TokioTimer).
Changes:
- Replace `tokio::select!` with `futures::select!` in client/server hot loops to keep fairness while improving borrow scoping.
- Route periodic ticks (client idle tick, server announcement tick, client SD announcement interval) through `Timer::sleep`.
- Rename client SD announcement API to return a caller-driven future (`sd_announcements_loop`) and add/expand tests around transport customization and announcement cadence.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/tokio_transport.rs | Updates TokioTimer docs to reflect new internal usage across periodic tick sites. |
| src/server/mod.rs | Switches server receive loop to futures::select! and routes announcement sleep through TokioTimer via Timer. |
| src/client/socket_manager.rs | Reworks the socket I/O loop to use futures::select! with a per-iteration outcome to drop borrows cleanly; adds end-to-end/negative tests for custom factories. |
| src/client/mod.rs | Renames start_sd_announcements → sd_announcements_loop and changes it to return an impl Future; adds cadence-related tests. |
| src/client/inner.rs | Moves client event loop selection to futures::select! and routes the 125ms idle tick through Timer. |
| Cargo.toml | Adds optional futures dependency and wires it into client/server features. |
| Cargo.lock | Locks new futures transitive dependencies. |
d0043ad to 8b2ff9a
Round-2 Copilot feedback on PR #81: the passive-server error message in `announcement_loop` still referred to `Client::start_sd_announcements`, but this PR renamed that API to `Client::sd_announcements_loop`. Updated the error message to point at the correct name so users hitting the "called on passive server" error aren't sent to a non-existent function. Docs/string-literal only. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Pull request overview
This PR continues the “executor-agnostic” refactor by switching client/server async loops from tokio::select! / direct tokio::time calls to futures::select! and routing periodic sleeps through the crate’s Timer abstraction (currently TokioTimer).
Changes:
- Replace `tokio::select!` usage in client/server loops with `futures::select!` + `.fuse()`/`pin_mut!` to manage borrows and preserve fairness.
- Route periodic tick sleeps (client idle tick, server announcement tick, SD announcement cadence) through `Timer::sleep` (`TokioTimer`).
- Rename `Client::start_sd_announcements` to `Client::sd_announcements_loop` (now returns a future) and add tests around custom transport + announcement cadence.
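To illustrate what "returns a future" means for the caller, here is a hand-written, std-only sketch. `AnnouncementsLoop`, the free function `sd_announcements_loop(rounds)`, `assert_send_static`, and `drive` are hypothetical shapes chosen for the example; the real API is a `Client` method and its body awaits a timer between announcements.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future returned by an `sd_announcements_loop`-style API.
/// Nothing is emitted until the caller polls it.
pub struct AnnouncementsLoop {
    pub remaining: u32,
    pub emitted: u32,
}

impl Future for AnnouncementsLoop {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.remaining == 0 {
            return Poll::Ready(self.emitted);
        }
        // One "announcement" per poll; a real loop would await a timer here.
        self.remaining -= 1;
        self.emitted += 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical constructor: builds the future but does not spawn it.
pub fn sd_announcements_loop(rounds: u32) -> AnnouncementsLoop {
    AnnouncementsLoop { remaining: rounds, emitted: 0 }
}

/// Compile-time check that a future could be handed to a multi-threaded spawner.
pub fn assert_send_static<F: Future + Send + 'static>(fut: F) -> F {
    fut
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for the caller's executor: polls until the future completes.
pub fn drive<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return out;
        }
    }
}
```

Because the library no longer spawns anything itself, the choice of executor (tokio, a bare-metal scheduler, or a test harness like `drive`) stays with the caller.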
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/tokio_transport.rs | Update TokioTimer docs to reflect internal usage at periodic tick sites. |
| src/server/mod.rs | Use TokioTimer.sleep for announcement tick; replace tokio::select! with futures::select! in receive loop. |
| src/client/socket_manager.rs | Switch socket loop from tokio::select! to futures::select!; add end-to-end transport-factory tests. |
| src/client/mod.rs | Rename SD announcement API to a caller-driven future; add cadence/latency tests; route sleeps via TokioTimer. |
| src/client/inner.rs | Replace tokio::select! with futures::select! and route idle tick via TokioTimer.sleep. |
| Cargo.toml | Add optional futures dep and enable it for client/server features. |
| Cargo.lock | Lockfile updates for futures and transitive deps. |
Pull request overview
This PR continues the executor-agnostic refactor (per PR chain) by replacing tokio::select!/direct tokio::time::sleep usage in core loops with futures::select! and routing periodic waits through the crate’s Timer trait (TokioTimer in the tokio backend), improving fairness/borrow scoping and easing future bare-metal timer swaps.
Changes:
- Replace `tokio::select!` with `futures::select!` (+ `.fuse()`/`pin_mut!`) in client/server socket/event loops to preserve fairness and tighten borrow lifetimes.
- Route periodic ticks through `Timer::sleep` (`TokioTimer`) in the server announcement loop and client inner/SD announcement loops.
- Add `futures` as an optional dependency enabled by `client`/`server`, plus new tests around custom `TransportFactory` behavior and SD announcement timing.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/tokio_transport.rs | Updates TokioTimer docs to reflect that it’s now used internally at periodic tick sites. |
| src/server/mod.rs | Replaces tokio::select! with futures::select! in the run loop and routes the 1s announcement sleep through TokioTimer/Timer. |
| src/client/socket_manager.rs | Reworks the per-socket I/O loop to use futures::select! with scoped borrows; adds end-to-end custom TransportFactory tests and error-propagation test. |
| src/client/mod.rs | Makes SD announcements loop executor-driven (impl Future) and routes sleep via Timer; updates/expands tests (including ignored multicast-dependent cadence/first-emit tests). |
| src/client/inner.rs | Reworks the client processing loop to use futures::select! and routes the 125ms idle tick through TokioTimer/Timer with scoped borrows. |
| Cargo.toml | Adds optional futures dependency and enables it under client/server features. |
| Cargo.lock | Locks newly introduced futures dependency tree. |
c162474 to 82df573
Final state of #81, squashed from 8 commits to keep the rebase onto the rewritten #80 tractable. The intermediate history includes a transient `select_biased!` step that was reverted to plain `select!` for fairness within the same PR, which would have produced a contradictory mid-rebase state if replayed individually.

Original commits, oldest first:
- 888bcfa Add futures dep behind client/server features for upcoming phase 7 work. Enables futures::FutureExt / pin_mut / select macros in the event loops; no code paths use it yet.
- be42199 Dropped tokio::select! in favor of futures::select_biased! (with FutureExt::fuse + pin_mut!) in socket_manager and client::inner; removed tokio::spawn(...) from Client::new (the run-future is now returned to the caller); migrated client.start_sd_announcements -> sd_announcements_loop returning a future for the caller to spawn.
- 2f90058 Utilize TokioTimer for the loop sleeps, one step closer to a bare-metal replacement.
- 0df42f2 Address adversarial review for phase 7 (fairness, readability, coverage):
  * select_biased! -> select! at the 3 event-loop sites (socket_manager, inner::run_future, server::run). Restores random per-poll fairness and eliminates the arm-starvation risk introduced by biased ordering.
  * Imports Timer + TokioTimer at each Timer call site so UFCS reduces to method syntax (TokioTimer.sleep(d)).
  * Hoists socket_manager's Outcome<P> enum out of spawn_socket_loop's async block to module scope.
  * Updates stale "phase 6" doc references to point at the actual planned hoist phase (8, alongside the bare-metal example).
  * New tests: sd_announcements_loop_cadence_stays_close_to_requested, bind_with_transport_carries_traffic_end_to_end, bind_with_transport_propagates_factory_error, client_new_run_future_is_send_static.
- 27f9e67 phase 7: respond to PR #81 feedback (Copilot): (1) correct futures dep comment in Cargo.toml to describe actual usage (select! + FutureExt::fuse / pin_mut!); (2) rename recv error log to "Transport recv failed" with binding `recv_err` since the error is from socket.recv_from, not a parse step; (3) drop the pre-loop sleep in sd_announcements_loop so the first announcement lands at ~1x interval instead of ~2x; in-loop sleep carries the initial-delay role. New test sd_announcements_loop_first_emit_within_one_interval pins first-emit latency below 250ms at 100ms interval.
- 4f7f9d5 docs: fix stale Client API name in passive-server error message (start_sd_announcements -> sd_announcements_loop).
- f5c674a chore(clippy): tidy new warnings in phase7 PR:
  * match_wildcard_for_single_variants on SocketAddr match: make the wildcard explicit as `other @ SocketAddr::V6(_)`.
  * manual_async_fn on AlwaysBusyFactory test-impl of TransportFactory::bind: rewrite as async fn.
- 33ce551 round-3: fix #[ignore] reasons on sd_announcements_loop tests. Align both tests under #[ignore] with an accurate reason referencing the real constraint (MULTICAST on the loopback interface); drop the stale sd_state.rs reference.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
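The arm-starvation concern behind the `select_biased!` -> `select!` revert can be shown with a small simulation. This is not how the `futures` macros are implemented; it is a toy model assuming two arms that are always ready, with `XorShift` and `arm_wins` as hypothetical helpers.

```rust
/// Tiny xorshift PRNG so the sketch needs no external crates.
pub struct XorShift(u64);

impl XorShift {
    pub fn new(seed: u64) -> Self {
        // A zero state would stay zero forever, so nudge it.
        XorShift(if seed == 0 { 1 } else { seed })
    }

    pub fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Counts how often each of two always-ready arms wins over `polls` rounds.
pub fn arm_wins(polls: u32, biased: bool, rng: &mut XorShift) -> [u32; 2] {
    let mut wins = [0u32; 2];
    for _ in 0..polls {
        // Biased: arm 0 is always checked first.
        // Unbiased: the arm checked first is chosen pseudo-randomly.
        let first = if biased { 0 } else { (rng.next_u64() >> 63) as usize };
        // Both arms are ready, so whichever is checked first wins the round.
        wins[first] += 1;
    }
    wins
}
```

Under the biased policy the second arm never runs while the first stays ready, which is the starvation risk the commit message describes; the randomized policy lets both arms make progress.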
33ce551 to 398289b
Final state of #82, squashed from 14 commits to keep the rebase onto the rewritten #81 tractable. Several intermediate commits reshape the same areas (bare_metal example reframing, test hygiene, doc updates, clippy fixes), so a single rebase against the final shape avoids fighting transient mid-PR states.

Original commits, oldest first:
- 287b47a Removed warns on dropped callers, removed tokio::spawn in subscribe_no_wait, response oneshot is now silent.
- 6b8dfc3 subscribe_no_wait orphan cleanup.
- c519d4f Added a bare_metal feature to feature flags and Cargo.toml.
- cff87a8 Added a bare_metal example with no tokio, no socket2, no std::net; it exercises the TransportSocket / TransportFactory / Timer / SpawnFuture trait surface end-to-end on host so that breakage of the abstraction is caught before any real bare-metal port.
- e2465ef Added integration tests, gave examples of running bare_metal, added warnings against using demo code as implementation prototypes.
- 5cd838b (same subject; second commit of the same change set).
- d4cb3b1 Merge remote-tracking branch 'origin/feature/phase8_bare_metal' into feature/phase8_bare_metal.
- 7e46c3a phase 8: reframe bare_metal example as host-side trait-surface canary. Doc comments and test descriptions clarify that the example is a trait-surface canary, not a real bare-metal target.
- 16e3b84 round-2: docs + test-hygiene fixes for phase 8.
- c99a9ee chore(clippy): tidy new warnings in phase8 PR.
- 99eff06 docs: update stale function-name references in comments.
- a45bd5b test(bare_metal_example_builds): drop runtime CARGO_MANIFEST_DIR assert. The runtime check was redundant with the at-compile-time path resolution and noisy when run outside cargo.
- f770bf9 Update examples/bare_metal/src/main.rs.
- 1e92f43 PR #82 round: workspace-command wording + spelling fixes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
This PR continues the executor-agnostic refactor by routing periodic timing through the Timer trait and replacing tokio::select! call sites with futures::select! (+ fuse/pin_mut) to improve fairness and make future non-tokio backends easier to integrate (per the PR chain around #80 / #82).
Changes:
- Route periodic sleeps through `Timer` (`TokioTimer`) in client/server loops (125ms idle tick, 1s server announcements, client SD announcement interval).
- Replace `tokio::select!` usage with `futures::select!` (and per-iteration fused/pinned futures) in server receive loop and client socket loops.
- Add optional `futures` dependency and enable it via `client`/`server` features; expand tests for custom `TransportFactory` usage.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/tokio_transport.rs | Updates TokioTimer docs to reflect internal use at periodic tick sites. |
| src/server/mod.rs | Uses TokioTimer for announcement sleep; swaps recv tokio::select! for futures::select!. |
| src/client/socket_manager.rs | Uses futures::select! in socket loop; adds custom-factory tests; adjusts E2E oversize test fixture. |
| src/client/mod.rs | Makes SD announcement loop executor-driven (impl Future) and routes timing via Timer. |
| src/client/inner.rs | Uses futures::select! in main run loop and routes idle tick through TokioTimer. |
| Cargo.toml | Adds optional futures dependency and wires it into client/server features. |
| Cargo.lock | Locks new futures transitive dependencies. |
Outcome::Recv(Err(recv_err)) => {
    error!("Transport recv failed: {:?}", recv_err);
// Craft a message whose raw-encoded size fits UDP_BUFFER_SIZE (16-byte
// header + 1480-byte payload = 1496 bytes) but whose E2E-protected
// size does not (payload grows by PROFILE4_HEADER_SIZE = 12, pushing
// the total to 1508 bytes, 8 over MTU).
let payload_bytes = [0u8; 1480];
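The size arithmetic in that test comment can be spelled out as a small check. The header and profile sizes come from the comment itself; `UDP_BUFFER_SIZE = 1500` is an assumption inferred from the "8 over MTU" remark, and the helper names are hypothetical.

```rust
/// Header size quoted in the test comment.
pub const HEADER_SIZE: usize = 16;
/// E2E profile header size quoted in the test comment.
pub const PROFILE4_HEADER_SIZE: usize = 12;
/// Assumption: inferred from "1508 bytes, 8 over MTU"; the crate's real
/// constant may be defined differently.
pub const UDP_BUFFER_SIZE: usize = 1500;

/// Size of the datagram before E2E protection.
pub fn raw_size(payload_len: usize) -> usize {
    HEADER_SIZE + payload_len
}

/// Size of the datagram after the E2E header is added to the payload.
pub fn protected_size(payload_len: usize) -> usize {
    HEADER_SIZE + payload_len + PROFILE4_HEADER_SIZE
}

/// Whether a datagram of `size` bytes fits the send buffer.
pub fn fits_buffer(size: usize) -> bool {
    size <= UDP_BUFFER_SIZE
}
```

With a 1480-byte payload the raw message is 1496 bytes and fits, while the protected message is 1508 bytes and does not, which is exactly the gap the fixture is built to hit.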
    .await
    .expect("send_to via custom-factory-built socket");

let mut buf = [0u8; 1500];
if request_queue.push_back(ctrl).is_err() {
    // Queue full: the rejected ControlMessage is
    // dropped, so any oneshot senders inside it
    // cancel — callers awaiting those receivers
    // will observe `RecvError`.
    warn!(
        "request_queue at capacity ({}); dropping control message",
        REQUEST_QUEUE_CAP
    );
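The drop-cancels-the-reply behavior described in that comment can be reproduced with std types. In this sketch a `std::sync::mpsc` sender stands in for the oneshot sender, and `ControlMessage`, `BoundedQueue`, and `enqueue_or_drop` are hypothetical simplifications of the real types.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::Sender;

/// Hypothetical control message carrying a reply channel.
/// A std `mpsc` sender stands in for a oneshot sender.
pub struct ControlMessage {
    pub reply: Sender<u32>,
}

/// Hypothetical fixed-capacity queue that hands the item back on overflow.
pub struct BoundedQueue {
    items: VecDeque<ControlMessage>,
    cap: usize,
}

impl BoundedQueue {
    pub fn new(cap: usize) -> Self {
        BoundedQueue { items: VecDeque::new(), cap }
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }

    pub fn push_back(&mut self, msg: ControlMessage) -> Result<(), ControlMessage> {
        if self.items.len() >= self.cap {
            return Err(msg);
        }
        self.items.push_back(msg);
        Ok(())
    }
}

/// Returns true if the message was queued. On overflow the rejected message
/// is dropped here, which also drops its reply sender.
pub fn enqueue_or_drop(queue: &mut BoundedQueue, msg: ControlMessage) -> bool {
    queue.push_back(msg).is_ok()
}
```

A caller blocked on the receiver of a dropped message gets an error instead of hanging, so queue overflow surfaces as a failed request rather than a silent stall.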
Outcome::Send(Some(send_message)) => {
    trace!("Sending: {:?}", &send_message);
    let mut message_length = match send_message
        .message
        .encode(&mut buf.as_mut_slice())
    {
        Ok(length) => length,
        Err(e) => {
            // This arm is the transport-level recv_from
            // result; decoding runs further up inside
            // `MessageView::parse`. An `Err` here is an
            // I/O failure on the socket read, not a
            // decode failure.
            //
            // `map_io_error` in tokio_transport already
            // logs the raw OS error + kind (at `warn!`
            // for actionable kinds, `debug!` for
            // steady-state noise like `TimedOut`), so
            // stay at `debug!` here to avoid double-
            // logging the same failure at `error!`.
            debug!("recv_from returned error on socket loop: {:?}", e);
        }
    }
},
message = tx_rx.recv() => {
    if let Some(send_message) = message {
        trace!("Sending: {:?}", &send_message);
        // Fail fast with the capacity error rather than
        // letting `encode` report a less-actionable
        // protocol I/O error when it runs out of
        // buffer. Matches the E2E-overflow arm below
        // and the server event_publisher path.
        let required_size = send_message.message.required_size();
        if required_size > UDP_BUFFER_SIZE {
            error!(
                "outgoing message ({} bytes) exceeds UDP_BUFFER_SIZE ({}); dropping send",
                required_size, UDP_BUFFER_SIZE
            );
            let _ = send_message.response.send(Err(Error::Capacity("udp_buffer")));
            continue;
        }
        let mut message_length = match send_message.message.encode(&mut buf.as_mut_slice()) {
            Ok(length) => length,
            Err(e) => {
                error!("Failed to encode message: {:?}", e);
                // If the sender is already closed we can't send the error back, so we shut everything down
                if send_message.response.send(Err(e.into())).is_err() {
                    error!("Socket owner closed channel unexpectedly, closing socket.");
                    break;
                }
                error!("Failed to encode message: {:?}", e);
                // If the sender is already closed we can't send the error back, so we shut everything down
                if let Ok(()) = send_message.response.send(Err(e.into())) {
                    // Successfully sent error back to sender, carry on
                    continue;
                }
        };

// Apply E2E protect if configured. `protected`
// is a disjoint stack buffer, so the input can
// be borrowed directly out of `buf[16..]` with
// no intermediate copy.
{
    let key = E2EKey::from_message_id(send_message.message.header().message_id());
    let mut registry = e2e_registry.lock().expect("e2e registry lock poisoned");
    if registry.contains_key(&key) {
        let upper_header: [u8; 8] = buf[8..16].try_into().expect("upper header slice");
        let mut protected = [0u8; UDP_BUFFER_SIZE];
        let result = registry.protect(
            key,
            &buf[16..message_length],
            upper_header,
            &mut protected,
        );
        match result {
            Some(Ok(protected_len)) => {
                if 16 + protected_len > UDP_BUFFER_SIZE {
                    error!(
                        "E2E-protected datagram ({} bytes, header + protected payload) exceeds UDP_BUFFER_SIZE ({}); dropping send",
                        16 + protected_len, UDP_BUFFER_SIZE
                    );
                    let _ = send_message.response.send(Err(Error::Capacity("udp_buffer")));
                    continue;
                }
                #[allow(clippy::cast_possible_truncation)]
                let new_length: u32 = 8 + protected_len as u32;
                buf[4..8].copy_from_slice(&new_length.to_be_bytes());
                buf[16..16 + protected_len].copy_from_slice(&protected[..protected_len]);
                message_length = 16 + protected_len;
            }
            Some(Err(e)) => {
                error!("E2E protect error: {:?}", e);
                error!("Socket owner closed channel unexpectedly, closing socket.");
                break;
            }
        };

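The header patch in the E2E arm (rewrite the length field, then copy the protected payload back after the header) can be isolated as a small function. This is a sketch following the snippet's offsets (length field at bytes 4..8, payload from byte 16, length = 8 + protected length); `splice_protected` is a hypothetical helper, and it uses a checked conversion instead of the `as u32` cast.

```rust
use std::convert::TryFrom;

/// Header size used by the snippet's `buf[16..]` indexing.
pub const HEADER_SIZE: usize = 16;

/// Copies `protected` in after the 16-byte header and rewrites the big-endian
/// length field at bytes 4..8. Returns the new datagram length, or `None` if
/// the result would not fit in `buf`.
pub fn splice_protected(buf: &mut [u8], protected: &[u8]) -> Option<usize> {
    let total = HEADER_SIZE + protected.len();
    if total > buf.len() {
        return None;
    }
    // As in the snippet, the length field covers the last 8 header bytes
    // plus the protected payload.
    let new_length = u32::try_from(8 + protected.len()).ok()?;
    buf[4..8].copy_from_slice(&new_length.to_be_bytes());
    buf[HEADER_SIZE..total].copy_from_slice(protected);
    Some(total)
}
```

Checking the total against the buffer before writing mirrors the capacity guard in the snippet, so an oversized protected payload is rejected without touching the header.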
Closing without merge to declutter the stack: this phase's changes are carried in full by the consolidated lineage under PR #114 (phase 21), which the next development stack builds on. Branch is retained.
This PR is part of a chain. See Prev: #80; See Next: #82