From 7452273988a41df9a203d62a6ee7f5153567af77 Mon Sep 17 00:00:00 2001 From: Mathieu David Date: Sun, 21 Jun 2026 01:54:23 +0200 Subject: [PATCH] feat: add push-callback registration for rcl primitives Add an opt-in way for primitives to report readiness via rcl's push callbacks (rcl_*_set_on_new_*_callback) instead of being polled in a wait set, as the foundation for an event-driven executor. `RclPrimitive::register_on_ready` installs a callback that the middleware invokes when the entity becomes ready and returns an `OnReadyHandle` (RAII) that deregisters on drop. `OnReadyRegistration` wraps the unsafe rcl setter: it boxes the callback context for a stable address and, on drop, clears the callback before freeing the context (finalizing the rcl entity first) so the middleware can never invoke a freed context during teardown. Implemented for subscriptions, services, and clients. No executor consumes this yet, so the basic executor is unchanged. --- rclrs/src/client.rs | 25 +++ rclrs/src/executor.rs | 2 + rclrs/src/executor/event_callback.rs | 243 +++++++++++++++++++++++++++ rclrs/src/service.rs | 25 +++ rclrs/src/subscription.rs | 35 ++++ rclrs/src/wait_set/rcl_primitive.rs | 30 ++++ 6 files changed, 360 insertions(+) create mode 100644 rclrs/src/executor/event_callback.rs diff --git a/rclrs/src/client.rs b/rclrs/src/client.rs index a229bac13..1535dc7ab 100644 --- a/rclrs/src/client.rs +++ b/rclrs/src/client.rs @@ -468,6 +468,31 @@ where fn kind(&self) -> RclPrimitiveKind { RclPrimitiveKind::Client } + + fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + // A client has a single readiness path; report it as `Basic`. + let on_ready = move |n| on_ready(ReadyKind::Basic, n); + let registration = crate::executor::event_callback::OnReadyRegistration::new( + Arc::clone(&self.handle), + set_client_on_new_response, + Box::new(on_ready), + )?; + Ok(Some(Box::new(registration))) + } +} + +/// Install (or, with a null callback/user_data, clear) the "on new response" +/// push callback used by the event-driven executor. Encapsulates the client +/// lock and the rcl call within this module. +unsafe fn set_client_on_new_response( + handle: &ClientHandle, + callback: rcl_event_callback_t, + user_data: *const std::os::raw::c_void, +) -> rcl_ret_t { + rcl_client_set_on_new_response_callback(&*handle.lock(), callback, user_data) } type SequenceNumber = i64; diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 0d97f985f..10aae3cf1 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -1,6 +1,8 @@ mod basic_executor; pub use self::basic_executor::*; +pub(crate) mod event_callback; + use crate::{ Context, ContextHandle, GuardCondition, IntoNodeOptions, Node, RclrsError, Waitable, WeakActivityListener, diff --git a/rclrs/src/executor/event_callback.rs b/rclrs/src/executor/event_callback.rs new file mode 100644 index 000000000..fce813174 --- /dev/null +++ b/rclrs/src/executor/event_callback.rs @@ -0,0 +1,243 @@ +//! Safe RAII wrapper around rcl's "on new ___" push-callback APIs +//! (`rcl_subscription_set_on_new_message_callback` and the service/client/event +//! equivalents). +//! +//! These let an event-driven executor learn that an entity has become ready +//! *without* polling `rcl_wait`: the middleware invokes a C callback (possibly +//! from its own thread) when data arrives. We forward that to a Rust closure, +//! which an executor uses to enqueue work. +//! +//! [`OnReadyRegistration`] is generic over the entity handle type `H`. Each +//! entity module provides a [`SetOnReadyFn`] that locks its handle and calls the +//! appropriate `rcl_*_set_on_new_*_callback`; the registration owns the boxed +//! callback context and the handle, and deregisters on drop. +//! +//! # Safety model +//! +//! rcl stores the `user_data` pointer we hand it and passes it back to the C +//! callback on every event. That pointer must stay valid for as long as the +//! callback is registered. We therefore: +//! +//! - box the [`EventCallbackCtx`] so it has a stable heap address, and +//! - in `Drop`, **unregister the callback first** (so the middleware can no +//! longer invoke the trampoline) and only then free the context. +//! +//! Getting that ordering wrong is a use-after-free, since the middleware may be +//! calling the trampoline from another thread at the moment of teardown. + +use std::os::raw::c_void; +use std::sync::Arc; + +use crate::{rcl_bindings::*, OnReadyHandle, RclrsError, ToResult}; + +/// The context carried through rcl as `user_data`. Boxed so its address is +/// stable for the lifetime of the registration. +struct EventCallbackCtx { + on_ready: Box, +} + +/// The C trampoline that rcl/rmw invokes when an entity becomes ready. It may be +/// called from a middleware thread, so it does nothing but forward to the Rust +/// closure. It must not run user code or take locks that could deadlock the +/// middleware. +unsafe extern "C" fn on_ready_trampoline(user_data: *const c_void, number_of_events: usize) { + // SAFETY: `user_data` is the pointer to the `EventCallbackCtx` we passed to + // the rcl setter. It stays valid until the owning registration's `Drop` + // clears the callback, which always happens before the box is freed. + let ctx = unsafe { &*(user_data as *const EventCallbackCtx) }; + (ctx.on_ready)(number_of_events); +} + +/// A function that registers (or, with a null callback/user_data, clears) the +/// "on ready" push callback on an entity handle of type `H`. Implemented per +/// entity module so that `H`'s (private) lock and its specific +/// `rcl_*_set_on_new_*_callback` stay encapsulated there. +pub(crate) type SetOnReadyFn = unsafe fn(&H, rcl_event_callback_t, *const c_void) -> rcl_ret_t; + +/// RAII registration of a push "on ready" callback on an rcl entity. +/// +/// While alive, `on_ready(number_of_events)` is invoked by the middleware +/// whenever the entity becomes ready. Dropping it unregisters the callback +/// before releasing the context, so the middleware can never call into freed +/// memory. +pub(crate) struct OnReadyRegistration { + set_callback: SetOnReadyFn, + // Field order is important for teardown safety: `handle` is declared + // (and therefore dropped) before `ctx`. Dropping the last `Arc` + // finalizes the rcl entity (destroying the middleware reader), so by the + // time `ctx` is freed no middleware thread can still invoke the trampoline + // against it. This mirrors rclcpp, which frees its callback storage only + // after `rcl_*_fini`. See `Drop` below. + handle: Arc, + + // Never read directly, held only so its `Drop` frees the callback context. + #[allow(dead_code)] + ctx: CtxBox, +} + +impl OnReadyHandle for OnReadyRegistration {} + +impl OnReadyRegistration { + /// Register `on_ready` to be called by the middleware whenever the entity + /// becomes ready. `set_callback` locks `handle` and installs the trampoline. + pub(crate) fn new( + handle: Arc, + set_callback: SetOnReadyFn, + on_ready: Box, + ) -> Result { + let ctx = Box::into_raw(Box::new(EventCallbackCtx { on_ready })); + + // SAFETY: `ctx` points to a live, heap-stable context that outlives the + // registration (only freed once, when the `CtxBox` field is dropped). + let result = + unsafe { set_callback(&handle, Some(on_ready_trampoline), ctx as *const c_void).ok() }; + + if let Err(err) = result { + // Registration failed, so nothing else references `ctx`. Reclaim it. + // SAFETY: `ctx` came from `Box::into_raw` above and was never + // successfully registered. + unsafe { + drop(Box::from_raw(ctx)); + } + return Err(err); + } + + Ok(Self { + set_callback, + handle, + ctx: CtxBox(ctx), + }) + } +} + +impl Drop for OnReadyRegistration { + fn drop(&mut self) { + // Detach the callback so the middleware stops invoking the trampoline. + // The context is NOT freed here, the `ctx: CtxBox` field is dropped + // *after* `handle`, so the rcl entity is finalized before + // the context is freed, avoiding a use-after-free if a callback is still + // in flight at teardown. + // + // SAFETY: handle is valid and locked by the setter. A null callback + + // null user_data clears the registration. + unsafe { + let _ = (self.set_callback)(&self.handle, None, std::ptr::null()); + } + } +} + +/// Owns the heap-allocated [`EventCallbackCtx`] and frees it on drop. Kept as a +/// separate field of [`OnReadyRegistration`] so its drop runs *after* the +/// `handle` Arc. +struct CtxBox(*mut EventCallbackCtx); + +// SAFETY: the pointer is only dereferenced by the middleware via the trampoline +// (forwarding to a `Send + Sync` closure). It carries no thread-unsafe state. +unsafe impl Send for CtxBox {} +unsafe impl Sync for CtxBox {} + +impl Drop for CtxBox { + fn drop(&mut self) { + // SAFETY: by the time this runs, `OnReadyRegistration::drop` has cleared + // the callback and the `handle` field has been dropped (finalizing the + // rcl entity if it was the last reference), so no middleware thread can + // still be dereferencing this context. Reclaim it exactly once. + unsafe { + drop(Box::from_raw(self.0)); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::subscription::set_subscription_on_new_message; + use crate::*; + use ros_env::test_msgs::msg; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::{Duration, Instant}; + + /// The push callback fires when messages arrive, without ever spinning the + /// executor (i.e. without `rcl_wait`). + #[test] + fn push_callback_fires_without_spinning() -> Result<(), RclrsError> { + let executor = Context::default().create_basic_executor(); + let node = executor.create_node(&format!("test_push_callback_{}", line!()))?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let publisher = node.create_publisher::("test_push_topic".qos(qos))?; + let subscription = node + .create_subscription::("test_push_topic".qos(qos), |_: msg::Empty| {})?; + + let count = Arc::new(AtomicUsize::new(0)); + let count_cb = Arc::clone(&count); + let _registration = OnReadyRegistration::new( + Arc::clone(subscription.handle()), + set_subscription_on_new_message, + Box::new(move |n| { + count_cb.fetch_add(n, Ordering::Relaxed); + }), + )?; + + // Publish repeatedly (to ride out discovery) and wait for the push + // callback to fire. We deliberately never spin the executor. + let deadline = Instant::now() + Duration::from_secs(10); + while count.load(Ordering::Relaxed) == 0 && Instant::now() < deadline { + publisher.publish(msg::Empty::default())?; + std::thread::sleep(Duration::from_millis(20)); + } + + assert!( + count.load(Ordering::Relaxed) > 0, + "push callback never fired" + ); + Ok(()) + } + + /// Rapidly create and drop registrations while messages are flowing. If the + /// drop ordering is wrong (freeing the context before unregistering), the + /// middleware thread can call into freed memory; this stresses that path. + #[test] + fn rapid_register_unregister_is_sound() -> Result<(), RclrsError> { + let executor = Context::default().create_basic_executor(); + let node = executor.create_node(&format!("test_push_raii_{}", line!()))?; + let qos = QoSProfile::default().reliable().keep_last(10); + + let publisher = node.create_publisher::("test_push_raii_topic".qos(qos))?; + let subscription = node.create_subscription::( + "test_push_raii_topic".qos(qos), + |_: msg::Empty| {}, + )?; + + // A background thread floods the topic the whole time. + let stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let stop_pub = Arc::clone(&stop); + let flood = std::thread::spawn(move || { + while !stop_pub.load(Ordering::Acquire) { + let _ = publisher.publish(msg::Empty::default()); + std::thread::sleep(Duration::from_micros(50)); + } + }); + + // Register/unregister many times against the live subscription. + for _ in 0..2000 { + let count = Arc::new(AtomicUsize::new(0)); + let count_cb = Arc::clone(&count); + let registration = OnReadyRegistration::new( + Arc::clone(subscription.handle()), + set_subscription_on_new_message, + Box::new(move |n| { + count_cb.fetch_add(n, Ordering::Relaxed); + }), + )?; + // Hold briefly so the middleware can fire into this context, then drop + // (which must unregister before freeing). + std::thread::sleep(Duration::from_micros(100)); + drop(registration); + } + + stop.store(true, Ordering::Release); + flood.join().unwrap(); + Ok(()) + } +} diff --git a/rclrs/src/service.rs b/rclrs/src/service.rs index a30ec8dc4..f72accb25 100644 --- a/rclrs/src/service.rs +++ b/rclrs/src/service.rs @@ -307,6 +307,31 @@ where fn handle(&self) -> RclPrimitiveHandle<'_> { RclPrimitiveHandle::Service(self.handle.lock()) } + + fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + // A service has a single readiness path; report it as `Basic`. + let on_ready = move |n| on_ready(ReadyKind::Basic, n); + let registration = crate::executor::event_callback::OnReadyRegistration::new( + Arc::clone(&self.handle), + set_service_on_new_request, + Box::new(on_ready), + )?; + Ok(Some(Box::new(registration))) + } +} + +/// Install (or, with a null callback/user_data, clear) the "on new request" +/// push callback used by the event-driven executor. Encapsulates the service +/// lock and the rcl call within this module. +pub(crate) unsafe fn set_service_on_new_request( + handle: &ServiceHandle, + callback: rcl_event_callback_t, + user_data: *const std::os::raw::c_void, +) -> rcl_ret_t { + rcl_service_set_on_new_request_callback(&*handle.lock(), callback, user_data) } // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 16a800a9c..e58767788 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -103,6 +103,16 @@ where self.handle.topic_name() } + /// Access the handle for this subscription's underlying `rcl_subscription_t`. + /// + /// Returns the subscription handle. Only the `event_callback` tests use this + /// accessor (production code reaches the handle through its own field), so it + /// is compiled only under test rather than carried as dead code. + #[cfg(test)] + pub(crate) fn handle(&self) -> &Arc { + &self.handle + } + /// Returns the QoS settings of the subscription. pub fn qos(&self) -> QoSProfile { let options = unsafe { @@ -294,6 +304,31 @@ where fn handle(&self) -> RclPrimitiveHandle<'_> { RclPrimitiveHandle::Subscription(self.handle.lock()) } + + fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + // A subscription has a single readiness path; report it as `Basic`. + let on_ready = move |n| on_ready(ReadyKind::Basic, n); + let registration = crate::executor::event_callback::OnReadyRegistration::new( + Arc::clone(&self.handle), + set_subscription_on_new_message, + Box::new(on_ready), + )?; + Ok(Some(Box::new(registration))) + } +} + +/// Install (or, with a null callback/user_data, clear) the "on new message" +/// push callback used by the event-driven executor. Encapsulates the +/// subscription lock and the rcl call within this module. +pub(crate) unsafe fn set_subscription_on_new_message( + handle: &SubscriptionHandle, + callback: rcl_event_callback_t, + user_data: *const std::os::raw::c_void, +) -> rcl_ret_t { + rcl_subscription_set_on_new_message_callback(&*handle.lock(), callback, user_data) } // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread diff --git a/rclrs/src/wait_set/rcl_primitive.rs b/rclrs/src/wait_set/rcl_primitive.rs index c59efc54c..27f2af4e7 100644 --- a/rclrs/src/wait_set/rcl_primitive.rs +++ b/rclrs/src/wait_set/rcl_primitive.rs @@ -28,8 +28,38 @@ pub trait RclPrimitive: Send + Sync { /// Provide the handle for this primitive fn handle(&self) -> RclPrimitiveHandle<'_>; + + /// Register a push "on ready" callback so an event-driven executor can learn + /// this primitive has become ready without polling a wait set. The + /// middleware invokes `on_ready` with the [`ReadyKind`] describing *which* + /// part of the primitive became ready and the number of new events. + /// + /// Most primitives have a single readiness path and call `on_ready` with + /// [`ReadyKind::Basic`]. Composite primitives (action servers and clients) + /// register one callback per internal source and call `on_ready` with a + /// [`ReadyKind::ActionServer`]/[`ReadyKind::ActionClient`] value whose single + /// matching flag is set, so the executor knows which sub-entity to run. + /// + /// Returns `Ok(None)` for primitive kinds that have no rcl push-callback API + /// (e.g. timers and guard conditions); an event-driven executor drives those + /// by other means. The returned [`OnReadyHandle`] keeps the callback(s) + /// registered; dropping it detaches them. + fn register_on_ready( + &self, + on_ready: Box, + ) -> Result>, RclrsError> { + // Default: no push-callback support. Suppress the unused parameter. + let _ = on_ready; + Ok(None) + } } +/// RAII handle that keeps a push "on ready" callback registered with the +/// middleware (see [`RclPrimitive::register_on_ready`]). Dropping it +/// unregisters the callback, before freeing the callback's context, so an +/// event-driven executor can detach an entity simply by dropping this handle. +pub trait OnReadyHandle: Send + Sync {} + /// Enum to describe the kind of an executable. #[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] pub enum RclPrimitiveKind {