From f576062f2ae8f4fbeb8e2a6d38877e947eb33182 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Thu, 12 Feb 2026 11:14:59 +0100 Subject: [PATCH] Add exponential backoff while waiting for kafka topic Improve diagnostics by adding log messages. --- src/imas_streams/kafka.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/imas_streams/kafka.py b/src/imas_streams/kafka.py index 19da882..8666272 100644 --- a/src/imas_streams/kafka.py +++ b/src/imas_streams/kafka.py @@ -23,6 +23,8 @@ DEFAULT_KAFKA_CONSUMER_TIMEOUT = 60 # seconds +_INITIAL_BACKOFF_TIME = 0.02 # seconds +_MAXIMUM_BACKOFF_TIME = 1.0 # seconds _STREAMING_HEADER_KEY = "streaming-imas-metadata" @@ -190,6 +192,8 @@ def _subscribe(self, timeout) -> StreamingIMASMetadata: # Wait until the requested topic is available start_time = time.monotonic() topic_name = self._settings.topic_name + logger.info("Subscribing to topic '%s' ...", topic_name) + backoff = _INITIAL_BACKOFF_TIME while True: metadata = self._consumer.list_topics(topic=topic_name, timeout=1) topic_metadata = metadata.topics.get(topic_name) @@ -197,14 +201,23 @@ def _subscribe(self, timeout) -> StreamingIMASMetadata: break if (start_time + timeout) <= time.monotonic(): raise RuntimeError( - f"Timeout while waiting for kafka topic '{topic_name}'" + f"Timeout reached while waiting for kafka topic '{topic_name}'" ) + # Don't busy-loop when topic does not exist yet + logger.info( + "Topic '%s' does not exist yet, waiting %gs before retry", + topic_name, + backoff, + ) + time.sleep(backoff) + backoff = min(backoff * 2, _MAXIMUM_BACKOFF_TIME) # The topic is available, subscribe and receive metadata self._consumer.subscribe([topic_name]) + logger.info("Subscribed to topic '%s'", topic_name) msg = self._consumer.poll(timeout) if msg is None: - raise RuntimeError("Timeout while waiting for streaming metadata.") + raise RuntimeError("Timeout reached while waiting for streaming metadata.") if msg.error() is not None: raise msg.error() headers = dict(msg.headers())