From 68c57608880183da1a009701cc2bdeffe4a7e753 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 27 Apr 2026 16:52:53 -0700 Subject: [PATCH 01/10] feat(sentry-streams): Add native Rust batch step for PyAnyMessage Implement Batch (row accumulation, flush-time list build under GIL) and BatchStep (ProcessingStrategy: watermarks, backpressure, route matching). Pipeline Batch operator uses the Rust path instead of only the Python reduce delegate. Only PyAnyMessage streaming rows are supported; RawMessage is rejected with InvalidMessage for broker-originated inputs. Add split unit tests: batch_tests exercise Batch in isolation; step_tests focus on strategy behavior and forwarding. Made-with: Cursor --- .../adapters/arroyo/rust_arroyo.py | 18 + .../sentry_streams/rust_streams.pyi | 7 + sentry_streams/src/batch_step.rs | 665 ++++++++++++++++++ sentry_streams/src/lib.rs | 1 + sentry_streams/src/operators.rs | 21 + sentry_streams/uv.lock | 4 +- 6 files changed, 714 insertions(+), 2 deletions(-) create mode 100644 sentry_streams/src/batch_step.rs diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py index 7d894730..7f17ca40 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py @@ -45,6 +45,7 @@ ) from sentry_streams.pipeline.message import Message from sentry_streams.pipeline.pipeline import ( + Batch, Broadcast, ComplexStep, DevNullSink, @@ -471,6 +472,23 @@ def reduce( loaded_config: Mapping[str, Any] = self.steps_config.get(name, {}) step.override_config(loaded_config) step.validate() + + if isinstance(step, Batch): + max_batch_time_ms: float | None + if step.batch_timedelta is not None: + max_batch_time_ms = step.batch_timedelta.total_seconds() * 1000.0 + else: + max_batch_time_ms = None + logger.info(f"Adding batch (native): {step.name} to pipeline") + self.__consumers[stream.source].add_step( + 
RuntimeOperator.Batch( + route=route, + max_batch_size=step.batch_size, + max_batch_time_ms=max_batch_time_ms, + ) + ) + return stream + step = MetricsReportingReduce(step, name) logger.info(f"Adding reduce: {step.name} to pipeline") diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index ab619604..55823ad3 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -112,6 +112,13 @@ class RuntimeOperator: @classmethod def Broadcast(cls, route: Route, downstream_routes: Sequence[str]) -> Self: ... @classmethod + def Batch( + cls, + route: Route, + max_batch_size: int | None = None, + max_batch_time_ms: float | None = None, + ) -> Self: ... + @classmethod def PythonAdapter(cls, route: Route, delegate_Factory: RustOperatorFactory) -> Self: ... class ArroyoConsumer: diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs new file mode 100644 index 00000000..5d8b8aaa --- /dev/null +++ b/sentry_streams/src/batch_step.rs @@ -0,0 +1,665 @@ +//! Batches `PyAnyMessage` rows on a route, then [`BatchStep`] forwards one `PyAnyMessage` with a +//! list payload and manages watermarks and backpressure to the next Arroyo strategy. +//! +//! Only `PyAnyMessage` streaming input is supported; `RawMessage` is rejected with +//! `SubmitError::InvalidMessage` when the input is a broker message. +//! +//! Python objects require the GIL to read. We keep `Py` per row and take the GIL +//! only when materializing the batched message on flush, after [`Message::into_payload`]. 
+use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; +use crate::routes::{Route, RoutedValue}; +use crate::time_helpers::current_epoch; +use crate::utils::traced_with_gil; +use pyo3::prelude::*; +use pyo3::types::PyList; +use sentry_arroyo::processing::strategies::{ + merge_commit_request, CommitRequest, MessageRejected, ProcessingStrategy, StrategyError, + SubmitError, +}; +use sentry_arroyo::types::{InnerMessage, Message, Partition}; +use sentry_arroyo::utils::timing::Deadline; +use std::collections::{BTreeMap, VecDeque}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +fn invalid_message_submit_error(message: &Message) -> SubmitError { + match &message.inner_message { + InnerMessage::BrokerMessage(broker_message) => { + SubmitError::InvalidMessage(broker_message.into()) + } + InnerMessage::AnyMessage(..) => { + panic!("BatchStep: invalid message on AnyMessage; Arroyo cannot surface InvalidMessage") + } + } +} + +// --- Batch: accumulate `Py`; GIL on flush when building the list payload --- + +/// Count- and/or time-based window of `PyAnyMessage` rows for one route. The first row is added +/// via [`Batch::from_first_row`]; more rows with [`Batch::append_row`]. 
+pub(crate) struct Batch { + route: Route, + max_batch_size: Option, + max_batch_time: Option, + rows: Vec>, + batch_deadline: Option, + batch_offsets: BTreeMap, +} + +impl Batch { + pub fn from_first_row( + route: Route, + max_batch_size: Option, + max_batch_time: Option, + first: Py, + initial_committable: BTreeMap, + ) -> Self { + let deadline = max_batch_time.map(Deadline::new); + Self { + route, + max_batch_size, + max_batch_time, + rows: vec![first], + batch_deadline: deadline, + batch_offsets: initial_committable, + } + } + + pub fn append_row(&mut self, row: Py) { + self.rows.push(row); + } + + pub fn is_empty(&self) -> bool { + self.rows.is_empty() + } + + fn len(&self) -> usize { + self.rows.len() + } + + pub fn apply_committable_from(&mut self, message: &Message) { + for (p, o) in message.committable() { + self.batch_offsets + .entry(p) + .and_modify(|e| *e = (*e).max(o)) + .or_insert(o); + } + } + + /// Whether the current window is complete by size and/or time. + pub fn should_flush(&self) -> bool { + if self.is_empty() { + return false; + } + if self.max_batch_size.is_some_and(|m| self.len() >= m) { + return true; + } + if let (Some(_), Some(d)) = (&self.max_batch_time, &self.batch_deadline) { + if d.has_elapsed() { + return true; + } + } + false + } + + pub fn current_offsets_snapshot(&self) -> BTreeMap { + self.batch_offsets.clone() + } + + /// GIL: build list payload from stored `Py` rows, wrap in a batched `Message`. 
+ pub fn build_stacked_message(&self) -> Result, StrategyError> { + if self.rows.is_empty() { + return Err(StrategyError::Other("Batch: empty window".into())); + } + let route = self.route.clone(); + let committable = self.batch_offsets.clone(); + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + + let content = traced_with_gil!(|py| -> PyResult> { + let first_schema = self.rows[0].bind(py).borrow().schema.clone(); + let py_items: Vec> = self + .rows + .iter() + .map(|pm| pm.bind(py).borrow().payload.clone_ref(py)) + .collect(); + let list = PyList::new(py, &py_items)?.unbind(); + let inner = PyAnyMessage { + payload: list.into_any(), + headers: vec![], + timestamp: ts, + schema: first_schema, + }; + into_pyany(py, inner) + }) + .map_err(|e| StrategyError::Other(Box::new(e)))?; + + let py_streaming = PyStreamingMessage::PyAnyMessage { content }; + let rv = RoutedValue { + route, + payload: RoutedValuePayload::PyStreamingMessage(py_streaming), + }; + Ok(Message::new_any_message(rv, committable)) + } +} + +// --- BatchStep: Arroyo strategy — watermarks, backpressure, owns `Option` --- + +pub struct BatchStep { + next_step: Box>, + + route: Route, + max_batch_size: Option, + max_batch_time: Option, + /// `None` until the first `PyAnyMessage` in a window; then the open batch. + batch: Option, + /// Watermarks received while the current batch window is open; taken when emitting after batch. 
+ watermark_buffer: Vec>, + + message_carried_over: Option>, + pending_downstream: VecDeque>, + commit_request_carried_over: Option, +} + +impl BatchStep { + pub fn new( + route: Route, + max_batch_size: Option, + max_batch_time: Option, + next_step: Box>, + ) -> Self { + Self { + next_step, + route, + max_batch_size, + max_batch_time, + batch: None, + watermark_buffer: Vec::new(), + message_carried_over: None, + pending_downstream: VecDeque::new(), + commit_request_carried_over: None, + } + } + + fn is_quiesced(&self) -> bool { + self.message_carried_over.is_none() + && self.batch.as_ref().map_or(true, |b| b.is_empty()) + && self.pending_downstream.is_empty() + && self.watermark_buffer.is_empty() + } + + fn submit_with_poll(&mut self, msg: Message) -> Result { + let c = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), c); + match self.next_step.submit(msg) { + Ok(()) => Ok(true), + Err(SubmitError::MessageRejected(MessageRejected { message })) => { + self.pending_downstream.push_front(message); + Ok(false) + } + Err(SubmitError::InvalidMessage(e)) => Err(e.into()), + } + } + + fn drain_pending_downstream(&mut self) -> Result<(), StrategyError> { + while let Some(msg) = self.pending_downstream.pop_front() { + if !self.submit_with_poll(msg)? 
{ + break; + } + } + Ok(()) + } + + fn retry_carried_batch(&mut self) -> Result<(), StrategyError> { + if let Some(msg) = self.message_carried_over.take() { + let c = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), c); + match self.next_step.submit(msg) { + Ok(()) => {} + Err(SubmitError::MessageRejected(MessageRejected { message })) => { + self.message_carried_over = Some(message); + } + Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), + } + } + Ok(()) + } + + fn try_emit_batch(&mut self, force: bool) -> Result<(), StrategyError> { + if self.message_carried_over.is_some() { + return Ok(()); + } + if self.batch.as_ref().map_or(true, |b| b.is_empty()) { + return Ok(()); + } + if !force && !self.batch.as_ref().map_or(true, |b| b.should_flush()) { + return Ok(()); + } + + let b = self + .batch + .as_ref() + .ok_or_else(|| StrategyError::Other("BatchStep: emit without active batch".into()))?; + let committable_for_synthetic = b.current_offsets_snapshot(); + let batch_msg = b.build_stacked_message()?; + self.batch = None; + let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); + + let c = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), c); + + match self.next_step.submit(batch_msg) { + Err(SubmitError::MessageRejected(MessageRejected { message })) => { + self.message_carried_over = Some(message); + self.watermark_buffer = wm_after_batch; + } + Err(SubmitError::InvalidMessage(e)) => { + self.watermark_buffer = wm_after_batch; + return Err(e.into()); + } + Ok(()) => { + self.emit_watermark_tail(wm_after_batch, committable_for_synthetic)?; + } + } + Ok(()) + } + + fn emit_watermark_tail( + &mut self, + mut wm_after_batch: Vec>, + committable_for_synthetic: BTreeMap, + ) -> Result<(), StrategyError> { + let wmk = self.make_synthetic_watermark(committable_for_synthetic); + 
wm_after_batch.push(wmk); + while !wm_after_batch.is_empty() { + let m = wm_after_batch.remove(0); + if !self.submit_with_poll(m)? { + for r in wm_after_batch.into_iter().rev() { + self.pending_downstream.push_front(r); + } + break; + } + } + Ok(()) + } + + fn make_synthetic_watermark( + &self, + committable: BTreeMap, + ) -> Message { + let ts = current_epoch(); + let rv = RoutedValue { + route: self.route.clone(), + payload: RoutedValuePayload::make_watermark_payload(committable.clone(), ts), + }; + Message::new_any_message(rv, committable) + } +} + +/// Public constructor used by `operators::build`. +pub fn build_batch_step( + route: &Route, + max_batch_size: Option, + max_batch_time: Option, + next: Box>, +) -> Box> { + Box::new(BatchStep::new( + route.clone(), + max_batch_size, + max_batch_time, + next, + )) +} + +impl ProcessingStrategy for BatchStep { + fn poll(&mut self) -> Result, StrategyError> { + self.drain_pending_downstream()?; + self.retry_carried_batch()?; + if self.message_carried_over.is_none() { + self.try_emit_batch(false)?; + } + let c = self.next_step.poll()?; + Ok(merge_commit_request( + self.commit_request_carried_over.take(), + c, + )) + } + + fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + if self.route != message.payload().route { + return self.next_step.submit(message); + } + + match &message.payload().payload { + RoutedValuePayload::WatermarkMessage(_) => { + self.watermark_buffer.push(message); + Ok(()) + } + RoutedValuePayload::PyStreamingMessage(pysm) => { + if matches!(pysm, PyStreamingMessage::RawMessage { .. 
}) { + return Err(invalid_message_submit_error(&message)); + } + + if self.batch.is_none() { + let committable: BTreeMap = message.committable().collect(); + let inner = message.into_payload(); + let pysm = match inner.payload { + RoutedValuePayload::PyStreamingMessage(m) => m, + _ => unreachable!(), + }; + let PyStreamingMessage::PyAnyMessage { content } = pysm else { + unreachable!("RawMessage was rejected using &pysm before into_payload"); + }; + self.batch = Some(Batch::from_first_row( + self.route.clone(), + self.max_batch_size, + self.max_batch_time, + content, + committable, + )); + return Ok(()); + } + + let b = self.batch.as_mut().expect("open batch"); + b.apply_committable_from(&message); + let inner = message.into_payload(); + let pysm = match inner.payload { + RoutedValuePayload::PyStreamingMessage(m) => m, + _ => unreachable!(), + }; + let PyStreamingMessage::PyAnyMessage { content } = pysm else { + unreachable!("RawMessage was rejected using &pysm before into_payload"); + }; + b.append_row(content); + Ok(()) + } + } + } + + fn terminate(&mut self) { + self.next_step.terminate(); + } + + fn join(&mut self, timeout: Option) -> Result, StrategyError> { + let deadline = timeout.map(Deadline::new); + loop { + if deadline.as_ref().is_some_and(|d| d.has_elapsed()) { + break; + } + self.drain_pending_downstream()?; + self.retry_carried_batch()?; + if self.message_carried_over.is_none() { + self.try_emit_batch(true)?; + } + if self.is_quiesced() { + break; + } + } + let remaining = self.next_step.join(deadline.map(|d| d.remaining()))?; + Ok(merge_commit_request( + self.commit_request_carried_over.take(), + remaining, + )) + } +} + +#[cfg(test)] +mod tests { + mod batch { + //! [`Batch`] only: rows, committable merge, `should_flush`, and list materialization (GIL). 
+ + use crate::batch_step::Batch; + use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; + use crate::routes::Route; + use crate::testutils::build_routed_value; + use crate::utils::traced_with_gil; + use chrono::Utc; + use pyo3::prelude::*; + use pyo3::types::{PyAnyMethods, PyList}; + use pyo3::IntoPyObject; + use sentry_arroyo::types::{Message, Partition, Topic}; + use std::collections::BTreeMap; + + fn route() -> Route { + Route::new("s".into(), vec!["w".into()]) + } + + #[test] + fn build_stacked_message_makes_list_payload() { + traced_with_gil!(|py| { + let r = route(); + let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind(); + let p2 = 2i32.into_pyobject(py).unwrap().into_any().unbind(); + let a1 = into_pyany( + py, + PyAnyMessage { + payload: p1, + headers: vec![], + timestamp: 0.0, + schema: None, + }, + ) + .unwrap(); + let a2 = into_pyany( + py, + PyAnyMessage { + payload: p2, + headers: vec![], + timestamp: 0.0, + schema: None, + }, + ) + .unwrap(); + let part = Partition::new(Topic::new("t"), 0); + let mut b = Batch::from_first_row( + r.clone(), + Some(2), + None, + a1, + BTreeMap::from([(part, 1u64)]), + ); + b.append_row(a2); + let msg = b.build_stacked_message().expect("build"); + assert!( + msg.committable().any(|(p, o)| p == part && o == 1), + "committable should include merged batch offsets" + ); + let RoutedValuePayload::PyStreamingMessage(pysm) = &msg.payload().payload else { + panic!("expected PyStreamingMessage"); + }; + let PyStreamingMessage::PyAnyMessage { content } = pysm else { + panic!("expected PyAny"); + }; + let pl = content.bind(py).getattr("payload").unwrap(); + let list = pl.cast::().unwrap(); + assert_eq!(list.len(), 2); + assert_eq!(list.get_item(0).unwrap().extract::().unwrap(), 1); + assert_eq!(list.get_item(1).unwrap().extract::().unwrap(), 2); + }); + } + + /// Uses [`build_routed_value`] to build a broker `Message` for [`Batch::apply_committable_from`]. 
+ #[test] + fn apply_committable_merges_max_per_partition() { + traced_with_gil!(|py| { + let r = route(); + let p0 = 0i32.into_pyobject(py).unwrap().into_any().unbind(); + let row = into_pyany( + py, + PyAnyMessage { + payload: p0, + headers: vec![], + timestamp: 0.0, + schema: None, + }, + ) + .unwrap(); + let part = Partition::new(Topic::new("t"), 0); + let mut b = + Batch::from_first_row(r, None, None, row, BTreeMap::from([(part, 1u64)])); + let p_for_msg = 0i32.into_pyobject(py).unwrap().into_any().unbind(); + let py_rv = build_routed_value(py, p_for_msg, "s", vec!["w".into()]); + let m = Message::new_broker_message(py_rv, part, 9, Utc::now()); + b.apply_committable_from(&m); + let snap = b.current_offsets_snapshot(); + assert_eq!(snap.get(&part).copied(), Some(10u64)); + }); + } + + #[test] + fn should_flush_when_max_batch_size_reached() { + traced_with_gil!(|py| { + let r = route(); + let a1 = any_row(py, 1i32); + let a2 = any_row(py, 2i32); + let part = Partition::new(Topic::new("t"), 0); + let mut b = + Batch::from_first_row(r, Some(2), None, a1, BTreeMap::from([(part, 0u64)])); + assert!(!b.should_flush(), "one row, limit 2"); + b.append_row(a2); + assert!(b.should_flush(), "two rows, limit 2"); + }); + } + + fn any_row(py: pyo3::Python<'_>, v: i32) -> pyo3::Py { + let p = v.into_pyobject(py).unwrap().into_any().unbind(); + into_pyany( + py, + PyAnyMessage { + payload: p, + headers: vec![], + timestamp: 0.0, + schema: None, + }, + ) + .unwrap() + } + } + + mod step { + //! [`BatchStep`] as [`ProcessingStrategy`]: routing, raw rejection, handoff to the next step. 
+ + use super::super::{BatchStep, Message}; + use crate::fake_strategy::FakeStrategy; + use crate::testutils::{build_raw_routed_value, build_routed_value}; + use crate::utils::traced_with_gil; + use chrono::Utc; + use pyo3::prelude::*; + use pyo3::IntoPyObject; + use sentry_arroyo::processing::strategies::{ + InvalidMessage, ProcessingStrategy, SubmitError, + }; + use sentry_arroyo::types::{Partition, Topic}; + use std::collections::BTreeMap; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + use crate::routes::Route; + + fn batch_step_with_fake( + route: Route, + max_n: Option, + max_t: Option, + ) -> (BatchStep, Arc>>>) { + let sub = Arc::new(Mutex::new(Vec::new())); + let next = FakeStrategy::new(sub.clone(), Arc::default(), false); + let step = BatchStep::new(route, max_n, max_t, Box::new(next)); + (step, sub) + } + + #[test] + fn forwards_mismatched_route_to_next_strategy() { + let step_route = Route::new("a".into(), vec![]); + let (mut step, captured) = batch_step_with_fake(step_route, None, None); + traced_with_gil!(|py| { + let p = 1i32.into_pyobject(py).unwrap().into_any().unbind(); + let rv = crate::testutils::build_routed_value(py, p, "b", vec![]); + let m = Message::new_any_message(rv, BTreeMap::new()); + step.submit(m).expect("forward"); + }); + assert_eq!(captured.lock().unwrap().len(), 1); + } + + #[test] + fn flushes_one_message_to_downstream_when_batch_full() { + let route = Route::new("s".into(), vec!["w".into()]); + let (mut step, out) = batch_step_with_fake(route, Some(2), None); + traced_with_gil!(|py| { + let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind(); + let p2 = 2i32.into_pyobject(py).unwrap().into_any().unbind(); + let m1 = Message::new_any_message( + build_routed_value(py, p1, "s", vec!["w".into()]), + BTreeMap::new(), + ); + let m2 = Message::new_any_message( + build_routed_value(py, p2, "s", vec!["w".into()]), + BTreeMap::new(), + ); + step.submit(m1).unwrap(); + step.submit(m2).unwrap(); + step.poll().unwrap(); + 
}); + let n = out.lock().unwrap().len(); + assert_eq!( + n, 1, + "downstream should receive one batched message, not list internals" + ); + } + + #[test] + fn submit_rejects_raw_message_after_pyany() { + let route = Route::new("s".into(), vec!["w".into()]); + let (mut step, _out) = batch_step_with_fake(route, Some(10), None); + let part = Partition::new(Topic::new("topic"), 0); + traced_with_gil!(|py| { + let p0 = 0i32.into_pyobject(py).unwrap().into_any().unbind(); + let py_m = Message::new_broker_message( + build_routed_value(py, p0, "s", vec!["w".into()]), + part, + 1, + Utc::now(), + ); + let raw_m = Message::new_broker_message( + build_raw_routed_value(py, vec![1, 2, 3], "s", vec!["w".into()]), + part, + 2, + Utc::now(), + ); + step.submit(py_m).unwrap(); + let err = step.submit(raw_m); + let SubmitError::InvalidMessage(InvalidMessage { offset, .. }) = + err.expect_err("raw") + else { + panic!("expected InvalidMessage"); + }; + assert_eq!(offset, 2); + }); + } + + #[test] + fn submit_rejects_leading_raw_message() { + let route = Route::new("s".into(), vec!["w".into()]); + let (mut step, _out) = batch_step_with_fake(route, Some(10), None); + let part = Partition::new(Topic::new("topic"), 0); + traced_with_gil!(|py| { + let raw_m = Message::new_broker_message( + build_raw_routed_value(py, vec![9], "s", vec!["w".into()]), + part, + 0, + Utc::now(), + ); + let err = step.submit(raw_m); + let SubmitError::InvalidMessage(InvalidMessage { offset, .. 
}) = + err.expect_err("raw first") + else { + panic!("expected InvalidMessage"); + }; + assert_eq!(offset, 0); + }); + } + } +} diff --git a/sentry_streams/src/lib.rs b/sentry_streams/src/lib.rs index 6ac9b602..09604e77 100644 --- a/sentry_streams/src/lib.rs +++ b/sentry_streams/src/lib.rs @@ -1,4 +1,5 @@ use pyo3::prelude::*; +mod batch_step; mod broadcaster; mod callers; mod commit_policy; diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index cc5f4e87..8bf5188c 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -1,3 +1,4 @@ +use crate::batch_step::build_batch_step; use crate::broadcaster::Broadcaster; use crate::header_filter_step::build_header_int_filter; use crate::kafka_config::PyKafkaProducerConfig; @@ -13,6 +14,7 @@ use sentry_arroyo::backends::kafka::producer::KafkaProducer; use sentry_arroyo::backends::kafka::types::KafkaPayload; use sentry_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; use sentry_arroyo::processing::strategies::ProcessingStrategy; +use std::time::Duration; /// RuntimeOperator represent a translated step in the streaming pipeline the /// Arroyo Rust runtime know how to run. @@ -90,6 +92,17 @@ pub enum RuntimeOperator { routing_function: Py, downstream_routes: Py, }, + /// Batches `PyAnyMessage` inputs on the route (per-row `PyStreamingMessage::PyAnyMessage`); + /// `RawMessage` is rejected as invalid. Emits one `PyAnyMessage` with a list payload, then + /// buffered and synthetic watermarks. + #[pyo3(name = "Batch")] + Batch { + route: Route, + /// `None` means no size limit (time-only window). + max_batch_size: Option, + /// Wall-clock duration in milliseconds; `None` means no time limit (size-only batch). + max_batch_time_ms: Option, + }, /// Delegates messages processing to a Python operator that provides /// the same kind of interface as an Arroyo strategy. This is meant /// to simplify the porting of python strategies to Rust. 
@@ -183,6 +196,14 @@ pub fn build( build_router(route, func_ref, next) } + RuntimeOperator::Batch { + route, + max_batch_size, + max_batch_time_ms, + } => { + let max_t = max_batch_time_ms.map(|ms| Duration::from_secs_f64((ms / 1000.0).max(0.0))); + build_batch_step(route, *max_batch_size, max_t, next) + } RuntimeOperator::PythonAdapter { route, delegate_factory, diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock index cee17541..7ad47042 100644 --- a/sentry_streams/uv.lock +++ b/sentry_streams/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11" [[package]] @@ -893,7 +893,7 @@ wheels = [ [[package]] name = "sentry-streams" -version = "0.0.44" +version = "0.0.46" source = { editable = "." } dependencies = [ { name = "click" }, From 7a94e9124b0bfc3a6e95e6902b15e973aaf5812c Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 27 Apr 2026 17:48:33 -0700 Subject: [PATCH 02/10] ref(sentry-streams): Fix batch step tests and carried-over path Complete the watermark and backpressure tests, drop the broken broker rewrap helper in favor of explicit committable maps, and annotate batch offset maps for type inference. Add a test-only setter to exercise MessageRejected when a batch is held. Refs GH-307 Made-with: Cursor --- sentry_streams/src/batch_step.rs | 363 +++++++++++++++++-------------- 1 file changed, 199 insertions(+), 164 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 5d8b8aaa..fd92ca13 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -1,11 +1,13 @@ -//! Batches `PyAnyMessage` rows on a route, then [`BatchStep`] forwards one `PyAnyMessage` with a +//! Batches `PyAnyMessage` elements on a route, then [`BatchStep`] forwards one `PyAnyMessage` with a //! list payload and manages watermarks and backpressure to the next Arroyo strategy. //! //! Only `PyAnyMessage` streaming input is supported; `RawMessage` is rejected with //! 
`SubmitError::InvalidMessage` when the input is a broker message. +// TODO: Support `RawMessage` streaming input in addition to `PyAnyMessage` (list of bytes in the +// batched `PyAnyMessage` payload). //! -//! Python objects require the GIL to read. We keep `Py` per row and take the GIL -//! only when materializing the batched message on flush, after [`Message::into_payload`]. +//! Python objects require the GIL to read. We keep `Py` per element and take the +//! GIL only when materializing the batched message on flush, after [`Message::into_payload`]. use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; use crate::routes::{Route, RoutedValue}; use crate::time_helpers::current_epoch; @@ -32,57 +34,76 @@ fn invalid_message_submit_error(message: &Message) -> SubmitError`; GIL on flush when building the list payload --- - -/// Count- and/or time-based window of `PyAnyMessage` rows for one route. The first row is added -/// via [`Batch::from_first_row`]; more rows with [`Batch::append_row`]. +/// Count- and/or time-based window of `PyAnyMessage` elements for one route. Start with +/// [`Batch::from_initial_message`]; add more with [`Batch::append`]. pub(crate) struct Batch { route: Route, max_batch_size: Option, - max_batch_time: Option, - rows: Vec>, + /// Set when the window is time-bounded; elapsed means flush by time. batch_deadline: Option, + elements: Vec>, batch_offsets: BTreeMap, } impl Batch { - pub fn from_first_row( + /// First element in a window: applies this message’s committable, consumes it, and sets the + /// time-bounded deadline when `max_batch_time` is set. 
+ pub fn from_initial_message( route: Route, max_batch_size: Option, max_batch_time: Option, - first: Py, - initial_committable: BTreeMap, + message: Message, ) -> Self { - let deadline = max_batch_time.map(Deadline::new); + let mut batch_offsets: BTreeMap = BTreeMap::new(); + for (p, o) in message.committable() { + batch_offsets + .entry(p) + .and_modify(|e| *e = (*e).max(o)) + .or_insert(o); + } + let inner = message.into_payload(); + let pysm = match inner.payload { + RoutedValuePayload::PyStreamingMessage(m) => m, + _ => unreachable!(), + }; + let PyStreamingMessage::PyAnyMessage { content } = pysm else { + unreachable!(); + }; + let batch_deadline = max_batch_time.map(Deadline::new); Self { route, max_batch_size, - max_batch_time, - rows: vec![first], - batch_deadline: deadline, - batch_offsets: initial_committable, + batch_deadline, + elements: vec![content], + batch_offsets, } } - pub fn append_row(&mut self, row: Py) { - self.rows.push(row); - } - - pub fn is_empty(&self) -> bool { - self.rows.is_empty() - } - - fn len(&self) -> usize { - self.rows.len() - } - - pub fn apply_committable_from(&mut self, message: &Message) { + /// Merges committable from the message, consumes it, and appends the `PyAnyMessage` element. + pub fn append(&mut self, message: Message) { for (p, o) in message.committable() { self.batch_offsets .entry(p) .and_modify(|e| *e = (*e).max(o)) .or_insert(o); } + let inner = message.into_payload(); + let pysm = match inner.payload { + RoutedValuePayload::PyStreamingMessage(m) => m, + _ => unreachable!(), + }; + let PyStreamingMessage::PyAnyMessage { content } = pysm else { + unreachable!(); + }; + self.elements.push(content); + } + + pub fn is_empty(&self) -> bool { + self.elements.is_empty() + } + + fn len(&self) -> usize { + self.elements.len() } /// Whether the current window is complete by size and/or time. 
@@ -93,10 +114,12 @@ impl Batch { if self.max_batch_size.is_some_and(|m| self.len() >= m) { return true; } - if let (Some(_), Some(d)) = (&self.max_batch_time, &self.batch_deadline) { - if d.has_elapsed() { - return true; - } + if self + .batch_deadline + .as_ref() + .is_some_and(|d| d.has_elapsed()) + { + return true; } false } @@ -105,9 +128,9 @@ impl Batch { self.batch_offsets.clone() } - /// GIL: build list payload from stored `Py` rows, wrap in a batched `Message`. + /// GIL: build list payload from stored `Py` elements, wrap in a batched `Message`. pub fn build_stacked_message(&self) -> Result, StrategyError> { - if self.rows.is_empty() { + if self.elements.is_empty() { return Err(StrategyError::Other("Batch: empty window".into())); } let route = self.route.clone(); @@ -118,9 +141,9 @@ impl Batch { .unwrap_or(0.0); let content = traced_with_gil!(|py| -> PyResult> { - let first_schema = self.rows[0].bind(py).borrow().schema.clone(); + let first_schema = self.elements[0].bind(py).borrow().schema.clone(); let py_items: Vec> = self - .rows + .elements .iter() .map(|pm| pm.bind(py).borrow().payload.clone_ref(py)) .collect(); @@ -144,8 +167,6 @@ impl Batch { } } -// --- BatchStep: Arroyo strategy — watermarks, backpressure, owns `Option` --- - pub struct BatchStep { next_step: Box>, @@ -182,13 +203,6 @@ impl BatchStep { } } - fn is_quiesced(&self) -> bool { - self.message_carried_over.is_none() - && self.batch.as_ref().map_or(true, |b| b.is_empty()) - && self.pending_downstream.is_empty() - && self.watermark_buffer.is_empty() - } - fn submit_with_poll(&mut self, msg: Message) -> Result { let c = self.next_step.poll()?; self.commit_request_carried_over = @@ -273,30 +287,44 @@ impl BatchStep { mut wm_after_batch: Vec>, committable_for_synthetic: BTreeMap, ) -> Result<(), StrategyError> { - let wmk = self.make_synthetic_watermark(committable_for_synthetic); + let ts = current_epoch(); + let wmk = Message::new_any_message( + RoutedValue { + route: self.route.clone(), + 
payload: RoutedValuePayload::make_watermark_payload( + committable_for_synthetic.clone(), + ts, + ), + }, + committable_for_synthetic, + ); wm_after_batch.push(wmk); - while !wm_after_batch.is_empty() { - let m = wm_after_batch.remove(0); - if !self.submit_with_poll(m)? { - for r in wm_after_batch.into_iter().rev() { - self.pending_downstream.push_front(r); + for m in wm_after_batch { + let c = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), c); + match self.next_step.submit(m) { + Ok(()) => {} + Err(SubmitError::MessageRejected(_)) => { + // TODO: preserve or retry watermarks when downstream applies backpressure; today + // we drop the rest of the tail. + break; } - break; + Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), } } Ok(()) } +} - fn make_synthetic_watermark( - &self, - committable: BTreeMap, - ) -> Message { - let ts = current_epoch(); - let rv = RoutedValue { - route: self.route.clone(), - payload: RoutedValuePayload::make_watermark_payload(committable.clone(), ts), - }; - Message::new_any_message(rv, committable) +#[cfg(test)] +impl BatchStep { + /// Simulates downstream having rejected a batched message (backpressure). 
+ pub(crate) fn set_message_carried_over_for_test( + &mut self, + message: Option>, + ) { + self.message_carried_over = message; } } @@ -330,12 +358,19 @@ impl ProcessingStrategy for BatchStep { } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { + if self.message_carried_over.is_some() { + return Err(SubmitError::MessageRejected(MessageRejected { message })); + } + if self.route != message.payload().route { return self.next_step.submit(message); } match &message.payload().payload { RoutedValuePayload::WatermarkMessage(_) => { + if self.batch.as_ref().map_or(true, |b| b.is_empty()) { + return self.next_step.submit(message); + } self.watermark_buffer.push(message); Ok(()) } @@ -345,36 +380,17 @@ impl ProcessingStrategy for BatchStep { } if self.batch.is_none() { - let committable: BTreeMap = message.committable().collect(); - let inner = message.into_payload(); - let pysm = match inner.payload { - RoutedValuePayload::PyStreamingMessage(m) => m, - _ => unreachable!(), - }; - let PyStreamingMessage::PyAnyMessage { content } = pysm else { - unreachable!("RawMessage was rejected using &pysm before into_payload"); - }; - self.batch = Some(Batch::from_first_row( + self.batch = Some(Batch::from_initial_message( self.route.clone(), self.max_batch_size, self.max_batch_time, - content, - committable, + message, )); return Ok(()); } let b = self.batch.as_mut().expect("open batch"); - b.apply_committable_from(&message); - let inner = message.into_payload(); - let pysm = match inner.payload { - RoutedValuePayload::PyStreamingMessage(m) => m, - _ => unreachable!(), - }; - let PyStreamingMessage::PyAnyMessage { content } = pysm else { - unreachable!("RawMessage was rejected using &pysm before into_payload"); - }; - b.append_row(content); + b.append(message); Ok(()) } } @@ -395,7 +411,11 @@ impl ProcessingStrategy for BatchStep { if self.message_carried_over.is_none() { self.try_emit_batch(true)?; } - if self.is_quiesced() { + if 
self.message_carried_over.is_none() + && self.batch.as_ref().map_or(true, |b| b.is_empty()) + && self.pending_downstream.is_empty() + && self.watermark_buffer.is_empty() + { break; } } @@ -410,10 +430,10 @@ impl ProcessingStrategy for BatchStep { #[cfg(test)] mod tests { mod batch { - //! [`Batch`] only: rows, committable merge, `should_flush`, and list materialization (GIL). + //! [`Batch`] in isolation: elements, committable, `should_flush`, list build (GIL). use crate::batch_step::Batch; - use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; + use crate::messages::{PyStreamingMessage, RoutedValuePayload}; use crate::routes::Route; use crate::testutils::build_routed_value; use crate::utils::traced_with_gil; @@ -434,38 +454,20 @@ mod tests { let r = route(); let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind(); let p2 = 2i32.into_pyobject(py).unwrap().into_any().unbind(); - let a1 = into_pyany( - py, - PyAnyMessage { - payload: p1, - headers: vec![], - timestamp: 0.0, - schema: None, - }, - ) - .unwrap(); - let a2 = into_pyany( - py, - PyAnyMessage { - payload: p2, - headers: vec![], - timestamp: 0.0, - schema: None, - }, - ) - .unwrap(); let part = Partition::new(Topic::new("t"), 0); - let mut b = Batch::from_first_row( - r.clone(), - Some(2), - None, - a1, + let m1 = Message::new_any_message( + build_routed_value(py, p1, "s", vec!["w".into()]), BTreeMap::from([(part, 1u64)]), ); - b.append_row(a2); + let m2 = Message::new_any_message( + build_routed_value(py, p2, "s", vec!["w".into()]), + BTreeMap::from([(part, 2u64)]), + ); + let mut b = Batch::from_initial_message(r.clone(), Some(2), None, m1); + b.append(m2); let msg = b.build_stacked_message().expect("build"); assert!( - msg.committable().any(|(p, o)| p == part && o == 1), + msg.committable().any(|(p, o)| p == part && o == 2), "committable should include merged batch offsets" ); let RoutedValuePayload::PyStreamingMessage(pysm) = &msg.payload().payload else { @@ 
-482,29 +484,25 @@ mod tests { }); } - /// Uses [`build_routed_value`] to build a broker `Message` for [`Batch::apply_committable_from`]. #[test] - fn apply_committable_merges_max_per_partition() { + fn append_merges_max_per_partition() { traced_with_gil!(|py| { let r = route(); let p0 = 0i32.into_pyobject(py).unwrap().into_any().unbind(); - let row = into_pyany( - py, - PyAnyMessage { - payload: p0, - headers: vec![], - timestamp: 0.0, - schema: None, - }, - ) - .unwrap(); - let part = Partition::new(Topic::new("t"), 0); - let mut b = - Batch::from_first_row(r, None, None, row, BTreeMap::from([(part, 1u64)])); let p_for_msg = 0i32.into_pyobject(py).unwrap().into_any().unbind(); - let py_rv = build_routed_value(py, p_for_msg, "s", vec!["w".into()]); - let m = Message::new_broker_message(py_rv, part, 9, Utc::now()); - b.apply_committable_from(&m); + let part = Partition::new(Topic::new("t"), 0); + let m1 = Message::new_any_message( + build_routed_value(py, p0, "s", vec!["w".into()]), + BTreeMap::from([(part, 1u64)]), + ); + let m2 = Message::new_broker_message( + build_routed_value(py, p_for_msg, "s", vec!["w".into()]), + part, + 9, + Utc::now(), + ); + let mut b = Batch::from_initial_message(r, None, None, m1); + b.append(m2); let snap = b.current_offsets_snapshot(); assert_eq!(snap.get(&part).copied(), Some(10u64)); }); @@ -514,34 +512,27 @@ mod tests { fn should_flush_when_max_batch_size_reached() { traced_with_gil!(|py| { let r = route(); - let a1 = any_row(py, 1i32); - let a2 = any_row(py, 2i32); + let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind(); + let p2 = 2i32.into_pyobject(py).unwrap().into_any().unbind(); let part = Partition::new(Topic::new("t"), 0); - let mut b = - Batch::from_first_row(r, Some(2), None, a1, BTreeMap::from([(part, 0u64)])); - assert!(!b.should_flush(), "one row, limit 2"); - b.append_row(a2); - assert!(b.should_flush(), "two rows, limit 2"); + let m1 = Message::new_any_message( + build_routed_value(py, p1, "s", 
vec!["w".into()]), + BTreeMap::from([(part, 0u64)]), + ); + let m2 = Message::new_any_message( + build_routed_value(py, p2, "s", vec!["w".into()]), + BTreeMap::from([(part, 1u64)]), + ); + let mut b = Batch::from_initial_message(r, Some(2), None, m1); + assert!(!b.should_flush(), "one element, limit 2"); + b.append(m2); + assert!(b.should_flush(), "two elements, limit 2"); }); } - - fn any_row(py: pyo3::Python<'_>, v: i32) -> pyo3::Py { - let p = v.into_pyobject(py).unwrap().into_any().unbind(); - into_pyany( - py, - PyAnyMessage { - payload: p, - headers: vec![], - timestamp: 0.0, - schema: None, - }, - ) - .unwrap() - } } mod step { - //! [`BatchStep`] as [`ProcessingStrategy`]: routing, raw rejection, handoff to the next step. + //! [`BatchStep`] as [`ProcessingStrategy`]: routing, raw rejection, backpressure, watermarks. use super::super::{BatchStep, Message}; use crate::fake_strategy::FakeStrategy; @@ -551,30 +542,36 @@ mod tests { use pyo3::prelude::*; use pyo3::IntoPyObject; use sentry_arroyo::processing::strategies::{ - InvalidMessage, ProcessingStrategy, SubmitError, + InvalidMessage, MessageRejected, ProcessingStrategy, SubmitError, }; use sentry_arroyo::types::{Partition, Topic}; use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; use std::time::Duration; + use crate::messages::RoutedValuePayload; use crate::routes::Route; fn batch_step_with_fake( route: Route, max_n: Option, max_t: Option, - ) -> (BatchStep, Arc>>>) { + ) -> ( + BatchStep, + Arc>>>, + Arc>>, + ) { let sub = Arc::new(Mutex::new(Vec::new())); - let next = FakeStrategy::new(sub.clone(), Arc::default(), false); + let wms = Arc::new(Mutex::new(Vec::new())); + let next = FakeStrategy::new(sub.clone(), wms.clone(), false); let step = BatchStep::new(route, max_n, max_t, Box::new(next)); - (step, sub) + (step, sub, wms) } #[test] fn forwards_mismatched_route_to_next_strategy() { let step_route = Route::new("a".into(), vec![]); - let (mut step, captured) = 
batch_step_with_fake(step_route, None, None); + let (mut step, captured, _wms) = batch_step_with_fake(step_route, None, None); traced_with_gil!(|py| { let p = 1i32.into_pyobject(py).unwrap().into_any().unbind(); let rv = crate::testutils::build_routed_value(py, p, "b", vec![]); @@ -587,7 +584,7 @@ mod tests { #[test] fn flushes_one_message_to_downstream_when_batch_full() { let route = Route::new("s".into(), vec!["w".into()]); - let (mut step, out) = batch_step_with_fake(route, Some(2), None); + let (mut step, out, _wms) = batch_step_with_fake(route, Some(2), None); traced_with_gil!(|py| { let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind(); let p2 = 2i32.into_pyobject(py).unwrap().into_any().unbind(); @@ -613,7 +610,7 @@ mod tests { #[test] fn submit_rejects_raw_message_after_pyany() { let route = Route::new("s".into(), vec!["w".into()]); - let (mut step, _out) = batch_step_with_fake(route, Some(10), None); + let (mut step, _out, _wms) = batch_step_with_fake(route, Some(10), None); let part = Partition::new(Topic::new("topic"), 0); traced_with_gil!(|py| { let p0 = 0i32.into_pyobject(py).unwrap().into_any().unbind(); @@ -643,7 +640,7 @@ mod tests { #[test] fn submit_rejects_leading_raw_message() { let route = Route::new("s".into(), vec!["w".into()]); - let (mut step, _out) = batch_step_with_fake(route, Some(10), None); + let (mut step, _out, _wms) = batch_step_with_fake(route, Some(10), None); let part = Partition::new(Topic::new("topic"), 0); traced_with_gil!(|py| { let raw_m = Message::new_broker_message( @@ -661,5 +658,43 @@ mod tests { assert_eq!(offset, 0); }); } + + #[test] + fn watermark_forwarded_immediately_when_batch_empty() { + let route = Route::new("s".into(), vec!["w".into()]); + let (mut step, _out, wms) = batch_step_with_fake(route.clone(), None, None); + let rv = crate::routes::RoutedValue { + route, + payload: RoutedValuePayload::make_watermark_payload(BTreeMap::new(), 0), + }; + let m = Message::new_any_message(rv, BTreeMap::new()); + 
step.submit(m) + .expect("watermark should go to next step when no open batch"); + assert_eq!(wms.lock().unwrap().len(), 1); + } + + #[test] + fn submit_rejects_while_message_carried_over() { + let route = Route::new("s".into(), vec!["w".into()]); + let (mut step, _out, _wms) = batch_step_with_fake(route.clone(), None, None); + traced_with_gil!(|py| { + let p_carried = 1i32.into_pyobject(py).unwrap().into_any().unbind(); + let p_next = 2i32.into_pyobject(py).unwrap().into_any().unbind(); + let carried = Message::new_any_message( + build_routed_value(py, p_carried, "s", vec!["w".into()]), + BTreeMap::new(), + ); + step.set_message_carried_over_for_test(Some(carried)); + let m = Message::new_any_message( + build_routed_value(py, p_next, "s", vec!["w".into()]), + BTreeMap::new(), + ); + let err = step.submit(m).expect_err("expected backpressure"); + assert!(matches!( + err, + SubmitError::MessageRejected(MessageRejected { .. }) + )); + }); + } } } From b825e8932f1dbcc45fcddc01b6e0a29e83520091 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 27 Apr 2026 19:03:41 -0700 Subject: [PATCH 03/10] ref(sentry-streams): Dequeue post-batch watermarks on poll After a successful batch submit, push buffered and synthetic watermarks into pending_watermarks and drain them in poll (poll next, merge commit, submit; MessageRejected re-queues to the front) matching PythonAdapter's transformed message pattern. Refs GH-307 Made-with: Cursor --- sentry_streams/src/batch_step.rs | 69 +++++++++++++++++++------------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index fd92ca13..26ba224c 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -175,8 +175,13 @@ pub struct BatchStep { max_batch_time: Option, /// `None` until the first `PyAnyMessage` in a window; then the open batch. 
batch: Option, - /// Watermarks received while the current batch window is open; taken when emitting after batch. + /// Watermarks received while the current batch window is open; moved to + /// [`Self::pending_watermarks`] when the batch is emitted successfully. watermark_buffer: Vec>, + /// Watermarks (and the synthetic batch watermark) queued for the next step; appended on + /// batch flush, drained on each [`ProcessingStrategy::poll`] (same deque + re-queue pattern as + /// `PythonAdapter`'s transformed message queue). + pending_watermarks: VecDeque>, message_carried_over: Option>, pending_downstream: VecDeque>, @@ -197,6 +202,7 @@ impl BatchStep { max_batch_time, batch: None, watermark_buffer: Vec::new(), + pending_watermarks: VecDeque::new(), message_carried_over: None, pending_downstream: VecDeque::new(), commit_request_carried_over: None, @@ -226,6 +232,23 @@ impl BatchStep { Ok(()) } + fn drain_pending_watermarks(&mut self) -> Result<(), StrategyError> { + while let Some(msg) = self.pending_watermarks.pop_front() { + let c = self.next_step.poll()?; + self.commit_request_carried_over = + merge_commit_request(self.commit_request_carried_over.take(), c); + match self.next_step.submit(msg) { + Ok(()) => {} + Err(SubmitError::MessageRejected(MessageRejected { message })) => { + self.pending_watermarks.push_front(message); + break; + } + Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), + } + } + Ok(()) + } + fn retry_carried_batch(&mut self) -> Result<(), StrategyError> { if let Some(msg) = self.message_carried_over.take() { let c = self.next_step.poll()?; @@ -276,44 +299,31 @@ impl BatchStep { return Err(e.into()); } Ok(()) => { - self.emit_watermark_tail(wm_after_batch, committable_for_synthetic)?; + self.enqueue_watermark_tail(wm_after_batch, committable_for_synthetic); } } Ok(()) } - fn emit_watermark_tail( + /// Pushes buffered route watermarks and the synthetic batch watermark into the pending queue + /// for delivery on subsequent poll cycles. 
+ fn enqueue_watermark_tail( &mut self, - mut wm_after_batch: Vec>, - committable_for_synthetic: BTreeMap, - ) -> Result<(), StrategyError> { + wm_after_batch: Vec>, + committable: BTreeMap, + ) { + for m in wm_after_batch { + self.pending_watermarks.push_back(m); + } let ts = current_epoch(); let wmk = Message::new_any_message( RoutedValue { route: self.route.clone(), - payload: RoutedValuePayload::make_watermark_payload( - committable_for_synthetic.clone(), - ts, - ), + payload: RoutedValuePayload::make_watermark_payload(committable.clone(), ts), }, - committable_for_synthetic, + committable, ); - wm_after_batch.push(wmk); - for m in wm_after_batch { - let c = self.next_step.poll()?; - self.commit_request_carried_over = - merge_commit_request(self.commit_request_carried_over.take(), c); - match self.next_step.submit(m) { - Ok(()) => {} - Err(SubmitError::MessageRejected(_)) => { - // TODO: preserve or retry watermarks when downstream applies backpressure; today - // we drop the rest of the tail. 
- break; - } - Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), - } - } - Ok(()) + self.pending_watermarks.push_back(wmk); } } @@ -346,10 +356,12 @@ pub fn build_batch_step( impl ProcessingStrategy for BatchStep { fn poll(&mut self) -> Result, StrategyError> { self.drain_pending_downstream()?; + self.drain_pending_watermarks()?; self.retry_carried_batch()?; if self.message_carried_over.is_none() { self.try_emit_batch(false)?; } + self.drain_pending_watermarks()?; let c = self.next_step.poll()?; Ok(merge_commit_request( self.commit_request_carried_over.take(), @@ -407,13 +419,16 @@ impl ProcessingStrategy for BatchStep { break; } self.drain_pending_downstream()?; + self.drain_pending_watermarks()?; self.retry_carried_batch()?; if self.message_carried_over.is_none() { self.try_emit_batch(true)?; } + self.drain_pending_watermarks()?; if self.message_carried_over.is_none() && self.batch.as_ref().map_or(true, |b| b.is_empty()) && self.pending_downstream.is_empty() + && self.pending_watermarks.is_empty() && self.watermark_buffer.is_empty() { break; From 674524ff22fc6db1c1b88114119ef0d08131253b Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 28 Apr 2026 14:49:48 -0700 Subject: [PATCH 04/10] ref(sentry-streams): Simplify batch step per PR review Batch now takes committable maps and Py in from_initial/append so the struct avoids unreachable payload branches. Rename build_stacked_message to flush. Collapse pending_downstream and pending_watermarks into a single outbound deque with one drain_outbound (poll+merge+submit+requeue). BatchStep::submit decomposes the routed message after a RawMessage guard. 
Refs GH-307 Made-with: Cursor --- sentry_streams/src/batch_step.rs | 199 ++++++++++++++++--------------- 1 file changed, 100 insertions(+), 99 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 26ba224c..0cc3d78a 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -35,7 +35,7 @@ fn invalid_message_submit_error(message: &Message) -> SubmitError, @@ -46,55 +46,40 @@ pub(crate) struct Batch { } impl Batch { - /// First element in a window: applies this message’s committable, consumes it, and sets the - /// time-bounded deadline when `max_batch_time` is set. - pub fn from_initial_message( + /// First element in a window. `committable` and `content` are taken from the same + /// [`RoutedValue`] (see [`BatchStep::submit`]); `Batch` only ever holds `PyAnyMessage` elements. + pub fn from_initial( route: Route, max_batch_size: Option, max_batch_time: Option, - message: Message, + committable: BTreeMap, + first_element: Py, ) -> Self { let mut batch_offsets: BTreeMap = BTreeMap::new(); - for (p, o) in message.committable() { + for (p, o) in committable { batch_offsets .entry(p) .and_modify(|e| *e = (*e).max(o)) .or_insert(o); } - let inner = message.into_payload(); - let pysm = match inner.payload { - RoutedValuePayload::PyStreamingMessage(m) => m, - _ => unreachable!(), - }; - let PyStreamingMessage::PyAnyMessage { content } = pysm else { - unreachable!(); - }; let batch_deadline = max_batch_time.map(Deadline::new); Self { route, max_batch_size, batch_deadline, - elements: vec![content], + elements: vec![first_element], batch_offsets, } } - /// Merges committable from the message, consumes it, and appends the `PyAnyMessage` element. - pub fn append(&mut self, message: Message) { - for (p, o) in message.committable() { + /// Merges this message’s `committable` into the running offset map and appends `content`. 
+ pub fn append(&mut self, committable: BTreeMap, content: Py) { + for (p, o) in committable { self.batch_offsets .entry(p) .and_modify(|e| *e = (*e).max(o)) .or_insert(o); } - let inner = message.into_payload(); - let pysm = match inner.payload { - RoutedValuePayload::PyStreamingMessage(m) => m, - _ => unreachable!(), - }; - let PyStreamingMessage::PyAnyMessage { content } = pysm else { - unreachable!(); - }; self.elements.push(content); } @@ -128,8 +113,9 @@ impl Batch { self.batch_offsets.clone() } - /// GIL: build list payload from stored `Py` elements, wrap in a batched `Message`. - pub fn build_stacked_message(&self) -> Result, StrategyError> { + /// GIL: build list payload from stored `Py` elements, wrap in a batched + /// [`Message`]. + pub fn flush(&self) -> Result, StrategyError> { if self.elements.is_empty() { return Err(StrategyError::Other("Batch: empty window".into())); } @@ -175,16 +161,15 @@ pub struct BatchStep { max_batch_time: Option, /// `None` until the first `PyAnyMessage` in a window; then the open batch. batch: Option, - /// Watermarks received while the current batch window is open; moved to - /// [`Self::pending_watermarks`] when the batch is emitted successfully. + /// Watermarks received while the current batch window is open; on successful batch send they + /// are appended to [`Self::outbound`]. watermark_buffer: Vec>, - /// Watermarks (and the synthetic batch watermark) queued for the next step; appended on - /// batch flush, drained on each [`ProcessingStrategy::poll`] (same deque + re-queue pattern as - /// `PythonAdapter`'s transformed message queue). - pending_watermarks: VecDeque>, + /// Batched follow-ups and downstream backpressure: anything not yet accepted by + /// `next_step` (watermarks, synthetic batch watermark, or other rejects), drained on + /// [`Self::drain_outbound`]. 
+ outbound: VecDeque>, message_carried_over: Option>, - pending_downstream: VecDeque>, commit_request_carried_over: Option, } @@ -202,45 +187,24 @@ impl BatchStep { max_batch_time, batch: None, watermark_buffer: Vec::new(), - pending_watermarks: VecDeque::new(), + outbound: VecDeque::new(), message_carried_over: None, - pending_downstream: VecDeque::new(), commit_request_carried_over: None, } } - fn submit_with_poll(&mut self, msg: Message) -> Result { - let c = self.next_step.poll()?; - self.commit_request_carried_over = - merge_commit_request(self.commit_request_carried_over.take(), c); - match self.next_step.submit(msg) { - Ok(()) => Ok(true), - Err(SubmitError::MessageRejected(MessageRejected { message })) => { - self.pending_downstream.push_front(message); - Ok(false) - } - Err(SubmitError::InvalidMessage(e)) => Err(e.into()), - } - } - - fn drain_pending_downstream(&mut self) -> Result<(), StrategyError> { - while let Some(msg) = self.pending_downstream.pop_front() { - if !self.submit_with_poll(msg)? { - break; - } - } - Ok(()) - } - - fn drain_pending_watermarks(&mut self) -> Result<(), StrategyError> { - while let Some(msg) = self.pending_watermarks.pop_front() { + /// [`ProcessingStrategy::poll`], then merge `CommitRequest`, then `submit` each queued message + /// until a [`SubmitError::MessageRejected`] re-queues the front and stops (same idea as + /// `PythonAdapter`’s transformed message drain). 
+ fn drain_outbound(&mut self) -> Result<(), StrategyError> { + while let Some(msg) = self.outbound.pop_front() { let c = self.next_step.poll()?; self.commit_request_carried_over = merge_commit_request(self.commit_request_carried_over.take(), c); match self.next_step.submit(msg) { Ok(()) => {} Err(SubmitError::MessageRejected(MessageRejected { message })) => { - self.pending_watermarks.push_front(message); + self.outbound.push_front(message); break; } Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), @@ -281,7 +245,7 @@ impl BatchStep { .as_ref() .ok_or_else(|| StrategyError::Other("BatchStep: emit without active batch".into()))?; let committable_for_synthetic = b.current_offsets_snapshot(); - let batch_msg = b.build_stacked_message()?; + let batch_msg = b.flush()?; self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); @@ -305,15 +269,13 @@ impl BatchStep { Ok(()) } - /// Pushes buffered route watermarks and the synthetic batch watermark into the pending queue - /// for delivery on subsequent poll cycles. 
fn enqueue_watermark_tail( &mut self, wm_after_batch: Vec>, committable: BTreeMap, ) { for m in wm_after_batch { - self.pending_watermarks.push_back(m); + self.outbound.push_back(m); } let ts = current_epoch(); let wmk = Message::new_any_message( @@ -323,7 +285,7 @@ impl BatchStep { }, committable, ); - self.pending_watermarks.push_back(wmk); + self.outbound.push_back(wmk); } } @@ -355,13 +317,12 @@ pub fn build_batch_step( impl ProcessingStrategy for BatchStep { fn poll(&mut self) -> Result, StrategyError> { - self.drain_pending_downstream()?; - self.drain_pending_watermarks()?; + self.drain_outbound()?; self.retry_carried_batch()?; if self.message_carried_over.is_none() { self.try_emit_batch(false)?; } - self.drain_pending_watermarks()?; + self.drain_outbound()?; let c = self.next_step.poll()?; Ok(merge_commit_request( self.commit_request_carried_over.take(), @@ -378,31 +339,52 @@ impl ProcessingStrategy for BatchStep { return self.next_step.submit(message); } - match &message.payload().payload { - RoutedValuePayload::WatermarkMessage(_) => { + if let RoutedValuePayload::PyStreamingMessage(PyStreamingMessage::RawMessage { .. 
}) = + &message.payload().payload + { + return Err(invalid_message_submit_error(&message)); + } + + let committable: BTreeMap = message.committable().collect(); + let rv = message.into_payload(); + match rv.payload { + RoutedValuePayload::WatermarkMessage(wm) => { if self.batch.as_ref().map_or(true, |b| b.is_empty()) { - return self.next_step.submit(message); + return self.next_step.submit(Message::new_any_message( + RoutedValue { + route: rv.route, + payload: RoutedValuePayload::WatermarkMessage(wm), + }, + committable, + )); } - self.watermark_buffer.push(message); + self.watermark_buffer.push(Message::new_any_message( + RoutedValue { + route: rv.route, + payload: RoutedValuePayload::WatermarkMessage(wm), + }, + committable, + )); Ok(()) } RoutedValuePayload::PyStreamingMessage(pysm) => { - if matches!(pysm, PyStreamingMessage::RawMessage { .. }) { - return Err(invalid_message_submit_error(&message)); - } - + let PyStreamingMessage::PyAnyMessage { content } = pysm else { + unreachable!("BatchStep: RawMessage should have been rejected above"); + }; if self.batch.is_none() { - self.batch = Some(Batch::from_initial_message( + self.batch = Some(Batch::from_initial( self.route.clone(), self.max_batch_size, self.max_batch_time, - message, + committable, + content, )); return Ok(()); } - - let b = self.batch.as_mut().expect("open batch"); - b.append(message); + self.batch + .as_mut() + .expect("open batch") + .append(committable, content); Ok(()) } } @@ -418,17 +400,15 @@ impl ProcessingStrategy for BatchStep { if deadline.as_ref().is_some_and(|d| d.has_elapsed()) { break; } - self.drain_pending_downstream()?; - self.drain_pending_watermarks()?; + self.drain_outbound()?; self.retry_carried_batch()?; if self.message_carried_over.is_none() { self.try_emit_batch(true)?; } - self.drain_pending_watermarks()?; + self.drain_outbound()?; if self.message_carried_over.is_none() && self.batch.as_ref().map_or(true, |b| b.is_empty()) - && self.pending_downstream.is_empty() - && 
self.pending_watermarks.is_empty() + && self.outbound.is_empty() && self.watermark_buffer.is_empty() { break; @@ -448,8 +428,8 @@ mod tests { //! [`Batch`] in isolation: elements, committable, `should_flush`, list build (GIL). use crate::batch_step::Batch; - use crate::messages::{PyStreamingMessage, RoutedValuePayload}; - use crate::routes::Route; + use crate::messages::{PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; + use crate::routes::{Route, RoutedValue}; use crate::testutils::build_routed_value; use crate::utils::traced_with_gil; use chrono::Utc; @@ -463,8 +443,23 @@ mod tests { Route::new("s".into(), vec!["w".into()]) } + /// Test helper: same decomposition as [`BatchStep::submit`] for `PyAnyMessage` inputs. + fn committable_and_pyany( + message: Message, + ) -> (BTreeMap, Py) { + let c = message.committable().collect(); + let rv = message.into_payload(); + let RoutedValuePayload::PyStreamingMessage(PyStreamingMessage::PyAnyMessage { + content, + }) = rv.payload + else { + panic!("test expects PyAnyMessage"); + }; + (c, content) + } + #[test] - fn build_stacked_message_makes_list_payload() { + fn flush_makes_list_payload() { traced_with_gil!(|py| { let r = route(); let p1 = 1i32.into_pyobject(py).unwrap().into_any().unbind(); @@ -478,9 +473,11 @@ mod tests { build_routed_value(py, p2, "s", vec!["w".into()]), BTreeMap::from([(part, 2u64)]), ); - let mut b = Batch::from_initial_message(r.clone(), Some(2), None, m1); - b.append(m2); - let msg = b.build_stacked_message().expect("build"); + let (c1, el1) = committable_and_pyany(m1); + let mut b = Batch::from_initial(r.clone(), Some(2), None, c1, el1); + let (c2, el2) = committable_and_pyany(m2); + b.append(c2, el2); + let msg = b.flush().expect("build"); assert!( msg.committable().any(|(p, o)| p == part && o == 2), "committable should include merged batch offsets" @@ -516,8 +513,10 @@ mod tests { 9, Utc::now(), ); - let mut b = Batch::from_initial_message(r, None, None, m1); - b.append(m2); + let (c1, el1) 
= committable_and_pyany(m1); + let mut b = Batch::from_initial(r, None, None, c1, el1); + let (c2, el2) = committable_and_pyany(m2); + b.append(c2, el2); let snap = b.current_offsets_snapshot(); assert_eq!(snap.get(&part).copied(), Some(10u64)); }); @@ -538,9 +537,11 @@ mod tests { build_routed_value(py, p2, "s", vec!["w".into()]), BTreeMap::from([(part, 1u64)]), ); - let mut b = Batch::from_initial_message(r, Some(2), None, m1); + let (c1, el1) = committable_and_pyany(m1); + let mut b = Batch::from_initial(r, Some(2), None, c1, el1); assert!(!b.should_flush(), "one element, limit 2"); - b.append(m2); + let (c2, el2) = committable_and_pyany(m2); + b.append(c2, el2); assert!(b.should_flush(), "two elements, limit 2"); }); } From c3e5aa8a57c120efb3e7dbf73df1d82b9d9ca1fb Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 28 Apr 2026 14:54:37 -0700 Subject: [PATCH 05/10] ref(sentry-streams): Drop message_carried_over for outbound queue Queue failed batch flushes on outbound with push_front like other MessageRejected paths. Use stalled_batch to keep submit gating (outbound can hold non-stall work). Remove retry_carried_batch; drain_outbound clears the stall bit after a successful submit. Refs GH-307 Made-with: Cursor --- sentry_streams/src/batch_step.rs | 68 ++++++++++++++------------------ 1 file changed, 30 insertions(+), 38 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 0cc3d78a..59193bac 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -165,11 +165,13 @@ pub struct BatchStep { /// are appended to [`Self::outbound`]. watermark_buffer: Vec>, /// Batched follow-ups and downstream backpressure: anything not yet accepted by - /// `next_step` (watermarks, synthetic batch watermark, or other rejects), drained on - /// [`Self::drain_outbound`]. 
+ /// `next_step` (watermarks, synthetic batch watermark, a failed batch flush, or any other + /// [`SubmitError::MessageRejected`]), drained on [`Self::drain_outbound`]. outbound: VecDeque>, - - message_carried_over: Option>, + /// When true, a batched `Message` is waiting in [`Self::outbound`] and must be delivered + /// before this step accepts new streaming rows (same gating the old `message_carried_over` + /// provided); other work may still be in [`Self::outbound`]. + stalled_batch: bool, commit_request_carried_over: Option, } @@ -188,7 +190,7 @@ impl BatchStep { batch: None, watermark_buffer: Vec::new(), outbound: VecDeque::new(), - message_carried_over: None, + stalled_batch: false, commit_request_carried_over: None, } } @@ -202,7 +204,11 @@ impl BatchStep { self.commit_request_carried_over = merge_commit_request(self.commit_request_carried_over.take(), c); match self.next_step.submit(msg) { - Ok(()) => {} + Ok(()) => { + if self.stalled_batch { + self.stalled_batch = false; + } + } Err(SubmitError::MessageRejected(MessageRejected { message })) => { self.outbound.push_front(message); break; @@ -213,24 +219,8 @@ impl BatchStep { Ok(()) } - fn retry_carried_batch(&mut self) -> Result<(), StrategyError> { - if let Some(msg) = self.message_carried_over.take() { - let c = self.next_step.poll()?; - self.commit_request_carried_over = - merge_commit_request(self.commit_request_carried_over.take(), c); - match self.next_step.submit(msg) { - Ok(()) => {} - Err(SubmitError::MessageRejected(MessageRejected { message })) => { - self.message_carried_over = Some(message); - } - Err(SubmitError::InvalidMessage(e)) => return Err(e.into()), - } - } - Ok(()) - } - fn try_emit_batch(&mut self, force: bool) -> Result<(), StrategyError> { - if self.message_carried_over.is_some() { + if self.stalled_batch { return Ok(()); } if self.batch.as_ref().map_or(true, |b| b.is_empty()) { @@ -255,7 +245,8 @@ impl BatchStep { match self.next_step.submit(batch_msg) { 
Err(SubmitError::MessageRejected(MessageRejected { message })) => { - self.message_carried_over = Some(message); + self.outbound.push_front(message); + self.stalled_batch = true; self.watermark_buffer = wm_after_batch; } Err(SubmitError::InvalidMessage(e)) => { @@ -291,12 +282,15 @@ impl BatchStep { #[cfg(test)] impl BatchStep { - /// Simulates downstream having rejected a batched message (backpressure). - pub(crate) fn set_message_carried_over_for_test( - &mut self, - message: Option>, - ) { - self.message_carried_over = message; + /// Simulates a batched `Message` stuck behind downstream backpressure: same as a reject on + /// the batch flush, but with no prior [`try_emit_batch`]. + pub(crate) fn set_stalled_outbound_for_test(&mut self, message: Option>) { + if let Some(m) = message { + self.outbound.push_front(m); + self.stalled_batch = true; + } else { + self.stalled_batch = false; + } } } @@ -318,8 +312,7 @@ pub fn build_batch_step( impl ProcessingStrategy for BatchStep { fn poll(&mut self) -> Result, StrategyError> { self.drain_outbound()?; - self.retry_carried_batch()?; - if self.message_carried_over.is_none() { + if !self.stalled_batch { self.try_emit_batch(false)?; } self.drain_outbound()?; @@ -331,7 +324,7 @@ impl ProcessingStrategy for BatchStep { } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { - if self.message_carried_over.is_some() { + if self.stalled_batch { return Err(SubmitError::MessageRejected(MessageRejected { message })); } @@ -401,12 +394,11 @@ impl ProcessingStrategy for BatchStep { break; } self.drain_outbound()?; - self.retry_carried_batch()?; - if self.message_carried_over.is_none() { + if !self.stalled_batch { self.try_emit_batch(true)?; } self.drain_outbound()?; - if self.message_carried_over.is_none() + if !self.stalled_batch && self.batch.as_ref().map_or(true, |b| b.is_empty()) && self.outbound.is_empty() && self.watermark_buffer.is_empty() @@ -690,7 +682,7 @@ mod tests { } #[test] - fn 
submit_rejects_while_message_carried_over() { + fn submit_rejects_while_batch_stalled_in_outbound() { let route = Route::new("s".into(), vec!["w".into()]); let (mut step, _out, _wms) = batch_step_with_fake(route.clone(), None, None); traced_with_gil!(|py| { @@ -700,7 +692,7 @@ mod tests { build_routed_value(py, p_carried, "s", vec!["w".into()]), BTreeMap::new(), ); - step.set_message_carried_over_for_test(Some(carried)); + step.set_stalled_outbound_for_test(Some(carried)); let m = Message::new_any_message( build_routed_value(py, p_next, "s", vec!["w".into()]), BTreeMap::new(), From 0ca3f290547abe9ca4becfd8b07cead811621be3 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 28 Apr 2026 14:59:16 -0700 Subject: [PATCH 06/10] ref(sentry-streams): Enqueue batch flush for drain_outbound only try_emit_batch no longer poll+submits the batched message; it push_backs the batch, sets stalled_batch, and enqueues the watermark tail so a single path handles delivery and backpressure. Refs GH-307 Made-with: Cursor --- sentry_streams/src/batch_step.rs | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 59193bac..2f7ca321 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -239,24 +239,13 @@ impl BatchStep { self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); - let c = self.next_step.poll()?; - self.commit_request_carried_over = - merge_commit_request(self.commit_request_carried_over.take(), c); - - match self.next_step.submit(batch_msg) { - Err(SubmitError::MessageRejected(MessageRejected { message })) => { - self.outbound.push_front(message); - self.stalled_batch = true; - self.watermark_buffer = wm_after_batch; - } - Err(SubmitError::InvalidMessage(e)) => { - self.watermark_buffer = wm_after_batch; - return Err(e.into()); - } - Ok(()) => { - self.enqueue_watermark_tail(wm_after_batch, 
committable_for_synthetic); - } - } + // Order: batched `Message` first, then buffered route WMs, then the synthetic batch WM; + // [`drain_outbound`] performs all `poll`+`submit` (including [`SubmitError`]) like other + // outbound work. `stalled_batch` blocks new `PyAny` input until the batched `Message` is + // accepted, matching the old synchronous `submit` path. + self.outbound.push_back(batch_msg); + self.stalled_batch = true; + self.enqueue_watermark_tail(wm_after_batch, committable_for_synthetic); Ok(()) } From 9c6e3b6accd0fd06939d3d31ac07fb09fe70aaff Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 28 Apr 2026 15:12:34 -0700 Subject: [PATCH 07/10] Simplify --- sentry_streams/src/batch_step.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 2f7ca321..586679a6 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -300,10 +300,7 @@ pub fn build_batch_step( impl ProcessingStrategy for BatchStep { fn poll(&mut self) -> Result, StrategyError> { - self.drain_outbound()?; - if !self.stalled_batch { - self.try_emit_batch(false)?; - } + self.try_emit_batch(false)?; self.drain_outbound()?; let c = self.next_step.poll()?; Ok(merge_commit_request( @@ -313,14 +310,15 @@ impl ProcessingStrategy for BatchStep { } fn submit(&mut self, message: Message) -> Result<(), SubmitError> { - if self.stalled_batch { - return Err(SubmitError::MessageRejected(MessageRejected { message })); - } - if self.route != message.payload().route { return self.next_step.submit(message); } + if self.stalled_batch { + return Err(SubmitError::MessageRejected(MessageRejected { message })); + } + + // TODO: Support RawMessage as well. if let RoutedValuePayload::PyStreamingMessage(PyStreamingMessage::RawMessage { .. 
}) = &message.payload().payload { @@ -382,10 +380,7 @@ impl ProcessingStrategy for BatchStep { if deadline.as_ref().is_some_and(|d| d.has_elapsed()) { break; } - self.drain_outbound()?; - if !self.stalled_batch { - self.try_emit_batch(true)?; - } + self.try_emit_batch(false)?; self.drain_outbound()?; if !self.stalled_batch && self.batch.as_ref().map_or(true, |b| b.is_empty()) From 160fef43d788a180c4815e3deed226d44abf633f Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 28 Apr 2026 15:22:17 -0700 Subject: [PATCH 08/10] Fix docs --- sentry_streams/src/batch_step.rs | 58 +++++++++++++++++--------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 586679a6..51e49956 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -52,6 +52,8 @@ impl Batch { route: Route, max_batch_size: Option, max_batch_time: Option, + // Keeps track of the highest offset for each partition. This represent the committable + // we will return when the batch is flushed. committable: BTreeMap, first_element: Py, ) -> Self { @@ -72,7 +74,6 @@ impl Batch { } } - /// Merges this message’s `committable` into the running offset map and appends `content`. pub fn append(&mut self, committable: BTreeMap, content: Py) { for (p, o) in committable { self.batch_offsets @@ -91,7 +92,6 @@ impl Batch { self.elements.len() } - /// Whether the current window is complete by size and/or time. pub fn should_flush(&self) -> bool { if self.is_empty() { return false; @@ -113,8 +113,6 @@ impl Batch { self.batch_offsets.clone() } - /// GIL: build list payload from stored `Py` elements, wrap in a batched - /// [`Message`]. 
pub fn flush(&self) -> Result, StrategyError> { if self.elements.is_empty() { return Err(StrategyError::Other("Batch: empty window".into())); } @@ -163,15 +161,20 @@ pub struct BatchStep { batch: Option, /// Watermarks received while the current batch window is open; on successful batch send they /// are appended to [`Self::outbound`]. + /// + /// We need to hold on to the watermarks we receive until the batch is flushed; otherwise we + /// would indicate that messages have to be committed before they are sent through. watermark_buffer: Vec>, - /// Batched follow-ups and downstream backpressure: anything not yet accepted by - /// `next_step` (watermarks, synthetic batch watermark, a failed batch flush, or any other - /// [`SubmitError::MessageRejected`]), drained on [`Self::drain_outbound`]. + /// This is the queue of messages that are ready to be pushed through to the next step. + /// We have to keep it as part of the struct state because we are not guaranteed to be + /// able to push it through in one call to `poll`. We may receive MessageRejected errors + /// on any of the messages we want to flush. + /// In those cases we hold the pending messages and try again on the next call to `poll`. outbound: VecDeque>, /// When true, a batched `Message` is waiting in [`Self::outbound`] and must be delivered - /// before this step accepts new streaming rows (same gating the old `message_carried_over` - /// provided); other work may still be in [`Self::outbound`]. - stalled_batch: bool, + /// before this step accepts new messages. + /// This helps us cap the size of the pending queue during prolonged backpressure periods. 
+ pending_batch: bool, commit_request_carried_over: Option, } @@ -190,14 +193,17 @@ batch: None, watermark_buffer: Vec::new(), outbound: VecDeque::new(), - stalled_batch: false, + pending_batch: false, commit_request_carried_over: None, } } - /// [`ProcessingStrategy::poll`], then merge `CommitRequest`, then `submit` each queued message - /// until a [`SubmitError::MessageRejected`] re-queues the front and stops (same idea as - /// `PythonAdapter`’s transformed message drain). + /// Tries to drain the queue containing the pending messages. + /// At the first MessageRejected it stops and returns the error leaving the queue + /// intact for the following attempt. + /// + /// It calls `poll` on the next step to guarantee it has a chance to process the + /// ongoing work. fn drain_outbound(&mut self) -> Result<(), StrategyError> { while let Some(msg) = self.outbound.pop_front() { let c = self.next_step.poll()?; @@ -205,8 +211,8 @@ merge_commit_request(self.commit_request_carried_over.take(), c); match self.next_step.submit(msg) { Ok(()) => { - if self.stalled_batch { - self.stalled_batch = false; + if self.pending_batch { + self.pending_batch = false; } } Err(SubmitError::MessageRejected(MessageRejected { message })) => { @@ -220,7 +226,7 @@ } fn try_emit_batch(&mut self, force: bool) -> Result<(), StrategyError> { - if self.stalled_batch { + if self.pending_batch { return Ok(()); } if self.batch.as_ref().map_or(true, |b| b.is_empty()) { @@ -234,17 +240,16 @@ .batch .as_ref() .ok_or_else(|| StrategyError::Other("BatchStep: emit without active batch".into()))?; + + // We create a synthetic watermark to avoid waiting for the next batch to complete before + // allowing the consumer to commit.
let committable_for_synthetic = b.current_offsets_snapshot(); let batch_msg = b.flush()?; self.batch = None; let wm_after_batch: Vec<_> = std::mem::take(&mut self.watermark_buffer); - // Order: batched `Message` first, then buffered route WMs, then the synthetic batch WM; - // [`drain_outbound`] performs all `poll`+`submit` (including [`SubmitError`]) like other - // outbound work. `stalled_batch` blocks new `PyAny` input until the batched `Message` is - // accepted, matching the old synchronous `submit` path. self.outbound.push_back(batch_msg); - self.stalled_batch = true; + self.pending_batch = true; self.enqueue_watermark_tail(wm_after_batch, committable_for_synthetic); Ok(()) } @@ -276,14 +281,13 @@ impl BatchStep { pub(crate) fn set_stalled_outbound_for_test(&mut self, message: Option>) { if let Some(m) = message { self.outbound.push_front(m); - self.stalled_batch = true; + self.pending_batch = true; } else { - self.stalled_batch = false; + self.pending_batch = false; } } } -/// Public constructor used by `operators::build`. 
pub fn build_batch_step( route: &Route, max_batch_size: Option, @@ -314,7 +318,7 @@ impl ProcessingStrategy for BatchStep { return self.next_step.submit(message); } - if self.stalled_batch { + if self.pending_batch { return Err(SubmitError::MessageRejected(MessageRejected { message })); } @@ -382,7 +386,7 @@ impl ProcessingStrategy for BatchStep { } self.try_emit_batch(false)?; self.drain_outbound()?; - if !self.stalled_batch + if !self.pending_batch && self.batch.as_ref().map_or(true, |b| b.is_empty()) && self.outbound.is_empty() && self.watermark_buffer.is_empty() From 046bf53957345851b9f00f4585ff318a32421a65 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Tue, 28 Apr 2026 15:53:14 -0700 Subject: [PATCH 09/10] Force flush on join --- sentry_streams/src/batch_step.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 51e49956..94fb27ee 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -384,7 +384,7 @@ impl ProcessingStrategy for BatchStep { if deadline.as_ref().is_some_and(|d| d.has_elapsed()) { break; } - self.try_emit_batch(false)?; + self.try_emit_batch(true)?; self.drain_outbound()?; if !self.pending_batch && self.batch.as_ref().map_or(true, |b| b.is_empty()) From f207b8aad71dc66ecea014df8cb420f1fa385f9d Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 29 Apr 2026 07:38:50 -0700 Subject: [PATCH 10/10] Address review comments --- sentry_streams/src/batch_step.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 94fb27ee..54f0bb20 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -114,9 +114,6 @@ impl Batch { } pub fn flush(&self) -> Result, StrategyError> { - if self.elements.is_empty() { - return Err(StrategyError::Other("Batch: empty window".into())); - } let route = self.route.clone(); let committable = 
self.batch_offsets.clone(); let ts = SystemTime::now()