diff --git a/src/Cameras/video_streaming/CMakeLists.txt b/src/Cameras/video_streaming/CMakeLists.txt index 35a0974f..0cc37cf4 100644 --- a/src/Cameras/video_streaming/CMakeLists.txt +++ b/src/Cameras/video_streaming/CMakeLists.txt @@ -28,6 +28,8 @@ add_library(${PROJECT_NAME} SHARED src/webrtc_node.cpp src/srt_node.cpp src/srt_node_client.cpp + src/rtp_node.cpp + src/rtp_client_node.cpp ) include_directories( @@ -60,6 +62,8 @@ rclcpp_components_register_nodes(${PROJECT_NAME} "WebRtcNode" "video_streaming::SrtNode" "video_streaming::SrtClientNode" + "video_streaming::RtpNode" + "video_streaming::RtpClientNode" ) # ---- Install diff --git a/src/Cameras/video_streaming/include/video_streaming/input_node.hpp b/src/Cameras/video_streaming/include/video_streaming/input_node.hpp index dc9a3408..f99df7e7 100644 --- a/src/Cameras/video_streaming/include/video_streaming/input_node.hpp +++ b/src/Cameras/video_streaming/include/video_streaming/input_node.hpp @@ -43,4 +43,5 @@ class InputNode : public BaseVideoNode { private: std::map source_map_; rclcpp::Service::SharedPtr video_service_; + std::shared_ptr current_video_request_; }; diff --git a/src/Cameras/video_streaming/include/video_streaming/rtp_client_node.hpp b/src/Cameras/video_streaming/include/video_streaming/rtp_client_node.hpp new file mode 100644 index 00000000..e4377cf5 --- /dev/null +++ b/src/Cameras/video_streaming/include/video_streaming/rtp_client_node.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "base_video_node.hpp" + +#include +#include +#include +#include + +namespace video_streaming { + +class RtpClientNode : public BaseVideoNode { +public: + explicit RtpClientNode(const rclcpp::NodeOptions &options); + +protected: + bool create_pipeline() override; + +private: + rcl_interfaces::msg::SetParametersResult + on_parameter_change(const std::vector ¶meters); + std::string get_pipeline_description(); + OnSetParametersCallbackHandle::SharedPtr param_callback_handle_; + uint16_t dest_port_; + int latency_ms_; +}; + +} // namespace video_streaming \ No newline at end of file diff --git a/src/Cameras/video_streaming/include/video_streaming/rtp_node.hpp b/src/Cameras/video_streaming/include/video_streaming/rtp_node.hpp new file mode 100644 index 00000000..409a55c9 --- /dev/null +++ b/src/Cameras/video_streaming/include/video_streaming/rtp_node.hpp @@ -0,0 +1,49 @@ +#pragma once + +#include "base_video_node.hpp" + +#include +#include +#include +#include +#include +#include + +namespace video_streaming { + +class RtpNode : public BaseVideoNode { +public: + explicit RtpNode(const rclcpp::NodeOptions &options); + ~RtpNode() override; + +protected: + bool create_pipeline() override; + +private: + rcl_interfaces::msg::SetParametersResult + on_parameter_change(const std::vector ¶meters); + + void set_bitrate(int32_t bitrate); + void set_resolution(int width, int height); + void on_iframe_trigger(const std_msgs::msg::Empty::SharedPtr msg); + std::string get_pipeline_description(); + + int target_framerate_; + int bitrate_; + int fec_; + int dest_port_; + int width_; + int height_; + + std::string dest_ip_; + + GstElement *h265_encoder_{nullptr}; + + rclcpp::Subscription::SharedPtr bitrate_sub_; + rclcpp::Subscription::SharedPtr iframe_sub_; + rclcpp::Publisher::SharedPtr restart_pub_; + + OnSetParametersCallbackHandle::SharedPtr param_callback_handle_; +}; + +} // namespace video_streaming \ No newline at end of file diff --git a/src/Cameras/video_streaming/launch/srt_client.launch.py b/src/Cameras/video_streaming/launch/rtp_client.launch.py similarity index 62% rename from src/Cameras/video_streaming/launch/srt_client.launch.py rename to src/Cameras/video_streaming/launch/rtp_client.launch.py index e940d1c5..28d36a52 100644 --- a/src/Cameras/video_streaming/launch/srt_client.launch.py +++ b/src/Cameras/video_streaming/launch/rtp_client.launch.py @@ -5,21 +5,21 @@ def generate_launch_description(): - srt_client_node = ComposableNode( + rtp_client_node = ComposableNode( package="video_streaming", - plugin="video_streaming::SrtClientNode", - name="srt_client_node", + plugin="video_streaming::RtpClientNode", + name="rtp_client_node", namespace="", - parameters=[{"srt_uri": "srt://192.168.0.55:7001?mode=caller"}], + parameters=[{"port": 5004, "latency_ms": 40}], ) # Container_mt container = ComposableNodeContainer( - name="test_client_container", + name="video_client_container", namespace="", package="rclcpp_components", executable="component_container", - composable_node_descriptions=[srt_client_node], + composable_node_descriptions=[rtp_client_node], output="screen", ) diff --git a/src/Cameras/video_streaming/launch/video_streaming.launch.py b/src/Cameras/video_streaming/launch/video_streaming.launch.py index d35be823..d3fe5910 100644 --- a/src/Cameras/video_streaming/launch/video_streaming.launch.py +++ b/src/Cameras/video_streaming/launch/video_streaming.launch.py @@ -34,18 +34,12 @@ def generate_launch_description(): parameters=[], ) - srt_node = ComposableNode( + rtp_node = ComposableNode( package="video_streaming", - plugin="video_streaming::SrtNode", - name="srt_node", + plugin="video_streaming::RtpNode", + name="rtp_node", namespace="", - parameters=[ - { - "srt_uri": "srt://:7001", - "latency": 100, - "iframe_interval": 30, - } - ], + parameters=[{"dest_ip": "192.168.1.100", "dest_port": 5004}], ) # Create a container for all 3 components @@ -58,7 +52,7 @@ def generate_launch_description(): input_node, detect_node, # streaming_node, - srt_node, + rtp_node, ], output="screen", ) diff --git a/src/Cameras/video_streaming/src/input_node.cpp b/src/Cameras/video_streaming/src/input_node.cpp index f778e2ed..565fabac 100644 --- a/src/Cameras/video_streaming/src/input_node.cpp +++ b/src/Cameras/video_streaming/src/input_node.cpp @@ -89,6 +89,10 @@ bool InputNode::create_pipeline() { } pipeline_ = p; + if (current_video_request_) { + video_cb(current_video_request_, + std::make_shared()); + } return true; } @@ -167,6 +171,9 @@ void InputNode::video_cb( RCLCPP_ERROR(this->get_logger(), "Failed to start pipeline"); response->success = false; } + if (response->success) { + current_video_request_ = request; + } } RCLCPP_COMPONENTS_REGISTER_NODE(InputNode) diff --git a/src/Cameras/video_streaming/src/rtp_client_node.cpp b/src/Cameras/video_streaming/src/rtp_client_node.cpp new file mode 100644 index 00000000..301e1b65 --- /dev/null +++ b/src/Cameras/video_streaming/src/rtp_client_node.cpp @@ -0,0 +1,88 @@ +#include "rtp_client_node.hpp" +#include +namespace video_streaming { +RtpClientNode::RtpClientNode(const rclcpp::NodeOptions &options) + : BaseVideoNode("rtp_client_node", options) { + RCLCPP_INFO(this->get_logger(), "Initializing RtpClientNode..."); + + this->declare_parameter("port", 7001); + this->declare_parameter("latency_ms", 200); + dest_port_ = this->get_parameter("port").as_int(); + latency_ms_ = this->get_parameter("latency_ms").as_int(); + param_callback_handle_ = this->add_on_set_parameters_callback(std::bind( + &RtpClientNode::on_parameter_change, this, std::placeholders::_1)); + if (!start_pipeline()) { + RCLCPP_FATAL(get_logger(), "RtpClientNode: failed to start pipeline."); + throw std::runtime_error("RtpClientNode: pipeline start failed"); + } + RCLCPP_INFO(get_logger(), "RtpClientNode constructed and pipeline started."); +} + +std::string RtpClientNode::get_pipeline_description() { + std::stringstream desc; + desc << "udpsrc port=" << dest_port_ << " caps=" + << "\"application/x-rtp,media=video,encoding-name=H265,payload=96, " + << "clock-rate=90000\" ! rtpjitterbuffer mode=4 latency=" << latency_ms_ + << " drop-on-latency=true ! rtpulpfecdec ! rtph265depay ! h265parse ! " + << "nvv4l2decoder ! nvvidconv ! nveglglessink sync=false"; + return desc.str(); +} + +bool RtpClientNode::create_pipeline() { + std::string desc = get_pipeline_description(); + RCLCPP_INFO(this->get_logger(), "Pipeline description: %s", desc.c_str()); + + GError *err = nullptr; + GstElement *p = gst_parse_launch(desc.c_str(), &err); + + if (err || !p) { + RCLCPP_ERROR(get_logger(), "gst_parse_launch failed: %s", + err ? err->message : "unknown"); + if (err) { + g_error_free(err); + } + return false; + } + if (!GST_IS_PIPELINE(p)) { + RCLCPP_ERROR(get_logger(), "Parsed element is not a pipeline."); + gst_object_unref(p); + return false; + } + pipeline_ = p; + + RCLCPP_INFO(this->get_logger(), "Pipeline parsed successfully."); + return true; +} +rcl_interfaces::msg::SetParametersResult RtpClientNode::on_parameter_change( + const std::vector ¶meters) { + rcl_interfaces::msg::SetParametersResult result; + result.successful = true; + result.reason = "success"; + bool needs_restart = false; + for (const auto ¶m : parameters) { + const std::string &name = param.get_name(); + + // Destination Port + if (name == "port") { + dest_port_ = param.as_int(); + RCLCPP_INFO(this->get_logger(), "Param Update: port = %d", dest_port_); + needs_restart = true; + continue; + } + // Latency + if (name == "latency_ms") { + latency_ms_ = param.as_int(); + RCLCPP_INFO(this->get_logger(), "Param Update: latency_ms = %d", + latency_ms_); + needs_restart = true; + continue; + } + } + if (needs_restart) { + stop_pipeline(); + start_pipeline(); + } + return result; +} +RCLCPP_COMPONENTS_REGISTER_NODE(video_streaming::RtpClientNode); +} // namespace video_streaming \ No newline at end of file diff --git a/src/Cameras/video_streaming/src/rtp_node.cpp b/src/Cameras/video_streaming/src/rtp_node.cpp new file mode 100644 index 00000000..be7d7c9c --- /dev/null +++ b/src/Cameras/video_streaming/src/rtp_node.cpp @@ -0,0 +1,219 @@ +#include "rtp_node.hpp" +#include "std_msgs/msg/empty.hpp" +#include "std_msgs/msg/int32.hpp" +#include +#include +#include +#include +namespace video_streaming { + +RtpNode::RtpNode(const rclcpp::NodeOptions &options) + : BaseVideoNode("rtp_node", options) { + RCLCPP_INFO(this->get_logger(), "Initializing RtpNode..."); + + this->declare_parameter("dest_ip", "127.0.0.1"); + this->declare_parameter("dest_port", 7001); + this->declare_parameter("target_framerate", 30); + this->declare_parameter("bitrate", 2000000); + this->declare_parameter("fec_percent", 20); + this->declare_parameter("default_width", 1280); + this->declare_parameter("default_height", 720); + + target_framerate_ = this->get_parameter("target_framerate").as_int(); + if (target_framerate_ <= 0) { + RCLCPP_ERROR(this->get_logger(), + "Invalid initial target_framerate: %d. Must be > 0.", + target_framerate_); + throw std::runtime_error("Invalid initial target_framerate"); + } + bitrate_ = this->get_parameter("bitrate").as_int(); + fec_ = this->get_parameter("fec_percent").as_int(); + dest_ip_ = this->get_parameter("dest_ip").as_string(); + dest_port_ = this->get_parameter("dest_port").as_int(); + width_ = this->get_parameter("default_width").as_int(); + height_ = this->get_parameter("default_height").as_int(); + h265_encoder_ = nullptr; + + param_callback_handle_ = this->add_on_set_parameters_callback( + std::bind(&RtpNode::on_parameter_change, this, std::placeholders::_1)); + + bitrate_sub_ = this->create_subscription( + "~/set_bitrate", 10, [this](const std_msgs::msg::Int32::SharedPtr msg) { + set_bitrate(msg->data); + }); + + iframe_sub_ = this->create_subscription( + "~/trigger_iframe", 10, + std::bind(&RtpNode::on_iframe_trigger, this, std::placeholders::_1)); + restart_pub_ = this->create_publisher( + "/all_video/restart_pipeline", 10); + + if (!start_pipeline()) { + RCLCPP_FATAL(get_logger(), "RtpNode: failed to start pipeline."); + throw std::runtime_error("RtpNode: pipeline start failed"); + } + RCLCPP_INFO(get_logger(), "RtpNode constructed and pipeline started."); +} + +RtpNode::~RtpNode() { BaseVideoNode::safe_gst_unref(h265_encoder_); } + +std::string RtpNode::get_pipeline_description() { + int video_bitrate = bitrate_ / (1 + fec_ / 100.0); + std::stringstream desc; + desc << "interpipesrc format=3 listen-to=detect is-live=true ! " + << "nvvidconv ! videorate ! " + << "capsfilter caps=\"video/x-raw,framerate=" << target_framerate_ + << "/1\" ! nvvidconv ! " + << "capsfilter caps=\"video/x-raw(memory:NVMM),height=" << height_ + << ",width=" << width_ << "\" ! nvv4l2h265enc bitrate=" << video_bitrate + << " name=h265_enc EnableTwopassCBR=true control-rate=1 " + "maxperf-enable=true ! queue ! rtph265pay config-interval=-1 ! " + "rtpulpfecenc percentage=" + << fec_ << " ! udpsink host=" << dest_ip_ << " port=" << dest_port_; + return desc.str(); +} + +bool RtpNode::create_pipeline() { + std::string desc = get_pipeline_description(); + RCLCPP_INFO(this->get_logger(), "Pipeline description: %s", desc.c_str()); + + GError *err = nullptr; + GstElement *p = gst_parse_launch(desc.c_str(), &err); + + if (err || !p) { + RCLCPP_ERROR(get_logger(), "gst_parse_launch failed: %s", + err ? err->message : "unknown"); + if (err) { + g_error_free(err); + } + return false; + } + if (!GST_IS_PIPELINE(p)) { + RCLCPP_ERROR(get_logger(), "Parsed element is not a pipeline."); + gst_object_unref(p); + return false; + } + pipeline_ = p; + // Unref any existing elements before getting new ones + safe_gst_unref(h265_encoder_); + h265_encoder_ = this->get_element("h265_enc"); + + if (!h265_encoder_) { + RCLCPP_ERROR(get_logger(), "Failed to get elements by name"); + return false; + } + + RCLCPP_INFO(this->get_logger(), + "Pipeline parsed and elements retrieved successfully."); + return true; +} + +rcl_interfaces::msg::SetParametersResult +RtpNode::on_parameter_change(const std::vector ¶meters) { + rcl_interfaces::msg::SetParametersResult result; + result.successful = true; + result.reason = "success"; + + bool needs_restart = false; + + for (const auto ¶m : parameters) { + const std::string &name = param.get_name(); + + // Forward error correction + if (name == "fec_percent") { + fec_ = param.as_int(); + RCLCPP_INFO(this->get_logger(), + "Param Update: Forward Error Correction set to %d%%", fec_); + needs_restart = true; + continue; + } + // Framerate + if (name == "target_framerate") { + target_framerate_ = param.as_int(); + RCLCPP_INFO(this->get_logger(), "Param Update: target_framerate = %d", + target_framerate_); + if (target_framerate_ <= 0) { + result.successful = false; + result.reason = "target_framerate must be greater than 0"; + RCLCPP_ERROR(this->get_logger(), + "Invalid target_framerate: %d. Must be > 0.", + target_framerate_); + return result; + } + needs_restart = true; + continue; + } + // Destination IP + if (name == "dest_ip") { + dest_ip_ = param.as_string(); + RCLCPP_INFO(this->get_logger(), "Param Update: dest_ip = %s", + dest_ip_.c_str()); + needs_restart = true; + continue; + } + // Destination Port + if (name == "dest_port") { + dest_port_ = param.as_int(); + RCLCPP_INFO(this->get_logger(), "Param Update: dest_port = %d", + dest_port_); + needs_restart = true; + continue; + } + // Bitrate + if (name == "bitrate") { + int bitrate = param.as_int(); + set_bitrate(bitrate); + RCLCPP_INFO(this->get_logger(), "Param Update: bitrate = %d", bitrate); + RCLCPP_WARN_ONCE(this->get_logger(), + "Bitrate changes should be made via the " + "~/set_bitrate topic for smoother transitions."); + continue; + } + } + if (needs_restart && result.successful) { + restart_pub_->publish(std_msgs::msg::Empty()); + } + return result; +} + +void RtpNode::set_bitrate(const int32_t bitrate) { + bitrate_ = bitrate; + int video_bitrate = bitrate / (1 + fec_ / 100.0); + if (h265_encoder_) { + pause_pipeline(); + g_object_set(h265_encoder_, "bitrate", video_bitrate, NULL); + resume_pipeline(); + RCLCPP_DEBUG(this->get_logger(), "Bitrate set to %d", video_bitrate); + } + // Adjust resolution based on bitrate thresholds + // TODO: These thresholds are arbitrary and may need tuning based on actual + // performance and quality requirements + int bits_per_frame = video_bitrate / target_framerate_; + if (bits_per_frame < 50000) { + set_resolution(640, 360); + } else if (bits_per_frame < 100000) { + set_resolution(854, 480); + } else if (bits_per_frame < 200000) { + set_resolution(1280, 720); + } else { + set_resolution(1920, 1080); + } +} + +void RtpNode::set_resolution(const int width, const int height) { + if (width == width_ && height == height_) { + return; + } + width_ = width; + height_ = height; + restart_pub_->publish(std_msgs::msg::Empty()); +} + +void RtpNode::on_iframe_trigger(const std_msgs::msg::Empty::SharedPtr msg) { + if (h265_encoder_) { + g_signal_emit_by_name(h265_encoder_, "force-IDR"); + } +} + +RCLCPP_COMPONENTS_REGISTER_NODE(video_streaming::RtpNode); +} // namespace video_streaming \ No newline at end of file