Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from sentry_streams.pipeline.window import MeasurementUnit
from sentry_streams.rust_streams import (
ArroyoConsumer,
DlqConfig,
InitialOffset,
PyKafkaConsumerConfig,
PyKafkaProducerConfig,
Expand Down Expand Up @@ -195,6 +196,29 @@ def build_kafka_producer_config(
)


def build_dlq_config(
    dlq_stream_name: str,
    step_config: Mapping[str, Any],
) -> DlqConfig:
    """
    Build the DLQ configuration from deployment config.

    Args:
        dlq_stream_name: Fallback topic name, used when the ``dlq``
            section does not specify an explicit ``topic``.
        step_config: The step's deployment configuration. Must contain a
            ``dlq`` mapping with at least ``bootstrap_servers``.

    Returns:
        A ``DlqConfig`` carrying the DLQ topic and its producer settings.

    Raises:
        ValueError: If the ``dlq`` section is missing, or it does not
            provide a non-empty ``bootstrap_servers``.
    """
    # Missing section used to surface as a bare KeyError; raise a
    # descriptive ValueError instead, consistent with the check below.
    loaded_dlq = step_config.get("dlq")
    if loaded_dlq is None:
        raise ValueError("DLQ config requires a 'dlq' section in deployment configuration")

    # An explicit topic wins; otherwise fall back to the pipeline's DLQ stream name.
    topic = loaded_dlq.get("topic", dlq_stream_name)
    bootstrap_servers = loaded_dlq.get("bootstrap_servers")

    # Rejects both a missing key and an empty server list.
    if not bootstrap_servers:
        raise ValueError("DLQ config requires 'bootstrap_servers' in deployment configuration")

    return DlqConfig(
        topic=topic,
        producer_config=PyKafkaProducerConfig(
            bootstrap_servers=bootstrap_servers,
            # override_params is optional; None means no extra producer params.
            override_params=loaded_dlq.get("override_params"),
        ),
    )


def finalize_chain(
chains: TransformChains,
route: Route,
Expand Down Expand Up @@ -292,6 +316,10 @@ def source(self, step: Source[Any]) -> Route:
step.override_config(step_config)
step.validate()

dlq_config = None
if step.dlq_stream_name is not None:
dlq_config = build_dlq_config(step.dlq_stream_name, step_config)

assert isinstance(self.__write_healthcheck, bool)
self.__consumers[source_name] = ArroyoConsumer(
source=source_name,
Expand All @@ -302,6 +330,7 @@ def source(self, step: Source[Any]) -> Route:
schema=schema_name,
metric_config=build_py_metrics_config(self.__metrics_config),
write_healthcheck=self.__write_healthcheck,
dlq_config=dlq_config,
)
return Route(source_name, [])

Expand Down
35 changes: 34 additions & 1 deletion sentry_streams/sentry_streams/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@
"properties": {
"steps_config": {
"type": "object",
"title": "steps_config"
"title": "steps_config",
"additionalProperties": {
"type": "object",
"properties": {
"dlq": {
"$ref": "#/definitions/DlqConfig"
}
}
}
}
}
}
Expand Down Expand Up @@ -144,6 +152,31 @@
"required": [
"dsn"
]
},
"DlqConfig": {
"type": "object",
"properties": {
"topic": {
"type": "string",
"minLength": 1
},
"bootstrap_servers": {
"type": "array",
"items": {
"type": "string"
},
"minItems": 1
},
"override_params": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
},
"required": [
"bootstrap_servers"
]
}
}
}
11 changes: 11 additions & 0 deletions sentry_streams/sentry_streams/config_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ class StepConfig(TypedDict):
starts_segment: Optional[bool]


class DlqConfig(TypedDict, total=False):
    """
    Dead Letter Queue configuration for a StreamSource.

    ``total=False`` makes every key optional at the type level, but the
    deployment loader rejects configs without ``bootstrap_servers`` at
    runtime (and the JSON schema marks it required), so that key is
    effectively mandatory — only ``topic`` and ``override_params`` have
    default behavior.
    """

    # DLQ topic name; when omitted, the source's DLQ stream name is used.
    topic: str
    # Kafka brokers for the DLQ producer; required at runtime despite total=False.
    bootstrap_servers: Sequence[str]
    # Extra producer parameters forwarded verbatim to the DLQ producer config.
    override_params: Mapping[str, str]

Comment thread
sentry[bot] marked this conversation as resolved.

class KafkaConsumerConfig(TypedDict, StepConfig):
bootstrap_servers: Sequence[str]
auto_offset_reset: str
Expand Down
1 change: 1 addition & 0 deletions sentry_streams/sentry_streams/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class StreamSource(Source[bytes]):
stream_name: str
header_filter: Optional[Tuple[str, bytes]] = None
consumer_group: Optional[str] = None
dlq_stream_name: Optional[str] = None
step_type: StepType = StepType.SOURCE

