Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Cameras/video_streaming/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ add_library(${PROJECT_NAME} SHARED
src/webrtc_node.cpp
src/srt_node.cpp
src/srt_node_client.cpp
src/rtp_node.cpp
src/rtp_client_node.cpp
)

include_directories(
Expand Down Expand Up @@ -60,6 +62,8 @@ rclcpp_components_register_nodes(${PROJECT_NAME}
"WebRtcNode"
"video_streaming::SrtNode"
"video_streaming::SrtClientNode"
"video_streaming::RtpNode"
"video_streaming::RtpClientNode"
)

# ---- Install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ class InputNode : public BaseVideoNode {
private:
std::map<std::string, size_t> source_map_;
rclcpp::Service<interfaces::srv::VideoOut>::SharedPtr video_service_;
std::shared_ptr<interfaces::srv::VideoOut::Request> current_video_request_;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include "base_video_node.hpp"

#include <gst/gst.h>
#include <rcl_interfaces/msg/set_parameters_result.hpp>
#include <rclcpp/node_options.hpp>
#include <rclcpp/rclcpp.hpp>

namespace video_streaming {

class RtpClientNode : public BaseVideoNode {
public:
explicit RtpClientNode(const rclcpp::NodeOptions &options);

protected:
bool create_pipeline() override;

private:
rcl_interfaces::msg::SetParametersResult
on_parameter_change(const std::vector<rclcpp::Parameter> &parameters);
std::string get_pipeline_description();
OnSetParametersCallbackHandle::SharedPtr param_callback_handle_;
uint16_t dest_port_;
int latency_ms_;
};

} // namespace video_streaming
49 changes: 49 additions & 0 deletions src/Cameras/video_streaming/include/video_streaming/rtp_node.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#pragma once

#include "base_video_node.hpp"

#include <gst/gst.h>
#include <rcl_interfaces/msg/set_parameters_result.hpp>
#include <rclcpp/node_options.hpp>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/empty.hpp>
#include <std_msgs/msg/int32.hpp>

namespace video_streaming {

class RtpNode : public BaseVideoNode {
public:
explicit RtpNode(const rclcpp::NodeOptions &options);
~RtpNode() override;

protected:
bool create_pipeline() override;

private:
rcl_interfaces::msg::SetParametersResult
on_parameter_change(const std::vector<rclcpp::Parameter> &parameters);

void set_bitrate(int32_t bitrate);
void set_resolution(int width, int height);
void on_iframe_trigger(const std_msgs::msg::Empty::SharedPtr msg);
std::string get_pipeline_description();

int target_framerate_;
int bitrate_;
int fec_;
int dest_port_;
int width_;
int height_;

std::string dest_ip_;

GstElement *h265_encoder_{nullptr};

rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr bitrate_sub_;
rclcpp::Subscription<std_msgs::msg::Empty>::SharedPtr iframe_sub_;
rclcpp::Publisher<std_msgs::msg::Empty>::SharedPtr restart_pub_;

OnSetParametersCallbackHandle::SharedPtr param_callback_handle_;
};

} // namespace video_streaming
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@

def generate_launch_description():

srt_client_node = ComposableNode(
rtp_client_node = ComposableNode(
package="video_streaming",
plugin="video_streaming::SrtClientNode",
name="srt_client_node",
plugin="video_streaming::RtpClientNode",
name="rtp_client_node",
namespace="",
parameters=[{"srt_uri": "srt://192.168.0.55:7001?mode=caller"}],
parameters=[{"port": 5004, "latency_ms": 40}],
)

# Container_mt
container = ComposableNodeContainer(
name="test_client_container",
name="video_client_container",
namespace="",
package="rclcpp_components",
executable="component_container",
composable_node_descriptions=[srt_client_node],
composable_node_descriptions=[rtp_client_node],
output="screen",
)

Expand Down
16 changes: 5 additions & 11 deletions src/Cameras/video_streaming/launch/video_streaming.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,12 @@ def generate_launch_description():
parameters=[],
)

srt_node = ComposableNode(
rtp_node = ComposableNode(
package="video_streaming",
plugin="video_streaming::SrtNode",
name="srt_node",
plugin="video_streaming::RtpNode",
name="rtp_node",
namespace="",
parameters=[
{
"srt_uri": "srt://:7001",
"latency": 100,
"iframe_interval": 30,
}
],
parameters=[{"dest_ip": "192.168.1.100", "dest_port": 5004}],
)

# Create a container for all 3 components
Expand All @@ -58,7 +52,7 @@ def generate_launch_description():
input_node,
detect_node,
# streaming_node,
srt_node,
rtp_node,
],
output="screen",
)
Expand Down
7 changes: 7 additions & 0 deletions src/Cameras/video_streaming/src/input_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ bool InputNode::create_pipeline() {
}

