diff --git a/mission/tacc/pipeline_inspection/pipeline_end_detector/CMakeLists.txt b/mission/tacc/pipeline_inspection/pipeline_end_detector/CMakeLists.txt new file mode 100644 index 0000000..c703e25 --- /dev/null +++ b/mission/tacc/pipeline_inspection/pipeline_end_detector/CMakeLists.txt @@ -0,0 +1,55 @@ +cmake_minimum_required(VERSION 3.8) +project(pipeline_end_detector) + +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 20) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(std_msgs REQUIRED) +find_package(std_srvs REQUIRED) +find_package(geometry_msgs REQUIRED) +find_package(vortex_msgs REQUIRED) +find_package(vortex_utils_ros REQUIRED) + +include_directories(include) + +add_executable(pipeline_end_detector_node + src/main.cpp + src/pipeline_end_detector_node.cpp +) + +target_include_directories(pipeline_end_detector_node + PRIVATE + $ +) + +ament_target_dependencies(pipeline_end_detector_node + rclcpp + std_msgs + std_srvs + vortex_utils_ros +) + +install( + TARGETS pipeline_end_detector_node + DESTINATION lib/${PROJECT_NAME} +) + +install( + DIRECTORY include/ + DESTINATION include +) + +install(DIRECTORY + launch + config + DESTINATION share/${PROJECT_NAME}/ +) + +ament_package() diff --git a/mission/tacc/pipeline_inspection/pipeline_end_detector/README.md b/mission/tacc/pipeline_inspection/pipeline_end_detector/README.md new file mode 100644 index 0000000..89298c0 --- /dev/null +++ b/mission/tacc/pipeline_inspection/pipeline_end_detector/README.md @@ -0,0 +1,50 @@ +# Pipeline End Detector + +A ROS2 node that subscribes to the end-of-pipeline classification output and, after a configurable number of consecutive detections, sends a trigger service call to the pipeline inspection FSM to signal that the pipeline has ended. + +## Overview + +The node listens to a `std_msgs/UInt8` topic published by the end-of-pipeline classifier: + +- `data == 1` — Class 1: end of pipeline detected +- `data == 0` — Class 0: continue following + +Once `detection_threshold` consecutive `1`s are received, the node calls the `pipeline_inspection_fsm/pipeline_finished` service. If the call fails, the counter resets and the node retries on the next streak. + +## ROS2 Interface + +### Subscriptions + +| Topic | Type | Description | +|---|---|---| +| `classification_output` | `std_msgs/UInt8` | End-of-pipeline classification output | + +### Service Clients + +| Service | Type | Description | +|---|---|---| +| `pipeline_inspection_fsm/pipeline_finished` | `std_srvs/Trigger` | Signals the FSM that the pipeline has ended | + +## Configuration + +### `pipeline_end_detector_config.yaml` + +```yaml +detection_threshold: 10 # consecutive detections required before triggering +topics: + detection: "classification_output" + end_of_pipeline_service: "pipeline_inspection_fsm/pipeline_finished" +``` + +## Running + +```bash +ros2 run pipeline_end_detector pipeline_end_detector_node \ + --ros-args --params-file path/to/pipeline_end_detector_config.yaml +``` + +Or via the launch file: + +```bash +ros2 launch pipeline_end_detector pipeline_end_detector.launch.py +``` diff --git a/mission/tacc/pipeline_inspection/pipeline_end_detector/config/pipeline_end_detector_config.yaml b/mission/tacc/pipeline_inspection/pipeline_end_detector/config/pipeline_end_detector_config.yaml new file mode 100644 index 0000000..7988aba --- /dev/null +++ b/mission/tacc/pipeline_inspection/pipeline_end_detector/config/pipeline_end_detector_config.yaml @@ -0,0 +1,12 @@ +/**: + ros__parameters: + # Number of consecutive detections required before triggering the service call + detection_threshold: 10 + topics: + # Topic published by the end-of-pipeline detector model (std_msgs/UInt8) + # data == 1 → Class 1: end of pipeline | data == 0 → Class 0: continue following + detection: "classification_output" + # Service name of the pipeline inspection FSM trigger + end_of_pipeline_service: "pipeline_inspection_fsm/pipeline_finished" + # Service name to activate detection on this node + start_detection_service: "pipeline_end_detector/start_detection" diff --git a/mission/tacc/pipeline_inspection/pipeline_end_detector/include/pipeline_end_detector/pipeline_end_detector_node.hpp b/mission/tacc/pipeline_inspection/pipeline_end_detector/include/pipeline_end_detector/pipeline_end_detector_node.hpp new file mode 100644 index 0000000..400d4e8 --- /dev/null +++ b/mission/tacc/pipeline_inspection/pipeline_end_detector/include/pipeline_end_detector/pipeline_end_detector_node.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include +#include +#include + +namespace pipeline_end_detector { + +class PipelineEndDetectorNode : public rclcpp::Node { + public: + explicit PipelineEndDetectorNode(const rclcpp::NodeOptions& options); + + private: + void declare_parameters(); + void setup_pubsub(); + void detection_callback(const std_msgs::msg::UInt8::SharedPtr msg); + void call_end_of_pipeline_service(); + void start_detection_callback( + const std_srvs::srv::Trigger::Request::SharedPtr request, + std_srvs::srv::Trigger::Response::SharedPtr response); + + rclcpp::Subscription::SharedPtr detection_sub_; + rclcpp::Client::SharedPtr end_of_pipeline_client_; + rclcpp::Service::SharedPtr start_detection_server_; + + int consecutive_detections_{0}; // number of consecutive Class 1 (end of + // pipeline) detections so far + int detection_threshold_; // number of consecutive detections required to + // trigger the service call + bool detection_active_{ + false}; // set to true when FSM signals pipeline following has started + bool service_called_{ + false}; // prevents the service from being called more than once +}; + +} // namespace pipeline_end_detector diff --git a/mission/tacc/pipeline_inspection/pipeline_end_detector/launch/pipeline_end_detector.launch.py b/mission/tacc/pipeline_inspection/pipeline_end_detector/launch/pipeline_end_detector.launch.py new file mode 100644 index 0000000..c3f3ea9 --- /dev/null +++ b/mission/tacc/pipeline_inspection/pipeline_end_detector/launch/pipeline_end_detector.launch.py @@ -0,0 +1,37 @@ +import os + +from ament_index_python.packages import get_package_share_directory +from auv_setup.launch_arg_common import ( + declare_drone_and_namespace_args, + resolve_drone_and_namespace, +) +from launch import LaunchDescription +from launch.actions import OpaqueFunction +from launch_ros.actions import Node + + +def launch_setup(context, *args, **kwargs): + _, namespace = resolve_drone_and_namespace(context) + + config = os.path.join( + get_package_share_directory('pipeline_end_detector'), + 'config', + 'pipeline_end_detector_config.yaml', + ) + + node = Node( + package='pipeline_end_detector', + executable='pipeline_end_detector_node', + name='pipeline_end_detector_node', + namespace=namespace, + parameters=[config], + output='screen', + ) + + return [node] + + +def generate_launch_description() -> LaunchDescription: + return LaunchDescription( + declare_drone_and_namespace_args() + [OpaqueFunction(function=launch_setup)] + ) diff --git a/mission/tacc/pipeline_inspection/pipeline_end_detector/package.xml b/mission/tacc/pipeline_inspection/pipeline_end_detector/package.xml new file mode 100644 index 0000000..5dfccf8 --- /dev/null +++ b/mission/tacc/pipeline_inspection/pipeline_end_detector/package.xml @@ -0,0 +1,26 @@ + + + + pipeline_end_detector + 0.0.0 + + Subscribes to the end-of-pipeline detection topic and, after a configurable + number of consecutive detections, triggers a service call to the pipeline + inspection FSM to signal that the pipeline has ended. + + vortex + MIT + + ament_cmake + + rclcpp + std_msgs + std_srvs + geometry_msgs + vortex_msgs + vortex_utils_ros + + + ament_cmake + + diff --git a/mission/tacc/pipeline_inspection/pipeline_end_detector/src/main.cpp b/mission/tacc/pipeline_inspection/pipeline_end_detector/src/main.cpp new file mode 100644 index 0000000..8f2f03d --- /dev/null +++ b/mission/tacc/pipeline_inspection/pipeline_end_detector/src/main.cpp @@ -0,0 +1,11 @@ +#include +#include "pipeline_end_detector/pipeline_end_detector_node.hpp" + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + rclcpp::spin( + std::make_shared( + rclcpp::NodeOptions())); + rclcpp::shutdown(); + return 0; +} diff --git a/mission/tacc/pipeline_inspection/pipeline_end_detector/src/pipeline_end_detector_node.cpp b/mission/tacc/pipeline_inspection/pipeline_end_detector/src/pipeline_end_detector_node.cpp new file mode 100644 index 0000000..b719e07 --- /dev/null +++ b/mission/tacc/pipeline_inspection/pipeline_end_detector/src/pipeline_end_detector_node.cpp @@ -0,0 +1,109 @@ +#include "pipeline_end_detector/pipeline_end_detector_node.hpp" + +#include "vortex/utils/ros/qos_profiles.hpp" + +namespace pipeline_end_detector { + +PipelineEndDetectorNode::PipelineEndDetectorNode( + const rclcpp::NodeOptions& options) + : rclcpp::Node("pipeline_end_detector_node", options) { + declare_parameters(); + + detection_threshold_ = + static_cast(get_parameter("detection_threshold").as_int()); + + setup_pubsub(); + + RCLCPP_INFO( + get_logger(), + "PipelineEndDetectorNode started. threshold=%d, awaiting activation on " + "'%s'", + detection_threshold_, + get_parameter("topics.start_detection_service").as_string().c_str()); +} + +void PipelineEndDetectorNode::declare_parameters() { + declare_parameter("topics.detection"); + declare_parameter("topics.end_of_pipeline_service"); + declare_parameter("topics.start_detection_service"); + declare_parameter("detection_threshold"); +} + +void PipelineEndDetectorNode::setup_pubsub() { + const auto sensor_qos = vortex::utils::qos_profiles::reliable_profile(10); + detection_sub_ = create_subscription( + get_parameter("topics.detection").as_string(), sensor_qos, + std::bind(&PipelineEndDetectorNode::detection_callback, this, + std::placeholders::_1)); + + end_of_pipeline_client_ = create_client( + get_parameter("topics.end_of_pipeline_service").as_string()); + + start_detection_server_ = create_service( + get_parameter("topics.start_detection_service").as_string(), + std::bind(&PipelineEndDetectorNode::start_detection_callback, this, + std::placeholders::_1, std::placeholders::_2)); +} + +void PipelineEndDetectorNode::start_detection_callback( + const std_srvs::srv::Trigger::Request::SharedPtr /*request*/, + std_srvs::srv::Trigger::Response::SharedPtr response) { + detection_active_ = true; + response->success = true; + response->message = "Pipeline end detection activated."; + RCLCPP_INFO(get_logger(), "Pipeline following started — detection active."); +} + +void PipelineEndDetectorNode::detection_callback( + const std_msgs::msg::UInt8::SharedPtr msg) { + if (!detection_active_ || service_called_) { + return; + } + + if (msg->data > 0) { + ++consecutive_detections_; + RCLCPP_DEBUG(get_logger(), "Consecutive detections: %d / %d", + consecutive_detections_, detection_threshold_); + } else { + if (consecutive_detections_ > 0) { + RCLCPP_DEBUG(get_logger(), + "Detection streak broken, resetting counter."); + } + consecutive_detections_ = 0; + } + + if (consecutive_detections_ >= detection_threshold_) { + call_end_of_pipeline_service(); + } +} + +void PipelineEndDetectorNode::call_end_of_pipeline_service() { + if (!end_of_pipeline_client_->service_is_ready()) { + RCLCPP_WARN(get_logger(), + "End-of-pipeline service not available, skipping call."); + return; + } + + service_called_ = true; + + auto request = std::make_shared(); + auto future = end_of_pipeline_client_->async_send_request( + request, + [this](rclcpp::Client::SharedFuture result) { + const auto& response = result.get(); + if (response->success) { + RCLCPP_INFO(get_logger(), + "End-of-pipeline service call succeeded: %s", + response->message.c_str()); + rclcpp::shutdown(); + } else { + RCLCPP_ERROR(get_logger(), + "End-of-pipeline service call failed: %s", + response->message.c_str()); + service_called_ = false; + consecutive_detections_ = 0; + } + }); +} + +} // namespace pipeline_end_detector