Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rclcpp_async/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ if(BUILD_TESTING)
ament_add_ros_isolated_gtest(test_goal_stream test/test_goal_stream.cpp)
target_link_libraries(test_goal_stream ${PROJECT_NAME} ${example_interfaces_TARGETS})

ament_add_ros_isolated_gtest(test_send_goal_cancel test/test_send_goal_cancel.cpp)
target_link_libraries(test_send_goal_cancel ${PROJECT_NAME} ${example_interfaces_TARGETS})

ament_add_ros_isolated_gtest(test_create_service test/test_create_service.cpp)
target_link_libraries(test_create_service ${PROJECT_NAME} ${std_srvs_TARGETS})

Expand Down
73 changes: 48 additions & 25 deletions rclcpp_async/include/rclcpp_async/co_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,12 @@ class CoContext : public Executor
typename ServiceT::Request::SharedPtr request)
{
return SendRequestAwaiter<ServiceT>{
*this, std::move(client), std::move(request), {}, {}, {}, false, false};
*this,
std::move(client),
std::move(request),
{},
{},
std::make_shared<typename SendRequestAwaiter<ServiceT>::State>()};
}

SleepAwaiter sleep(std::chrono::nanoseconds duration)
Expand Down Expand Up @@ -208,7 +213,13 @@ class CoContext : public Executor
size_t max_depth = kDefaultStreamDepth)
{
return SendGoalAwaiter<ActionT>{
*this, std::move(client), std::move(goal), nullptr, max_depth, {}, {}, {}, false, false};
*this,
std::move(client),
std::move(goal),
max_depth,
{},
{},
std::make_shared<typename SendGoalAwaiter<ActionT>::State>()};
}