pipeline_ = p;
if (current_video_request_) {
video_cb(current_video_request_,
std::make_shared<interfaces::srv::VideoOut::Response>());
}
return true;
}

Expand Down Expand Up @@ -167,6 +171,9 @@ void InputNode::video_cb(
RCLCPP_ERROR(this->get_logger(), "Failed to start pipeline");
response->success = false;
}
if (response->success) {
current_video_request_ = request;
}
}

RCLCPP_COMPONENTS_REGISTER_NODE(InputNode)
88 changes: 88 additions & 0 deletions src/Cameras/video_streaming/src/rtp_client_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include "rtp_client_node.hpp"
#include <rclcpp_components/register_node_macro.hpp>
namespace video_streaming {
RtpClientNode::RtpClientNode(const rclcpp::NodeOptions &options)
: BaseVideoNode("rtp_client_node", options) {
RCLCPP_INFO(this->get_logger(), "Initializing RtpClientNode...");

this->declare_parameter<int>("port", 7001);
this->declare_parameter<int>("latency_ms", 200);
dest_port_ = this->get_parameter("port").as_int();
latency_ms_ = this->get_parameter("latency_ms").as_int();
param_callback_handle_ = this->add_on_set_parameters_callback(std::bind(
&RtpClientNode::on_parameter_change, this, std::placeholders::_1));
if (!start_pipeline()) {
RCLCPP_FATAL(get_logger(), "RtpClientNode: failed to start pipeline.");
throw std::runtime_error("RtpClientNode: pipeline start failed");
}
RCLCPP_INFO(get_logger(), "RtpClientNode constructed and pipeline started.");
}

std::string RtpClientNode::get_pipeline_description() {
std::stringstream desc;
desc << "udpsrc port=" << dest_port_ << " caps="
<< "\"application/x-rtp,media=video,encoding-name=H265,payload=96, "
<< "clock-rate=90000\" ! rtpjitterbuffer mode=4 latency=" << latency_ms_
<< " drop-on-latency=true ! rtpulpfecdec ! rtph265depay ! h265parse ! "
<< "nvv4l2decoder ! nvvidconv ! nveglglessink sync=false";
return desc.str();
}

bool RtpClientNode::create_pipeline() {
std::string desc = get_pipeline_description();
RCLCPP_INFO(this->get_logger(), "Pipeline description: %s", desc.c_str());

GError *err = nullptr;
GstElement *p = gst_parse_launch(desc.c_str(), &err);

if (err || !p) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be standard to do nullptr checks like this in modern C++, IDK. But I think explicitly presenting the comparison is generally better.

i.e.
p == nullptr

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modern C++ you typically explicitly compare to nullptr. This is very much a c idiom. This whole node follows the c style since gstreamer is a c library.

RCLCPP_ERROR(get_logger(), "gst_parse_launch failed: %s",
err ? err->message : "unknown");
if (err) {
g_error_free(err);
}
return false;
}
if (!GST_IS_PIPELINE(p)) {
RCLCPP_ERROR(get_logger(), "Parsed element is not a pipeline.");
gst_object_unref(p);
return false;
}
pipeline_ = p;

RCLCPP_INFO(this->get_logger(), "Pipeline parsed successfully.");
return true;
}
rcl_interfaces::msg::SetParametersResult RtpClientNode::on_parameter_change(
const std::vector<rclcpp::Parameter> &parameters) {
rcl_interfaces::msg::SetParametersResult result;
result.successful = true;
result.reason = "success";
bool needs_restart = false;
for (const auto &param : parameters) {
const std::string &name = param.get_name();

// Destination Port
if (name == "port") {
dest_port_ = param.as_int();
RCLCPP_INFO(this->get_logger(), "Param Update: port = %d", dest_port_);
needs_restart = true;
continue;
}
// Latency
if (name == "latency_ms") {
latency_ms_ = param.as_int();
RCLCPP_INFO(this->get_logger(), "Param Update: latency_ms = %d",
latency_ms_);
needs_restart = true;
continue;
}
}
if (needs_restart) {
stop_pipeline();
start_pipeline();
}
return result;
}
RCLCPP_COMPONENTS_REGISTER_NODE(video_streaming::RtpClientNode);
} // namespace video_streaming
Loading
Loading