Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions rclrs/src/subscription/into_async_subscription_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ where
Out: Future<Output = ()> + Send + 'static,
{
fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback<T, ()> {
NodeSubscriptionCallback::Regular(Box::new(move |message| Box::pin(self(message)))).into()
NodeSubscriptionCallback::AsyncRegular(Box::new(move |message| Box::pin(self(message))))
.into()
}
}

Expand All @@ -42,7 +43,7 @@ where
Out: Future<Output = ()> + Send + 'static,
{
fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback<T, ()> {
NodeSubscriptionCallback::RegularWithMessageInfo(Box::new(move |message, info| {
NodeSubscriptionCallback::AsyncRegularWithMessageInfo(Box::new(move |message, info| {
Box::pin(self(message, info))
}))
.into()
Expand All @@ -56,7 +57,8 @@ where
Out: Future<Output = ()> + Send + 'static,
{
fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback<T, ()> {
NodeSubscriptionCallback::Boxed(Box::new(move |message| Box::pin(self(message)))).into()
NodeSubscriptionCallback::AsyncBoxed(Box::new(move |message| Box::pin(self(message))))
.into()
}
}

Expand All @@ -67,7 +69,7 @@ where
F: Future<Output = ()> + Send + 'static,
{
fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback<T, ()> {
NodeSubscriptionCallback::BoxedWithMessageInfo(Box::new(move |message, info| {
NodeSubscriptionCallback::AsyncBoxedWithMessageInfo(Box::new(move |message, info| {
Box::pin(self(message, info))
}))
.into()
Expand All @@ -81,7 +83,8 @@ where
F: Future<Output = ()> + Send + 'static,
{
fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback<T, ()> {
NodeSubscriptionCallback::Loaned(Box::new(move |message| Box::pin(self(message)))).into()
NodeSubscriptionCallback::AsyncLoaned(Box::new(move |message| Box::pin(self(message))))
.into()
}
}

Expand All @@ -92,7 +95,7 @@ where
F: Future<Output = ()> + Send + 'static,
{
fn into_async_subscription_callback(mut self) -> AnySubscriptionCallback<T, ()> {
NodeSubscriptionCallback::LoanedWithMessageInfo(Box::new(move |message, info| {
NodeSubscriptionCallback::AsyncLoanedWithMessageInfo(Box::new(move |message, info| {
Box::pin(self(message, info))
}))
.into()
Expand All @@ -110,68 +113,68 @@ mod tests {
let cb = |_msg: TestMessage| async {};
assert!(matches!(
cb.into_async_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Regular(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::AsyncRegular(_)),
));
let cb = |_msg: TestMessage, _info: MessageInfo| async {};
assert!(matches!(
cb.into_async_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::RegularWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::AsyncRegularWithMessageInfo(_)
),
));
let cb = |_msg: Box<TestMessage>| async {};
assert!(matches!(
cb.into_async_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Boxed(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::AsyncBoxed(_)),
));
let cb = |_msg: Box<TestMessage>, _info: MessageInfo| async {};
assert!(matches!(
cb.into_async_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::BoxedWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::AsyncBoxedWithMessageInfo(_)
),
));
let cb = |_msg: ReadOnlyLoanedMessage<TestMessage>| async {};
assert!(matches!(
cb.into_async_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Loaned(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::AsyncLoaned(_)),
));
let cb = |_msg: ReadOnlyLoanedMessage<TestMessage>, _info: MessageInfo| async {};
assert!(matches!(
cb.into_async_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::LoanedWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::AsyncLoanedWithMessageInfo(_)
),
));

assert!(matches!(
test_regular.into_async_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Regular(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::AsyncRegular(_)),
));
assert!(matches!(
test_regular_with_info.into_async_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::RegularWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::AsyncRegularWithMessageInfo(_)
),
));
assert!(matches!(
test_boxed.into_async_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Boxed(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::AsyncBoxed(_)),
));
assert!(matches!(
test_boxed_with_info.into_async_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::BoxedWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::AsyncBoxedWithMessageInfo(_)
),
));
assert!(matches!(
test_loaned.into_async_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Loaned(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::AsyncLoaned(_)),
));
assert!(matches!(
test_loaned_with_info.into_async_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::LoanedWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::AsyncLoanedWithMessageInfo(_)
),
));
}
Expand Down
104 changes: 30 additions & 74 deletions rclrs/src/subscription/into_node_subscription_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ use crate::{
AnySubscriptionCallback, MessageInfo, NodeSubscriptionCallback, ReadOnlyLoanedMessage,
};

use std::sync::Arc;

/// A trait for regular callbacks of subscriptions.
///
/// Subscription callbacks support six signatures:
/// - [`Fn`] ( `Message` )
/// - [`Fn`] ( `Message`, [`MessageInfo`] )
/// - [`Fn`] ( [`Box`]<`Message`> )
/// - [`Fn`] ( [`Box`]<`Message`>, [`MessageInfo`] )
/// - [`Fn`] ( [`ReadOnlyLoanedMessage`]<`Message`> )
/// - [`Fn`] ( [`ReadOnlyLoanedMessage`]<`Message`>, [`MessageInfo`] )
/// - [`FnMut`] ( `Message` )
/// - [`FnMut`] ( `Message`, [`MessageInfo`] )
/// - [`FnMut`] ( [`Box`]<`Message`> )
/// - [`FnMut`] ( [`Box`]<`Message`>, [`MessageInfo`] )
/// - [`FnMut`] ( [`ReadOnlyLoanedMessage`]<`Message`> )
/// - [`FnMut`] ( [`ReadOnlyLoanedMessage`]<`Message`>, [`MessageInfo`] )
pub trait IntoNodeSubscriptionCallback<T, Args>: Send + 'static
where
T: Message,
Expand All @@ -28,102 +26,60 @@ where
impl<T, Func> IntoNodeSubscriptionCallback<T, (T,)> for Func
where
T: Message,
Func: Fn(T) + Send + Sync + 'static,
Func: FnMut(T) + Send + 'static,
{
fn into_node_subscription_callback(self) -> AnySubscriptionCallback<T, ()> {
let func = Arc::new(self);
NodeSubscriptionCallback::Regular(Box::new(move |message| {
let f = Arc::clone(&func);
Box::pin(async move {
f(message);
})
}))
.into()
NodeSubscriptionCallback::SyncRegular(Box::new(self)).into()
}
}

impl<T, Func> IntoNodeSubscriptionCallback<T, (T, MessageInfo)> for Func
where
T: Message,
Func: Fn(T, MessageInfo) + Send + Sync + 'static,
Func: FnMut(T, MessageInfo) + Send + 'static,
{
fn into_node_subscription_callback(self) -> AnySubscriptionCallback<T, ()> {
let func = Arc::new(self);
NodeSubscriptionCallback::RegularWithMessageInfo(Box::new(move |message, info| {
let f = Arc::clone(&func);
Box::pin(async move {
f(message, info);
})
}))
.into()
NodeSubscriptionCallback::SyncRegularWithMessageInfo(Box::new(self)).into()
}
}

impl<T, Func> IntoNodeSubscriptionCallback<T, (Box<T>,)> for Func
where
T: Message,
Func: Fn(Box<T>) + Send + Sync + 'static,
Func: FnMut(Box<T>) + Send + 'static,
{
fn into_node_subscription_callback(self) -> AnySubscriptionCallback<T, ()> {
let func = Arc::new(self);
NodeSubscriptionCallback::Boxed(Box::new(move |message| {
let f = Arc::clone(&func);
Box::pin(async move {
f(message);
})
}))
.into()
NodeSubscriptionCallback::SyncBoxed(Box::new(self)).into()
}
}

impl<T, Func> IntoNodeSubscriptionCallback<T, (Box<T>, MessageInfo)> for Func
where
T: Message,
Func: Fn(Box<T>, MessageInfo) + Send + Sync + 'static,
Func: FnMut(Box<T>, MessageInfo) + Send + 'static,
{
fn into_node_subscription_callback(self) -> AnySubscriptionCallback<T, ()> {
let func = Arc::new(self);
NodeSubscriptionCallback::BoxedWithMessageInfo(Box::new(move |message, info| {
let f = Arc::clone(&func);
Box::pin(async move {
f(message, info);
})
}))
.into()
NodeSubscriptionCallback::SyncBoxedWithMessageInfo(Box::new(self)).into()
}
}

impl<T, Func> IntoNodeSubscriptionCallback<T, (ReadOnlyLoanedMessage<T>,)> for Func
where
T: Message,
Func: Fn(ReadOnlyLoanedMessage<T>) + Send + Sync + 'static,
Func: FnMut(ReadOnlyLoanedMessage<T>) + Send + 'static,
{
fn into_node_subscription_callback(self) -> AnySubscriptionCallback<T, ()> {
let func = Arc::new(self);
NodeSubscriptionCallback::Loaned(Box::new(move |message| {
let f = Arc::clone(&func);
Box::pin(async move {
f(message);
})
}))
.into()
NodeSubscriptionCallback::SyncLoaned(Box::new(self)).into()
}
}

impl<T, Func> IntoNodeSubscriptionCallback<T, (ReadOnlyLoanedMessage<T>, MessageInfo)> for Func
where
T: Message,
Func: Fn(ReadOnlyLoanedMessage<T>, MessageInfo) + Send + Sync + 'static,
Func: FnMut(ReadOnlyLoanedMessage<T>, MessageInfo) + Send + 'static,
{
fn into_node_subscription_callback(self) -> AnySubscriptionCallback<T, ()> {
let func = Arc::new(self);
NodeSubscriptionCallback::LoanedWithMessageInfo(Box::new(move |message, info| {
let f = Arc::clone(&func);
Box::pin(async move {
f(message, info);
})
}))
.into()
NodeSubscriptionCallback::SyncLoanedWithMessageInfo(Box::new(self)).into()
}
}

Expand All @@ -138,68 +94,68 @@ mod tests {
let cb = |_msg: TestMessage| {};
assert!(matches!(
cb.into_node_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Regular(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::SyncRegular(_)),
));
let cb = |_msg: TestMessage, _info: MessageInfo| {};
assert!(matches!(
cb.into_node_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::RegularWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::SyncRegularWithMessageInfo(_)
)
));
let cb = |_msg: Box<TestMessage>| {};
assert!(matches!(
cb.into_node_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Boxed(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::SyncBoxed(_)),
));
let cb = |_msg: Box<TestMessage>, _info: MessageInfo| {};
assert!(matches!(
cb.into_node_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::BoxedWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::SyncBoxedWithMessageInfo(_)
),
));
let cb = |_msg: ReadOnlyLoanedMessage<TestMessage>| {};
assert!(matches!(
cb.into_node_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Loaned(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::SyncLoaned(_)),
));
let cb = |_msg: ReadOnlyLoanedMessage<TestMessage>, _info: MessageInfo| {};
assert!(matches!(
cb.into_node_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::LoanedWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::SyncLoanedWithMessageInfo(_)
),
));

assert!(matches!(
test_regular.into_node_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Regular(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::SyncRegular(_)),
));
assert!(matches!(
test_regular_with_info.into_node_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::RegularWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::SyncRegularWithMessageInfo(_)
),
));
assert!(matches!(
test_boxed.into_node_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Boxed(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::SyncBoxed(_)),
));
assert!(matches!(
test_boxed_with_info.into_node_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::BoxedWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::SyncBoxedWithMessageInfo(_)
),
));
assert!(matches!(
test_loaned.into_node_subscription_callback(),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::Loaned(_)),
AnySubscriptionCallback::Node(NodeSubscriptionCallback::<TestMessage>::SyncLoaned(_)),
));
assert!(matches!(
test_loaned_with_info.into_node_subscription_callback(),
AnySubscriptionCallback::Node(
NodeSubscriptionCallback::<TestMessage>::LoanedWithMessageInfo(_)
NodeSubscriptionCallback::<TestMessage>::SyncLoanedWithMessageInfo(_)
),
));
}
Expand Down
Loading
Loading