From d156033d49f1d49ad35f8105a7d1f9f943f5e630 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 9 Sep 2025 06:30:41 +0000 Subject: [PATCH 1/2] Add support for RPC topics Signed-off-by: Barry Xu --- .../src/cpp/communication/rpc/RpcBridge.cpp | 18 ++- .../reader/dds/CommonReader.hpp | 25 +++- .../reader/rpc/DdsSimpleReader.hpp | 67 ++++++++++ .../writer/dds/CommonWriter.hpp | 6 + .../writer/rpc/DdsSimpleWriter.hpp | 73 +++++++++++ .../cpp/participant/dds/CommonParticipant.cpp | 121 +++++++++++------- .../src/cpp/reader/dds/CommonReader.cpp | 27 +++- .../src/cpp/reader/rpc/DdsSimpleReader.cpp | 101 +++++++++++++++ .../src/cpp/writer/dds/CommonWriter.cpp | 20 ++- .../src/cpp/writer/rpc/DdsSimpleWriter.cpp | 72 +++++++++++ 10 files changed, 476 insertions(+), 54 deletions(-) create mode 100644 ddspipe_participants/include/ddspipe_participants/reader/rpc/DdsSimpleReader.hpp create mode 100644 ddspipe_participants/include/ddspipe_participants/writer/rpc/DdsSimpleWriter.hpp create mode 100644 ddspipe_participants/src/cpp/reader/rpc/DdsSimpleReader.cpp create mode 100644 ddspipe_participants/src/cpp/writer/rpc/DdsSimpleWriter.cpp diff --git a/ddspipe_core/src/cpp/communication/rpc/RpcBridge.cpp b/ddspipe_core/src/cpp/communication/rpc/RpcBridge.cpp index da9880cab..f1504e44b 100644 --- a/ddspipe_core/src/cpp/communication/rpc/RpcBridge.cpp +++ b/ddspipe_core/src/cpp/communication/rpc/RpcBridge.cpp @@ -61,8 +61,7 @@ void RpcBridge::init_nts_() { EPROSIMA_LOG_INFO(DDSPIPE_RPCBRIDGE, "Creating endpoints in RpcBridge for service " << rpc_topic_ << "."); - // TODO: remove and use every participant - std::set ids = participants_->get_rtps_participants_ids(); + std::set ids = participants_->get_participants_ids(); // Create a proxy client and server in each RTPS participant for (ParticipantId id: ids) @@ -318,7 +317,10 @@ void RpcBridge::transmit_( SampleIdentity reply_related_sample_identity = rpc_data.write_params.get_reference().sample_identity(); - reply_related_sample_identity.sequence_number(rpc_data.origin_sequence_number); + // Use the reader GUID of the reply for the service client to replace the writer GUID of + // the request for the service client + reply_related_sample_identity.writer_guid( + rpc_data.write_params.get_reference().related_sample_identity().writer_guid()); if (reply_related_sample_identity == SampleIdentity::unknown()) { @@ -376,7 +378,9 @@ void RpcBridge::transmit_( // A Server could be answering a different client in this same DDS Pipe or a remote client // Thus, it must be filtered so only replies to this client are processed. - if (rpc_data.write_params.get_reference().sample_identity().writer_guid() != reader->guid()) + if (!rpc_data.write_params.is_set() || + rpc_data.write_params.get_reference().related_sample_identity().writer_guid() + != reader->guid()) { logDebug(DDSPIPE_RPCBRIDGE, "RpcBridge for service " << *this << " from reader " << reader->guid() << @@ -386,6 +390,8 @@ void RpcBridge::transmit_( else { std::pair registry_entry; + auto request_sequence_number = + rpc_data.write_params.get_reference().related_sample_identity().sequence_number(); { // Wait for request transmission to be finished (entry added to registry) std::lock_guard lock( @@ -393,7 +399,7 @@ void RpcBridge::transmit_( // Fetch information required for transmission; which proxy server should send it and with what parameters registry_entry = service_registries_[reader->participant_id()]->get( - rpc_data.write_params.get_reference().sample_identity().sequence_number()); + request_sequence_number); } // Not valid means: @@ -415,7 +421,7 @@ void RpcBridge::transmit_( else { service_registries_[reader->participant_id()]->erase( - rpc_data.write_params.get_reference().sample_identity().sequence_number()); + request_sequence_number); } } } diff --git a/ddspipe_participants/include/ddspipe_participants/reader/dds/CommonReader.hpp b/ddspipe_participants/include/ddspipe_participants/reader/dds/CommonReader.hpp index 56a7c4bc3..688ae6256 100644 --- a/ddspipe_participants/include/ddspipe_participants/reader/dds/CommonReader.hpp +++ b/ddspipe_participants/include/ddspipe_participants/reader/dds/CommonReader.hpp @@ -87,6 +87,26 @@ class CommonReader : public BaseReader, public fastdds::dds::DataReaderListener, fastdds::dds::Topic* topic, fastdds::dds::InconsistentTopicStatus status); + ///////////////////////// + // RPC REQUIRED METHODS + ///////////////////////// + // TODO remove these methods once the double reference is solved + + //! Get GUID of internal RTPS reader + DDSPIPE_PARTICIPANTS_DllAPI + core::types::Guid guid() const override; + + //! Get internal RTPS reader mutex + DDSPIPE_PARTICIPANTS_DllAPI + fastdds::RecursiveTimedMutex& get_rtps_mutex() const override; + + //! Get number of unread cache changes in internal RTPS reader + DDSPIPE_PARTICIPANTS_DllAPI + uint64_t get_unread_count() const override; + + DDSPIPE_PARTICIPANTS_DllAPI + core::types::DdsTopic topic() const override; + protected: ///////////////////////// @@ -158,7 +178,7 @@ class CommonReader : public BaseReader, public fastdds::dds::DataReaderListener, const fastdds::dds::SampleInfo& info, core::types::RtpsPayloadData& data_to_fill) const noexcept; - + virtual core::types::RtpsPayloadData* create_data_() const noexcept; ///////////////////////// // EXTERNAL METHODS ///////////////////////// @@ -176,6 +196,9 @@ class CommonReader : public BaseReader, public fastdds::dds::DataReaderListener, fastdds::dds::Subscriber* dds_subscriber_; fastdds::dds::DataReader* reader_; + + // simulate rtps mutex + mutable fastdds::RecursiveTimedMutex mp_mutex_; }; } /* namespace dds */ diff --git a/ddspipe_participants/include/ddspipe_participants/reader/rpc/DdsSimpleReader.hpp b/ddspipe_participants/include/ddspipe_participants/reader/rpc/DdsSimpleReader.hpp new file mode 100644 index 000000000..2a10c989f --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/reader/rpc/DdsSimpleReader.hpp @@ -0,0 +1,67 @@ +// Copyright 2025 Sony Group Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { +namespace rpc { + +/** + * Base dds Reader concrete class that implements abstract CommonReader one. + */ +class DdsSimpleReader : public dds::CommonReader +{ +public: + + /** + * @brief Construct a new DdsSimpleReader object + * + * Get the Attributes and TopicQoS and create the SimpleReader History and the DDS SimpleReader. + * + * @param participant_id Router Id of the Participant that has created this SimpleReader. + * @param topic Topic that this SimpleReader subscribes to. + * @param payload_pool Shared Payload Pool to received data and take it. + * @param subscriber DDS Subscriber + * @param topic_entity DDS Topic + * + * @throw \c InitializationException in case any creation has failed + */ + DDSPIPE_PARTICIPANTS_DllAPI + DdsSimpleReader( + const core::types::ParticipantId& participant_id, + const core::types::DdsTopic& topic, + const std::shared_ptr& payload_pool, + fastdds::dds::DomainParticipant* participant, + fastdds::dds::Topic* topic_entity); + +protected: + //! Override Parent method to fill fields exclusive from RPC. + virtual void fill_received_data_( + const fastdds::dds::SampleInfo& info, + core::types::RtpsPayloadData& data_to_fill) const noexcept override; + + virtual core::types::RtpsPayloadData* create_data_() const noexcept override; +}; + +} /* namespace rpc */ +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/writer/dds/CommonWriter.hpp b/ddspipe_participants/include/ddspipe_participants/writer/dds/CommonWriter.hpp index 515fd860e..a955694b8 100644 --- a/ddspipe_participants/include/ddspipe_participants/writer/dds/CommonWriter.hpp +++ b/ddspipe_participants/include/ddspipe_participants/writer/dds/CommonWriter.hpp @@ -164,6 +164,12 @@ class CommonWriter : public BaseWriter eprosima::fastdds::rtps::WriteParams& to_send_params, const core::types::RtpsPayloadData& data) const noexcept; + DDSPIPE_PARTICIPANTS_DllAPI + virtual + void fill_sent_data_( + const eprosima::fastdds::rtps::WriteParams& sent_params, + core::types::RtpsPayloadData& data_to_fill) const noexcept; + ///////////////////////// // EXTERNAL VARIABLES ///////////////////////// diff --git a/ddspipe_participants/include/ddspipe_participants/writer/rpc/DdsSimpleWriter.hpp b/ddspipe_participants/include/ddspipe_participants/writer/rpc/DdsSimpleWriter.hpp new file mode 100644 index 000000000..65292e23c --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/writer/rpc/DdsSimpleWriter.hpp @@ -0,0 +1,73 @@ +// Copyright 2025 Sony Group Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { +namespace rpc { + +/** + * Base DDS DataWriter concrete class that implements abstract CommonWriter one. + */ +class DdsSimpleWriter : public dds::CommonWriter +{ +public: + /** + * @brief Construct a new DdsSimpleWriter object + * + * Get the Attributes and TopicQoS and create the SimpleWriter History and the DDS SimpleWriter. + * + * @note use protected constructor so this class is not called but from subclasses + * (Basically make abstract class without a pure virtual function). + * + * @param participant_id Router Id of the Participant that has created this SimpleWriter. + * @param topic Topic that this SimpleWriter subscribes to. + * @param payload_pool Shared Payload Pool to received data and take it. + * @param participant DDS Participant pointer. + * @param topic_entity DDS Topic pointer. + * + * @throw \c InitializationException in case any creation has failed + */ + DDSPIPE_PARTICIPANTS_DllAPI + DdsSimpleWriter( + const core::types::ParticipantId& participant_id, + const core::types::DdsTopic& topic, + const std::shared_ptr& payload_pool, + fastdds::dds::DomainParticipant* participant, + fastdds::dds::Topic* topic_entity, + const bool repeater = false); + + //! Override Parent method to fill fields only required for RPC. + DDSPIPE_PARTICIPANTS_DllAPI + virtual utils::ReturnCode fill_to_send_data_( + eprosima::fastdds::rtps::WriteParams& to_send_params, + const core::types::RtpsPayloadData& data) const noexcept; + + //! Override Parent method to fill fields only required for RPC. + DDSPIPE_PARTICIPANTS_DllAPI + virtual void fill_sent_data_( + const eprosima::fastdds::rtps::WriteParams& sent_params, + core::types::RtpsPayloadData& data_to_fill) const noexcept; +}; + +} /* namespace rpc */ +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index 9fda04eaa..4244eaab4 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -27,9 +28,11 @@ #include #include #include +#include #include #include #include +#include #include #include @@ -125,39 +128,54 @@ std::shared_ptr CommonParticipant::create_writer( } const core::types::DdsTopic& dds_topic = *topic_ptr; - // Check that it is RTPS topic - if (dds_topic.internal_type_discriminator() != core::types::INTERNAL_TOPIC_TYPE_RTPS) - { - logDebug(DDSPIPE_DDS_PARTICIPANT, "Not creating Writer for non RTPS topic " << dds_topic.topic_name()); - return std::make_shared(); - } - // Get the DDS Topic associated (create it if it does not exist) fastdds::dds::Topic* fastdds_topic = topic_related_(dds_topic); - if (dds_topic.topic_qos.has_partitions() || dds_topic.topic_qos.has_ownership()) + if (dds_topic.internal_type_discriminator() == core::types::INTERNAL_TOPIC_TYPE_RPC) { - // Notice that MultiWriter does not require an init call - return std::make_shared( + logDebug(DDSPIPE_DDS_PARTICIPANT, + "Creating DDS RPC Writer for topic " << fastdds_topic->get_name() + << " for " << configuration_->domain); + auto writer = std::make_shared( this->id(), dds_topic, this->payload_pool_, dds_participant_, - fastdds_topic, - configuration_->is_repeater); + fastdds_topic); + writer->init(); + return writer; + } + else if (dds_topic.internal_type_discriminator() == core::types::INTERNAL_TOPIC_TYPE_RTPS) + { + if (dds_topic.topic_qos.has_partitions() || dds_topic.topic_qos.has_ownership()) + { + // Notice that MultiWriter does not require an init call + return std::make_shared( + this->id(), + dds_topic, + this->payload_pool_, + dds_participant_, + fastdds_topic, + configuration_->is_repeater); + } + else + { + auto writer = std::make_shared( + this->id(), + dds_topic, + this->payload_pool_, + dds_participant_, + fastdds_topic, + configuration_->is_repeater); + writer->init(); + + return writer; + } } else { - auto writer = std::make_shared( - this->id(), - dds_topic, - this->payload_pool_, - dds_participant_, - fastdds_topic, - configuration_->is_repeater); - writer->init(); - - return writer; + logDevError(DDSPIPE_RTPS_PARTICIPANT, "Incorrect dds Topic in Writer creation."); + return std::make_shared(); } } @@ -175,41 +193,56 @@ std::shared_ptr CommonParticipant::create_reader( const core::types::DdsTopic& dds_topic = *topic_ptr; - // Check that it is RTPS topic - if (dds_topic.internal_type_discriminator() != core::types::INTERNAL_TOPIC_TYPE_RTPS) - { - logDebug(DDSPIPE_DDS_PARTICIPANT, "Not creating Reader for non RTPS topic " << dds_topic.topic_name()); - return std::make_shared(); - } - // Get the DDS Topic associated (create it if it does not exist) fastdds::dds::Topic* fastdds_topic = topic_related_(dds_topic); - if (dds_topic.topic_qos.has_partitions() || dds_topic.topic_qos.has_ownership()) + if (dds_topic.internal_type_discriminator() == core::types::INTERNAL_TOPIC_TYPE_RPC) { - // Notice that MultiReader does not require an init call - auto reader = std::make_shared( + logDebug(DDSPIPE_DDS_PARTICIPANT, + "Creating DDS RPC Reader for topic " << fastdds_topic->get_name() + << " for " << configuration_->domain); + auto reader = std::make_shared( this->id(), dds_topic, this->payload_pool_, dds_participant_, - fastdds_topic, - discovery_database_); + fastdds_topic); reader->init(); - return reader; } + else if (dds_topic.internal_type_discriminator() == core::types::INTERNAL_TOPIC_TYPE_RTPS) + { + if (dds_topic.topic_qos.has_partitions() || dds_topic.topic_qos.has_ownership()) + { + // Notice that MultiReader does not require an init call + auto reader = std::make_shared( + this->id(), + dds_topic, + this->payload_pool_, + dds_participant_, + fastdds_topic, + discovery_database_); + reader->init(); + + return reader; + } + else + { + auto reader = std::make_shared( + this->id(), + dds_topic, + this->payload_pool_, + dds_participant_, + fastdds_topic); + reader->init(); + + return reader; + } + } else { - auto reader = std::make_shared( - this->id(), - dds_topic, - this->payload_pool_, - dds_participant_, - fastdds_topic); - reader->init(); - - return reader; + logDevError(DDSPIPE_RTPS_PARTICIPANT, "Incorrect dds Topic in Reader creation."); + return std::make_shared(); } } diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index a6ceffbc9..ca67a64d4 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -188,7 +188,7 @@ utils::ReturnCode CommonReader::take_nts_( do { - rtps_data.reset(new RtpsPayloadData()); + rtps_data.reset(create_data_()); auto ret = reader_->take_next_sample(rtps_data.get(), &info); @@ -348,6 +348,31 @@ void CommonReader::fill_received_data_( } } +core::types::Guid CommonReader::guid() const +{ + return reader_->guid(); +} + +fastdds::RecursiveTimedMutex& CommonReader::get_rtps_mutex() const +{ + return mp_mutex_; +} + +uint64_t CommonReader::get_unread_count() const +{ + return reader_->get_unread_count(); +} + +core::types::DdsTopic CommonReader::topic() const +{ + return topic_; +} + +RtpsPayloadData* CommonReader::create_data_() const noexcept +{ + return new RtpsPayloadData(); +} + } /* namespace dds */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_participants/src/cpp/reader/rpc/DdsSimpleReader.cpp b/ddspipe_participants/src/cpp/reader/rpc/DdsSimpleReader.cpp new file mode 100644 index 000000000..e800926dc --- /dev/null +++ b/ddspipe_participants/src/cpp/reader/rpc/DdsSimpleReader.cpp @@ -0,0 +1,101 @@ +// Copyright 2025 Sony Group Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include +#include + +#include +#include + +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { +namespace rpc { + +DdsSimpleReader::DdsSimpleReader( + const core::types::ParticipantId& participant_id, + const core::types::DdsTopic& topic, + const std::shared_ptr& payload_pool, + fastdds::dds::DomainParticipant* participant, + fastdds::dds::Topic* topic_entity) + : CommonReader( + participant_id, topic, payload_pool, participant, topic_entity) +{ + EPROSIMA_LOG_INFO(DDSPIPE_RPC_READER, "Creating RPC Reader for topic " << topic_); +} + +void DdsSimpleReader::fill_received_data_( + const fastdds::dds::SampleInfo& info, + core::types::RtpsPayloadData& data_to_fill) const noexcept +{ + core::types::RpcPayloadData& rpc_data = dynamic_cast(data_to_fill); + + // Store the new data that has arrived in the Track data + // Get the writer guid + rpc_data.source_guid = detail::guid_from_instance_handle(info.publication_handle); + // Get source timestamp + rpc_data.source_timestamp = info.source_timestamp; + // Get Participant receiver + rpc_data.participant_receiver = participant_id_; + + // Set Instance Handle to rpc_data + if (topic_.topic_qos.keyed) + { + rpc_data.instanceHandle = info.instance_handle; + + // Set change kind + switch (info.instance_state) + { + case fastdds::dds::InstanceStateKind::ALIVE_INSTANCE_STATE: + rpc_data.kind = core::types::ChangeKind::ALIVE; + break; + + case fastdds::dds::InstanceStateKind::NOT_ALIVE_DISPOSED_INSTANCE_STATE: + rpc_data.kind = core::types::ChangeKind::NOT_ALIVE_DISPOSED; + break; + + default: + rpc_data.kind = core::types::ChangeKind::NOT_ALIVE_UNREGISTERED; + break; + } + } + else + { + rpc_data.kind = core::types::ChangeKind::ALIVE; + } + + fastdds::rtps::WriteParams write_params{}; + write_params.related_sample_identity() = info.related_sample_identity; + write_params.sample_identity() = info.sample_identity; + write_params.source_timestamp() = info.source_timestamp; + rpc_data.write_params.set_value(write_params); + + rpc_data.origin_sequence_number = info.sample_identity.sequence_number(); +} + +core::types::RtpsPayloadData* DdsSimpleReader::create_data_() const noexcept +{ + return new core::types::RpcPayloadData(); +} + +} /* namespace rpc */ +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp b/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp index 34611b608..056229c9f 100644 --- a/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp @@ -146,7 +146,14 @@ utils::ReturnCode CommonWriter::write_nts_( // (if it corresponds to a keyed topic), so it is equivalent to write with instance handle (and can hence use the // write with params overload to cover all cases). Future developers should be aware of this and might need to // update this method if the DataWriter implementation changes at some point. - return payload_pool_->write(writer_, &rtps_data, wparams); + auto ret = payload_pool_->write(writer_, &rtps_data, wparams); + if (ret != utils::ReturnCode::RETCODE_OK) + { + return ret; + } + + fill_sent_data_(wparams, rtps_data); + return utils::ReturnCode::RETCODE_OK; // TODO: handle dipose case -> DataWriter::write will always send ALIVE changes, so this case must be handled // with additional logic (e.g. by using unregister_instance instead of write). @@ -196,7 +203,9 @@ fastdds::dds::DataWriterQos CommonWriter::reckon_writer_qos_() const noexcept } // Set minimum deadline so it matches with everything - qos.deadline().period = eprosima::fastdds::dds::Duration_t(0); + // TODO: Comment the below line is a workaround for the issue. For a discussion on this topic, + // please refer to: https://github.com/eProsima/DDS-Pipe/issues/146#issuecomment-2944568048 + // qos.deadline().period = eprosima::fastdds::dds::Duration_t(0); return qos; } @@ -221,6 +230,13 @@ utils::ReturnCode CommonWriter::fill_to_send_data_( return utils::ReturnCode::RETCODE_OK; } +void CommonWriter::fill_sent_data_( + const eprosima::fastdds::rtps::WriteParams& sent_params, + core::types::RtpsPayloadData& data_to_fill) const noexcept +{ + // Do nothing +} + } /* namespace dds */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_participants/src/cpp/writer/rpc/DdsSimpleWriter.cpp b/ddspipe_participants/src/cpp/writer/rpc/DdsSimpleWriter.cpp new file mode 100644 index 000000000..99eeef299 --- /dev/null +++ b/ddspipe_participants/src/cpp/writer/rpc/DdsSimpleWriter.cpp @@ -0,0 +1,72 @@ +// Copyright 2025 Sony Group Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { +namespace rpc { + +DdsSimpleWriter::DdsSimpleWriter( + const core::types::ParticipantId& participant_id, + const core::types::DdsTopic& topic, + const std::shared_ptr& payload_pool, + fastdds::dds::DomainParticipant* participant, + fastdds::dds::Topic* topic_entity, + const bool repeater) + : CommonWriter( + participant_id, topic, payload_pool, participant, topic_entity, repeater) +{ + // Do nothing + //std::cout << "Creating DDS RPC Writer for topic " << topic_ << std::endl; +} + +utils::ReturnCode DdsSimpleWriter::fill_to_send_data_( + eprosima::fastdds::rtps::WriteParams& to_send_params, + const core::types::RtpsPayloadData& data) const noexcept +{ + const core::types::RpcPayloadData& rpc_data = dynamic_cast(data); + + if (! rpc_data.write_params.is_set()) { + return utils::ReturnCode::RETCODE_ERROR; + } + + to_send_params.related_sample_identity() = rpc_data.write_params->related_sample_identity(); + return utils::ReturnCode::RETCODE_OK; +} + +void DdsSimpleWriter::fill_sent_data_( + const eprosima::fastdds::rtps::WriteParams& sent_params, + core::types::RtpsPayloadData& data_to_fill) const noexcept +{ + core::types::RpcPayloadData& rpc_data = dynamic_cast(data_to_fill); + rpc_data.sent_sequence_number = sent_params.sample_identity().sequence_number(); +} + +} /* namespace rpc */ +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ From 1622b8a63f3e05c79c4b45f33059f5b686435ccb Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 17 Oct 2025 06:10:51 +0000 Subject: [PATCH 2/2] Remove a workaround https://github.com/eProsima/Fast-DDS/pull/6016 has been merged. So the workaround method is removed. Signed-off-by: Barry Xu --- ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp b/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp index 056229c9f..0f7eae241 100644 --- a/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/dds/CommonWriter.cpp @@ -203,9 +203,7 @@ fastdds::dds::DataWriterQos CommonWriter::reckon_writer_qos_() const noexcept } // Set minimum deadline so it matches with everything - // TODO: Comment the below line is a workaround for the issue. For a discussion on this topic, - // please refer to: https://github.com/eProsima/DDS-Pipe/issues/146#issuecomment-2944568048 - // qos.deadline().period = eprosima::fastdds::dds::Duration_t(0); + qos.deadline().period = eprosima::fastdds::dds::Duration_t(0); return qos; }