Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions ddspipe_core/src/cpp/communication/rpc/RpcBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ void RpcBridge::init_nts_()
{
EPROSIMA_LOG_INFO(DDSPIPE_RPCBRIDGE, "Creating endpoints in RpcBridge for service " << rpc_topic_ << ".");

// TODO: remove and use every participant
std::set<ParticipantId> ids = participants_->get_rtps_participants_ids();
std::set<ParticipantId> ids = participants_->get_participants_ids();

// Create a proxy client and server in each RTPS participant
for (ParticipantId id: ids)
Expand Down Expand Up @@ -318,7 +317,10 @@ void RpcBridge::transmit_(

SampleIdentity reply_related_sample_identity =
rpc_data.write_params.get_reference().sample_identity();
reply_related_sample_identity.sequence_number(rpc_data.origin_sequence_number);
// Use the reader GUID of the reply for the service client to replace the writer GUID of
// the request for the service client
reply_related_sample_identity.writer_guid(
rpc_data.write_params.get_reference().related_sample_identity().writer_guid());

if (reply_related_sample_identity == SampleIdentity::unknown())
{
Expand Down Expand Up @@ -376,7 +378,9 @@ void RpcBridge::transmit_(

// A Server could be answering a different client in this same DDS Pipe or a remote client
// Thus, it must be filtered so only replies to this client are processed.
if (rpc_data.write_params.get_reference().sample_identity().writer_guid() != reader->guid())
if (!rpc_data.write_params.is_set() ||
rpc_data.write_params.get_reference().related_sample_identity().writer_guid()
!= reader->guid())
{
logDebug(DDSPIPE_RPCBRIDGE,
"RpcBridge for service " << *this << " from reader " << reader->guid() <<
Expand All @@ -386,14 +390,16 @@ void RpcBridge::transmit_(
else
{
std::pair<ParticipantId, SampleIdentity> registry_entry;
auto request_sequence_number =
rpc_data.write_params.get_reference().related_sample_identity().sequence_number();
{
// Wait for request transmission to be finished (entry added to registry)
std::lock_guard<std::recursive_mutex> lock(
service_registries_[reader->participant_id()]->get_mutex());

// Fetch information required for transmission; which proxy server should send it and with what parameters
registry_entry = service_registries_[reader->participant_id()]->get(
rpc_data.write_params.get_reference().sample_identity().sequence_number());
request_sequence_number);
}

// Not valid means:
Expand All @@ -415,7 +421,7 @@ void RpcBridge::transmit_(
else
{
service_registries_[reader->participant_id()]->erase(
rpc_data.write_params.get_reference().sample_identity().sequence_number());
request_sequence_number);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@ class CommonReader : public BaseReader, public fastdds::dds::DataReaderListener,
fastdds::dds::Topic* topic,
fastdds::dds::InconsistentTopicStatus status);

/////////////////////////
// RPC REQUIRED METHODS
/////////////////////////
// TODO remove these methods once the double reference is solved

//! Get GUID of internal RTPS reader
DDSPIPE_PARTICIPANTS_DllAPI
core::types::Guid guid() const override;

//! Get internal RTPS reader mutex
DDSPIPE_PARTICIPANTS_DllAPI
fastdds::RecursiveTimedMutex& get_rtps_mutex() const override;

//! Get number of unread cache changes in internal RTPS reader
DDSPIPE_PARTICIPANTS_DllAPI
uint64_t get_unread_count() const override;

DDSPIPE_PARTICIPANTS_DllAPI
core::types::DdsTopic topic() const override;

protected:

/////////////////////////
Expand Down Expand Up @@ -158,7 +178,7 @@ class CommonReader : public BaseReader, public fastdds::dds::DataReaderListener,
const fastdds::dds::SampleInfo& info,
core::types::RtpsPayloadData& data_to_fill) const noexcept;


virtual core::types::RtpsPayloadData* create_data_() const noexcept;
/////////////////////////
// EXTERNAL METHODS
/////////////////////////
Expand All @@ -176,6 +196,9 @@ class CommonReader : public BaseReader, public fastdds::dds::DataReaderListener,

fastdds::dds::Subscriber* dds_subscriber_;
fastdds::dds::DataReader* reader_;

// simulate rtps mutex
mutable fastdds::RecursiveTimedMutex mp_mutex_;
};

} /* namespace dds */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2025 Sony Group Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <ddspipe_core/types/topic/rpc/RpcTopic.hpp>

#include <ddspipe_participants/library/library_dll.h>
#include <ddspipe_participants/reader/dds/CommonReader.hpp>

namespace eprosima {
namespace ddspipe {
namespace participants {
namespace rpc {

/**
* Base dds Reader concrete class that implements abstract CommonReader one.
*/
class DdsSimpleReader : public dds::CommonReader
{
public:

/**
* @brief Construct a new DdsSimpleReader object
*
* Get the Attributes and TopicQoS and create the SimpleReader History and the DDS SimpleReader.
*
* @param participant_id Router Id of the Participant that has created this SimpleReader.
* @param topic Topic that this SimpleReader subscribes to.
* @param payload_pool Shared Payload Pool to received data and take it.
* @param subscriber DDS Subscriber
* @param topic_entity DDS Topic
*
* @throw \c InitializationException in case any creation has failed
*/
DDSPIPE_PARTICIPANTS_DllAPI
DdsSimpleReader(
const core::types::ParticipantId& participant_id,
const core::types::DdsTopic& topic,
const std::shared_ptr<core::PayloadPool>& payload_pool,
fastdds::dds::DomainParticipant* participant,
fastdds::dds::Topic* topic_entity);

protected:
//! Override Parent method to fill fields exclusive from RPC.
virtual void fill_received_data_(
const fastdds::dds::SampleInfo& info,
core::types::RtpsPayloadData& data_to_fill) const noexcept override;

virtual core::types::RtpsPayloadData* create_data_() const noexcept override;
};

} /* namespace rpc */
} /* namespace participants */
} /* namespace ddspipe */
} /* namespace eprosima */
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ class CommonWriter : public BaseWriter
eprosima::fastdds::rtps::WriteParams& to_send_params,
const core::types::RtpsPayloadData& data) const noexcept;

DDSPIPE_PARTICIPANTS_DllAPI
virtual
void fill_sent_data_(
const eprosima::fastdds::rtps::WriteParams& sent_params,
core::types::RtpsPayloadData& data_to_fill) const noexcept;

/////////////////////////
// EXTERNAL VARIABLES
/////////////////////////
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2025 Sony Group Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <ddspipe_participants/library/library_dll.h>
#include <ddspipe_participants/efficiency/cache_change/CacheChangePool.hpp>
#include <ddspipe_participants/writer/dds/CommonWriter.hpp>

namespace eprosima {
namespace ddspipe {
namespace participants {
namespace rpc {

/**
* Base DDS DataWriter concrete class that implements abstract CommonWriter one.
*/
class DdsSimpleWriter : public dds::CommonWriter
{
public:
/**
* @brief Construct a new DdsSimpleWriter object
*
* Get the Attributes and TopicQoS and create the SimpleWriter History and the DDS SimpleWriter.
*
* @note use protected constructor so this class is not called but from subclasses
* (Basically make abstract class without a pure virtual function).
*
* @param participant_id Router Id of the Participant that has created this SimpleWriter.
* @param topic Topic that this SimpleWriter subscribes to.
* @param payload_pool Shared Payload Pool to received data and take it.
* @param participant DDS Participant pointer.
* @param topic_entity DDS Topic pointer.
*
* @throw \c InitializationException in case any creation has failed
*/
DDSPIPE_PARTICIPANTS_DllAPI
DdsSimpleWriter(
const core::types::ParticipantId& participant_id,
const core::types::DdsTopic& topic,
const std::shared_ptr<core::PayloadPool>& payload_pool,
fastdds::dds::DomainParticipant* participant,
fastdds::dds::Topic* topic_entity,
const bool repeater = false);

//! Override Parent method to fill fields only required for RPC.
DDSPIPE_PARTICIPANTS_DllAPI
virtual utils::ReturnCode fill_to_send_data_(
eprosima::fastdds::rtps::WriteParams& to_send_params,
const core::types::RtpsPayloadData& data) const noexcept;

//! Override Parent method to fill fields only required for RPC.
DDSPIPE_PARTICIPANTS_DllAPI
virtual void fill_sent_data_(
const eprosima::fastdds::rtps::WriteParams& sent_params,
core::types::RtpsPayloadData& data_to_fill) const noexcept;
};

} /* namespace rpc */
} /* namespace participants */
} /* namespace ddspipe */
} /* namespace eprosima */
Loading