diff --git a/Makefile b/Makefile index 00be811..7f254be 100644 --- a/Makefile +++ b/Makefile @@ -9,8 +9,8 @@ else CXXFLAGS+= -O3 endif -ZMQLib= -L ../zeromq-4.0.7/lib -lzmq -ZMQInclude= -I ../zeromq-4.0.7/include/ +ZMQLib= -L ../zeromq-4.0.7/lib -lzmq +ZMQInclude= -I ../zeromq-4.0.7/include/ BoostLib= -L ../boost_1_66_0/install/lib -lboost_date_time -lboost_serialization -lboost_iostreams BoostInclude= -isystem ../boost_1_66_0/install/include/ @@ -21,14 +21,13 @@ TempDataModelLib = TempToolsInclude = TempToolsLib = - Includes= -I $(ToolFrameworkDIR)/include/ -I $(SOURCEDIR)/include/ -I $(SOURCEDIR)/tempinclude/ $(ZMQInclude) $(BoostInclude) Libs=-L $(SOURCEDIR)/lib/ -lTempDAQDataModel -lTempDAQTools -lToolDAQChain -lServiceDiscovery -lDAQDataModelBase -lDAQStore -lDAQLogging $(BoostLib) $(ZMQLib) -L $(ToolFrameworkDIR)/lib/ -lToolChain -lTempDAQTools -lDataModelBase -lpthread -lLogging -lStore LIBRARIES=lib/libDAQStore.so lib/libDAQLogging.so lib/libToolDAQChain.so lib/libDAQDataModelBase.so lib/libTempDAQDataModel.so lib/libTempDAQTools.so lib/libServiceDiscovery.so HEADERS:=$(patsubst %.h, include/%.h, $(filter %.h, $(subst /, ,$(wildcard src/*/*.h) ))) TempDataModelHEADERS:=$(patsubst %.h, tempinclude/%.h, $(filter %.h, $(subst /, , $(wildcard DataModel/*.h)))) TempToolHEADERS:=$(patsubst %.h, tempinclude/%.h, $(filter %.h, $(subst /, , $(wildcard UserTools/*/*.h) $(wildcard UserTools/*.h)))) -SOURCEFILES:=$(patsubst %.cpp, %.o, $(wildcard */*.cpp) $(wildcard */*/*.cpp)) +SOURCEFILES:=$(patsubst %.cpp, %.o, $(wildcard */*.cpp) $(wildcard */*/*.cpp)) $(patsubst %.c, %.o, $(wildcard */*.c) $(wildcard */*/*.c)) #.SECONDARY: $(%.o) @@ -49,11 +48,15 @@ tempinclude/%.h: @echo -e "\e[38;5;87m\n*************** sym linking headers ****************\e[0m" ln -s $(SOURCEDIR)/$(filter %$(strip $(patsubst tempinclude/%.h, /%.h, $@)), $(wildcard DataModel/*.h) $(wildcard UserTools/*/*.h) $(wildcard UserTools/*.h)) $@ -src/%.o : src/%.cpp $(HEADERS) +src/%.o : src/%.cpp $(HEADERS) + @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" + g++ $(CXXFLAGS) -c $< -o $@ $(Includes) + +src/%.o : src/%.c $(HEADERS) @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) -UnitTests/%.o : UnitTests/%.cpp $(HEADERS) +UnitTests/%.o : UnitTests/%.cpp $(HEADERS) @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) @@ -61,7 +64,7 @@ UserTools/%.o : UserTools/%.cpp $(HEADERS) $(TempDataModelHEADERS) UserTools/%. @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) $(TempDataModelInclude) $(TempToolsInclude) -UserTools/Factory/Factory.o : UserTools/Factory/Factory.cpp $(HEADERS) $(TempDataModelHEADERS) +UserTools/Factory/Factory.o : UserTools/Factory/Factory.cpp $(HEADERS) $(TempDataModelHEADERS) @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) $(TempDataModelInclude) $(TempToolsInclude) @@ -69,15 +72,15 @@ DataModel/%.o : DataModel/%.cpp DataModel/%.h $(HEADERS) $(TempDataModelHEADERS) @echo -e "\e[38;5;214m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) -c $< -o $@ $(Includes) $(TempDataModelInclude) -lib/libDAQStore.so: $(patsubst %.cpp, %.o , $(wildcard src/Store/*.cpp)) | $(HEADERS) +lib/libDAQStore.so: $(patsubst %.cpp, %.o , $(wildcard src/Store/*.cpp)) | $(HEADERS) @echo -e "\e[38;5;201m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) --shared $^ -o $@ $(Includes) -lib/libDAQLogging.so: $(patsubst %.cpp, %.o , $(wildcard src/DAQLogging/*.cpp)) | $(HEADERS) +lib/libDAQLogging.so: $(patsubst %.cpp, %.o , $(wildcard src/DAQLogging/*.cpp)) | $(HEADERS) @echo -e "\e[38;5;201m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) --shared $^ -o $@ $(Includes) -lib/libServiceDiscovery.so: $(patsubst %.cpp, %.o , $(wildcard src/ServiceDiscovery/*.cpp)) | $(HEADERS) +lib/libServiceDiscovery.so: $(patsubst %.cpp, %.o , $(wildcard src/ServiceDiscovery/*.cpp)) $(patsubst %.c, %.o , $(wildcard src/ServiceDiscovery/*.c)) | $(HEADERS) @echo -e "\e[38;5;201m\n*************** Making " $@ "****************\e[0m" g++ $(CXXFLAGS) --shared $^ -o $@ $(Includes) diff --git a/src/ServiceDiscovery/Services.cpp b/src/ServiceDiscovery/Services.cpp index 843c05f..672315d 100644 --- a/src/ServiceDiscovery/Services.cpp +++ b/src/ServiceDiscovery/Services.cpp @@ -3,13 +3,14 @@ using namespace ToolFramework; namespace { - const uint32_t MAX_UDP_PACKET_SIZE = 655355; + constexpr uint32_t MAX_UDP_PACKET_SIZE = 655355; + constexpr size_t MAX_MSG_SIZE = MAX_UDP_PACKET_SIZE-100; // 100 chars for JSON keys, timestamp string and quotes/commas } Services::Services(){ m_context=0; - m_dbname=""; m_name=""; + m_verbose=false; } @@ -17,10 +18,15 @@ Services::~Services(){ m_backend_client.Finalise(); sc_vars->Stop(); + m_utils.KillThread(&thread_args); m_context=nullptr; } +void Services::SetVerbose(bool in){ + m_verbose=in; +} + bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service){ m_context = context_in; @@ -31,12 +37,16 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC bool alerts_receive = 1; int alert_receive_port = 12243; int sc_port = 60000; + mon_merge_period_ms = 1000; + multicast_send_period_ms = 5000; m_variables.Get("alerts_send", alerts_send); m_variables.Get("alert_send_port", alert_send_port); m_variables.Get("alerts_receive", alerts_receive); m_variables.Get("alert_receive_port", alert_receive_port); m_variables.Get("sc_port", sc_port); + m_variables.Get("mon_merge_period_ms",mon_merge_period_ms); + m_variables.Get("multicast_send_period_ms",multicast_send_period_ms); sc_vars->InitThreadedReceiver(m_context, sc_port, 100, new_service, alert_receive_port, alerts_receive, alert_send_port, alerts_send); m_backend_client.SetUp(m_context); @@ -46,15 +56,18 @@ bool Services::Init(Store &m_variables, zmq::context_t* context_in, SlowControlC sc_vars->Add("NewConfig",SlowControlElementType(INFO),0,0,false,false); (*sc_vars)["NewConfig"]->SetValue(0); - sc_vars->Add("LoadConfig",SlowControlElementType(BUTTON),std::bind(&Services::LoadConfig1, this, std::placeholders::_1),0,false,false); - AlertSubscribe("LoadConfig", std::bind(&Services::LoadConfig2, this, std::placeholders::_1, std::placeholders::_2)); + sc_vars->Add("LoadConfig",SlowControlElementType(VARIABLE),std::bind(&Services::LoadConfig1, this, std::placeholders::_1),0,false,false); + AlertSubscribe("LoadConfig", std::bind(&Services::LoadConfig2, this, std::placeholders::_1, std::placeholders::_2)); if(!m_variables.Get("service_name",m_name)) m_name="test_service"; - if(!m_variables.Get("db_name",m_dbname)) m_dbname="daq"; + if(m_name[0]=='('){ + if(m_verbose) std::cerr<<"device names cannot start with '('"<0) level=1; - std::string cmd_string = "{\"time\":\""+TimeStringFromUnixMs(timestamp)+"\"" + ",\"device\":\""+name+"\"" - + ",\"level\":"+std::to_string(level) + + ",\"critical\":"+std::to_string(critical) + ",\"alarm\":\"" + message + "\"}"; std::string err=""; @@ -94,7 +117,7 @@ bool Services::SendAlarm(const std::string& message, unsigned int level, const s // send the alarm on the pub socket bool ok = m_backend_client.SendCommand("W_ALARM", cmd_string, (std::vector*)nullptr, &timeout, &err); if(!ok){ - std::cerr<<"SendAlarm error: "<& responses, const unsigned int timeout){ +bool Services::SQLQuery(const std::string& query, std::vector& responses, const unsigned int timeout){ responses.clear(); - //const std::string& db = (database=="") ? m_dbname : database; - std::string err=""; - if(!m_backend_client.SendCommand("W_QUERY", query, &responses, &timeout, &err)){ - std::cerr<<"SQLQuery error: "< responses; - bool ok = SQLQuery(/*db,*/ query, responses, timeout); + bool ok = SQLQuery(query, responses, timeout); if(responses.size()!=0){ response = responses.front(); if(responses.size()>1){ - std::cout<<"Warning: SQLQuery returned multiple rows, only first returned"<>'"+name+"' FROM run_config WHERE config_id="+std::to_string(runconfig_id)+")::integer"; + /* + std::string cmd_string = "{ \"base_config_id\":"+std::to_string(base_config_id) + + ", \"runmode_config_id\":"+std::to_string(runmode_config_id) + + ", \"device\":\""+name+"\"}"; + */ + + std::string cmd_string = "WITH base AS ( SELECT data FROM base_config WHERE config_id="+std::to_string(base_config_id)+"), " + " runmode AS ( SELECT data FROM runmode_config WHERE config_id="+std::to_string(runmode_config_id)+") " + "SELECT json_build_object('version', version, 'data', data) FROM device_config WHERE device='"+name+"' " + "AND version=(SELECT ((base.data || runmode.data)->'"+name+"')::integer FROM base CROSS JOIN runmode)"; std::string err=""; - if(!m_backend_client.SendCommand("R_DEVCONFIG", cmd_string, &json_data, &timeout, &err)){ - std::cerr<<"GetRunDeviceConfig error: "<"}' - strip out contents + Store tmp; + tmp.JsonParser(json_data); + int base=-1, runmode=-1; + tmp.Get("base_config_id",base); + tmp.Get("runmode_config_id",runmode); + if(base!=base_config_id || runmode!=runmode_config_id){ + err="GetCachedDeviceConfig returned unexpected configuration ids: {"+std::to_string(base)+","+std::to_string(runmode) + +"}, expected {"+std::to_string(base_config_id)+","+std::to_string(runmode_config_id)+"}"; + if(m_verbose) std::cerr<>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")::integer"; + std::string cmd_string = "SELECT json_build_object('data', data, 'version', version) FROM device_config WHERE device='"+name+"' AND version=(SELECT data->>'"+name+"' FROM run_config WHERE name='"+runconfig_name+"' AND version="+std::to_string(runconfig_version)+")::integer"; std::string err=""; if(!m_backend_client.SendCommand("R_DEVCONFIG", cmd_string, &json_data, &timeout, &err)){ - std::cerr<<"GetRunDeviceConfig error: "<MAX_UDP_PACKET_SIZE){ - std::cerr<<"Logging message is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<MAX_MSG_SIZE){ + if(m_verbose) std::cerr<<"Logging message is too long!"< locker(logging_buf_mtx); + + if(logging_buf.size() && name==logging_buf.back().device && message==logging_buf.back().message){ + ++logging_buf.back().repeats; + return true; + } + + // grab timestamp at time of call if 0 + time_t ts = (timestamp!=0) ? timestamp : time(nullptr)*1000; + + logging_buf.emplace_back(message, severity, name, ts); + + return true; +} + +bool Services::SendLog(std::string& msg){ + std::string err=""; - if(!m_backend_client.SendMulticast(MulticastType::Log,cmd_string, &err)){ - std::cerr<<"SendLog error: "<MAX_UDP_PACKET_SIZE){ - std::cerr<<"Monitoring message is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<MAX_MSG_SIZE){ + if(m_verbose) std::cerr<<"Monitoring message is too long!"< locker(monitoring_buf_mtx); + + // take first of repeated monitoring sends within buffer period + auto it = monitoring_buf.find(name+subject); + if(it!=monitoring_buf.end() && (ts - it->second.timestamp)MAX_UDP_PACKET_SIZE){ - std::cerr<<"ROOT plot json is too long! Maximum length may be MAX_UDP_PACKET_SIZE bytes"<SetValue(1); + m_base_config_id = base_config_id; + m_run_mode_config_id = run_mode_config_id; + + } } -std::string Services::LoadConfig1(const char* payload){ - +std::string Services::LoadConfig1(const char* control){ + + std::string payload = (*sc_vars)[control]->GetValue(); Store tmp; tmp.JsonParser(payload); uint64_t base_config_id=0; uint64_t run_mode_config_id=0; - + tmp.Get("Base",base_config_id); tmp.Get("RunMode",run_mode_config_id); - -if(run_mode_config_id!=m_run_mode_config_id || base_config_id!=m_base_config_id){ - //if(!GetRunDeviceConfig(m_local_config, base_config_id, run_mode_config_id)){ - // usleep(100000); - // } - (*sc_vars)["NewConfig"]->SetValue(1); - m_base_config_id = base_config_id; - m_run_mode_config_id = run_mode_config_id; + if(run_mode_config_id!=m_run_mode_config_id || base_config_id!=m_base_config_id){ + + if(!GetRunDeviceConfig(m_local_config, base_config_id, run_mode_config_id)){ + usleep(100000); + } + (*sc_vars)["NewConfig"]->SetValue(1); + m_base_config_id = base_config_id; + m_run_mode_config_id = run_mode_config_id; + + } + + return ""; - } +} - return ""; +// ======================== +void Services::BufferThread(Thread_args* args){ + + BufferThreadArgs* m_args = dynamic_cast(args); + + m_args->last_send = std::chrono::steady_clock::now(); + + m_args->local_merge_buf.clear(); + std::unique_lock locker(*m_args->logging_buf_mtx); + + // merge into a batch + bool first=true; + for(LogMsg& msg : *m_args->logging_buf){ + m_args->local_merge_buf += std::string(first ? "" : ",") + + "{\"topic\":\"LOGGING\"" + + ",\"time\":\""+TimeStringFromUnixMs(msg.timestamp)+"\"" + + ",\"device\":\""+ msg.device +"\"" + + ",\"severity\":"+std::to_string(int(msg.severity)) + + ",\"message\":\"" + msg. message + "\"" + + ",\"repeats\":"+std::to_string(msg.repeats)+"}"; + first=false; + } + + // send + if(m_args->services->SendLog(m_args->local_merge_buf)){ + m_args->logging_buf->clear(); // FIXME do we not clear on error...? does it depend on the error...? + } + + // repeat for monitoring messages + m_args->local_merge_buf.clear(); + locker = std::unique_lock(*m_args->monitoring_buf_mtx); + + first=true; + for(std::pair& msg : *m_args->monitoring_buf){ + m_args->local_merge_buf += std::string(first ? "" : ",") + + "{\"topic\":\"MONITORING\"" + + ",\"time\":\""+TimeStringFromUnixMs(msg.second.timestamp)+"\"" + + ",\"device\":\""+ msg.second.device +"\"" + + ",\"subject\":\""+ msg.second.subject +"\"" + + ",\"data\":"+ msg.second.json_data +"}"; + first=false; + } + + // send + if(m_args->services->SendMonitoringData(m_args->local_merge_buf)){ + m_args->monitoring_buf->clear(); // FIXME do we not clear on error...? does it depend on the error...? + } + + std::this_thread::sleep_until(m_args->last_send+m_args->multicast_send_period_ms); + + return; } diff --git a/src/ServiceDiscovery/Services.h b/src/ServiceDiscovery/Services.h index 74c2716..f7cfa1e 100644 --- a/src/ServiceDiscovery/Services.h +++ b/src/ServiceDiscovery/Services.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -15,11 +17,44 @@ //#include //#include #include +#include #define SERVICES_DEFAULT_TIMEOUT 1800 namespace ToolFramework { - + + enum class LogLevel { Error=0, Warning=1, Message=2, Debug=3, Debug1=4, Debug2=5, Debug3=6 }; + + struct LogMsg { + LogMsg(const std::string& i_message, LogLevel i_severity=LogLevel::Message, const std::string& i_device="", const uint64_t i_timestamp=0) : message{i_message}, severity{i_severity}, device{i_device}, timestamp{i_timestamp} {}; + std::string message; + LogLevel severity; + std::string device; + uint64_t timestamp; + uint32_t repeats; + }; + + struct MonitoringMsg { + MonitoringMsg(const std::string& i_json_data, const std::string& i_subject, const std::string& i_device="", uint64_t i_timestamp=0) : json_data{i_json_data}, subject{i_subject}, device{i_device}, timestamp{i_timestamp} {}; + std::string json_data; + std::string subject; + std::string device; + uint64_t timestamp; + }; + + class Services; + + struct BufferThreadArgs : Thread_args { + Services* services; + std::vector* logging_buf; + std::unordered_map* monitoring_buf; + std::mutex* logging_buf_mtx; + std::mutex* monitoring_buf_mtx; + std::chrono::milliseconds multicast_send_period_ms; + std::chrono::steady_clock::time_point last_send; + std::string local_merge_buf; + }; + class Services{ @@ -30,36 +65,35 @@ namespace ToolFramework { bool Init(Store &m_variables, zmq::context_t* context_in, SlowControlCollection* sc_vars_in, bool new_service=false); bool Ready(const unsigned int timeout=10000); // default service discovery broadcast period is 5s, middleman also checks intermittently, compound total time should be <10s... - bool SQLQuery(/*const std::string& database,*/ const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SQLQuery(/*const std::string& database,*/ const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SQLQuery(/*const std::string& database,*/ const std::string& query, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(const std::string& query, std::vector& responses, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(const std::string& query, std::string& response, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SQLQuery(const std::string& query, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendLog(const std::string& message, unsigned int severity=2, const std::string& device="", const uint64_t timestamp=0); - bool SendAlarm(const std::string& message, unsigned int level=0, const std::string& device="", const uint64_t timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + // public interface methods - push into local buffer + bool SendLog(const std::string& message, LogLevel severity=LogLevel::Message, const std::string& device="", const uint64_t timestamp=0); bool SendMonitoringData(const std::string& json_data, const std::string& subject, const std::string& device="", uint64_t timestamp=0); + + bool SendAlarm(const std::string& message, bool critical=false, const std::string& device="", const uint64_t timestamp=0, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendCalibrationData(const std::string& json_data, const std::string& description, const std::string& device="", uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetCalibrationData(std::string& json_data, int& version, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetCalibrationData(std::string& json_data, int&& version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool SendDeviceConfig(const std::string& json_data, const std::string& author, const std::string& description, const std::string& device="", uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool SendRunConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendBaseConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendRunModeConfig(const std::string& json_data, const std::string& name, const std::string& author, const std::string& description, uint64_t timestamp=0, int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetDeviceConfig(std::string& json_data, const int version=-1, const std::string& device="", const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunConfig(std::string& json_data, const int config_id, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunConfig(std::string& json_data, const std::string& name, const int version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunDeviceConfig(std::string& json_data, const int runconfig_id, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - bool GetRunDeviceConfig(std::string& json_data, const std::string& runconfig_name, const int runconfig_version=-1, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - // FIXME is default lifetime 5 ok? - bool SendROOTplotZmq(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const uint64_t timestamp=0, const unsigned int lifetime=5, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - // FIXME is default lifetime 5 ok? - bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int lifetime=5, const uint64_t timestamp=SERVICES_DEFAULT_TIMEOUT); + bool GetRunConfig(std::string& json_data, const int base_config_id, const int runmode_config_id, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunModeConfig(std::string& json_data, const std::string& runmode_name, const int version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetRunDeviceConfig(std::string& json_data, const int base_config_id, const int runmode_config_id, const std::string& device="", int* version=nullptr, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool GetCachedDeviceConfig(std::string& json_data, const int base_config_id, const int runmode_config_id, const std::string& device="", int* version=nullptr, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendROOTplot(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, int* version=nullptr, const uint64_t timestamp=0, const unsigned int lifetime=5, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); + bool SendROOTplotMulticast(const std::string& plot_name, const std::string& draw_options, const std::string& json_data, const unsigned int lifetime=5, const uint64_t timestamp=0); bool GetROOTplot(const std::string& plot_name, std::string& draw_option, std::string& json_data, int& version, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetROOTplot(const std::string& plot_name, std::string& draw_option, std::string& json_data, int&& version=-1, const unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - // FIXME is default lifetime 5 ok? bool SendPlotlyPlot(const std::string& name, const std::string& json_trace, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, const unsigned int lifetime=5, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - // FIXME is default lifetime 5 ok? bool SendPlotlyPlot(const std::string& name, const std::vector& json_traces, const std::string& json_layout="{}", int* version=nullptr, uint64_t timestamp=0, const unsigned int lifetime=5, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetPlotlyPlot(const std::string& name, std::string& json_trace, std::string& json_layout, int& version, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); bool GetPlotlyPlot(const std::string& name, std::string& json_trace, std::string& json_layout, int&& version=-1, unsigned int timeout=SERVICES_DEFAULT_TIMEOUT); - std::string TimeStringFromUnixMs(const uint64_t time); + static std::string TimeStringFromUnixMs(const uint64_t time); SlowControlCollection* GetSlowControlCollection(); SlowControlElement* GetSlowControlVariable(std::string key); @@ -72,6 +106,7 @@ namespace ToolFramework { std::string PrintSlowControlVariables(); std::string GetDeviceName(); + void SetVerbose(bool in); template T GetSlowControlValue(std::string name){ return (*sc_vars)[name]->GetValue(); @@ -81,17 +116,31 @@ namespace ToolFramework { private: + std::string LoadConfig1(const char* sc_name); void LoadConfig2(const char* alert, const char* payload); - std::string LoadConfig1(const char* payload); + // private methods for sending from buffer + bool SendLog(std::string& msg); + bool SendMonitoringData(std::string& msg); + static void BufferThread(Thread_args* args); + std::string m_name; + bool m_verbose; zmq::context_t* m_context; ServicesBackend m_backend_client; - std::string m_dbname; - std::string m_name; std::string m_local_config; uint64_t m_base_config_id; uint64_t m_run_mode_config_id; + Utilities m_utils; + BufferThreadArgs thread_args; + + std::vector logging_buf; + std::unordered_map monitoring_buf; + std::mutex logging_buf_mtx; + std::mutex monitoring_buf_mtx; + uint32_t mon_merge_period_ms; + uint32_t multicast_send_period_ms; + }; } diff --git a/src/ServiceDiscovery/ServicesBackend.cpp b/src/ServiceDiscovery/ServicesBackend.cpp index fa5b935..4dfe980 100644 --- a/src/ServiceDiscovery/ServicesBackend.cpp +++ b/src/ServiceDiscovery/ServicesBackend.cpp @@ -1,8 +1,12 @@ #include "ServicesBackend.h" +namespace { + const uint32_t MAX_UDP_PACKET_SIZE = 655355; +} + using namespace ToolFramework; -Command::Command(std::string command_in, char type_in, std::string topic_in, const uint32_t timeout_ms_in){ +Command::Command(const std::string& command_in, char type_in, const std::string& topic_in, const uint32_t timeout_ms_in){ command = command_in; type = type_in; // TODO type is unnecessary, could just use topic[0] topic=topic_in; @@ -119,6 +123,9 @@ bool ServicesBackend::Initialise(Store &variables_in){ if(!m_variables.Get("max_retries",max_retries)) max_retries = 3; int advertise_endpoints = 1; m_variables.Get("advertise_endpoints",advertise_endpoints); + int msg_compression=1; + m_variables.Get("msg_compression",msg_compression); + m_variables.Get("compression_level",compression_level); get_ok = InitZMQ(); if(not get_ok) return false; @@ -132,36 +139,9 @@ bool ServicesBackend::Initialise(Store &variables_in){ if(not get_ok) return false; } - /* Time Tracking */ - /* ----------------------------------------- */ - - // time to wait between resend attempts if not ack'd - int resend_period_ms = 1000; - // how often to print out stats on what we're sending - int print_stats_period_ms = 5000; - - // Update with user-specified values. - m_variables.Get("resend_period_ms",resend_period_ms); - m_variables.Get("print_stats_period_ms",print_stats_period_ms); - - // convert times to boost for easy handling - resend_period = boost::posix_time::milliseconds(resend_period_ms); - print_stats_period = boost::posix_time::milliseconds(print_stats_period_ms); - - // initialise 'last send' times - last_write = boost::posix_time::microsec_clock::universal_time(); - last_read = boost::posix_time::microsec_clock::universal_time(); - last_printout = boost::posix_time::microsec_clock::universal_time(); - - // get the hostname of this machine for monitoring stats - char buf[255]; - get_ok = gethostname(buf, 255); - if(get_ok!=0){ - std::cerr<<"Error getting hostname!"< locker(msg_buf_mtx, std::defer_lock); + if(zstd_ctx){ + locker.lock(); + bytes_to_send = ZSTD_compressCCtx(zstd_ctx, compressed_msg_buf, MAX_UDP_PACKET_SIZE, command.data(), command.size(), compression_level); + if(ZSTD_isError(bytes_to_send)){ + locker.unlock(); + std::string errmsg = std::string{"Warning: error compressing multicast message "}+ZSTD_getErrorName(bytes_to_send); + Log(errmsg,v_error,verbosity); // XXX should send to MM uncompressed, along with other errors + if(err) *err= errmsg; + } else { + msg_to_send = compressed_msg_buf; + } + } + if(!msg_to_send){ + msg_to_send = const_cast(command.c_str()); + bytes_to_send = command.length(); + } + /* // check for listeners...? - seems redundant, multicast can always send zmq::poll(&multicast_poller,1, 0); // timeout 0 = return immediately... @@ -413,7 +413,7 @@ bool ServicesBackend::SendMulticast(MulticastType type, std::string command, std // got a listener - ship it socket_mtx->lock(); - int cnt = sendto(multicast_socket, command.c_str(), command.length()+1, 0, (struct sockaddr*)multicast_addr, multicast_addrlen); + int cnt = sendto(multicast_socket, msg_to_send, bytes_to_send, 0, (struct sockaddr*)multicast_addr, multicast_addrlen); socket_mtx->unlock(); if(cnt < 0){ std::string errmsg = "Error sending multicast message: "+std::string{strerror(errno)}; @@ -459,12 +459,33 @@ bool ServicesBackend::SendCommand(const std::string& topic, const std::string& c return false; } - // In the case of pub sockets, we may also want to specify a 'topic' that the recipient - // can use with ZMQ_SUBSCRIBE to filter out particular messages. + // compress the message if applicable + msg_to_send=nullptr; + std::unique_lock locker(msg_buf_mtx, std::defer_lock); + if(zstd_ctx){ + locker.lock(); + bytes_to_send = ZSTD_compressCCtx(zstd_ctx, compressed_msg_buf, MAX_UDP_PACKET_SIZE, command.data(), command.size(), compression_level); + if(ZSTD_isError(bytes_to_send)){ + locker.unlock(); + std::string errmsg = std::string{"Warning: error compressing multicast message "}+ZSTD_getErrorName(bytes_to_send); + Log(errmsg,v_error,verbosity); // XXX should send to MM uncompressed, along with other errors + if(err) *err= errmsg; + } else { + msg_to_send = compressed_msg_buf; + } + } + if(!msg_to_send){ + msg_to_send = const_cast(command.c_str()); + bytes_to_send = command.length(); + } + + // In the case of pub sockets, we specify a 'topic' that the recipient can use with ZMQ_SUBSCRIBE + // to filter out particular messages. // In fact it's useful to indicate a topic in all cases, even when the actual message will // (for now) go over a dealer/router combination that cannot filter on the topic. // forward the timeout to the Command (and thus zmq::poll in PollAndSend...) ... is this sensible? HMMMMM FIXME - Command cmd{command, type, topic,timeout}; + Command cmd{std::string(msg_to_send,bytes_to_send), type, topic,timeout}; + if(locker.owns_lock()) locker.unlock(); // must check or it throws an exception // wrap our attempt to get the response in try/catch, just in case? try { @@ -821,6 +842,9 @@ bool ServicesBackend::Finalise(){ waiting_senders = std::queue>>{}; waiting_recipients.clear(); + // cleanup zmq compression context + if(zstd_ctx) ZSTD_freeCCtx(zstd_ctx); + // can't use 'Log' since we may have deleted the Logging class if(verbosity>3) std::cout<<"ServicesBackend finalise done"< // multicast #include // multicast #include +#include namespace ToolFramework { struct Command { - Command(std::string command_in, char cmd_type_in, std::string topic_in, const uint32_t timeout_ms_in); + Command(const std::string& command_in, char cmd_type_in, const std::string& topic_in, const uint32_t timeout_ms_in); Command(const Command& cmd_in); // copy constructor Command(Command&& cmd_in); // move constructor @@ -156,6 +157,16 @@ class ServicesBackend { std::atomic msg_id{0}; + // used in SendMulticast + size_t bytes_to_send; + char* msg_to_send=nullptr; + + ZSTD_CCtx* zstd_ctx=nullptr; + int compression_level=1; + char* compressed_msg_buf=nullptr; + std::mutex msg_buf_mtx; // we'll share this buffer, since it's kind of a large buffer to keep allocating for every call + // and we shouldn't be spamming the Send calls so fast mutex contention becomes a problem + // ======================================================= // zmq helper functions diff --git a/src/ToolDAQChain/ToolDAQChain.cpp b/src/ToolDAQChain/ToolDAQChain.cpp index b347820..df77f6b 100644 --- a/src/ToolDAQChain/ToolDAQChain.cpp +++ b/src/ToolDAQChain/ToolDAQChain.cpp @@ -173,11 +173,17 @@ void ToolDAQChain::Init(unsigned int IO_Threads){ m_DAQdata->services= new Services(); m_DAQdata->services->Init(m_data->vars, m_DAQdata->context, &m_DAQdata->sc_vars, true); DAQLogging* tmp = reinterpret_cast(m_log); - using std::placeholders::_1; - using std::placeholders::_2; - using std::placeholders::_3; - using std::placeholders::_4; - tmp->SetSendLog(std::bind(&Services::SendLog, m_DAQdata->services, _1, _2, _3, _4)); + //using std::placeholders::_1; + //using std::placeholders::_2; + //using std::placeholders::_3; + //using std::placeholders::_4; + //tmp->SetSendLog(std::bind(&Services::SendLog, m_DAQdata->services, _1, _2, _3, _4)); + + // can't use std::bind because Services::SendLog is overloaded + //tmp->SetSendLog(std::bind(static_cast(&Services::SendLog), m_DAQdata->services, _1, _2, _3, _4)); + // use Ben's favourite feature of c++11 instead + tmp->SetSendLog([this](const std::string& m, unsigned int ll, const std::string& d, const unsigned int ts)->bool { return m_DAQdata->services->SendLog(m, LogLevel(ll), d, ts); }); + // tmp->SetSendLog(m_DAQdata->services->SendLog); }