diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.cpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.cpp index 16aba93835..4db4b7029d 100644 --- a/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.cpp +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.cpp @@ -145,10 +145,11 @@ get_next_ready_entity(GlobalEventIdProvider::MonotonicId max_id) return std::nullopt; } -std::unique_ptr FirstInFirstOutScheduler:: -get_handle_for_callback_group(const rclcpp::CallbackGroup::SharedPtr &/*callback_group*/) +std::unique_ptr +FirstInFirstOutScheduler::get_handle_for_callback_group( + const rclcpp::CallbackGroup::SharedPtr & callback_group) { - return std::make_unique(*this); + return std::make_unique(*this, callback_group->type()); } CBGScheduler::ExecutableEntityWithInfo FirstInFirstOutScheduler::get_next_ready_entity_intern() @@ -162,6 +163,11 @@ CBGScheduler::ExecutableEntityWithInfo FirstInFirstOutScheduler::get_next_ready_ std::optional ret = ready_cbg->get_next_ready_entity(); + + if (ready_cbg->get_type() == CallbackGroupType::Reentrant && ready_cbg->has_ready_entities()) { + ready_callback_groups.push_back(ready_cbg); + } + if(ret) { return CBGScheduler::ExecutableEntityWithInfo{.entity = std::move(ret), .moreEntitiesReady = !ready_callback_groups.empty()}; @@ -186,9 +192,14 @@ CBGScheduler::ExecutableEntityWithInfo FirstInFirstOutScheduler::get_next_ready_ std::optional ret = ready_cbg->get_next_ready_entity(max_id); if(ret) { - ready_callback_groups.erase(it); - return CBGScheduler::ExecutableEntityWithInfo{.entity = std::move(ret), - .moreEntitiesReady = !ready_callback_groups.empty()}; + if ( + ready_cbg->get_type() == CallbackGroupType::MutuallyExclusive || + !ready_cbg->has_ready_entities()) + { + ready_callback_groups.erase(it); + } + return CBGScheduler::ExecutableEntityWithInfo{ + .entity = std::move(ret), .moreEntitiesReady = !ready_callback_groups.empty()}; } } diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.hpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.hpp index 0847e09457..df55e22f29 100644 --- a/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.hpp +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/first_in_first_out_scheduler.hpp @@ -32,8 +32,10 @@ namespace cbg_executor struct FirstInFirstOutCallbackGroupHandle final : public CBGScheduler::CallbackGroupHandle { public: - explicit FirstInFirstOutCallbackGroupHandle(CBGScheduler & scheduler) - : CallbackGroupHandle(scheduler) {} + explicit FirstInFirstOutCallbackGroupHandle(CBGScheduler & scheduler, CallbackGroupType type) + : CallbackGroupHandle(scheduler, type) + { + } std::function get_ready_callback_for_entity( const rclcpp::SubscriptionBase::WeakPtr & entity) final; diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/scheduler.hpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/scheduler.hpp index 9134f802fa..ab429e56c5 100644 --- a/rclcpp/src/rclcpp/executors/events_cbg_executor/scheduler.hpp +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/scheduler.hpp @@ -59,8 +59,10 @@ class CBGScheduler struct CallbackGroupHandle { - explicit CallbackGroupHandle(CBGScheduler & scheduler) - : scheduler(scheduler) {} + explicit CallbackGroupHandle(CBGScheduler & scheduler, CallbackGroupType type) + : scheduler(scheduler), type(type) + { + } CallbackGroupHandle(const CallbackGroupHandle &) = delete; CallbackGroupHandle(CallbackGroupHandle &&) = delete; @@ -90,7 +92,9 @@ class CBGScheduler { { std::lock_guard l(ready_mutex); - not_ready = false; + if (type != CallbackGroupType::Reentrant) { + not_ready = false; + } if(!has_ready_entities()) { idle = true; @@ -101,6 +105,8 @@ class CBGScheduler scheduler.callback_group_ready(this, false); } + CallbackGroupType get_type() {return type;} + bool is_ready(); protected: @@ -153,7 +159,9 @@ class CBGScheduler */ void mark_as_executing() { - not_ready = true; + if (type != CallbackGroupType::Reentrant) { + not_ready = true; + } } std::mutex ready_mutex; @@ -164,6 +172,9 @@ class CBGScheduler // true, if nothing is beeing executed, and there are no pending events bool idle = true; + + // type of the underlying callback group + CallbackGroupType type; }; struct ExecutableEntity @@ -220,7 +231,25 @@ class CBGScheduler { { std::lock_guard l(ready_callback_groups_mutex); - ready_callback_groups.push_back(handle); + + // Reentrant callback groups might not be removed from the queue when one of + // their entities starts executing. + if (handle->get_type() == CallbackGroupType::Reentrant) { + bool already_in_queue = false; + + for (auto it = ready_callback_groups.begin(); it != ready_callback_groups.end(); it++) { + if (*it == handle) { + already_in_queue = true; + break; + } + } + + if (!already_in_queue) { + ready_callback_groups.push_back(handle); + } + } else { + ready_callback_groups.push_back(handle); + } } if(callback_group_was_idle) { diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 868329f5cd..83d827fc6e 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -535,6 +535,12 @@ if(TARGET test_multi_threaded_executor) target_link_libraries(test_multi_threaded_executor ${PROJECT_NAME}) endif() +ament_add_ros_isolated_gtest(test_events_cbg_executor_reentrant executors/test_events_cbg_executor_reentrant.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}") +if(TARGET test_events_cbg_executor_reentrant) + target_link_libraries(test_events_cbg_executor_reentrant ${PROJECT_NAME} test_msgs::test_msgs) +endif() + ament_add_ros_isolated_gtest(test_entities_collector executors/test_entities_collector.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}" TIMEOUT 120) if(TARGET test_entities_collector) diff --git a/rclcpp/test/rclcpp/executors/test_events_cbg_executor_reentrant.cpp b/rclcpp/test/rclcpp/executors/test_events_cbg_executor_reentrant.cpp new file mode 100644 index 0000000000..4b1d25285c --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_events_cbg_executor_reentrant.cpp @@ -0,0 +1,123 @@ +// Copyright 2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "test_msgs/msg/empty.hpp" + +using namespace std::chrono_literals; + +class TestEventsCBGExecutorReentrant : public testing::Test +{ +protected: + static void SetUpTestCase() {rclcpp::init(0, nullptr);} + + static void TearDownTestCase() {rclcpp::shutdown();} +}; + +/* + * Test that multiple callbacks from the same reentrant callback group can + * be executed at the same time. + * + * The test creates two subscribers in a single reentrant callback group that + * listen to the same topic. Whichever subscriber executes first waits for + * the other to also start executing. If this waiting results in a timeout, + * then we know that the second subscriber wasn't able to execute because the + * executor handled the callback group incorrectly. + * + * Related issue: https://github.com/ros2/rclcpp/issues/3175 + */ +TEST_F(TestEventsCBGExecutorReentrant, reentract_callback_group_runs_concurrently) +{ + auto node = std::make_shared("test_events_cbg_executor_reentrant"); + + std::mutex rendezvous_mutex; + std::condition_variable rendezvous_cv; + auto rendezvous = [&](bool & own_started, const bool & other_started) { + std::unique_lock lock(rendezvous_mutex); + own_started = true; + rendezvous_cv.notify_all(); + return rendezvous_cv.wait_for(lock, 2s, [&other_started]() {return other_started;}); + }; + + rclcpp::SubscriptionOptions sub_opt; + auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::Reentrant); + sub_opt.callback_group = cbg; + + bool sub1_started = false; + bool sub2_started = false; + std::atomic_bool sub1_finished{false}; + std::atomic_bool sub2_finished{false}; + std::atomic_bool sub1_timed_out{false}; + std::atomic_bool sub2_timed_out{false}; + + auto sub_1 = node->create_subscription( + "empty", 10, + [&](test_msgs::msg::Empty) { + if (!rendezvous(sub1_started, sub2_started)) { + sub1_timed_out = true; + } + sub1_finished = true; + }, + sub_opt); + + auto sub_2 = node->create_subscription( + "empty", 10, + [&](test_msgs::msg::Empty) { + if (!rendezvous(sub2_started, sub1_started)) { + sub2_timed_out = true; + } + sub2_finished = true; + }, + sub_opt); + + auto pub = node->create_publisher("empty", 10); + + auto executor = rclcpp::executors::EventsCBGExecutor(rclcpp::ExecutorOptions(), 2u); + ASSERT_GT(executor.get_number_of_threads(), 1u); + + executor.add_node(node); + std::thread spin_thread([&executor]() {executor.spin();}); + + /* + * Publish (and re-publish, in case discovery hasn't completed yet) until + * both callbacks have run to completion + */ + test_msgs::msg::Empty msg; + auto deadline = std::chrono::steady_clock::now() + 5s; + while (std::chrono::steady_clock::now() < deadline && + !(sub1_finished.load() && sub2_finished.load())) + { + pub->publish(msg); + std::this_thread::sleep_for(500ms); + } + + executor.cancel(); + spin_thread.join(); + + EXPECT_TRUE(sub1_finished.load()) << "sub 1 never ran"; + EXPECT_TRUE(sub2_finished.load()) << "sub 2 never ran"; + EXPECT_FALSE(sub1_timed_out.load()) << "sub 1 timed out waiting for sub 2 to start running -- " + "the reentrant callback group appears to be serialized"; + EXPECT_FALSE(sub2_timed_out.load()) << "sub 2 timed out waiting for sub 1 to start running -- " + "the reentrant callback group appears to be serialized"; +}