feat(dlq): add dlq support (no-op)#277

Open
victoria-yining-huang wants to merge 14 commits into main from vic/add_dlq

Conversation

@victoria-yining-huang
Contributor

@victoria-yining-huang victoria-yining-huang commented Mar 20, 2026

ticket

PR1 (this):

  • noop
  • add arroyo dlq support that can reach rust-arroyo
  • json validation

example config file schema:

  env: {}

  pipeline:
    segments:
      - steps_config:
          myinput:
            starts_segment: True
            bootstrap_servers:
              - "localhost:9092"
            consumer_group: "my-consumer-group"
            dlq:
              topic: "custom-dlq-topic"  # optional, defaults to dlq_stream_name
              bootstrap_servers:        # required
                - "localhost:9092"
              override_params:          # optional
                security.protocol: "sasl_plaintext"
                sasl.mechanism: "SCRAM-SHA-256"
                sasl.username: "user"
                sasl.password: "pass"

          mysink:
            bootstrap_servers:
              - "localhost:9092"

And the corresponding pipeline code would just be:

                                                                                                                                                                               
  source = pipeline.source(
      StreamSource(
          name="myinput",
          stream_name="my-input-stream",
          dlq_stream_name="my-dlq",  # that's it, no kafka details
      )
  )

PR2: default config
PR3: wire everything together, end to end testing
PR4: enable by default, deploy some default topics
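
As a rough illustration of the config schema in this description, the sketch below resolves a `dlq` block from a step's config into concrete settings. The function `resolve_dlq_settings` and the `ResolvedDlq` dataclass are hypothetical names, not part of sentry_streams. The only behavior taken from the description is that `topic` falls back to `dlq_stream_name`, `bootstrap_servers` is required, and `override_params` is optional.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Mapping, Sequence


@dataclass(frozen=True)
class ResolvedDlq:
    """Hypothetical container for the resolved DLQ settings."""

    topic: str
    bootstrap_servers: Sequence[str]
    override_params: Mapping[str, str] = field(default_factory=dict)


def resolve_dlq_settings(
    step_config: Mapping[str, Any], dlq_stream_name: str
) -> ResolvedDlq | None:
    """Resolve the `dlq` block of a step config (hypothetical helper)."""
    dlq = step_config.get("dlq")
    if dlq is None:
        # No DLQ configured for this step.
        return None

    servers = dlq.get("bootstrap_servers")
    if not servers:
        raise ValueError("dlq.bootstrap_servers is required")

    return ResolvedDlq(
        # The topic is optional and defaults to the stream name set in code.
        topic=dlq.get("topic", dlq_stream_name),
        bootstrap_servers=list(servers),
        override_params=dict(dlq.get("override_params", {})),
    )
```

With this shape, the pipeline code only needs to name the DLQ stream, and every Kafka detail stays in the config file.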

@victoria-yining-huang victoria-yining-huang requested a review from a team as a code owner March 20, 2026 19:50
@github-actions

github-actions Bot commented Mar 20, 2026

Semver Impact of This PR

🟡 Minor (new features)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


New Features ✨

  • (dlq) Add dlq support (no-op) by victoria-yining-huang in #277

Bug Fixes 🐛

  • (kafka config) Connect override_params from yaml config to rust arroyo by victoria-yining-huang in #291
  • (type checking) Add property decorators by victoria-yining-huang in #297

Internal Changes 🔧

  • (deps-dev) Bump pytest from 8.4.1 to 9.0.3 in /sentry_streams by dependabot in #295

🤖 This preview updates automatically when you update the PR.

Comment thread sentry_streams/sentry_streams/config_types.py
@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Python type stub missing new DLQ class and parameter
    • Updated rust_streams.pyi to add PyDlqConfig and the optional dlq_config argument on ArroyoConsumer.__init__ to match the Rust-exposed interface.

Create PR

Or push these changes by commenting:

@cursor push cb8d770063
Preview (cb8d770063)
diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi
--- a/sentry_streams/sentry_streams/rust_streams.pyi
+++ b/sentry_streams/sentry_streams/rust_streams.pyi
@@ -41,6 +41,12 @@
         override_params: Mapping[str, str],
     ) -> None: ...
 
+class PyDlqConfig:
+    topic: str
+    producer_config: PyKafkaProducerConfig
+
+    def __init__(self, topic: str, producer_config: PyKafkaProducerConfig) -> None: ...
+
 class PyMetricConfig:
     def __init__(
         self,
@@ -105,6 +111,7 @@
         schema: str | None,
         metric_config: PyMetricConfig | None = None,
         write_healthcheck: bool = False,
+        dlq_config: PyDlqConfig | None = None,
     ) -> None: ...
     def add_step(self, step: RuntimeOperator) -> None: ...
     def run(self) -> None: ...

Comment thread sentry_streams/src/consumer.rs
@victoria-yining-huang victoria-yining-huang changed the title feat(dlq): add dlq support rust side feat(dlq): add dlq support (no-op) Mar 23, 2026
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Collaborator

@fpacifici fpacifici left a comment

Please see my comments on the unit tests and on the defaults.

Also, please adjust the example pipelines to set up the DLQ. Since this cannot be tested with unit tests, consider adapting https://github.com/getsentry/streams/blob/main/sentry_streams/integration_tests/test_example_pipelines.py

Comment thread sentry_streams/src/consumer.rs Outdated
Comment thread sentry_streams/tests/test_dlq.py Outdated
Comment thread sentry_streams/tests/test_dlq.py Outdated
Comment thread sentry_streams/tests/test_dlq.py Outdated
Comment thread sentry_streams/src/consumer.rs Outdated
Comment thread sentry_streams/src/consumer.rs Outdated
/// Builds the DLQ policy if dlq_config is provided.
/// Returns None if DLQ is not configured.
fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> {
    match &self.dlq_config {
Collaborator

Making this a standalone function also lets you drop the case for when the config is not provided.

Comment thread sentry_streams/src/consumer.rs Outdated
/// When provided, invalid messages will be sent to the DLQ topic.
#[pyclass]
#[derive(Debug, Clone)]
pub struct PyDlqConfig {
Collaborator

Why PyDlqConfig rather than DlqConfig? Is there a Rust version that we need to distinguish it from?

Contributor Author

I just saw that it's a naming convention we have in the repo, like PyKafkaConsumerConfig. I assume it means "this Rust struct is meant for coming from Python"

Collaborator

That is not supposed to be a coding convention. We had PyKafkaConsumerConfig to distinguish it from KafkaConsumerConfig which already existed in the Rust code base. Under normal operations you do not need to add Py in front of an object you expose to python.

Comment on lines +264 to +265
topic=dlq_data["topic"],
bootstrap_servers=dlq_data["bootstrap_servers"],
Collaborator

What if the user wants to override only the topic name and leave the bootstrap_servers as they are?
Also, should we make the bootstrap_servers default to the same ones we use for the StreamingSource?

Contributor Author

added individual field overriding ability

Contributor Author

default value i was planning on handling in the next PR, see my PR description

Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/config_types.py Outdated
topic=step.dlq_config.topic,
producer_config=PyKafkaProducerConfig(
bootstrap_servers=step.dlq_config.bootstrap_servers,
override_params=None,
Collaborator

This cannot be hardcoded to None; we almost always apply some override parameters.
See, for example, how all producers in s4s2 have authentication arguments:
https://github.com/getsentry/ops/blob/master/k8s/services/super-big-consumers/_errors_config.yaml#L5-L10
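
To illustrate why the override parameters matter, here is a small sketch of how bootstrap servers and override parameters might be combined into one librdkafka-style producer configuration. `build_producer_params` is a hypothetical helper, not the adapter's actual code; `bootstrap.servers`, `security.protocol` and `sasl.mechanism` are standard librdkafka property names.

```python
from __future__ import annotations

from typing import Mapping, Sequence


def build_producer_params(
    bootstrap_servers: Sequence[str],
    override_params: Mapping[str, str] | None = None,
) -> dict[str, str]:
    """Merge connection defaults with user-supplied overrides (hypothetical helper)."""
    params = {"bootstrap.servers": ",".join(bootstrap_servers)}
    # Overrides are applied last so that authentication settings are preserved.
    params.update(override_params or {})
    return params


params = build_producer_params(
    ["localhost:9092"],
    {"security.protocol": "sasl_plaintext", "sasl.mechanism": "SCRAM-SHA-256"},
)
```

If `override_params` were hardcoded to `None`, the second argument would never reach the producer and the authentication keys would be dropped.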

"""

topic: str
bootstrap_servers: Sequence[str]
Collaborator

See my comment above. If you connect to Kafka, either as a producer or as a consumer, you need to allow the user to override the connection parameters. https://github.com/getsentry/ops/blob/master/k8s/services/super-big-consumers/_errors_config.yaml#L5-L10

Comment thread sentry_streams/tests/test_dlq.py Outdated
@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: DLQ producer config drops override_params needed for auth
    • I added override_params to DLQ pipeline config propagation and updated DLQ producer construction and tests so auth-related override parameters are preserved end-to-end.

Create PR

Or push these changes by commenting:

@cursor push 212acf9bdc
Preview (212acf9bdc)
diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
--- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
+++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -186,7 +186,7 @@
         topic=step.dlq_config.topic,
         producer_config=PyKafkaProducerConfig(
             bootstrap_servers=step.dlq_config.bootstrap_servers,
-            override_params=None,
+            override_params=step.dlq_config.override_params,
         ),
     )
 

diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py
--- a/sentry_streams/sentry_streams/pipeline/pipeline.py
+++ b/sentry_streams/sentry_streams/pipeline/pipeline.py
@@ -2,7 +2,7 @@
 
 from abc import ABC, abstractmethod
 from collections import defaultdict
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import timedelta
 from enum import Enum
 from functools import partial
@@ -235,6 +235,7 @@
 
     topic: str
     bootstrap_servers: Sequence[str]
+    override_params: Mapping[str, str] = field(default_factory=dict)
 
 
 @dataclass
@@ -267,6 +268,9 @@
             servers = producer_config.get("bootstrap_servers") or (
                 self.dlq_config.bootstrap_servers if self.dlq_config else None
             )
+            override_params = producer_config.get("override_params")
+            if override_params is None:
+                override_params = self.dlq_config.override_params if self.dlq_config else {}
 
             if topic is None or servers is None:
                 raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'")
@@ -274,6 +278,7 @@
             self.dlq_config = DlqConfig(
                 topic=topic,
                 bootstrap_servers=servers,
+                override_params=cast(Mapping[str, str], override_params),
             )
 
 

diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py
--- a/sentry_streams/tests/test_dlq.py
+++ b/sentry_streams/tests/test_dlq.py
@@ -68,21 +68,28 @@
 
 
 @pytest.mark.parametrize(
-    "dlq_config, expected_topic, expected_bootstrap_servers",
+    "dlq_config, expected_topic, expected_bootstrap_servers, expected_override_params",
     [
-        pytest.param(None, None, None, id="no_dlq_config"),
+        pytest.param(None, None, None, None, id="no_dlq_config"),
         pytest.param(
-            DlqConfig(topic="test-dlq", bootstrap_servers=["localhost:9092"]),
+            DlqConfig(
+                topic="test-dlq",
+                bootstrap_servers=["localhost:9092"],
+                override_params={"sasl.username": "test"},
+            ),
             "test-dlq",
             ["localhost:9092"],
+            {"sasl.username": "test"},
             id="single_bootstrap_server",
         ),
         pytest.param(
             DlqConfig(
-                topic="my-dlq", bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"]
+                topic="my-dlq",
+                bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"],
             ),
             "my-dlq",
             ["broker1:9092", "broker2:9092", "broker3:9092"],
+            {},
             id="multiple_bootstrap_servers",
         ),
     ],
@@ -91,6 +98,7 @@
     dlq_config: DlqConfig | None,
     expected_topic: str | None,
     expected_bootstrap_servers: list[str] | None,
+    expected_override_params: dict[str, str] | None,
 ) -> None:
     """Test build_dlq_config returns correct PyDlqConfig for various inputs."""
     source = StreamSource(
@@ -109,47 +117,75 @@
         assert result.topic == expected_topic
         assert result.producer_config is not None
         assert result.producer_config.bootstrap_servers == expected_bootstrap_servers
-        assert result.producer_config.override_params is None
+        assert result.producer_config.override_params == expected_override_params
 
 
 @pytest.mark.parametrize(
-    "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers",
+    "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params",
     [
         pytest.param(
             None,
-            {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["broker1:9092"]}},
+            {
+                "topic": "new-dlq",
+                "producer_config": {
+                    "bootstrap_servers": ["broker1:9092"],
+                    "override_params": {"security.protocol": "SASL_SSL"},
+                },
+            },
             "new-dlq",
             ["broker1:9092"],
+            {"security.protocol": "SASL_SSL"},
             id="create_new_config",
         ),
         pytest.param(
-            DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
+            DlqConfig(
+                topic="old-dlq",
+                bootstrap_servers=["old-broker:9092"],
+                override_params={"old.param": "old-value"},
+            ),
             {"topic": "new-dlq"},
             "new-dlq",
             ["old-broker:9092"],
+            {"old.param": "old-value"},
             id="override_topic_only",
         ),
         pytest.param(
-            DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
-            {"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}},
+            DlqConfig(
+                topic="old-dlq",
+                bootstrap_servers=["old-broker:9092"],
+                override_params={"old.param": "old-value"},
+            ),
+            {
+                "producer_config": {
+                    "bootstrap_servers": ["new-broker:9092", "new-broker2:9092"],
+                    "override_params": {"new.param": "new-value"},
+                }
+            },
             "old-dlq",
             ["new-broker:9092", "new-broker2:9092"],
+            {"new.param": "new-value"},
             id="override_bootstrap_servers_only",
         ),
         pytest.param(
-            DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
+            DlqConfig(
+                topic="old-dlq",
+                bootstrap_servers=["old-broker:9092"],
+                override_params={"old.param": "old-value"},
+            ),
             {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}},
             "new-dlq",
             ["new-broker:9092"],
+            {"old.param": "old-value"},
             id="override_both_fields",
         ),
     ],
 )
 def test_stream_source_override_config_dlq(
     initial_dlq_config: DlqConfig | None,
-    override_dlq: dict[str, str | list[str]],
+    override_dlq: dict[str, object],
     expected_topic: str,
     expected_bootstrap_servers: list[str],
+    expected_override_params: dict[str, str],
 ) -> None:
     """Test that StreamSource.override_config correctly overrides DLQ settings."""
     source = StreamSource(
@@ -163,3 +199,4 @@
     assert source.dlq_config is not None
     assert source.dlq_config.topic == expected_topic
     assert source.dlq_config.bootstrap_servers == expected_bootstrap_servers
+    assert source.dlq_config.override_params == expected_override_params

Comment thread sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/tests/test_dlq.py
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
Comment thread sentry_streams/sentry_streams/config_types.py Outdated
Comment thread sentry_streams/sentry_streams/rust_streams.pyi
def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ...

class ArroyoConsumer:
dlq_config: DlqConfig | None
Member

I like that this field is explicitly defined here, but it's also obviously not the only field in the class. Would it make sense to explicitly add the other fields here too?

Member

I think I see why, is it because only dlq_config has the pyo3(get) attribute in the Rust struct?

Collaborator

Please remove this. The .pyi files here are only for the interfaces of objects exposed by Rust. If you need to expose dlq_config, make it a property, as in PyMetricConfig above.
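
For illustration, a stub that exposes the field as a read-only property might look roughly like this. It is a sketch of the pattern only: the `DlqConfig` placeholder and the trimmed `ArroyoConsumer` signature are assumptions, and the real `rust_streams.pyi` declares more parameters.

```python
from __future__ import annotations


class DlqConfig:
    """Placeholder for the Rust-exposed class (assumed shape)."""

    def __init__(self, topic: str) -> None: ...


class ArroyoConsumer:
    def __init__(self, dlq_config: DlqConfig | None = None) -> None: ...

    # Declared as a property, mirroring a Rust-side getter, rather than as a
    # bare class-level attribute annotation.
    @property
    def dlq_config(self) -> DlqConfig | None: ...
```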

Comment thread sentry_streams/src/consumer.rs
Member

@george-sentry george-sentry left a comment

Nothing seems suspicious and Filippo's comments are addressed.

Collaborator

@fpacifici fpacifici left a comment

I think the producer configuration is becoming confusing. There are issues with consistency and misalignment in what should be part of the pipeline objects vs override config.

What do you think about taking a detour and first sending a PR to fix the producer config? Then this will become easier:

Comment on lines +167 to +173
"topic": {
    "type": "string",
    "minLength": 1
},
"producer_config": {
    "$ref": "#/definitions/DlqProducerConfig"
}
Collaborator

In the other scenarios where we allow defining a topic and override parameters, we follow this structure:

streaming_source:
    bootstrap_servers: {{ kafka_bootstrap_servers }}
    auto_offset_reset: latest
    consumer_group: pipeline-sbc-items-log
    topic: asdasdasd
    override_params:
        asdasdasd

Topic, bootstrap_servers and override_params are at the same level. Is there a reason to have the structure you added?

topic: asdasd
producer_config: 
    bootstrap_servers: asdasdasd
    override_config:
        asdasd

If not, please follow the same structure as the consumer config (or change that one as well) to keep the schema consistent.

Contributor Author

topic and override_config in the streaming_source belong to the producer config. If we flatten the structure out, topic sounds like it's now the source topic, not the dlq topic. override_params just sounds vague. This is why I added the producer_config key

Collaborator

Yes, but here we are talking about the structure of the DlqConfig object.
In your implementation it would look like:

dlq_config:
  topic: asdasd 
  producer_config: 
     bootstrap_servers: asdasdasd
     override_config:
        asdasd

why not:

dlq_config:
  topic: asdasd 
  bootstrap_servers: asdasdasd
  override_config:
        asdasd
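
The flattened layout proposed here could be described with a typed dictionary along these lines. `FlatDlqConfig` is a hypothetical name, and the sketch uses the `override_params` spelling from the PR's config schema rather than `override_config` as written in the example above; which key name is intended is an assumption.

```python
from typing import Mapping, Sequence, TypedDict


class FlatDlqConfig(TypedDict, total=False):
    """Hypothetical flat DLQ config: all keys sit at the same level."""

    topic: str
    bootstrap_servers: Sequence[str]
    override_params: Mapping[str, str]


example: FlatDlqConfig = {
    "topic": "custom-dlq-topic",
    "bootstrap_servers": ["localhost:9092"],
    "override_params": {"security.protocol": "sasl_plaintext"},
}
```

This mirrors the consumer-side layout, where topic, bootstrap_servers and override_params are siblings.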

"""

topic: str
producer_config: "KafkaProducerConfig"
Collaborator

I don't think this is the right structure.
We do not allow sinks or sources to define the bootstrap_servers in code. This is only allowed in the config and we should stay consistent with that:
StreamingSource and StreamingSinks have the following features:

  • Do not name topics: just stream name as kafka is not exposed.
  • Have a default topic that is equal to the stream name
  • Regrettably, I added consumer group to the StreamingSource. That was a mistake; it should have been config only.
  • override parameters, consumer group, topic override, bootstrap servers are only in config as they are kafka specific.

Please follow that pattern:

  • DlqConfig should only have a stream_name. Everything else is in config. As you may see, the type you are referencing here from the pipeline.py file is in config_types, which is only supposed to define the structure of the config file. This should raise the concern that these details should not be in the pipeline object class.
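
One way to read the pattern described above: the pipeline-side object carries only a stream name, and the Kafka details are looked up from the deployment config at the adapter level. The names below (`DlqConfig` with a single `stream_name` field, `resolve_dlq_topic`, and the `dlq` config key) are hypothetical and only sketch that split.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class DlqConfig:
    """Pipeline-side DLQ definition: Kafka is not exposed here (assumed shape)."""

    stream_name: str


def resolve_dlq_topic(dlq: DlqConfig, step_config: Mapping[str, Any]) -> str:
    """Adapter-side lookup (hypothetical helper).

    The topic defaults to the stream name unless the config file overrides it.
    """
    return step_config.get("dlq", {}).get("topic", dlq.stream_name)
```

Under this split, the pipeline module needs no Kafka-specific types at all, which matches how sources and sinks are described in the list above.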

Comment on lines +251 to +258
topic = loaded_dlq.get("topic") or (self.dlq_config.topic if self.dlq_config else None)
# bootstrap_servers and override_params are nested under producer_config
producer_config = loaded_dlq.get("producer_config", {})
servers = producer_config.get("bootstrap_servers") or (
    self.dlq_config.producer_config.bootstrap_servers if self.dlq_config else None
)
override_params = producer_config.get("override_params") or (
    self.dlq_config.producer_config.override_params if self.dlq_config else None
Collaborator

See my comment below regarding the content of the DlqConfig: https://github.com/getsentry/streams/pull/277/changes#r3051786666.
We should not have attributes like bootstrap_servers or override_params in the pipeline object. They should only be defined in config, as the StreamSource should be Kafka-independent.

Comment thread sentry_streams/sentry_streams/config_types.py Outdated
Comment on lines +203 to +204
let dlq_limit = DlqLimit::default();
let max_buffered_messages = None;
Collaborator

We need to make these configurable now. Exposing this in config later is not a quick operation: it takes a PR in streams, a release, an upgrade in the client code, and shipping the client code. That is not something that can be done quickly after an incident.
We already know we need to customize these today.
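
A minimal sketch of what config-driven limits could look like on the Python side, assuming they live in the DLQ section of the config file. The `DlqLimits` dataclass, `parse_dlq_limits`, and the key names are all hypothetical; the field names are modeled on the limits discussed here and are not confirmed against the Rust `DlqLimit` type.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class DlqLimits:
    """Hypothetical config-driven limits; None means the limit is not set."""

    max_invalid_ratio: float | None = None
    max_consecutive_count: int | None = None
    max_buffered_messages: int | None = None


def parse_dlq_limits(dlq_config: Mapping[str, Any]) -> DlqLimits:
    """Read optional limits from the DLQ section of the config (assumed keys)."""
    ratio = dlq_config.get("max_invalid_ratio")
    if ratio is not None and not 0.0 <= ratio <= 1.0:
        raise ValueError("max_invalid_ratio must be between 0 and 1")
    return DlqLimits(
        max_invalid_ratio=ratio,
        max_consecutive_count=dlq_config.get("max_consecutive_count"),
        max_buffered_messages=dlq_config.get("max_buffered_messages"),
    )
```

Leaving every key optional keeps today's defaults while letting an operator tighten a limit through config alone.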

make it part of StreamSource

feat(streams): Add Dead Letter Queue (DLQ) support to StreamSource

Adds DLQ configuration support throughout the streaming platform:
- Add PyDlqConfig type stub and build_dlq_config() helper
- Wire DLQ config from StreamSource to Rust ArroyoConsumer
- Support DLQ override from YAML deployment configuration
- Add comprehensive test coverage for DLQ functionality

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

remove one redundant test

parameterized tests for creating consumer

add unit test for consumer.build_dlq_policy

make new function pub

add parameterized python test for build_dlq_config

update tests

standalone function

remove dlq enabled field

 allow overriding dlq config field by field

comment

remove unused pub new

finalize tests

simpify

fix schema

fix test

rename DlqConfig to DlqSettings

remove DlqSettings

remove build_dlq_config

finalize tests

use Mapping and Sequence for type hints

add an error test case

add json validation
Comment thread sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py Outdated
Comment thread sentry_streams/sentry_streams/rust_streams.pyi
Comment thread sentry_streams/sentry_streams/config_types.py Outdated
Comment thread sentry_streams/sentry_streams/config.json Outdated
Comment thread sentry_streams/sentry_streams/config.json Outdated
Comment on lines +264 to +267
producer_config=PyKafkaProducerConfig(
    bootstrap_servers=servers,
    override_params=override_params,
),
Collaborator

You cannot depend on adapter specific classes in the pipeline code.

  • PyKafkaProducerConfig is a class that is specific to the rust arroyo internal implementation.
  • Everything in the pipeline.py file is the adapter independent code to define the pipeline.

Also we still have the same problem discussed here https://github.com/getsentry/streams/pull/277/changes#r3051786666.

In this version the StreamSource class allows the user to specify the DlqConfig in code, which contains Kafka connection parameters. These should not be manageable in the pipeline definition. They are reserved for the config.

You may see in the rust arroyo implementation that all these config only parameters are overridden inside the adapter code and not in the pipeline code (https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L167).
While it is not a great design to be able to override things in two places, preserving the separation between what can be configured in code and what cannot is more important.

)
from sentry_streams.pipeline.rust_function_protocol import InternalRustFunction
from sentry_streams.pipeline.window import MeasurementUnit, TumblingWindow, Window
from sentry_streams.rust_streams import DlqConfig, PyKafkaProducerConfig
Collaborator

You cannot import the internals of an adapter from the adapter agnostic code (pipeline.py). I think DlqConfig is defined in the wrong place.

Comment thread sentry_streams/sentry_streams/pipeline/pipeline.py Outdated
@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit fb6b40d.

Comment thread sentry_streams/tests/test_dlq.py Outdated
Comment thread sentry_streams/tests/test_dlq.py Outdated
Comment thread sentry_streams/tests/test_dlq.py Outdated

3 participants