feat(dlq): add dlq support (no-op) #277
Conversation
**Semver Impact of This PR:** 🟡 Minor (new features)

📋 **Changelog Preview** — This is how your changes will appear in the changelog.

New Features ✨
Bug Fixes 🐛
Internal Changes 🔧

🤖 This preview updates automatically when you update the PR.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Python type stub missing new DLQ class and parameter
- Updated `rust_streams.pyi` to add `PyDlqConfig` and the optional `dlq_config` argument on `ArroyoConsumer.__init__` to match the Rust-exposed interface.

Or push these changes by commenting:
`@cursor push cb8d770063`

Preview (cb8d770063)
diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi
--- a/sentry_streams/sentry_streams/rust_streams.pyi
+++ b/sentry_streams/sentry_streams/rust_streams.pyi
@@ -41,6 +41,12 @@
override_params: Mapping[str, str],
) -> None: ...
+class PyDlqConfig:
+ topic: str
+ producer_config: PyKafkaProducerConfig
+
+ def __init__(self, topic: str, producer_config: PyKafkaProducerConfig) -> None: ...
+
class PyMetricConfig:
def __init__(
self,
@@ -105,6 +111,7 @@
schema: str | None,
metric_config: PyMetricConfig | None = None,
write_healthcheck: bool = False,
+ dlq_config: PyDlqConfig | None = None,
) -> None: ...
def add_step(self, step: RuntimeOperator) -> None: ...
def run(self) -> None: ...
**fpacifici** left a comment
Please see my comments on the unit tests and on the defaults.
Also please adjust the example pipelines to set up the DLQ. Since this cannot be tested with unit test consider adapting https://github.com/getsentry/streams/blob/main/sentry_streams/integration_tests/test_example_pipelines.py
```rust
/// Builds the DLQ policy if dlq_config is provided.
/// Returns None if DLQ is not configured.
fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> {
    match &self.dlq_config {
```
Making this a standalone function also allows you not to have this case for when the config is not provided.
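A rough Python sketch of the refactor this comment suggests (the types and names here are illustrative stand-ins, not the actual Rust code): the builder takes a concrete config, so the `None` branch disappears from its body and collapses to a single expression at the call site.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class DlqConfig:
    topic: str
    bootstrap_servers: Sequence[str]


@dataclass
class DlqPolicy:
    topic: str
    bootstrap_servers: Sequence[str]


def build_dlq_policy(config: DlqConfig) -> DlqPolicy:
    # Takes a concrete config, so no "config is None" branch is needed here.
    return DlqPolicy(topic=config.topic, bootstrap_servers=config.bootstrap_servers)


def consumer_dlq_policy(dlq_config: Optional[DlqConfig]) -> Optional[DlqPolicy]:
    # The Optional handling collapses to a single expression at the call site.
    return build_dlq_policy(dlq_config) if dlq_config is not None else None
```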
```rust
/// When provided, invalid messages will be sent to the DLQ topic.
#[pyclass]
#[derive(Debug, Clone)]
pub struct PyDlqConfig {
```
Why PyDlqConfig rather than DlqConfig? Is there a rust version that we need to distinguish it from?
I just saw that it's a naming convention we have in the repo, like PyKafkaConsumerConfig. I assume it means "this Rust struct is meant to be constructed from Python".
That is not supposed to be a coding convention. We had PyKafkaConsumerConfig to distinguish it from KafkaConsumerConfig which already existed in the Rust code base. Under normal operations you do not need to add Py in front of an object you expose to python.
```python
topic=dlq_data["topic"],
bootstrap_servers=dlq_data["bootstrap_servers"],
```
What if the user wants to override only the topic name and leave the bootstrap_servers as they are?
Also, should we make the bootstrap_servers default to the same one we use for the StreamingSource?
Added individual field overriding ability.
The default value I was planning to handle in the next PR; see my PR description.
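The field-by-field override mentioned in this reply could work roughly like this (a hypothetical helper over plain dicts; the real implementation merges into a dataclass): only keys present and non-`None` in the override replace the current values.

```python
from typing import Mapping


def merge_dlq_override(
    current: Mapping[str, object], override: Mapping[str, object]
) -> dict:
    """Apply only the fields present in the override, keeping the rest as-is."""
    merged = dict(current)
    for key in ("topic", "bootstrap_servers"):
        if override.get(key) is not None:
            merged[key] = override[key]
    return merged
```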
```python
topic=step.dlq_config.topic,
producer_config=PyKafkaProducerConfig(
    bootstrap_servers=step.dlq_config.bootstrap_servers,
    override_params=None,
```
This cannot be hardcoded to None; we almost always apply some override parameters.
See for example all producers in s4s2 have authentication arguments
https://github.com/getsentry/ops/blob/master/k8s/services/super-big-consumers/_errors_config.yaml#L5-L10
```python
    """

    topic: str
    bootstrap_servers: Sequence[str]
```
See my comment above. If you connect to kafka, either as producer or consumer, you need to allow the user to override the connection parameters. https://github.com/getsentry/ops/blob/master/k8s/services/super-big-consumers/_errors_config.yaml#L5-L10
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: DLQ producer config drops override_params needed for auth
- I added `override_params` to DLQ pipeline config propagation and updated DLQ producer construction and tests so auth-related override parameters are preserved end-to-end.
Or push these changes by commenting:
@cursor push 212acf9bdc
Preview (212acf9bdc)
diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
--- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
+++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -186,7 +186,7 @@
topic=step.dlq_config.topic,
producer_config=PyKafkaProducerConfig(
bootstrap_servers=step.dlq_config.bootstrap_servers,
- override_params=None,
+ override_params=step.dlq_config.override_params,
),
)
diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py
--- a/sentry_streams/sentry_streams/pipeline/pipeline.py
+++ b/sentry_streams/sentry_streams/pipeline/pipeline.py
@@ -2,7 +2,7 @@
from abc import ABC, abstractmethod
from collections import defaultdict
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from functools import partial
@@ -235,6 +235,7 @@
topic: str
bootstrap_servers: Sequence[str]
+ override_params: Mapping[str, str] = field(default_factory=dict)
@dataclass
@@ -267,6 +268,9 @@
servers = producer_config.get("bootstrap_servers") or (
self.dlq_config.bootstrap_servers if self.dlq_config else None
)
+ override_params = producer_config.get("override_params")
+ if override_params is None:
+ override_params = self.dlq_config.override_params if self.dlq_config else {}
if topic is None or servers is None:
raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'")
@@ -274,6 +278,7 @@
self.dlq_config = DlqConfig(
topic=topic,
bootstrap_servers=servers,
+ override_params=cast(Mapping[str, str], override_params),
)
diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py
--- a/sentry_streams/tests/test_dlq.py
+++ b/sentry_streams/tests/test_dlq.py
@@ -68,21 +68,28 @@
@pytest.mark.parametrize(
- "dlq_config, expected_topic, expected_bootstrap_servers",
+ "dlq_config, expected_topic, expected_bootstrap_servers, expected_override_params",
[
- pytest.param(None, None, None, id="no_dlq_config"),
+ pytest.param(None, None, None, None, id="no_dlq_config"),
pytest.param(
- DlqConfig(topic="test-dlq", bootstrap_servers=["localhost:9092"]),
+ DlqConfig(
+ topic="test-dlq",
+ bootstrap_servers=["localhost:9092"],
+ override_params={"sasl.username": "test"},
+ ),
"test-dlq",
["localhost:9092"],
+ {"sasl.username": "test"},
id="single_bootstrap_server",
),
pytest.param(
DlqConfig(
- topic="my-dlq", bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"]
+ topic="my-dlq",
+ bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"],
),
"my-dlq",
["broker1:9092", "broker2:9092", "broker3:9092"],
+ {},
id="multiple_bootstrap_servers",
),
],
@@ -91,6 +98,7 @@
dlq_config: DlqConfig | None,
expected_topic: str | None,
expected_bootstrap_servers: list[str] | None,
+ expected_override_params: dict[str, str] | None,
) -> None:
"""Test build_dlq_config returns correct PyDlqConfig for various inputs."""
source = StreamSource(
@@ -109,47 +117,75 @@
assert result.topic == expected_topic
assert result.producer_config is not None
assert result.producer_config.bootstrap_servers == expected_bootstrap_servers
- assert result.producer_config.override_params is None
+ assert result.producer_config.override_params == expected_override_params
@pytest.mark.parametrize(
- "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers",
+ "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params",
[
pytest.param(
None,
- {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["broker1:9092"]}},
+ {
+ "topic": "new-dlq",
+ "producer_config": {
+ "bootstrap_servers": ["broker1:9092"],
+ "override_params": {"security.protocol": "SASL_SSL"},
+ },
+ },
"new-dlq",
["broker1:9092"],
+ {"security.protocol": "SASL_SSL"},
id="create_new_config",
),
pytest.param(
- DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
+ DlqConfig(
+ topic="old-dlq",
+ bootstrap_servers=["old-broker:9092"],
+ override_params={"old.param": "old-value"},
+ ),
{"topic": "new-dlq"},
"new-dlq",
["old-broker:9092"],
+ {"old.param": "old-value"},
id="override_topic_only",
),
pytest.param(
- DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
- {"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}},
+ DlqConfig(
+ topic="old-dlq",
+ bootstrap_servers=["old-broker:9092"],
+ override_params={"old.param": "old-value"},
+ ),
+ {
+ "producer_config": {
+ "bootstrap_servers": ["new-broker:9092", "new-broker2:9092"],
+ "override_params": {"new.param": "new-value"},
+ }
+ },
"old-dlq",
["new-broker:9092", "new-broker2:9092"],
+ {"new.param": "new-value"},
id="override_bootstrap_servers_only",
),
pytest.param(
- DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
+ DlqConfig(
+ topic="old-dlq",
+ bootstrap_servers=["old-broker:9092"],
+ override_params={"old.param": "old-value"},
+ ),
{"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}},
"new-dlq",
["new-broker:9092"],
+ {"old.param": "old-value"},
id="override_both_fields",
),
],
)
def test_stream_source_override_config_dlq(
initial_dlq_config: DlqConfig | None,
- override_dlq: dict[str, str | list[str]],
+ override_dlq: dict[str, object],
expected_topic: str,
expected_bootstrap_servers: list[str],
+ expected_override_params: dict[str, str],
) -> None:
"""Test that StreamSource.override_config correctly overrides DLQ settings."""
source = StreamSource(
@@ -163,3 +199,4 @@
assert source.dlq_config is not None
assert source.dlq_config.topic == expected_topic
assert source.dlq_config.bootstrap_servers == expected_bootstrap_servers
+ assert source.dlq_config.override_params == expected_override_params
```python
    def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ...

class ArroyoConsumer:
    dlq_config: DlqConfig | None
```
I like that this field is explicitly defined here, but it's also obviously not the only field in the class. Would it make sense to explicitly add the other fields here too?
I think I see why, is it because only dlq_config has the pyo3(get) attribute in the Rust struct?
Please remove this. pyi files, here, are only for interfaces of objects exposed by rust. If you need to expose dlq_config make it a property like you see in PyMetricConfig above.
**george-sentry** left a comment
Nothing seems suspicious and Filippo's comments are addressed.
I think the producer configuration is becoming confusing. There are issues with consistency and misalignment in what should be part of the pipeline objects vs override config.
What do you think about taking a detour and sending a PR first to fix the producer config? Then this will become easier:
- Fix the creation of `PyKafkaProducerConfig` in http://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L190-L193 (it expects `override_params` while the config object TypedDict defines `additional_settings`). Fix the config types to rely on `override_params`.
- Do the same on the producer.
- Add the pipeline changes to support the DLQ (leave the rust implementation separate).
```json
"topic": {
  "type": "string",
  "minLength": 1
},
"producer_config": {
  "$ref": "#/definitions/DlqProducerConfig"
}
```
In the other scenarios where we allow defining a topic and override parameters, we follow this structure:

```yaml
streaming_source:
  bootstrap_servers: {{ kafka_bootstrap_servers }}
  auto_offset_reset: latest
  consumer_group: pipeline-sbc-items-log
  topic: asdasdasd
  override_params:
    asdasdasd
```
Topic, bootstrap_servers and override_params are at the same level. Is there a reason to have the structure you added?
topic: asdasd
producer_config:
bootstrap_servers: asdasdasd
override_config:
asdasd
If not, please follow the same structure as the consumer config (or change that one as well) to keep the schema consistent.
topic and override_config in the streaming_source belong to the producer config. If we flatten the structure out, topic sounds like it's now the source topic, not the dlq topic. override_params just sounds vague. This is why I added the producer_config key
Yes, but here we are talking about the structure of the DlqConfig object.
In your implementation it would look like:
```yaml
dlq_config:
  topic: asdasd
  producer_config:
    bootstrap_servers: asdasdasd
    override_config:
      asdasd
```
why not:
```yaml
dlq_config:
  topic: asdasd
  bootstrap_servers: asdasdasd
  override_config:
    asdasd
```
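If the flattened structure is adopted, the nested `producer_config` lookup quoted elsewhere in this review would collapse to one level. A hedged sketch (`parse_dlq_section` is a hypothetical name, not code from the PR):

```python
def parse_dlq_section(loaded_dlq: dict) -> dict:
    # All keys sit at the same level, mirroring the streaming_source config.
    return {
        "topic": loaded_dlq["topic"],
        "bootstrap_servers": loaded_dlq["bootstrap_servers"],
        "override_params": loaded_dlq.get("override_params", {}),
    }
```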
```python
    """

    topic: str
    producer_config: "KafkaProducerConfig"
```
I don't think this is the right structure.
We do not allow sinks or sources to define the bootstrap_servers in code. This is only allowed in the config, and we should stay consistent with that.
StreamingSource and StreamingSink have the following features:
- They do not name topics: just a stream name, as kafka is not exposed.
- They have a default topic that is equal to the stream name.
- Regrettably I added the consumer group to StreamingSource. That was a mistake; it should have been config only.
- Override parameters, consumer group, topic override, and bootstrap servers are only in config, as they are kafka specific.

Please follow that pattern:
- DlqConfig should only have a `stream_name`. Everything else is in config. As you may see, the type you are referencing here from the pipeline.py file is in `config_types`, which is only supposed to define the structure of the config file. This should raise the concern that these details should not be in the pipeline object class.
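Following the StreamingSource pattern described above, the pipeline-level object could shrink to just the stream name, with the default topic equal to that name and any override coming from the config file. A hypothetical sketch (`resolve_dlq_topic` and the `dlq_config` section shape are illustrative, not the repo's actual API):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DlqConfig:
    # The only thing the pipeline definition knows: the logical stream name.
    stream_name: str


def resolve_dlq_topic(config: DlqConfig, deployment_config: dict) -> str:
    # Default topic equals the stream name; the config file may override it.
    section = deployment_config.get("dlq_config", {})
    return section.get("topic", config.stream_name)
```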
```python
topic = loaded_dlq.get("topic") or (self.dlq_config.topic if self.dlq_config else None)
# bootstrap_servers and override_params are nested under producer_config
producer_config = loaded_dlq.get("producer_config", {})
servers = producer_config.get("bootstrap_servers") or (
    self.dlq_config.producer_config.bootstrap_servers if self.dlq_config else None
)
override_params = producer_config.get("override_params") or (
    self.dlq_config.producer_config.override_params if self.dlq_config else None
```
See my comment below regarding the content of the DlqConfig: https://github.com/getsentry/streams/pull/277/changes#r3051786666.
We should not have attributes like bootstrap_servers or override_params in the pipeline object. They should only be defined in config, as the StreamSource should be kafka independent.
```rust
let dlq_limit = DlqLimit::default();
let max_buffered_messages = None;
```
We need to make these configurable now. It is not a quick operation to expose this in config. We need a PR in streams, release it, upgrade it in the client code and ship the client code. Not something that can be done quickly after an incident.
We already know we need to customize these today.
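One way to make these configurable is to put them on the settings object the pipeline already carries, with `None` meaning "use the library default". This is a sketch with hypothetical field names (Arroyo's real `DlqLimit` has its own fields; nothing here is the PR's actual code):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DlqSettings:
    topic: str
    # Hypothetical knobs mirroring Arroyo's DlqLimit and buffered-messages cap;
    # a default of None means "use the library default".
    max_invalid_ratio: Optional[float] = None
    max_consecutive_count: Optional[int] = None
    max_buffered_messages: Optional[int] = None
```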
Make it part of StreamSource.

Commits:
- feat(streams): Add Dead Letter Queue (DLQ) support to StreamSource

  Adds DLQ configuration support throughout the streaming platform:
  - Add PyDlqConfig type stub and build_dlq_config() helper
  - Wire DLQ config from StreamSource to Rust ArroyoConsumer
  - Support DLQ override from YAML deployment configuration
  - Add comprehensive test coverage for DLQ functionality

  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
- remove one redundant test
- parameterized tests for creating consumer
- add unit test for consumer.build_dlq_policy
- make new function pub
- add parameterized python test for build_dlq_config
- update tests
- standalone function
- remove dlq enabled field
- allow overriding dlq config field by field
- comment
- remove unused pub new
- finalize tests
- simpify
- fix schema
- fix test
- rename DlqConfig to DlqSettings
- remove DlqSettings
- remove build_dlq_config
- finalize tests
- use Mapping and Sequence for type hints
- add an error test case
- add json validation
force-pushed from 6b4468a to 480e5ce
```python
producer_config=PyKafkaProducerConfig(
    bootstrap_servers=servers,
    override_params=override_params,
),
```
You cannot depend on adapter specific classes in the pipeline code.
- `PyKafkaProducerConfig` is a class that is specific to the rust arroyo internal implementation.
- Everything in the pipeline.py file is the adapter independent code to define the pipeline.

Also we still have the same problem discussed here: https://github.com/getsentry/streams/pull/277/changes#r3051786666.
In this version the StreamSource class allows the user to specify the DlqConfig in code, which contains kafka connection parameters. These should not be manageable in the pipeline definition; they are reserved for the config.
You may see in the rust arroyo implementation that all these config-only parameters are overridden inside the adapter code and not in the pipeline code (https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L167).
While it is not a great design to be able to override things in two places, preserving the separation between what can be configured in code and what cannot is more important.
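The separation being asked for might be sketched like this: the pipeline carries only a stream name, and kafka specifics are resolved entirely inside the adapter from the deployment config (all names here are illustrative assumptions, not the repo's API):

```python
# Adapter-side resolution: the pipeline object carries no kafka details;
# topic, servers, and override_params come from the deployment config only.
def resolve_dlq_producer(stream_name: str, deployment: dict) -> dict:
    section = deployment.get("dlq_config", {})
    return {
        "topic": section.get("topic", stream_name),
        "bootstrap_servers": section.get("bootstrap_servers", []),
        "override_params": section.get("override_params", {}),
    }
```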
```python
)
from sentry_streams.pipeline.rust_function_protocol import InternalRustFunction
from sentry_streams.pipeline.window import MeasurementUnit, TumblingWindow, Window
from sentry_streams.rust_streams import DlqConfig, PyKafkaProducerConfig
```
You cannot import the internals of an adapter from the adapter agnostic code (pipeline.py). I think DlqConfig is defined in the wrong place.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit fb6b40d.


Ticket

- PR1 (this): example config file schema, and the corresponding pipeline code.
- PR2: default config
- PR3: wire everything together, end to end testing
- PR4: enable by default, deploy some default topics