From 7452273988a41df9a203d62a6ee7f5153567af77 Mon Sep 17 00:00:00 2001 From: Mathieu David Date: Sun, 21 Jun 2026 01:54:23 +0200 Subject: [PATCH 1/2] feat: add push-callback registration for rcl primitives Add an opt-in way for primitives to report readiness via rcl's push callbacks (rcl_*_set_on_new_*_callback) instead of being polled in a wait set, as the foundation for an event-driven executor. `RclPrimitive::register_on_ready` installs a callback that the middleware invokes when the entity becomes ready and returns an `OnReadyHandle` (RAII) that deregisters on drop. `OnReadyRegistration` wraps the unsafe rcl setter: it boxes the callback context for a stable address and, on drop, clears the callback before freeing the context (finalizing the rcl entity first) so the middleware can never invoke a freed context during teardown. Implemented for subscriptions, services, and clients. No executor consumes this yet, so the basic executor is unchanged. --- rclrs/src/client.rs | 25 +++ rclrs/src/executor.rs | 2 + rclrs/src/executor/event_callback.rs | 243 +++++++++++++++++++++++++++ rclrs/src/service.rs | 25 +++ rclrs/src/subscription.rs | 35 ++++ rclrs/src/wait_set/rcl_primitive.rs | 30 ++++ 6 files changed, 360 insertions(+) create mode 100644 rclrs/src/executor/event_callback.rs diff --git a/rclrs/src/client.rs b/rclrs/src/client.rs index a229bac13..1535dc7ab 100644 --- a/rclrs/src/client.rs +++ b/rclrs/src/client.rs @@ -468,6 +468,31 @@ where fn kind(&self) -> RclPrimitiveKind { RclPrimitiveKind::Client } + + fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + // A client has a single readiness path; report it as `Basic`. + let on_ready = move |n| on_ready(ReadyKind::Basic, n); + let registration = crate::executor::event_callback::OnReadyRegistration::new( + Arc::clone(&self.handle), + set_client_on_new_response, + Box::new(on_ready), + )?; + Ok(Some(Box::new(registration))) + } +} + +/// Install (or, with a null callback/user_data, clear) the "on new response" +/// push callback used by the event-driven executor. Encapsulates the client +/// lock and the rcl call within this module. +unsafe fn set_client_on_new_response( + handle: &ClientHandle, + callback: rcl_event_callback_t, + user_data: *const std::os::raw::c_void, +) -> rcl_ret_t { + rcl_client_set_on_new_response_callback(&*handle.lock(), callback, user_data) } type SequenceNumber = i64; diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 0d97f985f..10aae3cf1 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -1,6 +1,8 @@ mod basic_executor; pub use self::basic_executor::*; +pub(crate) mod event_callback; + use crate::{ Context, ContextHandle, GuardCondition, IntoNodeOptions, Node, RclrsError, Waitable, WeakActivityListener, diff --git a/rclrs/src/executor/event_callback.rs b/rclrs/src/executor/event_callback.rs new file mode 100644 index 000000000..fce813174 --- /dev/null +++ b/rclrs/src/executor/event_callback.rs @@ -0,0 +1,243 @@ +//! Safe RAII wrapper around rcl's "on new ___" push-callback APIs +//! (`rcl_subscription_set_on_new_message_callback` and the service/client/event +//! equivalents). +//! +//! These let an event-driven executor learn that an entity has become ready +//! *without* polling `rcl_wait`: the middleware invokes a C callback (possibly +//! from its own thread) when data arrives. We forward that to a Rust closure, +//! which an executor uses to enqueue work. +//! +//! [`OnReadyRegistration`] is generic over the entity handle type `H`. Each +//! entity module provides a [`SetOnReadyFn`] that locks its handle and calls the +//! appropriate `rcl_*_set_on_new_*_callback`; the registration owns the boxed +//! callback context and the handle, and deregisters on drop. +//! +//! # Safety model +//! +//! rcl stores the `user_data` pointer we hand it and passes it back to the C +//! callback on every event. That pointer must stay valid for as long as the +//! callback is registered. We therefore: +//! +//! - box the [`EventCallbackCtx`] so it has a stable heap address, and +//! - in `Drop`, **unregister the callback first** (so the middleware can no +//! longer invoke the trampoline) and only then free the context. +//! +//! Getting that ordering wrong is a use-after-free, since the middleware may be +//! calling the trampoline from another thread at the moment of teardown. + +use std::os::raw::c_void; +use std::sync::Arc; + +use crate::{rcl_bindings::*, OnReadyHandle, RclrsError, ToResult}; + +/// The context carried through rcl as `user_data`. Boxed so its address is +/// stable for the lifetime of the registration. +struct EventCallbackCtx { + on_ready: Box, +} + +/// The C trampoline that rcl/rmw invokes when an entity becomes ready. It may be +/// called from a middleware thread, so it does nothing but forward to the Rust +/// closure. It must not run user code or take locks that could deadlock the +/// middleware. +unsafe extern "C" fn on_ready_trampoline(user_data: *const c_void, number_of_events: usize) { + // SAFETY: `user_data` is the pointer to the `EventCallbackCtx` we passed to + // the rcl setter. It stays valid until the owning registration's `Drop` + // clears the callback, which always happens before the box is freed. + let ctx = unsafe { &*(user_data as *const EventCallbackCtx) }; + (ctx.on_ready)(number_of_events); +} + +/// A function that registers (or, with a null callback/user_data, clears) the +/// "on ready" push callback on an entity handle of type `H`. Implemented per +/// entity module so that `H`'s (private) lock and its specific +/// `rcl_*_set_on_new_*_callback` stay encapsulated there. +pub(crate) type SetOnReadyFn = unsafe fn(&H, rcl_event_callback_t, *const c_void) -> rcl_ret_t; + +/// RAII registration of a push "on ready" callback on an rcl entity. +/// +/// While alive, `on_ready(number_of_events)` is invoked by the middleware +/// whenever the entity becomes ready. Dropping it unregisters the callback +/// before releasing the context, so the middleware can never call into freed +/// memory. +pub(crate) struct OnReadyRegistration { + set_callback: SetOnReadyFn, + // Field order is important for teardown safety: `handle` is declared + // (and therefore dropped) before `ctx`. Dropping the last `Arc` + // finalizes the rcl entity (destroying the middleware reader), so by the + // time `ctx` is freed no middleware thread can still invoke the trampoline + // against it. This mirrors rclcpp, which frees its callback storage only + // after `rcl_*_fini`. See `Drop` below. + handle: Arc, + + // Never read directly, held only so its `Drop` frees the callback context. + #[allow(dead_code)] + ctx: CtxBox, +} + +impl OnReadyHandle for OnReadyRegistration {} + +impl OnReadyRegistration { + /// Register `on_ready` to be called by the middleware whenever the entity + /// becomes ready. `set_callback` locks `handle` and installs the trampoline. + pub(crate) fn new( + handle: Arc, + set_callback: SetOnReadyFn, + on_ready: Box, + ) -> Result { + let ctx = Box::into_raw(Box::new(EventCallbackCtx { on_ready })); + + // SAFETY: `ctx` points to a live, heap-stable context that outlives the + // registration (only freed once, when the `CtxBox` field is dropped). + let result = + unsafe { set_callback(&handle, Some(on_ready_trampoline), ctx as *const c_void).ok() }; + + if let Err(err) = result { + // Registration failed, so nothing else references `ctx`. Reclaim it. + // SAFETY: `ctx` came from `Box::into_raw` above and was never + // successfully registered. + unsafe { + drop(Box::from_raw(ctx)); + } + return Err(err); + } + + Ok(Self { + set_callback, + handle, + ctx: CtxBox(ctx), + }) + } +} + +impl Drop for OnReadyRegistration { + fn drop(&mut self) { + // Detach the callback so the middleware stops invoking the trampoline. + // The context is NOT freed here, the `ctx: CtxBox` field is dropped + // *after* `handle`, so the rcl entity is finalized before + // the context is freed, avoiding a use-after-free if a callback is still + // in flight at teardown. + // + // SAFETY: handle is valid and locked by the setter. A null callback + + // null user_data clears the registration. + unsafe { + let _ = (self.set_callback)(&self.handle, None, std::ptr::null()); + } + } +} + +/// Owns the heap-allocated [`EventCallbackCtx`] and frees it on drop. Kept as a +/// separate field of [`OnReadyRegistration`] so its drop runs *after* the +/// `handle` Arc. +struct CtxBox(*mut EventCallbackCtx); + +// SAFETY: the pointer is only dereferenced by the middleware via the trampoline +// (forwarding to a `Send + Sync` closure). It carries no thread-unsafe state. +unsafe impl Send for CtxBox {} +unsafe impl Sync for CtxBox {} + +impl Drop for CtxBox { + fn drop(&mut self) { + // SAFETY: by the time this runs, `OnReadyRegistration::drop` has cleared + // the callback and the `handle` field has been dropped (finalizing the + // rcl entity if it was the last reference), so no middleware thread can + // still be dereferencing this context. Reclaim it exactly once. + unsafe { + drop(Box::from_raw(self.0)); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::subscription::set_subscription_on_new_message; + use crate::*; + use ros_env::test_msgs::msg; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::{Duration, Instant}; + + /// The push callback fires when messages arrive, without ever spinning the + /// executor (i.e. without `rcl_wait`). + #[test] + fn push_callback_fires_without_spinning() -> Result<(), RclrsError> { + let executor = Context::default().create_basic_executor(); + let node = executor.create_node(&format!("test_push_callback_{}", line!()))?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let publisher = node.create_publisher::("test_push_topic".qos(qos))?; + let subscription = node + .create_subscription::("test_push_topic".qos(qos), |_: msg::Empty| {})?; + + let count = Arc::new(AtomicUsize::new(0)); + let count_cb = Arc::clone(&count); + let _registration = OnReadyRegistration::new( + Arc::clone(subscription.handle()), + set_subscription_on_new_message, + Box::new(move |n| { + count_cb.fetch_add(n, Ordering::Relaxed); + }), + )?; + + // Publish repeatedly (to ride out discovery) and wait for the push + // callback to fire. We deliberately never spin the executor. + let deadline = Instant::now() + Duration::from_secs(10); + while count.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + std::thread::sleep(Duration::from_millis(20)); + } + + assert!( + count.load(Ordering::Relaxed) > 0, + "push callback never fired" + ); + Ok(()) + } + + /// Rapidly create and drop registrations while messages are flowing. If the + /// drop ordering is wrong (freeing the context before unregistering), the + /// middleware thread can call into freed memory; this stresses that path. + #[test] + fn rapid_register_unregister_is_sound() -> Result<(), RclrsError> { + let executor = Context::default().create_basic_executor(); + let node = executor.create_node(&format!("test_push_raii_{}", line!()))?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let publisher = node.create_publisher::("test_push_raii_topic".qos(qos))?; + let subscription = node.create_subscription::( + "test_push_raii_topic".qos(qos), + |_: msg::Empty| {}, + )?; + + // A background thread floods the topic the whole time. + let stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let stop_pub = Arc::clone(&stop); + let flood = std::thread::spawn(move || { + while !stop_pub.load(Ordering::Acquire) { + let _ = publisher.publish(msg::Empty::default()); + std::thread::sleep(Duration::from_micros(50)); + } + }); + + // Register/unregister many times against the live subscription. + for _ in 0..2000 { + let count = Arc::new(AtomicUsize::new(0)); + let count_cb = Arc::clone(&count); + let registration = OnReadyRegistration::new( + Arc::clone(subscription.handle()), + set_subscription_on_new_message, + Box::new(move |n| { + count_cb.fetch_add(n, Ordering::Relaxed); + }), + )?; + // Hold briefly so the middleware can fire into this context, then drop + // (which must unregister before freeing). + std::thread::sleep(Duration::from_micros(100)); + drop(registration); + } + + stop.store(true, Ordering::Release); + flood.join().unwrap(); + Ok(()) + } +} diff --git a/rclrs/src/service.rs b/rclrs/src/service.rs index a30ec8dc4..f72accb25 100644 --- a/rclrs/src/service.rs +++ b/rclrs/src/service.rs @@ -307,6 +307,31 @@ where fn handle(&self) -> RclPrimitiveHandle<'_> { RclPrimitiveHandle::Service(self.handle.lock()) } + + fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + // A service has a single readiness path; report it as `Basic`. + let on_ready = move |n| on_ready(ReadyKind::Basic, n); + let registration = crate::executor::event_callback::OnReadyRegistration::new( + Arc::clone(&self.handle), + set_service_on_new_request, + Box::new(on_ready), + )?; + Ok(Some(Box::new(registration))) + } +} + +/// Install (or, with a null callback/user_data, clear) the "on new request" +/// push callback used by the event-driven executor. Encapsulates the service +/// lock and the rcl call within this module. +pub(crate) unsafe fn set_service_on_new_request( + handle: &ServiceHandle, + callback: rcl_event_callback_t, + user_data: *const std::os::raw::c_void, +) -> rcl_ret_t { + rcl_service_set_on_new_request_callback(&*handle.lock(), callback, user_data) } // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 16a800a9c..e58767788 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -103,6 +103,16 @@ where self.handle.topic_name() } + /// Access the handle for this subscription's underlying `rcl_subscription_t`. + /// + /// Returns the subscription handle. Only the `event_callback` tests use this + /// accessor (production code reaches the handle through its own field), so it + /// is compiled only under test rather than carried as dead code. + #[cfg(test)] + pub(crate) fn handle(&self) -> &Arc { + &self.handle + } + /// Returns the QoS settings of the subscription. pub fn qos(&self) -> QoSProfile { let options = unsafe { @@ -294,6 +304,31 @@ where fn handle(&self) -> RclPrimitiveHandle<'_> { RclPrimitiveHandle::Subscription(self.handle.lock()) } + + fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + // A subscription has a single readiness path; report it as `Basic`. + let on_ready = move |n| on_ready(ReadyKind::Basic, n); + let registration = crate::executor::event_callback::OnReadyRegistration::new( + Arc::clone(&self.handle), + set_subscription_on_new_message, + Box::new(on_ready), + )?; + Ok(Some(Box::new(registration))) + } +} + +/// Install (or, with a null callback/user_data, clear) the "on new message" +/// push callback used by the event-driven executor. Encapsulates the +/// subscription lock and the rcl call within this module. +pub(crate) unsafe fn set_subscription_on_new_message( + handle: &SubscriptionHandle, + callback: rcl_event_callback_t, + user_data: *const std::os::raw::c_void, +) -> rcl_ret_t { + rcl_subscription_set_on_new_message_callback(&*handle.lock(), callback, user_data) } // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread diff --git a/rclrs/src/wait_set/rcl_primitive.rs b/rclrs/src/wait_set/rcl_primitive.rs index c59efc54c..27f2af4e7 100644 --- a/rclrs/src/wait_set/rcl_primitive.rs +++ b/rclrs/src/wait_set/rcl_primitive.rs @@ -28,8 +28,38 @@ pub trait RclPrimitive: Send + Sync { /// Provide the handle for this primitive fn handle(&self) -> RclPrimitiveHandle<'_>; + + /// Register a push "on ready" callback so an event-driven executor can learn + /// this primitive has become ready without polling a wait set. The + /// middleware invokes `on_ready` with the [`ReadyKind`] describing *which* + /// part of the primitive became ready and the number of new events. + /// + /// Most primitives have a single readiness path and call `on_ready` with + /// [`ReadyKind::Basic`]. Composite primitives (action servers and clients) + /// register one callback per internal source and call `on_ready` with a + /// [`ReadyKind::ActionServer`]/[`ReadyKind::ActionClient`] value whose single + /// matching flag is set, so the executor knows which sub-entity to run. + /// + /// Returns `Ok(None)` for primitive kinds that have no rcl push-callback API + /// (e.g. timers and guard conditions); an event-driven executor drives those + /// by other means. The returned [`OnReadyHandle`] keeps the callback(s) + /// registered; dropping it detaches them. + fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + // Default: no push-callback support. Suppress the unused parameter. + let _ = on_ready; + Ok(None) + } } +/// RAII handle that keeps a push "on ready" callback registered with the +/// middleware (see [`RclPrimitive::register_on_ready`]). Dropping it +/// unregisters the callback, before freeing the callback's context, so an +/// event-driven executor can detach an entity simply by dropping this handle. +pub trait OnReadyHandle: Send + Sync {} + /// Enum to describe the kind of an executable. #[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] pub enum RclPrimitiveKind { From 9b519ea844626adc9ae552acda997d98fdbeeb0a Mon Sep 17 00:00:00 2001 From: Mathieu David Date: Sun, 21 Jun 2026 19:06:08 +0200 Subject: [PATCH 2/2] feat: add event-driven multi-threaded Tokio executor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a Tokio-based executor (opt-in via the `tokio-executor` feature, enabled by default) that learns entity readiness from the rcl push callbacks added in the previous commit instead of polling rcl_wait. Each Worker drains its own mailbox on a dedicated Tokio task, so one Worker's callbacks are serialized and ordered while independent Workers run concurrently across Tokio's thread pool — multi-core concurrency with no per-event task spawn. Subscriptions, services, and clients are driven by push callbacks; timers by tokio::time. Worker tasks are gated by spinning (callbacks only run while spinning, and spin() waits for in-flight callbacks before returning); spin() honors only_next_available_work (spin_once) and reports a timeout as a Timeout error, matching the basic executor. Notifications coalesce per entity to bound the mailbox, a panicking callback is contained rather than wedging the worker, and push-callback registrations finalize the rcl entity before freeing their context to avoid a teardown use-after-free. Opt out with `default-features = false` to drop the Tokio multi-threaded runtime and macros for a lighter build. Action support follows in a separate commit. --- rclrs/Cargo.toml | 4 +- rclrs/src/executor.rs | 5 + rclrs/src/executor/tokio_executor.rs | 1636 +++++++++++++++++++++++++ rclrs/src/timer.rs | 4 + rclrs/src/wait_set/rcl_primitive.rs | 13 +- rclrs/src/wait_set/wait_set_runner.rs | 47 +- rclrs/src/wait_set/waitable.rs | 55 +- rclrs/src/worker.rs | 47 + 8 files changed, 1764 insertions(+), 47 deletions(-) create mode 100644 rclrs/src/executor/tokio_executor.rs diff --git a/rclrs/Cargo.toml b/rclrs/Cargo.toml index 430f3c3af..1c5f7a7cd 100644 --- a/rclrs/Cargo.toml +++ b/rclrs/Cargo.toml @@ -73,8 +73,10 @@ rustflags = "0.1" ament_rs = "0.3" [features] -default = [] +default = ["tokio-executor"] serde = ["dep:serde", "dep:serde-big-array", "rosidl_runtime_rs/serde", "ros-env/serde"] +# Enables the event-driven, Tokio-based multi-threaded executor +tokio-executor = ["tokio/rt-multi-thread", "tokio/macros", "tokio/time"] # This feature is solely for the purpose of being able to generate documetation without a ROS installation # The only intended usage of this feature is for docs.rs builders to work, and is not intended to be used by end users use_ros_shim = ["paste", "ros-env/use_ros_shim", "rosidl_runtime_rs/use_ros_shim"] diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 10aae3cf1..a0b9ea563 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -3,6 +3,11 @@ pub use self::basic_executor::*; pub(crate) mod event_callback; +#[cfg(feature = "tokio-executor")] +mod tokio_executor; +#[cfg(feature = "tokio-executor")] +pub use self::tokio_executor::*; + use crate::{ Context, ContextHandle, GuardCondition, IntoNodeOptions, Node, RclrsError, Waitable, WeakActivityListener, diff --git a/rclrs/src/executor/tokio_executor.rs b/rclrs/src/executor/tokio_executor.rs new file mode 100644 index 000000000..657645cfb --- /dev/null +++ b/rclrs/src/executor/tokio_executor.rs @@ -0,0 +1,1636 @@ +//! Event-driven, Tokio-backed executor for rclrs. +//! +//! Readiness is push-based (rcl `set_on_new_*_callback`), not polled. Each +//! **Worker** (the node's default group is its main worker) gets its own +//! `tokio::mpsc` mailbox and one spawned task that drains it. Because a Tokio +//! task is never polled by two threads at once, that single task gives +//! per-worker mutual exclusion *and* FIFO ordering for free; Tokio's scheduler +//! provides the thread pool, work-stealing, and M:N multiplexing. So different +//! workers run in parallel automatically, with no per-event spawn and nothing +//! for the user to configure. +//! +//! Worker tasks are **gated by spinning**: they only execute ROS entity +//! callbacks (subscriptions, services, clients, timers, actions) while `spin()` +//! is active, preserving rclrs's contract that those callbacks do not run until +//! you spin and that none are still running once `spin()` returns (quiescence is +//! enforced by waiting for in-flight callbacks before returning). +//! +//! This gating applies to entity callbacks, not to free-standing async tasks +//! spawned through the executor commands (e.g. `commands().run(..)`): those are +//! ordinary Tokio tasks and run on the runtime independently of `spin()`. +//! Code that needs work confined to spinning should put it in an entity callback +//! rather than a spawned task. + +use std::any::Any; +use std::collections::HashMap; +use std::panic::AssertUnwindSafe; +use std::sync::{ + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, + Arc, Mutex, +}; +use std::time::{Duration, Instant}; + +use futures::future::BoxFuture; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + watch, Notify, +}; + +use crate::rcl_bindings::{rcl_timer_get_time_until_next_call, rcl_timer_is_ready, rcl_timer_t}; +use crate::{ + log_error, Context, ExecutorChannel, ExecutorRuntime, ExecutorWorkerOptions, OnReadyHandle, + PayloadTask, RclReturnCode, RclrsError, ReadyKind, SpinConditions, ToResult, Waitable, + WeakActivityListener, WorkerChannel, +}; + +use super::Executor; + +/// Identifies an entity within a worker. +type EntityId = u64; + +/// A message delivered to a worker's task. +enum WorkerMsg { + /// The entity became ready, take and run its callback(s). At most one such + /// message is outstanding per entity at a time (see [`WorkerEntity::scheduled`]); + /// the worker takes [`WorkerEntity::pending`] items when it handles it. + Ready { entity: EntityId }, + + /// Run a one-shot task against the worker's payload. + Payload(PayloadTask), +} + +/// An entity owned by a worker: registration inserts it, the worker task runs it. +struct WorkerEntity { + waitable: Mutex, + + /// The middleware can fire many notifications for one entity in a row. Rather + /// than queue one mailbox message per notification, we combine them: this flag + /// is set when a `Ready` for this entity is already queued and not yet + /// handled, so further notifications skip queueing another one. That keeps the + /// mailbox to at most one pending `Ready` per entity (instead of one per + /// message) even while spinning is paused. The worker clears the flag *before* + /// it drains, so a notification that races the drain queues a fresh `Ready` + /// and is never lost. + scheduled: Arc, + + /// How many ready events have been reported but not yet taken. Each + /// combined-away notification still adds its `number_of_events` here, so no + /// work is lost: the worker swaps this to zero and takes exactly that many + /// items. Using the reported count (rather than looping until a take fails) + /// keeps the take bounded for every primitive kind, including those that + /// report "empty" as success. + pending: Arc, + + /// Keeps the push callback registered; dropping it deregisters. `None` for + /// passive entities (e.g. guard conditions) or timers (driven separately). + /// Behind a `Mutex` so it can be filled in *after* the entity is inserted + /// into the registry: registering before insertion would let an early + /// middleware callback enqueue a `Ready` the worker can't resolve, which it + /// would drop, wedging the entity with its `scheduled` flag stuck set. + _on_ready: Mutex>>, +} + +/// State shared between the runtime and all workers. +struct ExecutorShared { + /// Gate the worker tasks observe: they execute only while this is `true`. + spin: watch::Sender, + + /// Promptly wakes `spin()` when a halt is requested. + halt: Arc, + + /// Number of callbacks currently executing across all workers. `spin()` + /// waits for this to reach zero before returning, so no ROS callback is + /// running once `spin()` has returned (quiescence). + active: Arc, + + /// Number of mailbox messages enqueued across all workers but not yet + /// handled (queued *or* in flight). `spin()` with `only_next_available_work` + /// uses this to detect when the currently-available work has drained. + outstanding: Arc, + + /// Errors produced by callbacks; drained and returned by `spin()`. + errors: Arc>>, + + /// Allocates entity ids across all workers. + next_entity_id: Arc, +} + +/// The per-worker event loop. Spawned once per worker (see +/// [`TokioExecutorChannel::create_worker`]). It owns the worker's payload and +/// drains its mailbox for the executor's lifetime, running each message against +/// the payload (and the worker's activity listeners) while the executor is +/// spinning. +struct WorkerLoop { + mailbox: UnboundedReceiver, + entities: Arc>>>, + payload: Box, + listeners: Arc>>, + spinning: watch::Receiver, + error_sink: Arc>>, + + /// Callbacks currently in flight across all workers; `spin()` waits for this + /// to reach zero before returning (quiescence). + active: Arc, + + /// Mailbox messages enqueued but not yet handled; `spin()` uses this to know + /// when the currently-available work has drained. + outstanding: Arc, + + /// Fires periodically so the loop can drop entities whose owning handle has + /// been released (see [`next_message`][Self::next_message]). Must be created + /// inside the Tokio runtime, so the loop is constructed in the spawned task. + reap: tokio::time::Interval, +} + +impl WorkerLoop { + /// Drain the mailbox for the executor's lifetime. Each message is gated on + /// the executor spinning, then handled. Returns when the worker's mailbox is + /// dropped or the executor itself is dropped. + async fn run(mut self) { + loop { + let Some(msg) = self.next_message().await else { + return; // worker dropped + }; + + if !self.wait_until_spinning().await { + return; // executor dropped + } + + self.handle(msg); + + // The message is fully handled: it is no longer in flight, and no + // longer counts as outstanding work for `spin()`. + self.active.fetch_sub(1, Ordering::AcqRel); + self.outstanding.fetch_sub(1, Ordering::AcqRel); + } + } + + /// Wait for the next mailbox message, reaping dropped entities on the side. + /// + /// On a periodic tick we drop entities whose owning handle has been released, + /// so we stop holding their rcl handle and push-callback registration. + /// Entities on active topics are also reaped on-event in + /// [`run_ready_entity`][Self::run_ready_entity]; this catches idle ones. + /// Returns `None` once the worker (its mailbox sender) has been dropped. + async fn next_message(&mut self) -> Option { + // Split the borrow so `select!` can poll the reap timer and the mailbox + // (two separate fields) at the same time. + let Self { + reap, + mailbox, + entities, + .. + } = self; + loop { + tokio::select! { + _ = reap.tick() => { + entities + .lock() + .unwrap() + .retain(|_, e| e.waitable.lock().unwrap().in_use()); + } + msg = mailbox.recv() => return msg, + } + } + } + + /// Count the pending message as in-flight and block until the executor is + /// spinning. + /// + /// The in-flight count is incremented *before* the gate is checked, so a + /// concurrent `spin()` closing the gate either observes this unit (and waits + /// for it) or has already closed the gate before we run anything. Either way + /// no callback runs after `spin()` returns. While parked the count is released + /// so it does not hold up quiescence. + /// + /// Returns `true` once spinning, with the in-flight count left incremented; + /// the caller decrements it once the message is handled. Returns `false` if + /// the executor was dropped, with the in-flight count already released. + async fn wait_until_spinning(&mut self) -> bool { + self.active.fetch_add(1, Ordering::AcqRel); + + loop { + if *self.spinning.borrow_and_update() { + return true; + } + + self.active.fetch_sub(1, Ordering::AcqRel); + + if self.spinning.changed().await.is_err() { + return false; // executor dropped + } + + self.active.fetch_add(1, Ordering::AcqRel); + } + } + + /// Run a single mailbox message against the worker's payload. + fn handle(&mut self, msg: WorkerMsg) { + match msg { + WorkerMsg::Ready { entity } => { + let errors = self.run_ready_entity(entity); + if !errors.is_empty() { + self.error_sink.lock().unwrap().extend(errors); + } + } + WorkerMsg::Payload(task) => { + // Contain a panic so a bad task cannot kill the worker. + if std::panic::catch_unwind(AssertUnwindSafe(|| task(&mut *self.payload))).is_err() + { + log_error!( + "rclrs.executor.tokio_executor", + "A payload task panicked; the executor contained the panic \ + and continues.", + ); + } + } + } + } + + /// Handle a `Ready` notification for `entity`: re-arm notification combining, + /// then either + /// run its callback(s) or, if its owning handle has been dropped, deregister + /// it. Returns any errors the callbacks produced. + fn run_ready_entity(&mut self, entity: EntityId) -> Vec { + // Clone the entry out under a brief lock so a callback may create new + // entities on this worker without deadlocking. + let Some(entry) = self.entities.lock().unwrap().get(&entity).cloned() else { + return Vec::new(); + }; + + // Clear the "already queued" flag *before* taking: a notification that + // arrives while we run then re-sets `scheduled` and adds to `pending`, + // queueing a fresh `Ready`, so no wakeup is lost. + entry.scheduled.store(false, Ordering::Release); + let count = entry.pending.swap(0, Ordering::AcqRel); + + let mut waitable = entry.waitable.lock().unwrap(); + if !waitable.in_use() { + // The owning handle was dropped: deregister and never run a callback + // for a dropped entity. + drop(waitable); + self.entities.lock().unwrap().remove(&entity); + return Vec::new(); + } + + let (ran, errors) = Self::execute_ready(&mut waitable, count, &mut *self.payload); + drop(waitable); + + if ran { + self.run_listeners_contained(); + } + errors + } + + /// Take up to `count` items from `waitable` and run its callback for each, + /// stopping early once a take turns up empty. + /// + /// The work runs inside `catch_unwind` so a panicking callback cannot kill + /// the worker task or leak the `active`/`outstanding` counters that `spin()` + /// waits on (which would wedge quiescence forever). The mutex guard is held + /// by the caller *outside* the closure, so it drops normally (unpoisoned) if + /// the callback unwinds. Returns whether any callback ran and any errors. + fn execute_ready( + waitable: &mut Waitable, + count: usize, + payload: &mut dyn Any, + ) -> (bool, Vec) { + let exec = std::panic::catch_unwind(AssertUnwindSafe(|| { + let mut ran = false; + let mut errors = Vec::new(); + for _ in 0..count.max(1) { + // Every primitive this executor handles has a single readiness + // path, so it always runs with `Basic`. + // + // SAFETY: `payload` is this worker's payload and `waitable` was + // registered on this worker, so its primitive expects exactly + // this payload type. + match unsafe { waitable.execute_with(ReadyKind::Basic, payload) } { + Ok(()) => ran = true, + Err(err) if err.is_take_failed() => break, + Err(err) => { + errors.push(err); + break; + } + } + } + (ran, errors) + })); + + exec.unwrap_or_else(|_| { + log_error!( + "rclrs.executor.tokio_executor", + "A callback panicked while spinning; the executor contained the \ + panic and continues. The worker's payload may now be in an \ + inconsistent state.", + ); + (false, Vec::new()) + }) + } + + /// Run this worker's activity listeners against the payload, containing any + /// panic so a bad listener cannot kill the worker task. + fn run_listeners_contained(&mut self) { + if std::panic::catch_unwind(AssertUnwindSafe(|| { + crate::worker::run_activity_listeners(&self.listeners, &mut *self.payload); + })) + .is_err() + { + log_error!( + "rclrs.executor.tokio_executor", + "A worker activity listener panicked; the executor contained the \ + panic and continues.", + ); + } + } +} + +/// When a timer will next fire, as reported by rcl. +enum NextFire { + /// Fires after this much time. + In(Duration), + /// Due now. + Now, + /// rcl could not report a time (the timer was canceled or errored). + Unavailable, +} + +/// Drives a single timer from the Tokio clock. Timers have no rcl push callback, +/// so one of these is spawned per timer: it sleeps until the timer's next +/// deadline, queues a ready message on the worker's mailbox, waits until the +/// worker has run the timer, and repeats. It exits when the timer's owning entity +/// is dropped (or the worker is gone). +struct TimerDriver { + rcl_timer: Arc>, + /// Cleared when the timer's owning entity is dropped; the driver then exits. + in_use: Arc, + id: EntityId, + mailbox: UnboundedSender, + /// The same notification-combining state the entity's push path would use, so + /// a timer fire is handled as one bounded take like any other event. + scheduled: Arc, + pending: Arc, + outstanding: Arc, +} + +impl TimerDriver { + async fn run(self) { + loop { + if !self.in_use.load(Ordering::Acquire) { + return; + } + + match self.time_until_next_fire() { + NextFire::In(delay) => tokio::time::sleep(delay).await, + NextFire::Now => tokio::task::yield_now().await, + NextFire::Unavailable => { + // Canceled or errored: don't busy-loop. Back off and re-check; + // the driver exits once the timer is dropped. + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + } + + if !self.in_use.load(Ordering::Acquire) { + return; + } + + if !self.enqueue_tick() { + return; // worker gone + } + + if !self.wait_until_handled().await { + return; // entity dropped while waiting + } + } + } + + /// Ask rcl how long until this timer next fires. + fn time_until_next_fire(&self) -> NextFire { + let timer = self.rcl_timer.lock().unwrap(); + let mut value: i64 = 0; + + // SAFETY: handle valid and locked + let ret = unsafe { rcl_timer_get_time_until_next_call(&*timer, &mut value) }; + + match ret.ok() { + Ok(()) if value > 0 => NextFire::In(Duration::from_nanos(value as u64)), + Ok(()) => NextFire::Now, + Err(_) => NextFire::Unavailable, + } + } + + /// Queue a ready tick for this timer, combining with any already-pending one + /// (the same `scheduled`/`pending` mechanism as push callbacks). Returns + /// `false` if the worker is gone. + fn enqueue_tick(&self) -> bool { + self.pending.fetch_add(1, Ordering::AcqRel); + if !self.scheduled.swap(true, Ordering::AcqRel) { + self.outstanding.fetch_add(1, Ordering::AcqRel); + if self + .mailbox + .send(WorkerMsg::Ready { entity: self.id }) + .is_err() + { + return false; // worker gone + } + } + true + } + + /// Wait until the worker has actually run the timer (it no longer reports + /// ready) before computing the next deadline, so we don't re-fire for the + /// same deadline. Returns `false` if the timer's entity was dropped while + /// waiting, in which case the driver should exit. + async fn wait_until_handled(&self) -> bool { + loop { + if !self.in_use.load(Ordering::Acquire) { + return false; + } + if !self.is_ready() { + return true; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + } + + /// Whether the rcl timer currently reports ready. An error (e.g. the timer was + /// canceled) is treated as not ready, letting the outer loop's time query + /// handle the canceled state. + fn is_ready(&self) -> bool { + let timer = self.rcl_timer.lock().unwrap(); + let mut ready = false; + // SAFETY: handle valid and locked; out-pointer valid. + let ret = unsafe { rcl_timer_is_ready(&*timer, &mut ready) }; + ret.ok().map(|()| ready).unwrap_or(false) + } +} + +/// Block until the current spin should stop, returning whether it stopped because +/// the timeout elapsed (`true`) rather than because of a halt, a context +/// shutdown, or the available work draining (`false`). +/// +/// With `only_once` (the spin_once pattern) it waits for work to arrive, drains +/// it, then returns; while work is in flight it never times out mid-batch. Without +/// it, it runs until `stop_time` (if any), a halt, or context shutdown. +async fn block_until_stop( + halt: Arc, + context: Context, + active: Arc, + outstanding: Arc, + halt_notify: Arc, + stop_time: Option, + only_once: bool, +) -> bool { + // For `only_once`, poll tightly so we detect the available work draining + // without adding latency; otherwise a coarse poll is enough (we only re-check + // halt/timeout/context). + let poll = if only_once { + Duration::from_micros(200) + } else { + Duration::from_millis(100) + }; + + // Whether any work has been seen this spin, so `only_once` waits for work to + // arrive (up to the timeout) before declaring the batch drained. + let mut saw_work = false; + + loop { + if halt.load(Ordering::Acquire) { + return false; + } + + // Stop spinning once the ROS context is no longer valid (shutdown). + if !context.ok() { + return false; + } + + let busy = outstanding.load(Ordering::Acquire) > 0 || active.load(Ordering::Acquire) > 0; + if busy { + saw_work = true; + } + + if only_once { + // Process the currently-available work, then stop. While work is in + // flight we keep draining (never time out mid-batch); once it has + // drained we're done. If no work is in flight we wait for some to + // arrive, up to the timeout. + if saw_work && !busy { + return false; + } + + if !busy && stop_time.is_some_and(|st| Instant::now() >= st) { + return true; + } + } else if stop_time.is_some_and(|st| Instant::now() >= st) { + // Ran for the requested duration. + return true; + } + + let wait = stop_time + .map(|st| st.saturating_duration_since(Instant::now())) + .unwrap_or(poll) + .min(poll); + + tokio::select! { + _ = halt_notify.notified() => {} + _ = tokio::time::sleep(wait) => {} + } + } +} + +/// Wait until no callbacks are in flight across any worker. Used by `spin()` to +/// uphold quiescence before returning. +async fn await_quiescence(active: Arc) { + while active.load(Ordering::Acquire) > 0 { + tokio::time::sleep(Duration::from_micros(100)).await; + } +} + +/// Where the executor's tasks run: a runtime it owns, or one adopted from the +/// caller (e.g. the `#[tokio::main]` runtime). +enum RuntimeHost { + /// A runtime this executor created. Used to `block_on` in the blocking + /// `spin()`, and dropped (shutting down its worker threads) with the executor. + Owned(tokio::runtime::Runtime), + /// A runtime adopted from the caller. We only ever spawn on it; we never + /// `block_on` it or shut it down. + Adopted(tokio::runtime::Handle), +} + +impl RuntimeHost { + /// A handle for spawning, regardless of which kind of host this is. + fn handle(&self) -> tokio::runtime::Handle { + match self { + RuntimeHost::Owned(runtime) => runtime.handle().clone(), + RuntimeHost::Adopted(handle) => handle.clone(), + } + } +} + +/// A multi-threaded async executor backed by a Tokio runtime, driven by rcl push +/// callbacks, with one task per worker (see the module docs). +pub struct TokioExecutorRuntime { + host: RuntimeHost, + shared: Arc, +} + +impl TokioExecutorRuntime { + /// Create an executor that owns a fresh multi-threaded Tokio runtime. + /// + /// Users should call [`CreateTokioExecutor::create_tokio_executor`] instead. + pub(crate) fn new() -> Self { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime for rclrs executor"); + Self::with_runtime(runtime) + } + + /// Create an executor that owns a caller-provided Tokio runtime. + /// + /// Users should call + /// [`CreateTokioExecutor::create_tokio_executor_with_runtime`] instead. + pub(crate) fn with_runtime(runtime: tokio::runtime::Runtime) -> Self { + Self::from_host(RuntimeHost::Owned(runtime)) + } + + /// Create an executor that runs on a runtime adopted from the caller, rather + /// than owning one. + /// + /// Users should call + /// [`CreateTokioExecutor::create_tokio_executor_on_current_runtime`] or + /// [`CreateTokioExecutor::create_tokio_executor_with_handle`] instead. + pub(crate) fn with_handle(handle: tokio::runtime::Handle) -> Self { + Self::from_host(RuntimeHost::Adopted(handle)) + } + + fn from_host(host: RuntimeHost) -> Self { + let (spin, _) = watch::channel(false); + Self { + host, + shared: Arc::new(ExecutorShared { + spin, + halt: Arc::new(Notify::new()), + active: Arc::new(AtomicUsize::new(0)), + outstanding: Arc::new(AtomicUsize::new(0)), + errors: Arc::new(Mutex::new(Vec::new())), + next_entity_id: Arc::new(AtomicU64::new(0)), + }), + } + } + + fn take_errors(&self) -> Vec { + std::mem::take(&mut *self.shared.errors.lock().unwrap()) + } + + /// If the spin is bounded by an until-promise, spawn a task that flips the + /// halt flag and wakes the spin once the promise resolves, and return its + /// handle. The caller aborts that handle when the spin ends, so a spin that + /// stops for another reason (timeout, shutdown) does not leave the task + /// parked on `promise.await` forever, accumulating one detached task per such + /// spin. + fn arm_until_promise( + &self, + conditions: &mut SpinConditions, + ) -> Option> { + let promise = conditions.options.until_promise_resolved.take()?; + let halt_flag = Arc::clone(&conditions.halt_spinning); + let halt_notify = Arc::clone(&self.shared.halt); + Some(self.host.handle().spawn(async move { + let _ = promise.await; + halt_flag.store(true, Ordering::Release); + halt_notify.notify_waiters(); + })) + } + + /// The async core of spinning: open the gate so worker tasks run, wait until + /// the spin should stop, close the gate, then wait for in-flight callbacks to + /// finish (quiescence) before returning. Shared by the blocking `spin()` and + /// the awaitable `spin_async()`. It never calls `block_on`, so it can run on + /// any runtime, including the caller's in the adopted case. + async fn run_spin(&self, mut conditions: SpinConditions) -> Vec { + let promise_task = self.arm_until_promise(&mut conditions); + + let stop_time = conditions.options.timeout.map(|t| Instant::now() + t); + let only_once = conditions.options.only_next_available_work; + + // Open the gate so the worker tasks process, then wait until the spin + // should stop. + let _ = self.shared.spin.send(true); + let timed_out = block_until_stop( + Arc::clone(&conditions.halt_spinning), + conditions.context.clone(), + Arc::clone(&self.shared.active), + Arc::clone(&self.shared.outstanding), + Arc::clone(&self.shared.halt), + stop_time, + only_once, + ) + .await; + + // Close the gate: worker tasks park after finishing any in-flight message. + let _ = self.shared.spin.send(false); + + // Cancel the until-promise watcher so it doesn't outlive this spin. + if let Some(task) = promise_task { + task.abort(); + } + + // Wait for any in-flight callbacks to finish, so no ROS callback is + // running once the spin returns (quiescence). + await_quiescence(Arc::clone(&self.shared.active)).await; + + // Match the basic executor's contract: a timeout is reported as a + // `Timeout` error rather than a silent return. + let mut errors = self.take_errors(); + if timed_out { + errors.push(RclrsError::RclError { + code: RclReturnCode::Timeout, + msg: None, + }); + } + errors + } +} + +impl ExecutorRuntime for TokioExecutorRuntime { + fn channel(&self) -> Arc { + Arc::new(TokioExecutorChannel { + handle: self.host.handle(), + shared: Arc::clone(&self.shared), + }) + } + + fn spin(&mut self, conditions: SpinConditions) -> Vec { + // `spin()` blocks the calling thread by driving the spin to completion. It + // must not be called from within a Tokio runtime: it would block a runtime + // worker thread (an immediate deadlock on a current-thread runtime), and + // `block_on` from inside a runtime panics anyway. Give a clear message + // instead, and steer callers in an async context to `spin_async().await`. + assert!( + tokio::runtime::Handle::try_current().is_err(), + "Executor::spin() blocks the calling thread and must not be called from \ + within a Tokio runtime. Call Executor::spin_async().await instead.", + ); + + match &self.host { + RuntimeHost::Owned(runtime) => runtime.block_on(self.run_spin(conditions)), + // `Handle::block_on` is valid here because the assert above established + // we are not inside a runtime. `clone()` avoids borrowing `self.host` + // across the `self.run_spin(..)` borrow. + RuntimeHost::Adopted(handle) => handle.clone().block_on(self.run_spin(conditions)), + } + } + + fn spin_async( + self: Box, + conditions: SpinConditions, + ) -> BoxFuture<'static, (Box, Vec)> { + // Drive the spin on whatever runtime awaits this future (the caller's, in + // the adopted case). The worker tasks run on the host runtime's threads + // regardless, so no `block_on` and no helper thread are needed. + Box::pin(async move { + let errors = self.run_spin(conditions).await; + (self as Box, errors) + }) + } +} + +struct TokioExecutorChannel { + handle: tokio::runtime::Handle, + shared: Arc, +} + +impl ExecutorChannel for TokioExecutorChannel { + fn create_worker(&self, options: ExecutorWorkerOptions) -> Arc { + let (mailbox_tx, mailbox_rx) = tokio::sync::mpsc::unbounded_channel(); + let entities = Arc::new(Mutex::new(HashMap::new())); + let listeners = Arc::new(Mutex::new(Vec::new())); + + // One task per worker. Tokio schedules it. Different workers therefore run + // concurrently, while this worker's callbacks stay serialized and ordered. + // The reap `Interval` is created inside the spawned task (i.e. within the + // Tokio runtime), which is where a Tokio timer must be constructed. + let worker_entities = Arc::clone(&entities); + let worker_listeners = Arc::clone(&listeners); + let spinning = self.shared.spin.subscribe(); + let error_sink = Arc::clone(&self.shared.errors); + let active = Arc::clone(&self.shared.active); + let outstanding = Arc::clone(&self.shared.outstanding); + let payload = options.payload; + self.handle.spawn(async move { + WorkerLoop { + mailbox: mailbox_rx, + entities: worker_entities, + payload, + listeners: worker_listeners, + spinning, + error_sink, + active, + outstanding, + reap: tokio::time::interval(Duration::from_secs(1)), + } + .run() + .await + }); + + Arc::new(TokioWorkerChannel { + handle: self.handle.clone(), + mailbox: mailbox_tx, + entities, + listeners, + errors: Arc::clone(&self.shared.errors), + next_entity_id: Arc::clone(&self.shared.next_entity_id), + outstanding: Arc::clone(&self.shared.outstanding), + }) + } + + fn wake_all_wait_sets(&self) { + // Wake any in-progress spin so it re-checks halt_spinning promptly. + self.shared.halt.notify_waiters(); + } +} + +struct TokioWorkerChannel { + handle: tokio::runtime::Handle, + mailbox: UnboundedSender, + entities: Arc>>>, + listeners: Arc>>, + errors: Arc>>, + next_entity_id: Arc, + outstanding: Arc, +} + +impl TokioWorkerChannel { + /// Build the push-callback closure for entity `id`. It combines repeated + /// middleware notifications so the mailbox holds at most one pending `Ready` + /// per entity even while spinning is paused: every notification adds its event + /// count to `pending`, but only the one that flips `scheduled` queues a + /// `Ready` (and counts it as outstanding work). The worker takes `pending` + /// items and clears `scheduled` when it handles the entity. + fn make_on_ready( + &self, + id: EntityId, + scheduled: Arc, + pending: Arc, + ) -> Box { + let mailbox = self.mailbox.clone(); + let outstanding = Arc::clone(&self.outstanding); + + Box::new(move |_kind, count| { + pending.fetch_add(count.max(1), Ordering::AcqRel); + if !scheduled.swap(true, Ordering::AcqRel) { + outstanding.fetch_add(1, Ordering::AcqRel); + let _ = mailbox.send(WorkerMsg::Ready { entity: id }); + } + }) + } +} + +impl WorkerChannel for TokioWorkerChannel { + fn add_async_task(&self, f: BoxFuture<'static, ()>) { + self.handle.spawn(f); + } + + /// Register a worker entity with the executor: insert it into the registry, + /// then install its push callback (for message-driven primitives) or spawn a + /// timer driver (for timers), so the entity's readiness reaches this worker's + /// mailbox. + /// + /// Guard conditions have no rcl push-callback API, so `register_on_ready` + /// returns `None` and they sit inert here. That is correct for the per-worker + /// wakeup guard conditions: there is no `rcl_wait` to interrupt, since new + /// entities register their callback immediately, payload tasks go straight to + /// the mailbox, and removals are reaped. The one guard condition that does + /// carry a callback is the node graph guard condition (see `node_options.rs`); + /// rmw exposes no "on trigger" callback for guard conditions, so on this + /// executor graph changes are not event-driven. This is not a correctness + /// regression: graph-change listeners (`Node::notify_on_graph_change`) + /// re-check their condition on a period regardless of notifications, so they + /// still resolve, just within that period rather than immediately. + /// Lower-latency graph-change handling is left as future work. + fn add_to_wait_set(&self, new_entity: Waitable) { + let id = self.next_entity_id.fetch_add(1, Ordering::Relaxed); + + // State for combining repeated notifications: at most one pending `Ready` + // per entity, with the event count accumulated in `pending`. + let scheduled = Arc::new(AtomicBool::new(false)); + let pending = Arc::new(AtomicUsize::new(0)); + let on_ready = self.make_on_ready(id, Arc::clone(&scheduled), Arc::clone(&pending)); + + // Grab the timer-driver inputs before `new_entity` is moved into the + // registry below. + let timer = new_entity.timer_handle(); + let in_use = new_entity.in_use_handle(); + + // Insert into the registry BEFORE registering the push callback (or + // spawning the timer/expiration drivers), so the entity is always + // resolvable by the time any readiness can enqueue a `Ready` for it. + // + // Registering first would race: an early middleware callback could fire, + // enqueue a `Ready`, and have the worker drop it (entity not found yet), + // leaving `scheduled` stuck set so no further `Ready` is ever sent. The + // `_on_ready` handle is filled in just below, once the callback is live. + let entry = Arc::new(WorkerEntity { + waitable: Mutex::new(new_entity), + scheduled: Arc::clone(&scheduled), + pending: Arc::clone(&pending), + _on_ready: Mutex::new(None), + }); + self.entities.lock().unwrap().insert(id, Arc::clone(&entry)); + + // Now register the push callback against the (already-inserted) entity. + // Holding the waitable lock here is safe: the callback only touches the + // `scheduled`/`pending` atomics and the mailbox, never the waitable. + let registration = match entry.waitable.lock().unwrap().register_on_ready(on_ready) { + Ok(registration) => registration, + Err(err) => { + // Surface the failure both in the log and via spin()'s error + // return, rather than silently leaving an inert entity. + log_error!( + "rclrs.executor.tokio_executor", + "Failed to register an on-ready callback: {err}", + ); + self.errors.lock().unwrap().push(err); + None + } + }; + *entry._on_ready.lock().unwrap() = registration; + + // Timers have no rcl push callback. Drive them from the Tokio clock. The + // driver feeds the same `scheduled`/`pending` pair as push callbacks, so a + // timer fire is one bounded take like any other event. + if let Some(rcl_timer) = timer { + self.handle.spawn( + TimerDriver { + rcl_timer, + in_use: Arc::clone(&in_use), + id, + mailbox: self.mailbox.clone(), + scheduled: Arc::clone(&scheduled), + pending: Arc::clone(&pending), + outstanding: Arc::clone(&self.outstanding), + } + .run(), + ); + } + } + + fn send_payload_task(&self, f: PayloadTask) { + // Counts as outstanding work until a worker handles it (so + // `only_next_available_work` waits for payload tasks too). + self.outstanding.fetch_add(1, Ordering::AcqRel); + let _ = self.mailbox.send(WorkerMsg::Payload(f)); + } + + fn add_activity_listener(&self, listener: WeakActivityListener) { + self.listeners.lock().unwrap().push(listener); + } +} + +/// This trait allows [`Context`] to create a Tokio-based executor. +/// +/// There are two families of constructors. The `create_tokio_executor*` methods +/// give the executor its **own** runtime. The `*_on_current_runtime` / +/// `*_with_handle` methods **adopt** a runtime the caller already has (for +/// example the one set up by `#[tokio::main]`), so ROS callbacks and the caller's +/// other Tokio work share a single runtime and thread pool. +/// +/// Inside `#[tokio::main]`, prefer the adopting constructors. An owned-runtime +/// executor is meant for a non-async context: its blocking `spin` cannot run +/// inside a runtime, and dropping the owned runtime from within a runtime panics +/// (a Tokio rule). The adopting constructors avoid both, since they own no +/// runtime. +/// +/// When adopting a runtime, note that: +/// +/// - The blocking [`Executor::spin`] must not be called from within a runtime; +/// use [`Executor::spin_async`]`.await` from async code. (This is true even +/// for an owned runtime: `spin` blocks the calling thread.) +/// - The adopted runtime must have Tokio's time driver enabled (`enable_time` or +/// `enable_all`; `#[tokio::main]` does by default), since timers and the +/// per-worker reap interval rely on it. +/// - A current-thread runtime is fine for async or short non-blocking callbacks, +/// but a blocking or CPU-bound sync callback occupies the single thread, and +/// `tokio::task::block_in_place` (the usual escape hatch) panics on a +/// current-thread runtime. Use a multi-threaded runtime if callbacks can block. +pub trait CreateTokioExecutor { + /// Create an event-driven Tokio-based executor associated with this + /// [`Context`], with its own default multi-threaded Tokio runtime. + fn create_tokio_executor(&self) -> Executor; + + /// Create an event-driven Tokio-based executor with a caller-provided Tokio + /// runtime (e.g. to control worker-thread count or names). The executor owns + /// the runtime. + fn create_tokio_executor_with_runtime(&self, runtime: tokio::runtime::Runtime) -> Executor; + + /// Create an event-driven Tokio-based executor that runs on the **current** + /// Tokio runtime instead of creating its own. Must be called from within a + /// runtime (panics otherwise, like [`tokio::runtime::Handle::current`]). See + /// the trait docs for the time-driver and current-thread caveats. + fn create_tokio_executor_on_current_runtime(&self) -> Executor; + + /// Create an event-driven Tokio-based executor that runs on the runtime + /// behind `handle`, instead of creating its own. Like + /// [`create_tokio_executor_on_current_runtime`][Self::create_tokio_executor_on_current_runtime] + /// but for callers that hold a [`Handle`][tokio::runtime::Handle] without + /// being on a runtime thread at construction time. + fn create_tokio_executor_with_handle(&self, handle: tokio::runtime::Handle) -> Executor; +} + +impl CreateTokioExecutor for Context { + fn create_tokio_executor(&self) -> Executor { + self.create_executor(TokioExecutorRuntime::new()) + } + + fn create_tokio_executor_with_runtime(&self, runtime: tokio::runtime::Runtime) -> Executor { + self.create_executor(TokioExecutorRuntime::with_runtime(runtime)) + } + + fn create_tokio_executor_on_current_runtime(&self) -> Executor { + self.create_executor(TokioExecutorRuntime::with_handle( + tokio::runtime::Handle::current(), + )) + } + + fn create_tokio_executor_with_handle(&self, handle: tokio::runtime::Handle) -> Executor { + self.create_executor(TokioExecutorRuntime::with_handle(handle)) + } +} + +#[cfg(test)] +mod tests { + use crate::*; + use ros_env::test_msgs; + use ros_env::test_msgs::msg; + use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + use std::sync::Arc; + use std::time::{Duration, Instant}; + + /// The executor can adopt the current Tokio runtime (the `#[tokio::main]` + /// pattern) instead of owning one, and deliver messages while driven by + /// `spin_async().await` from within that runtime. + #[tokio::test(flavor = "multi_thread")] + async fn tokio_adopts_current_runtime() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor_on_current_runtime(); + let node = executor.create_node( + format!("test_tokio_adopt_mt_{}", line!()).start_parameter_services(false), + )?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let received = Arc::new(AtomicUsize::new(0)); + let received_cb = Arc::clone(&received); + let _sub = node.create_subscription::( + "tokio_adopt_mt_topic".qos(qos), + move |_m: msg::Empty| { + received_cb.fetch_add(1, Ordering::Relaxed); + }, + )?; + let publisher = node.create_publisher::("tokio_adopt_mt_topic".qos(qos))?; + + // Drive the executor with `spin_async` on the current runtime (no separate + // runtime, no helper thread), republishing to ride out discovery until a + // message arrives. + let deadline = Instant::now() + Duration::from_secs(10); + while received.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + let (exec, _) = executor + .spin_async(SpinOptions::spin_once().timeout(Duration::from_millis(200))) + .await; + executor = exec; + } + + assert!( + received.load(Ordering::Relaxed) > 0, + "no message delivered while running on the adopted runtime", + ); + Ok(()) + } + + /// Adopting a current-thread runtime works for a short non-blocking callback. + #[tokio::test(flavor = "current_thread")] + async fn tokio_adopts_current_thread_runtime() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor_on_current_runtime(); + let node = executor.create_node( + format!("test_tokio_adopt_ct_{}", line!()).start_parameter_services(false), + )?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let received = Arc::new(AtomicUsize::new(0)); + let received_cb = Arc::clone(&received); + let _sub = node.create_subscription::( + "tokio_adopt_ct_topic".qos(qos), + move |_m: msg::Empty| { + received_cb.fetch_add(1, Ordering::Relaxed); + }, + )?; + let publisher = node.create_publisher::("tokio_adopt_ct_topic".qos(qos))?; + + let deadline = Instant::now() + Duration::from_secs(10); + while received.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + let (exec, _) = executor + .spin_async(SpinOptions::spin_once().timeout(Duration::from_millis(200))) + .await; + executor = exec; + } + + assert!( + received.load(Ordering::Relaxed) > 0, + "no message delivered on an adopted current-thread runtime", + ); + Ok(()) + } + + /// Blocking `spin()` must not be called from within a Tokio runtime; it panics + /// with guidance to use `spin_async` instead. + /// + /// Uses the adopted constructor so there is no owned runtime to drop inside + /// this async test (dropping an owned Tokio runtime from within a runtime is + /// itself a panic, which is exactly why owned mode is not for `#[tokio::main]`). + #[tokio::test(flavor = "multi_thread")] + async fn tokio_blocking_spin_within_runtime_panics() { + let mut executor = Context::default().create_tokio_executor_on_current_runtime(); + let _node = executor + .create_node( + format!("test_tokio_spin_panic_{}", line!()).start_parameter_services(false), + ) + .unwrap(); + + // A scary panic message on stderr here is expected; the test asserts the + // panic happened. + let panicked = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + executor.spin(SpinOptions::default().timeout(Duration::from_millis(10))); + })) + .is_err(); + assert!( + panicked, + "blocking spin() inside a Tokio runtime should panic" + ); + } + + /// A spin with a timeout and no work reports a `Timeout` error, matching the + /// basic executor's contract (rather than returning silently). + #[test] + fn tokio_spin_timeout_reports_error() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let _node = executor.create_node( + format!("test_tokio_timeout_{}", line!()).start_parameter_services(false), + )?; + + let errors = executor.spin(SpinOptions::default().timeout(Duration::from_millis(20))); + assert!( + errors.iter().any(|e| matches!( + e, + RclrsError::RclError { + code: RclReturnCode::Timeout, + .. + } + )), + "expected a Timeout error from a timed-out spin, got {errors:?}", + ); + Ok(()) + } + + /// `only_next_available_work` (spin_once) drains the currently-available work + /// and returns promptly — it must not be ignored (loop forever) on the Tokio + /// path. We publish then spin_once until the message is delivered. + #[test] + fn tokio_spin_once_processes_available_work() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let node = executor.create_node( + format!("test_tokio_spin_once_{}", line!()).start_parameter_services(false), + )?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let received = Arc::new(AtomicUsize::new(0)); + let received_cb = Arc::clone(&received); + let _sub = node.create_subscription::( + "tokio_spin_once_topic".qos(qos), + move |_m: msg::Empty| { + received_cb.fetch_add(1, Ordering::Relaxed); + }, + )?; + let publisher = node.create_publisher::("tokio_spin_once_topic".qos(qos))?; + + // Each spin_once waits up to its timeout for work, drains it, and returns; + // republish to ride out discovery. A wedged/ignored spin_once would never + // deliver the message and this would time out at the outer deadline. + let deadline = Instant::now() + Duration::from_secs(10); + while received.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + let _ = executor.spin(SpinOptions::spin_once().timeout(Duration::from_millis(200))); + } + + assert!( + received.load(Ordering::Relaxed) > 0, + "spin_once never delivered the message (only_next_available_work ignored?)", + ); + Ok(()) + } + + /// Regression for strict quiescence: once `spin()` returns, no callback may + /// still be running. The callback signals the moment it starts (resolving the + /// until-promise, so spinning is asked to stop *while it runs*) and then + /// blocks for 400ms. `spin()` must not return until it has finished — proven + /// by `completed` being set and by the elapsed time exceeding the callback's + /// duration. Using the start-signal (rather than a fixed sleep) makes the + /// test robust to discovery/delivery latency. + #[test] + fn tokio_spin_waits_for_in_flight_callback() -> Result<(), RclrsError> { + use futures::channel::oneshot; + use std::sync::Mutex; + + let mut executor = Context::default().create_tokio_executor(); + let node = executor.create_node( + format!("test_tokio_quiescence_{}", line!()).start_parameter_services(false), + )?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let count = Arc::new(AtomicUsize::new(0)); + let completed = Arc::new(AtomicBool::new(false)); + // The long blocking body runs only once "armed", so discovery stays fast. + let armed = Arc::new(AtomicBool::new(false)); + // Sender the callback uses to announce that it has started running. + let start_tx = Arc::new(Mutex::new(None::>)); + + let (count_cb, completed_cb, armed_cb, tx_cb) = ( + Arc::clone(&count), + Arc::clone(&completed), + Arc::clone(&armed), + Arc::clone(&start_tx), + ); + let _sub = node.create_subscription::( + "tokio_quiescence_topic".qos(qos), + move |_m: msg::Empty| { + count_cb.fetch_add(1, Ordering::Relaxed); + if armed_cb.swap(false, Ordering::AcqRel) { + if let Some(tx) = tx_cb.lock().unwrap().take() { + let _ = tx.send(()); + } + std::thread::sleep(Duration::from_millis(400)); + completed_cb.store(true, Ordering::Release); + } + }, + )?; + let publisher = node.create_publisher::("tokio_quiescence_topic".qos(qos))?; + + // Discovery: spin_once (fast callback) until a message lands. + let deadline = Instant::now() + Duration::from_secs(10); + while count.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + let _ = executor.spin(SpinOptions::spin_once().timeout(Duration::from_millis(200))); + } + assert!( + count.load(Ordering::Relaxed) > 0, + "discovery never delivered" + ); + + // Arm, then spin until the callback *starts* (the promise resolves from + // inside it). spin() waits for the message and the callback to begin, so + // there is no fixed-window race; the 10s timeout is just a safety net. + let (tx, rx) = oneshot::channel(); + *start_tx.lock().unwrap() = Some(tx); + armed.store(true, Ordering::Release); + let halt_on_start = executor.commands().run(async move { + let _ = rx.await; + }); + + publisher.publish(msg::Empty::default())?; + let start = Instant::now(); + executor.spin( + SpinOptions::default() + .until_promise_resolved(halt_on_start) + .timeout(Duration::from_secs(10)), + ); + let elapsed = start.elapsed(); + + assert!( + completed.load(Ordering::Acquire), + "spin() returned while a callback was still running (quiescence violated)", + ); + assert!( + elapsed >= Duration::from_millis(350), + "spin() returned after {elapsed:?}, before the in-flight callback finished", + ); + Ok(()) + } + + /// Regression for notification combining / no message loss: a burst of messages + /// published while the executor is NOT spinning must all be delivered once it + /// resumes (the per-entity `pending` accumulator preserves the count), and the + /// entity must not wedge. + #[test] + fn tokio_burst_while_paused_is_delivered() -> Result<(), RclrsError> { + const BURST: usize = 20; + + let mut executor = Context::default().create_tokio_executor(); + let node = executor + .create_node(format!("test_tokio_burst_{}", line!()).start_parameter_services(false))?; + let qos = QoSProfile::default().reliable().keep_last(100); + + let received = Arc::new(AtomicUsize::new(0)); + let received_cb = Arc::clone(&received); + let _sub = node.create_subscription::( + "tokio_burst_topic".qos(qos), + move |_m: msg::Empty| { + received_cb.fetch_add(1, Ordering::Relaxed); + }, + )?; + let publisher = node.create_publisher::("tokio_burst_topic".qos(qos))?; + + // Discovery: get one message through so pub/sub are matched. + let deadline = Instant::now() + Duration::from_secs(10); + while received.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + let _ = executor.spin(SpinOptions::spin_once().timeout(Duration::from_millis(200))); + } + let baseline = received.load(Ordering::Relaxed); + + // Burst while NOT spinning: these fire push callbacks that combine into a + // single queued `Ready` whose accumulated count is BURST. + for _ in 0..BURST { + publisher.publish(msg::Empty::default())?; + } + std::thread::sleep(Duration::from_millis(300)); + + // Resume: a wedged entity or lost count would leave us short. + let target = baseline + BURST; + let deadline = Instant::now() + Duration::from_secs(10); + while received.load(Ordering::Relaxed) < target && Instant::now() < deadline { + let _ = executor.spin(SpinOptions::spin_once().timeout(Duration::from_millis(200))); + } + + assert!( + received.load(Ordering::Relaxed) >= target, + "only {} of {} messages delivered after a paused burst (combining lost work or wedged)", + received.load(Ordering::Relaxed), + target, + ); + Ok(()) + } + + /// A panicking callback must not wedge the executor: spin() must still return + /// (quiescence counters not leaked) and the worker must survive to run other + /// callbacks. Without panic containment the first spin would hang forever on + /// quiescence and this test would time out. + #[test] + fn tokio_panicking_callback_does_not_wedge() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let node = executor + .create_node(format!("test_tokio_panic_{}", line!()).start_parameter_services(false))?; + let qos = QoSProfile::default().reliable().keep_last(10); + + // A subscription whose callback always panics. + let _panic_sub = node.create_subscription::( + "tokio_panic_topic".qos(qos), + |_m: msg::Empty| panic!("intentional test panic in a callback"), + )?; + let panic_pub = node.create_publisher::("tokio_panic_topic".qos(qos))?; + + // A healthy subscription on the same worker — it must still run. + let healthy = Arc::new(AtomicUsize::new(0)); + let healthy_cb = Arc::clone(&healthy); + let _healthy_sub = node.create_subscription::( + "tokio_healthy_topic".qos(qos), + move |_m: msg::Empty| { + healthy_cb.fetch_add(1, Ordering::Relaxed); + }, + )?; + let healthy_pub = node.create_publisher::("tokio_healthy_topic".qos(qos))?; + + let deadline = Instant::now() + Duration::from_secs(10); + while healthy.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + // Each spin processes the panicking callback (contained) and the + // healthy one. If quiescence leaked, spin() here would never return. + let _ = panic_pub.publish(msg::Empty::default()); + let _ = healthy_pub.publish(msg::Empty::default()); + let _ = executor.spin(SpinOptions::spin_once().timeout(Duration::from_millis(200))); + } + + assert!( + healthy.load(Ordering::Relaxed) > 0, + "a panicking callback wedged the worker or spin() quiescence", + ); + Ok(()) + } + + /// End-to-end: a node-scoped subscription receives messages via the + /// event-driven path (push callback -> mailbox -> worker task -> callback). + #[test] + fn tokio_events_pubsub() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let node = executor.create_node( + format!("test_tokio_events_pubsub_{}", line!()).start_parameter_services(false), + )?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let publisher = node.create_publisher::("tokio_events_topic".qos(qos))?; + let received = Arc::new(AtomicUsize::new(0)); + let received_cb = Arc::clone(&received); + let _sub = node.create_subscription::( + "tokio_events_topic".qos(qos), + move |_: msg::Empty| { + received_cb.fetch_add(1, Ordering::Relaxed); + }, + )?; + + let deadline = Instant::now() + Duration::from_secs(10); + while received.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + executor.spin(SpinOptions::new().timeout(Duration::from_millis(50))); + std::thread::sleep(Duration::from_millis(20)); + } + + assert!( + received.load(Ordering::Relaxed) > 0, + "subscription callback never ran via the event-driven path" + ); + Ok(()) + } + + /// Callbacks must NOT run before spinning (deferred-execution guarantee that + /// the spin gate provides). + #[test] + fn tokio_events_no_callbacks_before_spin() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let node = executor.create_node( + format!("test_tokio_no_early_{}", line!()).start_parameter_services(false), + )?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let publisher = node.create_publisher::("tokio_no_early_topic".qos(qos))?; + let received = Arc::new(AtomicUsize::new(0)); + let received_cb = Arc::clone(&received); + let _sub = node.create_subscription::( + "tokio_no_early_topic".qos(qos), + move |_: msg::Empty| { + received_cb.fetch_add(1, Ordering::Relaxed); + }, + )?; + + // Publish and wait WITHOUT spinning; the callback must not run. + for _ in 0..5 { + publisher.publish(msg::Empty::default())?; + } + std::thread::sleep(Duration::from_millis(300)); + assert_eq!( + received.load(Ordering::Relaxed), + 0, + "callback ran before the executor was spun" + ); + + // Now spin and confirm the buffered messages are delivered. + let deadline = Instant::now() + Duration::from_secs(10); + while received.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + executor.spin(SpinOptions::new().timeout(Duration::from_millis(50))); + std::thread::sleep(Duration::from_millis(20)); + } + assert!( + received.load(Ordering::Relaxed) > 0, + "callback never ran while spinning" + ); + Ok(()) + } + + /// A dropped subscription must stop firing callbacks (its entity is pruned). + #[test] + fn tokio_events_dropped_subscription_stops() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let node = executor + .create_node(format!("test_tokio_drop_{}", line!()).start_parameter_services(false))?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let publisher = node.create_publisher::("tokio_drop_topic".qos(qos))?; + let count = Arc::new(AtomicUsize::new(0)); + let count_cb = Arc::clone(&count); + let sub = node.create_subscription::( + "tokio_drop_topic".qos(qos), + move |_: msg::Empty| { + count_cb.fetch_add(1, Ordering::Relaxed); + }, + )?; + + // Confirm the subscription is delivering. + let deadline = Instant::now() + Duration::from_secs(10); + while count.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + executor.spin(SpinOptions::new().timeout(Duration::from_millis(50))); + std::thread::sleep(Duration::from_millis(20)); + } + assert!( + count.load(Ordering::Relaxed) > 0, + "subscription never delivered" + ); + + // Drop it, then keep publishing + spinning: the callback must not fire again. + drop(sub); + let after_drop = count.load(Ordering::Relaxed); + for _ in 0..10 { + publisher.publish(msg::Empty::default())?; + executor.spin(SpinOptions::new().timeout(Duration::from_millis(50))); + } + assert_eq!( + count.load(Ordering::Relaxed), + after_drop, + "callback fired after the subscription was dropped" + ); + Ok(()) + } + + /// End-to-end service round-trip driven entirely by the event-driven executor. + #[test] + fn tokio_events_service_roundtrip() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let node = executor.create_node( + format!("test_tokio_events_service_{}", line!()).start_parameter_services(false), + )?; + + let _service = node.create_service::( + "tokio_events_service", + |_request: test_msgs::srv::Empty_Request| test_msgs::srv::Empty_Response::default(), + )?; + let client = node.create_client::("tokio_events_service")?; + + let deadline = Instant::now() + Duration::from_secs(10); + while !client.service_is_ready()? { + assert!(Instant::now() < deadline, "service never became ready"); + std::thread::sleep(Duration::from_millis(20)); + } + + let response: Promise = + client.call(test_msgs::srv::Empty_Request::default())?; + let (mut response, notice) = executor.commands().create_notice(response); + executor.spin( + SpinOptions::new() + .until_promise_resolved(notice) + .timeout(Duration::from_secs(5)), + ); + + assert!( + response.try_recv().ok().flatten().is_some(), + "client never received the service response via the event-driven path" + ); + Ok(()) + } + + /// Timers fire on the event-driven executor. + #[test] + fn tokio_events_timer_fires() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let node = executor.create_node( + format!("test_tokio_events_timer_{}", line!()).start_parameter_services(false), + )?; + + let count = Arc::new(AtomicUsize::new(0)); + let count_cb = Arc::clone(&count); + let _timer = node.create_timer_repeating(Duration::from_millis(10), move || { + count_cb.fetch_add(1, Ordering::Relaxed); + })?; + + executor.spin(SpinOptions::new().timeout(Duration::from_millis(300))); + + let fired = count.load(Ordering::Relaxed); + assert!( + fired >= 3, + "timer fired only {fired} times in ~300ms (expected several)" + ); + Ok(()) + } + + /// Worker-scoped subscription + `listen_until` activity listener on the + /// event-driven executor. + #[test] + fn tokio_events_worker() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let node = executor.create_node( + format!("test_tokio_worker_{}", line!()).start_parameter_services(false), + )?; + + let worker = node.create_worker::(0); + let _sub = worker.create_subscription( + "tokio_worker_topic", + |payload: &mut usize, _msg: msg::Empty| { + *payload += 1; + }, + )?; + let promise = worker.listen_until(|payload: &mut usize| (*payload > 0).then_some(*payload)); + + let publisher = node.create_publisher::("tokio_worker_topic")?; + let stop = Arc::new(AtomicBool::new(false)); + let stop_pub = Arc::clone(&stop); + let pub_thread = std::thread::spawn(move || { + while !stop_pub.load(Ordering::Acquire) { + let _ = publisher.publish(msg::Empty::default()); + std::thread::sleep(Duration::from_millis(10)); + } + }); + + let (mut promise, notice) = executor.commands().create_notice(promise); + executor.spin( + SpinOptions::new() + .until_promise_resolved(notice) + .timeout(Duration::from_secs(5)), + ); + stop.store(true, Ordering::Release); + pub_thread.join().unwrap(); + + assert!( + promise.try_recv().ok().flatten().is_some(), + "worker subscription / activity listener never fired on the event-driven executor" + ); + Ok(()) + } + + /// A node with parameter services enabled drives cleanly on the executor. + #[test] + fn tokio_events_node_with_parameter_services() -> Result<(), RclrsError> { + let mut executor = Context::default().create_tokio_executor(); + let _node = executor.create_node(&format!("test_tokio_paramsvc_{}", line!()))?; + let errors = executor.spin(SpinOptions::new().timeout(Duration::from_millis(200))); + // A bare-timeout spin reports a `Timeout` error (matching the basic + // executor); assert nothing *other* than that was produced. + assert!( + errors.iter().all(|e| matches!( + e, + RclrsError::RclError { + code: RclReturnCode::Timeout, + .. + } + )), + "spinning a node with parameter services produced unexpected errors: {errors:?}" + ); + Ok(()) + } + + /// Async tasks run on the Tokio runtime (would panic on the basic executor). + #[test] + fn tokio_async_task_runs() { + let mut executor = Context::default().create_tokio_executor(); + let _node = executor + .create_node(&format!("test_tokio_async_task_{}", line!())) + .unwrap(); + + let done = Arc::new(AtomicBool::new(false)); + let done_clone = Arc::clone(&done); + + let promise = executor.commands().run(async move { + tokio::time::sleep(Duration::from_millis(1)).await; + done_clone.store(true, Ordering::Release); + }); + + let (_, notice) = executor.commands().create_notice(promise); + executor + .spin( + SpinOptions::new() + .until_promise_resolved(notice) + .timeout(Duration::from_secs(5)), + ) + .first_error() + .unwrap(); + + assert!(done.load(Ordering::Acquire)); + } +} diff --git a/rclrs/src/timer.rs b/rclrs/src/timer.rs index 9d64e99c2..7926ffbe0 100644 --- a/rclrs/src/timer.rs +++ b/rclrs/src/timer.rs @@ -509,6 +509,10 @@ impl RclPrimitive for TimerExecutable { fn handle(&self) -> RclPrimitiveHandle<'_> { RclPrimitiveHandle::Timer(self.handle.rcl_timer.lock().unwrap()) } + + fn timer_handle(&self) -> Option>> { + Some(Arc::clone(&self.handle.rcl_timer)) + } } impl PartialEq for TimerState { diff --git a/rclrs/src/wait_set/rcl_primitive.rs b/rclrs/src/wait_set/rcl_primitive.rs index 27f2af4e7..9c298c14b 100644 --- a/rclrs/src/wait_set/rcl_primitive.rs +++ b/rclrs/src/wait_set/rcl_primitive.rs @@ -1,4 +1,7 @@ -use std::{any::Any, sync::MutexGuard}; +use std::{ + any::Any, + sync::{Arc, Mutex, MutexGuard}, +}; use crate::{log_error, rcl_bindings::*, InnerGuardConditionHandle, RclrsError, ToResult}; @@ -52,6 +55,14 @@ pub trait RclPrimitive: Send + Sync { let _ = on_ready; Ok(None) } + + /// For timer primitives, returns a clone of the underlying rcl timer handle + /// so an event-driven executor can drive it from its own clock (timers are + /// not message-driven and have no rcl push-callback API). Returns `None` for + /// every other primitive kind. + fn timer_handle(&self) -> Option>> { + None + } } /// RAII handle that keeps a push "on ready" callback registered with the diff --git a/rclrs/src/wait_set/wait_set_runner.rs b/rclrs/src/wait_set/wait_set_runner.rs index 62d39c9fb..a422fd94e 100644 --- a/rclrs/src/wait_set/wait_set_runner.rs +++ b/rclrs/src/wait_set/wait_set_runner.rs @@ -13,8 +13,8 @@ use std::{ }; use crate::{ - log_debug, log_fatal, ActivityListenerCallback, Context, ExecutorWorkerOptions, GuardCondition, - PayloadTask, Promise, RclReturnCode, RclrsError, WaitSet, Waitable, WeakActivityListener, + log_debug, log_fatal, Context, ExecutorWorkerOptions, GuardCondition, PayloadTask, Promise, + RclReturnCode, RclrsError, WaitSet, Waitable, WeakActivityListener, }; /// This is a utility class that executors can use to easily run and manage @@ -135,7 +135,6 @@ impl WaitSetRunner { /// [1]: crate::SpinOptions::until_promise_resolved pub fn run_blocking(&mut self, conditions: WaitSetRunConditions) -> Result<(), RclrsError> { let mut first_spin = true; - let mut listeners = Vec::new(); loop { // TODO(@mxgrey): SmallVec would be better suited here if we are // okay with adding that as a dependency. @@ -193,47 +192,7 @@ impl WaitSetRunner { })?; if at_least_one { - // We drain all listeners from activity_listeners to ensure that we - // don't get a deadlock from double-locking the activity_listeners - // mutex while executing one of the listeners. If the listener has - // access to the Worker then it could attempt to add another - // listener while we have the vector locked, which would cause a - // deadlock. - listeners.extend( - self.activity_listeners - .lock() - .unwrap() - .drain(..) - .filter_map(|x| x.upgrade()), - ); - - for arc_listener in &listeners { - // We pull the callback out of its mutex entirely and release - // the lock on the mutex before executing the callback. Otherwise - // if the callback triggers its own WorkerActivity to change the - // callback then we would get a deadlock from double-locking the - // mutex. - let listener = { arc_listener.lock().unwrap().take() }; - if let Some(mut listener) = listener { - match &mut listener { - ActivityListenerCallback::Listen(listen) => { - listen(&mut *self.payload); - } - ActivityListenerCallback::Inert => { - // Do nothing - } - } - - // We replace instead of assigning in case the callback - // inserted its own - arc_listener.lock().unwrap().replace(listener); - } - } - - self.activity_listeners - .lock() - .unwrap() - .extend(listeners.drain(..).map(|x| Arc::downgrade(&x))); + crate::worker::run_activity_listeners(&self.activity_listeners, &mut *self.payload); } if let Some(stop_time) = conditions.stop_time { diff --git a/rclrs/src/wait_set/waitable.rs b/rclrs/src/wait_set/waitable.rs index be771736d..62503e073 100644 --- a/rclrs/src/wait_set/waitable.rs +++ b/rclrs/src/wait_set/waitable.rs @@ -2,6 +2,9 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; +// Only the feature-gated `timer_handle` accessor below uses `Mutex`. +#[cfg(feature = "tokio-executor")] +use std::sync::Mutex; use crate::{ error::ToResult, log_error, rcl_bindings::*, ActionClientReady, ActionServerReady, @@ -43,10 +46,60 @@ impl Waitable { self.index_in_wait_set.is_some() } - pub(super) fn in_use(&self) -> bool { + /// Whether this waitable is still in use (its owning entity has not been + /// dropped). Used by the wait set to drop finished entries, and by an + /// event-driven executor to know when to deregister. + pub(crate) fn in_use(&self) -> bool { self.in_use.load(Ordering::Relaxed) } + /// Register a push "on ready" callback for an event-driven executor. + /// Delegates to the wrapped primitive. See [`RclPrimitive::register_on_ready`]. + #[cfg(feature = "tokio-executor")] + pub(crate) fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + self.primitive.register_on_ready(on_ready) + } + + /// Execute the wrapped primitive once for an event-driven executor with the + /// given readiness, taking a single item (e.g. one message) and running its + /// callback. For most primitives `ready` is [`ReadyKind::Basic`]; for action + /// servers/clients it identifies which sub-entity became ready. + /// + /// # Safety + /// + /// `payload` must have the type the wrapped primitive expects (the type of + /// the [`Worker`][crate::Worker] that owns it). Passing a mismatched payload + /// is undefined behavior, since the primitive downcasts it. An event-driven + /// executor upholds this by only ever executing an entity against the payload + /// of the worker it was registered on. + #[cfg(feature = "tokio-executor")] + pub(crate) unsafe fn execute_with( + &mut self, + ready: ReadyKind, + payload: &mut dyn std::any::Any, + ) -> Result<(), RclrsError> { + // SAFETY: the payload-type obligation is forwarded to our caller via this + // function being `unsafe`. + unsafe { self.primitive.execute(ready, payload) } + } + + /// If this waitable wraps a timer, returns a clone of its rcl timer handle so + /// an event-driven executor can drive it from its own clock. `None` otherwise. + #[cfg(feature = "tokio-executor")] + pub(crate) fn timer_handle(&self) -> Option>> { + self.primitive.timer_handle() + } + + /// A clone of the "in use" flag, so an event-driven executor's timer driver + /// can stop once the owning entity has been dropped. + #[cfg(feature = "tokio-executor")] + pub(crate) fn in_use_handle(&self) -> Arc { + Arc::clone(&self.in_use) + } + pub(super) fn is_ready(&self, wait_set: &rcl_wait_set_t) -> Option { match self.primitive.kind() { RclPrimitiveKind::Subscription => { diff --git a/rclrs/src/worker.rs b/rclrs/src/worker.rs index dd411399a..6f508f5f4 100644 --- a/rclrs/src/worker.rs +++ b/rclrs/src/worker.rs @@ -740,6 +740,53 @@ pub enum ActivityListenerCallback { Inert, } +/// Run every activity listener against the worker `payload`. Executor runtimes +/// call this after a worker primitive has run, so listeners (e.g. those backing +/// [`WorkerState::listen_until`]) observe the possibly-updated payload. +/// +/// The listeners are drained, run, and re-inserted so that a listener which +/// mutates the listener set (by adding another listener while it runs) cannot +/// deadlock on the listener-set mutex; likewise each callback is taken out of +/// its own mutex before running. +pub(crate) fn run_activity_listeners( + activity_listeners: &Mutex>, + payload: &mut dyn Any, +) { + // We drain all listeners from activity_listeners so that we don't get a + // deadlock from double-locking the activity_listeners mutex while executing + // one of the listeners. If the listener has access to the Worker then it + // could attempt to add another listener while we have the vector locked, + // which would cause a deadlock. + let mut listeners: Vec>>> = activity_listeners + .lock() + .unwrap() + .drain(..) + .filter_map(|listener| listener.upgrade()) + .collect(); + + for arc_listener in &listeners { + // We pull the callback out of its mutex entirely and release the lock on + // the mutex before executing the callback. Otherwise, if the callback + // triggers its own WorkerActivity to change the callback, then we would + // get a deadlock from double-locking the mutex. + let listener = { arc_listener.lock().unwrap().take() }; + if let Some(mut listener) = listener { + match &mut listener { + ActivityListenerCallback::Listen(listen) => listen(payload), + ActivityListenerCallback::Inert => {} + } + // Replace instead of assigning, in case the callback inserted its own. + arc_listener.lock().unwrap().replace(listener); + } + } + + activity_listeners.lock().unwrap().extend( + listeners + .drain(..) + .map(|listener| Arc::downgrade(&listener)), + ); +} + /// This is used to determine what kind of payload a callback can accept, as /// well as what kind of callbacks can be used with it. Users should not implement /// this trait.