From 339dcce9c811205bb75eec9310131c145286a34a Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Wed, 4 Mar 2026 12:15:59 +0100 Subject: [PATCH 1/4] Implement option to only consume most recent data on the Kafka topic --- src/imas_streams/kafka.py | 26 +++++++++++++++++++++++++- tests/test_kafka.py | 24 ++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/src/imas_streams/kafka.py b/src/imas_streams/kafka.py index 8986dad..244d1cb 100644 --- a/src/imas_streams/kafka.py +++ b/src/imas_streams/kafka.py @@ -26,6 +26,7 @@ _INITIAL_BACKOFF_TIME = 0.02 # seconds _MAXIMUM_BACKOFF_TIME = 1.0 # seconds _STREAMING_HEADER_KEY = "streaming-imas-metadata" +_FETCH_WAIT_MAX_MS = 50 class KafkaSettings(BaseModel): @@ -151,7 +152,8 @@ def __init__( settings: KafkaSettings, stream_consumer_cls: type[StreamConsumer], *, - timeout=DEFAULT_KAFKA_CONSUMER_TIMEOUT, + timeout: int = DEFAULT_KAFKA_CONSUMER_TIMEOUT, + most_recent_only: bool = False, **stream_consumer_kwargs, ) -> None: """Create a new KafkaConsumer. @@ -166,10 +168,14 @@ def __init__( timeout: Maximum time (in seconds) to wait for the topic. """ self._settings = settings + self._most_recent_only = most_recent_only conf = { "bootstrap.servers": settings.host, "auto.offset.reset": "earliest", "group.id": str(uuid.uuid4()), + # This influences the latency of receiving messages. Also impacts the seek() + # in self._fast_forward()! 
+ "fetch.wait.max.ms": _FETCH_WAIT_MAX_MS, } self._consumer = confluent_kafka.Consumer(conf) @@ -244,6 +250,8 @@ def stream(self, *, timeout=DEFAULT_KAFKA_CONSUMER_TIMEOUT) -> Iterator[Any]: """ try: while True: + if self._most_recent_only: + self._fast_forward() msg = self._consumer.poll(timeout) if msg is None: logger.info( @@ -263,3 +271,19 @@ def stream(self, *, timeout=DEFAULT_KAFKA_CONSUMER_TIMEOUT) -> Iterator[Any]: finally: self._consumer.commit() self._consumer.close() + + def _fast_forward(self) -> None: + """Fast forward the Kafka stream, so the last available message will be returned + next.""" + assignment = self._consumer.assignment() + if len(assignment) != 1: + raise RuntimeError(f"Expected a single topic assignment, got {assignment}") + cur_offset = self._consumer.position(assignment)[0] + _, high_watermark = self._consumer.get_watermark_offsets(assignment[0]) + # Check if we're not already at the end + if cur_offset.offset < high_watermark - 1: + cur_offset.offset = high_watermark - 1 + self._consumer.seek(cur_offset) + # Wait for the in-flight request to return, seek takes effect afterwards + # See: https://www.feldera.com/blog/seeking-in-kafka + time.sleep(_FETCH_WAIT_MAX_MS / 1000) diff --git a/tests/test_kafka.py b/tests/test_kafka.py index 37cc7c7..aa646e3 100644 --- a/tests/test_kafka.py +++ b/tests/test_kafka.py @@ -74,6 +74,30 @@ def test_kafka_producer_consumer(kafka_host, test_magnetics): assert i == 4 # We should have received 5 messages +def test_kafka_producer_consumer_most_recent_only(kafka_host, test_magnetics): + ids_producer = StreamingIDSProducer(test_magnetics) + settings = KafkaSettings(host=kafka_host, topic_name="test") + kafka_producer = KafkaProducer(settings, ids_producer.metadata) + + for i in range(5): + test_magnetics.time[0] = i + test_magnetics.flux_loop[0].flux.data[0] = 1 - i / 10 + + message = ids_producer.create_message(test_magnetics) + kafka_producer.produce(bytes(message)) + + kafka_consumer = KafkaConsumer( + 
settings, StreamingIDSConsumer, most_recent_only=True + ) + result = list(kafka_consumer.stream(timeout=0.1)) + # We should have only the most recent message at i=4 + assert len(result) == 1 + ids = result[0] + assert ids.time[0] == 4 + assert ids.flux_loop[0].name == "test" + assert ids.flux_loop[0].flux.data[0] == 1 - 4 / 10 + + def test_kafka_producer_topic_exists(kafka_host, test_magnetics): ids_producer = StreamingIDSProducer(test_magnetics) settings = KafkaSettings(host=kafka_host, topic_name="test") From 1d248dd638f970b971f89b23c9d2df467d9e97e3 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Wed, 4 Mar 2026 13:12:58 +0100 Subject: [PATCH 2/4] Fix TypeError ('NoneType' object is not iterable) when kafka message has no headers --- src/imas_streams/kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/imas_streams/kafka.py b/src/imas_streams/kafka.py index 244d1cb..5ca0fca 100644 --- a/src/imas_streams/kafka.py +++ b/src/imas_streams/kafka.py @@ -226,7 +226,7 @@ def _subscribe(self, timeout) -> StreamingIMASMetadata: raise RuntimeError("Timeout reached while waiting for streaming metadata.") if msg.error() is not None: raise msg.error() - headers = dict(msg.headers()) + headers = dict(msg.headers() or []) if _STREAMING_HEADER_KEY not in headers: raise RuntimeError( f"Topic '{topic_name}' does not contain IMAS streaming metadata." From c4556673dce22d2a8c32726c7e4a2756418a01e2 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Wed, 4 Mar 2026 14:08:48 +0100 Subject: [PATCH 3/4] Update docstrings for new argument --- src/imas_streams/kafka.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/imas_streams/kafka.py b/src/imas_streams/kafka.py index 5ca0fca..340ec4b 100644 --- a/src/imas_streams/kafka.py +++ b/src/imas_streams/kafka.py @@ -165,7 +165,13 @@ def __init__( settings: Kafka host and topic to connect to. stream_consumer_cls: StreamConsumer type used for processing the received messages. 
+ + Keyword Args: timeout: Maximum time (in seconds) to wait for the topic. + most_recent_only: Set to True to only receive the most recent message with + every iteration of ``stream()``. + stream_consumer_kwargs: any additional keyword arguments are forwarded to + the constructor of ``stream_consumer_cls``. """ self._settings = settings self._most_recent_only = most_recent_only From df0b65e53f2916909f83f9fc52a6e9b4c7b8700d Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Wed, 11 Mar 2026 13:14:41 +0100 Subject: [PATCH 4/4] Add explanation comment for the _FETCH_WAIT_MAX_MS value --- src/imas_streams/kafka.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/imas_streams/kafka.py b/src/imas_streams/kafka.py index 340ec4b..df1d46c 100644 --- a/src/imas_streams/kafka.py +++ b/src/imas_streams/kafka.py @@ -26,7 +26,11 @@ _INITIAL_BACKOFF_TIME = 0.02 # seconds _MAXIMUM_BACKOFF_TIME = 1.0 # seconds _STREAMING_HEADER_KEY = "streaming-imas-metadata" -_FETCH_WAIT_MAX_MS = 50 +# Kafka server will wait at most _FETCH_WAIT_MAX_MS before sending new messages to the +# consumer. Adjusted from the default (500ms) to decrease latency, especially when using +# "most_recent_only" since a seek() needs to wait at least this amount of time before it +# is effective (see KafkaConsumer._fast_forward()). +_FETCH_WAIT_MAX_MS = 50 # milliseconds class KafkaSettings(BaseModel):