def build_dlq_config(
    dlq_stream_name: str,
    step_config: Mapping[str, Any],
) -> DlqConfig:
    """
    Build the DLQ configuration for a source from its deployment config.

    Args:
        dlq_stream_name: Default DLQ topic. Used when the ``dlq`` section
            does not provide a non-empty ``topic``.
        step_config: Deployment configuration of the source step. It must
            contain a ``dlq`` mapping with ``bootstrap_servers``; ``topic``
            and ``override_params`` are optional.

    Returns:
        A ``DlqConfig`` holding the DLQ topic and the producer configuration
        used to write to it.

    Raises:
        KeyError: If ``step_config`` has no ``dlq`` section.
        ValueError: If the ``dlq`` section is null/empty or does not provide
            ``bootstrap_servers``.
    """
    try:
        loaded_dlq = step_config["dlq"]
    except KeyError:
        # Same exception type as a plain lookup, but say which stream is
        # affected so the misconfiguration is easy to locate.
        raise KeyError(
            f"DLQ stream '{dlq_stream_name}' is declared but the step has no "
            "'dlq' section in deployment configuration"
        ) from None

    # Treat a null `dlq` entry like an empty mapping so it fails with the
    # explicit ValueError below instead of an AttributeError on `.get`.
    loaded_dlq = loaded_dlq or {}

    bootstrap_servers = loaded_dlq.get("bootstrap_servers")
    if not bootstrap_servers:
        raise ValueError("DLQ config requires 'bootstrap_servers' in deployment configuration")

    # Fall back to the stream name when `topic` is absent, null or empty.
    topic = loaded_dlq.get("topic") or dlq_stream_name

    return DlqConfig(
        topic=topic,
        producer_config=PyKafkaProducerConfig(
            bootstrap_servers=bootstrap_servers,
            override_params=loaded_dlq.get("override_params"),
        ),
    )
class _DlqConfigRequired(TypedDict):
    """Keys that every DLQ configuration must provide."""

    # Kafka brokers the DLQ producer connects to.
    bootstrap_servers: Sequence[str]


class DlqConfig(_DlqConfigRequired, total=False):
    """
    Dead Letter Queue configuration for a StreamSource.

    ``bootstrap_servers`` is required, in line with the deployment config
    schema. ``topic`` and ``override_params`` are optional and may be
    omitted to get the default behavior.
    """

    # DLQ topic name; optional override of the default topic.
    topic: str
    # Extra Kafka producer parameters, passed as string key/value pairs.
    override_params: Mapping[str, str]
/// Configuration for Dead Letter Queue (DLQ).
/// When provided, invalid messages will be sent to the DLQ topic.
///
/// Exposed to Python as a class via `#[pyclass]`; both fields are readable
/// from Python through the getters generated by `#[pyo3(get)]`.
#[pyclass]
#[derive(Debug, Clone)]
pub struct DlqConfig {
    /// The Kafka topic name to send invalid messages to
    #[pyo3(get)]
    pub topic: String,

    /// The Kafka producer configuration for the DLQ
    #[pyo3(get)]
    pub producer_config: PyKafkaProducerConfig,
}

