-
-
Notifications
You must be signed in to change notification settings - Fork 0
feat(dlq): add dlq support (no-op) #277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
480e5ce
366c86a
9263884
f7efc0a
0823737
f15da48
7925998
02aae45
7173824
0acfcd7
fb6b40d
ee6a8b7
e546512
9b9b8bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,7 @@ | |
| //! The pipeline is built by adding RuntimeOperators to the consumer. | ||
|
|
||
| use crate::commit_policy::WatermarkCommitOffsets; | ||
| use crate::kafka_config::PyKafkaConsumerConfig; | ||
| use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig}; | ||
| use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; | ||
| use crate::metrics::configure_metrics; | ||
| use crate::metrics_config::PyMetricConfig; | ||
|
|
@@ -18,7 +18,9 @@ use crate::utils::traced_with_gil; | |
| use crate::watermark::WatermarkEmitter; | ||
| use pyo3::prelude::*; | ||
| use rdkafka::message::{Header, Headers, OwnedHeaders}; | ||
| use sentry_arroyo::backends::kafka::producer::KafkaProducer; | ||
| use sentry_arroyo::backends::kafka::types::KafkaPayload; | ||
| use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; | ||
| use sentry_arroyo::processing::strategies::healthcheck::HealthCheck; | ||
| use sentry_arroyo::processing::strategies::noop::Noop; | ||
| use sentry_arroyo::processing::strategies::run_task::RunTask; | ||
|
|
@@ -34,6 +36,31 @@ use std::sync::Arc; | |
| /// Matches Arroyo docs for Kubernetes liveness probes. | ||
| const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; | ||
|
|
||
| /// Configuration for Dead Letter Queue (DLQ). | ||
| /// When provided, invalid messages will be sent to the DLQ topic. | ||
| #[pyclass] | ||
| #[derive(Debug, Clone)] | ||
| pub struct DlqConfig { | ||
| /// The Kafka topic name to send invalid messages to | ||
| #[pyo3(get)] | ||
| pub topic: String, | ||
|
|
||
| /// The Kafka producer configuration for the DLQ | ||
| #[pyo3(get)] | ||
| pub producer_config: PyKafkaProducerConfig, | ||
| } | ||
|
|
||
| #[pymethods] | ||
| impl DlqConfig { | ||
| #[new] | ||
| fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { | ||
| DlqConfig { | ||
| topic, | ||
| producer_config, | ||
| } | ||
| } | ||
| } | ||
|
cursor[bot] marked this conversation as resolved.
|
||
|
|
||
| /// The class that represents the consumer. | ||
| /// This class is exposed to python and it is the main entry point | ||
| /// used by the Python adapter to build a pipeline and run it. | ||
|
|
@@ -69,19 +96,26 @@ pub struct ArroyoConsumer { | |
|
|
||
| /// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness. | ||
| write_healthcheck: bool, | ||
|
|
||
| /// DLQ (Dead Letter Queue) configuration. | ||
| /// If provided, invalid messages will be sent to the DLQ topic. | ||
| /// Otherwise, invalid messages will cause the consumer to stop processing. | ||
| #[pyo3(get)] | ||
| dlq_config: Option<DlqConfig>, | ||
| } | ||
|
|
||
| #[pymethods] | ||
| impl ArroyoConsumer { | ||
| #[new] | ||
| #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))] | ||
| #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))] | ||
| fn new( | ||
| source: String, | ||
| kafka_config: PyKafkaConsumerConfig, | ||
| topic: String, | ||
| schema: Option<String>, | ||
| metric_config: Option<PyMetricConfig>, | ||
| write_healthcheck: bool, | ||
| dlq_config: Option<DlqConfig>, | ||
| ) -> Self { | ||
| ArroyoConsumer { | ||
| consumer_config: kafka_config, | ||
|
|
@@ -93,6 +127,7 @@ impl ArroyoConsumer { | |
| concurrency_config: Arc::new(ConcurrencyConfig::new(1)), | ||
| metric_config, | ||
| write_healthcheck, | ||
| dlq_config, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -120,7 +155,12 @@ impl ArroyoConsumer { | |
| self.write_healthcheck, | ||
| ); | ||
| let config = self.consumer_config.clone().into(); | ||
| let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None); | ||
|
|
||
| // Build DLQ policy if configured | ||
| let dlq_policy = build_dlq_policy(&self.dlq_config, self.concurrency_config.handle()); | ||
|
|
||
| let processor = | ||
| StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy); | ||
| self.handle = Some(processor.get_handle()); | ||
|
|
||
| let mut handle = processor.get_handle(); | ||
|
|
@@ -143,6 +183,40 @@ impl ArroyoConsumer { | |
| } | ||
| } | ||
|
|
||
| /// Builds the DLQ policy if dlq_config is provided. | ||
| /// Returns None if DLQ is not configured. | ||
| pub fn build_dlq_policy( | ||
| dlq_config: &Option<DlqConfig>, | ||
| handle: tokio::runtime::Handle, | ||
| ) -> Option<DlqPolicy<KafkaPayload>> { | ||
| match dlq_config { | ||
| Some(dlq_config) => { | ||
| tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); | ||
|
|
||
| // Create Kafka producer for DLQ | ||
| let producer_config = dlq_config.producer_config.clone().into(); | ||
| let kafka_producer = KafkaProducer::new(producer_config); | ||
| let dlq_producer = KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); | ||
|
|
||
| // Use default DLQ limits (no limits) and no max buffered messages | ||
| // These can be made configurable in a future PR if needed | ||
| let dlq_limit = DlqLimit::default(); | ||
| let max_buffered_messages = None; | ||
|
Comment on lines
+203
to
+204
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We need to make these configurable now. Exposing this in config is not a quick operation: we need to open a PR in streams, release it, upgrade it in the client code, and ship the client code. This is not something that can be done quickly after an incident. |
||
|
|
||
| Some(DlqPolicy::new( | ||
| handle, | ||
| Box::new(dlq_producer), | ||
| dlq_limit, | ||
| max_buffered_messages, | ||
| )) | ||
| } | ||
| None => { | ||
| tracing::info!("DLQ not configured, invalid messages will cause processing to stop"); | ||
| None | ||
| } | ||
|
victoria-yining-huang marked this conversation as resolved.
|
||
| } | ||
| } | ||
|
|
||
| /// Converts a Message<KafkaPayload> to a Message<RoutedValue>. | ||
| /// | ||
| /// The messages we send around between steps in the pipeline contain | ||
|
|
@@ -429,4 +503,52 @@ mod tests { | |
| let _ = std::fs::remove_file(healthcheck_path); | ||
| }) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_build_dlq_policy_with_various_configs() { | ||
| // Define test cases: (test_name, dlq_bootstrap_servers, expected_some) | ||
| let test_cases = vec![ | ||
| ("without_dlq_config", None, false), | ||
| ( | ||
| "with_dlq_config_single_broker", | ||
| Some(vec!["localhost:9092".to_string()]), | ||
| true, | ||
| ), | ||
| ( | ||
| "with_dlq_config_multiple_brokers", | ||
| Some(vec![ | ||
| "broker1:9092".to_string(), | ||
| "broker2:9092".to_string(), | ||
| "broker3:9092".to_string(), | ||
| ]), | ||
| true, | ||
| ), | ||
| ]; | ||
|
|
||
| // Create a tokio runtime to get a handle for testing | ||
| let runtime = tokio::runtime::Runtime::new().unwrap(); | ||
| let handle = runtime.handle().clone(); | ||
|
|
||
| for (test_name, dlq_bootstrap_servers, expected_some) in test_cases { | ||
| // Create DLQ config if bootstrap servers are provided | ||
| let dlq_config = dlq_bootstrap_servers.map(|servers| { | ||
| let producer_config = PyKafkaProducerConfig::new(servers, None); | ||
| DlqConfig::new("test-dlq".to_string(), producer_config) | ||
| }); | ||
|
|
||
| // Build DLQ policy and assert | ||
| let dlq_policy: Option<DlqPolicy<KafkaPayload>> = | ||
| build_dlq_policy(&dlq_config, handle.clone()); | ||
| assert_eq!( | ||
| dlq_policy.is_some(), | ||
| expected_some, | ||
| "Test case '{}' failed: expected is_some() to be {}", | ||
| test_name, | ||
| expected_some | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| // Note: The internal properties of dlq_policy are verified through the Python integration tests | ||
| // in tests/test_dlq.py, because DlqPolicy comes from an external crate and its inner properties are private. | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.