fix(consumer): Don't Crash Rust Consumer on Transport Error#535

Merged
george-sentry merged 2 commits into main from george/fix/rust-crashes-on-transient-errors
Apr 23, 2026
Conversation

@george-sentry
Member

Linear

Completes STREAM-856

Description

Python and Rust Arroyo consumers handle errors differently during poll.

In Python, consumers raise TransportError if they hit KafkaError._TRANSPORT during poll.

elif code == KafkaError._TRANSPORT:
    raise TransportError(str(error))

TransportError is a RecoverableError, so StreamProcessor just swallows the error and moves on to the next loop iteration.

except RecoverableError:
    return

Rust doesn't seem to have a concept of recoverable errors, so it just crashes when we hit any error during poll.

tracing::info!("Caught error, terminating strategy");

We need to update Rust Arroyo to retry or ignore KafkaError._TRANSPORT. This PR does that by checking whether a poll error is one we consider "recoverable": if it is, we return Ok(None); if it isn't, we return the error as before.
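A minimal sketch of that check, using stand-in types so it compiles without rdkafka. The enum and variant names mirror rdkafka's KafkaError and RDKafkaErrorCode, but `poll_result` and the `String` message payload are hypothetical, and this is not necessarily the code that was merged:

```rust
// Stand-ins for the rdkafka types, reduced to the variants this sketch needs.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RDKafkaErrorCode {
    BrokerTransportFailure,
    UnknownTopicOrPartition,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum KafkaError {
    MessageConsumption(RDKafkaErrorCode),
    Global(RDKafkaErrorCode),
}

/// True when the error is a transport failure, which is treated like an empty poll.
fn kafka_poll_error_is_recoverable_transport(err: &KafkaError) -> bool {
    matches!(
        err,
        KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure)
            | KafkaError::Global(RDKafkaErrorCode::BrokerTransportFailure)
    )
}

/// Hypothetical helper: map one raw poll result to what the consumer returns.
fn poll_result(res: Option<Result<String, KafkaError>>) -> Result<Option<String>, KafkaError> {
    match res {
        // Nothing was available within the poll timeout.
        None => Ok(None),
        // A message arrived.
        Some(Ok(msg)) => Ok(Some(msg)),
        // Transport failure: report an empty poll so the caller simply polls again.
        Some(Err(err)) if kafka_poll_error_is_recoverable_transport(&err) => Ok(None),
        // Anything else is still surfaced as an error.
        Some(Err(err)) => Err(err),
    }
}
```

With this shape, a transport failure is indistinguishable from an empty poll to the caller, which matches the Python behavior of skipping to the next loop.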

@george-sentry george-sentry requested review from a team as code owners April 21, 2026 00:33
@linear-code

linear-code Bot commented Apr 21, 2026

@george-sentry
Member Author

My solution is kind of ad hoc, and doesn't really reflect the Python logic. I could mirror that logic more closely by extending the Rust ConsumerError enum with a new member like RecoverableError or TransportError.
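A rough sketch of what that alternative could look like. The enum below is a reduced, hypothetical stand-in (the variant names `Recoverable` and `Fatal`, and the `run_once` function, are assumptions for illustration, not the real rust-arroyo definitions):

```rust
/// Hypothetical, reduced consumer error type; not the real rust-arroyo enum.
#[derive(Debug, PartialEq)]
enum ConsumerError {
    /// Assumed new variant: a transient failure that is safe to retry.
    Recoverable(String),
    /// Any other failure, which still terminates the consumer.
    Fatal(String),
}

/// Hypothetical run-loop step mirroring Python's `except RecoverableError: return`.
fn run_once<F>(poll: F) -> Result<(), ConsumerError>
where
    F: FnOnce() -> Result<Option<String>, ConsumerError>,
{
    match poll() {
        // A real processor would hand the message to the strategy here.
        Ok(_message) => Ok(()),
        // Swallow recoverable errors and let the caller loop again.
        Err(ConsumerError::Recoverable(_)) => Ok(()),
        // Propagate everything else unchanged.
        Err(err) => Err(err),
    }
}
```

The difference from the ad hoc check is where the decision lives: the backend classifies the error once, and the processor only has to match on the variant.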

Comment thread rust-arroyo/src/backends/kafka/mod.rs Outdated
}

/// Treat `RD_KAFKA_RESP_ERR__TRANSPORT` errors from `librdkafka` like an empty poll, the same way Python treats `KafkaError._TRANSPORT`.
fn kafka_poll_error_is_recoverable_transport(err: &KafkaError) -> bool {
Member

Maybe a bad idea, but you could make this a macro, like so:

macro_rules! recoverable_error_pattern {
    () => {
        (KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure)
            | KafkaError::Global(RDKafkaErrorCode::BrokerTransportFailure))
    };
}

...

match res {
    Some(Err(recoverable_error_pattern!())) => ...,
    Some(Err(err)) => ...
}

Then the compiler's exhaustiveness and dead-code checks stay effective, because you avoid an if matches!() guard.
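For reference, a self-contained version of the idea that compiles on its own. The two enums are stand-ins for the rdkafka types, and `classify` with its string labels is a hypothetical function used only to exercise the pattern:

```rust
// Stand-ins for the rdkafka types, reduced to the variants this sketch needs.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RDKafkaErrorCode {
    BrokerTransportFailure,
    UnknownTopicOrPartition,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum KafkaError {
    MessageConsumption(RDKafkaErrorCode),
    Global(RDKafkaErrorCode),
}

// Expands to a parenthesized or-pattern, so it can be nested inside other patterns.
macro_rules! recoverable_error_pattern {
    () => {
        (KafkaError::MessageConsumption(RDKafkaErrorCode::BrokerTransportFailure)
            | KafkaError::Global(RDKafkaErrorCode::BrokerTransportFailure))
    };
}

/// Hypothetical classifier: every arm is a plain pattern, with no match guards.
fn classify(res: Option<Result<u64, KafkaError>>) -> &'static str {
    match res {
        None => "empty",
        Some(Ok(_)) => "message",
        Some(Err(recoverable_error_pattern!())) => "recoverable",
        Some(Err(_)) => "fatal",
    }
}
```

Because no arm uses a guard, the compiler can reason about every arm when checking coverage and reachability.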

@george-sentry george-sentry merged commit 227a9b8 into main Apr 23, 2026
17 checks passed
@george-sentry george-sentry deleted the george/fix/rust-crashes-on-transient-errors branch April 23, 2026 04:46