From 480e5ce6f04948618377e8296021bf1bb00afdab Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 20 Mar 2026 15:48:58 -0400 Subject: [PATCH 01/14] add make it part of StreamSource feat(streams): Add Dead Letter Queue (DLQ) support to StreamSource Adds DLQ configuration support throughout the streaming platform: - Add PyDlqConfig type stub and build_dlq_config() helper - Wire DLQ config from StreamSource to Rust ArroyoConsumer - Support DLQ override from YAML deployment configuration - Add comprehensive test coverage for DLQ functionality Co-Authored-By: Claude Sonnet 4.5 remove one redundant test parameterized tests for creating consumer add unit test for consumer.build_dlq_policy make new function pub add parameterized python test for build_dlq_config update tests standalone function remove dlq enabled field allow overriding dlq config field by field comment remove unused pub new finalize tests simpify fix schema fix test rename DlqConfig to DlqSettings remove DlqSettings remove build_dlq_config finalize tests use Mapping and Sequence for type hints add an error test case add json validation --- .../adapters/arroyo/rust_arroyo.py | 7 +- sentry_streams/sentry_streams/config.json | 37 +++ sentry_streams/sentry_streams/config_types.py | 10 + .../sentry_streams/pipeline/pipeline.py | 26 +- .../sentry_streams/rust_streams.pyi | 16 + sentry_streams/src/consumer.rs | 128 +++++++- sentry_streams/src/kafka_config.rs | 17 +- sentry_streams/src/lib.rs | 1 + sentry_streams/tests/test_dlq.py | 298 ++++++++++++++++++ 9 files changed, 530 insertions(+), 10 deletions(-) create mode 100644 sentry_streams/tests/test_dlq.py diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 5e367966..283a0125 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -195,11 +195,7 @@ def build_kafka_producer_config( ) -def finalize_chain( - chains: TransformChains, - route: Route, - metrics_config: MetricsConfig, -) -> RuntimeOperator: +def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator: rust_route = RustRoute(route.source, route.waypoints) config, func = chains.finalize(route) if config: @@ -302,6 +298,7 @@ def source(self, step: Source[Any]) -> Route: schema=schema_name, metric_config=build_py_metrics_config(self.__metrics_config), write_healthcheck=self.__write_healthcheck, + dlq_config=step.dlq_config, ) return Route(source_name, []) diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index 7f2d5b1c..c050d335 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -144,6 +144,43 @@ "required": [ "dsn" ] + }, + "DlqProducerConfig": { + "type": "object", + "properties": { + "bootstrap_servers": { + "type": "array", + "items": { + "type": "string" + }, + "minItems": 1 + }, + "override_params": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + }, + "required": [ + "bootstrap_servers" + ] + }, + "DlqConfig": { + "type": "object", + "properties": { + "topic": { + "type": "string", + "minLength": 1 + }, + "producer_config": { + "$ref": "#/definitions/DlqProducerConfig" + } + }, + "required": [ + "topic", + "producer_config" + ] } } } diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index 5152c299..aa59ce50 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -9,6 +9,16 @@ class StepConfig(TypedDict): starts_segment: Optional[bool] +class DlqConfig(TypedDict, total=False): + """ + Dead Letter Queue configuration for a StreamSource. + All fields are optional to allow for default behavior. + """ + + topic: str + producer_config: "KafkaProducerConfig" + + class KafkaConsumerConfig(TypedDict, StepConfig): bootstrap_servers: Sequence[str] auto_offset_reset: str diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index 0903866f..a60d518a 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -44,6 +44,7 @@ ) from sentry_streams.pipeline.rust_function_protocol import InternalRustFunction from sentry_streams.pipeline.window import MeasurementUnit, TumblingWindow, Window +from sentry_streams.rust_streams import DlqConfig, PyKafkaProducerConfig RoutingFuncReturnType = TypeVar("RoutingFuncReturnType") TransformFuncReturnType = TypeVar("TransformFuncReturnType") @@ -233,17 +234,40 @@ class StreamSource(Source[bytes]): stream_name: str header_filter: Optional[Tuple[str, bytes]] = None consumer_group: Optional[str] = None + dlq_config: Optional[DlqConfig] = None step_type: StepType = StepType.SOURCE def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) def override_config(self, loaded_config: Mapping[str, Any]) -> None: - """Override topic and consumer_group from deployment configuration.""" + """Override topic, consumer_group, and dlq_config from deployment configuration.""" if loaded_config.get("topic"): self.stream_name = str(loaded_config.get("topic")) if loaded_config.get("consumer_group"): self.consumer_group = str(loaded_config.get("consumer_group")) + if loaded_config.get("dlq"): + loaded_dlq = loaded_config["dlq"] + topic = loaded_dlq.get("topic") or (self.dlq_config.topic if self.dlq_config else None) + # bootstrap_servers and override_params are nested under producer_config + producer_config = loaded_dlq.get("producer_config", {}) + servers = producer_config.get("bootstrap_servers") or ( + self.dlq_config.producer_config.bootstrap_servers if self.dlq_config else None + ) + override_params = producer_config.get("override_params") or ( + self.dlq_config.producer_config.override_params if self.dlq_config else None + ) + + if not topic or not servers: + raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'") + + self.dlq_config = DlqConfig( + topic=topic, + producer_config=PyKafkaProducerConfig( + bootstrap_servers=servers, + override_params=override_params, + ), + ) @dataclass diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index ab619604..6fd7cfdb 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -47,11 +47,24 @@ class PyKafkaConsumerConfig: def override_params(self) -> Mapping[str, str] | None: ... class PyKafkaProducerConfig: + bootstrap_servers: Sequence[str] + override_params: Mapping[str, str] | None + def __init__( self, bootstrap_servers: Sequence[str], override_params: Mapping[str, str] | None = None, ) -> None: ... + +class DlqConfig: + topic: str + producer_config: PyKafkaProducerConfig + + def __init__( + self, + topic: str, + producer_config: PyKafkaProducerConfig, + ) -> None: ... @property def bootstrap_servers(self) -> Sequence[str]: ... @property @@ -115,6 +128,8 @@ class RuntimeOperator: def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... class ArroyoConsumer: + dlq_config: DlqConfig | None + def __init__( self, source: str, @@ -123,6 +138,7 @@ class ArroyoConsumer: schema: str | None, metric_config: PyMetricConfig | None = None, write_healthcheck: bool = False, + dlq_config: DlqConfig | None = None, ) -> None: ... def add_step(self, step: RuntimeOperator) -> None: ... def run(self) -> None: ... diff --git a/sentry_streams/src/consumer.rs b/sentry_streams/src/consumer.rs index 3b77dee7..b39ea8e3 100644 --- a/sentry_streams/src/consumer.rs +++ b/sentry_streams/src/consumer.rs @@ -6,7 +6,7 @@ //! The pipeline is built by adding RuntimeOperators to the consumer. use crate::commit_policy::WatermarkCommitOffsets; -use crate::kafka_config::PyKafkaConsumerConfig; +use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig}; use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload}; use crate::metrics::configure_metrics; use crate::metrics_config::PyMetricConfig; @@ -18,7 +18,9 @@ use crate::utils::traced_with_gil; use crate::watermark::WatermarkEmitter; use pyo3::prelude::*; use rdkafka::message::{Header, Headers, OwnedHeaders}; +use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; +use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer}; use sentry_arroyo::processing::strategies::healthcheck::HealthCheck; use sentry_arroyo::processing::strategies::noop::Noop; use sentry_arroyo::processing::strategies::run_task::RunTask; @@ -34,6 +36,31 @@ use std::sync::Arc; /// Matches Arroyo docs for Kubernetes liveness probes. const HEALTHCHECK_PATH: &str = "/tmp/health.txt"; +/// Configuration for Dead Letter Queue (DLQ). +/// When provided, invalid messages will be sent to the DLQ topic. +#[pyclass] +#[derive(Debug, Clone)] +pub struct DlqConfig { + /// The Kafka topic name to send invalid messages to + #[pyo3(get)] + pub topic: String, + + /// The Kafka producer configuration for the DLQ + #[pyo3(get)] + pub producer_config: PyKafkaProducerConfig, +} + +#[pymethods] +impl DlqConfig { + #[new] + fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self { + DlqConfig { + topic, + producer_config, + } + } +} + /// The class that represent the consumer. /// This class is exposed to python and it is the main entry point /// used by the Python adapter to build a pipeline and run it. @@ -69,12 +96,18 @@ pub struct ArroyoConsumer { /// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness. write_healthcheck: bool, + + /// DLQ (Dead Letter Queue) configuration. + /// If provided, invalid messages will be sent to the DLQ topic. + /// Otherwise, invalid messages will cause the consumer to stop processing. + #[pyo3(get)] + dlq_config: Option, } #[pymethods] impl ArroyoConsumer { #[new] - #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))] + #[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))] fn new( source: String, kafka_config: PyKafkaConsumerConfig, @@ -82,6 +115,7 @@ impl ArroyoConsumer { schema: Option, metric_config: Option, write_healthcheck: bool, + dlq_config: Option, ) -> Self { ArroyoConsumer { consumer_config: kafka_config, @@ -93,6 +127,7 @@ impl ArroyoConsumer { concurrency_config: Arc::new(ConcurrencyConfig::new(1)), metric_config, write_healthcheck, + dlq_config, } } @@ -120,7 +155,12 @@ impl ArroyoConsumer { self.write_healthcheck, ); let config = self.consumer_config.clone().into(); - let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None); + + // Build DLQ policy if configured + let dlq_policy = build_dlq_policy(&self.dlq_config, self.concurrency_config.handle()); + + let processor = + StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy); self.handle = Some(processor.get_handle()); let mut handle = processor.get_handle(); @@ -143,6 +183,40 @@ impl ArroyoConsumer { } } +/// Builds the DLQ policy if dlq_config is provided. +/// Returns None if DLQ is not configured. +pub fn build_dlq_policy( + dlq_config: &Option, + handle: tokio::runtime::Handle, +) -> Option> { + match dlq_config { + Some(dlq_config) => { + tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic); + + // Create Kafka producer for DLQ + let producer_config = dlq_config.producer_config.clone().into(); + let kafka_producer = KafkaProducer::new(producer_config); + let dlq_producer = KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic)); + + // Use default DLQ limits (no limits) and no max buffered messages + // These can be made configurable in a future PR if needed + let dlq_limit = DlqLimit::default(); + let max_buffered_messages = None; + + Some(DlqPolicy::new( + handle, + Box::new(dlq_producer), + dlq_limit, + max_buffered_messages, + )) + } + None => { + tracing::info!("DLQ not configured, invalid messages will cause processing to stop"); + None + } + } +} + /// Converts a Message to a Message. /// /// The messages we send around between steps in the pipeline contain @@ -429,4 +503,52 @@ mod tests { let _ = std::fs::remove_file(healthcheck_path); }) } + + #[test] + fn test_build_dlq_policy_with_various_configs() { + // Define test cases: (test_name, dlq_bootstrap_servers, expected_some) + let test_cases = vec![ + ("without_dlq_config", None, false), + ( + "with_dlq_config_single_broker", + Some(vec!["localhost:9092".to_string()]), + true, + ), + ( + "with_dlq_config_multiple_brokers", + Some(vec![ + "broker1:9092".to_string(), + "broker2:9092".to_string(), + "broker3:9092".to_string(), + ]), + true, + ), + ]; + + // Create a tokio runtime to get a handle for testing + let runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + + for (test_name, dlq_bootstrap_servers, expected_some) in test_cases { + // Create DLQ config if bootstrap servers are provided + let dlq_config = dlq_bootstrap_servers.map(|servers| { + let producer_config = PyKafkaProducerConfig::new(servers, None); + DlqConfig::new("test-dlq".to_string(), producer_config) + }); + + // Build DLQ policy and assert + let dlq_policy: Option> = + build_dlq_policy(&dlq_config, handle.clone()); + assert_eq!( + dlq_policy.is_some(), + expected_some, + "Test case '{}' failed: expected is_some() to be {}", + test_name, + expected_some + ); + } + } + + // Note: Asserting on inside properties of dlq_policy is tested through Python integration tests + // in tests/test_dlq.py, as the dlq_policy is an external crate and inner properties are private. } diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index c2d81833..2c4a183b 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -110,16 +110,31 @@ impl From for KafkaConfig { #[pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PyKafkaProducerConfig { +<<<<<<< HEAD +<<<<<<< HEAD +<<<<<<< HEAD #[pyo3(get)] bootstrap_servers: Vec, #[pyo3(get)] override_params: Option>, +======= + pub bootstrap_servers: Vec, + pub override_params: Option>, +>>>>>>> 5e8c045 (add unit test for consumer.build_dlq_policy) +======= +======= + #[pyo3(get)] +>>>>>>> e027560 (add parameterized python test for build_dlq_config) + bootstrap_servers: Vec, + #[pyo3(get)] + override_params: Option>, +>>>>>>> 0023e30 (make new function pub) } #[pymethods] impl PyKafkaProducerConfig { #[new] - fn new( + pub fn new( bootstrap_servers: Vec, override_params: Option>, ) -> Self { diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index 6ac9b602..0ce64072 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -43,6 +43,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py new file mode 100644 index 00000000..889f33ac --- /dev/null +++ b/sentry_streams/tests/test_dlq.py @@ -0,0 +1,298 @@ +from typing import Mapping, Sequence + +import pytest + +from sentry_streams.pipeline.pipeline import StreamSource +from sentry_streams.rust_streams import ( + ArroyoConsumer, + DlqConfig, + InitialOffset, + PyKafkaConsumerConfig, + PyKafkaProducerConfig, +) + + +@pytest.mark.parametrize( + "input_dlqconfig, expected_topic, expected_bootstrap_servers", + [ + pytest.param(None, None, None, id="without_dlq"), + pytest.param( + DlqConfig( + topic="test-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["localhost:9092"], + override_params=None, + ), + ), + "test-dlq", + ["localhost:9092"], + id="with_full_config", + ), + ], +) +def test_consumer_creation( + input_dlqconfig: DlqConfig | None, + expected_topic: str | None, + expected_bootstrap_servers: Sequence[str] | None, +) -> None: + """Test that Rust ArroyoConsumer correctly accepts and stores DLQ configuration. + + This parameterized test verifies: + 1. Backward compatibility when DLQ param is omitted + 2. Full DLQ configuration is accepted and stored correctly + """ + kafka_consumer_config = PyKafkaConsumerConfig( + bootstrap_servers=["localhost:9092"], + group_id="test-group", + auto_offset_reset=InitialOffset.latest, + strict_offset_reset=False, + max_poll_interval_ms=60000, + override_params=None, + ) + + consumer = ArroyoConsumer( + source="test_source", + kafka_config=kafka_consumer_config, + topic="test-topic", + schema=None, + metric_config=None, + write_healthcheck=False, + dlq_config=input_dlqconfig, + ) + + if input_dlqconfig is None: + assert consumer.dlq_config is None + else: + assert consumer.dlq_config is not None + assert consumer.dlq_config.topic == expected_topic + assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers + + +@pytest.mark.parametrize( + "dlq_config, expected_topic, expected_bootstrap_servers", + [ + pytest.param(None, None, None, id="no_dlq_config"), + pytest.param( + DlqConfig( + topic="test-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["localhost:9092"], + override_params=None, + ), + ), + "test-dlq", + ["localhost:9092"], + id="single_bootstrap_server", + ), + pytest.param( + DlqConfig( + topic="my-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"], + override_params=None, + ), + ), + "my-dlq", + ["broker1:9092", "broker2:9092", "broker3:9092"], + id="multiple_bootstrap_servers", + ), + ], +) +def test_stream_source_dlq_config( + dlq_config: DlqConfig | None, + expected_topic: str | None, + expected_bootstrap_servers: Sequence[str] | None, +) -> None: + """Test StreamSource correctly stores DlqConfig.""" + source = StreamSource( + name="test_source", + stream_name="test-topic", + dlq_config=dlq_config, + ) + + if expected_topic is None: + assert source.dlq_config is None + else: + assert source.dlq_config is not None + assert isinstance(source.dlq_config, DlqConfig) + assert source.dlq_config.topic == expected_topic + assert source.dlq_config.producer_config is not None + assert source.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers + + +@pytest.mark.parametrize( + "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params", + [ + pytest.param( + DlqConfig( + topic="new-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["broker1:9092"], + override_params={ + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", + }, + ), + ), + None, + "new-dlq", + ["broker1:9092"], + { + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", + }, + id="no_config_override", + ), + pytest.param( + None, + { + "topic": "new-dlq", + "producer_config": { + "bootstrap_servers": ["broker1:9092"], + "override_params": { + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", + }, + }, + }, + "new-dlq", + ["broker1:9092"], + { + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", + }, + id="config_override_only", + ), + pytest.param( + DlqConfig( + topic="old-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["old-broker:9092"], + override_params=None, + ), + ), + {"topic": "new-dlq"}, + "new-dlq", + ["old-broker:9092"], + None, + id="override_topic_only", + ), + pytest.param( + DlqConfig( + topic="old-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["old-broker:9092"], + override_params=None, + ), + ), + {"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}}, + "old-dlq", + ["new-broker:9092", "new-broker2:9092"], + None, + id="override_bootstrap_servers_only", + ), + pytest.param( + DlqConfig( + topic="old-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["old-broker:9092"], + override_params=None, + ), + ), + {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}}, + "new-dlq", + ["new-broker:9092"], + None, + id="override_topic_and_bootstrap_servers", + ), + pytest.param( + DlqConfig( + topic="dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["broker:9092"], + override_params=None, + ), + ), + { + "producer_config": { + "override_params": { + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", + } + } + }, + "dlq", + ["broker:9092"], + { + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", + }, + id="override_override_params_only", + ), + ], +) +def test_stream_source_yaml_override_config_dlq( + initial_dlq_config: DlqConfig | None, + override_dlq: Mapping[str, str | Sequence[str]] | None, + expected_topic: str, + expected_bootstrap_servers: Sequence[str], + expected_override_params: Mapping[str, str], +) -> None: + """Test that StreamSource.override_config correctly overrides DLQ settings.""" + source = StreamSource( + name="my_source", + stream_name="my-topic", + dlq_config=initial_dlq_config, + ) + + if override_dlq is not None: + source.override_config({"dlq": override_dlq}) + + assert source.dlq_config is not None + assert source.dlq_config.topic == expected_topic + assert source.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers + assert source.dlq_config.producer_config.override_params == expected_override_params + + +@pytest.mark.parametrize( + "initial_dlq_config, override_dlq", + [ + pytest.param( + None, + {"producer_config": {"bootstrap_servers": ["broker:9092"]}}, + id="no_topic_with_bootstrap_servers", + ), + pytest.param( + None, + {"topic": "my-dlq"}, + id="has_topic_no_bootstrap_servers", + ), + ], +) +def test_stream_source_override_config_dlq_missing_required_fields( + initial_dlq_config: DlqConfig | None, + override_dlq: Mapping[str, str | Sequence[str]], +) -> None: + """Test that StreamSource.override_config raises ValueError when required fields are missing.""" + source = StreamSource( + name="my_source", + stream_name="my-topic", + dlq_config=initial_dlq_config, + ) + + with pytest.raises( + ValueError, match="DLQ config requires both 'topic' and 'bootstrap_servers'" + ): + source.override_config({"dlq": override_dlq}) From 366c86a0e380ffaae0406e2130c08d703ae02a6a Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Sat, 18 Apr 2026 17:36:29 -0400 Subject: [PATCH 02/14] revert mistake --- .../sentry_streams/adapters/arroyo/rust_arroyo.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 283a0125..90fbedf1 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -195,7 +195,11 @@ def build_kafka_producer_config( ) -def finalize_chain(chains: TransformChains, route: Route) -> RuntimeOperator: +def finalize_chain( + chains: TransformChains, + route: Route, + metrics_config: MetricsConfig, +) -> RuntimeOperator: rust_route = RustRoute(route.source, route.waypoints) config, func = chains.finalize(route) if config: From 92638845a6143bb8a0b83c0654a0f14cdab59366 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Mon, 20 Apr 2026 11:44:15 -0400 Subject: [PATCH 03/14] refactor streamSource to be the same pattern as streamSink --- .../sentry_streams/pipeline/pipeline.py | 24 +- sentry_streams/src/kafka_config.rs | 17 +- sentry_streams/tests/test_dlq.py | 223 +++++------------- 3 files changed, 70 insertions(+), 194 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index a60d518a..da522c13 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from collections import defaultdict -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import timedelta from enum import Enum from functools import partial @@ -234,8 +234,9 @@ class StreamSource(Source[bytes]): stream_name: str header_filter: Optional[Tuple[str, bytes]] = None consumer_group: Optional[str] = None - dlq_config: Optional[DlqConfig] = None + dlq_stream_name: Optional[str] = None step_type: StepType = StepType.SOURCE + dlq_config: Optional[DlqConfig] = field(default=None, init=False, repr=False) def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) @@ -246,20 +247,17 @@ def override_config(self, loaded_config: Mapping[str, Any]) -> None: self.stream_name = str(loaded_config.get("topic")) if loaded_config.get("consumer_group"): self.consumer_group = str(loaded_config.get("consumer_group")) - if loaded_config.get("dlq"): + if self.dlq_stream_name is not None and loaded_config.get("dlq"): loaded_dlq = loaded_config["dlq"] - topic = loaded_dlq.get("topic") or (self.dlq_config.topic if self.dlq_config else None) - # bootstrap_servers and override_params are nested under producer_config + topic = loaded_dlq.get("topic", self.dlq_stream_name) producer_config = loaded_dlq.get("producer_config", {}) - servers = producer_config.get("bootstrap_servers") or ( - self.dlq_config.producer_config.bootstrap_servers if self.dlq_config else None - ) - override_params = producer_config.get("override_params") or ( - self.dlq_config.producer_config.override_params if self.dlq_config else None - ) + servers = producer_config.get("bootstrap_servers") + override_params = producer_config.get("override_params") - if not topic or not servers: - raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'") + if not servers: + raise ValueError( + "DLQ config requires 'bootstrap_servers' in deployment configuration" + ) self.dlq_config = DlqConfig( topic=topic, diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index 2c4a183b..d2dc25e2 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -110,25 +110,10 @@ impl From for KafkaConfig { #[pyclass(from_py_object)] #[derive(Debug, Clone)] pub struct PyKafkaProducerConfig { -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD #[pyo3(get)] - bootstrap_servers: Vec, - #[pyo3(get)] - override_params: Option>, -======= pub bootstrap_servers: Vec, - pub override_params: Option>, ->>>>>>> 5e8c045 (add unit test for consumer.build_dlq_policy) -======= -======= #[pyo3(get)] ->>>>>>> e027560 (add parameterized python test for build_dlq_config) - bootstrap_servers: Vec, - #[pyo3(get)] - override_params: Option>, ->>>>>>> 0023e30 (make new function pub) + pub override_params: Option>, } #[pymethods] diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 889f33ac..527ba3bf 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -68,197 +68,95 @@ def test_consumer_creation( assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers -@pytest.mark.parametrize( - "dlq_config, expected_topic, expected_bootstrap_servers", - [ - pytest.param(None, None, None, id="no_dlq_config"), - pytest.param( - DlqConfig( - topic="test-dlq", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=["localhost:9092"], - override_params=None, - ), - ), - "test-dlq", - ["localhost:9092"], - id="single_bootstrap_server", - ), - pytest.param( - DlqConfig( - topic="my-dlq", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"], - override_params=None, - ), - ), - "my-dlq", - ["broker1:9092", "broker2:9092", "broker3:9092"], - id="multiple_bootstrap_servers", - ), - ], -) -def test_stream_source_dlq_config( - dlq_config: DlqConfig | None, - expected_topic: str | None, - expected_bootstrap_servers: Sequence[str] | None, -) -> None: - """Test StreamSource correctly stores DlqConfig.""" +def test_stream_source_dlq_stream_name() -> None: + """Test StreamSource stores dlq_stream_name and dlq_config is not set until override_config.""" source = StreamSource( name="test_source", stream_name="test-topic", - dlq_config=dlq_config, + dlq_stream_name="my-dlq", ) + assert source.dlq_stream_name == "my-dlq" + assert source.dlq_config is None - if expected_topic is None: - assert source.dlq_config is None - else: - assert source.dlq_config is not None - assert isinstance(source.dlq_config, DlqConfig) - assert source.dlq_config.topic == expected_topic - assert source.dlq_config.producer_config is not None - assert source.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers + +def test_stream_source_no_dlq() -> None: + """Test StreamSource without DLQ.""" + source = StreamSource( + name="test_source", + stream_name="test-topic", + ) + assert source.dlq_stream_name is None + assert source.dlq_config is None @pytest.mark.parametrize( - "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params", + "dlq_stream_name, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params", [ pytest.param( - DlqConfig( - topic="new-dlq", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=["broker1:9092"], - override_params={ - "security.protocol": "sasl_plaintext", - "sasl.mechanism": "SCRAM-SHA-256", - "sasl.username": "user", - "sasl.password": "pass", - }, - ), - ), - None, - "new-dlq", - ["broker1:9092"], + "my-dlq", { - "security.protocol": "sasl_plaintext", - "sasl.mechanism": "SCRAM-SHA-256", - "sasl.username": "user", - "sasl.password": "pass", + "producer_config": { + "bootstrap_servers": ["broker1:9092"], + }, }, - id="no_config_override", + "my-dlq", + ["broker1:9092"], + None, + id="default_topic_from_stream_name", ), pytest.param( - None, + "my-dlq", { - "topic": "new-dlq", + "topic": "overridden-dlq-topic", "producer_config": { "bootstrap_servers": ["broker1:9092"], - "override_params": { - "security.protocol": "sasl_plaintext", - "sasl.mechanism": "SCRAM-SHA-256", - "sasl.username": "user", - "sasl.password": "pass", - }, }, }, - "new-dlq", + "overridden-dlq-topic", ["broker1:9092"], - { - "security.protocol": "sasl_plaintext", - "sasl.mechanism": "SCRAM-SHA-256", - "sasl.username": "user", - "sasl.password": "pass", - }, - id="config_override_only", - ), - pytest.param( - DlqConfig( - topic="old-dlq", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=["old-broker:9092"], - override_params=None, - ), - ), - {"topic": "new-dlq"}, - "new-dlq", - ["old-broker:9092"], None, - id="override_topic_only", + id="topic_override_from_config", ), pytest.param( - DlqConfig( - topic="old-dlq", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=["old-broker:9092"], - override_params=None, - ), - ), - {"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}}, - "old-dlq", - ["new-broker:9092", "new-broker2:9092"], - None, - id="override_bootstrap_servers_only", - ), - pytest.param( - DlqConfig( - topic="old-dlq", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=["old-broker:9092"], - override_params=None, - ), - ), - {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}}, - "new-dlq", - ["new-broker:9092"], - None, - id="override_topic_and_bootstrap_servers", - ), - pytest.param( - DlqConfig( - topic="dlq", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=["broker:9092"], - override_params=None, - ), - ), + "my-dlq", { "producer_config": { + "bootstrap_servers": ["broker1:9092", "broker2:9092"], "override_params": { "security.protocol": "sasl_plaintext", "sasl.mechanism": "SCRAM-SHA-256", "sasl.username": "user", "sasl.password": "pass", - } - } + }, + }, }, - "dlq", - ["broker:9092"], + "my-dlq", + ["broker1:9092", "broker2:9092"], { "security.protocol": "sasl_plaintext", "sasl.mechanism": "SCRAM-SHA-256", "sasl.username": "user", "sasl.password": "pass", }, - id="override_override_params_only", + id="with_override_params", ), ], ) def test_stream_source_yaml_override_config_dlq( - initial_dlq_config: DlqConfig | None, - override_dlq: Mapping[str, str | Sequence[str]] | None, + dlq_stream_name: str, + override_dlq: Mapping[str, object], expected_topic: str, expected_bootstrap_servers: Sequence[str], - expected_override_params: Mapping[str, str], + expected_override_params: Mapping[str, str] | None, ) -> None: - """Test that StreamSource.override_config correctly overrides DLQ settings.""" + """Test that StreamSource.override_config builds DLQ config from deployment config.""" source = StreamSource( name="my_source", stream_name="my-topic", - dlq_config=initial_dlq_config, + dlq_stream_name=dlq_stream_name, ) - if override_dlq is not None: - source.override_config({"dlq": override_dlq}) + source.override_config({"dlq": override_dlq}) assert source.dlq_config is not None assert source.dlq_config.topic == expected_topic @@ -266,33 +164,28 @@ def test_stream_source_yaml_override_config_dlq( assert source.dlq_config.producer_config.override_params == expected_override_params -@pytest.mark.parametrize( - "initial_dlq_config, override_dlq", - [ - pytest.param( - None, - {"producer_config": {"bootstrap_servers": ["broker:9092"]}}, - id="no_topic_with_bootstrap_servers", - ), - pytest.param( - None, - {"topic": "my-dlq"}, - id="has_topic_no_bootstrap_servers", - ), - ], -) -def test_stream_source_override_config_dlq_missing_required_fields( - initial_dlq_config: DlqConfig | None, - override_dlq: Mapping[str, str | Sequence[str]], -) -> None: - """Test that StreamSource.override_config raises ValueError when required fields are missing.""" +def test_stream_source_override_config_dlq_missing_bootstrap_servers() -> None: + """Test that override_config raises ValueError when bootstrap_servers is missing.""" source = StreamSource( name="my_source", stream_name="my-topic", - dlq_config=initial_dlq_config, + dlq_stream_name="my-dlq", ) with pytest.raises( - ValueError, match="DLQ config requires both 'topic' and 'bootstrap_servers'" + ValueError, + match="DLQ config requires 'bootstrap_servers' in deployment configuration", ): - source.override_config({"dlq": override_dlq}) + source.override_config({"dlq": {"topic": "my-dlq"}}) + + +def test_stream_source_override_config_no_dlq_stream_name_ignores_config() -> None: + """Test that override_config ignores dlq config when dlq_stream_name is not set.""" + source = StreamSource( + name="my_source", + stream_name="my-topic", + ) + + source.override_config({"dlq": {"producer_config": {"bootstrap_servers": ["broker:9092"]}}}) + + assert source.dlq_config is None From f7efc0a9993fbc5cc4d827d1c424218fa5a36898 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Mon, 20 Apr 2026 15:10:35 -0400 Subject: [PATCH 04/14] add one more property decoratro --- sentry_streams/sentry_streams/rust_streams.pyi | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 6fd7cfdb..37fc4362 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -128,8 +128,6 @@ class RuntimeOperator: def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... class ArroyoConsumer: - dlq_config: DlqConfig | None - def __init__( self, source: str, @@ -143,6 +141,8 @@ class ArroyoConsumer: def add_step(self, step: RuntimeOperator) -> None: ... def run(self) -> None: ... def shutdown(self) -> None: ... + @property + def dlq_config(self) -> DlqConfig | None: ... class PyAnyMessage: def __init__( From 08237378fd0fa36af0921a74378db9bdb88d1598 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Mon, 20 Apr 2026 15:36:01 -0400 Subject: [PATCH 05/14] revert --- sentry_streams/src/kafka_config.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index d2dc25e2..c2d81833 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -111,15 +111,15 @@ impl From for KafkaConfig { #[derive(Debug, Clone)] pub struct PyKafkaProducerConfig { #[pyo3(get)] - pub bootstrap_servers: Vec, + bootstrap_servers: Vec, #[pyo3(get)] - pub override_params: Option>, + override_params: Option>, } #[pymethods] impl PyKafkaProducerConfig { #[new] - pub fn new( + fn new( bootstrap_servers: Vec, override_params: Option>, ) -> Self { From f15da48482c4bf68f57d925153e13ebdc9db6d06 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Mon, 20 Apr 2026 15:37:55 -0400 Subject: [PATCH 06/14] pub fn new was still needed --- sentry_streams/src/kafka_config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_streams/src/kafka_config.rs b/sentry_streams/src/kafka_config.rs index c2d81833..7b02a267 100644 --- a/sentry_streams/src/kafka_config.rs +++ b/sentry_streams/src/kafka_config.rs @@ -119,7 +119,7 @@ pub struct PyKafkaProducerConfig { #[pymethods] impl PyKafkaProducerConfig { #[new] - fn new( + pub fn new( bootstrap_servers: Vec, override_params: Option>, ) -> Self { From 7925998afc09926154ad3d810c4507bb7e782d5f Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Mon, 20 Apr 2026 15:59:43 -0400 Subject: [PATCH 07/14] edit tests --- sentry_streams/tests/test_dlq.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 527ba3bf..8dc42cd5 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -68,17 +68,6 @@ def test_consumer_creation( assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers -def test_stream_source_dlq_stream_name() -> None: - """Test StreamSource stores dlq_stream_name and dlq_config is not set until override_config.""" - source = StreamSource( - name="test_source", - stream_name="test-topic", - dlq_stream_name="my-dlq", - ) - assert source.dlq_stream_name == "my-dlq" - assert source.dlq_config is None - - def test_stream_source_no_dlq() -> None: """Test StreamSource without DLQ.""" source = StreamSource( @@ -177,15 +166,3 @@ def test_stream_source_override_config_dlq_missing_bootstrap_servers() -> None: match="DLQ config requires 'bootstrap_servers' in deployment configuration", ): source.override_config({"dlq": {"topic": "my-dlq"}}) - - -def test_stream_source_override_config_no_dlq_stream_name_ignores_config() -> None: - """Test that override_config ignores dlq config when dlq_stream_name is not set.""" - source = StreamSource( - name="my_source", - stream_name="my-topic", - ) - - source.override_config({"dlq": {"producer_config": {"bootstrap_servers": ["broker:9092"]}}}) - - assert source.dlq_config is None From 02aae450f010373056f58ce92489f8178c8a8fdf Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Mon, 20 Apr 2026 16:01:46 -0400 Subject: [PATCH 08/14] topic not required --- sentry_streams/sentry_streams/config.json | 1 - 1 file changed, 1 deletion(-) diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index c050d335..73a03938 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -178,7 +178,6 @@ } }, "required": [ - "topic", "producer_config" ] } From 71738242c01c455e96c9c818a61cd30e279fdc30 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Mon, 20 Apr 2026 16:05:19 -0400 Subject: [PATCH 09/14] wire up json validation --- sentry_streams/sentry_streams/config.json | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index 73a03938..8a9b939d 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -44,7 +44,15 @@ "properties": { "steps_config": { "type": "object", - "title": "steps_config" + "title": "steps_config", + "additionalProperties": { + "type": "object", + "properties": { + "dlq": { + "$ref": "#/definitions/DlqConfig" + } + } + } } } } From 0acfcd79831636f835361c0b8b4b69e04da16e43 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Tue, 21 Apr 2026 17:18:28 -0400 Subject: [PATCH 10/14] add test to prove config cannot be passed in at construction time --- sentry_streams/tests/test_dlq.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 8dc42cd5..1252090c 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -68,6 +68,22 @@ def test_consumer_creation( assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers +def test_stream_source_dlq_config_not_constructor_param() -> None: + """Test that dlq_config cannot be passed as a constructor argument.""" + with pytest.raises(TypeError, match="unexpected keyword argument"): + StreamSource( # type: ignore + name="test_source", + stream_name="test-topic", + dlq_config=DlqConfig( + topic="test-dlq", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=["localhost:9092"], + override_params=None, + ), + ), + ) + + def test_stream_source_no_dlq() -> None: """Test StreamSource without DLQ.""" source = StreamSource( From fb6b40d7e18cddedd18f34d0ba64db201ef118b6 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 22 Apr 2026 11:49:45 -0400 Subject: [PATCH 11/14] only build dlq object at adapter layer, tests not updated --- .../adapters/arroyo/rust_arroyo.py | 13 ++++++++++- .../sentry_streams/pipeline/pipeline.py | 23 +++++++------------ 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 90fbedf1..a00e2e4c 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -67,6 +67,7 @@ from sentry_streams.pipeline.window import MeasurementUnit from sentry_streams.rust_streams import ( ArroyoConsumer, + DlqConfig, InitialOffset, PyKafkaConsumerConfig, PyKafkaProducerConfig, @@ -292,6 +293,16 @@ def source(self, step: Source[Any]) -> Route: step.override_config(step_config) step.validate() + dlq_config = None + if step.dlq_bootstrap_servers is not None: + dlq_config = DlqConfig( + topic=step.dlq_topic or step.dlq_stream_name or "", + producer_config=PyKafkaProducerConfig( + bootstrap_servers=step.dlq_bootstrap_servers, + override_params=step.dlq_override_params, + ), + ) + assert isinstance(self.__write_healthcheck, bool) self.__consumers[source_name] = ArroyoConsumer( source=source_name, @@ -302,7 +313,7 @@ def source(self, step: Source[Any]) -> Route: schema=schema_name, metric_config=build_py_metrics_config(self.__metrics_config), write_healthcheck=self.__write_healthcheck, - dlq_config=step.dlq_config, + dlq_config=dlq_config, ) return Route(source_name, []) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index da522c13..75fc147c 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -44,7 +44,6 @@ ) from sentry_streams.pipeline.rust_function_protocol import InternalRustFunction from sentry_streams.pipeline.window import MeasurementUnit, TumblingWindow, Window -from sentry_streams.rust_streams import DlqConfig, PyKafkaProducerConfig RoutingFuncReturnType = TypeVar("RoutingFuncReturnType") TransformFuncReturnType = TypeVar("TransformFuncReturnType") @@ -236,37 +235,31 @@ class StreamSource(Source[bytes]): consumer_group: Optional[str] = None dlq_stream_name: Optional[str] = None step_type: StepType = StepType.SOURCE - dlq_config: Optional[DlqConfig] = field(default=None, init=False, repr=False) + dlq_topic: Optional[str] = field(default=None, init=False, repr=False) + dlq_bootstrap_servers: Optional[Sequence[str]] = field(default=None, init=False, repr=False) + dlq_override_params: Optional[Mapping[str, str]] = field(default=None, init=False, repr=False) def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) def override_config(self, loaded_config: Mapping[str, Any]) -> None: - """Override topic, consumer_group, and dlq_config from deployment configuration.""" + """Override topic, consumer_group, and DLQ parameters from deployment configuration.""" if loaded_config.get("topic"): self.stream_name = str(loaded_config.get("topic")) if loaded_config.get("consumer_group"): self.consumer_group = str(loaded_config.get("consumer_group")) if self.dlq_stream_name is not None and loaded_config.get("dlq"): loaded_dlq = loaded_config["dlq"] - topic = loaded_dlq.get("topic", self.dlq_stream_name) + self.dlq_topic = loaded_dlq.get("topic", self.dlq_stream_name) producer_config = loaded_dlq.get("producer_config", {}) - servers = producer_config.get("bootstrap_servers") - override_params = producer_config.get("override_params") + self.dlq_bootstrap_servers = producer_config.get("bootstrap_servers") + self.dlq_override_params = producer_config.get("override_params") - if not servers: + if not self.dlq_bootstrap_servers: raise ValueError( "DLQ config requires 'bootstrap_servers' in deployment configuration" ) - self.dlq_config = DlqConfig( - topic=topic, - producer_config=PyKafkaProducerConfig( - bootstrap_servers=servers, - override_params=override_params, - ), - ) - @dataclass class WithInput(Step, Generic[TIn]): From ee6a8b7dd847084c98dc0957f8045a724944a393 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Wed, 22 Apr 2026 11:59:55 -0400 Subject: [PATCH 12/14] remove data class field repr --- sentry_streams/sentry_streams/pipeline/pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index 75fc147c..e067d2e0 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -235,9 +235,9 @@ class StreamSource(Source[bytes]): consumer_group: Optional[str] = None dlq_stream_name: Optional[str] = None step_type: StepType = StepType.SOURCE - dlq_topic: Optional[str] = field(default=None, init=False, repr=False) - dlq_bootstrap_servers: Optional[Sequence[str]] = field(default=None, init=False, repr=False) - dlq_override_params: Optional[Mapping[str, str]] = field(default=None, init=False, repr=False) + dlq_topic: Optional[str] = field(default=None, init=False) + dlq_bootstrap_servers: Optional[Sequence[str]] = field(default=None, init=False) + dlq_override_params: Optional[Mapping[str, str]] = field(default=None, init=False) def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) From e546512ccab787afabacb37c581a1f56f7a1cbd1 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 24 Apr 2026 12:45:42 -0400 Subject: [PATCH 13/14] address comments --- .../adapters/arroyo/rust_arroyo.py | 37 +++++++-- sentry_streams/sentry_streams/config.json | 21 ++---- sentry_streams/sentry_streams/config_types.py | 3 +- .../sentry_streams/pipeline/pipeline.py | 18 +---- .../sentry_streams/rust_streams.pyi | 14 ++-- sentry_streams/tests/test_dlq.py | 75 +++++++------------ 6 files changed, 69 insertions(+), 99 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index a00e2e4c..3871ea20 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -196,6 +196,33 @@ def build_kafka_producer_config( ) +def build_dlq_config( + dlq_stream_name: str, + step_config: Mapping[str, Any], +) -> DlqConfig | None: + """ + Build the DLQ configuration from deployment config. + Returns None if no DLQ config is present. + """ + loaded_dlq = step_config.get("dlq") + if not loaded_dlq: + return None + + topic = loaded_dlq.get("topic", dlq_stream_name) + bootstrap_servers = loaded_dlq.get("bootstrap_servers") + + if not bootstrap_servers: + raise ValueError("DLQ config requires 'bootstrap_servers' in deployment configuration") + + return DlqConfig( + topic=topic, + producer_config=PyKafkaProducerConfig( + bootstrap_servers=bootstrap_servers, + override_params=loaded_dlq.get("override_params"), + ), + ) + + def finalize_chain( chains: TransformChains, route: Route, @@ -294,14 +321,8 @@ def source(self, step: Source[Any]) -> Route: step.validate() dlq_config = None - if step.dlq_bootstrap_servers is not None: - dlq_config = DlqConfig( - topic=step.dlq_topic or step.dlq_stream_name or "", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=step.dlq_bootstrap_servers, - override_params=step.dlq_override_params, - ), - ) + if step.dlq_stream_name is not None: + dlq_config = build_dlq_config(step.dlq_stream_name, step_config) assert isinstance(self.__write_healthcheck, bool) self.__consumers[source_name] = ArroyoConsumer( diff --git a/sentry_streams/sentry_streams/config.json b/sentry_streams/sentry_streams/config.json index 8a9b939d..78399aef 100644 --- a/sentry_streams/sentry_streams/config.json +++ b/sentry_streams/sentry_streams/config.json @@ -153,9 +153,13 @@ "dsn" ] }, - "DlqProducerConfig": { + "DlqConfig": { "type": "object", "properties": { + "topic": { + "type": "string", + "minLength": 1 + }, "bootstrap_servers": { "type": "array", "items": { @@ -173,21 +177,6 @@ "required": [ "bootstrap_servers" ] - }, - "DlqConfig": { - "type": "object", - "properties": { - "topic": { - "type": "string", - "minLength": 1 - }, - "producer_config": { - "$ref": "#/definitions/DlqProducerConfig" - } - }, - "required": [ - "producer_config" - ] } } } diff --git a/sentry_streams/sentry_streams/config_types.py b/sentry_streams/sentry_streams/config_types.py index aa59ce50..b9326b42 100644 --- a/sentry_streams/sentry_streams/config_types.py +++ b/sentry_streams/sentry_streams/config_types.py @@ -16,7 +16,8 @@ class DlqConfig(TypedDict, total=False): """ topic: str - producer_config: "KafkaProducerConfig" + bootstrap_servers: Sequence[str] + override_params: Mapping[str, str] class KafkaConsumerConfig(TypedDict, StepConfig): diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index e067d2e0..512c2d81 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from collections import defaultdict -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import timedelta from enum import Enum from functools import partial @@ -235,30 +235,16 @@ class StreamSource(Source[bytes]): consumer_group: Optional[str] = None dlq_stream_name: Optional[str] = None step_type: StepType = StepType.SOURCE - dlq_topic: Optional[str] = field(default=None, init=False) - dlq_bootstrap_servers: Optional[Sequence[str]] = field(default=None, init=False) - dlq_override_params: Optional[Mapping[str, str]] = field(default=None, init=False) def register(self, ctx: Pipeline[bytes], previous: Step) -> None: super().register(ctx, previous) def override_config(self, loaded_config: Mapping[str, Any]) -> None: - """Override topic, consumer_group, and DLQ parameters from deployment configuration.""" + """Override topic and consumer_group from deployment configuration.""" if loaded_config.get("topic"): self.stream_name = str(loaded_config.get("topic")) if loaded_config.get("consumer_group"): self.consumer_group = str(loaded_config.get("consumer_group")) - if self.dlq_stream_name is not None and loaded_config.get("dlq"): - loaded_dlq = loaded_config["dlq"] - self.dlq_topic = loaded_dlq.get("topic", self.dlq_stream_name) - producer_config = loaded_dlq.get("producer_config", {}) - self.dlq_bootstrap_servers = producer_config.get("bootstrap_servers") - self.dlq_override_params = producer_config.get("override_params") - - if not self.dlq_bootstrap_servers: - raise ValueError( - "DLQ config requires 'bootstrap_servers' in deployment configuration" - ) @dataclass diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index 37fc4362..dad4d439 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -47,28 +47,26 @@ class PyKafkaConsumerConfig: def override_params(self) -> Mapping[str, str] | None: ... class PyKafkaProducerConfig: - bootstrap_servers: Sequence[str] - override_params: Mapping[str, str] | None - def __init__( self, bootstrap_servers: Sequence[str], override_params: Mapping[str, str] | None = None, ) -> None: ... + @property + def bootstrap_servers(self) -> Sequence[str]: ... + @property + def override_params(self) -> Mapping[str, str] | None: ... class DlqConfig: - topic: str - producer_config: PyKafkaProducerConfig - def __init__( self, topic: str, producer_config: PyKafkaProducerConfig, ) -> None: ... @property - def bootstrap_servers(self) -> Sequence[str]: ... + def topic(self) -> str: ... @property - def override_params(self) -> Mapping[str, str] | None: ... + def producer_config(self) -> PyKafkaProducerConfig: ... class PyMetricConfig: def __init__( diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 1252090c..2efe8c14 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -2,6 +2,7 @@ import pytest +from sentry_streams.adapters.arroyo.rust_arroyo import build_dlq_config from sentry_streams.pipeline.pipeline import StreamSource from sentry_streams.rust_streams import ( ArroyoConsumer, @@ -35,12 +36,6 @@ def test_consumer_creation( expected_topic: str | None, expected_bootstrap_servers: Sequence[str] | None, ) -> None: - """Test that Rust ArroyoConsumer correctly accepts and stores DLQ configuration. - - This parameterized test verifies: - 1. Backward compatibility when DLQ param is omitted - 2. Full DLQ configuration is accepted and stored correctly - """ kafka_consumer_config = PyKafkaConsumerConfig( bootstrap_servers=["localhost:9092"], group_id="test-group", @@ -74,13 +69,7 @@ def test_stream_source_dlq_config_not_constructor_param() -> None: StreamSource( # type: ignore name="test_source", stream_name="test-topic", - dlq_config=DlqConfig( - topic="test-dlq", - producer_config=PyKafkaProducerConfig( - bootstrap_servers=["localhost:9092"], - override_params=None, - ), - ), + dlq_config="anything", ) @@ -91,7 +80,6 @@ def test_stream_source_no_dlq() -> None: stream_name="test-topic", ) assert source.dlq_stream_name is None - assert source.dlq_config is None @pytest.mark.parametrize( @@ -100,9 +88,7 @@ def test_stream_source_no_dlq() -> None: pytest.param( "my-dlq", { - "producer_config": { - "bootstrap_servers": ["broker1:9092"], - }, + "bootstrap_servers": ["broker1:9092"], }, "my-dlq", ["broker1:9092"], @@ -113,9 +99,7 @@ def test_stream_source_no_dlq() -> None: "my-dlq", { "topic": "overridden-dlq-topic", - "producer_config": { - "bootstrap_servers": ["broker1:9092"], - }, + "bootstrap_servers": ["broker1:9092"], }, "overridden-dlq-topic", ["broker1:9092"], @@ -125,14 +109,12 @@ def test_stream_source_no_dlq() -> None: pytest.param( "my-dlq", { - "producer_config": { - "bootstrap_servers": ["broker1:9092", "broker2:9092"], - "override_params": { - "security.protocol": "sasl_plaintext", - "sasl.mechanism": "SCRAM-SHA-256", - "sasl.username": "user", - "sasl.password": "pass", - }, + "bootstrap_servers": ["broker1:9092", "broker2:9092"], + "override_params": { + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-256", + "sasl.username": "user", + "sasl.password": "pass", }, }, "my-dlq", @@ -147,38 +129,31 @@ def test_stream_source_no_dlq() -> None: ), ], ) -def test_stream_source_yaml_override_config_dlq( +def test_build_dlq_config( dlq_stream_name: str, override_dlq: Mapping[str, object], expected_topic: str, expected_bootstrap_servers: Sequence[str], expected_override_params: Mapping[str, str] | None, ) -> None: - """Test that StreamSource.override_config builds DLQ config from deployment config.""" - source = StreamSource( - name="my_source", - stream_name="my-topic", - dlq_stream_name=dlq_stream_name, - ) - - source.override_config({"dlq": override_dlq}) - - assert source.dlq_config is not None - assert source.dlq_config.topic == expected_topic - assert source.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers - assert source.dlq_config.producer_config.override_params == expected_override_params + """Test that build_dlq_config constructs DlqConfig from deployment config.""" + result = build_dlq_config(dlq_stream_name, {"dlq": override_dlq}) + assert result is not None + assert result.topic == expected_topic + assert result.producer_config.bootstrap_servers == expected_bootstrap_servers + assert result.producer_config.override_params == expected_override_params -def test_stream_source_override_config_dlq_missing_bootstrap_servers() -> None: - """Test that override_config raises ValueError when bootstrap_servers is missing.""" - source = StreamSource( - name="my_source", - stream_name="my-topic", - dlq_stream_name="my-dlq", - ) +def test_build_dlq_config_missing_bootstrap_servers() -> None: + """Test that build_dlq_config raises ValueError when bootstrap_servers is missing.""" with pytest.raises( ValueError, match="DLQ config requires 'bootstrap_servers' in deployment configuration", ): - source.override_config({"dlq": {"topic": "my-dlq"}}) + build_dlq_config("my-dlq", {"dlq": {"topic": "my-dlq"}}) + + +def test_build_dlq_config_no_dlq_section() -> None: + """Test that build_dlq_config returns None when no dlq section in config.""" + assert build_dlq_config("my-dlq", {}) is None From 9b9b8bc223c0ce656d191195e20686fb69c287b3 Mon Sep 17 00:00:00 2001 From: victoria-yining-huang Date: Fri, 24 Apr 2026 14:53:00 -0400 Subject: [PATCH 14/14] tests are edited --- .../adapters/arroyo/rust_arroyo.py | 8 ++------ sentry_streams/tests/test_dlq.py | 16 ++++------------ 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 3871ea20..84cf393b 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -199,15 +199,11 @@ def build_kafka_producer_config( def build_dlq_config( dlq_stream_name: str, step_config: Mapping[str, Any], -) -> DlqConfig | None: +) -> DlqConfig: """ Build the DLQ configuration from deployment config. - Returns None if no DLQ config is present. """ - loaded_dlq = step_config.get("dlq") - if not loaded_dlq: - return None - + loaded_dlq = step_config["dlq"] topic = loaded_dlq.get("topic", dlq_stream_name) bootstrap_servers = loaded_dlq.get("bootstrap_servers") diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py index 2efe8c14..345785cd 100644 --- a/sentry_streams/tests/test_dlq.py +++ b/sentry_streams/tests/test_dlq.py @@ -63,22 +63,13 @@ def test_consumer_creation( assert consumer.dlq_config.producer_config.bootstrap_servers == expected_bootstrap_servers -def test_stream_source_dlq_config_not_constructor_param() -> None: - """Test that dlq_config cannot be passed as a constructor argument.""" - with pytest.raises(TypeError, match="unexpected keyword argument"): - StreamSource( # type: ignore - name="test_source", - stream_name="test-topic", - dlq_config="anything", - ) - - def test_stream_source_no_dlq() -> None: """Test StreamSource without DLQ.""" source = StreamSource( name="test_source", stream_name="test-topic", ) + assert source is not None assert source.dlq_stream_name is None @@ -155,5 +146,6 @@ def test_build_dlq_config_missing_bootstrap_servers() -> None: def test_build_dlq_config_no_dlq_section() -> None: - """Test that build_dlq_config returns None when no dlq section in config.""" - assert build_dlq_config("my-dlq", {}) is None + """Test that build_dlq_config raises KeyError when no dlq section in config.""" + with pytest.raises(KeyError): + build_dlq_config("my-dlq", {})