diff --git a/rclcpp_async/CMakeLists.txt b/rclcpp_async/CMakeLists.txt index d8788a9..ee6b8fa 100644 --- a/rclcpp_async/CMakeLists.txt +++ b/rclcpp_async/CMakeLists.txt @@ -107,6 +107,9 @@ if(BUILD_TESTING) ament_add_ros_isolated_gtest(test_goal_stream test/test_goal_stream.cpp) target_link_libraries(test_goal_stream ${PROJECT_NAME} ${example_interfaces_TARGETS}) + ament_add_ros_isolated_gtest(test_send_goal_cancel test/test_send_goal_cancel.cpp) + target_link_libraries(test_send_goal_cancel ${PROJECT_NAME} ${example_interfaces_TARGETS}) + ament_add_ros_isolated_gtest(test_create_service test/test_create_service.cpp) target_link_libraries(test_create_service ${PROJECT_NAME} ${std_srvs_TARGETS}) diff --git a/rclcpp_async/include/rclcpp_async/co_context.hpp b/rclcpp_async/include/rclcpp_async/co_context.hpp index 0604958..fc96ed7 100644 --- a/rclcpp_async/include/rclcpp_async/co_context.hpp +++ b/rclcpp_async/include/rclcpp_async/co_context.hpp @@ -164,7 +164,12 @@ class CoContext : public Executor typename ServiceT::Request::SharedPtr request) { return SendRequestAwaiter{ - *this, std::move(client), std::move(request), {}, {}, {}, false, false}; + *this, + std::move(client), + std::move(request), + {}, + {}, + std::make_shared::State>()}; } SleepAwaiter sleep(std::chrono::nanoseconds duration) @@ -208,7 +213,13 @@ class CoContext : public Executor size_t max_depth = kDefaultStreamDepth) { return SendGoalAwaiter{ - *this, std::move(client), std::move(goal), nullptr, max_depth, {}, {}, {}, false, false}; + *this, + std::move(client), + std::move(goal), + max_depth, + {}, + {}, + std::make_shared::State>()}; } template @@ -342,21 +353,26 @@ class CoContext : public Executor template void SendRequestAwaiter::await_suspend(std::coroutine_handle<> h) { + // Capture `state` by value (shared_ptr) so the callback stays valid even + // if the awaiter is destroyed (e.g. cancellation) before the response + // arrives. When `state->done` is already set by the cancel path, the + // response callback bails out before touching the (dangling) handle. client->async_send_request( - request, [this, h](typename rclcpp::Client::SharedFuture future) { - if (done) { + request, + [state = state_, &ctx = ctx, h](typename rclcpp::Client::SharedFuture future) { + if (state->done) { return; } - done = true; - response = future.get(); + state->done = true; + state->response = future.get(); ctx.resume(h); }); register_cancel( - cancel_cb_, token, ctx, h, [this]() { return done; }, - [this]() { - done = true; - cancelled = true; + cancel_cb_, token, ctx, h, [state = state_]() { return state->done; }, + [state = state_]() { + state->done = true; + state->cancelled = true; }); } @@ -447,51 +463,58 @@ void GoalStream::CancelAwaiter::await_suspend(std::coroutine_handle<> h template void SendGoalAwaiter::await_suspend(std::coroutine_handle<> h) { - stream = std::make_shared>(ctx, max_depth); + state_->stream = std::make_shared>(ctx, max_depth); typename rclcpp_action::Client::SendGoalOptions options; - options.feedback_callback = [s = stream](auto, const auto & feedback) { + options.feedback_callback = [s = state_->stream](auto, const auto & feedback) { GoalEvent event; event.type = GoalEvent::Type::kFeedback; event.feedback = feedback; s->push(std::move(event)); }; - options.result_callback = [s = stream](const auto & wrapped_result) { + options.result_callback = [s = state_->stream](const auto & wrapped_result) { GoalEvent event; event.type = GoalEvent::Type::kComplete; event.result = wrapped_result; s->push(std::move(event)); }; - options.goal_response_callback = [this, h](const auto & goal_handle) { - if (done) { + // Capture `state` by value (shared_ptr) so the callback stays valid even + // if the awaiter is destroyed before goal_response arrives (e.g. the + // awaiting coroutine was cancelled between async_send_goal() and the + // server's accept/reject). When `state->done` is already set by the + // cancel path, the callback bails out before touching the (dangling) + // coroutine handle. + options.goal_response_callback = [state = state_, client = client, &ctx = ctx, + h](const auto & goal_handle) { + if (state->done) { return; } - done = true; + state->done = true; if (!goal_handle) { - result = Result>>::Error("goal rejected"); + state->result = Result>>::Error("goal rejected"); } else { - stream->goal_handle_ = goal_handle; - stream->client_ = client; - result = Result>>::Ok(stream); + state->stream->goal_handle_ = goal_handle; + state->stream->client_ = client; + state->result = Result>>::Ok(state->stream); } // Use post() instead of resume() to defer coroutine resumption. // rclcpp_action holds goal_requests_mutex during this callback // (Jazzy bug: https://github.com/ros2/rclcpp/issues/2796). // Resuming synchronously here would deadlock if the coroutine // immediately calls async_send_goal again. - ctx.post([&ctx = ctx, h]() { ctx.resume(h); }); + ctx.post([&ctx, h]() { ctx.resume(h); }); }; client->async_send_goal(goal, options); register_cancel( - cancel_cb_, token, ctx, h, [this]() { return done; }, - [this]() { - done = true; - cancelled = true; + cancel_cb_, token, ctx, h, [state = state_]() { return state->done; }, + [state = state_]() { + state->done = true; + state->cancelled = true; }); } diff --git a/rclcpp_async/include/rclcpp_async/goal_stream.hpp b/rclcpp_async/include/rclcpp_async/goal_stream.hpp index f4caba8..1f84534 100644 --- a/rclcpp_async/include/rclcpp_async/goal_stream.hpp +++ b/rclcpp_async/include/rclcpp_async/goal_stream.hpp @@ -162,24 +162,33 @@ class GoalStream template struct SendGoalAwaiter { + // Mutable state is held via shared_ptr so that callbacks registered on + // the rclcpp_action::Client (notably goal_response_callback) can safely + // observe `done` even if the awaiter (and its coroutine frame) has been + // destroyed after the awaiting coroutine was cancelled before the goal + // response arrived. + struct State + { + bool done = false; + bool cancelled = false; + std::shared_ptr> stream; + Result>> result; + }; + CoContext & ctx; typename rclcpp_action::Client::SharedPtr client; typename ActionT::Goal goal; - std::shared_ptr> stream; size_t max_depth; std::stop_token token; std::shared_ptr cancel_cb_; - Result>> result; - bool done = false; - - bool cancelled = false; + std::shared_ptr state_; void set_token(std::stop_token t) { token = std::move(t); } bool await_ready() { if (token.stop_requested()) { - cancelled = true; + state_->cancelled = true; return true; } return false; @@ -190,10 +199,10 @@ struct SendGoalAwaiter Result>> await_resume() { cancel_cb_.reset(); - if (cancelled) { + if (state_->cancelled) { throw CancelledException{}; } - return std::move(result); + return std::move(state_->result); } }; diff --git a/rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp b/rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp index 067b909..8703aa8 100644 --- a/rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp +++ b/rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp @@ -35,21 +35,30 @@ struct SendRequestAwaiter { using Response = typename ServiceT::Response::SharedPtr; + // Mutable state is held via shared_ptr so that the response callback + // registered on the rclcpp::Client can safely observe `done` even if + // the awaiter (and its coroutine frame) has been destroyed after the + // awaiting coroutine was cancelled. + struct State + { + bool done = false; + bool cancelled = false; + Response response; + }; + CoContext & ctx; typename rclcpp::Client::SharedPtr client; typename ServiceT::Request::SharedPtr request; std::stop_token token; - Response response; std::shared_ptr cancel_cb_; - bool cancelled = false; - bool done = false; + std::shared_ptr state_; void set_token(std::stop_token t) { token = std::move(t); } bool await_ready() { if (token.stop_requested()) { - cancelled = true; + state_->cancelled = true; return true; } return false; @@ -61,10 +70,10 @@ struct SendRequestAwaiter Response await_resume() { cancel_cb_.reset(); - if (cancelled) { + if (state_->cancelled) { throw CancelledException{}; } - return std::move(response); + return std::move(state_->response); } }; diff --git a/rclcpp_async/test/test_send_goal_cancel.cpp b/rclcpp_async/test/test_send_goal_cancel.cpp new file mode 100644 index 0000000..233a015 --- /dev/null +++ b/rclcpp_async/test/test_send_goal_cancel.cpp @@ -0,0 +1,180 @@ +// Copyright 2026 Tamaki Nishino +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp_async/rclcpp_async.hpp" + +using namespace rclcpp_async; // NOLINT(build/namespaces) +using namespace std::chrono_literals; // NOLINT(build/namespaces) +using Fibonacci = example_interfaces::action::Fibonacci; +using GoalHandle = rclcpp_action::ServerGoalHandle; + +// Regression test for a use-after-free in SendGoalAwaiter: if the awaiting +// coroutine is cancelled between async_send_goal() and the arrival of the +// server's goal response, the goal_response_callback must not touch the +// (already-destroyed) awaiter/coroutine frame when the response finally +// arrives. +class SendGoalCancelTest : public ::testing::Test +{ +protected: + static void SetUpTestSuite() { rclcpp::init(0, nullptr); } + static void TearDownTestSuite() { rclcpp::shutdown(); } + + void SetUp() override + { + client_node_ = std::make_shared("test_send_goal_cancel_client"); + server_node_ = std::make_shared("test_send_goal_cancel_server"); + ctx_ = std::make_unique(*client_node_); + client_executor_.add_node(client_node_); + server_executor_.add_node(server_node_); + action_client_ = + rclcpp_action::create_client(client_node_, "test_send_goal_cancel_action"); + } + + void TearDown() override + { + server_thread_stop_ = true; + if (server_thread_.joinable()) { + server_thread_.join(); + } + action_server_.reset(); + action_client_.reset(); + ctx_.reset(); + client_node_.reset(); + server_node_.reset(); + } + + // Stand up an action server whose goal_callback sleeps for `delay` before + // returning ACCEPT. The goal_callback runs on the server's executor + // thread; by spinning the server on its own thread, the client-side + // executor is free to process cancellation during that window. + void start_server_with_delayed_accept(std::chrono::milliseconds delay) + { + action_server_ = rclcpp_action::create_server( + server_node_, "test_send_goal_cancel_action", + [delay](const rclcpp_action::GoalUUID &, std::shared_ptr) { + std::this_thread::sleep_for(delay); + return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE; + }, + [](const std::shared_ptr) { return rclcpp_action::CancelResponse::ACCEPT; }, + [](const std::shared_ptr goal_handle) { + std::thread([goal_handle]() { + auto result = std::make_shared(); + result->sequence = {0, 1}; + goal_handle->succeed(result); + }).detach(); + }); + + server_thread_ = std::thread([this]() { + while (!server_thread_stop_) { + server_executor_.spin_some(); + std::this_thread::sleep_for(1ms); + } + }); + } + + void wait_for_server() + { + for (int i = 0; i < 100; i++) { + client_executor_.spin_some(); + std::this_thread::sleep_for(10ms); + if (action_client_->action_server_is_ready()) { + break; + } + } + ASSERT_TRUE(action_client_->action_server_is_ready()); + } + + void spin_client_for(std::chrono::milliseconds duration) + { + auto end = std::chrono::steady_clock::now() + duration; + while (std::chrono::steady_clock::now() < end) { + client_executor_.spin_some(); + std::this_thread::sleep_for(1ms); + } + } + + void spin_client_until_done(Task & task, std::chrono::milliseconds timeout) + { + auto deadline = std::chrono::steady_clock::now() + timeout; + while (!task.handle.done() && std::chrono::steady_clock::now() < deadline) { + client_executor_.spin_some(); + std::this_thread::sleep_for(1ms); + } + } + + rclcpp::Node::SharedPtr client_node_; + rclcpp::Node::SharedPtr server_node_; + std::unique_ptr ctx_; + rclcpp::executors::SingleThreadedExecutor client_executor_; + rclcpp::executors::SingleThreadedExecutor server_executor_; + std::thread server_thread_; + std::atomic server_thread_stop_{false}; + rclcpp_action::Client::SharedPtr action_client_; + rclcpp_action::Server::SharedPtr action_server_; +}; + +TEST_F(SendGoalCancelTest, CancelWhileAwaitingGoalResponseDoesNotCrash) +{ + // Server delays its ACCEPT decision by 300ms. The client cancels before + // that window elapses, so the goal_response arrives after the awaiting + // coroutine has already unwound with CancelledException. + start_server_with_delayed_accept(300ms); + wait_for_server(); + + bool cancelled_caught = false; + auto coro = [&]() -> Task { + Fibonacci::Goal goal; + goal.order = 6; + try { + co_await ctx_->send_goal(action_client_, goal); + } catch (const CancelledException &) { + cancelled_caught = true; + } + }; + + { + auto task = ctx_->create_task(coro()); + + // Let the goal request reach the server (which is now sleeping inside + // goal_callback). The server thread spins independently. + spin_client_for(50ms); + + // Cancel while the server is still sleeping — no goal_response yet. + task.cancel(); + + spin_client_until_done(task, 500ms); + ASSERT_TRUE(task.handle.done()); + EXPECT_TRUE(cancelled_caught); + // task destructor here frees the coroutine frame that holds the awaiter. + } + + // After the frame is gone, the server's belated goal_response arrives and + // the rclcpp_action::Client still holds a reference-captured callback into + // (pre-fix) freed memory. Keep the client executor alive well past the + // server's delay so the callback fires. Pre-fix this dereferences the + // destroyed awaiter (heap-use-after-free under ASan; SIGSEGV in the wild). + spin_client_for(1000ms); + + SUCCEED(); +}