Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,28 +457,17 @@ def filter(self, step: Filter[Any], stream: Route) -> Route:
self.__consumers[stream.source].add_step(
RuntimeOperator.HeaderFilter(
route=route,
step_name=step.name,
header_name=step.header_name,
expected_value=step.value,
)
)
return stream

elif isinstance(step, PredicateFilter):

def filter_msg(msg: Message[Any]) -> bool:
input_metrics(step.name)
has_error = None
start_time = time.time()
try:
result = step.resolved_function(msg)
return result
except Exception as e:
has_error = str(e.__class__.__name__)
raise e
finally:
output_metrics(step.name, has_error, start_time)

self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg))
self.__consumers[stream.source].add_step(
RuntimeOperator.Filter(route, lambda msg: step.resolved_function(msg), step.name)
)
return stream

else:
Expand Down Expand Up @@ -512,6 +501,7 @@ def reduce(
self.__consumers[stream.source].add_step(
RuntimeOperator.Batch(
route=route,
step_name=step.name,
max_batch_size=step.batch_size,
max_batch_time_ms=max_batch_time_ms,
)
Expand Down
16 changes: 14 additions & 2 deletions sentry_streams/sentry_streams/rust_streams.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,20 @@ class RuntimeOperator:
@classmethod
def Map(cls, route: Route, function: Callable[[Message[Any]], Any]) -> Self: ...
@classmethod
def Filter(cls, route: Route, function: Callable[[Message[Any]], bool]) -> Self: ...
def Filter(
cls,
route: Route,
function: Callable[[Message[Any]], bool],
step_name: str,
) -> Self: ...
@classmethod
def HeaderFilter(cls, route: Route, header_name: str, expected_value: int) -> Self: ...
def HeaderFilter(
cls,
route: Route,
step_name: str,
header_name: str,
expected_value: int,
) -> Self: ...
@classmethod
def StreamSink(
cls, route: Route, topic_name: str, kafka_config: PyKafkaProducerConfig
Expand Down Expand Up @@ -126,6 +137,7 @@ class RuntimeOperator:
def Batch(
cls,
route: Route,
step_name: str,
max_batch_size: int | None = None,
max_batch_time_ms: float | None = None,
) -> Self: ...
Expand Down
19 changes: 17 additions & 2 deletions sentry_streams/src/batch_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//!
//! The GIL is taken only to build the list on flush (after [`Message::into_payload`] in submit).
use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload};
use crate::pipeline_stats::get_stats;
use crate::routes::{Route, RoutedValue};
use crate::time_helpers::current_epoch;
use crate::utils::traced_with_gil;
Expand All @@ -18,7 +19,7 @@ use sentry_arroyo::processing::strategies::{
use sentry_arroyo::types::{Message, Partition};
use sentry_arroyo::utils::timing::Deadline;
use std::collections::{BTreeMap, VecDeque};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option<String> {
match first {
Expand Down Expand Up @@ -165,6 +166,7 @@ pub struct BatchStep {
next_step: Box<dyn ProcessingStrategy<RoutedValue>>,

route: Route,
step_name: String,
max_batch_size: Option<usize>,
max_batch_time: Option<Duration>,
/// `None` until the first streaming message in a window.
Expand Down Expand Up @@ -193,11 +195,13 @@ impl BatchStep {
route: Route,
max_batch_size: Option<usize>,
max_batch_time: Option<Duration>,
step_name: String,
next_step: Box<dyn ProcessingStrategy<RoutedValue>>,
) -> Self {
Self {
next_step,
route,
step_name,
max_batch_size,
max_batch_time,
batch: None,
Expand Down Expand Up @@ -254,7 +258,9 @@ impl BatchStep {
// We create a synthetic watermark to avoid waiting for the next batch to complete before
// allowing the consumer to commit.
let committable_for_synthetic = b.current_offsets_snapshot();
let flush_start = Instant::now();
let batch_msg = b.flush()?;
get_stats().step_timing(&self.step_name, flush_start.elapsed().as_secs_f64());
self.batch = None;
let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer);

Expand Down Expand Up @@ -302,12 +308,14 @@ pub fn build_batch_step(
route: &Route,
max_batch_size: Option<usize>,
max_batch_time: Option<Duration>,
step_name: String,
next: Box<dyn ProcessingStrategy<RoutedValue>>,
) -> Box<dyn ProcessingStrategy<RoutedValue>> {
Box::new(BatchStep::new(
route.clone(),
max_batch_size,
max_batch_time,
step_name,
next,
))
}
Expand Down Expand Up @@ -369,6 +377,7 @@ impl ProcessingStrategy<RoutedValue> for BatchStep {
.expect("open batch")
.append(committable, pysm);
}
get_stats().step_exec(&self.step_name);
Ok(())
}
}
Expand Down Expand Up @@ -604,7 +613,13 @@ mod tests {
let sub = Arc::new(Mutex::new(Vec::new()));
let wms = Arc::new(Mutex::new(Vec::new()));
let next = FakeStrategy::new(sub.clone(), wms.clone(), false);
let step = BatchStep::new(route, max_n, max_t, Box::new(next));
let step = BatchStep::new(
route,
max_n,
max_t,
"test_batch".to_string(),
Box::new(next),
);
(step, sub, wms)
}

Expand Down
28 changes: 24 additions & 4 deletions sentry_streams/src/filter_step.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use crate::callers::{try_apply_py, ApplyError};
use crate::messages::RoutedValuePayload;
use crate::pipeline_stats::get_stats;
use crate::routes::{Route, RoutedValue};
use crate::utils::traced_with_gil;
use pyo3::{Py, PyAny};
use sentry_arroyo::processing::strategies::{
CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
};
use sentry_arroyo::types::{InnerMessage, Message};
use std::time::Duration;
use std::time::{Duration, Instant};

pub struct Filter {
pub callable: Py<PyAny>,
pub next_step: Box<dyn ProcessingStrategy<RoutedValue>>,
pub route: Route,
pub step_name: String,
}

impl Filter {
Expand All @@ -26,11 +28,13 @@ impl Filter {
callable: Py<PyAny>,
next_step: Box<dyn ProcessingStrategy<RoutedValue>>,
route: Route,
step_name: String,
) -> Self {
Self {
callable,
next_step,
route,
step_name,
}
}
}
Expand All @@ -52,6 +56,9 @@ impl ProcessingStrategy<RoutedValue> for Filter {
unreachable!("Watermark message trying to be passed to filter function.")
};

let stats = get_stats();
stats.step_exec(&self.step_name);
let start = Instant::now();
let res = traced_with_gil!(|py| {
try_apply_py(
py,
Expand All @@ -60,13 +67,24 @@ impl ProcessingStrategy<RoutedValue> for Filter {
)
.and_then(|py_res| py_res.is_truthy(py).map_err(|_| ApplyError::ApplyFailed))
});
let elapsed = start.elapsed().as_secs_f64();

match (res, &message.inner_message) {
(Ok(true), _) => self.next_step.submit(message),
(Ok(false), _) => Ok(()),
(Ok(true), _) => {
stats.step_timing(&self.step_name, elapsed);
self.next_step.submit(message)
}
(Ok(false), _) => {
stats.step_timing(&self.step_name, elapsed);
Ok(())
}
(Err(ApplyError::ApplyFailed), _) => panic!("Python filter function raised exception that is not sentry_streams.pipeline.exception.InvalidMessageError"),
(Err(ApplyError::InvalidMessage), InnerMessage::AnyMessage(..)) => panic!("Got exception while processing AnyMessage, Arroyo cannot handle error on AnyMessage"),
(Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => Err(SubmitError::InvalidMessage(broker_message.into())),
(Err(ApplyError::InvalidMessage), InnerMessage::BrokerMessage(broker_message)) => {
stats.step_error(&self.step_name);
stats.step_timing(&self.step_name, elapsed);
Err(SubmitError::InvalidMessage(broker_message.into()))
}
}
}

Expand Down Expand Up @@ -118,6 +136,7 @@ mod tests {
build_filter(
&Route::new("source1".to_string(), vec!["waypoint1".to_string()]),
callable,
"test_step".to_string(),
Box::new(next_step),
)
})
Expand Down Expand Up @@ -222,6 +241,7 @@ mod tests {
let mut strategy = build_filter(
&Route::new("source1".to_string(), vec!["waypoint1".to_string()]),
callable,
"test_step".to_string(),
Box::new(next_step),
);

Expand Down
26 changes: 22 additions & 4 deletions sentry_streams/src/header_filter_step.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Filter messages by integer equality on a named header, without calling Python.
use crate::messages::PyStreamingMessage;
use crate::messages::RoutedValuePayload;
use crate::pipeline_stats::get_stats;
use crate::routes::{Route, RoutedValue};
use crate::utils::traced_with_gil;
use sentry_arroyo::processing::strategies::{
Expand Down Expand Up @@ -59,6 +60,7 @@ fn streaming_message_headers(msg: &PyStreamingMessage) -> Vec<(String, Vec<u8>)>
pub struct HeaderIntEqualityFilter {
header_name: String,
expected: i64,
step_name: String,
next_step: Box<dyn ProcessingStrategy<RoutedValue>>,
route: Route,
}
Expand All @@ -67,12 +69,14 @@ impl HeaderIntEqualityFilter {
pub fn new(
header_name: String,
expected: i64,
step_name: String,
next_step: Box<dyn ProcessingStrategy<RoutedValue>>,
route: Route,
) -> Self {
Self {
header_name,
expected,
step_name,
next_step,
route,
}
Expand All @@ -85,22 +89,28 @@ impl ProcessingStrategy<RoutedValue> for HeaderIntEqualityFilter {
}

fn submit(&mut self, message: Message<RoutedValue>) -> Result<(), SubmitError<RoutedValue>> {
if self.route != message.payload().route {
if self.route != message.payload().route || message.payload().payload.is_watermark_msg() {
Copy link
Copy Markdown
Collaborator Author

@fpacifici fpacifici May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved here in order not to count watermarks in metrics

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a tag or use a different metric for watermarks? I would think we'd still like metrics on watermark messages, but tracked separately from main messages.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good point. More importantly we should emit metrics when they are emitted and when they are committed.

Let me think over it.
I will send a PR for those that covers the overall pipeline
https://linear.app/getsentry/issue/STREAM-921/add-metrics-to-track-watermarks-per-step

return self.next_step.submit(message);
}

let RoutedValuePayload::PyStreamingMessage(ref py_streaming_msg) =
message.payload().payload
else {
// Watermarks and any other non-streaming payload: pass through (same as build_map in transformer.rs).
return self.next_step.submit(message);
};

let stats = get_stats();
stats.step_exec(&self.step_name);
let headers = streaming_message_headers(py_streaming_msg);
match header_int_equality_decision(&headers, &self.header_name, self.expected) {
let decision = header_int_equality_decision(&headers, &self.header_name, self.expected);

match decision {
Ok(true) => self.next_step.submit(message),
Ok(false) => Ok(()),
Err(()) => Err(invalid_message_submit_error(&message)),
Err(()) => {
stats.step_error(&self.step_name);
Err(invalid_message_submit_error(&message))
}
}
}

Expand All @@ -117,11 +127,13 @@ pub fn build_header_int_filter(
route: &Route,
header_name: String,
expected: i64,
step_name: String,
next: Box<dyn ProcessingStrategy<RoutedValue>>,
) -> Box<dyn ProcessingStrategy<RoutedValue>> {
Box::new(HeaderIntEqualityFilter::new(
header_name,
expected,
step_name,
next,
route.clone(),
))
Expand Down Expand Up @@ -217,6 +229,7 @@ mod tests {
&Route::new("source1".to_string(), vec!["waypoint1".to_string()]),
"pid".to_string(),
42,
"test_step".to_string(),
Box::new(next_step),
);

Expand Down Expand Up @@ -272,6 +285,7 @@ mod tests {
&Route::new("source1".to_string(), vec!["waypoint1".to_string()]),
"pid".to_string(),
42,
"test_step".to_string(),
Box::new(next_step),
);

Expand Down Expand Up @@ -303,6 +317,7 @@ mod tests {
&Route::new("source1".to_string(), vec!["waypoint1".to_string()]),
"pid".to_string(),
42,
"test_step".to_string(),
Box::new(FakeStrategy::new(Arc::default(), Arc::default(), false)),
);

Expand Down Expand Up @@ -339,6 +354,7 @@ mod tests {
&Route::new("s".to_string(), vec!["w".to_string()]),
"k".to_string(),
-1,
"test_step".to_string(),
Box::new(next_step),
);

Expand Down Expand Up @@ -368,6 +384,7 @@ mod tests {
&Route::new("source".to_string(), vec![]),
"h".to_string(),
1,
"test_step".to_string(),
Box::new(FakeStrategy::new(Arc::default(), submitted_wm_clone, false)),
);

Expand All @@ -391,6 +408,7 @@ mod tests {
&Route::new("source1".to_string(), vec!["waypoint1".to_string()]),
"x".to_string(),
1,
"test_step".to_string(),
Box::new(FakeStrategy::new(submitted, Arc::default(), false)),
);

Expand Down
1 change: 1 addition & 0 deletions sentry_streams/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod metrics;
mod metrics_config;
mod mocks;
mod operators;
mod pipeline_stats;
mod python_operator;
mod routers;
mod routes;
Expand Down
Loading
Loading