From 433ff6583b1ebe1ccb59c8e5a78d24a07d4be437 Mon Sep 17 00:00:00 2001 From: Marcus O'Flaherty Date: Thu, 26 Feb 2026 00:45:15 +0000 Subject: [PATCH 01/10] remove SendRootPlotMulticast change logging severity type from unsigned int to LogLevel enum class change alarm level (int) to critical (bool) add buffering; only send first monitoring message each buffer period, stack logging duplicates within buffer period when identical messages are received back-to-back change config related methods to reflect restructuring: replace SendRunConfig with SendBaseConfig, SendRunModeConfig, GetRunConfig and GetRunDeviceConfig require base and runmode config ids. add zstd compression to messages in ServicesBackend, for now both multicast and zmq. --- Makefile | 23 +- src/ServiceDiscovery/Services.cpp | 259 +++++++++++++++++------ src/ServiceDiscovery/Services.h | 85 ++++++-- src/ServiceDiscovery/ServicesBackend.cpp | 95 ++++++--- src/ServiceDiscovery/ServicesBackend.h | 11 + src/ToolDAQChain/ToolDAQChain.cpp | 8 +- 6 files changed, 356 insertions(+), 125 deletions(-) diff --git a/Makefile b/Makefile index 00be811..7f254be 100644 --- a/Makefile +++ b/Makefile @@ -9,8 +9,8 @@ else CXXFLAGS+= -O3 endif -ZMQLib= -L ../zeromq-4.0.7/lib -lzmq -ZMQInclude= -I ../zeromq-4.0.7/include/ +ZMQLib= -L ../zeromq-4.0.7/lib -lzmq +ZMQInclude= -I ../zeromq-4.0.7/include/ BoostLib= -L ../boost_1_66_0/install/lib -lboost_date_time -lboost_serialization -lboost_iostreams BoostInclude= -isystem ../boost_1_66_0/install/include/ @@ -21,14 +21,13 @@ TempDataModelLib = TempToolsInclude = TempToolsLib = - Includes= -I $(ToolFrameworkDIR)/include/ -I $(SOURCEDIR)/include/ -I $(SOURCEDIR)/tempinclude/ $(ZMQInclude) $(BoostInclude) Libs=-L $(SOURCEDIR)/lib/ -lTempDAQDataModel -lTempDAQTools -lToolDAQChain -lServiceDiscovery -lDAQDataModelBase -lDAQStore -lDAQLogging $(BoostLib) $(ZMQLib) -L $(ToolFrameworkDIR)/lib/ -lToolChain -lTempDAQTools -lDataModelBase -lpthread -lLogging -lStore 
LIBRARIES=lib/libDAQStore.so lib/libDAQLogging.so lib/libToolDAQChain.so lib/libDAQDataModelBase.so lib/libTempDAQDataModel.so lib/libTempDAQTools.so lib/libServiceDiscovery.so HEADERS:=$(patsubst %.h, include/%.h, $(filter %.h, $(subst /, ,$(wildcard src/*/*.h) ))) TempDataModelHEADERS:=$(patsubst %.h, tempinclude/%.h, $(filter %.h, $(subst /, , $(wildcard DataModel/*.h)))) TempToolHEADERS:=$(patsubst %.h, tempinclude/%.h, $(filter %.h, $(subst /, , $(wildcard UserTools/*/*.h) $(wildcard UserTools/*.h)))) -SOURCEFILES:=$(patsubst %.cpp, %.o, $(wildcard */*.cpp) $(wildcard */*/*.cpp)) +SOURCEFILES:=$(patsubst %.cpp, %.o, $(wildcard */*.cpp) $(wildcard */*/*.cpp)) $(patsubst %.c, %.o, $(wildcard */*.c) $(wildcard */*/*.c)) #.SECONDARY: $(%.o) @@ -49,11 +48,15 @@ tempinclude/%.h: @echo -e "\e[38;5;87m\n*************** sym linking headers ****************\e[0m" ln -s $(SOURCEDIR)/$(filter %$(strip $(patsubst tempinclude/%.h, /%.h, $@)), $(wildcard DataModel/*.h) $(wildcard UserTools/*/*.h) $(wildcard UserTools/*.h)) $@ -src/%.o : src/%.cpp $(HEADERS) +src/%.o : src/%.cpp $(HEADERS) + @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" + g++ $(CXXFLAGS) -c $< -o $@ $(Includes) + +src/%.o : src/%.c $(HEADERS) @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) -UnitTests/%.o : UnitTests/%.cpp $(HEADERS) +UnitTests/%.o : UnitTests/%.cpp $(HEADERS) @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) @@ -61,7 +64,7 @@ UserTools/%.o : UserTools/%.cpp $(HEADERS) $(TempDataModelHEADERS) UserTools/%. 
@echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) $(TempDataModelInclude) $(TempToolsInclude) -UserTools/Factory/Factory.o : UserTools/Factory/Factory.cpp $(HEADERS) $(TempDataModelHEADERS) +UserTools/Factory/Factory.o : UserTools/Factory/Factory.cpp $(HEADERS) $(TempDataModelHEADERS) @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) $(TempDataModelInclude) $(TempToolsInclude) @@ -69,15 +72,15 @@ DataModel/%.o : DataModel/%.cpp DataModel/%.h $(HEADERS) $(TempDataModelHEADERS) @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) $(TempDataModelInclude) -lib/libDAQStore.so: $(patsubst %.cpp, %.o , $(wildcard src/Store/*.cpp)) | $(HEADERS) +lib/libDAQStore.so: $(patsubst %.cpp, %.o , $(wildcard src/Store/*.cpp)) | $(HEADERS) @echo -e "\e[38;5;201m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) --shared $^ -o $@ $(Includes) -lib/libDAQLogging.so: $(patsubst %.cpp, %.o , $(wildcard src/DAQLogging/*.cpp)) | $(HEADERS) +lib/libDAQLogging.so: $(patsubst %.cpp, %.o , $(wildcard src/DAQLogging/*.cpp)) | $(HEADERS) @echo -e "\e[38;5;201m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) --shared $^ -o $@ $(Includes) -lib/libServiceDiscovery.so: $(patsubst %.cpp, %.o , $(wildcard src/ServiceDiscovery/*.cpp)) | $(HEADERS) +lib/libServiceDiscovery.so: $(patsubst %.cpp, %.o , $(wildcard src/ServiceDiscovery/*.cpp)) $(patsubst %.c, %.o , $(wildcard src/ServiceDiscovery/*.c)) | $(HEADERS) @echo -e "\e[38;5;201m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) --shared $^ -o $@ $(Includes) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 843c05f..df7f3cd 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -3,12 +3,12 @@ using namespace ToolFramework; 
namespace { - const uint32_t MAX_UDP_PACKET_SIZE = 655355; + constexpr uint32_t MAX_UDP_PACKET_SIZE = 655355; + constexpr size_t MAX_MSG_SIZE = MAX_UDP_PACKET_SIZE-100; // 100 chars for JSON keys, timestamp string and quotes/commas } Services::Services(){ m_context=0; - m_dbname=""; m_name=""; } @@ -17,6 +17,7 @@ Services::~Services(){ m_backend_client.Finalise(); sc_vars->Stop(); + m_utils.KillThread(&thread_args); m_context=nullptr; } @@ -31,12 +32,16 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC bool alerts_receive = 1; int alert_receive_port = 12243; int sc_port = 60000; + mon_merge_period_ms = 1000; + multicast_send_period_ms = 5000; m_variables.Get("alerts_send", alerts_send); m_variables.Get("alert_send_port", alert_send_port); m_variables.Get("alerts_receive", alerts_receive); m_variables.Get("alert_receive_port", alert_receive_port); m_variables.Get("sc_port", sc_port); + m_variables.Get("mon_merge_period_ms",mon_merge_period_ms); + m_variables.Get("multicast_send_period_ms",multicast_send_period_ms); sc_vars->InitThreadedReceiver(m_context, sc_port, 100, new_service, alert_receive_port, alerts_receive, alert_send_port, alerts_send); m_backend_client.SetUp(m_context); @@ -50,7 +55,6 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC AlertSubscribe("LoadConfig", std::bind(&Services::LoadConfig2, this, std::placeholders::_1, std::placeholders::_2)); if(!m_variables.Get("service_name",m_name)) m_name="test_service"; - if(!m_variables.Get("db_name",m_dbname)) m_dbname="daq"; if(!m_backend_client.Initialise(m_variables)){ @@ -66,6 +70,19 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC std::cerr<<"Warning: service not yet connected..."<0) level=1; - std::string cmd_string = "{\"time\":\""+TimeStringFromUnixMs(timestamp)+"\"" + ",\"device\":\""+name+"\"" - + ",\"level\":"+std::to_string(level) + + ",\"critical\":"+std::to_string(critical) + ",\"alarm\":\"" + 
message + "\"}"; std::string err=""; @@ -194,7 +208,47 @@ bool Services::SendDeviceConfig(const std::string& json_data, const std::string& // ««-------------- ≪ °◇◆◇° ≫ --------------»» -bool Services::SendRunConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, const uint64_t timestamp, int* version, const unsigned int timeout){ +bool Services::SendBaseConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, const uint64_t timestamp, int* version, const unsigned int timeout){ + + if(version) *version=-1; + + std::string cmd_string = "{ \"time\":\""+TimeStringFromUnixMs(timestamp)+"\"" + + ", \"name\":\""+ name+"\"" + + ", \"author\":\""+ author+"\"" + + ", \"description\":\""+ description+"\"" + + ", \"data\":"+ json_data +" }"; + + std::string response=""; + std::string err=""; + + if(!m_backend_client.SendCommand("W_BASECONFIG", cmd_string, &response, &timeout, &err)){ + std::cerr<<"SendBaseConfig error: "<& responses, const unsigned int timeout){ +bool Services::SQLQuery(const std::string& query, std::vector& responses, const unsigned int timeout){ responses.clear(); - //const std::string& db = (database=="") ? 
m_dbname : database; - std::string err=""; if(!m_backend_client.SendCommand("W_QUERY", query, &responses, &timeout, &err)){ @@ -369,13 +421,13 @@ bool Services::SQLQuery(/*const std::string& database,*/ const std::string& quer // ««-------------- ≪ °◇◆◇° ≫ --------------»» // version when expecting just one row -bool Services::SQLQuery(/*const std::string& database,*/ const std::string& query, std::string& response, const unsigned int timeout){ +bool Services::SQLQuery(const std::string& query, std::string& response, const unsigned int timeout){ response=""; std::vector responses; - bool ok = SQLQuery(/*db,*/ query, responses, timeout); + bool ok = SQLQuery(query, responses, timeout); if(responses.size()!=0){ response = responses.front(); @@ -390,10 +442,10 @@ bool Services::SQLQuery(/*const std::string& database,*/ const std::string& quer // ««-------------- ≪ °◇◆◇° ≫ --------------»» // versions that don't expect any return (e.g. insertions) -bool Services::SQLQuery(/*const std::string& database,*/ const std::string& query, const unsigned int timeout){ +bool Services::SQLQuery(const std::string& query, const unsigned int timeout){ std::string tmp; - return SQLQuery(/*database,*/ query, tmp, timeout); + return SQLQuery(query, tmp, timeout); } @@ -412,9 +464,9 @@ bool Services::GetCalibrationData(std::string& json_data, int& version, const st if(version<0){ // https://stackoverflow.com/questions/tagged/greatest-n-per-group for faster - cmd_string = "SELECT jsonb_build_object('data', data, 'version', version) FROM calibration WHERE name='"+name+"' ORDER BY version DESC LIMIT 1"; + cmd_string = "SELECT json_build_object('data', data, 'version', version) FROM calibration WHERE name='"+name+"' ORDER BY version DESC LIMIT 1"; } else { - cmd_string = "SELECT jsonb_build_object('data', data, 'version', version) FROM calibration WHERE device='"+name+"' AND version="+std::to_string(version); + cmd_string = "SELECT json_build_object('data', data, 'version', version) FROM 
calibration WHERE device='"+name+"' AND version="+std::to_string(version); } std::string err=""; @@ -467,9 +519,9 @@ bool Services::GetDeviceConfig(std::string& json_data, const int version, const std::string cmd_string; if(version<0){ // https://stackoverflow.com/questions/tagged/greatest-n-per-group for faster - cmd_string = "SELECT jsonb_build_object('data', data) FROM device_config WHERE device='"+name+"' ORDER BY version DESC LIMIT 1"; + cmd_string = "SELECT json_build_object('data', data) FROM device_config WHERE device='"+name+"' ORDER BY version DESC LIMIT 1"; } else { - cmd_string = "SELECT jsonb_build_object('data', data) FROM device_config WHERE device='"+name+"' AND version="+std::to_string(version); + cmd_string = "SELECT json_build_object('data', data) FROM device_config WHERE device='"+name+"' AND version="+std::to_string(version); } std::string err=""; @@ -506,12 +558,13 @@ bool Services::GetDeviceConfig(std::string& json_data, const int version, const // ««-------------- ≪ °◇◆◇° ≫ --------------»» -// get a run configuration via configuration ID -bool Services::GetRunConfig(std::string& json_data, const int config_id, const unsigned int timeout){ - +// get a run configuration via configuration ID pair +bool Services::GetRunConfig(std::string& json_data, const int runmode_config_id, const int base_config_id, const unsigned int timeout){ + json_data=""; - std::string cmd_string = "SELECT jsonb_build_object('data', data) FROM run_config WHERE config_id="+std::to_string(config_id); + std::string cmd_string = "{ \"base_config_id\":"+std::to_string(base_config_id) + + ", \"runmode_config_id\":"+std::to_string(runmode_config_id)+"}"; std::string err=""; @@ -524,7 +577,7 @@ bool Services::GetRunConfig(std::string& json_data, const int config_id, const u if(json_data.empty()){ // if we got an empty response but the command succeeded, // the query worked but matched no records - run config not found - err = "GetRunConfig error: config_id 
"+std::to_string(config_id)+" not found"; + err = "GetRunConfig error: no config matching "+cmd_string; std::cerr<>'"+name+"' FROM run_config WHERE config_id="+std::to_string(runconfig_id)+")::integer"; + std::string cmd_string = "{ \"base_config_id\":"+std::to_string(base_config_id) + + ", \"runmode_config_id\":"+std::to_string(runmode_config_id) + + ", \"device\":\""+name+"\"}"; std::string err=""; - if(!m_backend_client.SendCommand("R_DEVCONFIG", cmd_string, &json_data, &timeout, &err)){ + if(!m_backend_client.SendCommand("R_RUNDEVICECONFIG", cmd_string, &json_data, &timeout, &err)){ std::cerr<<"GetRunDeviceConfig error: "<>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")::integer"; + std::string cmd_string = "SELECT json_build_object('data', data, 'version', version) FROM device_config WHERE device='"+name+"' AND version=(SELECT data->>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")::integer"; std::string err=""; @@ -681,6 +740,7 @@ bool Services::GetRunDeviceConfig(std::string& json_data, const std::string& run return true; } +*/ // ««-------------- ≪ °◇◆◇° ≫ --------------»» @@ -690,9 +750,9 @@ bool Services::GetROOTplot(const std::string& plot_name, std::string& draw_optio if(version<0){ // https://stackoverflow.com/questions/tagged/greatest-n-per-group for faster - cmd_string = "SELECT jsonb_build_object('version', version, 'data', data, 'draw_options', draw_options) FROM rootplots WHERE name='"+plot_name+"' ORDER BY version DESC LIMIT 1"; + cmd_string = "SELECT json_build_object('version', version, 'data', data, 'draw_options', draw_options) FROM rootplots WHERE name='"+plot_name+"' ORDER BY version DESC LIMIT 1"; } else { - cmd_string = "SELECT jsonb_build_object('version', version, 'data', data, 'draw_options', draw_options) FROM rootplots WHERE name='"+plot_name+"' AND version="+std::to_string(version); + cmd_string = "SELECT 
json_build_object('version', version, 'data', data, 'draw_options', draw_options) FROM rootplots WHERE name='"+plot_name+"' AND version="+std::to_string(version); } std::string err=""; @@ -751,9 +811,9 @@ bool Services::GetPlotlyPlot( std::string cmd_string; if(version<0){ // https://stackoverflow.com/questions/tagged/greatest-n-per-group for faster - cmd_string = "SELECT jsonb_build_object('version', version, 'data', data, 'layout', layout) FROM plotlyplots WHERE name='"+name+"' ORDER BY version DESC LIMIT 1"; + cmd_string = "SELECT json_build_object('version', version, 'data', data, 'layout', layout) FROM plotlyplots WHERE name='"+name+"' ORDER BY version DESC LIMIT 1"; } else { - cmd_string = "SELECT jsonb_build_object('version', version, 'data', data, 'layout', layout) FROM plotlyplots WHERE name='"+name+"' AND version="+std::to_string(version); + cmd_string = "SELECT json_build_object('version', version, 'data', data, 'layout', layout) FROM plotlyplots WHERE name='"+name+"' AND version="+std::to_string(version); } std::string err; @@ -796,24 +856,36 @@ bool Services::GetPlotlyPlot(const std::string& name, std::string& trace, std::s // Multicast Senders // ----------------- -bool Services::SendLog(const std::string& message, unsigned int severity, const std::string& device, const uint64_t timestamp){ +bool Services::SendLog(const std::string& message, LogLevel severity, const std::string& device, const uint64_t timestamp){ const std::string& name = (device=="") ? m_name : device; - std::string cmd_string = std::string{"{\"topic\":\"LOGGING\""} - + ",\"time\":\""+TimeStringFromUnixMs(timestamp)+"\"" - + ",\"device\":\""+ name +"\"" - + ",\"severity\":"+std::to_string(severity) - + ",\"message\":\"" + message + "\"}"; - - if(cmd_string.length()>MAX_UDP_PACKET_SIZE){ - std::cerr<<"Logging message is too long! 
Maximum length may be MAX_UDP_PACKET_SIZE bytes"<MAX_MSG_SIZE){ + std::cerr<<"Logging message is too long!"< locker(logging_buf_mtx); + + if(logging_buf.size() && name==logging_buf.back().device && message==logging_buf.back().message){ + ++logging_buf.back().repeats; + return true; + } + + // grab timestamp at time of call if 0 + time_t ts = (timestamp!=0) ? timestamp : time(nullptr)*1000; + + logging_buf.emplace_back(message, severity, name, ts); + + return true; +} + +bool Services::SendLog(std::string& msg){ + std::string err=""; - if(!m_backend_client.SendMulticast(MulticastType::Log,cmd_string, &err)){ + if(!m_backend_client.SendMulticast(MulticastType::Log, msg, &err)){ std::cerr<<"SendLog error: "<MAX_UDP_PACKET_SIZE){ - std::cerr<<"Monitoring message is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<MAX_MSG_SIZE){ + std::cerr<<"Monitoring message is too long!"< locker(monitoring_buf_mtx); + + // take first of repeated monitoring sends within buffer period + auto it = monitoring_buf.find(name+subject); + if(it!=monitoring_buf.end() && (ts - it->second.timestamp)(args); + + m_args->last_send = std::chrono::steady_clock::now(); + + m_args->local_merge_buf.clear(); + std::unique_lock locker(*m_args->logging_buf_mtx); + + // merge into a batch + bool first=true; + for(LogMsg& msg : *m_args->logging_buf){ + m_args->local_merge_buf += std::string(first ? "," : "") + + "{\"topic\":\"LOGGING\"" + + ",\"time\":\""+TimeStringFromUnixMs(msg.timestamp)+"\"" + + ",\"device\":\""+ msg.device +"\"" + + ",\"severity\":"+std::to_string(int(msg.severity)) + + ",\"message\":\"" + msg. message + "\"" + + ",\"repeats\":"+std::to_string(msg.repeats)+"}"; + first=false; + } + + // send + if(m_args->services->SendLog(m_args->local_merge_buf)){ + m_args->logging_buf->clear(); // FIXME do we not clear on error...? does it depend on the error...? 
+ } + + // repeat for monitoring messages + m_args->local_merge_buf.clear(); + locker = std::unique_lock(*m_args->monitoring_buf_mtx); + + first=true; + for(std::pair& msg : *m_args->monitoring_buf){ + m_args->local_merge_buf += std::string(first ? "," : "") + + "{\"topic\":\"MONITORING\"" + + ",\"time\":\""+TimeStringFromUnixMs(msg.second.timestamp)+"\"" + + ",\"device\":\""+ msg.second.device +"\"" + + ",\"subject\":\""+ msg.second.subject +"\"" + + ",\"data\":"+ msg.second.json_data +"}"; + first=false; + } + + // send + if(m_args->services->SendMonitoringData(m_args->local_merge_buf)){ + m_args->monitoring_buf->clear(); // FIXME do we not clear on error...? does it depend on the error...? + } + + std::this_thread::sleep_until(m_args->last_send+m_args->multicast_send_period_ms); + + return; +} diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index 74c2716..4a233df 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -15,11 +17,44 @@ //#include //#include #include +#include #define SERVICES_DEFAULT_TIMEOUT 1800 namespace ToolFramework { - + + enum class LogLevel { Error=0, Warning=1, Message=2, Debug=3, Debug1=4, Debug2=5, Debug3=6 }; + + struct LogMsg { + LogMsg(const std::string& i_message, LogLevel i_severity=LogLevel::Message, const std::string& i_device="", const uint64_t i_timestamp=0) : message{i_message}, severity{i_severity}, device{i_device}, timestamp{i_timestamp} {}; + std::string message; + LogLevel severity; + std::string device; + uint64_t timestamp; + uint32_t repeats; + }; + + struct MonitoringMsg { + MonitoringMsg(const std::string& i_json_data, const std::string& i_subject, const std::string& i_device="", uint64_t i_timestamp=0) : json_data{i_json_data}, subject{i_subject}, device{i_device}, timestamp{i_timestamp} {}; + std::string json_data; + std::string subject; + std::string device; + 
uint64_t timestamp; + }; + + class Services; + + struct BufferThreadArgs : Thread_args { + Services* services; + std::vector* logging_buf; + std::unordered_map* monitoring_buf; + std::mutex* logging_buf_mtx; + std::mutex* monitoring_buf_mtx; + std::chrono::milliseconds multicast_send_period_ms; + std::chrono::steady_clock::time_point last_send; + std::string local_merge_buf; + }; + class Services{ @@ -30,36 +65,33 @@ namespace ToolFramework { bool Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service=false); bool Ready(const unsigned int timeout=10000); // default service discovery broadcast period is 5s, middleman also checks intermittently, compound total time should be <10s... - bool SQLQuery(/*const std::string& database,*/ const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SQLQuery(/*const std::string& database,*/ const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SQLQuery(/*const std::string& database,*/ const std::string& query, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(const std::string& query, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendLog(const std::string& message, unsigned int severity=2, const std::string& device="", const uint64_t timestamp=0); - bool SendAlarm(const std::string& message, unsigned int level=0, const std::string& device="", const uint64_t timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // public interface methods - push into local buffer + bool SendLog(const std::string& message, LogLevel severity=LogLevel::Message, const std::string& device="", const uint64_t 
timestamp=0); bool SendMonitoringData(const std::string& json_data, const std::string& subject, const std::string& device="", uint64_t timestamp=0); + + bool SendAlarm(const std::string& message, bool critical=false, const std::string& device="", const uint64_t timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendCalibrationData(const std::string& json_data, const std::string& description, const std::string& device="", uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetCalibrationData(std::string& json_data, int& version, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetCalibrationData(std::string& json_data, int&& version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendDeviceConfig(const std::string& json_data, const std::string& author, const std::string& description, const std::string& device="", uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendRunConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendBaseConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendRunModeConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetDeviceConfig(std::string& json_data, const int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunConfig(std::string& json_data, const int config_id, 
const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunConfig(std::string& json_data, const std::string& name, const int version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunDeviceConfig(std::string& json_data, const int runconfig_id, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunDeviceConfig(std::string& json_data, const std::string& runconfig_name, const int runconfig_version=-1, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - // FIXME is default lifetime 5 ok? - bool SendROOTplotZmq(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const uint64_t timestamp=0, const unsigned int lifetime=5, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - // FIXME is default lifetime 5 ok? - bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int lifetime=5, const uint64_t timestamp=SERVICES_DEFAULT_TIMEOUT); + bool GetRunConfig(std::string& json_data, const int runmode_config_id, const int base_config_id=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunDeviceConfig(std::string& json_data, const int runmode_config_id=-1, const int base_config_id=-1, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const uint64_t timestamp=0, const unsigned int lifetime=5, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int lifetime=5, const uint64_t timestamp=0); bool GetROOTplot(const std::string& plot_name, std::string& 
draw_option, std::string& json_data, int& version, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetROOTplot(const std::string& plot_name, std::string& draw_option, std::string& json_data, int&& version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - // FIXME is default lifetime 5 ok? bool SendPlotlyPlot(const std::string& name, const std::string& json_trace, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, const unsigned int lifetime=5, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - // FIXME is default lifetime 5 ok? bool SendPlotlyPlot(const std::string& name, const std::vector& json_traces, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, const unsigned int lifetime=5, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetPlotlyPlot(const std::string& name, std::string& json_trace, std::string& json_layout, int& version, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetPlotlyPlot(const std::string& name, std::string& json_trace, std::string& json_layout, int&& version=-1, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - std::string TimeStringFromUnixMs(const uint64_t time); + static std::string TimeStringFromUnixMs(const uint64_t time); SlowControlCollection* GetSlowControlCollection(); SlowControlElement* GetSlowControlVariable(std::string key); @@ -83,15 +115,28 @@ namespace ToolFramework { void LoadConfig2(const char* alert, const char* payload); std::string LoadConfig1(const char* payload); + // private methods for sending from buffer + bool SendLog(std::string& msg); + bool SendMonitoringData(std::string& msg); + static void BufferThread(Thread_args* args); + std::string m_name; zmq::context_t* m_context; ServicesBackend m_backend_client; - std::string m_dbname; - std::string m_name; std::string m_local_config; uint64_t m_base_config_id; uint64_t m_run_mode_config_id; + Utilities m_utils; + BufferThreadArgs thread_args; + + std::vector logging_buf; + 
std::unordered_map monitoring_buf; + std::mutex logging_buf_mtx; + std::mutex monitoring_buf_mtx; + uint32_t mon_merge_period_ms; + uint32_t multicast_send_period_ms; + }; } diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index fa5b935..e612434 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -1,5 +1,9 @@ #include "ServicesBackend.h" +namespace { + const uint32_t MAX_UDP_PACKET_SIZE = 655355; +} + using namespace ToolFramework; Command::Command(std::string command_in, char type_in, std::string topic_in, const uint32_t timeout_ms_in){ @@ -119,6 +123,9 @@ bool ServicesBackend::Initialise(Store &variables_in){ if(!m_variables.Get("max_retries",max_retries)) max_retries = 3; int advertise_endpoints = 1; m_variables.Get("advertise_endpoints",advertise_endpoints); + int msg_compression=1; + m_variables.Get("msg_compression",msg_compression); + m_variables.Get("compression_level",compression_level); get_ok = InitZMQ(); if(not get_ok) return false; @@ -132,36 +139,10 @@ bool ServicesBackend::Initialise(Store &variables_in){ if(not get_ok) return false; } - /* Time Tracking */ - /* ----------------------------------------- */ - - // time to wait between resend attempts if not ack'd - int resend_period_ms = 1000; - // how often to print out stats on what we're sending - int print_stats_period_ms = 5000; - - // Update with user-specified values. 
- m_variables.Get("resend_period_ms",resend_period_ms); - m_variables.Get("print_stats_period_ms",print_stats_period_ms); - - // convert times to boost for easy handling - resend_period = boost::posix_time::milliseconds(resend_period_ms); - print_stats_period = boost::posix_time::milliseconds(print_stats_period_ms); - - // initialise 'last send' times - last_write = boost::posix_time::microsec_clock::universal_time(); - last_read = boost::posix_time::microsec_clock::universal_time(); - last_printout = boost::posix_time::microsec_clock::universal_time(); - - // get the hostname of this machine for monitoring stats - char buf[255]; - get_ok = gethostname(buf, 255); - if(get_ok!=0){ - std::cerr<<"Error getting hostname!"< locker(msg_buf_mtx, std::defer_lock); + if(zstd_ctx){ + locker.lock(); + bytes_to_send = ZSTD_compressCCtx(zstd_ctx, &compressed_msg_buf[1], MAX_UDP_PACKET_SIZE, command.data(), command.size(), compression_level); + if(ZSTD_isError(bytes_to_send)){ + locker.unlock(); + std::string errmsg = std::string{"Warning: error compressing multicast message "}+ZSTD_getErrorName(bytes_to_send); + Log(errmsg,v_error,verbosity); // XXX should send to MM uncompressed, along with other errors + if(err) *err= errmsg; + } else { + msg_to_send = compressed_msg_buf; + ++bytes_to_send; // account for leading 'z' to indicate compression + } + } + if(!msg_to_send){ + msg_to_send = const_cast(command.c_str()); + bytes_to_send = command.length(); + } + /* // check for listeners...? - seems redundant, multicast can always send zmq::poll(&multicast_poller,1, 0); // timeout 0 = return immediately... 
@@ -413,7 +415,7 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std // got a listener - ship it socket_mtx->lock(); - int cnt = sendto(multicast_socket, command.c_str(), command.length()+1, 0, (struct sockaddr*)multicast_addr, multicast_addrlen); + int cnt = sendto(multicast_socket, msg_to_send, bytes_to_send+1, 0, (struct sockaddr*)multicast_addr, multicast_addrlen); socket_mtx->unlock(); if(cnt < 0){ std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)}; @@ -459,12 +461,34 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c return false; } - // In the case of pub sockets, we may also want to specify a 'topic' that the recipient - // can use with ZMQ_SUBSCRIBE to filter out particular messages. + // compress the message if applicable + msg_to_send=nullptr; + std::unique_lock locker(msg_buf_mtx, std::defer_lock); + if(zstd_ctx){ + locker.lock(); + bytes_to_send = ZSTD_compressCCtx(zstd_ctx, &compressed_msg_buf[1], MAX_UDP_PACKET_SIZE, command.data(), command.size(), compression_level); + if(ZSTD_isError(bytes_to_send)){ + locker.unlock(); + std::string errmsg = std::string{"Warning: error compressing multicast message "}+ZSTD_getErrorName(bytes_to_send); + Log(errmsg,v_error,verbosity); // XXX should send to MM uncompressed, along with other errors + if(err) *err= errmsg; + } else { + msg_to_send = compressed_msg_buf; + ++bytes_to_send; // account for leading 'z' to indicate compression + } + } + if(!msg_to_send){ + msg_to_send = const_cast(command.c_str()); + bytes_to_send = command.length(); + } + + // In the case of pub sockets, we specify a 'topic' that the recipient can use with ZMQ_SUBSCRIBE + // to filter out particular messages. // In fact it's useful to indicate a topic in all cases, even when the actual message will // (for now) go over a dealer/router combination that cannot filter on the topic. 
// forward the timeout to the Command (and thus zmq::poll in PollAndSend...) ... is this sensible? HMMMMM FIXME - Command cmd{command, type, topic,timeout}; + Command cmd{std::string{msg_to_send}.substr(0,bytes_to_send), type, topic,timeout}; + if(locker.owns_lock()) locker.unlock(); // must check or it throws an exception // wrap our attempt to get the response in try/catch, just in case? try { @@ -821,6 +845,9 @@ bool ServicesBackend::Finalise(){ waiting_senders = std::queue>>{}; waiting_recipients.clear(); + // cleanup zmq compression context + if(zstd_ctx) ZSTD_freeCCtx(zstd_ctx); + // can't use 'Log' since we may have deleted the Logging class if(verbosity>3) std::cout<<"ServicesBackend finalise done"< // multicast #include // multicast #include +#include namespace ToolFramework { @@ -156,6 +157,16 @@ class ServicesBackend { std::atomic msg_id{0}; + // used in SendMulticast + size_t bytes_to_send; + char* msg_to_send=nullptr; + + ZSTD_CCtx* zstd_ctx=nullptr; + int compression_level=1; + char* compressed_msg_buf=nullptr; + std::mutex msg_buf_mtx; // we'll share this buffer, since it's kind of a large buffer to keep allocating for every call + // and we shouldn't be spamming the Send calls so fast mutex contention becomes a problem + // ======================================================= // zmq helper functions diff --git a/src/ToolDAQChain/ToolDAQChain.cpp b/src/ToolDAQChain/ToolDAQChain.cpp index b347820..09d40d6 100644 --- a/src/ToolDAQChain/ToolDAQChain.cpp +++ b/src/ToolDAQChain/ToolDAQChain.cpp @@ -177,7 +177,13 @@ void ToolDAQChain::Init(unsigned int IO_Threads){ using std::placeholders::_2; using std::placeholders::_3; using std::placeholders::_4; - tmp->SetSendLog(std::bind(&Services::SendLog, m_DAQdata->services, _1, _2, _3, _4)); + //tmp->SetSendLog(std::bind(&Services::SendLog, m_DAQdata->services, _1, _2, _3, _4)); + + // can't use std::bind because Services::SendLog is overloaded + //tmp->SetSendLog(std::bind(static_cast(&Services::SendLog), 
m_DAQdata->services, _1, _2, _3, _4)); + // use Ben's favourite feature of c++11 instead + tmp->SetSendLog([this](const std::string& m, unsigned int ll, const std::string& d, const unsigned int ts)->bool { return m_DAQdata->services->SendLog(m, LogLevel(ll), d, ts); }); + // tmp->SetSendLog(m_DAQdata->services->SendLog); } From fd26588440f2dd89a96da0e529838accac77265d Mon Sep 17 00:00:00 2001 From: marcus Date: Sat, 14 Mar 2026 23:56:54 +0000 Subject: [PATCH 02/10] uncomment functionality of LoadConfig1 as we now have it --- src/ServiceDiscovery/Services.cpp | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index df7f3cd..3388331 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -52,7 +52,7 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC (*sc_vars)["NewConfig"]->SetValue(0); sc_vars->Add("LoadConfig",SlowControlElementType(BUTTON),std::bind(&Services::LoadConfig1, this, std::placeholders::_1),0,false,false); - AlertSubscribe("LoadConfig", std::bind(&Services::LoadConfig2, this, std::placeholders::_1, std::placeholders::_2)); + AlertSubscribe("LoadConfig", std::bind(&Services::LoadConfig2, this, std::placeholders::_1, std::placeholders::_2)); if(!m_variables.Get("service_name",m_name)) m_name="test_service"; @@ -1060,28 +1060,28 @@ void Services::LoadConfig2(const char* alert, const char* payload){ } std::string Services::LoadConfig1(const char* payload){ - + Store tmp; tmp.JsonParser(payload); uint64_t base_config_id=0; uint64_t run_mode_config_id=0; - + tmp.Get("Base",base_config_id); tmp.Get("RunMode",run_mode_config_id); - -if(run_mode_config_id!=m_run_mode_config_id || base_config_id!=m_base_config_id){ - //if(!GetRunDeviceConfig(m_local_config, base_config_id, run_mode_config_id)){ - // usleep(100000); - // } - (*sc_vars)["NewConfig"]->SetValue(1); - m_base_config_id =
base_config_id; - m_run_mode_config_id = run_mode_config_id; + if(run_mode_config_id!=m_run_mode_config_id || base_config_id!=m_base_config_id){ + + if(!GetRunDeviceConfig(m_local_config, base_config_id, run_mode_config_id)){ + usleep(100000); + } + (*sc_vars)["NewConfig"]->SetValue(1); + m_base_config_id = base_config_id; + m_run_mode_config_id = run_mode_config_id; + + } + + return ""; - } - - return ""; - } // ======================== From 2d6e4279bc230b587ea368adbd03be6c86944aeb Mon Sep 17 00:00:00 2001 From: marcus Date: Sun, 15 Mar 2026 00:21:17 +0000 Subject: [PATCH 03/10] replace gmtime with thread-safe version --- src/ServiceDiscovery/Services.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 3388331..93815c1 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -1031,13 +1031,10 @@ std::string Services::TimeStringFromUnixMs(const uint64_t timestamp){ timestamp_ms = timestamp%1000; timestamp_sec = timestamp/1000; } - struct tm* timeptr = gmtime(×tamp_sec); // FIXME check thread safety of these time things - if(timeptr==0){ - //Log("gmtime error converting unix time '"+std::to_string(timestamp)+"' to time struct",v_error); - return "now()"; - } + struct tm timestruct; + gmtime_r(×tamp_sec, ×truct); // FIXME error checking? 
char timestring[24]; - int nchars = strftime(×tring[0], 20, "%F %T", timeptr); + int nchars = strftime(×tring[0], 20, "%F %T", ×truct); if(nchars==0){ //Log("strftime error converting time struct '"+std::to_string(timestamp)+"' to string",v_error); return "now()"; From e37a9a2f14bf1182d2dcb2962a73ef6081f8ab7c Mon Sep 17 00:00:00 2001 From: marcus Date: Tue, 17 Mar 2026 11:39:56 +0000 Subject: [PATCH 04/10] updates to configuration handling --- src/ServiceDiscovery/Services.cpp | 119 ++++++++++++++++++++++++++---- src/ServiceDiscovery/Services.h | 4 +- 2 files changed, 106 insertions(+), 17 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 93815c1..0895b66 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -51,7 +51,7 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC sc_vars->Add("NewConfig",SlowControlElementType(INFO),0,0,false,false); (*sc_vars)["NewConfig"]->SetValue(0); - sc_vars->Add("LoadConfig",SlowControlElementType(BUTTON),std::bind(&Services::LoadConfig1, this, std::placeholders::_1),0,false,false); + sc_vars->Add("LoadConfig",SlowControlElementType(VARIABLE),std::bind(&Services::LoadConfig1, this, std::placeholders::_1),0,false,false); AlertSubscribe("LoadConfig", std::bind(&Services::LoadConfig2, this, std::placeholders::_1, std::placeholders::_2)); if(!m_variables.Get("service_name",m_name)) m_name="test_service"; @@ -563,8 +563,9 @@ bool Services::GetRunConfig(std::string& json_data, const int runmode_config_id, json_data=""; - std::string cmd_string = "{ \"base_config_id\":"+std::to_string(base_config_id) - + ", \"runmode_config_id\":"+std::to_string(runmode_config_id)+"}"; + std::string cmd_string = "SELECT json_build_object('data', base.data || runmode.data) FROM ( SELECT data FROM base_config WHERE config_id=" + + std::to_string(base_config_id) + ") base CROSS JOIN (SELECT data FROM runmode_config WHERE config_id=" + + 
std::to_string(runmode_config_id) + ") runmode"; std::string err=""; @@ -577,7 +578,7 @@ bool Services::GetRunConfig(std::string& json_data, const int runmode_config_id, if(json_data.empty()){ // if we got an empty response but the command succeeded, // the query worked but matched no records - run config not found - err = "GetRunConfig error: no config matching "+cmd_string; + err = "GetRunConfig error: no config matching base config "+std::to_string(base_config_id)+", runmode config "+std::to_string(runmode_config_id); std::cerr<'"+name+"')::integer FROM base CROSS JOIN runmode)"; std::string err=""; @@ -694,7 +705,63 @@ bool Services::GetRunDeviceConfig(std::string& json_data, const int runmode_conf // ««-------------- ≪ °◇◆◇° ≫ --------------»» -/* +// Get a device configuration from a *run* configuration ID +bool Services::GetCachedDeviceConfig(std::string& json_data, int base_config_id, int runmode_config_id, const std::string& device, int* version, unsigned int timeout){ + + json_data=""; + + const std::string& name = (device=="") ? 
m_name : device; + + std::string err=""; + + if(!m_backend_client.SendCommand("R_KACHEDDEVICECONFIG", name, &json_data, &timeout, &err)){ + std::cerr<<"GetRunDeviceConfig error: "<"}' - strip out contents + Store tmp; + tmp.JsonParser(json_data); + int base=-1, runmode=-1; + tmp.Get("base_config_id",base); + tmp.Get("runmode_config_id",runmode); + if(base!=base_config_id || runmode!=runmode_config_id){ + err="GetCachedDeviceConfig returned unexpected configuration ids: {"+std::to_string(base)+","+std::to_string(runmode) + +"}, expected {"+std::to_string(base_config_id)+","+std::to_string(runmode_config_id)+"}"; + std::cerr<>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")::integer"; + std::string cmd_string = "SELECT json_build_object('data', data, 'version', version) FROM device_config WHERE device='"+name+"' AND version=(SELECT data->>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")::integer"; std::string err=""; @@ -1041,23 +1108,43 @@ std::string Services::TimeStringFromUnixMs(const uint64_t timestamp){ } // add the milliseconds nchars = snprintf(×tring[19], 5, ".%03d", timestamp_ms); + /* if(nchars!=4){ - //Log("snprintf error converting '"+std::to_string(timestamp_ms)+"' to timestamp milliseconds",v_error); - //return "now()"; // just omit the milliseconds? or fall back to 'now'? + Log("snprintf error converting '"+std::to_string(timestamp_ms)+"' to timestamp milliseconds",v_error); + return "now()"; // just omit the milliseconds? or fall back to 'now'? 
} + */ return std::string{timestring}; } void Services::LoadConfig2(const char* alert, const char* payload){ - - LoadConfig1(payload); + + Store tmp; + tmp.JsonParser(payload); + uint64_t base_config_id=0; + uint64_t run_mode_config_id=0; + + tmp.Get("Base",base_config_id); + tmp.Get("RunMode",run_mode_config_id); + + if(run_mode_config_id!=m_run_mode_config_id || base_config_id!=m_base_config_id){ + + if(!GetCachedDeviceConfig(m_local_config, base_config_id, run_mode_config_id)){ + usleep(100000); + } + (*sc_vars)["NewConfig"]->SetValue(1); + m_base_config_id = base_config_id; + m_run_mode_config_id = run_mode_config_id; + + } } -std::string Services::LoadConfig1(const char* payload){ +std::string Services::LoadConfig1(const char* control){ + std::string payload = (*sc_vars)[control]->GetValue(); Store tmp; tmp.JsonParser(payload); uint64_t base_config_id=0; diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index 4a233df..03519a6 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -82,7 +82,9 @@ namespace ToolFramework { bool SendRunModeConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetDeviceConfig(std::string& json_data, const int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunConfig(std::string& json_data, const int runmode_config_id, const int base_config_id=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunModeConfig(std::string& json_data, const std::string& runmode_name, const int version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunDeviceConfig(std::string& json_data, const int runmode_config_id=-1, const int base_config_id=-1, const std::string& device="", int* version=nullptr, const unsigned int 
timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetCachedDeviceConfig(std::string& json_data, const int runmode_config_id, const int base_config_id, const std::string& device="", int* version=nullptr, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const uint64_t timestamp=0, const unsigned int lifetime=5, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int lifetime=5, const uint64_t timestamp=0); bool GetROOTplot(const std::string& plot_name, std::string& draw_option, std::string& json_data, int& version, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); @@ -113,8 +115,8 @@ namespace ToolFramework { private: + std::string LoadConfig1(const char* sc_name); void LoadConfig2(const char* alert, const char* payload); - std::string LoadConfig1(const char* payload); // private methods for sending from buffer bool SendLog(std::string& msg); bool SendMonitoringData(std::string& msg); From 6dddb20be683adb5ea5a43cc36e061837a2accd1 Mon Sep 17 00:00:00 2001 From: marcus Date: Wed, 18 Mar 2026 16:01:23 +0000 Subject: [PATCH 05/10] don't prepend leading 'z' for compressed messages, it complicates things --- src/ServiceDiscovery/ServicesBackend.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index e612434..06f60cf 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -141,8 +141,7 @@ bool ServicesBackend::Initialise(Store &variables_in){ if(msg_compression){ zstd_ctx = ZSTD_createCCtx(); - compressed_msg_buf = new char[ZSTD_compressBound(MAX_UDP_PACKET_SIZE)+1]; - compressed_msg_buf[0]='z'; // indicator that this message is compressed + 
compressed_msg_buf = new char[ZSTD_compressBound(MAX_UDP_PACKET_SIZE)]; } // initialise the message IDs based on the current time in unix seconds @@ -391,7 +390,7 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std std::unique_lock locker(msg_buf_mtx, std::defer_lock); if(zstd_ctx){ locker.lock(); - bytes_to_send = ZSTD_compressCCtx(zstd_ctx, &compressed_msg_buf[1], MAX_UDP_PACKET_SIZE, command.data(), command.size(), compression_level); + bytes_to_send = ZSTD_compressCCtx(zstd_ctx, compressed_msg_buf, MAX_UDP_PACKET_SIZE, command.data(), command.size(), compression_level); if(ZSTD_isError(bytes_to_send)){ locker.unlock(); std::string errmsg = std::string{"Warning: error compressing multicast message "}+ZSTD_getErrorName(bytes_to_send); @@ -399,7 +398,6 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std if(err) *err= errmsg; } else { msg_to_send = compressed_msg_buf; - ++bytes_to_send; // account for leading 'z' to indicate compression } } if(!msg_to_send){ @@ -466,7 +464,7 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c std::unique_lock locker(msg_buf_mtx, std::defer_lock); if(zstd_ctx){ locker.lock(); - bytes_to_send = ZSTD_compressCCtx(zstd_ctx, &compressed_msg_buf[1], MAX_UDP_PACKET_SIZE, command.data(), command.size(), compression_level); + bytes_to_send = ZSTD_compressCCtx(zstd_ctx, compressed_msg_buf, MAX_UDP_PACKET_SIZE, command.data(), command.size(), compression_level); if(ZSTD_isError(bytes_to_send)){ locker.unlock(); std::string errmsg = std::string{"Warning: error compressing multicast message "}+ZSTD_getErrorName(bytes_to_send); @@ -474,7 +472,6 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c if(err) *err= errmsg; } else { msg_to_send = compressed_msg_buf; - ++bytes_to_send; // account for leading 'z' to indicate compression } } if(!msg_to_send){ From b558e2562a5a6a53ea08fd90cb4d4a4620bc77c9 Mon Sep 17 00:00:00 2001 
From: marcus Date: Wed, 18 Mar 2026 16:59:44 +0000 Subject: [PATCH 06/10] missing +1 --- src/ServiceDiscovery/ServicesBackend.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index 06f60cf..32c1c51 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -413,7 +413,7 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std // got a listener - ship it socket_mtx->lock(); - int cnt = sendto(multicast_socket, msg_to_send, bytes_to_send+1, 0, (struct sockaddr*)multicast_addr, multicast_addrlen); + int cnt = sendto(multicast_socket, msg_to_send, bytes_to_send, 0, (struct sockaddr*)multicast_addr, multicast_addrlen); socket_mtx->unlock(); if(cnt < 0){ std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)}; From 87c450d9bf396022f444c2df3d7407f912ff93ce Mon Sep 17 00:00:00 2001 From: marcus Date: Wed, 18 Mar 2026 17:50:15 +0000 Subject: [PATCH 07/10] bugfix; comma needed when not first --- src/ServiceDiscovery/Services.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 0895b66..a487439 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -1182,7 +1182,7 @@ void Services::BufferThread(Thread_args* args){ // merge into a batch bool first=true; for(LogMsg& msg : *m_args->logging_buf){ - m_args->local_merge_buf += std::string(first ? "," : "") + m_args->local_merge_buf += std::string(first ? "" : ",") + "{\"topic\":\"LOGGING\"" + ",\"time\":\""+TimeStringFromUnixMs(msg.timestamp)+"\"" + ",\"device\":\""+ msg.device +"\"" @@ -1203,7 +1203,7 @@ void Services::BufferThread(Thread_args* args){ first=true; for(std::pair& msg : *m_args->monitoring_buf){ - m_args->local_merge_buf += std::string(first ? 
"," : "") + m_args->local_merge_buf += std::string(first ? "" : ",") + "{\"topic\":\"MONITORING\"" + ",\"time\":\""+TimeStringFromUnixMs(msg.second.timestamp)+"\"" + ",\"device\":\""+ msg.second.device +"\"" From c5b0d6953a122fd784bb81019472dde9500ae60e Mon Sep 17 00:00:00 2001 From: marcus Date: Wed, 18 Mar 2026 20:11:36 +0000 Subject: [PATCH 08/10] bugfix for compressed zmq messaages in ServicesBackend, add verbosity to Services class --- src/ServiceDiscovery/Services.cpp | 129 +++++++++++++---------- src/ServiceDiscovery/Services.h | 2 + src/ServiceDiscovery/ServicesBackend.cpp | 4 +- src/ServiceDiscovery/ServicesBackend.h | 2 +- 4 files changed, 76 insertions(+), 61 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index a487439..221a0df 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -10,6 +10,7 @@ namespace { Services::Services(){ m_context=0; m_name=""; + m_verbose=false; } @@ -22,6 +23,10 @@ Services::~Services(){ } +void Services::SetVerbose(bool in){ + m_verbose=in; +} + bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service){ m_context = context_in; @@ -55,10 +60,14 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC AlertSubscribe("LoadConfig", std::bind(&Services::LoadConfig2, this, std::placeholders::_1, std::placeholders::_2)); if(!m_variables.Get("service_name",m_name)) m_name="test_service"; + if(m_name[0]=='('){ + if(m_verbose) std::cerr<<"device names cannot start with '('"<*)nullptr, &timeout, &err); if(!ok){ - std::cerr<<"SendAlarm error: "<& resp std::string err=""; - if(!m_backend_client.SendCommand("W_QUERY", query, &responses, &timeout, &err)){ - std::cerr<<"SQLQuery error: "<1){ - std::cout<<"Warning: SQLQuery returned multiple rows, only first returned"<MAX_MSG_SIZE){ - std::cerr<<"Logging message is too long!"<MAX_MSG_SIZE){ - std::cerr<<"Monitoring 
message is too long!"<MAX_UDP_PACKET_SIZE){ - std::cerr<<"ROOT plot json is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"< T GetSlowControlValue(std::string name){ return (*sc_vars)[name]->GetValue(); @@ -123,6 +124,7 @@ namespace ToolFramework { static void BufferThread(Thread_args* args); std::string m_name; + bool m_verbose; zmq::context_t* m_context; ServicesBackend m_backend_client; std::string m_local_config; diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index 32c1c51..4dfe980 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -6,7 +6,7 @@ namespace { using namespace ToolFramework; -Command::Command(std::string command_in, char type_in, std::string topic_in, const uint32_t timeout_ms_in){ +Command::Command(const std::string& command_in, char type_in, const std::string& topic_in, const uint32_t timeout_ms_in){ command = command_in; type = type_in; // TODO type is unnecessary, could just use topic[0] topic=topic_in; @@ -484,7 +484,7 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c // In fact it's useful to indicate a topic in all cases, even when the actual message will // (for now) go over a dealer/router combination that cannot filter on the topic. // forward the timeout to the Command (and thus zmq::poll in PollAndSend...) ... is this sensible? HMMMMM FIXME - Command cmd{std::string{msg_to_send}.substr(0,bytes_to_send), type, topic,timeout}; + Command cmd{std::string(msg_to_send,bytes_to_send), type, topic,timeout}; if(locker.owns_lock()) locker.unlock(); // must check or it throws an exception // wrap our attempt to get the response in try/catch, just in case? 
diff --git a/src/ServiceDiscovery/ServicesBackend.h b/src/ServiceDiscovery/ServicesBackend.h index 6f15ef6..24bcd78 100644 --- a/src/ServiceDiscovery/ServicesBackend.h +++ b/src/ServiceDiscovery/ServicesBackend.h @@ -31,7 +31,7 @@ namespace ToolFramework { struct Command { - Command(std::string command_in, char cmd_type_in, std::string topic_in, const uint32_t timeout_ms_in); + Command(const std::string& command_in, char cmd_type_in, const std::string& topic_in, const uint32_t timeout_ms_in); Command(const Command& cmd_in); // copy constructor Command(Command&& cmd_in); // move constructor From 64d1c2f4e554aa2e0dcce47d74a310ca2146c7b6 Mon Sep 17 00:00:00 2001 From: marcus Date: Wed, 18 Mar 2026 21:29:01 +0000 Subject: [PATCH 09/10] ordering --- src/ServiceDiscovery/Services.cpp | 4 ++-- src/ServiceDiscovery/Services.h | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 221a0df..672315d 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -572,7 +572,7 @@ bool Services::GetDeviceConfig(std::string& json_data, const int version, const // ««-------------- ≪ °◇◆◇° ≫ --------------»» // get a run configuration via configuration ID pair -bool Services::GetRunConfig(std::string& json_data, const int runmode_config_id, const int base_config_id, const unsigned int timeout){ +bool Services::GetRunConfig(std::string& json_data, const int base_config_id, const int runmode_config_id, const unsigned int timeout){ json_data=""; @@ -662,7 +662,7 @@ bool Services::GetRunModeConfig(std::string& json_data, const std::string& name, // ««-------------- ≪ °◇◆◇° ≫ --------------»» // Get a device configuration from a *run* configuration ID -bool Services::GetRunDeviceConfig(std::string& json_data, const int runmode_config_id, const int base_config_id, const std::string& device, int* version, unsigned int timeout){ +bool
Services::GetRunDeviceConfig(std::string& json_data, const int base_config_id, const int runmode_config_id, const std::string& device, int* version, unsigned int timeout){ json_data=""; diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index fdaa814..f7cfa1e 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -81,10 +81,10 @@ namespace ToolFramework { bool SendBaseConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendRunModeConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetDeviceConfig(std::string& json_data, const int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunConfig(std::string& json_data, const int runmode_config_id, const int base_config_id=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunConfig(std::string& json_data, const int base_config_id, const int runmode_config_id, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetRunModeConfig(std::string& json_data, const std::string& runmode_name, const int version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunDeviceConfig(std::string& json_data, const int runmode_config_id=-1, const int base_config_id=-1, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetCachedDeviceConfig(std::string& json_data, const int runmode_config_id, const int base_config_id, const std::string& device="", int* version=nullptr, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunDeviceConfig(std::string& json_data, const int base_config_id, 
const int runmode_config_id, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetCachedDeviceConfig(std::string& json_data, const int base_config_id, const int runmode_config_id, const std::string& device="", int* version=nullptr, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const uint64_t timestamp=0, const unsigned int lifetime=5, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int lifetime=5, const uint64_t timestamp=0); bool GetROOTplot(const std::string& plot_name, std::string& draw_option, std::string& json_data, int& version, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); From 05b9a93f04b66f1f250dbf63435c3d76e865d3d1 Mon Sep 17 00:00:00 2001 From: marc1uk <15327579+marc1uk@users.noreply.github.com> Date: Thu, 19 Mar 2026 14:41:45 +0000 Subject: [PATCH 10/10] Update ToolDAQChain.cpp comment out unused placeholders --- src/ToolDAQChain/ToolDAQChain.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/ToolDAQChain/ToolDAQChain.cpp b/src/ToolDAQChain/ToolDAQChain.cpp index 09d40d6..df77f6b 100644 --- a/src/ToolDAQChain/ToolDAQChain.cpp +++ b/src/ToolDAQChain/ToolDAQChain.cpp @@ -173,10 +173,10 @@ void ToolDAQChain::Init(unsigned int IO_Threads){ m_DAQdata->services= new Services(); m_DAQdata->services->Init(m_data->vars, m_DAQdata->context, &m_DAQdata->sc_vars, true); DAQLogging* tmp = reinterpret_cast(m_log); - using std::placeholders::_1; - using std::placeholders::_2; - using std::placeholders::_3; - using std::placeholders::_4; + //using std::placeholders::_1; + //using std::placeholders::_2; + //using std::placeholders::_3; + //using std::placeholders::_4; 
//tmp->SetSendLog(std::bind(&Services::SendLog, m_DAQdata->services, _1, _2, _3, _4)); // can't use std::bind because Services::SendLog is overloaded