Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/imas_streams/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@


DEFAULT_KAFKA_CONSUMER_TIMEOUT = 60 # seconds
_INITIAL_BACKOFF_TIME = 0.02 # seconds
_MAXIMUM_BACKOFF_TIME = 1.0 # seconds
_STREAMING_HEADER_KEY = "streaming-imas-metadata"


Expand Down Expand Up @@ -190,21 +192,32 @@ def _subscribe(self, timeout) -> StreamingIMASMetadata:
# Wait until the requested topic is available
start_time = time.monotonic()
topic_name = self._settings.topic_name
logger.info("Subscribing to topic '%s' ...", topic_name)
backoff = _INITIAL_BACKOFF_TIME
while True:
metadata = self._consumer.list_topics(topic=topic_name, timeout=1)
topic_metadata = metadata.topics.get(topic_name)
if topic_metadata is not None and topic_metadata.error is None:
break
if (start_time + timeout) <= time.monotonic():
raise RuntimeError(
f"Timeout while waiting for kafka topic '{topic_name}'"
f"Timeout reached while waiting for kafka topic '{topic_name}'"
)
# Don't busy-loop when topic does not exist yet
logger.info(
"Topic '%s' does not exist yet, waiting %gs before retry",
topic_name,
backoff,
)
time.sleep(backoff)
backoff = min(backoff * 2, _MAXIMUM_BACKOFF_TIME)

# The topic is available, subscribe and receive metadata
self._consumer.subscribe([topic_name])
logger.info("Subscribed to topic '%s'", topic_name)
msg = self._consumer.poll(timeout)
if msg is None:
raise RuntimeError("Timeout while waiting for streaming metadata.")
raise RuntimeError("Timeout reached while waiting for streaming metadata.")
if msg.error() is not None:
raise msg.error()
headers = dict(msg.headers())
Expand Down