diff --git a/.agents/skills/.gitkeep b/.agents/skills/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py index 512c2d81..4f13fec6 100644 --- a/sentry_streams/sentry_streams/pipeline/pipeline.py +++ b/sentry_streams/sentry_streams/pipeline/pipeline.py @@ -629,6 +629,10 @@ class Batch( batch_timedelta (time duration). If neither is specified, defaults to a 10-second time window. + The native Rust batch step batches ``PyAnyMessage`` and/or ``RawMessage`` rows; the emitted + message has a single ``PyAnyMessage`` with a ``list`` payload (mixed Python values and/or + ``bytes`` per element). + Both batch_size and batch_timedelta can be overridden via the deployment config's steps_config section. """ diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 54f0bb20..d9089c0f 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -1,53 +1,65 @@ -//! Batches `PyAnyMessage` elements on a route, then [`BatchStep`] forwards one `PyAnyMessage` with a -//! list payload and manages watermarks and backpressure to the next Arroyo strategy. +//! Batches streaming messages on a route. [`Batch`] stores [`PyStreamingMessage`] values; `PyAnyMessage` +//! and `RawMessage` may appear in the same window. On flush, output is a single `PyAnyMessage` whose +//! `payload` is a Python `list` with one item per element (each item is the row’s Python payload for +//! `PyAnyMessage`, or `bytes` for `RawMessage`). The batched `schema` is taken from the first +//! element. Watermark handling and backpressure are unchanged. //! -//! Only `PyAnyMessage` streaming input is supported; `RawMessage` is rejected with -//! `SubmitError::InvalidMessage` when the input is a broker message. -// TODO: Support `RawMessage` streaming input in addition to `PyAnyMessage` (list of bytes in the -// batched `PyAnyMessage` payload). -//! -//! Python objects require the GIL to read. We keep `Py` per element and take the -//! GIL only when materializing the batched message on flush, after [`Message::into_payload`]. +//! The GIL is taken only to build the list on flush (after [`Message::into_payload`] in submit). use crate::messages::{into_pyany, PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; use crate::routes::{Route, RoutedValue}; use crate::time_helpers::current_epoch; use crate::utils::traced_with_gil; use pyo3::prelude::*; -use pyo3::types::PyList; +use pyo3::types::{PyBytes, PyList}; use sentry_arroyo::processing::strategies::{ merge_commit_request, CommitRequest, MessageRejected, ProcessingStrategy, StrategyError, SubmitError, }; -use sentry_arroyo::types::{InnerMessage, Message, Partition}; +use sentry_arroyo::types::{Message, Partition}; use sentry_arroyo::utils::timing::Deadline; use std::collections::{BTreeMap, VecDeque}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -fn invalid_message_submit_error(message: &Message) -> SubmitError { - match &message.inner_message { - InnerMessage::BrokerMessage(broker_message) => { - SubmitError::InvalidMessage(broker_message.into()) - } - InnerMessage::AnyMessage(..) => { - panic!("BatchStep: invalid message on AnyMessage; Arroyo cannot surface InvalidMessage") +fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { + match first { + PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().schema.clone(), + PyStreamingMessage::RawMessage { content } => content.bind(py).borrow().schema.clone(), + } +} + +/// Returns the Python object to be added to the produced batch. This is +/// the Python object that contains the payload of the message. +fn list_item_for_streaming_message( + py: Python<'_>, + pysm: &PyStreamingMessage, +) -> PyResult> { + match pysm { + PyStreamingMessage::PyAnyMessage { content } => { + Ok(content.bind(py).borrow().payload.clone_ref(py)) + } + PyStreamingMessage::RawMessage { content } => { + // RawMessages payload is turned into a Python bytes object as + // we do not have a native Rust batch message. + let p = &content.bind(py).borrow().payload; + Ok(PyBytes::new(py, p).into_any().unbind()) } } } -/// Count- and/or time-based window of `PyAnyMessage` elements for one route. Start with -/// [`Batch::from_initial`]; add more with [`Batch::append`]. +/// Count- and/or time-based window of streaming elements for one route. On flush, output is +/// always a batched `PyAnyMessage` with a list payload. pub(crate) struct Batch { route: Route, max_batch_size: Option, /// Set when the window is time-bounded; elapsed means flush by time. batch_deadline: Option, - elements: Vec>, + elements: Vec, batch_offsets: BTreeMap, } impl Batch { - /// First element in a window. `committable` and `content` are taken from the same - /// [`RoutedValue`] (see [`BatchStep::submit`]); `Batch` only ever holds `PyAnyMessage` elements. + /// First element in a window. `committable` and `first` are from the same [`RoutedValue`] + /// (see [`BatchStep::submit`]). Later elements may use either `PyAnyMessage` or `RawMessage`. pub fn from_initial( route: Route, max_batch_size: Option, @@ -55,7 +67,7 @@ impl Batch { // Keeps track of the highest offset for each partition. This represent the committable // we will return when the batch is flushed. committable: BTreeMap, - first_element: Py, + first: PyStreamingMessage, ) -> Self { let mut batch_offsets: BTreeMap = BTreeMap::new(); for (p, o) in committable { @@ -69,19 +81,19 @@ impl Batch { route, max_batch_size, batch_deadline, - elements: vec![first_element], + elements: vec![first], batch_offsets, } } - pub fn append(&mut self, committable: BTreeMap, content: Py) { + pub fn append(&mut self, committable: BTreeMap, pysm: PyStreamingMessage) { for (p, o) in committable { self.batch_offsets .entry(p) .and_modify(|e| *e = (*e).max(o)) .or_insert(o); } - self.elements.push(content); + self.elements.push(pysm); } pub fn is_empty(&self) -> bool { @@ -122,12 +134,13 @@ impl Batch { .unwrap_or(0.0); let content = traced_with_gil!(|py| -> PyResult> { - let first_schema = self.elements[0].bind(py).borrow().schema.clone(); - let py_items: Vec> = self + let first_schema = first_element_schema(py, &self.elements[0]); + let py_items: Result>, _> = self .elements .iter() - .map(|pm| pm.bind(py).borrow().payload.clone_ref(py)) + .map(|el| list_item_for_streaming_message(py, el)) .collect(); + let py_items = py_items.map_err(|e: PyErr| e)?; let list = PyList::new(py, &py_items)?.unbind(); let inner = PyAnyMessage { payload: list.into_any(), @@ -154,7 +167,7 @@ pub struct BatchStep { route: Route, max_batch_size: Option, max_batch_time: Option, - /// `None` until the first `PyAnyMessage` in a window; then the open batch. + /// `None` until the first streaming message in a window. batch: Option, /// Watermarks received while the current batch window is open; on successful batch send they /// are appended to [`Self::outbound`]. @@ -319,13 +332,6 @@ impl ProcessingStrategy for BatchStep { return Err(SubmitError::MessageRejected(MessageRejected { message })); } - // TODO: Support RawMessage as well. - if let RoutedValuePayload::PyStreamingMessage(PyStreamingMessage::RawMessage { .. }) = - &message.payload().payload - { - return Err(invalid_message_submit_error(&message)); - } - let committable: BTreeMap = message.committable().collect(); let rv = message.into_payload(); match rv.payload { @@ -349,23 +355,20 @@ impl ProcessingStrategy for BatchStep { Ok(()) } RoutedValuePayload::PyStreamingMessage(pysm) => { - let PyStreamingMessage::PyAnyMessage { content } = pysm else { - unreachable!("BatchStep: RawMessage should have been rejected above"); - }; if self.batch.is_none() { self.batch = Some(Batch::from_initial( self.route.clone(), self.max_batch_size, self.max_batch_time, committable, - content, + pysm, )); - return Ok(()); + } else { + self.batch + .as_mut() + .expect("open batch") + .append(committable, pysm); } - self.batch - .as_mut() - .expect("open batch") - .append(committable, content); Ok(()) } } @@ -405,13 +408,13 @@ mod tests { //! [`Batch`] in isolation: elements, committable, `should_flush`, list build (GIL). use crate::batch_step::Batch; - use crate::messages::{PyAnyMessage, PyStreamingMessage, RoutedValuePayload}; + use crate::messages::{PyStreamingMessage, RoutedValuePayload}; use crate::routes::{Route, RoutedValue}; - use crate::testutils::build_routed_value; + use crate::testutils::{build_raw_routed_value, build_routed_value}; use crate::utils::traced_with_gil; use chrono::Utc; use pyo3::prelude::*; - use pyo3::types::{PyAnyMethods, PyList}; + use pyo3::types::{PyAnyMethods, PyBytes, PyList}; use pyo3::IntoPyObject; use sentry_arroyo::types::{Message, Partition, Topic}; use std::collections::BTreeMap; @@ -420,19 +423,16 @@ mod tests { Route::new("s".into(), vec!["w".into()]) } - /// Test helper: same decomposition as [`BatchStep::submit`] for `PyAnyMessage` inputs. - fn committable_and_pyany( + /// Same decomposition as [`BatchStep::submit`]: committable map + owned streaming row. + fn committable_and_streaming( message: Message, - ) -> (BTreeMap, Py) { + ) -> (BTreeMap, PyStreamingMessage) { let c = message.committable().collect(); let rv = message.into_payload(); - let RoutedValuePayload::PyStreamingMessage(PyStreamingMessage::PyAnyMessage { - content, - }) = rv.payload - else { - panic!("test expects PyAnyMessage"); + let RoutedValuePayload::PyStreamingMessage(s) = rv.payload else { + panic!("test expects PyStreamingMessage"); }; - (c, content) + (c, s) } #[test] @@ -450,9 +450,9 @@ mod tests { build_routed_value(py, p2, "s", vec!["w".into()]), BTreeMap::from([(part, 2u64)]), ); - let (c1, el1) = committable_and_pyany(m1); + let (c1, el1) = committable_and_streaming(m1); let mut b = Batch::from_initial(r.clone(), Some(2), None, c1, el1); - let (c2, el2) = committable_and_pyany(m2); + let (c2, el2) = committable_and_streaming(m2); b.append(c2, el2); let msg = b.flush().expect("build"); assert!( @@ -473,6 +473,52 @@ mod tests { }); } + #[test] + fn flush_makes_list_of_bytes() { + traced_with_gil!(|py| { + let r = route(); + let part = Partition::new(Topic::new("t"), 0); + let m1 = Message::new_any_message( + build_raw_routed_value(py, vec![1, 2], "s", vec!["w".into()]), + BTreeMap::from([(part, 1u64)]), + ); + let m2 = Message::new_any_message( + build_raw_routed_value(py, vec![3], "s", vec!["w".into()]), + BTreeMap::from([(part, 2u64)]), + ); + let (c1, el1) = committable_and_streaming(m1); + let mut b = Batch::from_initial(r, Some(2), None, c1, el1); + let (c2, el2) = committable_and_streaming(m2); + b.append(c2, el2); + let msg = b.flush().expect("build"); + let RoutedValuePayload::PyStreamingMessage(pysm) = &msg.payload().payload else { + panic!("expected PyStreamingMessage"); + }; + let PyStreamingMessage::PyAnyMessage { content } = pysm else { + panic!("batched output is always PyAnyMessage with list"); + }; + let pl = content.bind(py).getattr("payload").unwrap(); + let list = pl.cast::().unwrap(); + assert_eq!(list.len(), 2); + let b0: Vec = list + .get_item(0) + .unwrap() + .cast::() + .unwrap() + .as_bytes() + .to_vec(); + let b1: Vec = list + .get_item(1) + .unwrap() + .cast::() + .unwrap() + .as_bytes() + .to_vec(); + assert_eq!(b0, vec![1, 2]); + assert_eq!(b1, vec![3]); + }); + } + #[test] fn append_merges_max_per_partition() { traced_with_gil!(|py| { @@ -490,9 +536,9 @@ mod tests { 9, Utc::now(), ); - let (c1, el1) = committable_and_pyany(m1); + let (c1, el1) = committable_and_streaming(m1); let mut b = Batch::from_initial(r, None, None, c1, el1); - let (c2, el2) = committable_and_pyany(m2); + let (c2, el2) = committable_and_streaming(m2); b.append(c2, el2); let snap = b.current_offsets_snapshot(); assert_eq!(snap.get(&part).copied(), Some(10u64)); @@ -514,10 +560,10 @@ mod tests { build_routed_value(py, p2, "s", vec!["w".into()]), BTreeMap::from([(part, 1u64)]), ); - let (c1, el1) = committable_and_pyany(m1); + let (c1, el1) = committable_and_streaming(m1); let mut b = Batch::from_initial(r, Some(2), None, c1, el1); assert!(!b.should_flush(), "one element, limit 2"); - let (c2, el2) = committable_and_pyany(m2); + let (c2, el2) = committable_and_streaming(m2); b.append(c2, el2); assert!(b.should_flush(), "two elements, limit 2"); }); @@ -525,7 +571,7 @@ mod tests { } mod step { - //! [`BatchStep`] as [`ProcessingStrategy`]: routing, raw rejection, backpressure, watermarks. + //! [`BatchStep`] as [`ProcessingStrategy`]: routing, mixed streaming rows, backpressure, watermarks. use super::super::{BatchStep, Message}; use crate::fake_strategy::FakeStrategy; @@ -533,9 +579,10 @@ mod tests { use crate::utils::traced_with_gil; use chrono::Utc; use pyo3::prelude::*; + use pyo3::types::{PyBytes, PyList}; use pyo3::IntoPyObject; use sentry_arroyo::processing::strategies::{ - InvalidMessage, MessageRejected, ProcessingStrategy, SubmitError, + MessageRejected, ProcessingStrategy, SubmitError, }; use sentry_arroyo::types::{Partition, Topic}; use std::collections::BTreeMap; @@ -601,7 +648,7 @@ mod tests { } #[test] - fn submit_rejects_raw_message_after_pyany() { + fn submit_accepts_raw_broker_message_after_pyany() { let route = Route::new("s".into(), vec!["w".into()]); let (mut step, _out, _wms) = batch_step_with_fake(route, Some(10), None); let part = Partition::new(Topic::new("topic"), 0); @@ -620,35 +667,42 @@ mod tests { Utc::now(), ); step.submit(py_m).unwrap(); - let err = step.submit(raw_m); - let SubmitError::InvalidMessage(InvalidMessage { offset, .. }) = - err.expect_err("raw") - else { - panic!("expected InvalidMessage"); - }; - assert_eq!(offset, 2); + step.submit(raw_m) + .expect("mixed PyAny and Raw in one batch"); }); } #[test] - fn submit_rejects_leading_raw_message() { + fn submit_mixed_streaming_raw_after_streaming_any_flushes_mixed_list() { let route = Route::new("s".into(), vec!["w".into()]); - let (mut step, _out, _wms) = batch_step_with_fake(route, Some(10), None); + let (mut step, out, _wms) = batch_step_with_fake(route, Some(2), None); let part = Partition::new(Topic::new("topic"), 0); traced_with_gil!(|py| { - let raw_m = Message::new_broker_message( - build_raw_routed_value(py, vec![9], "s", vec!["w".into()]), - part, - 0, - Utc::now(), + let p0 = 0i32.into_pyobject(py).unwrap().into_any().unbind(); + let any_m = Message::new_any_message( + build_routed_value(py, p0, "s", vec!["w".into()]), + BTreeMap::from([(part, 1u64)]), ); - let err = step.submit(raw_m); - let SubmitError::InvalidMessage(InvalidMessage { offset, .. }) = - err.expect_err("raw first") - else { - panic!("expected InvalidMessage"); - }; - assert_eq!(offset, 0); + let raw_m = Message::new_any_message( + build_raw_routed_value(py, vec![1, 2, 3], "s", vec!["w".into()]), + BTreeMap::from([(part, 2u64)]), + ); + step.submit(any_m).unwrap(); + step.submit(raw_m).expect("any then raw in same window"); + step.poll().expect("emit batch of 2"); + let b = out.lock().unwrap(); + assert_eq!(b.len(), 1, "one batched payload"); + let list = b[0].bind(py).cast::().expect("list payload"); + assert_eq!(list.len(), 2); + assert_eq!(list.get_item(0).unwrap().extract::().unwrap(), 0); + let b1: Vec = list + .get_item(1) + .unwrap() + .cast::() + .unwrap() + .as_bytes() + .to_vec(); + assert_eq!(b1, vec![1, 2, 3]); }); } diff --git a/sentry_streams/src/operators.rs b/sentry_streams/src/operators.rs index 8bf5188c..78113d99 100644 --- a/sentry_streams/src/operators.rs +++ b/sentry_streams/src/operators.rs @@ -92,8 +92,8 @@ pub enum RuntimeOperator { routing_function: Py, downstream_routes: Py, }, - /// Batches `PyAnyMessage` inputs on the route (per-row `PyStreamingMessage::PyAnyMessage`); - /// `RawMessage` is rejected as invalid. Emits one `PyAnyMessage` with a list payload, then + /// Batches streaming rows (`PyAnyMessage` and/or `RawMessage` in the same window). Emits one + /// `PyAnyMessage` with a `list` payload (each item is the row’s Python value or `bytes`), then /// buffered and synthetic watermarks. #[pyo3(name = "Batch")] Batch { diff --git a/sentry_streams/uv.lock b/sentry_streams/uv.lock index 7ad47042..6a6baba9 100644 --- a/sentry_streams/uv.lock +++ b/sentry_streams/uv.lock @@ -893,7 +893,7 @@ wheels = [ [[package]] name = "sentry-streams" -version = "0.0.46" +version = "0.0.48" source = { editable = "." } dependencies = [ { name = "click" },