diff --git a/CMakeLists.txt b/CMakeLists.txt index f942424aeb..ed3c664e1a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,7 @@ include(AddQCWorkflow) # ---- Project ---- project(QualityControl - VERSION 1.128.0 + VERSION 1.129.0 DESCRIPTION "O2 Data Quality Control Framework" LANGUAGES C CXX) diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 2f5863b68d..914df7f046 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -323,7 +323,7 @@ endforeach() foreach(t testWorkflow testTaskRunner testCheckWorkflow testPostProcessingConfig testPostProcessingInterface - testCheck testTrendingTask) + testCheck testTrendingTask testCustomParameters) target_sources(${t} PRIVATE ${CMAKE_BINARY_DIR}/getTestDataDirectory.cxx) target_include_directories(${t} PRIVATE ${CMAKE_SOURCE_DIR}) diff --git a/Framework/basic.json b/Framework/basic.json index 942014ceff..9bf3ca6c98 100644 --- a/Framework/basic.json +++ b/Framework/basic.json @@ -39,6 +39,7 @@ "tasks": { "QcTask": { "active": "true", + "critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", "detectorName": "TST", @@ -105,8 +106,7 @@ "fraction": "0.1", "seed": "1234" } - ], - "blocking": "false" + ] } ] } diff --git a/Framework/include/QualityControl/CustomParameters.h b/Framework/include/QualityControl/CustomParameters.h index b5599c2f96..0543de1d5b 100644 --- a/Framework/include/QualityControl/CustomParameters.h +++ b/Framework/include/QualityControl/CustomParameters.h @@ -22,6 +22,7 @@ #include #include #include +#include namespace o2::quality_control::core { @@ -167,6 +168,12 @@ class CustomParameters */ friend std::ostream& operator<<(std::ostream& out, const CustomParameters& customParameters); + /** + * \brief Provided the config subtree of the custom parameters, load its content and populate this CustomParameters. 
+ * \param paramsTree The subtree corresponding to extendedTaskParameters, extendedCheckParameters, etc... + */ + void populateCustomParameters(const boost::property_tree::ptree& paramsTree); + private: CustomParametersType mCustomParameters; }; diff --git a/Framework/include/QualityControl/InfrastructureGenerator.h b/Framework/include/QualityControl/InfrastructureGenerator.h index cc102fd150..09484af311 100644 --- a/Framework/include/QualityControl/InfrastructureGenerator.h +++ b/Framework/include/QualityControl/InfrastructureGenerator.h @@ -219,9 +219,13 @@ class InfrastructureGenerator static void generateMergers(framework::WorkflowSpec& workflow, const std::string& taskName, size_t numberOfLocalMachines, std::vector> cycleDurationSeconds, - const std::string& mergingMode, size_t resetAfterCycles, - std::string monitoringUrl, const std::string& detectorName, - std::vector mergersPerLayer, bool enableMovingWindows); + const std::string& mergingMode, + size_t resetAfterCycles, + std::string monitoringUrl, + const std::string& detectorName, + std::vector mergersPerLayer, + bool enableMovingWindows, + bool critical); static void generateCheckRunners(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec); static void generateAggregator(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec); static void generatePostProcessing(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec); diff --git a/Framework/include/QualityControl/PostProcessingConfig.h b/Framework/include/QualityControl/PostProcessingConfig.h index dbaff6f3b5..eb128ba549 100644 --- a/Framework/include/QualityControl/PostProcessingConfig.h +++ b/Framework/include/QualityControl/PostProcessingConfig.h @@ -17,6 +17,8 @@ #ifndef QUALITYCONTROL_POSTPROCESSINGCONFIG_H #define QUALITYCONTROL_POSTPROCESSINGCONFIG_H +#include "QualityControl/CustomParameters.h" + #include #include #include @@ -45,6 +47,8 @@ struct PostProcessingConfig { 
std::string consulUrl; core::Activity activity; bool matchAnyRunNumber = false; + bool critical = true; + core::CustomParameters customParameters; }; } // namespace o2::quality_control::postprocessing diff --git a/Framework/include/QualityControl/PostProcessingInterface.h b/Framework/include/QualityControl/PostProcessingInterface.h index cf0afa77e3..a22985800a 100644 --- a/Framework/include/QualityControl/PostProcessingInterface.h +++ b/Framework/include/QualityControl/PostProcessingInterface.h @@ -17,6 +17,8 @@ #ifndef QUALITYCONTROL_POSTPROCESSINTERFACE_H #define QUALITYCONTROL_POSTPROCESSINTERFACE_H +#include "QualityControl/CustomParameters.h" + #include #include #include "QualityControl/Triggers.h" @@ -65,6 +67,7 @@ class PostProcessingInterface /// \param services Interface containing optional interfaces, for example DatabaseInterface virtual void finalize(Trigger trigger, framework::ServiceRegistryRef services) = 0; + void setCustomParameters(const core::CustomParameters& parameters); void setObjectsManager(std::shared_ptr objectsManager); void setID(const std::string& id); [[nodiscard]] const std::string& getID() const; @@ -73,6 +76,7 @@ class PostProcessingInterface protected: std::shared_ptr getObjectsManager(); + core::CustomParameters mCustomParameters; private: std::string mID; diff --git a/Framework/include/QualityControl/PostProcessingTaskSpec.h b/Framework/include/QualityControl/PostProcessingTaskSpec.h index 9b83baebbc..4847416eab 100644 --- a/Framework/include/QualityControl/PostProcessingTaskSpec.h +++ b/Framework/include/QualityControl/PostProcessingTaskSpec.h @@ -41,8 +41,10 @@ struct PostProcessingTaskSpec { std::string id = "Invalid"; std::string taskName = "Invalid"; bool active = true; + bool critical = true; std::string detectorName = "Invalid"; boost::property_tree::ptree tree = {}; + core::CustomParameters customParameters; }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/TaskRunnerConfig.h
b/Framework/include/QualityControl/TaskRunnerConfig.h index 48646c922d..e39a9aac2f 100644 --- a/Framework/include/QualityControl/TaskRunnerConfig.h +++ b/Framework/include/QualityControl/TaskRunnerConfig.h @@ -47,6 +47,7 @@ struct TaskRunnerConfig { std::string className; std::vector> cycleDurations = {}; int maxNumberCycles; + bool critical; std::string consulUrl{}; std::string conditionUrl{}; std::string monitoringUrl{}; diff --git a/Framework/include/QualityControl/TaskSpec.h b/Framework/include/QualityControl/TaskSpec.h index 9fb668b08b..5afe0af726 100644 --- a/Framework/include/QualityControl/TaskSpec.h +++ b/Framework/include/QualityControl/TaskSpec.h @@ -59,6 +59,7 @@ struct TaskSpec { DataSourceSpec dataSource; // advanced bool active = true; + bool critical = true; int maxNumberCycles = -1; size_t resetAfterCycles = 0; std::string saveObjectsToFile; diff --git a/Framework/postprocessing.json b/Framework/postprocessing.json index 7c2e8a29db..5362227b9d 100644 --- a/Framework/postprocessing.json +++ b/Framework/postprocessing.json @@ -42,6 +42,7 @@ "postprocessing": { "ExamplePostprocessing": { "active": "true", + "critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true", "className": "o2::quality_control_modules::skeleton::SkeletonPostProcessing", "moduleName": "QcSkeleton", "detectorName": "TST", diff --git a/Framework/src/AggregatorRunnerFactory.cxx b/Framework/src/AggregatorRunnerFactory.cxx index b9b5aa6a2a..a2d44f1a56 100644 --- a/Framework/src/AggregatorRunnerFactory.cxx +++ b/Framework/src/AggregatorRunnerFactory.cxx @@ -49,6 +49,8 @@ DataProcessorSpec AggregatorRunnerFactory::create(const core::CommonSpec& common }; newAggregatorRunner.labels.emplace_back(o2::framework::ecs::qcReconfigurable); newAggregatorRunner.labels.emplace_back(AggregatorRunner::getLabel()); + framework::DataProcessorLabel resilientLabel = { "resilient" }; + newAggregatorRunner.labels.emplace_back(resilientLabel); 
newAggregatorRunner.algorithm = adaptFromTask(std::move(aggregatorRunner)); return newAggregatorRunner; } diff --git a/Framework/src/CheckRunner.cxx b/Framework/src/CheckRunner.cxx index 83a05d8325..2142c695a8 100644 --- a/Framework/src/CheckRunner.cxx +++ b/Framework/src/CheckRunner.cxx @@ -357,6 +357,7 @@ QualityObjectsType CheckRunner::check() QualityObjectsType allQOs; for (auto& [checkName, check] : mChecks) { if (updatePolicyManager.isReady(check.getName())) { + ILOG(Debug, Support) << "Monitor Objects for the check '" << checkName << "' are ready --> check()" << ENDM; auto newQOs = check.check(mMonitorObjects); mTotalNumberCheckExecuted += newQOs.size(); @@ -366,7 +367,7 @@ QualityObjectsType CheckRunner::check() // Was checked, update latest revision updatePolicyManager.updateActorRevision(checkName); } else { - ILOG(Info, Support) << "Monitor Objects for the check '" << checkName << "' are not ready, ignoring" << ENDM; + ILOG(Debug, Support) << "Monitor Objects for the check '" << checkName << "' are not ready, ignoring" << ENDM; } } return allQOs; diff --git a/Framework/src/CheckRunnerFactory.cxx b/Framework/src/CheckRunnerFactory.cxx index 7d9e3f6276..fe6f2a41a2 100644 --- a/Framework/src/CheckRunnerFactory.cxx +++ b/Framework/src/CheckRunnerFactory.cxx @@ -45,6 +45,7 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig options }; newCheckRunner.labels.emplace_back(o2::framework::ecs::qcReconfigurable); newCheckRunner.labels.emplace_back(CheckRunner::getCheckRunnerLabel()); + newCheckRunner.labels.emplace_back(framework::DataProcessorLabel{ "resilient" }); newCheckRunner.algorithm = adaptFromTask(std::move(qcCheckRunner)); return newCheckRunner; } @@ -61,7 +62,7 @@ DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig& checkRunnerConfig.options, {}, { o2::framework::ecs::qcReconfigurable } }; - + newCheckRunner.labels.emplace_back(framework::DataProcessorLabel{ "resilient" }); return newCheckRunner; 
} diff --git a/Framework/src/CustomParameters.cxx b/Framework/src/CustomParameters.cxx index e703760ca5..7d69afa5a0 100644 --- a/Framework/src/CustomParameters.cxx +++ b/Framework/src/CustomParameters.cxx @@ -12,6 +12,7 @@ #include "QualityControl/CustomParameters.h" #include #include +#include #include #include @@ -33,7 +34,7 @@ std::ostream& operator<<(std::ostream& out, const CustomParameters& customParame CustomParameters::CustomParameters() { - mCustomParameters["default"]["default"] = {}; + mCustomParameters["null"]["null"] = {}; } void CustomParameters::set(const std::string& key, const std::string& value, const std::string& runType, const std::string& beamType) @@ -154,7 +155,7 @@ std::unordered_map::const_iterator CustomParameters::f std::unordered_map::const_iterator CustomParameters::end() const { - return mCustomParameters.at("default").at("default").end(); + return mCustomParameters.at("null").at("null").end(); } std::string CustomParameters::operator[](const std::string& key) const @@ -170,4 +171,15 @@ std::string& CustomParameters::operator[](const std::string& key) return mCustomParameters.at("default").at("default").at(key); } +void CustomParameters::populateCustomParameters(const boost::property_tree::ptree& tree) +{ + for (const auto& [runtype, subTreeRunType] : tree) { + for (const auto& [beamtype, subTreeBeamType] : subTreeRunType) { + for (const auto& [key, value] : subTreeBeamType) { + set(key, value.get_value(), runtype, beamtype); + } + } + } +} + } // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index f10b4c2636..dbc70961ff 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -144,7 +144,7 @@ framework::WorkflowSpec InfrastructureGenerator::generateFullChainInfrastructure bool enableMovingWindows = !taskSpec.movingWindows.empty(); generateMergers(workflow, 
taskSpec.taskName, 1, cycleDurationsMultiplied, taskSpec.mergingMode, resetAfterCycles, infrastructureSpec.common.monitoringUrl, - taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows); + taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows, taskSpec.critical); } else { // TaskLocationSpec::Remote auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, taskSpec.resetAfterCycles); workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); @@ -277,9 +277,8 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur std::for_each(cycleDurationsMultiplied.begin(), cycleDurationsMultiplied.end(), [taskSpec](std::pair& p) { p.first *= taskSpec.mergerCycleMultiplier; }); bool enableMovingWindows = !taskSpec.movingWindows.empty(); - generateMergers(workflow, taskSpec.taskName, numberOfLocalMachines, cycleDurationsMultiplied, - taskSpec.mergingMode, resetAfterCycles, infrastructureSpec.common.monitoringUrl, - taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows); + generateMergers(workflow, taskSpec.taskName, numberOfLocalMachines, cycleDurationsMultiplied, taskSpec.mergingMode, + resetAfterCycles, infrastructureSpec.common.monitoringUrl, taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows, taskSpec.critical); } else if (taskSpec.location == TaskLocationSpec::Remote) { @@ -550,6 +549,9 @@ void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpe { proxyInput }, channelConfig.c_str())); workflow.back().labels.emplace_back(taskSpec.localControl == "odc" ? 
ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel); + if (!taskSpec.critical) { + workflow.back().labels.emplace_back(framework::DataProcessorLabel{ "expendable" }); + } if (getenv("O2_QC_KILL_PROXIES") != nullptr) { workflow.back().metadata.push_back(DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB }); } @@ -577,6 +579,9 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp channelConfig.c_str(), dplModelAdaptor()); proxy.labels.emplace_back(taskSpec.localControl == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel); + if (!taskSpec.critical) { + proxy.labels.emplace_back(framework::DataProcessorLabel{ "expendable" }); + } // if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option. enableDraining(proxy.options); if (getenv("O2_QC_KILL_PROXIES") != nullptr) { @@ -585,11 +590,9 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp workflow.emplace_back(std::move(proxy)); } void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, const std::string& taskName, - size_t numberOfLocalMachines, - std::vector> cycleDurations, - const std::string& mergingMode, size_t resetAfterCycles, - std::string monitoringUrl, const std::string& detectorName, - std::vector mergersPerLayer, bool enableMovingWindows) + size_t numberOfLocalMachines, std::vector> cycleDurations, + const std::string& mergingMode, size_t resetAfterCycles, std::string monitoringUrl, + const std::string& detectorName, std::vector mergersPerLayer, bool enableMovingWindows, bool critical) { Inputs mergerInputs; for (size_t id = 1; id <= numberOfLocalMachines; id++) { @@ -617,8 +620,9 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, mergerConfig.topologySize = { TopologySize::MergersPerLayer, mergersPerLayer }; mergerConfig.monitoringUrl = std::move(monitoringUrl); mergerConfig.detectorName =
detectorName; - mergerConfig.parallelismType = { (mergerConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) ? ParallelismType::RoundRobin : ParallelismType::SplitInputs }; + mergerConfig.labels.push_back({ "resilient" }); mergerConfig.publishMovingWindow = { enableMovingWindows ? PublishMovingWindow::Yes : PublishMovingWindow::No }; + mergerConfig.parallelismType = { (mergerConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) ? ParallelismType::RoundRobin : ParallelismType::SplitInputs }; mergersBuilder.setConfig(mergerConfig); mergersBuilder.generateInfrastructure(workflow); @@ -786,6 +790,10 @@ void InfrastructureGenerator::generatePostProcessing(WorkflowSpec& workflow, con ppTask.getOptions() }; dataProcessorSpec.labels.emplace_back(PostProcessingDevice::getLabel()); + if (!ppTaskSpec.critical) { + framework::DataProcessorLabel expendableLabel = { "expendable" }; + dataProcessorSpec.labels.emplace_back(expendableLabel); + } dataProcessorSpec.algorithm = adaptFromTask(std::move(ppTask)); workflow.emplace_back(std::move(dataProcessorSpec)); diff --git a/Framework/src/InfrastructureSpecReader.cxx b/Framework/src/InfrastructureSpecReader.cxx index 44fb1ee40a..cffa04de04 100644 --- a/Framework/src/InfrastructureSpecReader.cxx +++ b/Framework/src/InfrastructureSpecReader.cxx @@ -110,6 +110,7 @@ TaskSpec InfrastructureSpecReader::readSpecEntry(const std::string& ta } ts.dataSource = readSpecEntry(taskID, taskTree.get_child("dataSource"), wholeTree); ts.active = taskTree.get("active", ts.active); + ts.critical = taskTree.get("critical", ts.critical); ts.maxNumberCycles = taskTree.get("maxNumberCycles", ts.maxNumberCycles); ts.resetAfterCycles = taskTree.get("resetAfterCycles", ts.resetAfterCycles); ts.saveObjectsToFile = taskTree.get("saveObjectsToFile", ts.saveObjectsToFile); @@ -117,13 +118,7 @@ TaskSpec InfrastructureSpecReader::readSpecEntry(const std::string& ta ILOG(Warning, Devel) << "Both taskParameters and 
extendedTaskParameters are defined in the QC config file. We will use only extendedTaskParameters. " << ENDM; } if (taskTree.count("extendedTaskParameters") > 0) { - for (const auto& [runtype, subTreeRunType] : taskTree.get_child("extendedTaskParameters")) { - for (const auto& [beamtype, subTreeBeamType] : subTreeRunType) { - for (const auto& [key, value] : subTreeBeamType) { - ts.customParameters.set(key, value.get_value(), runtype, beamtype); - } - } - } + ts.customParameters.populateCustomParameters(taskTree.get_child("extendedTaskParameters")); } else if (taskTree.count("taskParameters") > 0) { for (const auto& [key, value] : taskTree.get_child("taskParameters")) { ts.customParameters.set(key, value.get_value()); @@ -316,13 +311,7 @@ CheckSpec InfrastructureSpecReader::readSpecEntry(const std::string& cs.active = checkTree.get("active", cs.active); if (checkTree.count("extendedCheckParameters") > 0) { - for (const auto& [runtype, subTreeRunType] : checkTree.get_child("extendedCheckParameters")) { - for (const auto& [beamtype, subTreeBeamType] : subTreeRunType) { - for (const auto& [key, value] : subTreeBeamType) { - cs.customParameters.set(key, value.get_value(), runtype, beamtype); - } - } - } + cs.customParameters.populateCustomParameters(checkTree.get_child("extendedCheckParameters")); } if (checkTree.count("checkParameters") > 0) { for (const auto& [key, value] : checkTree.get_child("checkParameters")) { @@ -356,13 +345,7 @@ AggregatorSpec InfrastructureSpecReader::readSpecEntry(const std as.active = aggregatorTree.get("active", as.active); if (aggregatorTree.count("extendedAggregatorParameters") > 0) { - for (const auto& [runtype, subTreeRunType] : aggregatorTree.get_child("extendedAggregatorParameters")) { - for (const auto& [beamtype, subTreeBeamType] : subTreeRunType) { - for (const auto& [key, value] : subTreeBeamType) { - as.customParameters.set(key, value.get_value(), runtype, beamtype); - } - } - } + 
as.customParameters.populateCustomParameters(aggregatorTree.get_child("extendedAggregatorParameters")); } if (aggregatorTree.count("aggregatorParameters") > 0) { for (const auto& [key, value] : aggregatorTree.get_child("aggregatorParameters")) { @@ -381,6 +364,7 @@ PostProcessingTaskSpec ppts.id = ppTaskId; ppts.taskName = ppTaskTree.get("taskName", ppts.id); ppts.active = ppTaskTree.get("active", ppts.active); + ppts.critical = ppTaskTree.get("critical", ppts.critical); ppts.detectorName = ppTaskTree.get("detectorName", ppts.detectorName); ppts.tree = wholeTree; diff --git a/Framework/src/PostProcessingConfig.cxx b/Framework/src/PostProcessingConfig.cxx index 87b8bd2271..21ac362ae7 100644 --- a/Framework/src/PostProcessingConfig.cxx +++ b/Framework/src/PostProcessingConfig.cxx @@ -15,6 +15,7 @@ /// #include "QualityControl/PostProcessingConfig.h" + #include namespace o2::quality_control::postprocessing @@ -36,7 +37,8 @@ PostProcessingConfig::PostProcessingConfig(const std::string& id, const boost::p config.get("qc.config.Activity.provenance", "qc"), { config.get("qc.config.Activity.start", 0), config.get("qc.config.Activity.end", -1) }), - matchAnyRunNumber(config.get("qc.config.postprocessing.matchAnyRunNumber", false)) + matchAnyRunNumber(config.get("qc.config.postprocessing.matchAnyRunNumber", false)), + critical(true) { for (const auto& initTrigger : config.get_child("qc.postprocessing." + id + ".initTrigger")) { initTriggers.push_back(initTrigger.second.get_value()); @@ -47,6 +49,10 @@ PostProcessingConfig::PostProcessingConfig(const std::string& id, const boost::p for (const auto& stopTrigger : config.get_child("qc.postprocessing." + id + ".stopTrigger")) { stopTriggers.push_back(stopTrigger.second.get_value()); } + auto ppTree = config.get_child("qc.postprocessing." 
+ id); + if (ppTree.count("extendedTaskParameters")) { + customParameters.populateCustomParameters(ppTree.get_child("extendedTaskParameters")); + } } -} // namespace o2::quality_control::postprocessing \ No newline at end of file +} // namespace o2::quality_control::postprocessing diff --git a/Framework/src/PostProcessingFactory.cxx b/Framework/src/PostProcessingFactory.cxx index 8f30c9042e..600e6cc10a 100644 --- a/Framework/src/PostProcessingFactory.cxx +++ b/Framework/src/PostProcessingFactory.cxx @@ -27,7 +27,9 @@ namespace o2::quality_control::postprocessing // todo: consider having a common helper for each class which loads tasks like below (QC tasks, checks, pp) PostProcessingInterface* PostProcessingFactory::create(const PostProcessingConfig& config) { - return root_class_factory::create(config.moduleName, config.className); + auto* result = root_class_factory::create(config.moduleName, config.className); + result->setCustomParameters(config.customParameters); + return result; } } // namespace o2::quality_control::postprocessing \ No newline at end of file diff --git a/Framework/src/PostProcessingInterface.cxx b/Framework/src/PostProcessingInterface.cxx index 545e5cfdac..e85415a0e4 100644 --- a/Framework/src/PostProcessingInterface.cxx +++ b/Framework/src/PostProcessingInterface.cxx @@ -50,4 +50,9 @@ std::shared_ptr PostProcessingInterface::getObjectsManager return mObjectsManager; } +void PostProcessingInterface::setCustomParameters(const core::CustomParameters& parameters) +{ + mCustomParameters = parameters; +} + } // namespace o2::quality_control::postprocessing diff --git a/Framework/src/PostProcessingRunner.cxx b/Framework/src/PostProcessingRunner.cxx index 4a5b4e2850..8e18177a1e 100644 --- a/Framework/src/PostProcessingRunner.cxx +++ b/Framework/src/PostProcessingRunner.cxx @@ -106,6 +106,7 @@ void PostProcessingRunner::init(const PostProcessingRunnerConfig& runnerConfig, mTaskState = TaskState::Created; mTask->setID(mTaskConfig.id); 
mTask->setName(mTaskConfig.taskName); + mTask->setCustomParameters(mTaskConfig.customParameters); mTask->configure(mRunnerConfig.configTree); } else { throw std::runtime_error("Failed to create the task '" + mTaskConfig.taskName + "' (det " + mTaskConfig.detectorName + ")"); diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 7024ba7031..e28d119de6 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -413,6 +413,7 @@ void TaskRunner::printTaskConfig() const << " / Module name : " << mTaskConfig.moduleName // << " / Detector name : " << mTaskConfig.detectorName // << " / Max number cycles : " << mTaskConfig.maxNumberCycles // + << " / critical : " << mTaskConfig.critical // << " / Save to file : " << mTaskConfig.saveToFile << " / Cycle duration seconds : "; for (auto& [cycleDuration, period] : mTaskConfig.cycleDurations) { diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index 4a692dfb92..3bd046660e 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -53,6 +53,10 @@ o2::framework::DataProcessorSpec TaskRunnerFactory::create(const TaskRunnerConfi }; newTask.labels.emplace_back(o2::framework::ecs::qcReconfigurable); newTask.labels.emplace_back(TaskRunner::getTaskRunnerLabel()); + if (!taskConfig.critical) { + framework::DataProcessorLabel expendableLabel = { "expendable" }; + newTask.labels.emplace_back(expendableLabel); + } return newTask; } @@ -144,6 +148,7 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig taskSpec.className, multipleCycleDurations, taskSpec.maxNumberCycles, + taskSpec.critical, globalConfig.consulUrl, globalConfig.conditionDBUrl, globalConfig.monitoringUrl, diff --git a/Framework/test/testCustomParameters.cxx b/Framework/test/testCustomParameters.cxx index 1b47892ca8..d1678c4b26 100644 --- a/Framework/test/testCustomParameters.cxx +++ b/Framework/test/testCustomParameters.cxx @@ 
-21,6 +21,9 @@ #define BOOST_TEST_DYN_LINK #include +#include +#include +#include "getTestDataDirectory.h" using namespace o2::quality_control::core; using namespace std; @@ -183,6 +186,27 @@ BOOST_AUTO_TEST_CASE(test_cp_new_access_pattern) } } +BOOST_AUTO_TEST_CASE(test_load_from_ptree) +{ + boost::property_tree::ptree jsontree; + std::string configFilePath = std::string(getTestDataDirectory()) + "testWorkflow.json"; + + boost::property_tree::read_json(configFilePath, jsontree); + + string v0 = jsontree.get("qc.tasks.skeletonTask.extendedTaskParameters.default.default.myOwnKey"); + + boost::property_tree::ptree params = jsontree.get_child("qc.tasks.skeletonTask.extendedTaskParameters"); + + CustomParameters cp; + cp.populateCustomParameters(params); + + cout << cp << endl; + + BOOST_CHECK_EQUAL(cp.at("myOwnKey"), "myOwnValue"); + BOOST_CHECK_EQUAL(cp.at("myOwnKey1", "PHYSICS"), "myOwnValue1b"); + BOOST_CHECK_EQUAL(cp.atOptional("asdf").has_value(), false); +} + BOOST_AUTO_TEST_CASE(test_default_if_not_found_at_optional) { CustomParameters cp; diff --git a/Framework/test/testTaskInterface.cxx b/Framework/test/testTaskInterface.cxx index a8e48bac05..77cbc1de19 100644 --- a/Framework/test/testTaskInterface.cxx +++ b/Framework/test/testTaskInterface.cxx @@ -173,6 +173,7 @@ TEST_CASE("test_task_factory") "o2::quality_control_modules::skeleton::SkeletonTask", { { 10, 1 } }, -1, + true, "" }; diff --git a/Framework/test/testWorkflow.json b/Framework/test/testWorkflow.json index 1106f50498..f3f636b009 100644 --- a/Framework/test/testWorkflow.json +++ b/Framework/test/testWorkflow.json @@ -25,7 +25,20 @@ "type": "dataSamplingPolicy", "name": "test-policy" }, - "taskParameters": {}, + "extendedTaskParameters": { + "default": { + "default": { + "myOwnKey": "myOwnValue", + "myOwnKey2": "myOwnValue2" + } + }, + "PHYSICS": { + "default": { + "myOwnKey1": "myOwnValue1b", + "myOwnKey2": "myOwnValue2b" + } + } + }, "location": "remote", "localMachines": [] } diff --git 
a/Modules/Common/src/BigScreen.cxx b/Modules/Common/src/BigScreen.cxx index ce0edb9140..2016eb530b 100644 --- a/Modules/Common/src/BigScreen.cxx +++ b/Modules/Common/src/BigScreen.cxx @@ -20,6 +20,7 @@ #include "QualityControl/MonitorObject.h" #include "QualityControl/DatabaseInterface.h" #include "QualityControl/ObjectMetadataKeys.h" +#include "QualityControl/ActivityHelpers.h" #include #include #include @@ -105,19 +106,27 @@ void BigScreen::initialize(quality_control::postprocessing::Trigger t, framework static std::pair, bool> getQO(repository::DatabaseInterface& qcdb, Trigger t, BigScreenConfig::DataSource& source, long notOlderThan, bool ignoreActivity) { - // retrieve MO from CCDB - do not associate to trigger activity if ignoreActivity is true - auto qo = ignoreActivity ? qcdb.retrieveQO(source.path, t.timestamp, {}) : qcdb.retrieveQO(source.path, t.timestamp, t.activity); - if (!qo) { + // find the time-stamp of the most recent object matching the current activity + // if ignoreActivity is true the activity matching criteria are not applied + Activity activity = ignoreActivity ? 
Activity{} : t.activity; + auto timestamp = t.timestamp; + const auto objFullPath = t.activity.mProvenance + "/" + source.path; + const auto filterMetadata = activity_helpers::asDatabaseMetadata(activity, false); + const auto objectValidity = qcdb.getLatestObjectValidity(objFullPath, filterMetadata); + if (objectValidity.isValid()) { + timestamp = objectValidity.getMax() - 1; + } else { + ILOG(Warning, Devel) << "Could not find an object '" << objFullPath << "' for activity " << activity << ENDM; return { nullptr, false }; } - // get the MO creation time stamp - long timeStamp{ 0 }; - auto iter = qo->getMetadataMap().find(repository::metadata_keys::created); - if (iter != qo->getMetadataMap().end()) { - timeStamp = std::stol(iter->second); + + // retrieve QO from CCDB - do not associate to trigger activity if ignoreActivity is true + auto qo = qcdb.retrieveQO(source.path, timestamp, activity); + if (!qo) { + return { nullptr, false }; } - long elapsed = static_cast(t.timestamp) - timeStamp; + long elapsed = static_cast(t.timestamp) - timestamp; // check if the object is not older than a given number of milliseconds if (elapsed > notOlderThan) { return { qo, false }; diff --git a/Modules/GLO/src/ITSTPCMatchingTask.cxx b/Modules/GLO/src/ITSTPCMatchingTask.cxx index 60e61e03fa..d4d12b4829 100644 --- a/Modules/GLO/src/ITSTPCMatchingTask.cxx +++ b/Modules/GLO/src/ITSTPCMatchingTask.cxx @@ -62,7 +62,7 @@ void ITSTPCMatchingTask::initialize(o2::framework::InitContext& /*ctx*/) } if (auto param = mCustomParameters.find("maxChi2PerClusterITS"); param != mCustomParameters.end()) { ILOG(Debug, Devel) << "Custom parameter - maxChi2PerClusterITS (for track selection): " << param->second << ENDM; - mMatchITSTPCQC.setMaxChi2PerClusterITS(atoi(param->second.c_str())); + mMatchITSTPCQC.setMaxChi2PerClusterITS(atof(param->second.c_str())); } // TO DO: define an agreed way to implement the setter for ITS matching (min. # layers, which layers) // [...] 
--> exploit the method TrackCuts::setRequireHitsInITSLayers(...) diff --git a/Modules/ITS/include/ITS/ITSFhrTask.h b/Modules/ITS/include/ITS/ITSFhrTask.h index deb911c196..bd53cd0b92 100644 --- a/Modules/ITS/include/ITS/ITSFhrTask.h +++ b/Modules/ITS/include/ITS/ITSFhrTask.h @@ -115,6 +115,7 @@ class ITSFhrTask final : public TaskInterface double** mChipZ /* = new double*[NStaves[lay]]*/; // IB/OB : mChipZ[Stave][chip] int** mChipStat /* = new double*[NStaves[lay]]*/; // IB/OB : mChipStat[Stave][chip] + int* mActiveChips /* = new int[NStaves[lay]]*/; int mNoisyPixelNumber[7][48] = { { 0 } }; int mMaxGeneralAxisRange = -3; // the range of TH2Poly plots z axis range, pow(10, mMinGeneralAxisRange) ~ pow(10, mMaxGeneralAxisRange) @@ -131,6 +132,7 @@ class ITSFhrTask final : public TaskInterface TH1D* mErrorPlots; TH2I* mErrorVsFeeid; TH2Poly* mGeneralOccupancy; // Max Occuapncy(chip/hic) in one stave + TH2Poly* mEmptyLanesFraction; TH2Poly* mGeneralNoisyPixel; // Noisy pixel number in one stave // Occupancy and hit-map diff --git a/Modules/ITS/src/ITSFhrTask.cxx b/Modules/ITS/src/ITSFhrTask.cxx index 031836f07f..1e1219700a 100644 --- a/Modules/ITS/src/ITSFhrTask.cxx +++ b/Modules/ITS/src/ITSFhrTask.cxx @@ -44,6 +44,7 @@ ITSFhrTask::ITSFhrTask() ITSFhrTask::~ITSFhrTask() { delete mGeneralOccupancy; + delete mEmptyLanesFraction; delete mGeneralNoisyPixel; delete mDecoder; delete mChipDataBuffer; @@ -82,6 +83,7 @@ ITSFhrTask::~ITSFhrTask() delete[] mChipStat; delete[] mErrorCount; delete[] mHitPixelID_InStave; + delete[] mActiveChips; } void ITSFhrTask::initialize(o2::framework::InitContext& /*ctx*/) @@ -95,6 +97,13 @@ void ITSFhrTask::initialize(o2::framework::InitContext& /*ctx*/) mGeneralOccupancy->SetMinimum(pow(10, mMinGeneralAxisRange)); mGeneralOccupancy->SetMaximum(pow(10, mMaxGeneralAxisRange)); + mEmptyLanesFraction = new TH2Poly(); + mEmptyLanesFraction->SetTitle("Fraction of Empty Lanes;mm (IB 3x);mm (IB 3x)"); + 
mEmptyLanesFraction->SetName("General/EmptyLanes_Fraction"); + mEmptyLanesFraction->SetStats(0); + mEmptyLanesFraction->SetMinimum(0.); + mEmptyLanesFraction->SetMaximum(1.); + mGeneralNoisyPixel = new TH2Poly(); mGeneralNoisyPixel->SetTitle("Noisy Pixel Number;mm (IB 3x);mm (IB 3x)"); mGeneralNoisyPixel->SetName("General/Noisy_Pixel"); @@ -124,6 +133,8 @@ void ITSFhrTask::initialize(o2::framework::InitContext& /*ctx*/) mChipStat = new int*[NStaves[mLayer]]; mErrorCount = new int**[NStaves[mLayer]]; + mActiveChips = new int[NStaves[mLayer]]; + for (int ilayer = 0; ilayer < 7; ilayer++) { for (int istave = 0; istave < NStaves[ilayer]; istave++) { double* px = new double[4]; @@ -136,12 +147,16 @@ void ITSFhrTask::initialize(o2::framework::InitContext& /*ctx*/) } } mGeneralOccupancy->AddBin(4, px, py); + mEmptyLanesFraction->AddBin(4, px, py); mGeneralNoisyPixel->AddBin(4, px, py); } } if (mGeneralOccupancy) { getObjectsManager()->startPublishing(mGeneralOccupancy); } + if (mEmptyLanesFraction) { + getObjectsManager()->startPublishing(mEmptyLanesFraction); + } if (mGeneralNoisyPixel) { getObjectsManager()->startPublishing(mGeneralNoisyPixel); } @@ -167,7 +182,7 @@ void ITSFhrTask::initialize(o2::framework::InitContext& /*ctx*/) } } } - // define the hitnumber and occupancy array + // define the hitnumber and occupancy array; initialize to zero the number of active chips per stave if (mLayer < NLayerIB) { for (int istave = 0; istave < NStaves[mLayer]; istave++) { mHitnumberLane[istave] = new int[nChipsPerHic[mLayer]]; @@ -180,6 +195,7 @@ void ITSFhrTask::initialize(o2::framework::InitContext& /*ctx*/) for (int ihic = 0; ihic < nHicPerStave[mLayer]; ihic++) { mHitPixelID_InStave[istave][ihic] = new std::unordered_map[nChipsPerHic[mLayer]]; } + mActiveChips[istave] = 0; for (int ichip = 0; ichip < nChipsPerHic[mLayer]; ichip++) { mHitnumberLane[istave][ichip] = 0; mOccupancyLane[istave][ichip] = 0; @@ -203,6 +219,7 @@ void 
ITSFhrTask::initialize(o2::framework::InitContext& /*ctx*/) for (int ihic = 0; ihic < nHicPerStave[mLayer]; ihic++) { mHitPixelID_InStave[istave][ihic] = new std::unordered_map[nChipsPerHic[mLayer]]; } + mActiveChips[istave] = 0; for (int ichip = 0; ichip < nHicPerStave[mLayer] * nChipsPerHic[mLayer]; ichip++) { mChipPhi[istave][ichip] = 0; mChipZ[istave][ichip] = 0; @@ -409,10 +426,12 @@ void ITSFhrTask::monitorData(o2::framework::ProcessingContext& ctx) if (mLayer < NLayerIB) { for (int istave = 0; istave < NStaves[mLayer]; istave++) { digVec[istave] = new std::vector[nHicPerStave[mLayer]]; + mActiveChips[istave] = 0; } } else { for (int istave = 0; istave < NStaves[mLayer]; istave++) { digVec[istave] = new std::vector[nHicPerStave[mLayer]]; + mActiveChips[istave] = 0; } } @@ -451,6 +470,8 @@ void ITSFhrTask::monitorData(o2::framework::ProcessingContext& ctx) hic = 0; mHitnumberLane[stave][chip]++; mChipStat[stave][chip]++; + mActiveChips[stave]++; + LOG(INFO) << "Layer: " << mLayer << "\t Stave: " << stave << "\t Number of active chips: " << mActiveChips[stave]; } else { stave = (mChipDataBuffer->getChipID() - ChipBoundary[mLayer]) / (14 * nHicPerStave[mLayer]); int chipIdLocal = (mChipDataBuffer->getChipID() - ChipBoundary[mLayer]) % (14 * nHicPerStave[mLayer]); @@ -460,6 +481,8 @@ void ITSFhrTask::monitorData(o2::framework::ProcessingContext& ctx) mHitnumberLane[stave][lane]++; mChipStat[stave][chipIdLocal]++; + mActiveChips[stave]++; + LOG(INFO) << "Layer: " << mLayer << "\t Stave: " << stave << "\t Number of active chips: " << mActiveChips[stave]; } digVec[stave][hic].emplace_back(mChipDataBuffer->getChipID(), pixel.getRow(), pixel.getCol()); } @@ -663,6 +686,7 @@ void ITSFhrTask::monitorData(o2::framework::ProcessingContext& ctx) } } mGeneralOccupancy->SetBinContent(istave + 1 + StaveBoundary[mLayer], *(std::max_element(mOccupancyLane[istave], mOccupancyLane[istave] + nChipsPerHic[mLayer]))); + mEmptyLanesFraction->SetBinContent(istave + 1 + 
StaveBoundary[mLayer], 1. - static_cast<double>(mActiveChips[istave]) / (nChipsPerHic[mLayer] * nHicPerStave[mLayer])); /* divide in floating point: int / int would truncate the ratio */ mGeneralNoisyPixel->SetBinContent(istave + 1 + StaveBoundary[mLayer], mNoisyPixelNumber[mLayer][istave]); } else { for (int ichip = 0; ichip < nHicPerStave[mLayer] * nChipsPerHic[mLayer]; ichip++) { @@ -690,6 +714,7 @@ void ITSFhrTask::monitorData(o2::framework::ProcessingContext& ctx) } } mGeneralOccupancy->SetBinContent(istave + 1 + StaveBoundary[mLayer], *(std::max_element(mOccupancyLane[istave], mOccupancyLane[istave] + nHicPerStave[mLayer] * 2))); + mEmptyLanesFraction->SetBinContent(istave + 1 + StaveBoundary[mLayer], 1. - static_cast<double>(mActiveChips[istave]) / (nChipsPerHic[mLayer] * nHicPerStave[mLayer])); /* divide in floating point: int / int would truncate the ratio */ mGeneralNoisyPixel->SetBinContent(istave + 1 + StaveBoundary[mLayer], mNoisyPixelNumber[mLayer][istave]); } } @@ -781,6 +806,7 @@ void ITSFhrTask::reset() resetGeneralPlots(); resetOccupancyPlots(); mGeneralOccupancy->Reset("content"); + mEmptyLanesFraction->Reset("content"); mGeneralNoisyPixel->Reset("content"); mDecoder->clearStat(); diff --git a/Modules/TRD/src/TrackingTask.cxx b/Modules/TRD/src/TrackingTask.cxx index 704f126d7a..86a56d596d 100644 --- a/Modules/TRD/src/TrackingTask.cxx +++ b/Modules/TRD/src/TrackingTask.cxx @@ -116,7 +116,7 @@ void TrackingTask::monitorData(o2::framework::ProcessingContext& ctx) continue; } // eta-phi distribution per layer - mTracksEtaPhiPerLayer[charge][iLayer]->Fill(trackTRD.getOuterParam().getEta(), trackTRD.getOuterParam().getPhiPos()); + mTracksEtaPhiPerLayer[iLayer][charge]->Fill(trackTRD.getOuterParam().getEta(), trackTRD.getOuterParam().getPhiPos()); } // end of loop over layers } // end of loop over tracks } // end of loop over track trigger records @@ -203,7 +203,7 @@ void TrackingTask::buildHistograms() axisConfig(mNtracks, "# of tracks", "", "", 1, 1.0, 1.3); publishObject(mNtracks); - mNtracklets = new TH1D("Ntracklets", "Number of Tracklets per track", 6, 0.0, 6.0); + mNtracklets = new TH1D("Ntracklets", "Number of Tracklets per 
Track", 7, -0.5, 6.5); axisConfig(mNtracklets, "# of tracklets", "", "", 1, 1.0, 1.1); publishObject(mNtracklets); diff --git a/doc/Advanced.md b/doc/Advanced.md index 663053ea28..e7df8973bc 100644 --- a/doc/Advanced.md +++ b/doc/Advanced.md @@ -5,6 +5,7 @@ Advanced topics + * [Advanced topics](#advanced-topics) * [Framework](#framework) * [Plugging the QC to an existing DPL workflow](#plugging-the-qc-to-an-existing-dpl-workflow) * [Production of QC objects outside this framework](#production-of-qc-objects-outside-this-framework) @@ -18,6 +19,7 @@ Advanced topics * [Monitor cycles](#monitor-cycles) * [Writing a DPL data producer](#writing-a-dpl-data-producer) * [Custom merging](#custom-merging) + * [Critical, resilient and non-critical tasks](#critical-resilient-and-non-critical-tasks) * [QC with DPL Analysis](#qc-with-dpl-analysis) * [Uploading objects to QCDB](#uploading-objects-to-qcdb) * [Getting AODs in QC Tasks](#getting-aods-in-qc-tasks) @@ -502,6 +504,54 @@ Feel free to consult the existing usage examples among other modules in the QC r Once a custom class is implemented, one should let QCG know how to display it correctly, which is explained in the subsection [Display a non-standard ROOT object in QCG](#display-a-non-standard-root-object-in-qcg). +## Critical, resilient and non-critical tasks + +DPL devices can be marked as expendable, resilient or critical. Expendable tasks can die without affecting the run. +Resilient tasks can survive having one or all their inputs coming from an expendable task but they will stop the system if they themselves die. +Critical tasks (default) will stop the system if they die and will not accept input from expendable tasks. + +In QC, these labels are applied to the different kinds of devices as described below. 
+ +### QC tasks + +In QC, one can mark a task as critical or non-critical: +```json + "tasks": { + "QcTask": { + "active": "true", + "critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true", +``` +By default they are `critical` meaning that their failure will stop the run. +If they are not critical, they will be `expendable` and will not stop the run if they die. + +### Auto-generated proxies + +They adopt the criticality of the task they are proxying. + +### QC mergers + +Mergers are `resilient`. + +### QC check runners + +CheckRunners are `resilient`. + +### QC aggregators + +Aggregators are `resilient`. + +### QC post-processing tasks + +Post-processing tasks can be marked as critical or non-critical: +```json + "postprocessing": { + "ExamplePostprocessing": { + "active": "true", + "critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true", +``` +By default, they are critical meaning that their failure will stop the run. +If they are not critical, they will be `expendable` and will not stop the run if they die. + ## QC with DPL Analysis QC offers several ways to interact with the DPL Analysis framework. @@ -1308,6 +1358,7 @@ the "tasks" path. "className": "namespace::of::Task", "": "Class name of the QC Task with full namespace.", "moduleName": "QcSkeleton", "": "Library name. It can be found in CMakeLists of the detector module.", "detectorName": "TST", "": "3-letter code of the detector.", + "critical": "true", "": "if false the task is allowed to die without stopping the workflow, default: true", "cycleDurationSeconds": "10", "": "Cycle duration (how often objects are published), 10 seconds minimum.", "": "The first cycle will be randomly shorter. ", "": "Alternatively, one can specify different cycle durations for different periods. The last item in cycleDurations will be used for the rest of the duration whatever the period. 
The first cycle will be randomly shorter.", diff --git a/doc/PostProcessing.md b/doc/PostProcessing.md index d34d68eac0..f60d826aa0 100644 --- a/doc/PostProcessing.md +++ b/doc/PostProcessing.md @@ -142,6 +142,12 @@ Checks can be applied to the results of Post-processing Tasks just as for normal ... ``` +## Definition and access of user-specific configuration + +A postprocessing task can access custom parameters declared in the configuration file at `qc.postprocessing.<task_id>.extendedTaskParameters`. They are stored inside an object of type `CustomParameters` named `mCustomParameters`, which is a protected member of `TaskInterface`. + +[More details](Advanced.md#definition-and-access-of-user-specific-configuration) about this feature can be found in the documentation for Tasks (the behaviour is the same). + #### Triggers configuration Each of the three methods can be invoked by one or more triggers. Below are listed the possible options (case insensitive).