From 6d5e7900ef8bdfb928bb21f90efbfe5d545a3802 Mon Sep 17 00:00:00 2001 From: Mathieu David Date: Thu, 16 Apr 2026 01:00:37 +0200 Subject: [PATCH 1/2] perf: add synchronous callback path for Node-scoped subscriptions Previously, all Node-scoped subscription callbacks were wrapped in async futures even when the callback was synchronous. This added 3 heap allocations per received message: Arc::clone, Box::pin(async { }), and Arc::new(Task { }) for the async task queue. Add explicit Sync/Async variants to NodeSubscriptionCallback. Sync callbacks are called directly with no Arc, no Future, and no task queue dispatch. Rename existing variants with Async prefix (breaking change). IntoNodeSubscriptionCallback now produces Sync variants, with relaxed bounds (FnMut + Send instead of Fn + Send + Sync). IntoAsyncSubscriptionCallback produces Async variants as before. --- .../into_async_subscription_callback.rs | 36 +++--- .../into_node_subscription_callback.rs | 104 +++++------------- .../node_subscription_callback.rs | 78 ++++++++++--- 3 files changed, 108 insertions(+), 110 deletions(-) diff --git a/rclrs/src/subscription/into_async_subscription_callback.rs b/rclrs/src/subscription/into_async_subscription_callback.rs index 5ed7da9b6..c2b853f0a 100644 --- a/rclrs/src/subscription/into_async_subscription_callback.rs +++ b/rclrs/src/subscription/into_async_subscription_callback.rs @@ -31,7 +31,7 @@ where Out: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::Regular(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncRegular(Box::new(move |message| Box::pin(self(message)))).into() } } @@ -42,7 +42,7 @@ where Out: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::RegularWithMessageInfo(Box::new(move |message, info| { + NodeSubscriptionCallback::AsyncRegularWithMessageInfo(Box::new(move |message, info| { Box::pin(self(message, info)) })) .into() @@ -56,7 +56,7 @@ where Out: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::Boxed(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncBoxed(Box::new(move |message| Box::pin(self(message)))).into() } } @@ -67,7 +67,7 @@ where F: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::BoxedWithMessageInfo(Box::new(move |message, info| { + NodeSubscriptionCallback::AsyncBoxedWithMessageInfo(Box::new(move |message, info| { Box::pin(self(message, info)) })) .into() @@ -81,7 +81,7 @@ where F: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::Loaned(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncLoaned(Box::new(move |message| Box::pin(self(message)))).into() } } @@ -92,7 +92,7 @@ where F: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::LoanedWithMessageInfo(Box::new(move |message, info| { + NodeSubscriptionCallback::AsyncLoanedWithMessageInfo(Box::new(move |message, info| { Box::pin(self(message, info)) })) .into() @@ -110,68 +110,68 @@ mod tests { let cb = |_msg: TestMessage| async {}; assert!(matches!( cb.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Regular(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncRegular(_)), )); let cb = |_msg: TestMessage, _info: MessageInfo| async {}; assert!(matches!( cb.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::RegularWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncRegularWithMessageInfo(_) ), )); let cb = |_msg: Box| async {}; assert!(matches!( cb.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Boxed(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncBoxed(_)), )); let cb = |_msg: Box, _info: MessageInfo| async {}; assert!(matches!( cb.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::BoxedWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncBoxedWithMessageInfo(_) ), )); let cb = |_msg: ReadOnlyLoanedMessage| async {}; assert!(matches!( cb.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Loaned(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncLoaned(_)), )); let cb = |_msg: ReadOnlyLoanedMessage, _info: MessageInfo| async {}; assert!(matches!( cb.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::LoanedWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncLoanedWithMessageInfo(_) ), )); assert!(matches!( test_regular.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Regular(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncRegular(_)), )); assert!(matches!( test_regular_with_info.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::RegularWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncRegularWithMessageInfo(_) ), )); assert!(matches!( test_boxed.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Boxed(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncBoxed(_)), )); assert!(matches!( test_boxed_with_info.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::BoxedWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncBoxedWithMessageInfo(_) ), )); assert!(matches!( test_loaned.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Loaned(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncLoaned(_)), )); assert!(matches!( test_loaned_with_info.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::LoanedWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncLoanedWithMessageInfo(_) ), )); } diff --git a/rclrs/src/subscription/into_node_subscription_callback.rs b/rclrs/src/subscription/into_node_subscription_callback.rs index 927ff38be..e39d53de4 100644 --- a/rclrs/src/subscription/into_node_subscription_callback.rs +++ b/rclrs/src/subscription/into_node_subscription_callback.rs @@ -4,17 +4,15 @@ use crate::{ AnySubscriptionCallback, MessageInfo, NodeSubscriptionCallback, ReadOnlyLoanedMessage, }; -use std::sync::Arc; - /// A trait for regular callbacks of subscriptions. /// /// Subscription callbacks support six signatures: -/// - [`Fn`] ( `Message` ) -/// - [`Fn`] ( `Message`, [`MessageInfo`] ) -/// - [`Fn`] ( [`Box`]<`Message`> ) -/// - [`Fn`] ( [`Box`]<`Message`>, [`MessageInfo`] ) -/// - [`Fn`] ( [`ReadOnlyLoanedMessage`]<`Message`> ) -/// - [`Fn`] ( [`ReadOnlyLoanedMessage`]<`Message`>, [`MessageInfo`] ) +/// - [`FnMut`] ( `Message` ) +/// - [`FnMut`] ( `Message`, [`MessageInfo`] ) +/// - [`FnMut`] ( [`Box`]<`Message`> ) +/// - [`FnMut`] ( [`Box`]<`Message`>, [`MessageInfo`] ) +/// - [`FnMut`] ( [`ReadOnlyLoanedMessage`]<`Message`> ) +/// - [`FnMut`] ( [`ReadOnlyLoanedMessage`]<`Message`>, [`MessageInfo`] ) pub trait IntoNodeSubscriptionCallback: Send + 'static where T: Message, @@ -28,102 +26,60 @@ where impl IntoNodeSubscriptionCallback for Func where T: Message, - Func: Fn(T) + Send + Sync + 'static, + Func: FnMut(T) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::Regular(Box::new(move |message| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message); - }) - })) - .into() + NodeSubscriptionCallback::SyncRegular(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback for Func where T: Message, - Func: Fn(T, MessageInfo) + Send + Sync + 'static, + Func: FnMut(T, MessageInfo) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::RegularWithMessageInfo(Box::new(move |message, info| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message, info); - }) - })) - .into() + NodeSubscriptionCallback::SyncRegularWithMessageInfo(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback,)> for Func where T: Message, - Func: Fn(Box) + Send + Sync + 'static, + Func: FnMut(Box) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::Boxed(Box::new(move |message| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message); - }) - })) - .into() + NodeSubscriptionCallback::SyncBoxed(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback, MessageInfo)> for Func where T: Message, - Func: Fn(Box, MessageInfo) + Send + Sync + 'static, + Func: FnMut(Box, MessageInfo) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::BoxedWithMessageInfo(Box::new(move |message, info| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message, info); - }) - })) - .into() + NodeSubscriptionCallback::SyncBoxedWithMessageInfo(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback,)> for Func where T: Message, - Func: Fn(ReadOnlyLoanedMessage) + Send + Sync + 'static, + Func: FnMut(ReadOnlyLoanedMessage) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::Loaned(Box::new(move |message| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message); - }) - })) - .into() + NodeSubscriptionCallback::SyncLoaned(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback, MessageInfo)> for Func where T: Message, - Func: Fn(ReadOnlyLoanedMessage, MessageInfo) + Send + Sync + 'static, + Func: FnMut(ReadOnlyLoanedMessage, MessageInfo) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::LoanedWithMessageInfo(Box::new(move |message, info| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message, info); - }) - })) - .into() + NodeSubscriptionCallback::SyncLoanedWithMessageInfo(Box::new(self)).into() } } @@ -138,68 +94,68 @@ mod tests { let cb = |_msg: TestMessage| {}; assert!(matches!( cb.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Regular(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncRegular(_)), )); let cb = |_msg: TestMessage, _info: MessageInfo| {}; assert!(matches!( cb.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::RegularWithMessageInfo(_) + NodeSubscriptionCallback::::SyncRegularWithMessageInfo(_) ) )); let cb = |_msg: Box| {}; assert!(matches!( cb.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Boxed(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncBoxed(_)), )); let cb = |_msg: Box, _info: MessageInfo| {}; assert!(matches!( cb.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::BoxedWithMessageInfo(_) + NodeSubscriptionCallback::::SyncBoxedWithMessageInfo(_) ), )); let cb = |_msg: ReadOnlyLoanedMessage| {}; assert!(matches!( cb.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Loaned(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncLoaned(_)), )); let cb = |_msg: ReadOnlyLoanedMessage, _info: MessageInfo| {}; assert!(matches!( cb.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::LoanedWithMessageInfo(_) + NodeSubscriptionCallback::::SyncLoanedWithMessageInfo(_) ), )); assert!(matches!( test_regular.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Regular(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncRegular(_)), )); assert!(matches!( test_regular_with_info.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::RegularWithMessageInfo(_) + NodeSubscriptionCallback::::SyncRegularWithMessageInfo(_) ), )); assert!(matches!( test_boxed.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Boxed(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncBoxed(_)), )); assert!(matches!( test_boxed_with_info.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::BoxedWithMessageInfo(_) + NodeSubscriptionCallback::::SyncBoxedWithMessageInfo(_) ), )); assert!(matches!( test_loaned.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Loaned(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncLoaned(_)), )); assert!(matches!( test_loaned_with_info.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::LoanedWithMessageInfo(_) + NodeSubscriptionCallback::::SyncLoanedWithMessageInfo(_) ), )); } diff --git a/rclrs/src/subscription/node_subscription_callback.rs b/rclrs/src/subscription/node_subscription_callback.rs index 0b9b6347c..dd2ff9eb9 100644 --- a/rclrs/src/subscription/node_subscription_callback.rs +++ b/rclrs/src/subscription/node_subscription_callback.rs @@ -16,20 +16,36 @@ use std::sync::Arc; /// [1]: crate::IntoNodeSubscriptionCallback /// [2]: crate::IntoAsyncSubscriptionCallback pub enum NodeSubscriptionCallback { - /// A callback with only the message as an argument. - Regular(Box BoxFuture<'static, ()> + Send>), - /// A callback with the message and the message info as arguments. - RegularWithMessageInfo(Box BoxFuture<'static, ()> + Send>), - /// A callback with only the boxed message as an argument. - Boxed(Box) -> BoxFuture<'static, ()> + Send>), - /// A callback with the boxed message and the message info as arguments. - BoxedWithMessageInfo(Box, MessageInfo) -> BoxFuture<'static, ()> + Send>), - /// A callback with only the loaned message as an argument. + // Sync variants — callback is called directly, no async wrapping. + /// A synchronous callback with only the message as an argument. + SyncRegular(Box), + /// A synchronous callback with the message and the message info as arguments. + SyncRegularWithMessageInfo(Box), + /// A synchronous callback with only the boxed message as an argument. + SyncBoxed(Box) + Send>), + /// A synchronous callback with the boxed message and the message info as arguments. + SyncBoxedWithMessageInfo(Box, MessageInfo) + Send>), + /// A synchronous callback with only the loaned message as an argument. + SyncLoaned(Box) + Send>), + /// A synchronous callback with the loaned message and the message info as arguments. + SyncLoanedWithMessageInfo(Box, MessageInfo) + Send>), + // Async variants — callback returns a future, dispatched via the executor task queue. + /// An async callback with only the message as an argument. + AsyncRegular(Box BoxFuture<'static, ()> + Send>), + /// An async callback with the message and the message info as arguments. + AsyncRegularWithMessageInfo(Box BoxFuture<'static, ()> + Send>), + /// An async callback with only the boxed message as an argument. + AsyncBoxed(Box) -> BoxFuture<'static, ()> + Send>), + /// An async callback with the boxed message and the message info as arguments. + AsyncBoxedWithMessageInfo( + Box, MessageInfo) -> BoxFuture<'static, ()> + Send>, + ), + /// An async callback with only the loaned message as an argument. #[allow(clippy::type_complexity)] - Loaned(Box) -> BoxFuture<'static, ()> + Send>), - /// A callback with the loaned message and the message info as arguments. + AsyncLoaned(Box) -> BoxFuture<'static, ()> + Send>), + /// An async callback with the loaned message and the message info as arguments. #[allow(clippy::type_complexity)] - LoanedWithMessageInfo( + AsyncLoanedWithMessageInfo( Box, MessageInfo) -> BoxFuture<'static, ()> + Send>, ), } @@ -42,27 +58,53 @@ impl NodeSubscriptionCallback { ) -> Result<(), RclrsError> { let mut evaluate = || { match self { - NodeSubscriptionCallback::Regular(cb) => { + // Sync variants — call directly, no async overhead + NodeSubscriptionCallback::SyncRegular(cb) => { + let (msg, _) = handle.take::()?; + cb(msg); + } + NodeSubscriptionCallback::SyncRegularWithMessageInfo(cb) => { + let (msg, msg_info) = handle.take::()?; + cb(msg, msg_info); + } + NodeSubscriptionCallback::SyncBoxed(cb) => { + let (msg, _) = handle.take_boxed::()?; + cb(msg); + } + NodeSubscriptionCallback::SyncBoxedWithMessageInfo(cb) => { + let (msg, msg_info) = handle.take_boxed::()?; + cb(msg, msg_info); + } + NodeSubscriptionCallback::SyncLoaned(cb) => { + let (msg, _) = handle.take_loaned::()?; + cb(msg); + } + NodeSubscriptionCallback::SyncLoanedWithMessageInfo(cb) => { + let (msg, msg_info) = handle.take_loaned::()?; + cb(msg, msg_info); + } + // Async variants — dispatch through executor task queue + NodeSubscriptionCallback::AsyncRegular(cb) => { let (msg, _) = handle.take::()?; commands.run_async(cb(msg)); } - NodeSubscriptionCallback::RegularWithMessageInfo(cb) => { + NodeSubscriptionCallback::AsyncRegularWithMessageInfo(cb) => { let (msg, msg_info) = handle.take::()?; commands.run_async(cb(msg, msg_info)); } - NodeSubscriptionCallback::Boxed(cb) => { + NodeSubscriptionCallback::AsyncBoxed(cb) => { let (msg, _) = handle.take_boxed::()?; commands.run_async(cb(msg)); } - NodeSubscriptionCallback::BoxedWithMessageInfo(cb) => { + NodeSubscriptionCallback::AsyncBoxedWithMessageInfo(cb) => { let (msg, msg_info) = handle.take_boxed::()?; commands.run_async(cb(msg, msg_info)); } - NodeSubscriptionCallback::Loaned(cb) => { + NodeSubscriptionCallback::AsyncLoaned(cb) => { let (msg, _) = handle.take_loaned::()?; commands.run_async(cb(msg)); } - NodeSubscriptionCallback::LoanedWithMessageInfo(cb) => { + NodeSubscriptionCallback::AsyncLoanedWithMessageInfo(cb) => { let (msg, msg_info) = handle.take_loaned::()?; commands.run_async(cb(msg, msg_info)); } From 9db9079868de6344e93e7d8c2b286112915e8677 Mon Sep 17 00:00:00 2001 From: Mathieu David Date: Sun, 21 Jun 2026 00:25:32 +0200 Subject: [PATCH 2/2] fix: apply rustfmt to subscription callback files CI failed on the fmt colcon test; these files were not formatted with nightly rustfmt as required by the workflow. Co-authored-by: Cursor --- .../src/subscription/into_async_subscription_callback.rs | 9 ++++++--- rclrs/src/subscription/node_subscription_callback.rs | 4 +--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/rclrs/src/subscription/into_async_subscription_callback.rs b/rclrs/src/subscription/into_async_subscription_callback.rs index c2b853f0a..cd5b16832 100644 --- a/rclrs/src/subscription/into_async_subscription_callback.rs +++ b/rclrs/src/subscription/into_async_subscription_callback.rs @@ -31,7 +31,8 @@ where Out: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::AsyncRegular(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncRegular(Box::new(move |message| Box::pin(self(message)))) + .into() } } @@ -56,7 +57,8 @@ where Out: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::AsyncBoxed(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncBoxed(Box::new(move |message| Box::pin(self(message)))) + .into() } } @@ -81,7 +83,8 @@ where F: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::AsyncLoaned(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncLoaned(Box::new(move |message| Box::pin(self(message)))) + .into() } } diff --git a/rclrs/src/subscription/node_subscription_callback.rs b/rclrs/src/subscription/node_subscription_callback.rs index dd2ff9eb9..2ca138bc9 100644 --- a/rclrs/src/subscription/node_subscription_callback.rs +++ b/rclrs/src/subscription/node_subscription_callback.rs @@ -37,9 +37,7 @@ pub enum NodeSubscriptionCallback { /// An async callback with only the boxed message as an argument. AsyncBoxed(Box) -> BoxFuture<'static, ()> + Send>), /// An async callback with the boxed message and the message info as arguments. - AsyncBoxedWithMessageInfo( - Box, MessageInfo) -> BoxFuture<'static, ()> + Send>, - ), + AsyncBoxedWithMessageInfo(Box, MessageInfo) -> BoxFuture<'static, ()> + Send>), /// An async callback with only the loaned message as an argument. #[allow(clippy::type_complexity)] AsyncLoaned(Box) -> BoxFuture<'static, ()> + Send>),