Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ include(AddQCWorkflow)
# ---- Project ----

project(QualityControl
VERSION 1.128.0
VERSION 1.129.0
DESCRIPTION "O2 Data Quality Control Framework"
LANGUAGES C CXX)

Expand Down
2 changes: 1 addition & 1 deletion Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ endforeach()

foreach(t testWorkflow testTaskRunner testCheckWorkflow
testPostProcessingConfig testPostProcessingInterface
testCheck testTrendingTask)
testCheck testTrendingTask testCustomParameters)
target_sources(${t} PRIVATE
${CMAKE_BINARY_DIR}/getTestDataDirectory.cxx)
target_include_directories(${t} PRIVATE ${CMAKE_SOURCE_DIR})
Expand Down
4 changes: 2 additions & 2 deletions Framework/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"tasks": {
"QcTask": {
"active": "true",
"critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true",
"className": "o2::quality_control_modules::skeleton::SkeletonTask",
"moduleName": "QcSkeleton",
"detectorName": "TST",
Expand Down Expand Up @@ -105,8 +106,7 @@
"fraction": "0.1",
"seed": "1234"
}
],
"blocking": "false"
]
}
]
}
7 changes: 7 additions & 0 deletions Framework/include/QualityControl/CustomParameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>
#include <unordered_map>
#include <optional>
#include <boost/property_tree/ptree_fwd.hpp>

