Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
441e5a9
feat(metrics)!: add cardinality_limit
xuan-cao-swi Aug 22, 2025
50c6729
update remaining aggregation
xuan-cao-swi Aug 22, 2025
952fe11
add cardinality limit
xuan-cao-swi Aug 25, 2025
9cb4cb5
add basic cardinality test for aggregtion
xuan-cao-swi Aug 25, 2025
aa2c75b
add more edge case
xuan-cao-swi Aug 26, 2025
8ba56b3
refactor
xuan-cao-swi Aug 26, 2025
240ff8a
fix
xuan-cao-swi Aug 26, 2025
9ec35e1
update otlp metrics exporter for cardinality
xuan-cao-swi Aug 26, 2025
9534fa7
Merge branch 'main' of github.com:xuan-cao-swi/opentelemetry-ruby int…
xuan-cao-swi Aug 26, 2025
90aab7e
fix test case
xuan-cao-swi Aug 26, 2025
07b1c87
remove testing comments
xuan-cao-swi Aug 26, 2025
b599add
Merge branch 'main' into metrics-cardinality-limit
xuan-cao-swi Sep 2, 2025
595e4c6
merge
xuan-cao-swi Sep 2, 2025
21c951b
Merge branch 'main' into metrics-cardinality-limit
xuan-cao-swi Sep 22, 2025
3d364ea
Merge branch 'main' into metrics-cardinality-limit
xuan-cao-swi Oct 7, 2025
b14eb36
Merge branch 'main' into metrics-cardinality-limit
xuan-cao-swi Oct 28, 2025
4022e04
resolve merge issue
xuan-cao-swi Oct 28, 2025
e6f8e6b
Merge branch 'main' into metrics-cardinality-limit
xuan-cao-swi Oct 31, 2025
a1cb1a3
Merge branch 'main' into metrics-cardinality-limit
xuan-cao-swi Nov 3, 2025
f6323f3
resolve merge issue
xuan-cao-swi Nov 3, 2025
a26e98b
merge
xuan-cao-swi Dec 5, 2025
e1aa85c
merge
xuan-cao-swi Jan 13, 2026
c04631d
resolve merge conflict
xuan-cao-swi Apr 20, 2026
b7d0e47
Apply suggestions from code review
xuan-cao-swi May 25, 2026
8980cca
update
xuan-cao-swi May 25, 2026
85695ff
Merge branch 'metrics-cardinality-limit' of github.com:xuan-cao-swi/o…
xuan-cao-swi May 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ def initialize(endpoint: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPOR
ssl_verify_mode: MetricsExporter.ssl_verify_mode,
headers: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS', default: {}),
compression: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_COMPRESSION', 'OTEL_EXPORTER_OTLP_COMPRESSION', default: 'gzip'),
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10))
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10),
aggregation_cardinality_limit: nil)
raise ArgumentError, "invalid url for OTLP::MetricsExporter #{endpoint}" unless OpenTelemetry::Common::Utilities.valid_url?(endpoint)
raise ArgumentError, "unsupported compression key #{compression}" unless compression.nil? || %w[gzip none].include?(compression)

# create the MetricStore object
super()
super(aggregation_cardinality_limit: aggregation_cardinality_limit)

@uri = if endpoint == ENV['OTEL_EXPORTER_OTLP_ENDPOINT']
URI.join(endpoint, 'v1/metrics')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def collect(start_time, end_time, data_points)
data_points.values.map!(&:dup)
end

def update(increment, attributes, data_points, exemplar_offer: false)
def update(increment, attributes, data_points, cardinality_limit, exemplar_offer: false)
data_points[attributes] = NumberDataPoint.new(
{},
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Aggregation
# Contains the implementation of the ExplicitBucketHistogram aggregation
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation
class ExplicitBucketHistogram
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze
attr_reader :exemplar_reservoir

DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000].freeze
Expand Down Expand Up @@ -59,60 +60,76 @@ def collect(start_time, end_time, data_points)
end
end

def update(amount, attributes, data_points, exemplar_offer: false)
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end
def update(amount, attributes, data_points, cardinality_limit, exemplar_offer: false)
hdp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit - 1
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

data_points[attributes] = HistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
nil, # :time_unix_nano
0, # :count
0, # :sum
empty_bucket_counts, # :bucket_counts
@boundaries, # :explicit_bounds
nil, # :exemplars
min, # :min
max # :max
)
end
update_histogram_data_point(hdp, amount, exemplar_offer: exemplar_offer)
nil
end

reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end
def aggregation_temporality
@aggregation_temporality.temporality
end

if exemplar_offer
reservoir.offer(value: amount,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
private

def create_new_data_point(attributes, data_points)
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

data_points[attributes] = HistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
nil, # :time_unix_nano
0, # :count
0, # :sum
empty_bucket_counts, # :bucket_counts
@boundaries, # :explicit_bounds
nil, # :exemplars
min, # :min
max # :max
)
end

def update_histogram_data_point(hdp, amount, exemplar_offer: false)
reservior_update(hdp.attributes, amount, exemplar_offer)

