From b45c73dad4f468e5558aabd125e0b060b3f5f8b8 Mon Sep 17 00:00:00 2001 From: Deniz Tuana Ergonul Uzun Date: Fri, 6 Mar 2026 12:11:14 +0100 Subject: [PATCH 1/9] CRT topology change --- integtest/crt_reader_test.py | 20 +++++- plugins/CRTBernReaderModule.cpp | 102 +++++++++++++-------------- plugins/CRTBernReaderModule.hpp | 18 +++-- plugins/CRTGrenobleReaderModule.cpp | 103 ++++++++++++++-------------- plugins/CRTGrenobleReaderModule.hpp | 18 +++-- src/CreateSource.hpp | 2 +- 6 files changed, 138 insertions(+), 125 deletions(-) diff --git a/integtest/crt_reader_test.py b/integtest/crt_reader_test.py index 61f4b8c..6e624fd 100644 --- a/integtest/crt_reader_test.py +++ b/integtest/crt_reader_test.py @@ -45,7 +45,15 @@ data_classes.attribute_substitution( obj_class="SocketDataSender", obj_id="socket_sender_crt", - updates={"port": new_port}, + updates={"local_port": 0, "remote_port": new_port}, + ) +) +new_port = find_free_port() +onebyone_local_crt_bern_conf.config_substitutions.append( + data_classes.attribute_substitution( + obj_class="SocketDataSender", + obj_id="socket_sender_crt_2", + updates={"local_port": 0, "remote_port": new_port}, ) ) onebyone_local_crt_bern_conf.config_substitutions.append( @@ -86,7 +94,15 @@ data_classes.attribute_substitution( obj_class="SocketDataSender", obj_id="socket_sender_crt", - updates={"port": new_port}, + updates={"local_port": 0, "remote_port": new_port}, + ) +) +new_port = find_free_port() +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.attribute_substitution( + obj_class="SocketDataSender", + obj_id="socket_sender_crt_2", + updates={"local_port": 0, "remote_port": new_port}, ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( diff --git a/plugins/CRTBernReaderModule.cpp b/plugins/CRTBernReaderModule.cpp index 21ffaaf..2e0e36e 100644 --- a/plugins/CRTBernReaderModule.cpp +++ b/plugins/CRTBernReaderModule.cpp @@ -17,8 +17,12 @@ #include "datahandlinglibs/utils/RateLimiter.hpp" #include "appmodel/DataReaderModule.hpp" +#include "appmodel/SocketDetectorToDaqConnection.hpp" +#include "appmodel/NWDetDataSender.hpp" #include "confmodel/QueueWithSourceId.hpp" +#include "confmodel/DetectorStream.hpp" +#include "confmodel/GeoId.hpp" #include "fddetdataformats/CRTBernFrame.hpp" @@ -37,11 +41,6 @@ constexpr uint64_t max_seq_id = 4095; */ constexpr uint8_t fake_det_id = (uint8_t)detdataformats::DetID::Subdetector::kVD_BernCRT; -/** - * @brief Fake packet stream ID - */ -constexpr uint64_t fake_stream_id = 0; - /** * @brief Fake packet block length */ @@ -87,14 +86,15 @@ fake_adc(fddetdataformats::CRTBernFrame& frame) * @param frame Fake packet * @param seq_id Fake packet sequence ID * @param timestamp Fake packet timestamp + * @param stream_id Fake packet stream ID */ void -fake_data(fddetdataformats::CRTBernFrame& frame, uint64_t& seq_id, uint64_t& timestamp) +fake_data(fddetdataformats::CRTBernFrame& frame, uint64_t& seq_id, uint64_t& timestamp, uint32_t stream_id) { frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id frame.daq_header.crate_id = 1; frame.daq_header.slot_id = 1; - frame.daq_header.stream_id = fake_stream_id; + frame.daq_header.stream_id = stream_id; fake_sequence_id(seq_id); frame.daq_header.seq_id = seq_id; frame.daq_header.block_length = fake_block_length; @@ -124,21 +124,43 @@ tokenize(std::string const& str, const char delim, std::vector& out } void -CRTBernReaderModule::init(const std::shared_ptr mfcg) +CRTBernReaderModule::init(const std::shared_ptr mcfg) { - auto* mdal = mfcg->get_dal(get_name()); + auto* mdal = mcfg->get_dal(get_name()); + + auto* d2d_conn = mdal->get_connections()[0]; // there's only 1 connection + auto* socket_d2d_conn = d2d_conn->cast(); + if (socket_d2d_conn == nullptr) { + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Connection is not of type SocketDetectorToDaqConnection."); + ers::fatal(err); + throw err; + } + + for (auto nw_sender : socket_d2d_conn->get_net_senders()) { + if (nw_sender->is_disabled(*(mcfg->get_session()))) { + continue; + } + + for (auto det_stream : nw_sender->get_streams()) { + if (det_stream->is_disabled(*(mcfg->get_session()))) { + continue; + } + + m_fake_stream_ids[det_stream->get_source_id()] = det_stream->get_geo_id()->get_stream_id(); + } + } if (mdal->get_outputs().empty()) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, + auto err = datahandlinglibs::InitializationError(ERS_HERE, "No outputs defined for CRT Bern reader in configuration."); ers::fatal(err); throw err; } - + for (auto* con : mdal->get_outputs()) { auto* queue = con->cast(); if (queue == nullptr) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId."); + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId."); ers::fatal(err); throw err; } @@ -149,12 +171,12 @@ CRTBernReaderModule::init(const std::shared_ptr mf std::vector words; tokenize(target, delim, words); - bool callback_mode = false; // TODO (DTE) : Make callback mode work? + bool callback_mode = false; if (words.front() == "cb") { - callback_mode = true; + TLOG() << "CRTBernReaderModule does not support callbacks"; + //callback_mode = true; } - m_source_id = queue->get_source_id(); auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode); register_node(queue->UID(), ptr); } @@ -177,11 +199,9 @@ CRTBernReaderModule::do_scrap(const CommandData_t& /*obj*/) if (m_run_marker.load()) { TLOG() << "Raising stop through variables!"; set_running(false); -// if (!m_callback_mode) { while (!m_producer_thread.get_readiness()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } -// } } else { TLOG_DEBUG(5) << "Already stopped!"; } @@ -191,19 +211,16 @@ void CRTBernReaderModule::do_start(const CommandData_t& /*startobj*/) { // Setup callbacks on all sourcemodels - for (auto& [sourceid, source] : m_sources) { - source->acquire_callback(); - } + //for (auto& [_, source] : m_sources) { + // source->acquire_callback(); + //} enable_flow(); m_packet_count = 0; - m_t0 = std::chrono::high_resolution_clock::now(); - //if (!m_callback_mode) { - m_producer_thread.set_work(&CRTBernReaderModule::run_produce, this); - //} + m_producer_thread.set_work(&CRTBernReaderModule::run_produce, this); } void @@ -239,39 +256,22 @@ CRTBernReaderModule::run_produce() datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); while (m_run_marker.load()) { - fake_data(frame, seq_id, timestamp); // TODO: To be filled by the CRT experts - - if (m_enable_flow.load()) [[likely]] { - handle_eth_payload(reinterpret_cast(&frame), sizeof(frame)); - ++m_packet_count; + // Create a fake packet for each stream + for (const auto& [sid, source] : m_sources) { + fake_data(frame, seq_id, timestamp, m_fake_stream_ids[sid]); // TODO: To be filled by the CRT experts + + if (m_enable_flow.load()) [[likely]] { + source->handle_payload(reinterpret_cast(&frame), sizeof(frame)); + ++m_packet_count; + } + + rate_limiter.limit(); } - - rate_limiter.limit(); } TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead } -void -CRTBernReaderModule::handle_eth_payload(char* payload, std::size_t size) -{ - // Get DAQ Header and its StreamID - //auto* daq_header = reinterpret_cast(payload); - //auto src_id = m_stream_id_to_source_id[src_rx_q][(unsigned)daq_header->stream_id]; - - if ( auto src_it = m_sources.find(m_source_id); src_it != m_sources.end()) { - src_it->second->handle_payload(payload, size); - } else { - // Really bad -> unexpeced StreamID in UDP Payload. - // This check is needed in order to avoid dynamically add thousands - // of Sources on the fly, in case the data corruption is extremely severe. - //if (m_num_unexid_frames.count(0) == 0) { - // m_num_unexid_frames[0] = 0; - //} - //m_num_unexid_frames[0]++; - } -} - void CRTBernReaderModule::set_running(bool should_run) { diff --git a/plugins/CRTBernReaderModule.hpp b/plugins/CRTBernReaderModule.hpp index 3567272..ca705ba 100644 --- a/plugins/CRTBernReaderModule.hpp +++ b/plugins/CRTBernReaderModule.hpp @@ -40,7 +40,7 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule * @brief Handles initialization on boot * @param mcfg DAQ configuration data */ - void init(const std::shared_ptr mfcg) override; + void init(const std::shared_ptr mcfg) override; private: // Commands @@ -56,13 +56,6 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule */ void run_produce(); - /** - * @brief Forwards the payload to get processed - * @param payload Payload buffer - * @param size Payload size - */ - void handle_eth_payload(char* payload, std::size_t size); - /** * @brief Sets run marker * @param should_run Whether producer thread should continue @@ -96,12 +89,17 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule utilities::ReusableThread m_producer_thread; // Sinks (SourceConcepts) + using sid_to_source_map_t = std::map>; /** * @brief Data sources */ - using sid_to_source_map_t = std::map>; sid_to_source_map_t m_sources; - uint32_t m_source_id; // NOLINT(build/unsigned) + + using sid_to_fake_stream_id_map_t = std::map; + /** + * @brief Fake packet stream IDs + */ + sid_to_fake_stream_id_map_t m_fake_stream_ids; /** * @brief Configured packet transmission rate in kHz diff --git a/plugins/CRTGrenobleReaderModule.cpp b/plugins/CRTGrenobleReaderModule.cpp index 8e92f0c..e95d32f 100644 --- a/plugins/CRTGrenobleReaderModule.cpp +++ b/plugins/CRTGrenobleReaderModule.cpp @@ -17,8 +17,12 @@ #include "datahandlinglibs/utils/RateLimiter.hpp" #include "appmodel/DataReaderModule.hpp" +#include "appmodel/SocketDetectorToDaqConnection.hpp" +#include "appmodel/NWDetDataSender.hpp" #include "confmodel/QueueWithSourceId.hpp" +#include "confmodel/DetectorStream.hpp" +#include "confmodel/GeoId.hpp" #include "fddetdataformats/CRTGrenobleFrame.hpp" @@ -37,11 +41,6 @@ constexpr uint64_t max_seq_id = 4095; */ constexpr uint8_t fake_det_id = (uint8_t)detdataformats::DetID::Subdetector::kVD_GrenobleCRT; -/** - * @brief Fake packet stream ID - */ -constexpr uint64_t fake_stream_id = 0; - /** * @brief Fake packet block length */ @@ -87,14 +86,15 @@ fake_adc(fddetdataformats::CRTGrenobleFrame& frame) * @param frame Fake packet * @param seq_id Fake packet sequence ID * @param timestamp Fake packet timestamp + * @param stream_id Fake packet stream ID */ void -fake_data(fddetdataformats::CRTGrenobleFrame& frame, uint64_t& seq_id, uint64_t& timestamp) +fake_data(fddetdataformats::CRTGrenobleFrame& frame, uint64_t& seq_id, uint64_t& timestamp, uint32_t stream_id) { frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id frame.daq_header.crate_id = 1; frame.daq_header.slot_id = 1; - frame.daq_header.stream_id = fake_stream_id; + frame.daq_header.stream_id = stream_id; fake_sequence_id(seq_id); frame.daq_header.seq_id = seq_id; frame.daq_header.block_length = fake_block_length; @@ -124,21 +124,43 @@ tokenize(std::string const& str, const char delim, std::vector& out } void -CRTGrenobleReaderModule::init(const std::shared_ptr mfcg) +CRTGrenobleReaderModule::init(const std::shared_ptr mcfg) { - auto* mdal = mfcg->get_dal(get_name()); + auto* mdal = mcfg->get_dal(get_name()); + + auto* d2d_conn = mdal->get_connections()[0]; // there's only 1 connection + auto* socket_d2d_conn = d2d_conn->cast(); + if (socket_d2d_conn == nullptr) { + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Connection is not of type SocketDetectorToDaqConnection."); + ers::fatal(err); + throw err; + } + + for (auto nw_sender : socket_d2d_conn->get_net_senders()) { + if (nw_sender->is_disabled(*(mcfg->get_session()))) { + continue; + } + + for (auto det_stream : nw_sender->get_streams()) { + if (det_stream->is_disabled(*(mcfg->get_session()))) { + continue; + } + + m_fake_stream_ids[det_stream->get_source_id()] = det_stream->get_geo_id()->get_stream_id(); + } + } if (mdal->get_outputs().empty()) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, + auto err = datahandlinglibs::InitializationError(ERS_HERE, "No outputs defined for CRT Grenoble reader in configuration."); ers::fatal(err); throw err; } - + for (auto* con : mdal->get_outputs()) { auto* queue = con->cast(); if (queue == nullptr) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId."); + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId."); ers::fatal(err); throw err; } @@ -149,12 +171,12 @@ CRTGrenobleReaderModule::init(const std::shared_ptr words; tokenize(target, delim, words); - bool callback_mode = false; // TODO (DTE) : Make callback mode work? + bool callback_mode = false; if (words.front() == "cb") { - callback_mode = true; + TLOG() << "CRTGrenobleReaderModule does not support callbacks"; + //callback_mode = true; } - m_source_id = queue->get_source_id(); auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode); register_node(queue->UID(), ptr); } @@ -177,11 +199,9 @@ CRTGrenobleReaderModule::do_scrap(const CommandData_t& /*obj*/) if (m_run_marker.load()) { TLOG() << "Raising stop through variables!"; set_running(false); -// if (!m_callback_mode) { while (!m_producer_thread.get_readiness()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } -// } } else { TLOG_DEBUG(5) << "Already stopped!"; } @@ -191,19 +211,17 @@ void CRTGrenobleReaderModule::do_start(const CommandData_t& /*startobj*/) { // Setup callbacks on all sourcemodels - for (auto& [sourceid, source] : m_sources) { - source->acquire_callback(); - } + //for (auto& [_, source] : m_sources) { + // source->acquire_callback(); + //} m_packet_count = 0; - + m_t0 = std::chrono::high_resolution_clock::now(); enable_flow(); - //if (!m_callback_mode) { - m_producer_thread.set_work(&CRTGrenobleReaderModule::run_produce, this); - //} + m_producer_thread.set_work(&CRTGrenobleReaderModule::run_produce, this); } void @@ -239,39 +257,22 @@ CRTGrenobleReaderModule::run_produce() datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); while (m_run_marker.load()) { - fake_data(frame, seq_id, timestamp); // TODO: To be filled by the CRT experts - - if (m_enable_flow.load()) [[likely]] { - handle_eth_payload(reinterpret_cast(&frame), sizeof(frame)); - ++m_packet_count; + // Create a fake packet for each stream + for (const auto& [sid, source] : m_sources) { + fake_data(frame, seq_id, timestamp, m_fake_stream_ids[sid]); // TODO: To be filled by the CRT experts + + if (m_enable_flow.load()) [[likely]] { + source->handle_payload(reinterpret_cast(&frame), sizeof(frame)); + ++m_packet_count; + } + + rate_limiter.limit(); } - - rate_limiter.limit(); } TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead } -void -CRTGrenobleReaderModule::handle_eth_payload(char* payload, std::size_t size) -{ - // Get DAQ Header and its StreamID - //auto* daq_header = reinterpret_cast(payload); - //auto src_id = m_stream_id_to_source_id[src_rx_q][(unsigned)daq_header->stream_id]; - - if ( auto src_it = m_sources.find(m_source_id); src_it != m_sources.end()) { - src_it->second->handle_payload(payload, size); - } else { - // Really bad -> unexpeced StreamID in UDP Payload. - // This check is needed in order to avoid dynamically add thousands - // of Sources on the fly, in case the data corruption is extremely severe. - //if (m_num_unexid_frames.count(0) == 0) { - // m_num_unexid_frames[0] = 0; - //} - //m_num_unexid_frames[0]++; - } -} - void CRTGrenobleReaderModule::set_running(bool should_run) { diff --git a/plugins/CRTGrenobleReaderModule.hpp b/plugins/CRTGrenobleReaderModule.hpp index c543121..93fc6da 100644 --- a/plugins/CRTGrenobleReaderModule.hpp +++ b/plugins/CRTGrenobleReaderModule.hpp @@ -40,7 +40,7 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule * @brief Handles initialization on boot * @param mcfg DAQ configuration data */ - void init(const std::shared_ptr mfcg) override; + void init(const std::shared_ptr mcfg) override; private: // Commands @@ -56,13 +56,6 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule */ void run_produce(); - /** - * @brief Forwards the payload to get processed - * @param payload Payload buffer - * @param size Payload size - */ - void handle_eth_payload(char* payload, std::size_t size); - /** * @brief Sets run marker * @param should_run Whether producer thread should continue @@ -96,12 +89,17 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule utilities::ReusableThread m_producer_thread; // Sinks (SourceConcepts) + using sid_to_source_map_t = std::map>; /** * @brief Data sources */ - using sid_to_source_map_t = std::map>; sid_to_source_map_t m_sources; - uint32_t m_source_id; // NOLINT(build/unsigned) + + using sid_to_fake_stream_id_map_t = std::map; + /** + * @brief Fake packet stream IDs + */ + sid_to_fake_stream_id_map_t m_fake_stream_ids; /** * @brief Configured packet transmission rate in kHz diff --git a/src/CreateSource.hpp b/src/CreateSource.hpp index 0038548..1c9c5ce 100644 --- a/src/CreateSource.hpp +++ b/src/CreateSource.hpp @@ -30,7 +30,7 @@ createSourceModel(const std::string& conn_uid, bool callback_mode) { auto datatypes = dunedaq::iomanager::IOManager::get()->get_datatypes(conn_uid); if (datatypes.size() != 1) { - ers::error(dunedaq::datahandlinglibs::GenericConfigurationError(ERS_HERE, + ers::error(datahandlinglibs::GenericConfigurationError(ERS_HERE, "Multiple output data types specified! Expected only a single type!")); } std::string raw_dt{ *datatypes.begin() }; From 1130d19a2472b44c266231f716843f85c5c8dcac Mon Sep 17 00:00:00 2001 From: Deniz Tuana Ergonul Uzun Date: Tue, 10 Mar 2026 17:15:32 +0100 Subject: [PATCH 2/9] No callback prefix check needed anymore --- plugins/CRTBernReaderModule.cpp | 23 +---------------------- plugins/CRTGrenobleReaderModule.cpp | 25 ++----------------------- 2 files changed, 3 insertions(+), 45 deletions(-) diff --git a/plugins/CRTBernReaderModule.cpp b/plugins/CRTBernReaderModule.cpp index 2e0e36e..bba3cd9 100644 --- a/plugins/CRTBernReaderModule.cpp +++ b/plugins/CRTBernReaderModule.cpp @@ -112,17 +112,6 @@ CRTBernReaderModule::CRTBernReaderModule(const std::string& name) register_command("scrap", &CRTBernReaderModule::do_scrap); } -inline void -tokenize(std::string const& str, const char delim, std::vector& out) -{ - std::size_t start; - std::size_t end = 0; - while ((start = str.find_first_not_of(delim, end)) != std::string::npos) { - end = str.find(delim, start); - out.push_back(str.substr(start, end - start)); - } -} - void CRTBernReaderModule::init(const std::shared_ptr mcfg) { @@ -165,17 +154,7 @@ CRTBernReaderModule::init(const std::shared_ptr mc throw err; } - // Check for CB prefix indicating Callback use - const char delim = '_'; - const std::string target = queue->UID(); - std::vector words; - tokenize(target, delim, words); - - bool callback_mode = false; - if (words.front() == "cb") { - TLOG() << "CRTBernReaderModule does not support callbacks"; - //callback_mode = true; - } + bool callback_mode = false; // CRTBernReaderModule does not support callbacks auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode); register_node(queue->UID(), ptr); diff --git a/plugins/CRTGrenobleReaderModule.cpp b/plugins/CRTGrenobleReaderModule.cpp index e95d32f..b768513 100644 --- a/plugins/CRTGrenobleReaderModule.cpp +++ b/plugins/CRTGrenobleReaderModule.cpp @@ -112,17 +112,6 @@ CRTGrenobleReaderModule::CRTGrenobleReaderModule(const std::string& name) register_command("scrap", &CRTGrenobleReaderModule::do_scrap); } -inline void -tokenize(std::string const& str, const char delim, std::vector& out) -{ - std::size_t start; - std::size_t end = 0; - while ((start = str.find_first_not_of(delim, end)) != std::string::npos) { - end = str.find(delim, start); - out.push_back(str.substr(start, end - start)); - } -} - void CRTGrenobleReaderModule::init(const std::shared_ptr mcfg) { @@ -165,18 +154,8 @@ CRTGrenobleReaderModule::init(const std::shared_ptrUID(); - std::vector words; - tokenize(target, delim, words); - - bool callback_mode = false; - if (words.front() == "cb") { - TLOG() << "CRTGrenobleReaderModule does not support callbacks"; - //callback_mode = true; - } - + bool callback_mode = false; // CRTGrenobleReaderModule does not support callbacks + auto ptr = m_sources[queue->get_source_id()] = createSourceModel(queue->UID(), callback_mode); register_node(queue->UID(), ptr); } From 8499d7f2d89c3717cd94d62ee54b27de3fdf79f4 Mon Sep 17 00:00:00 2001 From: Deniz Tuana Ergonul Uzun Date: Fri, 13 Mar 2026 17:04:26 +0100 Subject: [PATCH 3/9] Communicates via callbacks: CRT readers with socket writers, socket readers with DHLs --- plugins/CRTBernReaderModule.cpp | 45 ++++++------ plugins/CRTBernReaderModule.hpp | 8 ++- plugins/CRTGrenobleReaderModule.cpp | 45 ++++++------ plugins/CRTGrenobleReaderModule.hpp | 8 ++- src/CreateSource.hpp | 51 +++++++++++++ src/SourceConcept.hpp | 57 +++++++++++++++ src/SourceModel.hpp | 106 ++++++++++++++++++++++++++++ 7 files changed, 266 insertions(+), 54 deletions(-) create mode 100644 src/CreateSource.hpp create mode 100644 src/SourceConcept.hpp create mode 100644 src/SourceModel.hpp diff --git a/plugins/CRTBernReaderModule.cpp b/plugins/CRTBernReaderModule.cpp index 7d48064..5015ab1 100644 --- a/plugins/CRTBernReaderModule.cpp +++ b/plugins/CRTBernReaderModule.cpp @@ -10,6 +10,8 @@ #include "CRTBernReaderModule.hpp" +#include "CreateSource.hpp" + #include "crtmodules/opmon/CRTBernReaderModule.pb.h" #include "datahandlinglibs/utils/RateLimiter.hpp" @@ -25,8 +27,6 @@ #include "detdataformats/DetID.hpp" -DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTBernFrame, "CRTBernFrame") - namespace dunedaq { namespace crtmodules{ @@ -116,6 +116,18 @@ CRTBernReaderModule::init(const std::shared_ptr mc { auto* mdal = mcfg->get_dal(get_name()); + if (mdal->get_raw_data_callbacks().empty()) { + auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, + "No outputs defined for CRT Bern reader in configuration."); + ers::fatal(err); + throw err; + } + + for (auto* con : mdal->get_raw_data_callbacks()) { + auto ptr = m_sources[con->get_source_id()] = createSourceModel(con); + register_node(con->UID(), ptr); + } + auto* d2d_conn = mdal->get_connections()[0]; // there's only 1 connection auto* socket_d2d_conn = d2d_conn->cast(); if (socket_d2d_conn == nullptr) { @@ -137,26 +149,6 @@ CRTBernReaderModule::init(const std::shared_ptr mc m_fake_stream_ids[det_stream->get_source_id()] = det_stream->get_geo_id()->get_stream_id(); } } - - if (mdal->get_outputs().empty()) { - auto err = datahandlinglibs::InitializationError(ERS_HERE, - "No outputs defined for CRT Bern reader in configuration."); - ers::fatal(err); - throw err; - } - - for (auto* con : mdal->get_outputs()) { - auto* queue = con->cast(); - if (queue == nullptr) { - auto err = datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId."); - ers::fatal(err); - throw err; - } - - // CRTBernReaderModule does not support callbacks - auto connection_name = queue->UID(); - m_raw_data_senders[queue->get_source_id()] = get_iom_sender(connection_name); - } } void @@ -187,6 +179,11 @@ CRTBernReaderModule::do_scrap(const CommandData_t& /*obj*/) void CRTBernReaderModule::do_start(const CommandData_t& /*startobj*/) { + // Setup callbacks on all sourcemodels + for (auto& [sourceid, source] : m_sources) { + source->acquire_callback(); + } + enable_flow(); m_packet_count = 0; @@ -229,11 +226,11 @@ CRTBernReaderModule::run_produce() while (m_run_marker.load()) { // Create a fake packet for each stream - for (const auto& [sid, sender] : m_raw_data_senders) { + for (const auto& [sid, source] : m_sources) { fake_data(frame, seq_id, timestamp, m_fake_stream_ids[sid]); // TODO: To be filled by the CRT experts if (m_enable_flow.load()) [[likely]] { - sender->try_send(std::move(frame), iomanager::Sender::s_no_block); + source->handle_payload(reinterpret_cast(&frame), sizeof(frame)); ++m_packet_count; } diff --git a/plugins/CRTBernReaderModule.hpp b/plugins/CRTBernReaderModule.hpp index 503f1e0..f178010 100644 --- a/plugins/CRTBernReaderModule.hpp +++ b/plugins/CRTBernReaderModule.hpp @@ -21,6 +21,8 @@ namespace dunedaq { namespace crtmodules { +class SourceConcept; + class CRTBernReaderModule : public dunedaq::appfwk::DAQModule { public: @@ -87,11 +89,11 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule */ utilities::ReusableThread m_producer_thread; - using sid_to_sender_map_t = std::map>>; + using sid_to_source_map_t = std::map>; /** - * @brief Raw data senders + * @brief Data sources */ - sid_to_sender_map_t m_raw_data_senders; + sid_to_source_map_t m_sources; using sid_to_fake_stream_id_map_t = std::map; /** diff --git a/plugins/CRTGrenobleReaderModule.cpp b/plugins/CRTGrenobleReaderModule.cpp index 48a18e0..f785538 100644 --- a/plugins/CRTGrenobleReaderModule.cpp +++ b/plugins/CRTGrenobleReaderModule.cpp @@ -10,6 +10,8 @@ #include "CRTGrenobleReaderModule.hpp" +#include "CreateSource.hpp" + #include "crtmodules/opmon/CRTGrenobleReaderModule.pb.h" #include "datahandlinglibs/utils/RateLimiter.hpp" @@ -26,8 +28,6 @@ #include "detdataformats/DetID.hpp" -DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTGrenobleFrame, "CRTGrenobleFrame") - namespace dunedaq { namespace crtmodules{ @@ -117,6 +117,18 @@ CRTGrenobleReaderModule::init(const std::shared_ptrget_dal(get_name()); + if (mdal->get_raw_data_callbacks().empty()) { + auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, + "No outputs defined for CRT Grenoble reader in configuration."); + ers::fatal(err); + throw err; + } + + for (auto* con : mdal->get_raw_data_callbacks()) { + auto ptr = m_sources[con->get_source_id()] = createSourceModel(con); + register_node(con->UID(), ptr); + } + auto* d2d_conn = mdal->get_connections()[0]; // there's only 1 connection auto* socket_d2d_conn = d2d_conn->cast(); if (socket_d2d_conn == nullptr) { @@ -138,26 +150,6 @@ CRTGrenobleReaderModule::init(const std::shared_ptrget_source_id()] = det_stream->get_geo_id()->get_stream_id(); } } - - if (mdal->get_outputs().empty()) { - auto err = datahandlinglibs::InitializationError(ERS_HERE, - "No outputs defined for CRT Grenoble reader in configuration."); - ers::fatal(err); - throw err; - } - - for (auto* con : mdal->get_outputs()) { - auto* queue = con->cast(); - if (queue == nullptr) { - auto err = datahandlinglibs::InitializationError(ERS_HERE, "Outputs are not of type QueueWithGeoId."); - ers::fatal(err); - throw err; - } - - // CRTGrenobleReaderModule does not support callbacks - auto connection_name = queue->UID(); - m_raw_data_senders[queue->get_source_id()] = get_iom_sender(connection_name); - } } void @@ -188,6 +180,11 @@ CRTGrenobleReaderModule::do_scrap(const CommandData_t& /*obj*/) void CRTGrenobleReaderModule::do_start(const CommandData_t& /*startobj*/) { + // Setup callbacks on all sourcemodels + for (auto& [sourceid, source] : m_sources) { + source->acquire_callback(); + } + m_packet_count = 0; m_t0 = std::chrono::steady_clock::now(); @@ -231,11 +228,11 @@ CRTGrenobleReaderModule::run_produce() while (m_run_marker.load()) { // Create a fake packet for each stream - for (const auto& [sid, sender] : m_raw_data_senders) { + for (const auto& [sid, source] : m_sources) { fake_data(frame, seq_id, timestamp, m_fake_stream_ids[sid]); // TODO: To be filled by the CRT experts if (m_enable_flow.load()) [[likely]] { - sender->try_send(std::move(frame), iomanager::Sender::s_no_block); + source->handle_payload(reinterpret_cast(&frame), sizeof(frame)); ++m_packet_count; } diff --git a/plugins/CRTGrenobleReaderModule.hpp b/plugins/CRTGrenobleReaderModule.hpp index 356d9c5..f2bae1f 100644 --- a/plugins/CRTGrenobleReaderModule.hpp +++ b/plugins/CRTGrenobleReaderModule.hpp @@ -21,6 +21,8 @@ namespace dunedaq { namespace crtmodules { +class SourceConcept; + class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule { public: @@ -87,11 +89,11 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule */ utilities::ReusableThread m_producer_thread; - using sid_to_sender_map_t = std::map>>; + using sid_to_source_map_t = std::map>; /** - * @brief Raw data senders + * @brief Data sources */ - sid_to_sender_map_t m_raw_data_senders; + sid_to_source_map_t m_sources; using sid_to_fake_stream_id_map_t = std::map; /** diff --git a/src/CreateSource.hpp b/src/CreateSource.hpp new file mode 100644 index 0000000..5a77316 --- /dev/null +++ b/src/CreateSource.hpp @@ -0,0 +1,51 @@ +/** + * @file CreateSource.hpp Specific SourceConcept creator. + * + * This is part of the DUNE DAQ , copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ +#ifndef CRTMODULES_SRC_CREATESOURCE_HPP_ +#define CRTMODULES_SRC_CREATESOURCE_HPP_ + +#include "SourceConcept.hpp" +#include "SourceModel.hpp" +#include "datahandlinglibs/DataHandlingIssues.hpp" + +#include "fdreadoutlibs/CRTBernTypeAdapter.hpp" +#include "fdreadoutlibs/CRTGrenobleTypeAdapter.hpp" + +#include +#include + +namespace dunedaq { + +DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTBernFrame, "CRTBernFrame") +DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTGrenobleFrame, "CRTGrenobleFrame") + +namespace crtmodules { + +std::shared_ptr +createSourceModel(const appmodel::DataMoveCallbackConf* conf) +{ + auto datatype = conf->get_data_type(); + TLOG() << "Choosing specializations for SourceModel for output connection " + << " [uid:" << conf->UID() << " , data_type:" << datatype << ']'; + + if (datatype.find("CRTBernFrame") != std::string::npos) { + auto source_model = std::make_shared>(); + source_model->set_sink_config(conf); + return source_model; + } else if (datatype.find("CRTGrenobleFrame") != std::string::npos) { + auto source_model = std::make_shared>(); + source_model->set_sink_config(conf); + return source_model; + } + + return nullptr; +} + +} // namespace crtmodules +} // namespace dunedaq + +#endif // CRTMODULES_SRC_CREATESOURCE_HPP_ diff --git a/src/SourceConcept.hpp b/src/SourceConcept.hpp new file mode 100644 index 0000000..931c203 --- /dev/null +++ b/src/SourceConcept.hpp @@ -0,0 +1,57 @@ + +/** + * @file SourceConcept.hpp SourceConcept for constructors and + * forwarding command args. Enforces the implementation to + * queue in UDP JUMBO frames to be translated to TypeAdapters and + * send them to corresponding sinks. + * + * This is part of the DUNE DAQ , copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ +#ifndef CRTMODULES_SRC_SOURCECONCEPT_HPP_ +#define CRTMODULES_SRC_SOURCECONCEPT_HPP_ + +//#include "DefaultParserImpl.hpp" + +#include "opmonlib/MonitorableObject.hpp" +#include "appfwk/DAQModule.hpp" +#include "appmodel/DataMoveCallbackConf.hpp" +//#include "packetformat/detail/block_parser.hpp" +#include + +#include +#include +#include + +namespace dunedaq { + namespace crtmodules { + + class SourceConcept : public opmonlib::MonitorableObject + { + public: + SourceConcept() {} + virtual ~SourceConcept() {} + + SourceConcept(const SourceConcept&) = delete; ///< SourceConcept is not copy-constructible + SourceConcept& operator=(const SourceConcept&) = delete; ///< SourceConcept is not copy-assignable + SourceConcept(SourceConcept&&) = delete; ///< SourceConcept is not move-constructible + SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable + + // virtual void init(const nlohmann::json& args) = 0; + virtual void acquire_callback() = 0; + // virtual void conf(const nlohmann::json& args) = 0; + // virtual void start(const nlohmann::json& args) = 0; + // virtual void stop(const nlohmann::json& args) = 0; + + virtual bool handle_payload(char* message, std::size_t size) = 0; + + void set_sink_config(const appmodel::DataMoveCallbackConf* sink_conf) { m_sink_conf = sink_conf; } + + const appmodel::DataMoveCallbackConf* m_sink_conf; + }; + + } // namespace crtmodules +} // namespace dunedaq + +#endif // CRTMODULES_SRC_SOURCECONCEPT_HPP_ diff --git a/src/SourceModel.hpp b/src/SourceModel.hpp new file mode 100644 index 0000000..3179dc7 --- /dev/null +++ b/src/SourceModel.hpp @@ -0,0 +1,106 @@ +/** + * @file SourceModel.hpp FELIX CR's ELink concept wrapper + * + * This is part of the DUNE DAQ , copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ +#ifndef CRTMODULES_SRC_SOURCEMODEL_HPP_ +#define CRTMODULES_SRC_SOURCEMODEL_HPP_ + +#include "SourceConcept.hpp" + + +#include "iomanager/IOManager.hpp" +#include "iomanager/Sender.hpp" +#include "logging/Logging.hpp" + +#include "crtmodules/opmon/SourceModel.pb.h" + +// #include "datahandlinglibs/utils/ReusableThread.hpp" +#include "datahandlinglibs/DataMoveCallbackRegistry.hpp" +#include "datahandlinglibs/utils/BufferCopy.hpp" + +// #include +// #include + +#include +#include +#include +#include + + +namespace dunedaq::crtmodules { + +template +class SourceModel : public SourceConcept +{ +public: + using sink_t = iomanager::SenderConcept; + using inherited = SourceConcept; + using data_t = nlohmann::json; + + /** + * @brief Buffer size based on TargetPayloadType + */ + static constexpr auto buffer_size = sizeof(TargetPayloadType); + + /** + * @brief SourceModel Constructor + * @param name Instance name for this SourceModel instance + */ + SourceModel() + : SourceConcept() + {} + ~SourceModel() {} + + void acquire_callback() override + { + if (m_callback_is_acquired) { + TLOG_DEBUG(5) << "SourceModel callback is already acquired!"; + } else { + // Getting DataMoveCBRegistry + auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get(); + m_sink_callback = dmcbr->get_callback(inherited::m_sink_conf); + m_callback_is_acquired = true; + } + } + + bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned) + { + bool push_out = true; + if (push_out) { + + TargetPayloadType& target_payload = *reinterpret_cast(message); + (*m_sink_callback)(std::move(target_payload)); + + } else { + TargetPayloadType target_payload; + uint32_t bytes_copied = 0; + datahandlinglibs::buffer_copy(message, size, static_cast(&target_payload), bytes_copied, sizeof(target_payload)); + } + + return true; + } + + void generate_opmon_data() override { + + opmon::SourceInfo info; + info.set_dropped_frames( m_dropped_packets.load() ); + + publish( std::move(info) ); + } + +private: + // Callback internals + bool m_callback_is_acquired{ false }; + using sink_cb_t = std::shared_ptr>; + sink_cb_t m_sink_callback; + + std::atomic m_dropped_packets{0}; + +}; + +} // namespace dunedaq::crtmodules + +#endif // CRTMODULES_SRC_SOURCEMODEL_HPP_ From 47941f3ce202d034e8a5c23e56ea264c66f8805b Mon Sep 17 00:00:00 2001 From: Deniz Tuana Ergonul Uzun Date: Fri, 13 Mar 2026 17:09:54 +0100 Subject: [PATCH 4/9] Minor refactor --- plugins/CRTBernReaderModule.cpp | 2 ++ plugins/CRTBernReaderModule.hpp | 1 - plugins/CRTGrenobleReaderModule.cpp | 2 ++ plugins/CRTGrenobleReaderModule.hpp | 1 - 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/CRTBernReaderModule.cpp b/plugins/CRTBernReaderModule.cpp index 5015ab1..11b3bbf 100644 --- a/plugins/CRTBernReaderModule.cpp +++ b/plugins/CRTBernReaderModule.cpp @@ -21,6 +21,8 @@ #include "appmodel/SocketDetectorToDaqConnection.hpp" #include "appmodel/NWDetDataSender.hpp" +#include "fddetdataformats/CRTBernFrame.hpp" + #include "confmodel/QueueWithSourceId.hpp" #include "confmodel/DetectorStream.hpp" #include "confmodel/GeoId.hpp" diff --git a/plugins/CRTBernReaderModule.hpp b/plugins/CRTBernReaderModule.hpp index f178010..1082136 100644 --- a/plugins/CRTBernReaderModule.hpp +++ b/plugins/CRTBernReaderModule.hpp @@ -13,7 +13,6 @@ #include "appfwk/DAQModule.hpp" #include "utilities/ReusableThread.hpp" -#include "fddetdataformats/CRTBernFrame.hpp" #include #include diff --git a/plugins/CRTGrenobleReaderModule.cpp b/plugins/CRTGrenobleReaderModule.cpp index f785538..940e1b4 100644 --- a/plugins/CRTGrenobleReaderModule.cpp +++ b/plugins/CRTGrenobleReaderModule.cpp @@ -20,6 +20,8 @@ #include "appmodel/SocketDetectorToDaqConnection.hpp" #include "appmodel/NWDetDataSender.hpp" +#include "fddetdataformats/CRTGrenobleFrame.hpp" + #include "confmodel/QueueWithSourceId.hpp" #include "confmodel/DetectorStream.hpp" #include "confmodel/GeoId.hpp" diff --git a/plugins/CRTGrenobleReaderModule.hpp b/plugins/CRTGrenobleReaderModule.hpp index f2bae1f..ed2fd6c 100644 --- a/plugins/CRTGrenobleReaderModule.hpp +++ b/plugins/CRTGrenobleReaderModule.hpp @@ -13,7 +13,6 @@ #include "appfwk/DAQModule.hpp" #include "utilities/ReusableThread.hpp" -#include "fddetdataformats/CRTGrenobleFrame.hpp" #include #include From 43e747062dc2e9a341d4b97df4ae7e74318fba43 Mon Sep 17 00:00:00 2001 From: Deniz Tuana Ergonul Uzun Date: Tue, 17 Mar 2026 13:49:53 +0100 Subject: [PATCH 5/9] Socket writer receives type adapters, not frames, this is required to use source emulator --- integtest/crt_reader_test.py | 54 +++++++++++++++++++----------------- src/CreateSource.hpp | 8 +++--- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/integtest/crt_reader_test.py b/integtest/crt_reader_test.py index 6e624fd..ddf59e2 100644 --- a/integtest/crt_reader_test.py +++ b/integtest/crt_reader_test.py @@ -40,20 +40,22 @@ onebyone_local_crt_bern_conf = copy.deepcopy(common_config_obj) onebyone_local_crt_bern_conf.session = "local-socket-1x1-config" -new_port = find_free_port() +new_local_port = find_free_port() +new_remote_port = find_free_port() onebyone_local_crt_bern_conf.config_substitutions.append( data_classes.attribute_substitution( obj_class="SocketDataSender", obj_id="socket_sender_crt", - updates={"local_port": 0, "remote_port": new_port}, + updates={"local_port": new_local_port, "remote_port": new_remote_port}, ) ) -new_port = find_free_port() +new_local_port = find_free_port() +new_remote_port = find_free_port() onebyone_local_crt_bern_conf.config_substitutions.append( data_classes.attribute_substitution( obj_class="SocketDataSender", obj_id="socket_sender_crt_2", - updates={"local_port": 0, "remote_port": new_port}, + updates={"local_port": new_local_port, "remote_port": new_remote_port}, ) ) onebyone_local_crt_bern_conf.config_substitutions.append( @@ -89,20 +91,22 @@ onebyone_local_crt_grenoble_conf = copy.deepcopy(common_config_obj) onebyone_local_crt_grenoble_conf.session = "local-socket-1x1-config" -new_port = find_free_port() +new_local_port = find_free_port() +new_remote_port = find_free_port() onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.attribute_substitution( obj_class="SocketDataSender", obj_id="socket_sender_crt", - updates={"local_port": 0, "remote_port": new_port}, + updates={"local_port": new_local_port, "remote_port": new_remote_port}, ) ) -new_port = find_free_port() +new_local_port = find_free_port() +new_remote_port = find_free_port() onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.attribute_substitution( obj_class="SocketDataSender", obj_id="socket_sender_crt_2", - updates={"local_port": 0, "remote_port": new_port}, + updates={"local_port": new_local_port, "remote_port": new_remote_port}, ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( @@ -114,6 +118,15 @@ replacement_object_id="def-crt-grenoble-receiver-conf" ) ) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.relationship_substitution( + obj_class="CRTReaderApplication", + obj_id="crt-data-source-01", + rel_name="callback_desc", + replacement_object_class="DataMoveCallbackDescriptor", + replacement_object_id="crt-grenoble-raw-input" + ) +) onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.list_element_substitution( obj_class="ActionPlan", @@ -136,32 +149,21 @@ ) onebyone_local_crt_grenoble_conf.config_substitutions.append( - data_classes.list_element_substitution( - obj_class="CRTReaderApplication", - obj_id="crt-data-source-01", - rel_name="queue_rules", - list_index=0, - replacement_object_class="QueueConnectionRule", - replacement_object_id="crt-grenoble-raw-data-rule" - ) -) -onebyone_local_crt_grenoble_conf.config_substitutions.append( - data_classes.list_element_substitution( + data_classes.relationship_substitution( obj_class="ReadoutApplication", obj_id="socket-ru-01", - rel_name="queue_rules", - list_index=1, - replacement_object_class="QueueConnectionRule", - replacement_object_id="crt-grenoble-callback-raw-data-rule" + rel_name="link_handler", + replacement_object_class="DataHandlerConf", + replacement_object_id="def-crt-grenoble-link-handler" ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.relationship_substitution( obj_class="ReadoutApplication", obj_id="socket-ru-01", - rel_name="link_handler", - replacement_object_class="DataHandlerConf", - replacement_object_id="def-crt-grenoble-link-handler" + rel_name="callback_desc", + replacement_object_class="DataMoveCallbackDescriptor", + replacement_object_id="crt-grenoble-raw-input" ) ) diff --git a/src/CreateSource.hpp b/src/CreateSource.hpp index 5a77316..2612519 100644 --- a/src/CreateSource.hpp +++ b/src/CreateSource.hpp @@ -20,8 +20,8 @@ namespace dunedaq { -DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTBernFrame, "CRTBernFrame") -DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTGrenobleFrame, "CRTGrenobleFrame") +DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::CRTBernTypeAdapter, "CRTBernFrame") +DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::CRTGrenobleTypeAdapter, "CRTGrenobleFrame") namespace crtmodules { @@ -33,11 +33,11 @@ createSourceModel(const appmodel::DataMoveCallbackConf* conf) << " [uid:" << conf->UID() << " , data_type:" << datatype << ']'; if (datatype.find("CRTBernFrame") != std::string::npos) { - auto source_model = std::make_shared>(); + auto source_model = std::make_shared>(); source_model->set_sink_config(conf); return source_model; } else if (datatype.find("CRTGrenobleFrame") != std::string::npos) { - auto source_model = std::make_shared>(); + auto source_model = std::make_shared>(); source_model->set_sink_config(conf); return source_model; } From 815c702b97d482bf138a4d715d2664ef72e1c4e3 Mon Sep 17 00:00:00 2001 From: Deniz Tuana Ergonul Uzun Date: Wed, 18 Mar 2026 17:35:48 +0100 Subject: [PATCH 6/9] Lint changes --- plugins/CRTBernReaderModule.cpp | 26 +++++++++--------- plugins/CRTBernReaderModule.hpp | 11 ++++---- plugins/CRTGrenobleReaderModule.cpp | 26 +++++++++--------- plugins/CRTGrenobleReaderModule.hpp | 11 ++++---- src/SourceConcept.hpp | 42 ++++++++++++++--------------- src/SourceModel.hpp | 10 +++---- 6 files changed, 63 insertions(+), 63 deletions(-) diff --git a/plugins/CRTBernReaderModule.cpp b/plugins/CRTBernReaderModule.cpp index 11b3bbf..77cfe41 100644 --- a/plugins/CRTBernReaderModule.cpp +++ b/plugins/CRTBernReaderModule.cpp @@ -29,30 +29,33 @@ #include "detdataformats/DetID.hpp" -namespace dunedaq { -namespace crtmodules{ +#include +#include +#include + +namespace dunedaq::crtmodules { /** * @brief Maximum packet sequence ID before reset */ -constexpr uint64_t max_seq_id = 4095; +constexpr uint64_t max_seq_id = 4095; // NOLINT(build/unsigned) /** * @brief Fake packet detector ID */ -constexpr uint8_t fake_det_id = (uint8_t)detdataformats::DetID::Subdetector::kVD_BernCRT; +constexpr uint8_t fake_det_id = static_cast(detdataformats::DetID::Subdetector::kVD_BernCRT); // NOLINT(build/unsigned) /** * @brief Fake packet block length */ -constexpr uint64_t fake_block_length = 0x382; +constexpr uint64_t fake_block_length = 0x382; // NOLINT(build/unsigned) /** * @brief Calculate the next fake sequence ID for a packet * @param seq_id Fake packet sequence ID */ void -fake_sequence_id(uint64_t& seq_id) +fake_sequence_id(uint64_t& seq_id) // NOLINT(build/unsigned) { seq_id = (seq_id == max_seq_id ? 0 : seq_id+1); } @@ -62,7 +65,7 @@ fake_sequence_id(uint64_t& seq_id) * @param timestamp Fake packet timestamp */ void -fake_timestamp(uint64_t& timestamp) +fake_timestamp(uint64_t& timestamp) // NOLINT(build/unsigned) { auto time_now = std::chrono::steady_clock::now().time_since_epoch(); uint64_t current_time = // NOLINT (build/unsigned) @@ -90,7 +93,7 @@ fake_adc(fddetdataformats::CRTBernFrame& frame) * @param stream_id Fake packet stream ID */ void -fake_data(fddetdataformats::CRTBernFrame& frame, uint64_t& seq_id, uint64_t& timestamp, uint32_t stream_id) +fake_data(fddetdataformats::CRTBernFrame& frame, uint64_t& seq_id, uint64_t& timestamp, uint32_t stream_id) // NOLINT(build/unsigned) { frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id frame.daq_header.crate_id = 1; @@ -221,8 +224,8 @@ CRTBernReaderModule::run_produce() TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead fddetdataformats::CRTBernFrame frame; - uint64_t seq_id = 0; - uint64_t timestamp = 0; + uint64_t seq_id = 0; // NOLINT(build/unsigned) + uint64_t timestamp = 0; // NOLINT(build/unsigned) datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); @@ -250,7 +253,6 @@ CRTBernReaderModule::set_running(bool should_run) TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run; } -} -} +} // namespace dunedaq::crtmodules DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTBernReaderModule) diff --git a/plugins/CRTBernReaderModule.hpp b/plugins/CRTBernReaderModule.hpp index 1082136..d6d7e06 100644 --- a/plugins/CRTBernReaderModule.hpp +++ b/plugins/CRTBernReaderModule.hpp @@ -16,9 +16,9 @@ #include #include +#include -namespace dunedaq { -namespace crtmodules { +namespace dunedaq::crtmodules { class SourceConcept; @@ -88,13 +88,13 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule */ utilities::ReusableThread m_producer_thread; - using sid_to_source_map_t = std::map>; + using sid_to_source_map_t = std::map>; // NOLINT(build/unsigned) /** * @brief Data sources */ sid_to_source_map_t m_sources; - using sid_to_fake_stream_id_map_t = std::map; + using sid_to_fake_stream_id_map_t = std::map; // NOLINT(build/unsigned) /** * @brief Fake packet stream IDs */ @@ -116,7 +116,6 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule */ std::chrono::time_point m_t0; }; -} // namespace crtmodules -} // namespace dunedaq +} // namespace dunedaq::crtmodules #endif // CRTMODULES_PLUGINS_CRTBERNREADERMODULE_HPP_ diff --git a/plugins/CRTGrenobleReaderModule.cpp b/plugins/CRTGrenobleReaderModule.cpp index 940e1b4..c6ff811 100644 --- a/plugins/CRTGrenobleReaderModule.cpp +++ b/plugins/CRTGrenobleReaderModule.cpp @@ -30,30 +30,33 @@ #include "detdataformats/DetID.hpp" -namespace dunedaq { -namespace crtmodules{ +#include +#include +#include + +namespace dunedaq::crtmodules { /** * @brief Maximum packet sequence ID before reset */ -constexpr uint64_t max_seq_id = 4095; +constexpr uint64_t max_seq_id = 4095; // NOLINT(build/unsigned) /** * @brief Fake packet detector ID */ -constexpr uint8_t fake_det_id = (uint8_t)detdataformats::DetID::Subdetector::kVD_GrenobleCRT; +constexpr uint8_t fake_det_id = static_cast(detdataformats::DetID::Subdetector::kVD_GrenobleCRT); // NOLINT(build/unsigned) /** * @brief Fake packet block length */ -constexpr uint64_t fake_block_length = 0x382; +constexpr uint64_t fake_block_length = 0x382; // NOLINT(build/unsigned) /** * @brief Calculate the next fake sequence ID for a packet * @param seq_id Fake packet sequence ID */ void -fake_sequence_id(uint64_t& seq_id) +fake_sequence_id(uint64_t& seq_id) // NOLINT(build/unsigned) { seq_id = (seq_id == max_seq_id ? 0 : seq_id+1); } @@ -63,7 +66,7 @@ fake_sequence_id(uint64_t& seq_id) * @param timestamp Fake packet timestamp */ void -fake_timestamp(uint64_t& timestamp) +fake_timestamp(uint64_t& timestamp) // NOLINT(build/unsigned) { auto time_now = std::chrono::steady_clock::now().time_since_epoch(); uint64_t current_time = // NOLINT (build/unsigned) @@ -91,7 +94,7 @@ fake_adc(fddetdataformats::CRTGrenobleFrame& frame) * @param stream_id Fake packet stream ID */ void -fake_data(fddetdataformats::CRTGrenobleFrame& frame, uint64_t& seq_id, uint64_t& timestamp, uint32_t stream_id) +fake_data(fddetdataformats::CRTGrenobleFrame& frame, uint64_t& seq_id, uint64_t& timestamp, uint32_t stream_id) // NOLINT(build/unsigned) { frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id frame.daq_header.crate_id = 1; @@ -223,8 +226,8 @@ CRTGrenobleReaderModule::run_produce() TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead fddetdataformats::CRTGrenobleFrame frame; - uint64_t seq_id = 0; - uint64_t timestamp = 0; + uint64_t seq_id = 0; // NOLINT(build/unsigned) + uint64_t timestamp = 0; // NOLINT(build/unsigned) datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); @@ -252,7 +255,6 @@ CRTGrenobleReaderModule::set_running(bool should_run) TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run; } -} -} +} // namespace dunedaq::crtmodules DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTGrenobleReaderModule) diff --git a/plugins/CRTGrenobleReaderModule.hpp b/plugins/CRTGrenobleReaderModule.hpp index ed2fd6c..266589b 100644 --- a/plugins/CRTGrenobleReaderModule.hpp +++ b/plugins/CRTGrenobleReaderModule.hpp @@ -16,9 +16,9 @@ #include #include +#include -namespace dunedaq { -namespace crtmodules { +namespace dunedaq::crtmodules { class SourceConcept; @@ -88,13 +88,13 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule */ utilities::ReusableThread m_producer_thread; - using sid_to_source_map_t = std::map>; + using sid_to_source_map_t = std::map>; // NOLINT(build/unsigned) /** * @brief Data sources */ sid_to_source_map_t m_sources; - using sid_to_fake_stream_id_map_t = std::map; + using sid_to_fake_stream_id_map_t = std::map; // NOLINT(build/unsigned) /** * @brief Fake packet stream IDs */ @@ -116,7 +116,6 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule */ std::chrono::time_point m_t0; }; -} // namespace crtmodules -} // namespace dunedaq +} // namespace dunedaq::crtmodules #endif // CRTMODULES_PLUGINS_CRTGRENOBLEREADERMODULE_HPP_ diff --git a/src/SourceConcept.hpp b/src/SourceConcept.hpp index 931c203..5caa281 100644 --- a/src/SourceConcept.hpp +++ b/src/SourceConcept.hpp @@ -24,34 +24,32 @@ #include #include -namespace dunedaq { - namespace crtmodules { +namespace dunedaq::crtmodules { - class SourceConcept : public opmonlib::MonitorableObject - { - public: - SourceConcept() {} - virtual ~SourceConcept() {} + class SourceConcept : public opmonlib::MonitorableObject + { + public: + SourceConcept() {} + virtual ~SourceConcept() {} - SourceConcept(const SourceConcept&) = delete; ///< SourceConcept is not copy-constructible - SourceConcept& operator=(const SourceConcept&) = delete; ///< SourceConcept is not copy-assignable - SourceConcept(SourceConcept&&) = delete; ///< SourceConcept is not move-constructible - SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable + SourceConcept(const SourceConcept&) = delete; ///< SourceConcept is not copy-constructible + SourceConcept& operator=(const SourceConcept&) = delete; ///< SourceConcept is not copy-assignable + SourceConcept(SourceConcept&&) = delete; ///< SourceConcept is not move-constructible + SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable - // virtual void init(const nlohmann::json& args) = 0; - virtual void acquire_callback() = 0; - // virtual void conf(const nlohmann::json& args) = 0; - // virtual void start(const nlohmann::json& args) = 0; - // virtual void stop(const nlohmann::json& args) = 0; + // virtual void init(const nlohmann::json& args) = 0; + virtual void acquire_callback() = 0; + // virtual void conf(const nlohmann::json& args) = 0; + // virtual void start(const nlohmann::json& args) = 0; + // virtual void stop(const nlohmann::json& args) = 0; - virtual bool handle_payload(char* message, std::size_t size) = 0; + virtual bool handle_payload(char* message, std::size_t size) = 0; - void set_sink_config(const appmodel::DataMoveCallbackConf* sink_conf) { m_sink_conf = sink_conf; } + void set_sink_config(const appmodel::DataMoveCallbackConf* sink_conf) { m_sink_conf = sink_conf; } - const appmodel::DataMoveCallbackConf* m_sink_conf; - }; + const appmodel::DataMoveCallbackConf* m_sink_conf; + }; - } // namespace crtmodules -} // namespace dunedaq +} // namespace dunedaq::crtmodules #endif // CRTMODULES_SRC_SOURCECONCEPT_HPP_ diff --git a/src/SourceModel.hpp b/src/SourceModel.hpp index 3179dc7..167f003 100644 --- a/src/SourceModel.hpp +++ b/src/SourceModel.hpp @@ -23,12 +23,12 @@ // #include // #include - +// NOLINT(build/unsigned) #include #include #include #include - +#include namespace dunedaq::crtmodules { @@ -66,7 +66,7 @@ class SourceModel : public SourceConcept } } - bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned) + bool handle_payload(char* message, std::size_t size) override { bool push_out = true; if (push_out) { @@ -76,7 +76,7 @@ class SourceModel : public SourceConcept } else { TargetPayloadType target_payload; - uint32_t bytes_copied = 0; + uint32_t bytes_copied = 0; // NOLINT(build/unsigned) datahandlinglibs::buffer_copy(message, size, static_cast(&target_payload), bytes_copied, sizeof(target_payload)); } @@ -97,7 +97,7 @@ class SourceModel : public SourceConcept using sink_cb_t = std::shared_ptr>; sink_cb_t m_sink_callback; - std::atomic m_dropped_packets{0}; + std::atomic m_dropped_packets{0}; // NOLINT(build/unsigned) }; From 0e4c86156d69d038beb806b7154cb224ff579cac Mon Sep 17 00:00:00 2001 From: Deniz Tuana Ergonul Uzun Date: Fri, 20 Mar 2026 10:02:31 +0100 Subject: [PATCH 7/9] Renamed CRTReader to CRTFrameBuilder --- CMakeLists.txt | 4 +- integtest/crt_reader_test.py | 26 ++++++------ ...dule.cpp => CRTBernFrameBuilderModule.cpp} | 40 +++++++++---------- ...dule.hpp => CRTBernFrameBuilderModule.hpp} | 22 +++++----- ....cpp => CRTGrenobleFrameBuilderModule.cpp} | 40 +++++++++---------- ....hpp => CRTGrenobleFrameBuilderModule.hpp} | 22 +++++----- ....proto => CRTBernFrameBuilderModule.proto} | 2 +- ...to => CRTGrenobleFrameBuilderModule.proto} | 2 +- 8 files changed, 79 insertions(+), 79 deletions(-) rename plugins/{CRTBernReaderModule.cpp => CRTBernFrameBuilderModule.cpp} (83%) rename plugins/{CRTBernReaderModule.hpp => CRTBernFrameBuilderModule.hpp} (72%) rename plugins/{CRTGrenobleReaderModule.cpp => CRTGrenobleFrameBuilderModule.cpp} (82%) rename plugins/{CRTGrenobleReaderModule.hpp => CRTGrenobleFrameBuilderModule.hpp} (71%) rename schema/crtmodules/opmon/{CRTBernReaderModule.proto => CRTBernFrameBuilderModule.proto} (79%) rename schema/crtmodules/opmon/{CRTGrenobleReaderModule.proto => CRTGrenobleFrameBuilderModule.proto} (77%) diff --git a/CMakeLists.txt b/CMakeLists.txt index ab00807..0afbfed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,8 +57,8 @@ daq_add_python_bindings(*.cpp ) # See https://dune-daq-sw.readthedocs.io/en/latest/packages/daq-cmake/#daq_add_plugin #daq_add_plugin(CRTControllerModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk) # Replace appfwk library with a more specific library when appropriate -daq_add_plugin(CRTGrenobleReaderModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk fdreadoutlibs::fdreadoutlibs opmonlib::opmonlib) -daq_add_plugin(CRTBernReaderModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk fdreadoutlibs::fdreadoutlibs opmonlib::opmonlib) +daq_add_plugin(CRTGrenobleFrameBuilderModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk fdreadoutlibs::fdreadoutlibs opmonlib::opmonlib) +daq_add_plugin(CRTBernFrameBuilderModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk fdreadoutlibs::fdreadoutlibs opmonlib::opmonlib) ############################################################################## diff --git a/integtest/crt_reader_test.py b/integtest/crt_reader_test.py index ddf59e2..efbd8a3 100644 --- a/integtest/crt_reader_test.py +++ b/integtest/crt_reader_test.py @@ -60,11 +60,11 @@ ) onebyone_local_crt_bern_conf.config_substitutions.append( data_classes.relationship_substitution( - obj_class="CRTReaderApplication", + obj_class="CRTFrameBuilderApplication", obj_id="crt-data-source-01", - rel_name="data_reader", - replacement_object_class="CRTBernReaderConf", - replacement_object_id="def-crt-bern-receiver-conf" + rel_name="detector_frame_builder", + replacement_object_class="CRTBernFrameBuilderConf", + replacement_object_id="def-crt-bern-frame-builder-conf" ) ) onebyone_local_crt_bern_conf.config_substitutions.append( @@ -74,7 +74,7 @@ rel_name="steps", list_index=1, replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-bern-reader-data-source-step" + replacement_object_id="crt-bern-frame-builder-data-source-step" ) ) onebyone_local_crt_bern_conf.config_substitutions.append( @@ -84,7 +84,7 @@ rel_name="steps", list_index=0, replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-bern-reader-data-source-step" + replacement_object_id="crt-bern-frame-builder-data-source-step" ) ) @@ -111,16 +111,16 @@ ) onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.relationship_substitution( - obj_class="CRTReaderApplication", + obj_class="CRTFrameBuilderApplication", obj_id="crt-data-source-01", - rel_name="data_reader", - replacement_object_class="CRTGrenobleReaderConf", - replacement_object_id="def-crt-grenoble-receiver-conf" + rel_name="detector_frame_builder", + replacement_object_class="CRTGrenobleFrameBuilderConf", + replacement_object_id="def-crt-grenoble-frame-builder-conf" ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.relationship_substitution( - obj_class="CRTReaderApplication", + obj_class="CRTFrameBuilderApplication", obj_id="crt-data-source-01", rel_name="callback_desc", replacement_object_class="DataMoveCallbackDescriptor", @@ -134,7 +134,7 @@ rel_name="steps", list_index=1, replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-grenoble-reader-data-source-step" + replacement_object_id="crt-grenoble-frame-builder-data-source-step" ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( @@ -144,7 +144,7 @@ rel_name="steps", list_index=0, replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-grenoble-reader-data-source-step" + replacement_object_id="crt-grenoble-frame-builder-data-source-step" ) ) diff --git a/plugins/CRTBernReaderModule.cpp b/plugins/CRTBernFrameBuilderModule.cpp similarity index 83% rename from plugins/CRTBernReaderModule.cpp rename to plugins/CRTBernFrameBuilderModule.cpp index 77cfe41..7926b2d 100644 --- a/plugins/CRTBernReaderModule.cpp +++ b/plugins/CRTBernFrameBuilderModule.cpp @@ -1,5 +1,5 @@ /** - * @file CRTBernReaderModule.cpp + * @file CRTBernFrameBuilderModule.cpp * Reads data from the HW then puts it in a queue * @@ -8,11 +8,11 @@ * received with this code. */ -#include "CRTBernReaderModule.hpp" +#include "CRTBernFrameBuilderModule.hpp" #include "CreateSource.hpp" -#include "crtmodules/opmon/CRTBernReaderModule.pb.h" +#include "crtmodules/opmon/CRTBernFrameBuilderModule.pb.h" #include "datahandlinglibs/utils/RateLimiter.hpp" #include "datahandlinglibs/DataHandlingIssues.hpp" @@ -107,23 +107,23 @@ fake_data(fddetdataformats::CRTBernFrame& frame, uint64_t& seq_id, uint64_t& tim fake_adc(frame); } -CRTBernReaderModule::CRTBernReaderModule(const std::string& name) +CRTBernFrameBuilderModule::CRTBernFrameBuilderModule(const std::string& name) : DAQModule(name) { - register_command("conf", &CRTBernReaderModule::do_conf); - register_command("start", &CRTBernReaderModule::do_start); - register_command("stop_trigger_sources", &CRTBernReaderModule::do_stop); - register_command("scrap", &CRTBernReaderModule::do_scrap); + register_command("conf", &CRTBernFrameBuilderModule::do_conf); + register_command("start", &CRTBernFrameBuilderModule::do_start); + register_command("stop_trigger_sources", &CRTBernFrameBuilderModule::do_stop); + register_command("scrap", &CRTBernFrameBuilderModule::do_scrap); } void -CRTBernReaderModule::init(const std::shared_ptr mcfg) +CRTBernFrameBuilderModule::init(const std::shared_ptr mcfg) { auto* mdal = mcfg->get_dal(get_name()); if (mdal->get_raw_data_callbacks().empty()) { auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, - "No outputs defined for CRT Bern reader in configuration."); + "No outputs defined for CRT Bern frame builder in configuration."); ers::fatal(err); throw err; } @@ -157,7 +157,7 @@ CRTBernReaderModule::init(const std::shared_ptr mc } void -CRTBernReaderModule::do_conf(const CommandData_t& /*obj*/) +CRTBernFrameBuilderModule::do_conf(const CommandData_t& /*obj*/) { // Configure HW interface? if (!m_run_marker.load()) { @@ -168,7 +168,7 @@ CRTBernReaderModule::do_conf(const CommandData_t& /*obj*/) } void -CRTBernReaderModule::do_scrap(const CommandData_t& /*obj*/) +CRTBernFrameBuilderModule::do_scrap(const CommandData_t& /*obj*/) { if (m_run_marker.load()) { TLOG() << "Raising stop through variables!"; @@ -182,7 +182,7 @@ CRTBernReaderModule::do_scrap(const CommandData_t& /*obj*/) } void -CRTBernReaderModule::do_start(const CommandData_t& /*startobj*/) +CRTBernFrameBuilderModule::do_start(const CommandData_t& /*startobj*/) { // Setup callbacks on all sourcemodels for (auto& [sourceid, source] : m_sources) { @@ -194,19 +194,19 @@ CRTBernReaderModule::do_start(const CommandData_t& /*startobj*/) m_packet_count = 0; m_t0 = std::chrono::steady_clock::now(); - m_producer_thread.set_work(&CRTBernReaderModule::run_produce, this); + m_producer_thread.set_work(&CRTBernFrameBuilderModule::run_produce, this); } void -CRTBernReaderModule::do_stop(const CommandData_t& /*stopobj*/) +CRTBernFrameBuilderModule::do_stop(const CommandData_t& /*stopobj*/) { disable_flow(); } void -CRTBernReaderModule::generate_opmon_data() +CRTBernFrameBuilderModule::generate_opmon_data() { - opmon::CRTBernReaderInfo i; + opmon::CRTBernFrameBuilderInfo i; auto now = std::chrono::steady_clock::now(); int new_packets = m_packet_count.exchange(0); @@ -219,7 +219,7 @@ CRTBernReaderModule::generate_opmon_data() } void -CRTBernReaderModule::run_produce() +CRTBernFrameBuilderModule::run_produce() { TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead @@ -247,7 +247,7 @@ CRTBernReaderModule::run_produce() } void -CRTBernReaderModule::set_running(bool should_run) +CRTBernFrameBuilderModule::set_running(bool should_run) { bool was_running = m_run_marker.exchange(should_run); TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run; @@ -255,4 +255,4 @@ CRTBernReaderModule::set_running(bool should_run) } // namespace dunedaq::crtmodules -DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTBernReaderModule) +DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTBernFrameBuilderModule) diff --git a/plugins/CRTBernReaderModule.hpp b/plugins/CRTBernFrameBuilderModule.hpp similarity index 72% rename from plugins/CRTBernReaderModule.hpp rename to plugins/CRTBernFrameBuilderModule.hpp index d6d7e06..266d3e0 100644 --- a/plugins/CRTBernReaderModule.hpp +++ b/plugins/CRTBernFrameBuilderModule.hpp @@ -1,5 +1,5 @@ /** - * @file CRTBernReaderModule.hpp + * @file CRTBernFrameBuilderModule.hpp * * Reads data from the HW then puts it in a queue * @@ -8,8 +8,8 @@ * received with this code. */ -#ifndef CRTMODULES_PLUGINS_CRTBERNREADERMODULE_HPP_ -#define CRTMODULES_PLUGINS_CRTBERNREADERMODULE_HPP_ +#ifndef CRTMODULES_PLUGINS_CRTBERNFRAMEBUILDERMODULE_HPP_ +#define CRTMODULES_PLUGINS_CRTBERNFRAMEBUILDERMODULE_HPP_ #include "appfwk/DAQModule.hpp" #include "utilities/ReusableThread.hpp" @@ -22,19 +22,19 @@ namespace dunedaq::crtmodules { class SourceConcept; -class CRTBernReaderModule : public dunedaq::appfwk::DAQModule +class CRTBernFrameBuilderModule : public dunedaq::appfwk::DAQModule { public: /** - * @brief CRTBernReaderModule constructor + * @brief CRTBernFrameBuilderModule constructor * @param name DAQ module instance name */ - explicit CRTBernReaderModule(const std::string& name); + explicit CRTBernFrameBuilderModule(const std::string& name); - CRTBernReaderModule(const CRTBernReaderModule&) = delete; ///< CRTBernReaderModule is not copy-constructible - CRTBernReaderModule& operator=(const CRTBernReaderModule&) = delete; ///< CRTBernReaderModule is not copy-assignable - CRTBernReaderModule(CRTBernReaderModule&&) = delete; ///< CRTBernReaderModule is not move-constructible - CRTBernReaderModule& operator=(CRTBernReaderModule&&) = delete; ///< CRTBernReaderModule is not move-assignable + CRTBernFrameBuilderModule(const CRTBernFrameBuilderModule&) = delete; ///< CRTBernFrameBuilderModule is not copy-constructible + CRTBernFrameBuilderModule& operator=(const CRTBernFrameBuilderModule&) = delete; ///< CRTBernFrameBuilderModule is not copy-assignable + CRTBernFrameBuilderModule(CRTBernFrameBuilderModule&&) = delete; ///< CRTBernFrameBuilderModule is not move-constructible + CRTBernFrameBuilderModule& operator=(CRTBernFrameBuilderModule&&) = delete; ///< CRTBernFrameBuilderModule is not move-assignable /** * @brief Handles initialization on boot @@ -118,4 +118,4 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule }; } // namespace dunedaq::crtmodules -#endif // CRTMODULES_PLUGINS_CRTBERNREADERMODULE_HPP_ +#endif // CRTMODULES_PLUGINS_CRTBERNFRAMEBUILDERMODULE_HPP_ diff --git a/plugins/CRTGrenobleReaderModule.cpp b/plugins/CRTGrenobleFrameBuilderModule.cpp similarity index 82% rename from plugins/CRTGrenobleReaderModule.cpp rename to plugins/CRTGrenobleFrameBuilderModule.cpp index c6ff811..0089fbb 100644 --- a/plugins/CRTGrenobleReaderModule.cpp +++ b/plugins/CRTGrenobleFrameBuilderModule.cpp @@ -1,5 +1,5 @@ /** - * @file CRTGrenobleReaderModule.cpp + * @file CRTGrenobleFrameBuilderModule.cpp * * Reads data from the HW then puts it in a queue * @@ -8,11 +8,11 @@ * received with this code. */ -#include "CRTGrenobleReaderModule.hpp" +#include "CRTGrenobleFrameBuilderModule.hpp" #include "CreateSource.hpp" -#include "crtmodules/opmon/CRTGrenobleReaderModule.pb.h" +#include "crtmodules/opmon/CRTGrenobleFrameBuilderModule.pb.h" #include "datahandlinglibs/utils/RateLimiter.hpp" @@ -108,23 +108,23 @@ fake_data(fddetdataformats::CRTGrenobleFrame& frame, uint64_t& seq_id, uint64_t& fake_adc(frame); } -CRTGrenobleReaderModule::CRTGrenobleReaderModule(const std::string& name) +CRTGrenobleFrameBuilderModule::CRTGrenobleFrameBuilderModule(const std::string& name) : DAQModule(name) { - register_command("conf", &CRTGrenobleReaderModule::do_conf); - register_command("start", &CRTGrenobleReaderModule::do_start); - register_command("stop_trigger_sources", &CRTGrenobleReaderModule::do_stop); - register_command("scrap", &CRTGrenobleReaderModule::do_scrap); + register_command("conf", &CRTGrenobleFrameBuilderModule::do_conf); + register_command("start", &CRTGrenobleFrameBuilderModule::do_start); + register_command("stop_trigger_sources", &CRTGrenobleFrameBuilderModule::do_stop); + register_command("scrap", &CRTGrenobleFrameBuilderModule::do_scrap); } void -CRTGrenobleReaderModule::init(const std::shared_ptr mcfg) +CRTGrenobleFrameBuilderModule::init(const std::shared_ptr mcfg) { auto* mdal = mcfg->get_dal(get_name()); if (mdal->get_raw_data_callbacks().empty()) { auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, - "No outputs defined for CRT Grenoble reader in configuration."); + "No outputs defined for CRT Grenoble frame builder in configuration."); ers::fatal(err); throw err; } @@ -158,7 +158,7 @@ CRTGrenobleReaderModule::init(const std::shared_ptr Date: Mon, 23 Mar 2026 13:14:07 +0100 Subject: [PATCH 8/9] Change of topology. Switch to IOM. --- ...ader_test.py => crt_frame_builder_test.py} | 80 +++++++------ plugins/CRTBernFrameBuilderModule.cpp | 83 +++++++------- plugins/CRTBernFrameBuilderModule.hpp | 20 ++-- plugins/CRTGrenobleFrameBuilderModule.cpp | 83 +++++++------- plugins/CRTGrenobleFrameBuilderModule.hpp | 20 ++-- src/CreateSource.hpp | 51 --------- src/SourceConcept.hpp | 55 --------- src/SourceModel.hpp | 106 ------------------ 8 files changed, 138 insertions(+), 360 deletions(-) rename integtest/{crt_reader_test.py => crt_frame_builder_test.py} (82%) delete mode 100644 src/CreateSource.hpp delete mode 100644 src/SourceConcept.hpp delete mode 100644 src/SourceModel.hpp diff --git a/integtest/crt_reader_test.py b/integtest/crt_frame_builder_test.py similarity index 82% rename from integtest/crt_reader_test.py rename to integtest/crt_frame_builder_test.py index efbd8a3..1a4d803 100644 --- a/integtest/crt_reader_test.py +++ b/integtest/crt_frame_builder_test.py @@ -10,8 +10,8 @@ pytest_plugins = "integrationtest.integrationtest_drunc" # Values that help determine the running conditions -number_of_data_producers = 1 -number_of_readout_apps = 1 +number_of_data_producers = 4 +number_of_builder_apps = 2 run_duration = 20 # seconds # Default values for validation parameters @@ -26,7 +26,7 @@ common_config_obj = data_classes.drunc_config() common_config_obj.dro_map_config.n_streams = number_of_data_producers -common_config_obj.dro_map_config.n_apps = number_of_readout_apps +common_config_obj.dro_map_config.n_apps = number_of_builder_apps # 22-Jan-2026, KAB: added the use of the DAQSYSTEMTEST_SHARE env var as part of # specifying the location of the example-configs.data.xml file. This is more # reliable than using a relative path to a parallel directory with the daqsystemtest @@ -58,35 +58,6 @@ updates={"local_port": new_local_port, "remote_port": new_remote_port}, ) ) -onebyone_local_crt_bern_conf.config_substitutions.append( - data_classes.relationship_substitution( - obj_class="CRTFrameBuilderApplication", - obj_id="crt-data-source-01", - rel_name="detector_frame_builder", - replacement_object_class="CRTBernFrameBuilderConf", - replacement_object_id="def-crt-bern-frame-builder-conf" - ) -) -onebyone_local_crt_bern_conf.config_substitutions.append( - data_classes.list_element_substitution( - obj_class="ActionPlan", - obj_id="crt-readout-start", - rel_name="steps", - list_index=1, - replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-bern-frame-builder-data-source-step" - ) -) -onebyone_local_crt_bern_conf.config_substitutions.append( - data_classes.list_element_substitution( - obj_class="ActionPlan", - obj_id="crt-readout-stop", - rel_name="steps", - list_index=0, - replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-bern-frame-builder-data-source-step" - ) -) onebyone_local_crt_grenoble_conf = copy.deepcopy(common_config_obj) onebyone_local_crt_grenoble_conf.session = "local-socket-1x1-config" @@ -119,12 +90,13 @@ ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( - data_classes.relationship_substitution( + data_classes.list_element_substitution( obj_class="CRTFrameBuilderApplication", obj_id="crt-data-source-01", - rel_name="callback_desc", - replacement_object_class="DataMoveCallbackDescriptor", - replacement_object_id="crt-grenoble-raw-input" + rel_name="queue_rules", + list_index=0, + replacement_object_class="QueueConnectionRule", + replacement_object_id="crt-grenoble-sender-input-queue-rule" ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( @@ -147,6 +119,42 @@ replacement_object_id="crt-grenoble-frame-builder-data-source-step" ) ) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.relationship_substitution( + obj_class="DetectorStream", + obj_id="stream_1007", + rel_name="geo_id", + replacement_object_class="GeoId", + replacement_object_id="g_13_1_1_0" + ) +) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.relationship_substitution( + obj_class="DetectorStream", + obj_id="stream_1008", + rel_name="geo_id", + replacement_object_class="GeoId", + replacement_object_id="g_13_1_1_1" + ) +) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.relationship_substitution( + obj_class="DetectorStream", + obj_id="stream_1009", + rel_name="geo_id", + replacement_object_class="GeoId", + replacement_object_id="g_13_2_1_0" + ) +) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.relationship_substitution( + obj_class="DetectorStream", + obj_id="stream_1010", + rel_name="geo_id", + replacement_object_class="GeoId", + replacement_object_id="g_13_2_1_1" + ) +) onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.relationship_substitution( diff --git a/plugins/CRTBernFrameBuilderModule.cpp b/plugins/CRTBernFrameBuilderModule.cpp index 7926b2d..2ef9489 100644 --- a/plugins/CRTBernFrameBuilderModule.cpp +++ b/plugins/CRTBernFrameBuilderModule.cpp @@ -10,19 +10,15 @@ #include "CRTBernFrameBuilderModule.hpp" -#include "CreateSource.hpp" - #include "crtmodules/opmon/CRTBernFrameBuilderModule.pb.h" #include "datahandlinglibs/utils/RateLimiter.hpp" #include "datahandlinglibs/DataHandlingIssues.hpp" -#include "appmodel/DataReaderModule.hpp" +#include "appmodel/DetectorFrameBuilderModule.hpp" #include "appmodel/SocketDetectorToDaqConnection.hpp" #include "appmodel/NWDetDataSender.hpp" -#include "fddetdataformats/CRTBernFrame.hpp" - #include "confmodel/QueueWithSourceId.hpp" #include "confmodel/DetectorStream.hpp" #include "confmodel/GeoId.hpp" @@ -33,6 +29,8 @@ #include #include +DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTBernFrame, "CRTBernFrame") + namespace dunedaq::crtmodules { /** @@ -119,21 +117,9 @@ CRTBernFrameBuilderModule::CRTBernFrameBuilderModule(const std::string& name) void CRTBernFrameBuilderModule::init(const std::shared_ptr mcfg) { - auto* mdal = mcfg->get_dal(get_name()); - - if (mdal->get_raw_data_callbacks().empty()) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, - "No outputs defined for CRT Bern frame builder in configuration."); - ers::fatal(err); - throw err; - } - - for (auto* con : mdal->get_raw_data_callbacks()) { - auto ptr = m_sources[con->get_source_id()] = createSourceModel(con); - register_node(con->UID(), ptr); - } + auto* mdal = mcfg->get_dal(get_name()); - auto* d2d_conn = mdal->get_connections()[0]; // there's only 1 connection + auto* d2d_conn = mdal->get_connection(); auto* socket_d2d_conn = d2d_conn->cast(); if (socket_d2d_conn == nullptr) { auto err = datahandlinglibs::InitializationError(ERS_HERE, "Connection is not of type SocketDetectorToDaqConnection."); @@ -141,19 +127,28 @@ CRTBernFrameBuilderModule::init(const std::shared_ptrget_net_senders()) { - if (nw_sender->is_disabled(*(mcfg->get_session()))) { + auto* nw_sender = socket_d2d_conn->get_net_senders()[0]; // there's only 1 sender + + for (auto det_stream : nw_sender->get_streams()) { + if (det_stream->is_disabled(*(mcfg->get_session()))) { continue; } - for (auto det_stream : nw_sender->get_streams()) { - if (det_stream->is_disabled(*(mcfg->get_session()))) { - continue; - } + m_fake_stream_ids.push_back(det_stream->get_geo_id()->get_stream_id()); - m_fake_stream_ids[det_stream->get_source_id()] = det_stream->get_geo_id()->get_stream_id(); - } + m_producer_threads.emplace_back(std::make_unique()); } + + auto* con = mdal->get_outputs()[0]; // there's only 1 output + auto* queue = con->cast(); + if (queue == nullptr) { + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Output is not of type Queue."); + ers::fatal(err); + throw err; + } + + auto connection_name = queue->UID(); + m_sender = get_iom_sender(connection_name); } void @@ -173,8 +168,10 @@ CRTBernFrameBuilderModule::do_scrap(const CommandData_t& /*obj*/) if (m_run_marker.load()) { TLOG() << "Raising stop through variables!"; set_running(false); - while (!m_producer_thread.get_readiness()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + for (const auto& producer : m_producer_threads) { + while (!producer->get_readiness()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } } } else { TLOG_DEBUG(5) << "Already stopped!"; @@ -184,17 +181,15 @@ CRTBernFrameBuilderModule::do_scrap(const CommandData_t& /*obj*/) void CRTBernFrameBuilderModule::do_start(const CommandData_t& /*startobj*/) { - // Setup callbacks on all sourcemodels - for (auto& [sourceid, source] : m_sources) { - source->acquire_callback(); - } - enable_flow(); m_packet_count = 0; m_t0 = std::chrono::steady_clock::now(); - m_producer_thread.set_work(&CRTBernFrameBuilderModule::run_produce, this); + uint32_t i = 0; + for (auto& producer : m_producer_threads) { + producer->set_work(&CRTBernFrameBuilderModule::run_produce, this, m_fake_stream_ids[i++]); + } } void @@ -219,7 +214,7 @@ CRTBernFrameBuilderModule::generate_opmon_data() } void -CRTBernFrameBuilderModule::run_produce() +CRTBernFrameBuilderModule::run_produce(uint32_t fake_stream_id) { TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead @@ -230,17 +225,15 @@ CRTBernFrameBuilderModule::run_produce() datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); while (m_run_marker.load()) { - // Create a fake packet for each stream - for (const auto& [sid, source] : m_sources) { - fake_data(frame, seq_id, timestamp, m_fake_stream_ids[sid]); // TODO: To be filled by the CRT experts - - if (m_enable_flow.load()) [[likely]] { - source->handle_payload(reinterpret_cast(&frame), sizeof(frame)); - ++m_packet_count; - } + // Create a fake packet for stream + fake_data(frame, seq_id, timestamp, fake_stream_id); // TODO: To be filled by the CRT experts - rate_limiter.limit(); + if (m_enable_flow.load()) [[likely]] { + m_sender->try_send(std::move(frame), iomanager::Sender::s_no_block); + ++m_packet_count; } + + rate_limiter.limit(); } TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead diff --git a/plugins/CRTBernFrameBuilderModule.hpp b/plugins/CRTBernFrameBuilderModule.hpp index 266d3e0..1a90558 100644 --- a/plugins/CRTBernFrameBuilderModule.hpp +++ b/plugins/CRTBernFrameBuilderModule.hpp @@ -13,6 +13,7 @@ #include "appfwk/DAQModule.hpp" #include "utilities/ReusableThread.hpp" +#include "fddetdataformats/CRTBernFrame.hpp" #include #include @@ -20,8 +21,6 @@ namespace dunedaq::crtmodules { -class SourceConcept; - class CRTBernFrameBuilderModule : public dunedaq::appfwk::DAQModule { public: @@ -52,9 +51,10 @@ class CRTBernFrameBuilderModule : public dunedaq::appfwk::DAQModule void generate_opmon_data() override; /** - * @brief Raw data produce thread function + * @brief Data produce thread function + * @param fake_stream_id Fake packet stream ID */ - void run_produce(); + void run_produce(uint32_t fake_stream_id); /** * @brief Sets run marker @@ -84,21 +84,19 @@ class CRTBernFrameBuilderModule : public dunedaq::appfwk::DAQModule // PRODUCER /** - * @brief Raw data producer thread + * @brief Data producer threads */ - utilities::ReusableThread m_producer_thread; + std::vector> m_producer_threads; - using sid_to_source_map_t = std::map>; // NOLINT(build/unsigned) /** - * @brief Data sources + * @brief Data sender */ - sid_to_source_map_t m_sources; + std::shared_ptr> m_sender; - using sid_to_fake_stream_id_map_t = std::map; // NOLINT(build/unsigned) /** * @brief Fake packet stream IDs */ - sid_to_fake_stream_id_map_t m_fake_stream_ids; + std::vector m_fake_stream_ids; // NOLINT(build/unsigned) /** * @brief Configured packet transmission rate in kHz diff --git a/plugins/CRTGrenobleFrameBuilderModule.cpp b/plugins/CRTGrenobleFrameBuilderModule.cpp index 0089fbb..08ee44f 100644 --- a/plugins/CRTGrenobleFrameBuilderModule.cpp +++ b/plugins/CRTGrenobleFrameBuilderModule.cpp @@ -10,18 +10,14 @@ #include "CRTGrenobleFrameBuilderModule.hpp" -#include "CreateSource.hpp" - #include "crtmodules/opmon/CRTGrenobleFrameBuilderModule.pb.h" #include "datahandlinglibs/utils/RateLimiter.hpp" -#include "appmodel/DataReaderModule.hpp" +#include "appmodel/DetectorFrameBuilderModule.hpp" #include "appmodel/SocketDetectorToDaqConnection.hpp" #include "appmodel/NWDetDataSender.hpp" -#include "fddetdataformats/CRTGrenobleFrame.hpp" - #include "confmodel/QueueWithSourceId.hpp" #include "confmodel/DetectorStream.hpp" #include "confmodel/GeoId.hpp" @@ -34,6 +30,8 @@ #include #include +DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTGrenobleFrame, "CRTGrenobleFrame") + namespace dunedaq::crtmodules { /** @@ -120,21 +118,9 @@ CRTGrenobleFrameBuilderModule::CRTGrenobleFrameBuilderModule(const std::string& void CRTGrenobleFrameBuilderModule::init(const std::shared_ptr mcfg) { - auto* mdal = mcfg->get_dal(get_name()); - - if (mdal->get_raw_data_callbacks().empty()) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, - "No outputs defined for CRT Grenoble frame builder in configuration."); - ers::fatal(err); - throw err; - } - - for (auto* con : mdal->get_raw_data_callbacks()) { - auto ptr = m_sources[con->get_source_id()] = createSourceModel(con); - register_node(con->UID(), ptr); - } + auto* mdal = mcfg->get_dal(get_name()); - auto* d2d_conn = mdal->get_connections()[0]; // there's only 1 connection + auto* d2d_conn = mdal->get_connection(); auto* socket_d2d_conn = d2d_conn->cast(); if (socket_d2d_conn == nullptr) { auto err = datahandlinglibs::InitializationError(ERS_HERE, "Connection is not of type SocketDetectorToDaqConnection."); @@ -142,19 +128,28 @@ CRTGrenobleFrameBuilderModule::init(const std::shared_ptrget_net_senders()) { - if (nw_sender->is_disabled(*(mcfg->get_session()))) { + auto* nw_sender = socket_d2d_conn->get_net_senders()[0]; // there's only 1 sender + + for (auto det_stream : nw_sender->get_streams()) { + if (det_stream->is_disabled(*(mcfg->get_session()))) { continue; } - for (auto det_stream : nw_sender->get_streams()) { - if (det_stream->is_disabled(*(mcfg->get_session()))) { - continue; - } + m_fake_stream_ids.push_back(det_stream->get_geo_id()->get_stream_id()); - m_fake_stream_ids[det_stream->get_source_id()] = det_stream->get_geo_id()->get_stream_id(); - } + m_producer_threads.emplace_back(std::make_unique()); } + + auto* con = mdal->get_outputs()[0]; // there's only 1 output + auto* queue = con->cast(); + if (queue == nullptr) { + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Output is not of type Queue."); + ers::fatal(err); + throw err; + } + + auto connection_name = queue->UID(); + m_sender = get_iom_sender(connection_name); } void @@ -174,8 +169,10 @@ CRTGrenobleFrameBuilderModule::do_scrap(const CommandData_t& /*obj*/) if (m_run_marker.load()) { TLOG() << "Raising stop through variables!"; set_running(false); - while (!m_producer_thread.get_readiness()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + for (const auto& producer : m_producer_threads) { + while (!producer->get_readiness()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } } } else { TLOG_DEBUG(5) << "Already stopped!"; @@ -185,18 +182,16 @@ CRTGrenobleFrameBuilderModule::do_scrap(const CommandData_t& /*obj*/) void CRTGrenobleFrameBuilderModule::do_start(const CommandData_t& /*startobj*/) { - // Setup callbacks on all sourcemodels - for (auto& [sourceid, source] : m_sources) { - source->acquire_callback(); - } - m_packet_count = 0; m_t0 = std::chrono::steady_clock::now(); enable_flow(); - m_producer_thread.set_work(&CRTGrenobleFrameBuilderModule::run_produce, this); + uint32_t i = 0; + for (auto& producer : m_producer_threads) { + producer->set_work(&CRTGrenobleFrameBuilderModule::run_produce, this, m_fake_stream_ids[i++]); + } } void @@ -221,7 +216,7 @@ CRTGrenobleFrameBuilderModule::generate_opmon_data() } void -CRTGrenobleFrameBuilderModule::run_produce() +CRTGrenobleFrameBuilderModule::run_produce(uint32_t fake_stream_id) { TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead @@ -232,17 +227,15 @@ CRTGrenobleFrameBuilderModule::run_produce() datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); while (m_run_marker.load()) { - // Create a fake packet for each stream - for (const auto& [sid, source] : m_sources) { - fake_data(frame, seq_id, timestamp, m_fake_stream_ids[sid]); // TODO: To be filled by the CRT experts - - if (m_enable_flow.load()) [[likely]] { - source->handle_payload(reinterpret_cast(&frame), sizeof(frame)); - ++m_packet_count; - } + // Create a fake packet for stream + fake_data(frame, seq_id, timestamp, fake_stream_id); // TODO: To be filled by the CRT experts - rate_limiter.limit(); + if (m_enable_flow.load()) [[likely]] { + m_sender->try_send(std::move(frame), iomanager::Sender::s_no_block); + ++m_packet_count; } + + rate_limiter.limit(); } TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead diff --git a/plugins/CRTGrenobleFrameBuilderModule.hpp b/plugins/CRTGrenobleFrameBuilderModule.hpp index afdd3ab..a9d2391 100644 --- a/plugins/CRTGrenobleFrameBuilderModule.hpp +++ b/plugins/CRTGrenobleFrameBuilderModule.hpp @@ -13,6 +13,7 @@ #include "appfwk/DAQModule.hpp" #include "utilities/ReusableThread.hpp" +#include "fddetdataformats/CRTGrenobleFrame.hpp" #include #include @@ -20,8 +21,6 @@ namespace dunedaq::crtmodules { -class SourceConcept; - class CRTGrenobleFrameBuilderModule : public dunedaq::appfwk::DAQModule { public: @@ -52,9 +51,10 @@ class CRTGrenobleFrameBuilderModule : public dunedaq::appfwk::DAQModule void generate_opmon_data() override; /** - * @brief Raw data produce thread function + * @brief Data produce thread function + * @param fake_stream_id Fake packet stream ID */ - void run_produce(); + void run_produce(uint32_t fake_stream_id); /** * @brief Sets run marker @@ -84,21 +84,19 @@ class CRTGrenobleFrameBuilderModule : public dunedaq::appfwk::DAQModule // PRODUCER /** - * @brief Raw data producer thread + * @brief Data producer threads */ - utilities::ReusableThread m_producer_thread; + std::vector> m_producer_threads; - using sid_to_source_map_t = std::map>; // NOLINT(build/unsigned) /** - * @brief Data sources + * @brief Data sender */ - sid_to_source_map_t m_sources; + std::shared_ptr> m_sender; - using sid_to_fake_stream_id_map_t = std::map; // NOLINT(build/unsigned) /** * @brief Fake packet stream IDs */ - sid_to_fake_stream_id_map_t m_fake_stream_ids; + std::vector m_fake_stream_ids; // NOLINT(build/unsigned) /** * @brief Configured packet transmission rate in kHz diff --git a/src/CreateSource.hpp b/src/CreateSource.hpp deleted file mode 100644 index 2612519..0000000 --- a/src/CreateSource.hpp +++ /dev/null @@ -1,51 +0,0 @@ -/** - * @file CreateSource.hpp Specific SourceConcept creator. - * - * This is part of the DUNE DAQ , copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ -#ifndef CRTMODULES_SRC_CREATESOURCE_HPP_ -#define CRTMODULES_SRC_CREATESOURCE_HPP_ - -#include "SourceConcept.hpp" -#include "SourceModel.hpp" -#include "datahandlinglibs/DataHandlingIssues.hpp" - -#include "fdreadoutlibs/CRTBernTypeAdapter.hpp" -#include "fdreadoutlibs/CRTGrenobleTypeAdapter.hpp" - -#include -#include - -namespace dunedaq { - -DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::CRTBernTypeAdapter, "CRTBernFrame") -DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::CRTGrenobleTypeAdapter, "CRTGrenobleFrame") - -namespace crtmodules { - -std::shared_ptr -createSourceModel(const appmodel::DataMoveCallbackConf* conf) -{ - auto datatype = conf->get_data_type(); - TLOG() << "Choosing specializations for SourceModel for output connection " - << " [uid:" << conf->UID() << " , data_type:" << datatype << ']'; - - if (datatype.find("CRTBernFrame") != std::string::npos) { - auto source_model = std::make_shared>(); - source_model->set_sink_config(conf); - return source_model; - } else if (datatype.find("CRTGrenobleFrame") != std::string::npos) { - auto source_model = std::make_shared>(); - source_model->set_sink_config(conf); - return source_model; - } - - return nullptr; -} - -} // namespace crtmodules -} // namespace dunedaq - -#endif // CRTMODULES_SRC_CREATESOURCE_HPP_ diff --git a/src/SourceConcept.hpp b/src/SourceConcept.hpp deleted file mode 100644 index 5caa281..0000000 --- a/src/SourceConcept.hpp +++ /dev/null @@ -1,55 +0,0 @@ - -/** - * @file SourceConcept.hpp SourceConcept for constructors and - * forwarding command args. Enforces the implementation to - * queue in UDP JUMBO frames to be translated to TypeAdapters and - * send them to corresponding sinks. - * - * This is part of the DUNE DAQ , copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ -#ifndef CRTMODULES_SRC_SOURCECONCEPT_HPP_ -#define CRTMODULES_SRC_SOURCECONCEPT_HPP_ - -//#include "DefaultParserImpl.hpp" - -#include "opmonlib/MonitorableObject.hpp" -#include "appfwk/DAQModule.hpp" -#include "appmodel/DataMoveCallbackConf.hpp" -//#include "packetformat/detail/block_parser.hpp" -#include - -#include -#include -#include - -namespace dunedaq::crtmodules { - - class SourceConcept : public opmonlib::MonitorableObject - { - public: - SourceConcept() {} - virtual ~SourceConcept() {} - - SourceConcept(const SourceConcept&) = delete; ///< SourceConcept is not copy-constructible - SourceConcept& operator=(const SourceConcept&) = delete; ///< SourceConcept is not copy-assignable - SourceConcept(SourceConcept&&) = delete; ///< SourceConcept is not move-constructible - SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable - - // virtual void init(const nlohmann::json& args) = 0; - virtual void acquire_callback() = 0; - // virtual void conf(const nlohmann::json& args) = 0; - // virtual void start(const nlohmann::json& args) = 0; - // virtual void stop(const nlohmann::json& args) = 0; - - virtual bool handle_payload(char* message, std::size_t size) = 0; - - void set_sink_config(const appmodel::DataMoveCallbackConf* sink_conf) { m_sink_conf = sink_conf; } - - const appmodel::DataMoveCallbackConf* m_sink_conf; - }; - -} // namespace dunedaq::crtmodules - -#endif // CRTMODULES_SRC_SOURCECONCEPT_HPP_ diff --git a/src/SourceModel.hpp b/src/SourceModel.hpp deleted file mode 100644 index 167f003..0000000 --- a/src/SourceModel.hpp +++ /dev/null @@ -1,106 +0,0 @@ -/** - * @file SourceModel.hpp FELIX CR's ELink concept wrapper - * - * This is part of the DUNE DAQ , copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ -#ifndef CRTMODULES_SRC_SOURCEMODEL_HPP_ -#define CRTMODULES_SRC_SOURCEMODEL_HPP_ - -#include "SourceConcept.hpp" - - -#include "iomanager/IOManager.hpp" -#include "iomanager/Sender.hpp" -#include "logging/Logging.hpp" - -#include "crtmodules/opmon/SourceModel.pb.h" - -// #include "datahandlinglibs/utils/ReusableThread.hpp" -#include "datahandlinglibs/DataMoveCallbackRegistry.hpp" -#include "datahandlinglibs/utils/BufferCopy.hpp" - -// #include -// #include -// NOLINT(build/unsigned) -#include -#include -#include -#include -#include - -namespace dunedaq::crtmodules { - -template -class SourceModel : public SourceConcept -{ -public: - using sink_t = iomanager::SenderConcept; - using inherited = SourceConcept; - using data_t = nlohmann::json; - - /** - * @brief Buffer size based on TargetPayloadType - */ - static constexpr auto buffer_size = sizeof(TargetPayloadType); - - /** - * @brief SourceModel Constructor - * @param name Instance name for this SourceModel instance - */ - SourceModel() - : SourceConcept() - {} - ~SourceModel() {} - - void acquire_callback() override - { - if (m_callback_is_acquired) { - TLOG_DEBUG(5) << "SourceModel callback is already acquired!"; - } else { - // Getting DataMoveCBRegistry - auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get(); - m_sink_callback = dmcbr->get_callback(inherited::m_sink_conf); - m_callback_is_acquired = true; - } - } - - bool handle_payload(char* message, std::size_t size) override - { - bool push_out = true; - if (push_out) { - - TargetPayloadType& target_payload = *reinterpret_cast(message); - (*m_sink_callback)(std::move(target_payload)); - - } else { - TargetPayloadType target_payload; - uint32_t bytes_copied = 0; // NOLINT(build/unsigned) - datahandlinglibs::buffer_copy(message, size, static_cast(&target_payload), bytes_copied, sizeof(target_payload)); - } - - return true; - } - - void generate_opmon_data() override { - - opmon::SourceInfo info; - info.set_dropped_frames( m_dropped_packets.load() ); - - publish( std::move(info) ); - } - -private: - // Callback internals - bool m_callback_is_acquired{ false }; - using sink_cb_t = std::shared_ptr>; - sink_cb_t m_sink_callback; - - std::atomic m_dropped_packets{0}; // NOLINT(build/unsigned) - -}; - -} // namespace dunedaq::crtmodules - -#endif // CRTMODULES_SRC_SOURCEMODEL_HPP_ From a774e28d67534556999b8763208183b81a220e1b Mon Sep 17 00:00:00 2001 From: Deniz Tuana Ergonul Uzun Date: Mon, 23 Mar 2026 15:12:52 +0100 Subject: [PATCH 9/9] Lint changes --- plugins/CRTBernFrameBuilderModule.cpp | 4 ++-- plugins/CRTBernFrameBuilderModule.hpp | 3 ++- plugins/CRTGrenobleFrameBuilderModule.cpp | 4 ++-- plugins/CRTGrenobleFrameBuilderModule.hpp | 3 ++- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/plugins/CRTBernFrameBuilderModule.cpp b/plugins/CRTBernFrameBuilderModule.cpp index 2ef9489..cfd7d19 100644 --- a/plugins/CRTBernFrameBuilderModule.cpp +++ b/plugins/CRTBernFrameBuilderModule.cpp @@ -186,7 +186,7 @@ CRTBernFrameBuilderModule::do_start(const CommandData_t& /*startobj*/) m_packet_count = 0; m_t0 = std::chrono::steady_clock::now(); - uint32_t i = 0; + uint32_t i = 0; // NOLINT(build/unsigned) for (auto& producer : m_producer_threads) { producer->set_work(&CRTBernFrameBuilderModule::run_produce, this, m_fake_stream_ids[i++]); } @@ -214,7 +214,7 @@ CRTBernFrameBuilderModule::generate_opmon_data() } void -CRTBernFrameBuilderModule::run_produce(uint32_t fake_stream_id) +CRTBernFrameBuilderModule::run_produce(uint32_t fake_stream_id) // NOLINT(build/unsigned) { TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead diff --git a/plugins/CRTBernFrameBuilderModule.hpp b/plugins/CRTBernFrameBuilderModule.hpp index 1a90558..db8c8a4 100644 --- a/plugins/CRTBernFrameBuilderModule.hpp +++ b/plugins/CRTBernFrameBuilderModule.hpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace dunedaq::crtmodules { @@ -54,7 +55,7 @@ class CRTBernFrameBuilderModule : public dunedaq::appfwk::DAQModule * @brief Data produce thread function * @param fake_stream_id Fake packet stream ID */ - void run_produce(uint32_t fake_stream_id); + void run_produce(uint32_t fake_stream_id); // NOLINT(build/unsigned) /** * @brief Sets run marker diff --git a/plugins/CRTGrenobleFrameBuilderModule.cpp b/plugins/CRTGrenobleFrameBuilderModule.cpp index 08ee44f..f7c612e 100644 --- a/plugins/CRTGrenobleFrameBuilderModule.cpp +++ b/plugins/CRTGrenobleFrameBuilderModule.cpp @@ -188,7 +188,7 @@ CRTGrenobleFrameBuilderModule::do_start(const CommandData_t& /*startobj*/) enable_flow(); - uint32_t i = 0; + uint32_t i = 0; // NOLINT(build/unsigned) for (auto& producer : m_producer_threads) { producer->set_work(&CRTGrenobleFrameBuilderModule::run_produce, this, m_fake_stream_ids[i++]); } @@ -216,7 +216,7 @@ CRTGrenobleFrameBuilderModule::generate_opmon_data() } void -CRTGrenobleFrameBuilderModule::run_produce(uint32_t fake_stream_id) +CRTGrenobleFrameBuilderModule::run_produce(uint32_t fake_stream_id) // NOLINT(build/unsigned) { TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead diff --git a/plugins/CRTGrenobleFrameBuilderModule.hpp b/plugins/CRTGrenobleFrameBuilderModule.hpp index a9d2391..f8ef033 100644 --- a/plugins/CRTGrenobleFrameBuilderModule.hpp +++ b/plugins/CRTGrenobleFrameBuilderModule.hpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace dunedaq::crtmodules { @@ -54,7 +55,7 @@ class CRTGrenobleFrameBuilderModule : public dunedaq::appfwk::DAQModule * @brief Data produce thread function * @param fake_stream_id Fake packet stream ID */ - void run_produce(uint32_t fake_stream_id); + void run_produce(uint32_t fake_stream_id); // NOLINT(build/unsigned) /** * @brief Sets run marker