diff --git a/CMakeLists.txt b/CMakeLists.txt index ab00807..0afbfed 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,8 +57,8 @@ daq_add_python_bindings(*.cpp ) # See https://dune-daq-sw.readthedocs.io/en/latest/packages/daq-cmake/#daq_add_plugin #daq_add_plugin(CRTControllerModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk) # Replace appfwk library with a more specific library when appropriate -daq_add_plugin(CRTGrenobleReaderModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk fdreadoutlibs::fdreadoutlibs opmonlib::opmonlib) -daq_add_plugin(CRTBernReaderModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk fdreadoutlibs::fdreadoutlibs opmonlib::opmonlib) +daq_add_plugin(CRTGrenobleFrameBuilderModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk fdreadoutlibs::fdreadoutlibs opmonlib::opmonlib) +daq_add_plugin(CRTBernFrameBuilderModule duneDAQModule LINK_LIBRARIES crtmodules appfwk::appfwk fdreadoutlibs::fdreadoutlibs opmonlib::opmonlib) ############################################################################## diff --git a/integtest/crt_reader_test.py b/integtest/crt_frame_builder_test.py similarity index 62% rename from integtest/crt_reader_test.py rename to integtest/crt_frame_builder_test.py index 61f4b8c..1a4d803 100644 --- a/integtest/crt_reader_test.py +++ b/integtest/crt_frame_builder_test.py @@ -10,8 +10,8 @@ pytest_plugins = "integrationtest.integrationtest_drunc" # Values that help determine the running conditions -number_of_data_producers = 1 -number_of_readout_apps = 1 +number_of_data_producers = 4 +number_of_builder_apps = 2 run_duration = 20 # seconds # Default values for validation parameters @@ -26,7 +26,7 @@ common_config_obj = data_classes.drunc_config() common_config_obj.dro_map_config.n_streams = number_of_data_producers -common_config_obj.dro_map_config.n_apps = number_of_readout_apps +common_config_obj.dro_map_config.n_apps = number_of_builder_apps # 22-Jan-2026, KAB: added the use of the DAQSYSTEMTEST_SHARE env var as part of # specifying the location of the example-configs.data.xml file. This is more # reliable than using a relative path to a parallel directory with the daqsystemtest @@ -40,62 +40,63 @@ onebyone_local_crt_bern_conf = copy.deepcopy(common_config_obj) onebyone_local_crt_bern_conf.session = "local-socket-1x1-config" -new_port = find_free_port() +new_local_port = find_free_port() +new_remote_port = find_free_port() onebyone_local_crt_bern_conf.config_substitutions.append( data_classes.attribute_substitution( obj_class="SocketDataSender", obj_id="socket_sender_crt", - updates={"port": new_port}, + updates={"local_port": new_local_port, "remote_port": new_remote_port}, ) ) +new_local_port = find_free_port() +new_remote_port = find_free_port() onebyone_local_crt_bern_conf.config_substitutions.append( - data_classes.relationship_substitution( - obj_class="CRTReaderApplication", - obj_id="crt-data-source-01", - rel_name="data_reader", - replacement_object_class="CRTBernReaderConf", - replacement_object_id="def-crt-bern-receiver-conf" - ) -) -onebyone_local_crt_bern_conf.config_substitutions.append( - data_classes.list_element_substitution( - obj_class="ActionPlan", - obj_id="crt-readout-start", - rel_name="steps", - list_index=1, - replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-bern-reader-data-source-step" - ) -) -onebyone_local_crt_bern_conf.config_substitutions.append( - data_classes.list_element_substitution( - obj_class="ActionPlan", - obj_id="crt-readout-stop", - rel_name="steps", - list_index=0, - replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-bern-reader-data-source-step" + data_classes.attribute_substitution( + obj_class="SocketDataSender", + obj_id="socket_sender_crt_2", + updates={"local_port": new_local_port, "remote_port": new_remote_port}, ) ) onebyone_local_crt_grenoble_conf = copy.deepcopy(common_config_obj) onebyone_local_crt_grenoble_conf.session = "local-socket-1x1-config" -new_port = find_free_port() +new_local_port = find_free_port() +new_remote_port = find_free_port() onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.attribute_substitution( obj_class="SocketDataSender", obj_id="socket_sender_crt", - updates={"port": new_port}, + updates={"local_port": new_local_port, "remote_port": new_remote_port}, + ) +) +new_local_port = find_free_port() +new_remote_port = find_free_port() +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.attribute_substitution( + obj_class="SocketDataSender", + obj_id="socket_sender_crt_2", + updates={"local_port": new_local_port, "remote_port": new_remote_port}, ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.relationship_substitution( - obj_class="CRTReaderApplication", + obj_class="CRTFrameBuilderApplication", + obj_id="crt-data-source-01", + rel_name="detector_frame_builder", + replacement_object_class="CRTGrenobleFrameBuilderConf", + replacement_object_id="def-crt-grenoble-frame-builder-conf" + ) +) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.list_element_substitution( + obj_class="CRTFrameBuilderApplication", obj_id="crt-data-source-01", - rel_name="data_reader", - replacement_object_class="CRTGrenobleReaderConf", - replacement_object_id="def-crt-grenoble-receiver-conf" + rel_name="queue_rules", + list_index=0, + replacement_object_class="QueueConnectionRule", + replacement_object_id="crt-grenoble-sender-input-queue-rule" ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( @@ -105,7 +106,7 @@ rel_name="steps", list_index=1, replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-grenoble-reader-data-source-step" + replacement_object_id="crt-grenoble-frame-builder-data-source-step" ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( @@ -115,30 +116,46 @@ rel_name="steps", list_index=0, replacement_object_class="DaqModulesGroupByType", - replacement_object_id="crt-grenoble-reader-data-source-step" + replacement_object_id="crt-grenoble-frame-builder-data-source-step" ) ) - onebyone_local_crt_grenoble_conf.config_substitutions.append( - data_classes.list_element_substitution( - obj_class="CRTReaderApplication", - obj_id="crt-data-source-01", - rel_name="queue_rules", - list_index=0, - replacement_object_class="QueueConnectionRule", - replacement_object_id="crt-grenoble-raw-data-rule" + data_classes.relationship_substitution( + obj_class="DetectorStream", + obj_id="stream_1007", + rel_name="geo_id", + replacement_object_class="GeoId", + replacement_object_id="g_13_1_1_0" ) ) onebyone_local_crt_grenoble_conf.config_substitutions.append( - data_classes.list_element_substitution( - obj_class="ReadoutApplication", - obj_id="socket-ru-01", - rel_name="queue_rules", - list_index=1, - replacement_object_class="QueueConnectionRule", - replacement_object_id="crt-grenoble-callback-raw-data-rule" + data_classes.relationship_substitution( + obj_class="DetectorStream", + obj_id="stream_1008", + rel_name="geo_id", + replacement_object_class="GeoId", + replacement_object_id="g_13_1_1_1" + ) +) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.relationship_substitution( + obj_class="DetectorStream", + obj_id="stream_1009", + rel_name="geo_id", + replacement_object_class="GeoId", + replacement_object_id="g_13_2_1_0" + ) +) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.relationship_substitution( + obj_class="DetectorStream", + obj_id="stream_1010", + rel_name="geo_id", + replacement_object_class="GeoId", + replacement_object_id="g_13_2_1_1" ) ) + onebyone_local_crt_grenoble_conf.config_substitutions.append( data_classes.relationship_substitution( obj_class="ReadoutApplication", @@ -148,6 +165,15 @@ replacement_object_id="def-crt-grenoble-link-handler" ) ) +onebyone_local_crt_grenoble_conf.config_substitutions.append( + data_classes.relationship_substitution( + obj_class="ReadoutApplication", + obj_id="socket-ru-01", + rel_name="callback_desc", + replacement_object_class="DataMoveCallbackDescriptor", + replacement_object_id="crt-grenoble-raw-input" + ) +) confgen_arguments = { "Local CRT Bern 1x1 Conf": onebyone_local_crt_bern_conf, diff --git a/plugins/CRTBernFrameBuilderModule.cpp b/plugins/CRTBernFrameBuilderModule.cpp new file mode 100644 index 0000000..cfd7d19 --- /dev/null +++ b/plugins/CRTBernFrameBuilderModule.cpp @@ -0,0 +1,251 @@ +/** + * @file CRTBernFrameBuilderModule.cpp + + * Reads data from the HW then puts it in a queue + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "CRTBernFrameBuilderModule.hpp" + +#include "crtmodules/opmon/CRTBernFrameBuilderModule.pb.h" + +#include "datahandlinglibs/utils/RateLimiter.hpp" +#include "datahandlinglibs/DataHandlingIssues.hpp" + +#include "appmodel/DetectorFrameBuilderModule.hpp" +#include "appmodel/SocketDetectorToDaqConnection.hpp" +#include "appmodel/NWDetDataSender.hpp" + +#include "confmodel/QueueWithSourceId.hpp" +#include "confmodel/DetectorStream.hpp" +#include "confmodel/GeoId.hpp" + +#include "detdataformats/DetID.hpp" + +#include +#include +#include + +DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTBernFrame, "CRTBernFrame") + +namespace dunedaq::crtmodules { + +/** + * @brief Maximum packet sequence ID before reset + */ +constexpr uint64_t max_seq_id = 4095; // NOLINT(build/unsigned) + +/** + * @brief Fake packet detector ID + */ +constexpr uint8_t fake_det_id = static_cast(detdataformats::DetID::Subdetector::kVD_BernCRT); // NOLINT(build/unsigned) + +/** + * @brief Fake packet block length + */ +constexpr uint64_t fake_block_length = 0x382; // NOLINT(build/unsigned) + +/** + * @brief Calculate the next fake sequence ID for a packet + * @param seq_id Fake packet sequence ID + */ +void +fake_sequence_id(uint64_t& seq_id) // NOLINT(build/unsigned) +{ + seq_id = (seq_id == max_seq_id ? 0 : seq_id+1); +} + +/** + * @brief Calculate the next fake timestamp for a packet + * @param timestamp Fake packet timestamp + */ +void +fake_timestamp(uint64_t& timestamp) // NOLINT(build/unsigned) +{ + auto time_now = std::chrono::steady_clock::now().time_since_epoch(); + uint64_t current_time = // NOLINT (build/unsigned) + std::chrono::duration_cast(time_now).count(); + timestamp = current_time / 16; // 625/10000 (same as 625*us/10) +} + +/** + * @brief Fake ADC of the given packet + * @param frame Fake packet + */ +void +fake_adc(fddetdataformats::CRTBernFrame& frame) +{ + for (int channel = 0; channel < fddetdataformats::CRTBernFrame::s_num_channels; ++channel) { + frame.set_adc(channel, 0); + } +} + +/** + * @brief Create a fake packet + * @param frame Fake packet + * @param seq_id Fake packet sequence ID + * @param timestamp Fake packet timestamp + * @param stream_id Fake packet stream ID + */ +void +fake_data(fddetdataformats::CRTBernFrame& frame, uint64_t& seq_id, uint64_t& timestamp, uint32_t stream_id) // NOLINT(build/unsigned) +{ + frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id + frame.daq_header.crate_id = 1; + frame.daq_header.slot_id = 1; + frame.daq_header.stream_id = stream_id; + fake_sequence_id(seq_id); + frame.daq_header.seq_id = seq_id; + frame.daq_header.block_length = fake_block_length; + fake_timestamp(timestamp); + frame.daq_header.timestamp = timestamp; + fake_adc(frame); +} + +CRTBernFrameBuilderModule::CRTBernFrameBuilderModule(const std::string& name) + : DAQModule(name) +{ + register_command("conf", &CRTBernFrameBuilderModule::do_conf); + register_command("start", &CRTBernFrameBuilderModule::do_start); + register_command("stop_trigger_sources", &CRTBernFrameBuilderModule::do_stop); + register_command("scrap", &CRTBernFrameBuilderModule::do_scrap); +} + +void +CRTBernFrameBuilderModule::init(const std::shared_ptr mcfg) +{ + auto* mdal = mcfg->get_dal(get_name()); + + auto* d2d_conn = mdal->get_connection(); + auto* socket_d2d_conn = d2d_conn->cast(); + if (socket_d2d_conn == nullptr) { + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Connection is not of type SocketDetectorToDaqConnection."); + ers::fatal(err); + throw err; + } + + auto* nw_sender = socket_d2d_conn->get_net_senders()[0]; // there's only 1 sender + + for (auto det_stream : nw_sender->get_streams()) { + if (det_stream->is_disabled(*(mcfg->get_session()))) { + continue; + } + + m_fake_stream_ids.push_back(det_stream->get_geo_id()->get_stream_id()); + + m_producer_threads.emplace_back(std::make_unique()); + } + + auto* con = mdal->get_outputs()[0]; // there's only 1 output + auto* queue = con->cast(); + if (queue == nullptr) { + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Output is not of type Queue."); + ers::fatal(err); + throw err; + } + + auto connection_name = queue->UID(); + m_sender = get_iom_sender(connection_name); +} + +void +CRTBernFrameBuilderModule::do_conf(const CommandData_t& /*obj*/) +{ + // Configure HW interface? + if (!m_run_marker.load()) { + set_running(true); + } else { + TLOG_DEBUG(5) << "Already running!"; + } +} + +void +CRTBernFrameBuilderModule::do_scrap(const CommandData_t& /*obj*/) +{ + if (m_run_marker.load()) { + TLOG() << "Raising stop through variables!"; + set_running(false); + for (const auto& producer : m_producer_threads) { + while (!producer->get_readiness()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } else { + TLOG_DEBUG(5) << "Already stopped!"; + } +} + +void +CRTBernFrameBuilderModule::do_start(const CommandData_t& /*startobj*/) +{ + enable_flow(); + + m_packet_count = 0; + m_t0 = std::chrono::steady_clock::now(); + + uint32_t i = 0; // NOLINT(build/unsigned) + for (auto& producer : m_producer_threads) { + producer->set_work(&CRTBernFrameBuilderModule::run_produce, this, m_fake_stream_ids[i++]); + } +} + +void +CRTBernFrameBuilderModule::do_stop(const CommandData_t& /*stopobj*/) +{ + disable_flow(); +} + +void +CRTBernFrameBuilderModule::generate_opmon_data() +{ + opmon::CRTBernFrameBuilderInfo i; + + auto now = std::chrono::steady_clock::now(); + int new_packets = m_packet_count.exchange(0); + double seconds = std::chrono::duration_cast(now - m_t0).count() / 1000000.; + m_t0 = now; + + i.set_packet_rate_khz(new_packets / seconds / 1000.); + + publish(std::move(i)); +} + +void +CRTBernFrameBuilderModule::run_produce(uint32_t fake_stream_id) // NOLINT(build/unsigned) +{ + TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead + + fddetdataformats::CRTBernFrame frame; + uint64_t seq_id = 0; // NOLINT(build/unsigned) + uint64_t timestamp = 0; // NOLINT(build/unsigned) + + datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); + + while (m_run_marker.load()) { + // Create a fake packet for stream + fake_data(frame, seq_id, timestamp, fake_stream_id); // TODO: To be filled by the CRT experts + + if (m_enable_flow.load()) [[likely]] { + m_sender->try_send(std::move(frame), iomanager::Sender::s_no_block); + ++m_packet_count; + } + + rate_limiter.limit(); + } + + TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead +} + +void +CRTBernFrameBuilderModule::set_running(bool should_run) +{ + bool was_running = m_run_marker.exchange(should_run); + TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run; +} + +} // namespace dunedaq::crtmodules + +DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTBernFrameBuilderModule) diff --git a/plugins/CRTBernReaderModule.hpp b/plugins/CRTBernFrameBuilderModule.hpp similarity index 52% rename from plugins/CRTBernReaderModule.hpp rename to plugins/CRTBernFrameBuilderModule.hpp index 3567272..db8c8a4 100644 --- a/plugins/CRTBernReaderModule.hpp +++ b/plugins/CRTBernFrameBuilderModule.hpp @@ -1,5 +1,5 @@ /** - * @file CRTBernReaderModule.hpp + * @file CRTBernFrameBuilderModule.hpp * * Reads data from the HW then puts it in a queue * @@ -8,39 +8,39 @@ * received with this code. */ -#ifndef CRTMODULES_PLUGINS_CRTBERNREADERMODULE_HPP_ -#define CRTMODULES_PLUGINS_CRTBERNREADERMODULE_HPP_ +#ifndef CRTMODULES_PLUGINS_CRTBERNFRAMEBUILDERMODULE_HPP_ +#define CRTMODULES_PLUGINS_CRTBERNFRAMEBUILDERMODULE_HPP_ #include "appfwk/DAQModule.hpp" #include "utilities/ReusableThread.hpp" +#include "fddetdataformats/CRTBernFrame.hpp" #include #include +#include +#include -namespace dunedaq { -namespace crtmodules { +namespace dunedaq::crtmodules { -class SourceConcept; - -class CRTBernReaderModule : public dunedaq::appfwk::DAQModule +class CRTBernFrameBuilderModule : public dunedaq::appfwk::DAQModule { public: /** - * @brief CRTBernReaderModule constructor + * @brief CRTBernFrameBuilderModule constructor * @param name DAQ module instance name */ - explicit CRTBernReaderModule(const std::string& name); + explicit CRTBernFrameBuilderModule(const std::string& name); - CRTBernReaderModule(const CRTBernReaderModule&) = delete; ///< CRTBernReaderModule is not copy-constructible - CRTBernReaderModule& operator=(const CRTBernReaderModule&) = delete; ///< CRTBernReaderModule is not copy-assignable - CRTBernReaderModule(CRTBernReaderModule&&) = delete; ///< CRTBernReaderModule is not move-constructible - CRTBernReaderModule& operator=(CRTBernReaderModule&&) = delete; ///< CRTBernReaderModule is not move-assignable + CRTBernFrameBuilderModule(const CRTBernFrameBuilderModule&) = delete; ///< CRTBernFrameBuilderModule is not copy-constructible + CRTBernFrameBuilderModule& operator=(const CRTBernFrameBuilderModule&) = delete; ///< CRTBernFrameBuilderModule is not copy-assignable + CRTBernFrameBuilderModule(CRTBernFrameBuilderModule&&) = delete; ///< CRTBernFrameBuilderModule is not move-constructible + CRTBernFrameBuilderModule& operator=(CRTBernFrameBuilderModule&&) = delete; ///< CRTBernFrameBuilderModule is not move-assignable /** * @brief Handles initialization on boot * @param mcfg DAQ configuration data */ - void init(const std::shared_ptr mfcg) override; + void init(const std::shared_ptr mcfg) override; private: // Commands @@ -52,16 +52,10 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule void generate_opmon_data() override; /** - * @brief Raw data produce thread function + * @brief Data produce thread function + * @param fake_stream_id Fake packet stream ID */ - void run_produce(); - - /** - * @brief Forwards the payload to get processed - * @param payload Payload buffer - * @param size Payload size - */ - void handle_eth_payload(char* payload, std::size_t size); + void run_produce(uint32_t fake_stream_id); // NOLINT(build/unsigned) /** * @brief Sets run marker @@ -91,17 +85,19 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule // PRODUCER /** - * @brief Raw data producer thread + * @brief Data producer threads */ - utilities::ReusableThread m_producer_thread; + std::vector> m_producer_threads; - // Sinks (SourceConcepts) /** - * @brief Data sources + * @brief Data sender */ - using sid_to_source_map_t = std::map>; - sid_to_source_map_t m_sources; - uint32_t m_source_id; // NOLINT(build/unsigned) + std::shared_ptr> m_sender; + + /** + * @brief Fake packet stream IDs + */ + std::vector m_fake_stream_ids; // NOLINT(build/unsigned) /** * @brief Configured packet transmission rate in kHz @@ -117,9 +113,8 @@ class CRTBernReaderModule : public dunedaq::appfwk::DAQModule /** * @brief Timestamp used to measure time between opmon reports */ - std::chrono::time_point m_t0; + std::chrono::time_point m_t0; }; -} // namespace crtmodules -} // namespace dunedaq +} // namespace dunedaq::crtmodules -#endif // CRTMODULES_PLUGINS_CRTBERNREADERMODULE_HPP_ +#endif // CRTMODULES_PLUGINS_CRTBERNFRAMEBUILDERMODULE_HPP_ diff --git a/plugins/CRTBernReaderModule.cpp b/plugins/CRTBernReaderModule.cpp deleted file mode 100644 index 1ffa3e1..0000000 --- a/plugins/CRTBernReaderModule.cpp +++ /dev/null @@ -1,267 +0,0 @@ -/** - * @file CRTBernReaderModule.cpp - - * Reads data from the HW then puts it in a queue - * - * This is part of the DUNE DAQ Software Suite, copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ - -#include "CRTBernReaderModule.hpp" - -#include "CreateSource.hpp" - -#include "crtmodules/opmon/CRTBernReaderModule.pb.h" - -#include "datahandlinglibs/utils/RateLimiter.hpp" - -#include "appmodel/DataReaderModule.hpp" - -#include "confmodel/QueueWithSourceId.hpp" - -#include "fddetdataformats/CRTBernFrame.hpp" - -#include "detdataformats/DetID.hpp" - -namespace dunedaq { -namespace crtmodules{ - -/** - * @brief Maximum packet sequence ID before reset - */ -constexpr uint64_t max_seq_id = 4095; - -/** - * @brief Fake packet detector ID - */ -constexpr uint8_t fake_det_id = (uint8_t)detdataformats::DetID::Subdetector::kVD_BernCRT; - -/** - * @brief Fake packet stream ID - */ -constexpr uint64_t fake_stream_id = 0; - -/** - * @brief Fake packet block length - */ -constexpr uint64_t fake_block_length = 0x382; - -/** - * @brief Calculate the next fake sequence ID for a packet - * @param seq_id Fake packet sequence ID - */ -void -fake_sequence_id(uint64_t& seq_id) -{ - seq_id = (seq_id == max_seq_id ? 0 : seq_id+1); -} - -/** - * @brief Calculate the next fake timestamp for a packet - * @param timestamp Fake packet timestamp - */ -void -fake_timestamp(uint64_t& timestamp) -{ - auto time_now = std::chrono::steady_clock::now().time_since_epoch(); - uint64_t current_time = // NOLINT (build/unsigned) - std::chrono::duration_cast(time_now).count(); - timestamp = current_time / 16; // 625/10000 (same as 625*us/10) -} - -/** - * @brief Fake ADC of the given packet - * @param frame Fake packet - */ -void -fake_adc(fddetdataformats::CRTBernFrame& frame) -{ - for (int channel = 0; channel < fddetdataformats::CRTBernFrame::s_num_channels; ++channel) { - frame.set_adc(channel, 0); - } -} - -/** - * @brief Create a fake packet - * @param frame Fake packet - * @param seq_id Fake packet sequence ID - * @param timestamp Fake packet timestamp - */ -void -fake_data(fddetdataformats::CRTBernFrame& frame, uint64_t& seq_id, uint64_t& timestamp) -{ - frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id - frame.daq_header.crate_id = 1; - frame.daq_header.slot_id = 1; - frame.daq_header.stream_id = fake_stream_id; - fake_sequence_id(seq_id); - frame.daq_header.seq_id = seq_id; - frame.daq_header.block_length = fake_block_length; - fake_timestamp(timestamp); - frame.daq_header.timestamp = timestamp; - fake_adc(frame); -} - -CRTBernReaderModule::CRTBernReaderModule(const std::string& name) - : DAQModule(name) -{ - register_command("conf", &CRTBernReaderModule::do_conf); - register_command("start", &CRTBernReaderModule::do_start); - register_command("stop_trigger_sources", &CRTBernReaderModule::do_stop); - register_command("scrap", &CRTBernReaderModule::do_scrap); -} - -inline void -tokenize(std::string const& str, const char delim, std::vector& out) -{ - std::size_t start; - std::size_t end = 0; - while ((start = str.find_first_not_of(delim, end)) != std::string::npos) { - end = str.find(delim, start); - out.push_back(str.substr(start, end - start)); - } -} - -void -CRTBernReaderModule::init(const std::shared_ptr mfcg) -{ - auto* mdal = mfcg->get_dal(get_name()); - - if (mdal->get_raw_data_callbacks().empty()) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, - "No outputs defined for CRT Bern reader in configuration."); - ers::fatal(err); - throw err; - } - - for (auto* con : mdal->get_raw_data_callbacks()) { - m_source_id = con->get_source_id(); - auto ptr = m_sources[con->get_source_id()] = createSourceModel(con); - register_node(con->UID(), ptr); - } -} - -void -CRTBernReaderModule::do_conf(const CommandData_t& /*obj*/) -{ - // Configure HW interface? - if (!m_run_marker.load()) { - set_running(true); - } else { - TLOG_DEBUG(5) << "Already running!"; - } -} - -void -CRTBernReaderModule::do_scrap(const CommandData_t& /*obj*/) -{ - if (m_run_marker.load()) { - TLOG() << "Raising stop through variables!"; - set_running(false); -// if (!m_callback_mode) { - while (!m_producer_thread.get_readiness()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } -// } - } else { - TLOG_DEBUG(5) << "Already stopped!"; - } -} - -void -CRTBernReaderModule::do_start(const CommandData_t& /*startobj*/) -{ - // Setup callbacks on all sourcemodels - for (auto& [sourceid, source] : m_sources) { - source->acquire_callback(); - } - - enable_flow(); - - m_packet_count = 0; - - m_t0 = std::chrono::high_resolution_clock::now(); - - //if (!m_callback_mode) { - m_producer_thread.set_work(&CRTBernReaderModule::run_produce, this); - //} -} - -void -CRTBernReaderModule::do_stop(const CommandData_t& /*stopobj*/) -{ - disable_flow(); -} - -void -CRTBernReaderModule::generate_opmon_data() -{ - opmon::CRTBernReaderInfo i; - - auto now = std::chrono::high_resolution_clock::now(); - int new_packets = m_packet_count.exchange(0); - double seconds = std::chrono::duration_cast(now - m_t0).count() / 1000000.; - m_t0 = now; - - i.set_packet_rate_khz(new_packets / seconds / 1000.); - - publish(std::move(i)); -} - -void -CRTBernReaderModule::run_produce() -{ - TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead - - fddetdataformats::CRTBernFrame frame; - uint64_t seq_id = 0; - uint64_t timestamp = 0; - - datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); - - while (m_run_marker.load()) { - fake_data(frame, seq_id, timestamp); // TODO: To be filled by the CRT experts - - if (m_enable_flow.load()) [[likely]] { - handle_eth_payload(reinterpret_cast(&frame), sizeof(frame)); - ++m_packet_count; - } - - rate_limiter.limit(); - } - - TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead -} - -void -CRTBernReaderModule::handle_eth_payload(char* payload, std::size_t size) -{ - // Get DAQ Header and its StreamID - //auto* daq_header = reinterpret_cast(payload); - //auto src_id = m_stream_id_to_source_id[src_rx_q][(unsigned)daq_header->stream_id]; - - if ( auto src_it = m_sources.find(m_source_id); src_it != m_sources.end()) { - src_it->second->handle_payload(payload, size); - } else { - // Really bad -> unexpeced StreamID in UDP Payload. - // This check is needed in order to avoid dynamically add thousands - // of Sources on the fly, in case the data corruption is extremely severe. - //if (m_num_unexid_frames.count(0) == 0) { - // m_num_unexid_frames[0] = 0; - //} - //m_num_unexid_frames[0]++; - } -} - -void -CRTBernReaderModule::set_running(bool should_run) -{ - bool was_running = m_run_marker.exchange(should_run); - TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run; -} - -} -} - -DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTBernReaderModule) diff --git a/plugins/CRTGrenobleFrameBuilderModule.cpp b/plugins/CRTGrenobleFrameBuilderModule.cpp new file mode 100644 index 0000000..f7c612e --- /dev/null +++ b/plugins/CRTGrenobleFrameBuilderModule.cpp @@ -0,0 +1,253 @@ +/** + * @file CRTGrenobleFrameBuilderModule.cpp + * + * Reads data from the HW then puts it in a queue + * + * This is part of the DUNE DAQ Software Suite, copyright 2020. + * Licensing/copyright details are in the COPYING file that you should have + * received with this code. + */ + +#include "CRTGrenobleFrameBuilderModule.hpp" + +#include "crtmodules/opmon/CRTGrenobleFrameBuilderModule.pb.h" + +#include "datahandlinglibs/utils/RateLimiter.hpp" + +#include "appmodel/DetectorFrameBuilderModule.hpp" +#include "appmodel/SocketDetectorToDaqConnection.hpp" +#include "appmodel/NWDetDataSender.hpp" + +#include "confmodel/QueueWithSourceId.hpp" +#include "confmodel/DetectorStream.hpp" +#include "confmodel/GeoId.hpp" + +#include "datahandlinglibs/DataHandlingIssues.hpp" + +#include "detdataformats/DetID.hpp" + +#include +#include +#include + +DUNE_DAQ_TYPESTRING(dunedaq::fddetdataformats::CRTGrenobleFrame, "CRTGrenobleFrame") + +namespace dunedaq::crtmodules { + +/** + * @brief Maximum packet sequence ID before reset + */ +constexpr uint64_t max_seq_id = 4095; // NOLINT(build/unsigned) + +/** + * @brief Fake packet detector ID + */ +constexpr uint8_t fake_det_id = static_cast(detdataformats::DetID::Subdetector::kVD_GrenobleCRT); // NOLINT(build/unsigned) + +/** + * @brief Fake packet block length + */ +constexpr uint64_t fake_block_length = 0x382; // NOLINT(build/unsigned) + +/** + * @brief Calculate the next fake sequence ID for a packet + * @param seq_id Fake packet sequence ID + */ +void +fake_sequence_id(uint64_t& seq_id) // NOLINT(build/unsigned) +{ + seq_id = (seq_id == max_seq_id ? 0 : seq_id+1); +} + +/** + * @brief Calculate the next fake timestamp for a packet + * @param timestamp Fake packet timestamp + */ +void +fake_timestamp(uint64_t& timestamp) // NOLINT(build/unsigned) +{ + auto time_now = std::chrono::steady_clock::now().time_since_epoch(); + uint64_t current_time = // NOLINT (build/unsigned) + std::chrono::duration_cast(time_now).count(); + timestamp = current_time / 16; // 625/10000 (same as 625*us/10) +} + +/** + * @brief Fake ADC of the given packet + * @param frame Fake packet + */ +void +fake_adc(fddetdataformats::CRTGrenobleFrame& frame) +{ + for (int channel = 0; channel < fddetdataformats::CRTGrenobleFrame::s_num_channels; ++channel) { + frame.set_adc(channel, 0); + } +} + +/** + * @brief Create a fake packet + * @param frame Fake packet + * @param seq_id Fake packet sequence ID + * @param timestamp Fake packet timestamp + * @param stream_id Fake packet stream ID + */ +void +fake_data(fddetdataformats::CRTGrenobleFrame& frame, uint64_t& seq_id, uint64_t& timestamp, uint32_t stream_id) // NOLINT(build/unsigned) +{ + frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id + frame.daq_header.crate_id = 1; + frame.daq_header.slot_id = 1; + frame.daq_header.stream_id = stream_id; + fake_sequence_id(seq_id); + frame.daq_header.seq_id = seq_id; + frame.daq_header.block_length = fake_block_length; + fake_timestamp(timestamp); + frame.daq_header.timestamp = timestamp; + fake_adc(frame); +} + +CRTGrenobleFrameBuilderModule::CRTGrenobleFrameBuilderModule(const std::string& name) + : DAQModule(name) +{ + register_command("conf", &CRTGrenobleFrameBuilderModule::do_conf); + register_command("start", &CRTGrenobleFrameBuilderModule::do_start); + register_command("stop_trigger_sources", &CRTGrenobleFrameBuilderModule::do_stop); + register_command("scrap", &CRTGrenobleFrameBuilderModule::do_scrap); +} + +void +CRTGrenobleFrameBuilderModule::init(const std::shared_ptr mcfg) +{ + auto* mdal = mcfg->get_dal(get_name()); + + auto* d2d_conn = mdal->get_connection(); + auto* socket_d2d_conn = d2d_conn->cast(); + if (socket_d2d_conn == nullptr) { + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Connection is not of type SocketDetectorToDaqConnection."); + ers::fatal(err); + throw err; + } + + auto* nw_sender = socket_d2d_conn->get_net_senders()[0]; // there's only 1 sender + + for (auto det_stream : nw_sender->get_streams()) { + if (det_stream->is_disabled(*(mcfg->get_session()))) { + continue; + } + + m_fake_stream_ids.push_back(det_stream->get_geo_id()->get_stream_id()); + + m_producer_threads.emplace_back(std::make_unique()); + } + + auto* con = mdal->get_outputs()[0]; // there's only 1 output + auto* queue = con->cast(); + if (queue == nullptr) { + auto err = datahandlinglibs::InitializationError(ERS_HERE, "Output is not of type Queue."); + ers::fatal(err); + throw err; + } + + auto connection_name = queue->UID(); + m_sender = get_iom_sender(connection_name); +} + +void +CRTGrenobleFrameBuilderModule::do_conf(const CommandData_t& /*obj*/) +{ + // Configure HW interface? + if (!m_run_marker.load()) { + set_running(true); + } else { + TLOG_DEBUG(5) << "Already running!"; + } +} + +void +CRTGrenobleFrameBuilderModule::do_scrap(const CommandData_t& /*obj*/) +{ + if (m_run_marker.load()) { + TLOG() << "Raising stop through variables!"; + set_running(false); + for (const auto& producer : m_producer_threads) { + while (!producer->get_readiness()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } else { + TLOG_DEBUG(5) << "Already stopped!"; + } +} + +void +CRTGrenobleFrameBuilderModule::do_start(const CommandData_t& /*startobj*/) +{ + m_packet_count = 0; + + m_t0 = std::chrono::steady_clock::now(); + + enable_flow(); + + uint32_t i = 0; // NOLINT(build/unsigned) + for (auto& producer : m_producer_threads) { + producer->set_work(&CRTGrenobleFrameBuilderModule::run_produce, this, m_fake_stream_ids[i++]); + } +} + +void +CRTGrenobleFrameBuilderModule::do_stop(const CommandData_t& /*stopobj*/) +{ + disable_flow(); +} + +void +CRTGrenobleFrameBuilderModule::generate_opmon_data() +{ + opmon::CRTGrenobleFrameBuilderInfo i; + + auto now = std::chrono::steady_clock::now(); + int new_packets = m_packet_count.exchange(0); + double seconds = std::chrono::duration_cast(now - m_t0).count() / 1000000.; + m_t0 = now; + + i.set_packet_rate_khz(new_packets / seconds / 1000.); + + publish(std::move(i)); +} + +void +CRTGrenobleFrameBuilderModule::run_produce(uint32_t fake_stream_id) // NOLINT(build/unsigned) +{ + TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead + + fddetdataformats::CRTGrenobleFrame frame; + uint64_t seq_id = 0; // NOLINT(build/unsigned) + uint64_t timestamp = 0; // NOLINT(build/unsigned) + + datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); + + while (m_run_marker.load()) { + // Create a fake packet for stream + fake_data(frame, seq_id, timestamp, fake_stream_id); // TODO: To be filled by the CRT experts + + if (m_enable_flow.load()) [[likely]] { + m_sender->try_send(std::move(frame), iomanager::Sender::s_no_block); + ++m_packet_count; + } + + rate_limiter.limit(); + } + + TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead +} + +void +CRTGrenobleFrameBuilderModule::set_running(bool should_run) +{ + bool was_running = m_run_marker.exchange(should_run); + TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run; +} + +} // namespace dunedaq::crtmodules + +DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTGrenobleFrameBuilderModule) diff --git a/plugins/CRTGrenobleReaderModule.hpp b/plugins/CRTGrenobleFrameBuilderModule.hpp similarity index 52% rename from plugins/CRTGrenobleReaderModule.hpp rename to plugins/CRTGrenobleFrameBuilderModule.hpp index 7c8269f..f8ef033 100644 --- a/plugins/CRTGrenobleReaderModule.hpp +++ b/plugins/CRTGrenobleFrameBuilderModule.hpp @@ -1,5 +1,5 @@ /** - * @file CRTGrenobleReaderModule.hpp + * @file CRTGrenobleFrameBuilderModule.hpp * * Reads data from the HW then puts it in a queue * @@ -8,39 +8,39 @@ * received with this code. */ -#ifndef CRTMODULES_PLUGINS_CRTGRENOBLEREADERMODULE_HPP_ -#define CRTMODULES_PLUGINS_CRTGRENOBLEREADERMODULE_HPP_ +#ifndef CRTMODULES_PLUGINS_CRTGRENOBLEFRAMEBUILDERMODULE_HPP_ +#define CRTMODULES_PLUGINS_CRTGRENOBLEFRAMEBUILDERMODULE_HPP_ #include "appfwk/DAQModule.hpp" #include "utilities/ReusableThread.hpp" +#include "fddetdataformats/CRTGrenobleFrame.hpp" #include #include +#include +#include -namespace dunedaq { -namespace crtmodules { +namespace dunedaq::crtmodules { -class SourceConcept; - -class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule +class CRTGrenobleFrameBuilderModule : public dunedaq::appfwk::DAQModule { public: /** - * @brief CRTGrenobleReaderModule constructor + * @brief CRTGrenobleFrameBuilderModule constructor * @param name DAQ module instance name */ - explicit CRTGrenobleReaderModule(const std::string& name); + explicit CRTGrenobleFrameBuilderModule(const std::string& name); - CRTGrenobleReaderModule(const CRTGrenobleReaderModule&) = delete; ///< CRTGrenobleReaderModule is not copy-constructible - CRTGrenobleReaderModule& operator=(const CRTGrenobleReaderModule&) = delete; ///< CRTGrenobleReaderModule is not copy-assignable - CRTGrenobleReaderModule(CRTGrenobleReaderModule&&) = delete; ///< CRTGrenobleReaderModule is not move-constructible - CRTGrenobleReaderModule& operator=(CRTGrenobleReaderModule&&) = delete; ///< CRTGrenobleReaderModule is not move-assignable + CRTGrenobleFrameBuilderModule(const CRTGrenobleFrameBuilderModule&) = delete; ///< CRTGrenobleFrameBuilderModule is not copy-constructible + CRTGrenobleFrameBuilderModule& operator=(const CRTGrenobleFrameBuilderModule&) = delete; ///< CRTGrenobleFrameBuilderModule is not copy-assignable + CRTGrenobleFrameBuilderModule(CRTGrenobleFrameBuilderModule&&) = delete; ///< CRTGrenobleFrameBuilderModule is not move-constructible + CRTGrenobleFrameBuilderModule& operator=(CRTGrenobleFrameBuilderModule&&) = delete; ///< CRTGrenobleFrameBuilderModule is not move-assignable /** * @brief Handles initialization on boot * @param mcfg DAQ configuration data */ - void init(const std::shared_ptr mfcg) override; + void init(const std::shared_ptr mcfg) override; private: // Commands @@ -52,16 +52,10 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule void generate_opmon_data() override; /** - * @brief Raw data produce thread function + * @brief Data produce thread function + * @param fake_stream_id Fake packet stream ID */ - void run_produce(); - - /** - * @brief Forwards the payload to get processed - * @param payload Payload buffer - * @param size Payload size - */ - void handle_eth_payload(char* payload, std::size_t size); + void run_produce(uint32_t fake_stream_id); // NOLINT(build/unsigned) /** * @brief Sets run marker @@ -91,17 +85,19 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule // PRODUCER /** - * @brief Raw data producer thread + * @brief Data producer threads */ - utilities::ReusableThread m_producer_thread; + std::vector> m_producer_threads; - // Sinks (SourceConcepts) /** - * @brief Data sources + * @brief Data sender */ - using sid_to_source_map_t = std::map>; - sid_to_source_map_t m_sources; - uint32_t m_source_id; // NOLINT(build/unsigned) + std::shared_ptr> m_sender; + + /** + * @brief Fake packet stream IDs + */ + std::vector m_fake_stream_ids; // NOLINT(build/unsigned) /** * @brief Configured packet transmission rate in kHz @@ -119,7 +115,6 @@ class CRTGrenobleReaderModule : public dunedaq::appfwk::DAQModule */ std::chrono::time_point m_t0; }; -} // namespace crtmodules -} // namespace dunedaq +} // namespace dunedaq::crtmodules -#endif // CRTMODULES_PLUGINS_CRTGRENOBLEREADERMODULE_HPP_ +#endif // CRTMODULES_PLUGINS_CRTGRENOBLEFRAMEBUILDERMODULE_HPP_ diff --git a/plugins/CRTGrenobleReaderModule.cpp b/plugins/CRTGrenobleReaderModule.cpp deleted file mode 100644 index 670d7a3..0000000 --- a/plugins/CRTGrenobleReaderModule.cpp +++ /dev/null @@ -1,267 +0,0 @@ -/** - * @file CRTGrenobleReaderModule.cpp - * - * Reads data from the HW then puts it in a queue - * - * This is part of the DUNE DAQ Software Suite, copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ - -#include "CRTGrenobleReaderModule.hpp" - -#include "CreateSource.hpp" - -#include "crtmodules/opmon/CRTGrenobleReaderModule.pb.h" - -#include "datahandlinglibs/utils/RateLimiter.hpp" - -#include "appmodel/DataReaderModule.hpp" - -#include "confmodel/QueueWithSourceId.hpp" - -#include "fddetdataformats/CRTGrenobleFrame.hpp" - -#include "detdataformats/DetID.hpp" - -namespace dunedaq { -namespace crtmodules{ - -/** - * @brief Maximum packet sequence ID before reset - */ -constexpr uint64_t max_seq_id = 4095; - -/** - * @brief Fake packet detector ID - */ -constexpr uint8_t fake_det_id = (uint8_t)detdataformats::DetID::Subdetector::kVD_GrenobleCRT; - -/** - * @brief Fake packet stream ID - */ -constexpr uint64_t fake_stream_id = 0; - -/** - * @brief Fake packet block length - */ -constexpr uint64_t fake_block_length = 0x382; - -/** - * @brief Calculate the next fake sequence ID for a packet - * @param seq_id Fake packet sequence ID - */ -void -fake_sequence_id(uint64_t& seq_id) -{ - seq_id = (seq_id == max_seq_id ? 0 : seq_id+1); -} - -/** - * @brief Calculate the next fake timestamp for a packet - * @param timestamp Fake packet timestamp - */ -void -fake_timestamp(uint64_t& timestamp) -{ - auto time_now = std::chrono::steady_clock::now().time_since_epoch(); - uint64_t current_time = // NOLINT (build/unsigned) - std::chrono::duration_cast(time_now).count(); - timestamp = current_time / 16; // 625/10000 (same as 625*us/10) -} - -/** - * @brief Fake ADC of the given packet - * @param frame Fake packet - */ -void -fake_adc(fddetdataformats::CRTGrenobleFrame& frame) -{ - for (int channel = 0; channel < fddetdataformats::CRTGrenobleFrame::s_num_channels; ++channel) { - frame.set_adc(channel, 0); - } -} - -/** - * @brief Create a fake packet - * @param frame Fake packet - * @param seq_id Fake packet sequence ID - * @param timestamp Fake packet timestamp - */ -void -fake_data(fddetdataformats::CRTGrenobleFrame& frame, uint64_t& seq_id, uint64_t& timestamp) -{ - frame.daq_header.det_id = fake_det_id & 0x3f; //6 bits for det id - frame.daq_header.crate_id = 1; - frame.daq_header.slot_id = 1; - frame.daq_header.stream_id = fake_stream_id; - fake_sequence_id(seq_id); - frame.daq_header.seq_id = seq_id; - frame.daq_header.block_length = fake_block_length; - fake_timestamp(timestamp); - frame.daq_header.timestamp = timestamp; - fake_adc(frame); -} - -CRTGrenobleReaderModule::CRTGrenobleReaderModule(const std::string& name) - : DAQModule(name) -{ - register_command("conf", &CRTGrenobleReaderModule::do_conf); - register_command("start", &CRTGrenobleReaderModule::do_start); - register_command("stop_trigger_sources", &CRTGrenobleReaderModule::do_stop); - register_command("scrap", &CRTGrenobleReaderModule::do_scrap); -} - -inline void -tokenize(std::string const& str, const char delim, std::vector& out) -{ - std::size_t start; - std::size_t end = 0; - while ((start = str.find_first_not_of(delim, end)) != std::string::npos) { - end = str.find(delim, start); - out.push_back(str.substr(start, end - start)); - } -} - -void -CRTGrenobleReaderModule::init(const std::shared_ptr mfcg) -{ - auto* mdal = mfcg->get_dal(get_name()); - - if (mdal->get_raw_data_callbacks().empty()) { - auto err = dunedaq::datahandlinglibs::InitializationError(ERS_HERE, - "No outputs defined for CRT Grenoble reader in configuration."); - ers::fatal(err); - throw err; - } - - for (auto* con : mdal->get_raw_data_callbacks()) { - m_source_id = con->get_source_id(); - auto ptr = m_sources[con->get_source_id()] = createSourceModel(con); - register_node(con->UID(), ptr); - } -} - -void -CRTGrenobleReaderModule::do_conf(const CommandData_t& /*obj*/) -{ - // Configure HW interface? - if (!m_run_marker.load()) { - set_running(true); - } else { - TLOG_DEBUG(5) << "Already running!"; - } -} - -void -CRTGrenobleReaderModule::do_scrap(const CommandData_t& /*obj*/) -{ - if (m_run_marker.load()) { - TLOG() << "Raising stop through variables!"; - set_running(false); -// if (!m_callback_mode) { - while (!m_producer_thread.get_readiness()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } -// } - } else { - TLOG_DEBUG(5) << "Already stopped!"; - } -} - -void -CRTGrenobleReaderModule::do_start(const CommandData_t& /*startobj*/) -{ - // Setup callbacks on all sourcemodels - for (auto& [sourceid, source] : m_sources) { - source->acquire_callback(); - } - - m_packet_count = 0; - - m_t0 = std::chrono::steady_clock::now(); - - enable_flow(); - - //if (!m_callback_mode) { - m_producer_thread.set_work(&CRTGrenobleReaderModule::run_produce, this); - //} -} - -void -CRTGrenobleReaderModule::do_stop(const CommandData_t& /*stopobj*/) -{ - disable_flow(); -} - -void -CRTGrenobleReaderModule::generate_opmon_data() -{ - opmon::CRTGrenobleReaderInfo i; - - auto now = std::chrono::steady_clock::now(); - int new_packets = m_packet_count.exchange(0); - double seconds = std::chrono::duration_cast(now - m_t0).count() / 1000000.; - m_t0 = now; - - i.set_packet_rate_khz(new_packets / seconds / 1000.); - - publish(std::move(i)); -} - -void -CRTGrenobleReaderModule::run_produce() -{ - TLOG() << "Producer thread started..."; // TODO (DTE): Debug log instead - - fddetdataformats::CRTGrenobleFrame frame; - uint64_t seq_id = 0; - uint64_t timestamp = 0; - - datahandlinglibs::RateLimiter rate_limiter(m_configured_packet_rate_khz); - - while (m_run_marker.load()) { - fake_data(frame, seq_id, timestamp); // TODO: To be filled by the CRT experts - - if (m_enable_flow.load()) [[likely]] { - handle_eth_payload(reinterpret_cast(&frame), sizeof(frame)); - ++m_packet_count; - } - - rate_limiter.limit(); - } - - TLOG() << "Producer thread joins... "; // TODO (DTE): Debug log instead -} - -void -CRTGrenobleReaderModule::handle_eth_payload(char* payload, std::size_t size) -{ - // Get DAQ Header and its StreamID - //auto* daq_header = reinterpret_cast(payload); - //auto src_id = m_stream_id_to_source_id[src_rx_q][(unsigned)daq_header->stream_id]; - - if ( auto src_it = m_sources.find(m_source_id); src_it != m_sources.end()) { - src_it->second->handle_payload(payload, size); - } else { - // Really bad -> unexpeced StreamID in UDP Payload. - // This check is needed in order to avoid dynamically add thousands - // of Sources on the fly, in case the data corruption is extremely severe. - //if (m_num_unexid_frames.count(0) == 0) { - // m_num_unexid_frames[0] = 0; - //} - //m_num_unexid_frames[0]++; - } -} - -void -CRTGrenobleReaderModule::set_running(bool should_run) -{ - bool was_running = m_run_marker.exchange(should_run); - TLOG_DEBUG(5) << "Active state was toggled from " << was_running << " to " << should_run; -} - -} -} - -DEFINE_DUNE_DAQ_MODULE(dunedaq::crtmodules::CRTGrenobleReaderModule) diff --git a/schema/crtmodules/opmon/CRTBernReaderModule.proto b/schema/crtmodules/opmon/CRTBernFrameBuilderModule.proto similarity index 79% rename from schema/crtmodules/opmon/CRTBernReaderModule.proto rename to schema/crtmodules/opmon/CRTBernFrameBuilderModule.proto index 77070c2..4de324e 100644 --- a/schema/crtmodules/opmon/CRTBernReaderModule.proto +++ b/schema/crtmodules/opmon/CRTBernFrameBuilderModule.proto @@ -2,6 +2,6 @@ syntax = "proto3"; package dunedaq.crtmodules.opmon; -message CRTBernReaderInfo { +message CRTBernFrameBuilderInfo { double packet_rate_khz = 1; // Measured packet transmission rate in kHz } diff --git a/schema/crtmodules/opmon/CRTGrenobleReaderModule.proto b/schema/crtmodules/opmon/CRTGrenobleFrameBuilderModule.proto similarity index 77% rename from schema/crtmodules/opmon/CRTGrenobleReaderModule.proto rename to schema/crtmodules/opmon/CRTGrenobleFrameBuilderModule.proto index 1ab2c7a..f633b93 100644 --- a/schema/crtmodules/opmon/CRTGrenobleReaderModule.proto +++ b/schema/crtmodules/opmon/CRTGrenobleFrameBuilderModule.proto @@ -2,6 +2,6 @@ syntax = "proto3"; package dunedaq.crtmodules.opmon; -message CRTGrenobleReaderInfo { +message CRTGrenobleFrameBuilderInfo { double packet_rate_khz = 1; // Measured packet transmission rate in kHz } diff --git a/src/CreateSource.hpp b/src/CreateSource.hpp deleted file mode 100644 index 2612519..0000000 --- a/src/CreateSource.hpp +++ /dev/null @@ -1,51 +0,0 @@ -/** - * @file CreateSource.hpp Specific SourceConcept creator. - * - * This is part of the DUNE DAQ , copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ -#ifndef CRTMODULES_SRC_CREATESOURCE_HPP_ -#define CRTMODULES_SRC_CREATESOURCE_HPP_ - -#include "SourceConcept.hpp" -#include "SourceModel.hpp" -#include "datahandlinglibs/DataHandlingIssues.hpp" - -#include "fdreadoutlibs/CRTBernTypeAdapter.hpp" -#include "fdreadoutlibs/CRTGrenobleTypeAdapter.hpp" - -#include -#include - -namespace dunedaq { - -DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::CRTBernTypeAdapter, "CRTBernFrame") -DUNE_DAQ_TYPESTRING(dunedaq::fdreadoutlibs::types::CRTGrenobleTypeAdapter, "CRTGrenobleFrame") - -namespace crtmodules { - -std::shared_ptr -createSourceModel(const appmodel::DataMoveCallbackConf* conf) -{ - auto datatype = conf->get_data_type(); - TLOG() << "Choosing specializations for SourceModel for output connection " - << " [uid:" << conf->UID() << " , data_type:" << datatype << ']'; - - if (datatype.find("CRTBernFrame") != std::string::npos) { - auto source_model = std::make_shared>(); - source_model->set_sink_config(conf); - return source_model; - } else if (datatype.find("CRTGrenobleFrame") != std::string::npos) { - auto source_model = std::make_shared>(); - source_model->set_sink_config(conf); - return source_model; - } - - return nullptr; -} - -} // namespace crtmodules -} // namespace dunedaq - -#endif // CRTMODULES_SRC_CREATESOURCE_HPP_ diff --git a/src/SourceConcept.hpp b/src/SourceConcept.hpp deleted file mode 100644 index 21d583b..0000000 --- a/src/SourceConcept.hpp +++ /dev/null @@ -1,58 +0,0 @@ - -/** - * @file SourceConcept.hpp SourceConcept for constructors and - * forwarding command args. Enforces the implementation to - * queue in UDP JUMBO frames to be translated to TypeAdapters and - * send them to corresponding sinks. - * - * This is part of the DUNE DAQ , copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ -#ifndef CRTMODULES_SRC_SOURCECONCEPT_HPP_ -#define CRTMODULES_SRC_SOURCECONCEPT_HPP_ - -//#include "DefaultParserImpl.hpp" - -#include "opmonlib/MonitorableObject.hpp" -#include "appfwk/DAQModule.hpp" -#include "appmodel/DataMoveCallbackConf.hpp" -//#include "packetformat/detail/block_parser.hpp" -#include - -#include -#include -#include - -namespace dunedaq { - namespace crtmodules { - - class SourceConcept : public opmonlib::MonitorableObject - { - public: - SourceConcept() {} - virtual ~SourceConcept() {} - - SourceConcept(const SourceConcept&) = delete; ///< SourceConcept is not copy-constructible - SourceConcept& operator=(const SourceConcept&) = delete; ///< SourceConcept is not copy-assignable - SourceConcept(SourceConcept&&) = delete; ///< SourceConcept is not move-constructible - SourceConcept& operator=(SourceConcept&&) = delete; ///< SourceConcept is not move-assignable - - // virtual void init(const nlohmann::json& args) = 0; - virtual void acquire_callback() = 0; - // virtual void conf(const nlohmann::json& args) = 0; - // virtual void start(const nlohmann::json& args) = 0; - // virtual void stop(const nlohmann::json& args) = 0; - - virtual bool handle_payload(char* message, std::size_t size) = 0; - virtual std::size_t get_frame_size() const = 0; - - void set_sink_config(const appmodel::DataMoveCallbackConf* sink_conf) { m_sink_conf = sink_conf; } - - const appmodel::DataMoveCallbackConf* m_sink_conf; - }; - - } // namespace crtmodules -} // namespace dunedaq - -#endif // CRTMODULES_SRC_SOURCECONCEPT_HPP_ diff --git a/src/SourceModel.hpp b/src/SourceModel.hpp deleted file mode 100644 index 3d00833..0000000 --- a/src/SourceModel.hpp +++ /dev/null @@ -1,111 +0,0 @@ -/** - * @file SourceModel.hpp FELIX CR's ELink concept wrapper - * - * This is part of the DUNE DAQ , copyright 2020. - * Licensing/copyright details are in the COPYING file that you should have - * received with this code. - */ -#ifndef CRTMODULES_SRC_SOURCEMODEL_HPP_ -#define CRTMODULES_SRC_SOURCEMODEL_HPP_ - -#include "SourceConcept.hpp" - - -#include "iomanager/IOManager.hpp" -#include "iomanager/Sender.hpp" -#include "logging/Logging.hpp" - -#include "crtmodules/opmon/SourceModel.pb.h" - -// #include "datahandlinglibs/utils/ReusableThread.hpp" -#include "datahandlinglibs/DataMoveCallbackRegistry.hpp" -#include "datahandlinglibs/utils/BufferCopy.hpp" - -// #include -// #include - -#include -#include -#include -#include - - -namespace dunedaq::crtmodules { - -template -class SourceModel : public SourceConcept -{ -public: - using sink_t = iomanager::SenderConcept; - using inherited = SourceConcept; - using data_t = nlohmann::json; - - /** - * @brief Buffer size based on TargetPayloadType - */ - static constexpr auto buffer_size = sizeof(TargetPayloadType); - - /** - * @brief SourceModel Constructor - * @param name Instance name for this SourceModel instance - */ - SourceModel() - : SourceConcept() - {} - ~SourceModel() {} - - void acquire_callback() override - { - if (m_callback_is_acquired) { - TLOG_DEBUG(5) << "SourceModel callback is already acquired!"; - } else { - // Getting DataMoveCBRegistry - auto dmcbr = datahandlinglibs::DataMoveCallbackRegistry::get(); - m_sink_callback = dmcbr->get_callback(inherited::m_sink_conf); - m_callback_is_acquired = true; - } - } - - bool handle_payload(char* message, std::size_t size) // NOLINT(build/unsigned) - { - bool push_out = true; - if (push_out) { - - TargetPayloadType& target_payload = *reinterpret_cast(message); - (*m_sink_callback)(std::move(target_payload)); - - } else { - TargetPayloadType target_payload; - uint32_t bytes_copied = 0; - datahandlinglibs::buffer_copy(message, size, static_cast(&target_payload), bytes_copied, sizeof(target_payload)); - } - - return true; - } - - std::size_t get_frame_size() const override { - TargetPayloadType target_payload; - return target_payload.get_frame_size(); // TODO (DTE): Could be a static function? - } - - void generate_opmon_data() override { - - opmon::SourceInfo info; - info.set_dropped_frames( m_dropped_packets.load() ); - - publish( std::move(info) ); - } - -private: - // Callback internals - bool m_callback_is_acquired{ false }; - using sink_cb_t = std::shared_ptr>; - sink_cb_t m_sink_callback; - - std::atomic m_dropped_packets{0}; - -}; - -} // namespace dunedaq::crtmodules - -#endif // CRTMODULES_SRC_SOURCEMODEL_HPP_