def register(self, ctx: Pipeline[bytes], previous: Step) -> None:
Expand Down
14 changes: 14 additions & 0 deletions sentry_streams/sentry_streams/rust_streams.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ class PyKafkaProducerConfig:
@property
def override_params(self) -> Mapping[str, str] | None: ...

class DlqConfig:
    """Type stub for the Rust-backed DLQ configuration object."""

    def __init__(
        self,
        topic: str,
        producer_config: PyKafkaProducerConfig,
    ) -> None: ...
    @property
    def topic(self) -> str: ...
    @property
    def producer_config(self) -> PyKafkaProducerConfig: ...

class PyMetricConfig:
def __init__(
self,
Expand Down Expand Up @@ -123,10 +134,13 @@ class ArroyoConsumer:
schema: str | None,
metric_config: PyMetricConfig | None = None,
write_healthcheck: bool = False,
dlq_config: DlqConfig | None = None,
) -> None: ...
def add_step(self, step: RuntimeOperator) -> None: ...
def run(self) -> None: ...
def shutdown(self) -> None: ...
@property
def dlq_config(self) -> DlqConfig | None: ...

class PyAnyMessage:
def __init__(
Expand Down
128 changes: 125 additions & 3 deletions sentry_streams/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! The pipeline is built by adding RuntimeOperators to the consumer.

use crate::commit_policy::WatermarkCommitOffsets;
use crate::kafka_config::PyKafkaConsumerConfig;
use crate::kafka_config::{PyKafkaConsumerConfig, PyKafkaProducerConfig};
use crate::messages::{into_pyraw, PyStreamingMessage, RawMessage, RoutedValuePayload};
use crate::metrics::configure_metrics;
use crate::metrics_config::PyMetricConfig;
Expand All @@ -18,7 +18,9 @@ use crate::utils::traced_with_gil;
use crate::watermark::WatermarkEmitter;
use pyo3::prelude::*;
use rdkafka::message::{Header, Headers, OwnedHeaders};
use sentry_arroyo::backends::kafka::producer::KafkaProducer;
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use sentry_arroyo::processing::dlq::{DlqLimit, DlqPolicy, KafkaDlqProducer};
use sentry_arroyo::processing::strategies::healthcheck::HealthCheck;
use sentry_arroyo::processing::strategies::noop::Noop;
use sentry_arroyo::processing::strategies::run_task::RunTask;
Expand All @@ -34,6 +36,31 @@ use std::sync::Arc;
/// Matches Arroyo docs for Kubernetes liveness probes.
const HEALTHCHECK_PATH: &str = "/tmp/health.txt";

/// Dead Letter Queue (DLQ) settings exposed to Python.
/// When supplied to a consumer, messages that fail processing are routed
/// to the configured topic instead of halting the pipeline.
#[pyclass]
#[derive(Debug, Clone)]
pub struct DlqConfig {
    /// Destination Kafka topic for rejected messages.
    #[pyo3(get)]
    pub topic: String,

    /// Producer settings used when writing to the DLQ topic.
    #[pyo3(get)]
    pub producer_config: PyKafkaProducerConfig,
}

#[pymethods]
impl DlqConfig {
    /// Python-visible constructor: `DlqConfig(topic, producer_config)`.
    #[new]
    fn new(topic: String, producer_config: PyKafkaProducerConfig) -> Self {
        Self {
            topic,
            producer_config,
        }
    }
}
Comment thread
cursor[bot] marked this conversation as resolved.

/// The class that represent the consumer.
/// This class is exposed to python and it is the main entry point
/// used by the Python adapter to build a pipeline and run it.
Expand Down Expand Up @@ -69,19 +96,26 @@ pub struct ArroyoConsumer {

/// When true, wrap the strategy chain with HealthCheck to touch a file on poll for liveness.
write_healthcheck: bool,

/// DLQ (Dead Letter Queue) configuration.
/// If provided, invalid messages will be sent to the DLQ topic.
/// Otherwise, invalid messages will cause the consumer to stop processing.
#[pyo3(get)]
dlq_config: Option<DlqConfig>,
}

#[pymethods]
impl ArroyoConsumer {
#[new]
#[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false))]
#[pyo3(signature = (source, kafka_config, topic, schema, metric_config=None, write_healthcheck=false, dlq_config=None))]
fn new(
source: String,
kafka_config: PyKafkaConsumerConfig,
topic: String,
schema: Option<String>,
metric_config: Option<PyMetricConfig>,
write_healthcheck: bool,
dlq_config: Option<DlqConfig>,
) -> Self {
ArroyoConsumer {
consumer_config: kafka_config,
Expand All @@ -93,6 +127,7 @@ impl ArroyoConsumer {
concurrency_config: Arc::new(ConcurrencyConfig::new(1)),
metric_config,
write_healthcheck,
dlq_config,
}
}

Expand Down Expand Up @@ -120,7 +155,12 @@ impl ArroyoConsumer {
self.write_healthcheck,
);
let config = self.consumer_config.clone().into();
let processor = StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), None);

// Build DLQ policy if configured
let dlq_policy = build_dlq_policy(&self.dlq_config, self.concurrency_config.handle());

let processor =
StreamProcessor::with_kafka(config, factory, Topic::new(&self.topic), dlq_policy);
self.handle = Some(processor.get_handle());

let mut handle = processor.get_handle();
Expand All @@ -143,6 +183,40 @@ impl ArroyoConsumer {
}
}

