diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs
index 75183984..4e16ab7c 100644
--- a/rust-arroyo/src/backends/kafka/mod.rs
+++ b/rust-arroyo/src/backends/kafka/mod.rs
@@ -81,6 +81,21 @@ impl KafkaConsumerState {
     }
 }
 
+/// Reports whether `err` is a broker transport failure that a poll may treat as "no message".
+///
+/// Returns `true` only when `err` is [`KafkaError::MessageConsumption`] or
+/// [`KafkaError::Global`] carrying [`RDKafkaErrorCode::BrokerTransportFailure`]; every other
+/// error yields `false`. Per the note that accompanied this helper, this mirrors how the
+/// Python implementation treats `KafkaError._TRANSPORT`.
+fn kafka_error_is_recoverable(err: &KafkaError) -> bool {
+    // Both variants of interest wrap an error code: extract it first, then test it once.
+    let code = match err {
+        KafkaError::MessageConsumption(code) | KafkaError::Global(code) => code,
+        _ => return false,
+    };
+    matches!(code, RDKafkaErrorCode::BrokerTransportFailure)
+}
+
 fn create_kafka_message(topics: &[Topic], msg: BorrowedMessage) -> BrokerMessage {
     let topic = msg.topic();
     // NOTE: We avoid calling `Topic::new` here, as that uses a lock to intern the `topic` name.
@@ -387,8 +402,17 @@ impl ArroyoConsumer for KafkaConsumer
             Ok(None),
-            Some(res) => {
-                let msg = create_kafka_message(&self.topics, res?);
+
+            Some(Err(err)) if kafka_error_is_recoverable(&err) => {
+                let error: &dyn std::error::Error = &err;
+                tracing::warn!(error, "Kafka poll transport error, retrying...");
+                Ok(None)
+            }
+
+            Some(Err(err)) => Err(err.into()),
+
+            Some(Ok(msg)) => {
+                let msg = create_kafka_message(&self.topics, msg);
                 self.offset_state
                     .lock()
                     .offsets