Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions src/imas_streams/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
_INITIAL_BACKOFF_TIME = 0.02 # seconds
_MAXIMUM_BACKOFF_TIME = 1.0 # seconds
_STREAMING_HEADER_KEY = "streaming-imas-metadata"
# Kafka server will wait maximal _FETCH_WAIT_MAX_MS before sending new messages to the
# consumer. Adjusted from the default (500ms) to decrease latency, especially when using
# "most_recent_only" since a seek() needs to wait at least this amount of time before it
# is effective (see KafkaConsumer._fast_forward()).
_FETCH_WAIT_MAX_MS = 50 # milli-seconds


class KafkaSettings(BaseModel):
Expand Down Expand Up @@ -151,7 +156,8 @@ def __init__(
settings: KafkaSettings,
stream_consumer_cls: type[StreamConsumer],
*,
timeout=DEFAULT_KAFKA_CONSUMER_TIMEOUT,
timeout: int = DEFAULT_KAFKA_CONSUMER_TIMEOUT,
most_recent_only: bool = False,
**stream_consumer_kwargs,
) -> None:
"""Create a new KafkaConsumer.
Expand All @@ -163,13 +169,23 @@ def __init__(
settings: Kafka host and topic to connect to.
stream_consumer_cls: StreamConsumer type used for processing the received
messages.

Keyword Args:
timeout: Maximum time (in seconds) to wait for the topic.
most_recent_only: Set to True to only receive the most recent message with
every iteration of ``stream()``.
            stream_consumer_kwargs: any additional keyword arguments are forwarded to
the constructor of ``stream_consumer_cls``.
"""
self._settings = settings
self._most_recent_only = most_recent_only
conf = {
"bootstrap.servers": settings.host,
"auto.offset.reset": "earliest",
"group.id": str(uuid.uuid4()),
# This influences the latency of receiving messages. Also impacts the seek()
# in self._fast_forward()!
"fetch.wait.max.ms": _FETCH_WAIT_MAX_MS,
}
self._consumer = confluent_kafka.Consumer(conf)

Expand Down Expand Up @@ -220,7 +236,7 @@ def _subscribe(self, timeout) -> StreamingIMASMetadata:
raise RuntimeError("Timeout reached while waiting for streaming metadata.")
if msg.error() is not None:
raise msg.error()
headers = dict(msg.headers())
headers = dict(msg.headers() or [])
if _STREAMING_HEADER_KEY not in headers:
raise RuntimeError(
f"Topic '{topic_name}' does not contain IMAS streaming metadata."
Expand All @@ -244,6 +260,8 @@ def stream(self, *, timeout=DEFAULT_KAFKA_CONSUMER_TIMEOUT) -> Iterator[Any]:
"""
try:
while True:
if self._most_recent_only:
self._fast_forward()
msg = self._consumer.poll(timeout)
if msg is None:
logger.info(
Expand All @@ -263,3 +281,19 @@ def stream(self, *, timeout=DEFAULT_KAFKA_CONSUMER_TIMEOUT) -> Iterator[Any]:
finally:
self._consumer.commit()
self._consumer.close()

def _fast_forward(self) -> None:
    """Skip ahead in the Kafka stream so the next poll returns the most
    recent message available on the topic.

    Raises:
        RuntimeError: if the consumer is not assigned exactly one partition.
    """
    assignment = self._consumer.assignment()
    if len(assignment) != 1:
        raise RuntimeError(f"Expected a single topic assignment, got {assignment}")
    position = self._consumer.position(assignment)[0]
    _, high_watermark = self._consumer.get_watermark_offsets(assignment[0])
    last_offset = high_watermark - 1
    # Nothing to do when we are already at the most recent message.
    if position.offset >= last_offset:
        return
    position.offset = last_offset
    self._consumer.seek(position)
    # The seek only takes effect after the in-flight fetch request returns.
    # See: https://www.feldera.com/blog/seeking-in-kafka
    time.sleep(_FETCH_WAIT_MAX_MS / 1000)
24 changes: 24 additions & 0 deletions tests/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,30 @@ def test_kafka_producer_consumer(kafka_host, test_magnetics):
assert i == 4 # We should have received 5 messages


def test_kafka_producer_consumer_most_recent_only(kafka_host, test_magnetics):
    """With most_recent_only=True, only the newest produced message is yielded."""
    ids_producer = StreamingIDSProducer(test_magnetics)
    settings = KafkaSettings(host=kafka_host, topic_name="test")
    kafka_producer = KafkaProducer(settings, ids_producer.metadata)

    # Publish five snapshots; only the last one (i == 4) should survive.
    for step in range(5):
        test_magnetics.time[0] = step
        test_magnetics.flux_loop[0].flux.data[0] = 1 - step / 10
        kafka_producer.produce(bytes(ids_producer.create_message(test_magnetics)))

    kafka_consumer = KafkaConsumer(settings, StreamingIDSConsumer, most_recent_only=True)
    received = list(kafka_consumer.stream(timeout=0.1))

    assert len(received) == 1
    latest = received[0]
    assert latest.time[0] == 4
    assert latest.flux_loop[0].name == "test"
    assert latest.flux_loop[0].flux.data[0] == 1 - 4 / 10


def test_kafka_producer_topic_exists(kafka_host, test_magnetics):
ids_producer = StreamingIDSProducer(test_magnetics)
settings = KafkaSettings(host=kafka_host, topic_name="test")
Expand Down
Loading