From deed2d69286ced82495b6ddfba88259c8884efee Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 16 Feb 2026 01:11:38 -0500 Subject: [PATCH 1/4] initial fix for instrumentation creation race condition --- .../sdk/metrics/_internal/__init__.py | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index faa0959fce2..db036d897fc 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -90,7 +90,17 @@ def __init__( self._instrument_id_instrument_lock = Lock() def create_counter(self, name, unit="", description="") -> APICounter: - status = self._register_instrument(name, _Counter, unit, description) + with self._instrument_id_instrument_lock: + status = self._register_instrument(name, _Counter, unit, description) + if not status.already_registered: + self._instrument_id_instrument[status.instrument_id] = _Counter( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + instrument = self._instrument_id_instrument[status.instrument_id] if status.conflict: # FIXME #2558 go through all views here and check if this @@ -103,21 +113,7 @@ def create_counter(self, name, unit="", description="") -> APICounter: description, status, ) - if status.already_registered: - with self._instrument_id_instrument_lock: - return self._instrument_id_instrument[status.instrument_id] - - instrument = _Counter( - name, - self._instrumentation_scope, - self._measurement_consumer, - unit, - description, - ) - - with self._instrument_id_instrument_lock: - self._instrument_id_instrument[status.instrument_id] = instrument - return instrument + return instrument def create_up_down_counter( self, name, unit="", description="" From 712cd903d806c4c9c69655f9c66308ee551df60c Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 16 Feb 2026 11:30:36 -0500 Subject: [PATCH 2/4] fix remaining instrument creation functions in SDK Meter --- .../sdk/metrics/_internal/__init__.py | 251 +++++++++--------- 1 file changed, 125 insertions(+), 126 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index db036d897fc..8e9795bdbc8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -91,14 +91,18 @@ def __init__( def create_counter(self, name, unit="", description="") -> APICounter: with self._instrument_id_instrument_lock: - status = self._register_instrument(name, _Counter, unit, description) + status = self._register_instrument( + name, _Counter, unit, description + ) if not status.already_registered: - self._instrument_id_instrument[status.instrument_id] = _Counter( - name, - self._instrumentation_scope, - self._measurement_consumer, - unit, - description, + self._instrument_id_instrument[status.instrument_id] = ( + _Counter( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) ) instrument = self._instrument_id_instrument[status.instrument_id] @@ -118,9 +122,21 @@ def create_counter(self, name, unit="", description="") -> APICounter: def create_up_down_counter( self, name, unit="", description="" ) -> APIUpDownCounter: - status = self._register_instrument( - name, _UpDownCounter, unit, description - ) + with self._instrument_id_instrument_lock: + status = self._register_instrument( + name, _UpDownCounter, unit, description + ) + if not status.already_registered: + self._instrument_id_instrument[status.instrument_id] = ( + _UpDownCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + ) + instrument = self._instrument_id_instrument[status.instrument_id] if status.conflict: # FIXME #2558 go through all views here and check if this @@ -133,21 +149,7 @@ def create_up_down_counter( description, status, ) - if status.already_registered: - with self._instrument_id_instrument_lock: - return self._instrument_id_instrument[status.instrument_id] - - instrument = _UpDownCounter( - name, - self._instrumentation_scope, - self._measurement_consumer, - unit, - description, - ) - - with self._instrument_id_instrument_lock: - self._instrument_id_instrument[status.instrument_id] = instrument - return instrument + return instrument def create_observable_counter( self, @@ -156,9 +158,28 @@ def create_observable_counter( unit="", description="", ) -> APIObservableCounter: - status = self._register_instrument( - name, _ObservableCounter, unit, description - ) + with self._instrument_id_instrument_lock: + status = self._register_instrument( + name, _ObservableCounter, unit, description + ) + if not status.already_registered: + instrument = _ObservableCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + self._instrument_id_instrument[status.instrument_id] = ( + instrument + ) + instrument = self._instrument_id_instrument[status.instrument_id] + + if not status.already_registered: + self._measurement_consumer.register_asynchronous_instrument( + instrument + ) if status.conflict: # FIXME #2558 go through all views here and check if this @@ -171,24 +192,7 @@ def create_observable_counter( description, status, ) - if status.already_registered: - with self._instrument_id_instrument_lock: - return self._instrument_id_instrument[status.instrument_id] - - instrument = _ObservableCounter( - name, - self._instrumentation_scope, - self._measurement_consumer, - callbacks, - unit, - description, - ) - - self._measurement_consumer.register_asynchronous_instrument(instrument) - - with self._instrument_id_instrument_lock: - self._instrument_id_instrument[status.instrument_id] = instrument - return instrument + return instrument def create_histogram( self, @@ -219,13 +223,26 @@ def create_histogram( "explicit_bucket_boundaries_advisory must be a sequence of numbers" ) - status = self._register_instrument( - name, - _Histogram, - unit, - description, - explicit_bucket_boundaries_advisory, - ) + with self._instrument_id_instrument_lock: + status = self._register_instrument( + name, + _Histogram, + unit, + description, + explicit_bucket_boundaries_advisory, + ) + if not status.already_registered: + self._instrument_id_instrument[status.instrument_id] = ( + _Histogram( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + explicit_bucket_boundaries_advisory, + ) + ) + instrument = self._instrument_id_instrument[status.instrument_id] if status.conflict: # FIXME #2558 go through all views here and check if this @@ -238,24 +255,20 @@ def create_histogram( description, status, ) - if status.already_registered: - with self._instrument_id_instrument_lock: - return self._instrument_id_instrument[status.instrument_id] - - instrument = _Histogram( - name, - self._instrumentation_scope, - self._measurement_consumer, - unit, - description, - explicit_bucket_boundaries_advisory, - ) - with self._instrument_id_instrument_lock: - self._instrument_id_instrument[status.instrument_id] = instrument - return instrument + return instrument def create_gauge(self, name, unit="", description="") -> APIGauge: - status = self._register_instrument(name, _Gauge, unit, description) + with self._instrument_id_instrument_lock: + status = self._register_instrument(name, _Gauge, unit, description) + if not status.already_registered: + self._instrument_id_instrument[status.instrument_id] = _Gauge( + name, + self._instrumentation_scope, + self._measurement_consumer, + unit, + description, + ) + instrument = self._instrument_id_instrument[status.instrument_id] if status.conflict: # FIXME #2558 go through all views here and check if this @@ -268,28 +281,31 @@ def create_gauge(self, name, unit="", description="") -> APIGauge: description, status, ) - if status.already_registered: - with self._instrument_id_instrument_lock: - return self._instrument_id_instrument[status.instrument_id] - - instrument = _Gauge( - name, - self._instrumentation_scope, - self._measurement_consumer, - unit, - description, - ) - - with self._instrument_id_instrument_lock: - self._instrument_id_instrument[status.instrument_id] = instrument - return instrument + return instrument def create_observable_gauge( self, name, callbacks=None, unit="", description="" ) -> APIObservableGauge: - status = self._register_instrument( - name, _ObservableGauge, unit, description - ) + with self._instrument_id_instrument_lock: + status = self._register_instrument( + name, _ObservableGauge, unit, description + ) + if not status.already_registered: + instrument = _ObservableGauge( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + self._measurement_consumer.register_asynchronous_instrument( + instrument + ) + self._instrument_id_instrument[status.instrument_id] = ( + instrument + ) + instrument = self._instrument_id_instrument[status.instrument_id] if status.conflict: # FIXME #2558 go through all views here and check if this @@ -302,31 +318,31 @@ def create_observable_gauge( description, status, ) - if status.already_registered: - with self._instrument_id_instrument_lock: - return self._instrument_id_instrument[status.instrument_id] - - instrument = _ObservableGauge( - name, - self._instrumentation_scope, - self._measurement_consumer, - callbacks, - unit, - description, - ) - - self._measurement_consumer.register_asynchronous_instrument(instrument) - - with self._instrument_id_instrument_lock: - self._instrument_id_instrument[status.instrument_id] = instrument - return instrument + return instrument def create_observable_up_down_counter( self, name, callbacks=None, unit="", description="" ) -> APIObservableUpDownCounter: - status = self._register_instrument( - name, _ObservableUpDownCounter, unit, description - ) + with self._instrument_id_instrument_lock: + status = self._register_instrument( + name, _ObservableUpDownCounter, unit, description + ) + if not status.already_registered: + instrument = _ObservableUpDownCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) + self._measurement_consumer.register_asynchronous_instrument( + instrument + ) + self._instrument_id_instrument[status.instrument_id] = ( + instrument + ) + instrument = self._instrument_id_instrument[status.instrument_id] if status.conflict: # FIXME #2558 go through all views here and check if this @@ -339,24 +355,7 @@ def create_observable_up_down_counter( description, status, ) - if status.already_registered: - with self._instrument_id_instrument_lock: - return self._instrument_id_instrument[status.instrument_id] - - instrument = _ObservableUpDownCounter( - name, - self._instrumentation_scope, - self._measurement_consumer, - callbacks, - unit, - description, - ) - - self._measurement_consumer.register_asynchronous_instrument(instrument) - - with self._instrument_id_instrument_lock: - self._instrument_id_instrument[status.instrument_id] = instrument - return instrument + return instrument def _get_exemplar_filter(exemplar_filter: str) -> ExemplarFilter: From 0a5124ea32f69f3c4322b20a0c2930fb39fdb8ea Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 16 Feb 2026 13:47:33 -0500 Subject: [PATCH 3/4] update CHANGELOG.md --- CHANGELOG.md | 2 + .../sdk/metrics/_internal/__init__.py | 31 ++++------ .../tests/metrics/test_metrics.py | 59 ++++++++++++++++++- 3 files changed, 73 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a1e33938ae..1d3f839d93d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4709](https://github.com/open-telemetry/opentelemetry-python/pull/4709)) - Implement experimental TracerConfigurator ([#4861](https://github.com/open-telemetry/opentelemetry-python/pull/4861)) +- `opentelemetry-sdk`: Fix instrument creation race condition + ([#4913](https://github.com/open-telemetry/opentelemetry-python/pull/4913)) ## Version 1.39.0/0.60b0 (2025-12-03) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index 8e9795bdbc8..61532c6802d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -163,7 +163,7 @@ def create_observable_counter( name, _ObservableCounter, unit, description ) if not status.already_registered: - instrument = _ObservableCounter( + self._instrument_id_instrument[status.instrument_id] = _ObservableCounter( name, self._instrumentation_scope, self._measurement_consumer, @@ -171,9 +171,6 @@ def create_observable_counter( unit, description, ) - self._instrument_id_instrument[status.instrument_id] = ( - instrument - ) instrument = self._instrument_id_instrument[status.instrument_id] if not status.already_registered: @@ -291,7 +288,7 @@ def create_observable_gauge( name, _ObservableGauge, unit, description ) if not status.already_registered: - instrument = _ObservableGauge( + self._instrument_id_instrument[status.instrument_id] = _ObservableGauge( name, self._instrumentation_scope, self._measurement_consumer, @@ -299,14 +296,13 @@ def create_observable_gauge( unit, description, ) - self._measurement_consumer.register_asynchronous_instrument( - instrument - ) - self._instrument_id_instrument[status.instrument_id] = ( - instrument - ) instrument = self._instrument_id_instrument[status.instrument_id] + if not status.already_registered: + self._measurement_consumer.register_asynchronous_instrument( + instrument + ) + if status.conflict: # FIXME #2558 go through all views here and check if this # instrument registration conflict can be fixed. If it can be, do @@ -328,7 +324,7 @@ def create_observable_up_down_counter( name, _ObservableUpDownCounter, unit, description ) if not status.already_registered: - instrument = _ObservableUpDownCounter( + self._instrument_id_instrument[status.instrument_id] = _ObservableUpDownCounter( name, self._instrumentation_scope, self._measurement_consumer, @@ -336,14 +332,13 @@ def create_observable_up_down_counter( unit, description, ) - self._measurement_consumer.register_asynchronous_instrument( - instrument - ) - self._instrument_id_instrument[status.instrument_id] = ( - instrument - ) instrument = self._instrument_id_instrument[status.instrument_id] + if not status.already_registered: + self._measurement_consumer.register_asynchronous_instrument( + instrument + ) + if status.conflict: # FIXME #2558 go through all views here and check if this # instrument registration conflict can be fixed. If it can be, do diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 3991fd6e154..0dc6d4ddf08 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -15,9 +15,11 @@ # pylint: disable=protected-access,no-self-use import weakref +from collections.abc import Callable from logging import WARNING +from threading import Lock from time import sleep -from typing import Iterable, Sequence +from typing import Any, Iterable, Sequence from unittest.mock import MagicMock, Mock, patch from opentelemetry.attributes import BoundedAttributes @@ -427,6 +429,61 @@ def test_consume_measurement_gauge(self, mock_sync_measurement_consumer): sync_consumer_instance.consume_measurement.assert_called() +class TestMeterConcurrency(ConcurrencyTestBase, TestCase): + def test_create_instrument_concurrency(self): + """ + Tests that concurrent creation of the same instrument does not + result in a KeyError or inconsistent state for all instrument types. + """ + + meter = Meter(Mock(), Mock()) + original_register = meter._register_instrument + lock = Lock() + registered_names = set() + + def mocked_register(name: str, *args, **kwargs): + status = original_register(name, *args, **kwargs) + with lock: + first: bool = name not in registered_names + registered_names.add(name) + + if first: + # Test interleaving of threads by sleeping after the first thread registers + # the instrument, but before the instrument is created. + sleep(0.25) + return status + + def make_create_instrument( + meter: Meter, method_name: str, args: list[Any] + ) -> Callable[[], Any]: + return lambda: getattr(meter, method_name)( + f"concurrent_{method_name}", *args + ) + + with patch.object( + meter, "_register_instrument", side_effect=mocked_register + ): + create_methods = [ + ("create_counter", []), + ("create_up_down_counter", []), + ("create_histogram", []), + ("create_gauge", []), + ("create_observable_counter", [[lambda options: []]]), + ("create_observable_gauge", [[lambda options: []]]), + ("create_observable_up_down_counter", [[lambda options: []]]), + ] + + for method_name, args in create_methods: + with self.subTest(method=method_name): + instruments = self.run_with_many_threads( + make_create_instrument(meter, method_name, args), + num_threads=20, + ) + + for instr in instruments: + self.assertIs(instr, instruments[0]) + + class TestMeter(TestCase): def setUp(self): self.meter = Meter(Mock(), Mock()) From ad130b42ebdacfdfb997e52a06517b8aae3373a6 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Mon, 16 Feb 2026 13:50:51 -0500 Subject: [PATCH 4/4] fix lint error --- .../sdk/metrics/_internal/__init__.py | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index 61532c6802d..d78352ecd5a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -163,13 +163,15 @@ def create_observable_counter( name, _ObservableCounter, unit, description ) if not status.already_registered: - self._instrument_id_instrument[status.instrument_id] = _ObservableCounter( - name, - self._instrumentation_scope, - self._measurement_consumer, - callbacks, - unit, - description, + self._instrument_id_instrument[status.instrument_id] = ( + _ObservableCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) ) instrument = self._instrument_id_instrument[status.instrument_id] @@ -288,13 +290,15 @@ def create_observable_gauge( name, _ObservableGauge, unit, description ) if not status.already_registered: - self._instrument_id_instrument[status.instrument_id] = _ObservableGauge( - name, - self._instrumentation_scope, - self._measurement_consumer, - callbacks, - unit, - description, + self._instrument_id_instrument[status.instrument_id] = ( + _ObservableGauge( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) ) instrument = self._instrument_id_instrument[status.instrument_id] @@ -324,13 +328,15 @@ def create_observable_up_down_counter( name, _ObservableUpDownCounter, unit, description ) if not status.already_registered: - self._instrument_id_instrument[status.instrument_id] = _ObservableUpDownCounter( - name, - self._instrumentation_scope, - self._measurement_consumer, - callbacks, - unit, - description, + self._instrument_id_instrument[status.instrument_id] = ( + _ObservableUpDownCounter( + name, + self._instrumentation_scope, + self._measurement_consumer, + callbacks, + unit, + description, + ) ) instrument = self._instrument_id_instrument[status.instrument_id]