Skip to content

Commit fe5f1d4

Browse files
committed
DPL: add metric aggregation based on policies
1 parent e73e5b9 commit fe5f1d4

6 files changed

Lines changed: 731 additions & 0 deletions

File tree

Framework/Core/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ o2_add_library(Framework
119119
src/PluginManager.cxx
120120
src/RateLimiter.cxx
121121
src/ResourcesMonitoringHelper.cxx
122+
src/AggregationPolicy.cxx
123+
src/MetricAggregator.cxx
122124
src/ResourcePolicy.cxx
123125
src/ResourcePolicyHelpers.cxx
124126
src/RootArrowFilesystem.cxx
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H
13+
#define O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H
14+
15+
#include <string>
16+
#include <string_view>
17+
#include <vector>
18+
#include <regex>
19+
20+
namespace o2
21+
{
22+
23+
namespace framework
24+
{
25+
26+
namespace metricaggregator
27+
{
28+
/// Defines the selection strategy for devices.
enum class AggregationSelectionType {
  All,     ///< selectDevice() accepts every device.
  Specific ///< selectDevice() accepts only devices on the configured list.
};

/// Defines the reduction strategy for metrics.
enum class AggregationMetricType {
  Sum,
  Average,
  Rate,
  Specific, ///< The reduction is looked up per metric name via regex rules.
  Simple
};

/// Parses environment configurations and evaluates aggregation rules.
///
/// The class only holds enums and standard containers, so it deliberately
/// declares no destructor: a user-declared destructor (even a defaulted one)
/// would suppress the implicit move operations and turn every move into a
/// deep copy of the device list and of the compiled regular expressions.
class AggregationPolicy
{
 public:
  AggregationPolicy() = default;

  /// Reads configuration from environment variables and sets internal rules.
  ///
  /// The policy itself comes from ALIEN_JDL_AGGREGATOR_POLICY, expected as
  /// `<selection>:<reduction>`. ALIEN_JDL_AGGREGATOR_DEVICES (comma separated)
  /// is consulted for the 'specific' selection and ALIEN_JDL_AGGREGATOR_METRICS
  /// (`<reduction>:<regex>` entries separated by ';') for the 'specific'
  /// reduction. Parsing problems are logged, not propagated to the caller.
  void configureFromEnv();

  /// Returns the configured device selection type.
  AggregationSelectionType getSelection() const noexcept;

  /// Returns the configured global metric reduction type.
  AggregationMetricType getReduction() const noexcept;

  /// Determines the specific reduction type required for a given metric name.
  ///
  /// @param metricName name matched in full against the configured patterns.
  /// @return the global reduction unless it is 'Specific'; otherwise the type
  ///         of the first rule whose pattern matches.
  /// @throws std::invalid_argument when the reduction is 'Specific' and no
  ///         rule matches.
  AggregationMetricType getAggregationTypeForMetric(std::string_view metricName) const;

  /// Evaluates whether the policy allows processing for the provided device name.
  ///
  /// @return true for the 'All' selection; for 'Specific', true only when
  ///         @a deviceId equals one of the configured device names.
  bool selectDevice(std::string_view deviceId) const;

 private:
  /// Maps a regular expression pattern to a specific aggregation type.
  struct MetricRule {
    std::regex metricPattern;
    AggregationMetricType type;
  };

  /// Splits @a input into tokens separated by @a delim.
  std::vector<std::string> split(std::string_view input, char delim) const;

  /// Converts a keyword ("all", "specific") into an AggregationSelectionType.
  /// Static because no instance state is involved.
  /// @throws std::invalid_argument for any other string.
  static AggregationSelectionType parseSelectionType(const std::string& str);

  /// Converts a keyword ("sum", "average", "rate", "simple", "specific") into
  /// an AggregationMetricType. Static because no instance state is involved.
  /// @throws std::invalid_argument for any other string.
  static AggregationMetricType parseReductionType(const std::string& str);

  AggregationSelectionType mSelection = AggregationSelectionType::All;
  AggregationMetricType mReduction = AggregationMetricType::Sum;
  std::vector<std::string> mSpecificDevices;     ///< Device names accepted by the 'Specific' selection.
  std::vector<MetricRule> mSpecificMetricRules;  ///< Rules evaluated in order for the 'Specific' reduction.
};
76+
77+
} // namespace metricaggregator
78+
} // namespace framework
79+
} // namespace o2
80+
81+
#endif // O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H
13+
#define O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H
14+
15+
#include "Framework/ServiceHandle.h"
16+
#include "Framework/ServiceMetricsInfo.h"
17+
#include "Framework/Monitoring.h"
18+
19+
#include <fairmq/ProgOptions.h>
20+
#include <memory>
21+
#include <string>
22+
#include <vector>
23+
#include <unordered_map>
24+
25+
#include "Framework/AggregationPolicy.h"
26+
27+
namespace o2
28+
{
29+
30+
namespace framework
31+
{
32+
33+
namespace metricaggregator
34+
{
35+
/// A single numeric measurement paired with its timestamp.
/// Both fields start at zero when the sample is default-constructed.
struct MetricSample {
  double value{0.0};        ///< Measured value.
  std::size_t timestamp{0}; ///< Timestamp attached to the value.
};
40+
41+
/// Collects and reduces metrics across multiple framework devices.
42+
/// Transmits the aggregated results to an external monitoring backend.
43+
class MetricAggregator
44+
{
45+
public:
46+
explicit MetricAggregator();
47+
~MetricAggregator() = default;
48+
/// Initializes the internal aggregation policy from environment variables.
49+
void setPolicy();
50+
/// Returns the current policy configuration as a formatted string.
51+
std::string getPolicy();
52+
/// Routes metrics to the appropriate processing function based on the policy reduction type.
53+
void mergeMetrics(const std::vector<DeviceMetricsInfo>& metrics,
54+
const DeviceMetricsInfo& driverMetrics,
55+
const std::vector<DeviceSpec>& specs);
56+
private:
57+
/// Appends a suffix to the metric name based on the applied aggregation type.
58+
std::string getMetricNameFromPolicy(std::string_view metricName, AggregationMetricType aggregationType);
59+
/// Flushes metrics directly without aggregation.
60+
void flushMetricsSimple(const std::vector<DeviceMetricsInfo>& deviceMetrics,
61+
const DeviceMetricsInfo& driverMetrics,
62+
const std::vector<DeviceSpec>& specs);
63+
/// Flushes metrics by applying the aggregation policy.
64+
void flushMetrics(const std::vector<DeviceMetricsInfo>& deviceMetrics,
65+
const DeviceMetricsInfo& driverMetrics,
66+
const std::vector<DeviceSpec>& specs);
67+
/// Retrieves the monitoring backend type from environment variables.
68+
const char* getBackendFromEnv();
69+
70+
const char* mBackend = nullptr;
71+
std::unique_ptr<o2::monitoring::Monitoring> mMonitoring;
72+
/// Stores the previous samples required to compute rates over time.
73+
std::unordered_map<std::string,std::vector<MetricSample>> mLastSentSamples;
74+
std::unique_ptr<AggregationPolicy> mPolicy;
75+
};
76+
} // namespace metricaggregator
77+
} // namespace framework
78+
} // namespace o2
79+
80+
#endif // O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/AggregationPolicy.h"
13+
#include "Framework/Logger.h"
14+
15+
#include <algorithm>
16+
#include <cstdlib>
17+
#include <sstream>
18+
#include <string>
19+
#include <vector>
20+
#include <regex>
21+
#include <stdexcept>
22+
23+
using namespace o2::framework::metricaggregator;
24+
25+
/// Cuts @a input at every occurrence of @a delim and returns the pieces in order.
///
/// Empty pieces between two adjacent delimiters (or before a leading one) are
/// kept. An empty input yields no pieces, and a delimiter at the very end does
/// not produce a trailing empty piece.
std::vector<std::string> AggregationPolicy::split(std::string_view input, char delim) const
{
  std::vector<std::string> pieces;
  std::size_t start = 0;
  while (start < input.size()) {
    const std::size_t end = input.find(delim, start);
    if (end == std::string_view::npos) {
      // Last piece: everything up to the end of the input.
      pieces.emplace_back(input.substr(start));
      break;
    }
    pieces.emplace_back(input.substr(start, end - start));
    start = end + 1;
  }
  return pieces;
}
34+
35+
void AggregationPolicy::configureFromEnv()
36+
{
37+
const char* envPolicy = std::getenv("ALIEN_JDL_AGGREGATOR_POLICY");
38+
if (!envPolicy) {
39+
LOGP(warn, "[AggregationPolicy] ALIEN_JDL_AGGREGATOR_POLICY is not set. Using default 'all:simple'.");
40+
mSelection = AggregationSelectionType::All;
41+
mReduction = AggregationMetricType::Simple;
42+
return;
43+
}
44+
45+
try {
46+
std::string policyStr(envPolicy);
47+
std::vector<std::string> parts = split(policyStr, ':');
48+
49+
if (parts.size() < 2) {
50+
LOGP(error, "[AggregationPolicy] Invalid ALIEN_JDL_AGGREGATOR_POLICY format");
51+
return;
52+
}
53+
54+
mSelection = parseSelectionType(parts[0]);
55+
mReduction = parseReductionType(parts[1]);
56+
57+
if (mSelection == AggregationSelectionType::Specific) {
58+
const char* envDevices = std::getenv("ALIEN_JDL_AGGREGATOR_DEVICES");
59+
if (!envDevices) {
60+
throw std::invalid_argument("ALIEN_JDL_AGGREGATOR_DEVICES environment variable is required when selection type is 'specific'");
61+
}
62+
mSpecificDevices = split(std::string(envDevices), ',');
63+
}
64+
if (mReduction == AggregationMetricType::Specific) {
65+
const char* envMetrics = std::getenv("ALIEN_JDL_AGGREGATOR_METRICS");
66+
if (!envMetrics) {
67+
LOGP(warn, "[AggregationPolicy] ALIEN_JDL_AGGREGATOR_METRICS environment variable missing for 'specific' reduction type. Using default.");
68+
mSpecificMetricRules.push_back({std::regex(".*"), AggregationMetricType::Sum});
69+
return;
70+
}
71+
72+
std::stringstream metricsStream(envMetrics);
73+
std::string metricRuleStr;
74+
while(std::getline(metricsStream, metricRuleStr, ';')) {
75+
auto pos = metricRuleStr.find(':');
76+
if (pos == std::string::npos) {
77+
throw std::invalid_argument("Invalid metric rule format: " + metricRuleStr);
78+
}
79+
std::string typeStr = metricRuleStr.substr(0, pos);
80+
std::string patternStr = metricRuleStr.substr(pos + 1);
81+
AggregationMetricType type = parseReductionType(typeStr);
82+
mSpecificMetricRules.push_back({std::regex(patternStr), type});
83+
}
84+
}
85+
} catch (std::exception const& e) {
86+
LOGP(error, "[AggregationPolicy] Failed to parse ALIEN_JDL_AGGREGATOR_POLICY: {}", e.what());
87+
}
88+
}
89+
90+
/// Resolves the reduction to apply to the metric called @a metricName.
///
/// @param metricName metric name; it must match a rule pattern in full
///        (std::regex_match semantics) for that rule to apply.
/// @return the global reduction when it is not 'Specific'; otherwise the type
///         of the first configured rule whose pattern matches.
/// @throws std::invalid_argument when the reduction is 'Specific' and no rule
///         matches; the failure is also logged.
AggregationMetricType AggregationPolicy::getAggregationTypeForMetric(std::string_view metricName) const
{
  if (mReduction != AggregationMetricType::Specific) {
    return mReduction;
  }

  // Match directly on the viewed characters: no temporary std::string per rule.
  const char* const first = metricName.data();
  const char* const last = first + metricName.size();
  for (const auto& rule : mSpecificMetricRules) {
    if (std::regex_match(first, last, rule.metricPattern)) {
      return rule.type;
    }
  }

  // Only reachable with the 'Specific' reduction, so no further check is needed.
  LOGP(error, "[AggregationPolicy] No specific aggregation type found for metric '{}'", metricName);
  throw std::invalid_argument("No specific aggregation type found for metric: " + std::string(metricName));
}
105+
106+
/// Accessor for the device selection mode currently held by the policy.
AggregationSelectionType AggregationPolicy::getSelection() const noexcept
{
  return this->mSelection;
}
110+
111+
/// Accessor for the global metric reduction mode currently held by the policy.
AggregationMetricType AggregationPolicy::getReduction() const noexcept
{
  return this->mReduction;
}
115+
116+
/// Translates a selection keyword into its enum value.
///
/// Accepted keywords (exact, case-sensitive match): "all", "specific".
/// @throws std::invalid_argument for any other input.
AggregationSelectionType AggregationPolicy::parseSelectionType(const std::string& str)
{
  struct Keyword {
    const char* text;
    AggregationSelectionType value;
  };
  static constexpr Keyword kKeywords[] = {
    {"all", AggregationSelectionType::All},
    {"specific", AggregationSelectionType::Specific},
  };

  for (const auto& keyword : kKeywords) {
    if (str == keyword.text) {
      return keyword.value;
    }
  }
  throw std::invalid_argument("Invalid selection type: " + str);
}
125+
126+
/// Translates a reduction keyword into its enum value.
///
/// Accepted keywords (exact, case-sensitive match): "sum", "average", "rate",
/// "simple", "specific".
/// @throws std::invalid_argument for any other input.
AggregationMetricType AggregationPolicy::parseReductionType(const std::string& str)
{
  struct Keyword {
    const char* text;
    AggregationMetricType value;
  };
  static constexpr Keyword kKeywords[] = {
    {"sum", AggregationMetricType::Sum},
    {"average", AggregationMetricType::Average},
    {"rate", AggregationMetricType::Rate},
    {"simple", AggregationMetricType::Simple},
    {"specific", AggregationMetricType::Specific},
  };

  for (const auto& keyword : kKeywords) {
    if (str == keyword.text) {
      return keyword.value;
    }
  }
  throw std::invalid_argument("Invalid reduction type: " + str);
}
141+
142+
143+
144+
bool AggregationPolicy::selectDevice(std::string_view deviceId) const
145+
{
146+
if (mSelection == AggregationSelectionType::Specific) {
147+
return std::find(mSpecificDevices.begin(), mSpecificDevices.end(), deviceId) != mSpecificDevices.end();
148+
}
149+
return true;
150+
}

0 commit comments

Comments
 (0)