#[pymethods]
impl DlqConfig {
    /// Python-side constructor: stores `topic` and `producer_config` as given,
    /// without validation.
    #[new]
    fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self {
        DlqConfig {
            topic,
            producer_config,
        }
    }
}
/// This class is exposed to python and it is the main entry point /// used by the Python adapter to build a pipeline and run it. @@ -69,12 +96,18 @@ pub struct ArroyoConsumer { /// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness. write_healthcheck: bool, + + /// DLQ (Dead Letter Queue) configuration. + /// If provided, invalid messages will be sent to the DLQ topic. + /// Otherwise, invalid messages will cause the consumer to stop processing. + #[pyo3(get)] + dlq_config: Option, } #[pymethods] impl ArroyoConsumer { #[new] - #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))] + #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))] fn new( source: String, kafka_config: PyKafkaConsumerConfig, @@ -82,6 +115,7 @@ impl ArroyoConsumer { schema: Option, metric_config: Option, write_healthcheck: bool, + dlq_config: Option, ) -> Self { ArroyoConsumer { consumer_config: kafka_config, @@ -93,6 +127,7 @@ impl ArroyoConsumer { concurrency_config: Arc::new(ConcurrencyConfig::new(1)), metric_config, write_healthcheck, + dlq_config, } } @@ -120,7 +155,12 @@ impl ArroyoConsumer { self.write_healthcheck, ); let config = self.consumer_config.clone().into(); - let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None); + + // Build DLQ policy if configured + let dlq_policy = build_dlq_policy(&self.dlq_config, self.concurrency_config.handle()); + + let processor = + StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy); self.handle = Some(processor.get_handle()); let mut handle = processor.get_handle(); @@ -143,6 +183,40 @@ impl ArroyoConsumer { } } +/// Builds the DLQ policy if dlq_config is provided. +/// Returns None if DLQ is not configured. 
+pub fn build_dlq_policy( + dlq_config: &Option, + handle: tokio::runtime::Handle, +) -> Option> { + match dlq_config { + Some(dlq_config) => { + tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); + + // Create Kafka producer for DLQ + let producer_config = dlq_config.producer_config.clone().into(); + let kafka_producer = KafkaProducer::new(producer_config); + let dlq_producer = KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); + + // Use default DLQ limits (no limits) and no max buffered messages + // These can be made configurable in a future PR if needed + let dlq_limit = DlqLimit::default(); + let max_buffered_messages = None; + + Some(DlqPolicy::new( + handle, + Box::new(dlq_producer), + dlq_limit, + max_buffered_messages, + )) + } + None => { + tracing::info!("DLQ not configured, invalid messages will cause processing to stop"); + None + } + } +} + /// Converts a Message to a Message. /// /// The messages we send around between steps in the pipeline contain @@ -429,4 +503,52 @@ mod tests { let _ = std::fs::remove_file(healthcheck_path); }) } + + #[test] + fn test_build_dlq_policy_with_various_configs() { + // Define test cases: (test_name, dlq_bootstrap_servers, expected_some) + let test_cases = vec![ + ("without_dlq_config", None, false), + ( + "with_dlq_config_single_broker", + Some(vec!["localhost:9092".to_string()]), + true, + ), + ( + "with_dlq_config_multiple_brokers", + Some(vec![ + "broker1:9092".to_string(), + "broker2:9092".to_string(), + "broker3:9092".to_string(), + ]), + true, + ), + ]; + + // Create a tokio runtime to get a handle for testing + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + for (test_name, dlq_bootstrap_servers, expected_some) in test_cases { + // Create DLQ config if bootstrap servers are provided + let dlq_config = dlq_bootstrap_servers.map(|servers| { + let producer_config = PyKafkaProducerConfig::new(servers, None); + 
DlqConfig::new("test-dlq".to_string(), producer_config) + }); + + // Build DLQ policy and assert + let dlq_policy: Option> = + build_dlq_policy(&dlq_config, handle.clone()); + assert_eq!( + dlq_policy.is_some(), + expected_some, + "Test case '{}' failed: expected is_some() to be {}", + test_name, + expected_some + ); + } + } + + // Note: Asserting on inside properties of dlq_policy is tested through Python integration tests + // in tests/test_dlq.py, as the dlq_policy is an external crate and inner properties are private. } diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index c2d81833..7b02a267 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -119,7 +119,7 @@ pub struct PyKafkaProducerConfig { #[pymethods] impl PyKafkaProducerConfig { #[new] - fn new( + pub fn new( bootstrap_servers: Vec, override_params: Option>, ) -> Self { diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index 6ac9b602..0ce64072 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -43,6 +43,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py new file mode 100644 index 00000000..345785cd --- /dev/null +++ b/sentry_streams/tests/test_dlq.py @@ -0,0 +1,151 @@ +from typing import Mapping, Sequence + +import pytest + +from sentry_streams.adapters.arroyo.rust_arroyo import build_dlq_config +from sentry_streams.pipeline.pipeline import StreamSource +from sentry_streams.rust_streams import ( + ArroyoConsumer, + DlqConfig, + InitialOffset, + PyKafkaConsumerConfig, + PyKafkaProducerConfig, +) + + +@pytest.mark.parametrize( + "input_dlqconfig, expected_topic, expected_bootstrap_servers", + [ + pytest.param(None, None, None, id="without_dlq"), + pytest.param( + DlqConfig( 
+ topic="test-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["localhost:9092"], + override_params=None, + ), + ), + "test-dlq", + ["localhost:9092"], + id="with_full_config", + ), + ], +) +def test_consumer_creation( + input_dlqconfig: DlqConfig | None, + expected_topic: str | None, + expected_bootstrap_servers: Sequence[str] | None, +) -> None: + kafka_consumer_config = PyKafkaConsumerConfig( + bootstrap_servers=["localhost:9092"], + group_id="test-group", + auto_offset_reset=InitialOffset.latest, + strict_offset_reset=False, + max_poll_interval_ms=60000, + override_params=None, + ) + + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + dlq_config=input_dlqconfig, + ) + + if input_dlqconfig is None: + assert consumer.dlq_config is None + else: + assert consumer.dlq_config is not None + assert consumer.dlq_config.topic == expected_topic + assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers + + +def test_stream_source_no_dlq() -> None: + """Test StreamSource without DLQ.""" + source = StreamSource( + name="test_source", + stream_name="test-topic", + ) + assert source is not None + assert source.dlq_stream_name is None + + +@pytest.mark.parametrize( + "dlq_stream_name, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params", + [ + pytest.param( + "my-dlq", + { + "bootstrap_servers": ["broker1:9092"], + }, + "my-dlq", + ["broker1:9092"], + None, + id="default_topic_from_stream_name", + ), + pytest.param( + "my-dlq", + { + "topic": "overridden-dlq-topic", + "bootstrap_servers": ["broker1:9092"], + }, + "overridden-dlq-topic", + ["broker1:9092"], + None, + id="topic_override_from_config", + ), + pytest.param( + "my-dlq", + { + "bootstrap_servers": ["broker1:9092", "broker2:9092"], + "override_params": { + "security.protocol": "sasl_plaintext", + 
"sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", + }, + }, + "my-dlq", + ["broker1:9092", "broker2:9092"], + { + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", + }, + id="with_override_params", + ), + ], +) +def test_build_dlq_config( + dlq_stream_name: str, + override_dlq: Mapping[str, object], + expected_topic: str, + expected_bootstrap_servers: Sequence[str], + expected_override_params: Mapping[str, str] | None, +) -> None: + """Test that build_dlq_config constructs DlqConfig from deployment config.""" + result = build_dlq_config(dlq_stream_name, {"dlq": override_dlq}) + + assert result is not None + assert result.topic == expected_topic + assert result.producer_config.bootstrap_servers == expected_bootstrap_servers + assert result.producer_config.override_params == expected_override_params + + +def test_build_dlq_config_missing_bootstrap_servers() -> None: + """Test that build_dlq_config raises ValueError when bootstrap_servers is missing.""" + with pytest.raises( + ValueError, + match="DLQ config requires 'bootstrap_servers' in deployment configuration", + ): + build_dlq_config("my-dlq", {"dlq": {"topic": "my-dlq"}}) + + +def test_build_dlq_config_no_dlq_section() -> None: + """Test that build_dlq_config raises KeyError when no dlq section in config.""" + with pytest.raises(KeyError): + build_dlq_config("my-dlq", {})