template <typename ServiceT, typename CallbackT>
Expand Down Expand Up @@ -342,21 +353,26 @@ class CoContext : public Executor
template <typename ServiceT>
void SendRequestAwaiter<ServiceT>::await_suspend(std::coroutine_handle<> h)
{
// Capture `state` by value (shared_ptr) so the callback stays valid even
// if the awaiter is destroyed (e.g. cancellation) before the response
// arrives. When `state->done` is already set by the cancel path, the
// response callback bails out before touching the (dangling) handle.
client->async_send_request(
request, [this, h](typename rclcpp::Client<ServiceT>::SharedFuture future) {
if (done) {
request,
[state = state_, &ctx = ctx, h](typename rclcpp::Client<ServiceT>::SharedFuture future) {
if (state->done) {
return;
}
done = true;
response = future.get();
state->done = true;
state->response = future.get();
ctx.resume(h);
});

register_cancel(
cancel_cb_, token, ctx, h, [this]() { return done; },
[this]() {
done = true;
cancelled = true;
cancel_cb_, token, ctx, h, [state = state_]() { return state->done; },
[state = state_]() {
state->done = true;
state->cancelled = true;
});
}

Expand Down Expand Up @@ -447,51 +463,58 @@ void GoalStream<ActionT>::CancelAwaiter::await_suspend(std::coroutine_handle<> h
template <typename ActionT>
void SendGoalAwaiter<ActionT>::await_suspend(std::coroutine_handle<> h)
{
stream = std::make_shared<GoalStream<ActionT>>(ctx, max_depth);
state_->stream = std::make_shared<GoalStream<ActionT>>(ctx, max_depth);

typename rclcpp_action::Client<ActionT>::SendGoalOptions options;

options.feedback_callback = [s = stream](auto, const auto & feedback) {
options.feedback_callback = [s = state_->stream](auto, const auto & feedback) {
GoalEvent<ActionT> event;
event.type = GoalEvent<ActionT>::Type::kFeedback;
event.feedback = feedback;
s->push(std::move(event));
};

options.result_callback = [s = stream](const auto & wrapped_result) {
options.result_callback = [s = state_->stream](const auto & wrapped_result) {
GoalEvent<ActionT> event;
event.type = GoalEvent<ActionT>::Type::kComplete;
event.result = wrapped_result;
s->push(std::move(event));
};

options.goal_response_callback = [this, h](const auto & goal_handle) {
if (done) {
// Capture `state` by value (shared_ptr) so the callback stays valid even
// if the awaiter is destroyed before goal_response arrives (e.g. the
// awaiting coroutine was cancelled between async_send_goal() and the
// server's accept/reject). When `state->done` is already set by the
// cancel path, the callback bails out before touching the (dangling)
// coroutine handle.
options.goal_response_callback = [state = state_, client = client, &ctx = ctx,
h](const auto & goal_handle) {
if (state->done) {
return;
}
done = true;
state->done = true;
if (!goal_handle) {
result = Result<std::shared_ptr<GoalStream<ActionT>>>::Error("goal rejected");
state->result = Result<std::shared_ptr<GoalStream<ActionT>>>::Error("goal rejected");
} else {
stream->goal_handle_ = goal_handle;
stream->client_ = client;
result = Result<std::shared_ptr<GoalStream<ActionT>>>::Ok(stream);
state->stream->goal_handle_ = goal_handle;
state->stream->client_ = client;
state->result = Result<std::shared_ptr<GoalStream<ActionT>>>::Ok(state->stream);
}
// Use post() instead of resume() to defer coroutine resumption.
// rclcpp_action holds goal_requests_mutex during this callback
// (Jazzy bug: https://github.com/ros2/rclcpp/issues/2796).
// Resuming synchronously here would deadlock if the coroutine
// immediately calls async_send_goal again.
ctx.post([&ctx = ctx, h]() { ctx.resume(h); });
ctx.post([&ctx, h]() { ctx.resume(h); });
};

client->async_send_goal(goal, options);

register_cancel(
cancel_cb_, token, ctx, h, [this]() { return done; },
[this]() {
done = true;
cancelled = true;
cancel_cb_, token, ctx, h, [state = state_]() { return state->done; },
[state = state_]() {
state->done = true;
state->cancelled = true;
});
}

Expand Down
25 changes: 17 additions & 8 deletions rclcpp_async/include/rclcpp_async/goal_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,24 +162,33 @@ class GoalStream
template <typename ActionT>
struct SendGoalAwaiter
{
// Mutable state is held via shared_ptr so that callbacks registered on
// the rclcpp_action::Client (notably goal_response_callback) can safely
// observe `done` even if the awaiter (and its coroutine frame) has been
// destroyed after the awaiting coroutine was cancelled before the goal
// response arrived.
struct State
{
bool done = false;
bool cancelled = false;
std::shared_ptr<GoalStream<ActionT>> stream;
Result<std::shared_ptr<GoalStream<ActionT>>> result;
};

CoContext & ctx;
typename rclcpp_action::Client<ActionT>::SharedPtr client;
typename ActionT::Goal goal;
std::shared_ptr<GoalStream<ActionT>> stream;
size_t max_depth;
std::stop_token token;
std::shared_ptr<StopCb> cancel_cb_;
Result<std::shared_ptr<GoalStream<ActionT>>> result;
bool done = false;

bool cancelled = false;
std::shared_ptr<State> state_;

void set_token(std::stop_token t) { token = std::move(t); }

bool await_ready()
{
if (token.stop_requested()) {
cancelled = true;
state_->cancelled = true;
return true;
}
return false;
Expand All @@ -190,10 +199,10 @@ struct SendGoalAwaiter
Result<std::shared_ptr<GoalStream<ActionT>>> await_resume()
{
cancel_cb_.reset();
if (cancelled) {
if (state_->cancelled) {
throw CancelledException{};
}
return std::move(result);
return std::move(state_->result);
}
};

Expand Down
21 changes: 15 additions & 6 deletions rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,30 @@ struct SendRequestAwaiter
{
using Response = typename ServiceT::Response::SharedPtr;

// Mutable state is held via shared_ptr so that the response callback
// registered on the rclcpp::Client can safely observe `done` even if
// the awaiter (and its coroutine frame) has been destroyed after the
// awaiting coroutine was cancelled.
struct State
{
bool done = false;
bool cancelled = false;
Response response;
};

CoContext & ctx;
typename rclcpp::Client<ServiceT>::SharedPtr client;
typename ServiceT::Request::SharedPtr request;
std::stop_token token;
Response response;
std::shared_ptr<StopCb> cancel_cb_;
bool cancelled = false;
bool done = false;
std::shared_ptr<State> state_;

void set_token(std::stop_token t) { token = std::move(t); }

bool await_ready()
{
if (token.stop_requested()) {
cancelled = true;
state_->cancelled = true;
return true;
}
return false;
Expand All @@ -61,10 +70,10 @@ struct SendRequestAwaiter
Response await_resume()
{
cancel_cb_.reset();
if (cancelled) {
if (state_->cancelled) {
throw CancelledException{};
}
return std::move(response);
return std::move(state_->response);
}
};

Expand Down
Loading
Loading