From eb9a4ecc75b40590ef73c1d872f18ba688dc6467 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 20 Apr 2026 17:27:19 -0700 Subject: [PATCH 1/2] Don't Crash Rust Consumer on Transient `RD_KAFKA_RESP_ERR__TRANSPORT` Error --- rust-arroyo/src/backends/kafka/mod.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index 75183984..541c3835 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -81,6 +81,15 @@ impl KafkaConsumerState { } } +/// Treat `RD_KAFKA_RESP_ERR__TRANSPORT` errors from `librdkafka` like an empty poll the same way Python treats `KafkaError._TRANSPORT`. +fn kafka_poll_error_is_recoverable_transport(err: &KafkaError) -> bool { + matches!( + err, + KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure) + | KafkaError::Global(RDKafkaErrorCode::BrokerTransportFailure) + ) +} + fn create_kafka_message(topics: &[Topic], msg: BorrowedMessage) -> BrokerMessage { let topic = msg.topic(); // NOTE: We avoid calling `Topic::new` here, as that uses a lock to intern the `topic` name. @@ -387,8 +396,17 @@ impl ArroyoConsumer for KafkaConsumer Ok(None), - Some(res) => { - let msg = create_kafka_message(&self.topics, res?); + + Some(Err(err)) if kafka_poll_error_is_recoverable_transport(&err) => { + let error: &dyn std::error::Error = &err; + tracing::warn!(error, "kafka poll transport error, retrying"); + Ok(None) + } + + Some(Err(err)) => Err(err.into()), + + Some(Ok(msg)) => { + let msg = create_kafka_message(&self.topics, msg); self.offset_state .lock() .offsets From a1113ea67b893d4d1c61ffc909583c5eb76baed0 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 22 Apr 2026 21:37:03 -0700 Subject: [PATCH 2/2] Rename Recoverable Error Function --- rust-arroyo/src/backends/kafka/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust-arroyo/src/backends/kafka/mod.rs b/rust-arroyo/src/backends/kafka/mod.rs index 541c3835..4e16ab7c 100644 --- a/rust-arroyo/src/backends/kafka/mod.rs +++ b/rust-arroyo/src/backends/kafka/mod.rs @@ -81,8 +81,8 @@ impl KafkaConsumerState { } } -/// Treat `RD_KAFKA_RESP_ERR__TRANSPORT` errors from `librdkafka` like an empty poll the same way Python treats `KafkaError._TRANSPORT`. -fn kafka_poll_error_is_recoverable_transport(err: &KafkaError) -> bool { +/// Treat recoverable `librdkafka` errors as an empty poll the same way Python treats `KafkaError._TRANSPORT`. +fn kafka_error_is_recoverable(err: &KafkaError) -> bool { matches!( err, KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure) @@ -397,9 +397,9 @@ impl ArroyoConsumer for KafkaConsumer Ok(None), - Some(Err(err)) if kafka_poll_error_is_recoverable_transport(&err) => { + Some(Err(err)) if kafka_error_is_recoverable(&err) => { let error: &dyn std::error::Error = &err; - tracing::warn!(error, "kafka poll transport error, retrying"); + tracing::warn!(error, "Kafka poll transport error, retrying..."); Ok(None) }