/// Builds the DLQ policy if dlq_config is provided.
/// Returns None if DLQ is not configured.
pub fn build_dlq_policy(
dlq_config: &Option<DlqConfig>,
handle: tokio::runtime::Handle,
) -> Option<DlqPolicy<KafkaPayload>> {
match dlq_config {
Some(dlq_config) => {
tracing::info!("Configuring DLQ with topic: {}", dlq_config.topic);

// Create Kafka producer for DLQ
let producer_config = dlq_config.producer_config.clone().into();
let kafka_producer = KafkaProducer::new(producer_config);
let dlq_producer = KafkaDlqProducer::new(kafka_producer, Topic::new(&dlq_config.topic));

// Use default DLQ limits (no limits) and no max buffered messages
// These can be made configurable in a future PR if needed
let dlq_limit = DlqLimit::default();
let max_buffered_messages = None;
Comment on lines +203 to +204
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make these configurable now. It is not a quick operation to expose this in config. We need a PR in streams, release it, upgrade it in the client code and ship the client code. Not something that can be done quickly after an incident.
We already know we need to customize these today.


Some(DlqPolicy::new(
handle,
Box::new(dlq_producer),
dlq_limit,
max_buffered_messages,
))
}
None => {
tracing::info!("DLQ not configured, invalid messages will cause processing to stop");
None
}
Comment thread
victoria-yining-huang marked this conversation as resolved.
}
}

/// Converts a Message<KafkaPayload> to a Message<RoutedValue>.
///
/// The messages we send around between steps in the pipeline contain
Expand Down Expand Up @@ -429,4 +503,52 @@ mod tests {
let _ = std::fs::remove_file(healthcheck_path);
})
}

#[test]
fn test_build_dlq_policy_with_various_configs() {
    // Each case: (label, optional DLQ bootstrap servers, whether a policy is expected).
    let cases: [(&str, Option<Vec<String>>, bool); 3] = [
        ("without_dlq_config", None, false),
        (
            "with_dlq_config_single_broker",
            Some(vec!["localhost:9092".to_string()]),
            true,
        ),
        (
            "with_dlq_config_multiple_brokers",
            Some(vec![
                "broker1:9092".to_string(),
                "broker2:9092".to_string(),
                "broker3:9092".to_string(),
            ]),
            true,
        ),
    ];

    // build_dlq_policy needs a runtime handle even when no policy is built.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let handle = runtime.handle().clone();

    for (test_name, servers, expected_some) in cases {
        // Only construct a DlqConfig when brokers were supplied for the case.
        let dlq_config = servers.map(|bootstrap| {
            DlqConfig::new(
                "test-dlq".to_string(),
                PyKafkaProducerConfig::new(bootstrap, None),
            )
        });

        let dlq_policy: Option<DlqPolicy<KafkaPayload>> =
            build_dlq_policy(&dlq_config, handle.clone());
        assert_eq!(
            dlq_policy.is_some(),
            expected_some,
            "Test case '{}' failed: expected is_some() to be {}",
            test_name,
            expected_some
        );
    }
}

// Note: Asserting on inside properties of dlq_policy is tested through Python integration tests
// in tests/test_dlq.py, as the dlq_policy is an external crate and inner properties are private.
}
2 changes: 1 addition & 1 deletion sentry_streams/src/kafka_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub struct PyKafkaProducerConfig {
#[pymethods]
impl PyKafkaProducerConfig {
#[new]
fn new(
pub fn new(
bootstrap_servers: Vec<String>,
override_params: Option<HashMap<String, String>>,
) -> Self {
Expand Down
1 change: 1 addition & 0 deletions sentry_streams/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ fn rust_streams(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<kafka_config::PyKafkaProducerConfig>()?;
m.add_class::<kafka_config::InitialOffset>()?;
m.add_class::<consumer::ArroyoConsumer>()?;
m.add_class::<consumer::DlqConfig>()?;
m.add_class::<metrics_config::PyMetricConfig>()?;
m.add_class::<messages::PyAnyMessage>()?;
m.add_class::<messages::RawMessage>()?;
Expand Down
Loading
Loading