namespace o2::quality_control::core
{
Expand Down Expand Up @@ -167,6 +168,12 @@ class CustomParameters
*/
friend std::ostream& operator<<(std::ostream& out, const CustomParameters& customParameters);

/**
* \brief Provided the config subtree of the custom parameters, load its content and populate this CustomParameters.
* \param paramsTree The subtree corresponding to extendedTaskParameters, extendedCheckParameters, etc...
*/
void populateCustomParameters(const boost::property_tree::ptree& paramsTree);

private:
CustomParametersType mCustomParameters;
};
Expand Down
10 changes: 7 additions & 3 deletions Framework/include/QualityControl/InfrastructureGenerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,13 @@ class InfrastructureGenerator
static void generateMergers(framework::WorkflowSpec& workflow, const std::string& taskName,
size_t numberOfLocalMachines,
std::vector<std::pair<size_t, size_t>> cycleDurationSeconds,
const std::string& mergingMode, size_t resetAfterCycles,
std::string monitoringUrl, const std::string& detectorName,
std::vector<size_t> mergersPerLayer, bool enableMovingWindows);
const std::string& mergingMode,
size_t resetAfterCycles,
std::string monitoringUrl,
const std::string& detectorName,
std::vector<size_t> mergersPerLayer,
bool enableMovingWindows,
bool critical);
static void generateCheckRunners(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generateAggregator(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
static void generatePostProcessing(framework::WorkflowSpec& workflow, const InfrastructureSpec& infrastructureSpec);
Expand Down
4 changes: 4 additions & 0 deletions Framework/include/QualityControl/PostProcessingConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#ifndef QUALITYCONTROL_POSTPROCESSINGCONFIG_H
#define QUALITYCONTROL_POSTPROCESSINGCONFIG_H

#include "QualityControl/CustomParameters.h"

#include <vector>
#include <string>
#include <boost/property_tree/ptree_fwd.hpp>
Expand Down Expand Up @@ -45,6 +47,8 @@ struct PostProcessingConfig {
std::string consulUrl;
core::Activity activity;
bool matchAnyRunNumber = false;
bool critical;
core::CustomParameters customParameters;
};

} // namespace o2::quality_control::postprocessing
Expand Down
4 changes: 4 additions & 0 deletions Framework/include/QualityControl/PostProcessingInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#ifndef QUALITYCONTROL_POSTPROCESSINTERFACE_H
#define QUALITYCONTROL_POSTPROCESSINTERFACE_H

#include "QualityControl/CustomParameters.h"

#include <string>
#include <boost/property_tree/ptree_fwd.hpp>
#include "QualityControl/Triggers.h"
Expand Down Expand Up @@ -65,6 +67,7 @@ class PostProcessingInterface
/// \param services Interface containing optional interfaces, for example DatabaseInterface
virtual void finalize(Trigger trigger, framework::ServiceRegistryRef services) = 0;

void setCustomParameters(const core::CustomParameters& parameters);
void setObjectsManager(std::shared_ptr<core::ObjectsManager> objectsManager);
void setID(const std::string& id);
[[nodiscard]] const std::string& getID() const;
Expand All @@ -73,6 +76,7 @@ class PostProcessingInterface

protected:
std::shared_ptr<core::ObjectsManager> getObjectsManager();
core::CustomParameters mCustomParameters;

private:
std::string mID;
Expand Down
2 changes: 2 additions & 0 deletions Framework/include/QualityControl/PostProcessingTaskSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ struct PostProcessingTaskSpec {
std::string id = "Invalid";
std::string taskName = "Invalid";
bool active = true;
bool critical = true;
std::string detectorName = "Invalid";
boost::property_tree::ptree tree = {};
core::CustomParameters customParameters;
};

} // namespace o2::quality_control::core
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/TaskRunnerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct TaskRunnerConfig {
std::string className;
std::vector<std::pair<size_t, size_t>> cycleDurations = {};
int maxNumberCycles;
bool critical;
std::string consulUrl{};
std::string conditionUrl{};
std::string monitoringUrl{};
Expand Down
1 change: 1 addition & 0 deletions Framework/include/QualityControl/TaskSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct TaskSpec {
DataSourceSpec dataSource;
// advanced
bool active = true;
bool critical = true;
int maxNumberCycles = -1;
size_t resetAfterCycles = 0;
std::string saveObjectsToFile;
Expand Down
1 change: 1 addition & 0 deletions Framework/postprocessing.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"postprocessing": {
"ExamplePostprocessing": {
"active": "true",
"critical": "false", "": "if false the task is allowed to die without stopping the workflow, default: true",
"className": "o2::quality_control_modules::skeleton::SkeletonPostProcessing",
"moduleName": "QcSkeleton",
"detectorName": "TST",
Expand Down
2 changes: 2 additions & 0 deletions Framework/src/AggregatorRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ DataProcessorSpec AggregatorRunnerFactory::create(const core::CommonSpec& common
};
newAggregatorRunner.labels.emplace_back(o2::framework::ecs::qcReconfigurable);
newAggregatorRunner.labels.emplace_back(AggregatorRunner::getLabel());
framework::DataProcessorLabel resilientLabel = { "resilient" };
newAggregatorRunner.labels.emplace_back(resilientLabel);
newAggregatorRunner.algorithm = adaptFromTask<AggregatorRunner>(std::move(aggregatorRunner));
return newAggregatorRunner;
}
Expand Down
3 changes: 2 additions & 1 deletion Framework/src/CheckRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ QualityObjectsType CheckRunner::check()
QualityObjectsType allQOs;
for (auto& [checkName, check] : mChecks) {
if (updatePolicyManager.isReady(check.getName())) {
ILOG(Debug, Support) << "Monitor Objects for the check '" << checkName << "' are ready --> check()" << ENDM;
auto newQOs = check.check(mMonitorObjects);
mTotalNumberCheckExecuted += newQOs.size();

Expand All @@ -366,7 +367,7 @@ QualityObjectsType CheckRunner::check()
// Was checked, update latest revision
updatePolicyManager.updateActorRevision(checkName);
} else {
ILOG(Info, Support) << "Monitor Objects for the check '" << checkName << "' are not ready, ignoring" << ENDM;
ILOG(Debug, Support) << "Monitor Objects for the check '" << checkName << "' are not ready, ignoring" << ENDM;
}
}
return allQOs;
Expand Down
3 changes: 2 additions & 1 deletion Framework/src/CheckRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ DataProcessorSpec CheckRunnerFactory::create(CheckRunnerConfig checkRunnerConfig
options };
newCheckRunner.labels.emplace_back(o2::framework::ecs::qcReconfigurable);
newCheckRunner.labels.emplace_back(CheckRunner::getCheckRunnerLabel());
newCheckRunner.labels.emplace_back(framework::DataProcessorLabel{ "resilient" });
newCheckRunner.algorithm = adaptFromTask<CheckRunner>(std::move(qcCheckRunner));
return newCheckRunner;
}
Expand All @@ -61,7 +62,7 @@ DataProcessorSpec CheckRunnerFactory::createSinkDevice(const CheckRunnerConfig&
checkRunnerConfig.options,
{},
{ o2::framework::ecs::qcReconfigurable } };

newCheckRunner.labels.emplace_back(framework::DataProcessorLabel{ "resilient" });
return newCheckRunner;
}

Expand Down
16 changes: 14 additions & 2 deletions Framework/src/CustomParameters.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "QualityControl/CustomParameters.h"
#include <DataFormatsParameters/ECSDataAdapters.h>
#include <iostream>
#include <boost/property_tree/ptree.hpp>
#include <string_view>
#include <vector>

Expand All @@ -33,7 +34,7 @@ std::ostream& operator<<(std::ostream& out, const CustomParameters& customParame

CustomParameters::CustomParameters()
{
mCustomParameters["default"]["default"] = {};
mCustomParameters["null"]["null"] = {};
}

void CustomParameters::set(const std::string& key, const std::string& value, const std::string& runType, const std::string& beamType)
Expand Down Expand Up @@ -154,7 +155,7 @@ std::unordered_map<std::string, std::string>::const_iterator CustomParameters::f

std::unordered_map<std::string, std::string>::const_iterator CustomParameters::end() const
{
return mCustomParameters.at("default").at("default").end();
return mCustomParameters.at("null").at("null").end();
}

std::string CustomParameters::operator[](const std::string& key) const
Expand All @@ -170,4 +171,15 @@ std::string& CustomParameters::operator[](const std::string& key)
return mCustomParameters.at("default").at("default").at(key);
}

void CustomParameters::populateCustomParameters(const boost::property_tree::ptree& tree)
{
for (const auto& [runtype, subTreeRunType] : tree) {
for (const auto& [beamtype, subTreeBeamType] : subTreeRunType) {
for (const auto& [key, value] : subTreeBeamType) {
set(key, value.get_value<std::string>(), runtype, beamtype);
}
}
}
}

} // namespace o2::quality_control::core
28 changes: 18 additions & 10 deletions Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ framework::WorkflowSpec InfrastructureGenerator::generateFullChainInfrastructure
bool enableMovingWindows = !taskSpec.movingWindows.empty();
generateMergers(workflow, taskSpec.taskName, 1, cycleDurationsMultiplied,
taskSpec.mergingMode, resetAfterCycles, infrastructureSpec.common.monitoringUrl,
taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows);
taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows, taskSpec.critical);
} else { // TaskLocationSpec::Remote
auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, taskSpec.resetAfterCycles);
workflow.emplace_back(TaskRunnerFactory::create(taskConfig));
Expand Down Expand Up @@ -277,9 +277,8 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
std::for_each(cycleDurationsMultiplied.begin(), cycleDurationsMultiplied.end(),
[taskSpec](std::pair<size_t, size_t>& p) { p.first *= taskSpec.mergerCycleMultiplier; });
bool enableMovingWindows = !taskSpec.movingWindows.empty();
generateMergers(workflow, taskSpec.taskName, numberOfLocalMachines, cycleDurationsMultiplied,
taskSpec.mergingMode, resetAfterCycles, infrastructureSpec.common.monitoringUrl,
taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows);
generateMergers(workflow, taskSpec.taskName, numberOfLocalMachines, cycleDurationsMultiplied, taskSpec.mergingMode,
resetAfterCycles, infrastructureSpec.common.monitoringUrl, taskSpec.detectorName, taskSpec.mergersPerLayer, enableMovingWindows, taskSpec.critical);

} else if (taskSpec.location == TaskLocationSpec::Remote) {

Expand Down Expand Up @@ -550,6 +549,9 @@ void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpe
{ proxyInput },
channelConfig.c_str()));
workflow.back().labels.emplace_back(taskSpec.localControl == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
if (!taskSpec.critical) {
workflow.back().labels.emplace_back(framework::DataProcessorLabel{ "expendable" });
}
if (getenv("O2_QC_KILL_PROXIES") != nullptr) {
workflow.back().metadata.push_back(DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB });
}
Expand Down Expand Up @@ -577,6 +579,9 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
channelConfig.c_str(),
dplModelAdaptor());
proxy.labels.emplace_back(taskSpec.localControl == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
if (!taskSpec.critical) {
workflow.back().labels.emplace_back(framework::DataProcessorLabel{ "expendable" });
}
// if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option.
enableDraining(proxy.options);
if (getenv("O2_QC_KILL_PROXIES") != nullptr) {
Expand All @@ -585,11 +590,9 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
workflow.emplace_back(std::move(proxy));
}
void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, const std::string& taskName,
size_t numberOfLocalMachines,
std::vector<std::pair<size_t, size_t>> cycleDurations,
const std::string& mergingMode, size_t resetAfterCycles,
std::string monitoringUrl, const std::string& detectorName,
std::vector<size_t> mergersPerLayer, bool enableMovingWindows)
size_t numberOfLocalMachines, std::vector<std::pair<size_t, size_t>> cycleDurations,
const std::string& mergingMode, size_t resetAfterCycles, std::string monitoringUrl,
const std::string& detectorName, std::vector<size_t> mergersPerLayer, bool enableMovingWindows, bool critical)
{
Inputs mergerInputs;
for (size_t id = 1; id <= numberOfLocalMachines; id++) {
Expand Down Expand Up @@ -617,8 +620,9 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow,
mergerConfig.topologySize = { TopologySize::MergersPerLayer, mergersPerLayer };
mergerConfig.monitoringUrl = std::move(monitoringUrl);
mergerConfig.detectorName = detectorName;
mergerConfig.parallelismType = { (mergerConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) ? ParallelismType::RoundRobin : ParallelismType::SplitInputs };
mergerConfig.labels.push_back({ "resilient" });
mergerConfig.publishMovingWindow = { enableMovingWindows ? PublishMovingWindow::Yes : PublishMovingWindow::No };
mergerConfig.parallelismType = { (mergerConfig.inputObjectTimespan.value == InputObjectsTimespan::LastDifference) ? ParallelismType::RoundRobin : ParallelismType::SplitInputs };
mergersBuilder.setConfig(mergerConfig);

mergersBuilder.generateInfrastructure(workflow);
Expand Down Expand Up @@ -786,6 +790,10 @@ void InfrastructureGenerator::generatePostProcessing(WorkflowSpec& workflow, con
ppTask.getOptions()
};
dataProcessorSpec.labels.emplace_back(PostProcessingDevice::getLabel());
if (!ppTaskSpec.critical) {
framework::DataProcessorLabel expendableLabel = { "expendable" };
dataProcessorSpec.labels.emplace_back(expendableLabel);
}
dataProcessorSpec.algorithm = adaptFromTask<PostProcessingDevice>(std::move(ppTask));

workflow.emplace_back(std::move(dataProcessorSpec));
Expand Down
26 changes: 5 additions & 21 deletions Framework/src/InfrastructureSpecReader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,15 @@ TaskSpec InfrastructureSpecReader::readSpecEntry<TaskSpec>(const std::string& ta
}
ts.dataSource = readSpecEntry<DataSourceSpec>(taskID, taskTree.get_child("dataSource"), wholeTree);
ts.active = taskTree.get<bool>("active", ts.active);
ts.critical = taskTree.get<bool>("critical", ts.critical);
ts.maxNumberCycles = taskTree.get<int>("maxNumberCycles", ts.maxNumberCycles);
ts.resetAfterCycles = taskTree.get<size_t>("resetAfterCycles", ts.resetAfterCycles);
ts.saveObjectsToFile = taskTree.get<std::string>("saveObjectsToFile", ts.saveObjectsToFile);
if (taskTree.count("extendedTaskParameters") > 0 && taskTree.count("taskParameters") > 0) {
ILOG(Warning, Devel) << "Both taskParameters and extendedTaskParameters are defined in the QC config file. We will use only extendedTaskParameters. " << ENDM;
}
if (taskTree.count("extendedTaskParameters") > 0) {
for (const auto& [runtype, subTreeRunType] : taskTree.get_child("extendedTaskParameters")) {
for (const auto& [beamtype, subTreeBeamType] : subTreeRunType) {
for (const auto& [key, value] : subTreeBeamType) {
ts.customParameters.set(key, value.get_value<std::string>(), runtype, beamtype);
}
}
}
ts.customParameters.populateCustomParameters(taskTree.get_child("extendedTaskParameters"));
} else if (taskTree.count("taskParameters") > 0) {
for (const auto& [key, value] : taskTree.get_child("taskParameters")) {
ts.customParameters.set(key, value.get_value<std::string>());
Expand Down Expand Up @@ -316,13 +311,7 @@ CheckSpec InfrastructureSpecReader::readSpecEntry<CheckSpec>(const std::string&

cs.active = checkTree.get<bool>("active", cs.active);
if (checkTree.count("extendedCheckParameters") > 0) {
for (const auto& [runtype, subTreeRunType] : checkTree.get_child("extendedCheckParameters")) {
for (const auto& [beamtype, subTreeBeamType] : subTreeRunType) {
for (const auto& [key, value] : subTreeBeamType) {
cs.customParameters.set(key, value.get_value<std::string>(), runtype, beamtype);
}
}
}
cs.customParameters.populateCustomParameters(checkTree.get_child("extendedCheckParameters"));
}
if (checkTree.count("checkParameters") > 0) {
for (const auto& [key, value] : checkTree.get_child("checkParameters")) {
Expand Down Expand Up @@ -356,13 +345,7 @@ AggregatorSpec InfrastructureSpecReader::readSpecEntry<AggregatorSpec>(const std

as.active = aggregatorTree.get<bool>("active", as.active);
if (aggregatorTree.count("extendedAggregatorParameters") > 0) {
for (const auto& [runtype, subTreeRunType] : aggregatorTree.get_child("extendedAggregatorParameters")) {
for (const auto& [beamtype, subTreeBeamType] : subTreeRunType) {
for (const auto& [key, value] : subTreeBeamType) {
as.customParameters.set(key, value.get_value<std::string>(), runtype, beamtype);
}
}
}
as.customParameters.populateCustomParameters(aggregatorTree.get_child("extendedAggregatorParameters"));
}
if (aggregatorTree.count("aggregatorParameters") > 0) {
for (const auto& [key, value] : aggregatorTree.get_child("aggregatorParameters")) {
Expand All @@ -381,6 +364,7 @@ PostProcessingTaskSpec
ppts.id = ppTaskId;
ppts.taskName = ppTaskTree.get<std::string>("taskName", ppts.id);
ppts.active = ppTaskTree.get<bool>("active", ppts.active);
ppts.critical = ppTaskTree.get<bool>("critical", ppts.critical);
ppts.detectorName = ppTaskTree.get<std::string>("detectorName", ppts.detectorName);
ppts.tree = wholeTree;

Expand Down
Loading