diff --git a/rclrs/src/subscription/into_async_subscription_callback.rs b/rclrs/src/subscription/into_async_subscription_callback.rs index 5ed7da9b..cd5b1683 100644 --- a/rclrs/src/subscription/into_async_subscription_callback.rs +++ b/rclrs/src/subscription/into_async_subscription_callback.rs @@ -31,7 +31,8 @@ where Out: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::Regular(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncRegular(Box::new(move |message| Box::pin(self(message)))) + .into() } } @@ -42,7 +43,7 @@ where Out: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::RegularWithMessageInfo(Box::new(move |message, info| { + NodeSubscriptionCallback::AsyncRegularWithMessageInfo(Box::new(move |message, info| { Box::pin(self(message, info)) })) .into() @@ -56,7 +57,8 @@ where Out: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::Boxed(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncBoxed(Box::new(move |message| Box::pin(self(message)))) + .into() } } @@ -67,7 +69,7 @@ where F: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::BoxedWithMessageInfo(Box::new(move |message, info| { + NodeSubscriptionCallback::AsyncBoxedWithMessageInfo(Box::new(move |message, info| { Box::pin(self(message, info)) })) .into() @@ -81,7 +83,8 @@ where F: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::Loaned(Box::new(move |message| Box::pin(self(message)))).into() + NodeSubscriptionCallback::AsyncLoaned(Box::new(move |message| Box::pin(self(message)))) + .into() } } @@ -92,7 +95,7 @@ where F: Future + Send + 'static, { fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback { - NodeSubscriptionCallback::LoanedWithMessageInfo(Box::new(move |message, info| { + NodeSubscriptionCallback::AsyncLoanedWithMessageInfo(Box::new(move |message, info| { Box::pin(self(message, info)) })) .into() @@ -110,68 +113,68 @@ mod tests { let cb = |_msg: TestMessage| async {}; assert!(matches!( cb.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Regular(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncRegular(_)), )); let cb = |_msg: TestMessage, _info: MessageInfo| async {}; assert!(matches!( cb.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::RegularWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncRegularWithMessageInfo(_) ), )); let cb = |_msg: Box| async {}; assert!(matches!( cb.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Boxed(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncBoxed(_)), )); let cb = |_msg: Box, _info: MessageInfo| async {}; assert!(matches!( cb.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::BoxedWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncBoxedWithMessageInfo(_) ), )); let cb = |_msg: ReadOnlyLoanedMessage| async {}; assert!(matches!( cb.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Loaned(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncLoaned(_)), )); let cb = |_msg: ReadOnlyLoanedMessage, _info: MessageInfo| async {}; assert!(matches!( cb.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::LoanedWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncLoanedWithMessageInfo(_) ), )); assert!(matches!( test_regular.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Regular(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncRegular(_)), )); assert!(matches!( test_regular_with_info.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::RegularWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncRegularWithMessageInfo(_) ), )); assert!(matches!( test_boxed.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Boxed(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncBoxed(_)), )); assert!(matches!( test_boxed_with_info.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::BoxedWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncBoxedWithMessageInfo(_) ), )); assert!(matches!( test_loaned.into_async_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Loaned(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::AsyncLoaned(_)), )); assert!(matches!( test_loaned_with_info.into_async_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::LoanedWithMessageInfo(_) + NodeSubscriptionCallback::::AsyncLoanedWithMessageInfo(_) ), )); } diff --git a/rclrs/src/subscription/into_node_subscription_callback.rs b/rclrs/src/subscription/into_node_subscription_callback.rs index 927ff38b..e39d53de 100644 --- a/rclrs/src/subscription/into_node_subscription_callback.rs +++ b/rclrs/src/subscription/into_node_subscription_callback.rs @@ -4,17 +4,15 @@ use crate::{ AnySubscriptionCallback, MessageInfo, NodeSubscriptionCallback, ReadOnlyLoanedMessage, }; -use std::sync::Arc; - /// A trait for regular callbacks of subscriptions. /// /// Subscription callbacks support six signatures: -/// - [`Fn`] ( `Message` ) -/// - [`Fn`] ( `Message`, [`MessageInfo`] ) -/// - [`Fn`] ( [`Box`]<`Message`> ) -/// - [`Fn`] ( [`Box`]<`Message`>, [`MessageInfo`] ) -/// - [`Fn`] ( [`ReadOnlyLoanedMessage`]<`Message`> ) -/// - [`Fn`] ( [`ReadOnlyLoanedMessage`]<`Message`>, [`MessageInfo`] ) +/// - [`FnMut`] ( `Message` ) +/// - [`FnMut`] ( `Message`, [`MessageInfo`] ) +/// - [`FnMut`] ( [`Box`]<`Message`> ) +/// - [`FnMut`] ( [`Box`]<`Message`>, [`MessageInfo`] ) +/// - [`FnMut`] ( [`ReadOnlyLoanedMessage`]<`Message`> ) +/// - [`FnMut`] ( [`ReadOnlyLoanedMessage`]<`Message`>, [`MessageInfo`] ) pub trait IntoNodeSubscriptionCallback: Send + 'static where T: Message, @@ -28,102 +26,60 @@ where impl IntoNodeSubscriptionCallback for Func where T: Message, - Func: Fn(T) + Send + Sync + 'static, + Func: FnMut(T) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::Regular(Box::new(move |message| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message); - }) - })) - .into() + NodeSubscriptionCallback::SyncRegular(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback for Func where T: Message, - Func: Fn(T, MessageInfo) + Send + Sync + 'static, + Func: FnMut(T, MessageInfo) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::RegularWithMessageInfo(Box::new(move |message, info| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message, info); - }) - })) - .into() + NodeSubscriptionCallback::SyncRegularWithMessageInfo(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback,)> for Func where T: Message, - Func: Fn(Box) + Send + Sync + 'static, + Func: FnMut(Box) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::Boxed(Box::new(move |message| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message); - }) - })) - .into() + NodeSubscriptionCallback::SyncBoxed(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback, MessageInfo)> for Func where T: Message, - Func: Fn(Box, MessageInfo) + Send + Sync + 'static, + Func: FnMut(Box, MessageInfo) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::BoxedWithMessageInfo(Box::new(move |message, info| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message, info); - }) - })) - .into() + NodeSubscriptionCallback::SyncBoxedWithMessageInfo(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback,)> for Func where T: Message, - Func: Fn(ReadOnlyLoanedMessage) + Send + Sync + 'static, + Func: FnMut(ReadOnlyLoanedMessage) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::Loaned(Box::new(move |message| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message); - }) - })) - .into() + NodeSubscriptionCallback::SyncLoaned(Box::new(self)).into() } } impl IntoNodeSubscriptionCallback, MessageInfo)> for Func where T: Message, - Func: Fn(ReadOnlyLoanedMessage, MessageInfo) + Send + Sync + 'static, + Func: FnMut(ReadOnlyLoanedMessage, MessageInfo) + Send + 'static, { fn into_node_subscription_callback(self) -> AnySubscriptionCallback { - let func = Arc::new(self); - NodeSubscriptionCallback::LoanedWithMessageInfo(Box::new(move |message, info| { - let f = Arc::clone(&func); - Box::pin(async move { - f(message, info); - }) - })) - .into() + NodeSubscriptionCallback::SyncLoanedWithMessageInfo(Box::new(self)).into() } } @@ -138,68 +94,68 @@ mod tests { let cb = |_msg: TestMessage| {}; assert!(matches!( cb.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Regular(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncRegular(_)), )); let cb = |_msg: TestMessage, _info: MessageInfo| {}; assert!(matches!( cb.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::RegularWithMessageInfo(_) + NodeSubscriptionCallback::::SyncRegularWithMessageInfo(_) ) )); let cb = |_msg: Box| {}; assert!(matches!( cb.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Boxed(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncBoxed(_)), )); let cb = |_msg: Box, _info: MessageInfo| {}; assert!(matches!( cb.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::BoxedWithMessageInfo(_) + NodeSubscriptionCallback::::SyncBoxedWithMessageInfo(_) ), )); let cb = |_msg: ReadOnlyLoanedMessage| {}; assert!(matches!( cb.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Loaned(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncLoaned(_)), )); let cb = |_msg: ReadOnlyLoanedMessage, _info: MessageInfo| {}; assert!(matches!( cb.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::LoanedWithMessageInfo(_) + NodeSubscriptionCallback::::SyncLoanedWithMessageInfo(_) ), )); assert!(matches!( test_regular.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Regular(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncRegular(_)), )); assert!(matches!( test_regular_with_info.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::RegularWithMessageInfo(_) + NodeSubscriptionCallback::::SyncRegularWithMessageInfo(_) ), )); assert!(matches!( test_boxed.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Boxed(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncBoxed(_)), )); assert!(matches!( test_boxed_with_info.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::BoxedWithMessageInfo(_) + NodeSubscriptionCallback::::SyncBoxedWithMessageInfo(_) ), )); assert!(matches!( test_loaned.into_node_subscription_callback(), - AnySubscriptionCallback::Node(NodeSubscriptionCallback::::Loaned(_)), + AnySubscriptionCallback::Node(NodeSubscriptionCallback::::SyncLoaned(_)), )); assert!(matches!( test_loaned_with_info.into_node_subscription_callback(), AnySubscriptionCallback::Node( - NodeSubscriptionCallback::::LoanedWithMessageInfo(_) + NodeSubscriptionCallback::::SyncLoanedWithMessageInfo(_) ), )); } diff --git a/rclrs/src/subscription/node_subscription_callback.rs b/rclrs/src/subscription/node_subscription_callback.rs index 0b9b6347..2ca138bc 100644 --- a/rclrs/src/subscription/node_subscription_callback.rs +++ b/rclrs/src/subscription/node_subscription_callback.rs @@ -16,20 +16,34 @@ use std::sync::Arc; /// [1]: crate::IntoNodeSubscriptionCallback /// [2]: crate::IntoAsyncSubscriptionCallback pub enum NodeSubscriptionCallback { - /// A callback with only the message as an argument. - Regular(Box BoxFuture<'static, ()> + Send>), - /// A callback with the message and the message info as arguments. - RegularWithMessageInfo(Box BoxFuture<'static, ()> + Send>), - /// A callback with only the boxed message as an argument. - Boxed(Box) -> BoxFuture<'static, ()> + Send>), - /// A callback with the boxed message and the message info as arguments. - BoxedWithMessageInfo(Box, MessageInfo) -> BoxFuture<'static, ()> + Send>), - /// A callback with only the loaned message as an argument. + // Sync variants — callback is called directly, no async wrapping. + /// A synchronous callback with only the message as an argument. + SyncRegular(Box), + /// A synchronous callback with the message and the message info as arguments. + SyncRegularWithMessageInfo(Box), + /// A synchronous callback with only the boxed message as an argument. + SyncBoxed(Box) + Send>), + /// A synchronous callback with the boxed message and the message info as arguments. + SyncBoxedWithMessageInfo(Box, MessageInfo) + Send>), + /// A synchronous callback with only the loaned message as an argument. + SyncLoaned(Box) + Send>), + /// A synchronous callback with the loaned message and the message info as arguments. + SyncLoanedWithMessageInfo(Box, MessageInfo) + Send>), + // Async variants — callback returns a future, dispatched via the executor task queue. + /// An async callback with only the message as an argument. + AsyncRegular(Box BoxFuture<'static, ()> + Send>), + /// An async callback with the message and the message info as arguments. + AsyncRegularWithMessageInfo(Box BoxFuture<'static, ()> + Send>), + /// An async callback with only the boxed message as an argument. + AsyncBoxed(Box) -> BoxFuture<'static, ()> + Send>), + /// An async callback with the boxed message and the message info as arguments. + AsyncBoxedWithMessageInfo(Box, MessageInfo) -> BoxFuture<'static, ()> + Send>), + /// An async callback with only the loaned message as an argument. #[allow(clippy::type_complexity)] - Loaned(Box) -> BoxFuture<'static, ()> + Send>), - /// A callback with the loaned message and the message info as arguments. + AsyncLoaned(Box) -> BoxFuture<'static, ()> + Send>), + /// An async callback with the loaned message and the message info as arguments. #[allow(clippy::type_complexity)] - LoanedWithMessageInfo( + AsyncLoanedWithMessageInfo( Box, MessageInfo) -> BoxFuture<'static, ()> + Send>, ), } @@ -42,27 +56,53 @@ impl NodeSubscriptionCallback { ) -> Result<(), RclrsError> { let mut evaluate = || { match self { - NodeSubscriptionCallback::Regular(cb) => { + // Sync variants — call directly, no async overhead + NodeSubscriptionCallback::SyncRegular(cb) => { + let (msg, _) = handle.take::()?; + cb(msg); + } + NodeSubscriptionCallback::SyncRegularWithMessageInfo(cb) => { + let (msg, msg_info) = handle.take::()?; + cb(msg, msg_info); + } + NodeSubscriptionCallback::SyncBoxed(cb) => { + let (msg, _) = handle.take_boxed::()?; + cb(msg); + } + NodeSubscriptionCallback::SyncBoxedWithMessageInfo(cb) => { + let (msg, msg_info) = handle.take_boxed::()?; + cb(msg, msg_info); + } + NodeSubscriptionCallback::SyncLoaned(cb) => { + let (msg, _) = handle.take_loaned::()?; + cb(msg); + } + NodeSubscriptionCallback::SyncLoanedWithMessageInfo(cb) => { + let (msg, msg_info) = handle.take_loaned::()?; + cb(msg, msg_info); + } + // Async variants — dispatch through executor task queue + NodeSubscriptionCallback::AsyncRegular(cb) => { let (msg, _) = handle.take::()?; commands.run_async(cb(msg)); } - NodeSubscriptionCallback::RegularWithMessageInfo(cb) => { + NodeSubscriptionCallback::AsyncRegularWithMessageInfo(cb) => { let (msg, msg_info) = handle.take::()?; commands.run_async(cb(msg, msg_info)); } - NodeSubscriptionCallback::Boxed(cb) => { + NodeSubscriptionCallback::AsyncBoxed(cb) => { let (msg, _) = handle.take_boxed::()?; commands.run_async(cb(msg)); } - NodeSubscriptionCallback::BoxedWithMessageInfo(cb) => { + NodeSubscriptionCallback::AsyncBoxedWithMessageInfo(cb) => { let (msg, msg_info) = handle.take_boxed::()?; commands.run_async(cb(msg, msg_info)); } - NodeSubscriptionCallback::Loaned(cb) => { + NodeSubscriptionCallback::AsyncLoaned(cb) => { let (msg, _) = handle.take_loaned::()?; commands.run_async(cb(msg)); } - NodeSubscriptionCallback::LoanedWithMessageInfo(cb) => { + NodeSubscriptionCallback::AsyncLoanedWithMessageInfo(cb) => { let (msg, msg_info) = handle.take_loaned::()?; commands.run_async(cb(msg, msg_info)); }