if @record_min_max
hdp.max = amount if amount > hdp.max
hdp.min = amount if amount < hdp.min
end

hdp.sum += amount
hdp.count += 1
if @boundaries
bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size
hdp.bucket_counts[bucket_index] += 1
end
nil
end
return unless @boundaries

def aggregation_temporality
@aggregation_temporality.temporality
bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size
hdp.bucket_counts[bucket_index] += 1
end

private
def reservior_update(attributes, amount, exemplar_offer)
reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end

return unless exemplar_offer

reservoir.offer(value: amount,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
end

def empty_bucket_counts
@boundaries ? Array.new(@boundaries.size + 1, 0) : nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ module Metrics
module Aggregation
# Contains the implementation of the {https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram ExponentialBucketHistogram} aggregation
class ExponentialBucketHistogram # rubocop:disable Metrics/ClassLength
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze

# relate to min max scale: https://opentelemetry.io/docs/specs/otel/metrics/sdk/#support-a-minimum-and-maximum-scale
DEFAULT_SIZE = 160
DEFAULT_SCALE = 20
Expand Down Expand Up @@ -221,49 +223,53 @@ def collect(start_time, end_time, data_points)
# rubocop:enable Metrics/MethodLength

# this is aggregate in python; there is no merge in aggregate; but rescale happened
# rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
def update(amount, attributes, data_points, exemplar_offer: false)
# fetch or initialize the ExponentialHistogramDataPoint
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end
def update(amount, attributes, data_points, cardinality_limit, exemplar_offer: false)
hdp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit - 1
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

update_histogram_data_point(hdp, attributes, amount, exemplar_offer: exemplar_offer)
nil
end

# this code block will only be executed if no data_points was found with the attributes
data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
0, # :count
0, # :sum
@scale, # :scale
@zero_count, # :zero_count
ExponentialHistogram::Buckets.new, # :positive
ExponentialHistogram::Buckets.new, # :negative
0, # :flags
nil, # :exemplars
min, # :min
max, # :max
@zero_threshold # :zero_threshold
)
end
def aggregation_temporality
@aggregation_temporality.temporality
end

reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end
private

if exemplar_offer
reservoir.offer(value: amount,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
def create_new_data_point(attributes, data_points)
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

# Start to populate the data point (esp. the buckets)
data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
0, # :count
0, # :sum
@scale, # :scale
@zero_count, # :zero_count
ExponentialHistogram::Buckets.new, # :positive
ExponentialHistogram::Buckets.new, # :negative
0, # :flags
nil, # :exemplars
min, # :min
max, # :max
@zero_threshold # :zero_threshold
)
end

# rubocop:disable Metrics/CyclomaticComplexity,Metrics/MethodLength
def update_histogram_data_point(hdp, attributes, amount, exemplar_offer: false)
reservior_update(attributes, amount, exemplar_offer)

if @record_min_max
hdp.max = amount if amount > hdp.max
hdp.min = amount if amount < hdp.min
Expand Down Expand Up @@ -339,15 +345,8 @@ def update(amount, attributes, data_points, exemplar_offer: false)
bucket_index += buckets.counts.size if bucket_index.negative?

buckets.increment_bucket(bucket_index)
nil
end
# rubocop:enable Metrics/MethodLength, Metrics/CyclomaticComplexity

def aggregation_temporality
@aggregation_temporality.temporality
end

private
# rubocop:enable Metrics/CyclomaticComplexity,Metrics/MethodLength

def grow_buckets(span, buckets)
return if span < buckets.counts.size
Expand All @@ -356,6 +355,22 @@ def grow_buckets(span, buckets)
buckets.grow(span + 1, @size)
end

def reservior_update(attributes, amount, exemplar_offer)
reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end

return unless exemplar_offer

reservoir.offer(value: amount,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
end

def new_mapping(scale)
scale = validate_scale(scale)
scale <= 0 ? ExponentialHistogram::ExponentMapping.new(scale) : ExponentialHistogram::LogarithmMapping.new(scale)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Metrics
module Aggregation
# Contains the implementation of the LastValue aggregation
class LastValue
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze
attr_reader :exemplar_reservoir

# if no reservoir pass from instrument, then use this empty reservoir to avoid no method found error
Expand All @@ -33,29 +34,51 @@ def collect(start_time, end_time, data_points)
ndps
end

def update(increment, attributes, data_points, exemplar_offer: false)
reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end
def update(increment, attributes, data_points, cardinality_limit, exemplar_offer: false)
# Check if we already have this attribute set
ndp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit - 1
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

if exemplar_offer
reservoir.offer(value: increment,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
end
update_number_data_point(ndp, increment, exemplar_offer: exemplar_offer)
nil
end

private

def create_new_data_point(attributes, data_points)
data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
nil,
increment,
0,
nil
)
nil
end

def update_number_data_point(ndp, increment, exemplar_offer: false)
ndp.value = increment
reservior_update(ndp.attributes, increment, exemplar_offer)
end

def reservior_update(attributes, increment, exemplar_offer)
reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end

return unless exemplar_offer

reservoir.offer(value: increment,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
end
end
end
Expand Down
Loading
Loading