From c2b478fddf9e55970367dbf490151c68cc17910d Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 22 Apr 2026 13:57:26 -0700 Subject: [PATCH 1/2] Optimize message --- .../sentry_streams/pipeline/message.py | 113 +++++++++++------- .../sentry_streams/rust_streams.pyi | 4 +- sentry_streams/src/messages.rs | 31 +++-- .../arroyo/test_multi_process_delegate.py | 2 +- .../tests/adapters/arroyo/test_steps_chain.py | 2 +- sentry_streams/tests/pipeline/test_message.py | 5 +- 6 files changed, 102 insertions(+), 55 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/message.py b/sentry_streams/sentry_streams/pipeline/message.py index 81a15d8a..a884bfa6 100644 --- a/sentry_streams/sentry_streams/pipeline/message.py +++ b/sentry_streams/sentry_streams/pipeline/message.py @@ -11,7 +11,6 @@ Tuple, TypeVar, Union, - cast, ) from sentry_streams.rust_streams import PyAnyMessage, PyWatermark, RawMessage @@ -96,6 +95,11 @@ class PyMessage(Generic[TPayload], Message[TPayload]): By making this type generic we can still get a lot of guarantees by the type checker when wiring python primitives together as it can ensure primitives are compatible with each other. + + The Rust ``PyAnyMessage`` is built on first :meth:`to_inner` and then cached, + so normal use of the Python wrapper avoids pyo3 allocation until a step + needs the inner type. Pickle serializes only the Python fields; the cache is + cleared on unpickle so the inner is recreated when needed. """ def __init__( @@ -105,44 +109,56 @@ def __init__( timestamp: float, schema: Optional[str] = None, ) -> None: - self.inner = PyAnyMessage(payload, headers, timestamp, schema) + self._payload: TPayload = payload + self._headers: Sequence[Tuple[str, bytes]] = headers + self._timestamp = timestamp + self._schema = schema + self._cached_inner: PyAnyMessage | None = None @property def payload(self) -> TPayload: - return cast(TPayload, self.inner.payload) + return self._payload @property def headers(self) -> Sequence[Tuple[str, bytes]]: - return self.inner.headers + return self._headers @property def timestamp(self) -> float: - return self.inner.timestamp + return self._timestamp @property def schema(self) -> str | None: - return self.inner.schema + return self._schema def size(self) -> int | None: - if isinstance(self.inner.payload, (str, bytes)): - return len(self.inner.payload) + if isinstance(self._payload, (str, bytes)): + return len(self._payload) return None def __repr__(self) -> str: - return f"PyMessage({self.inner.__repr__()})" + return ( + f"PyMessage(PyAnyMessage(payload={self._payload!r}, " + f"headers={self._headers!r}, timestamp={self._timestamp}, " + f"schema={self._schema!r}))" + ) def __str__(self) -> str: return repr(self) - def to_inner(self) -> RustMessage: - return self.inner + def to_inner(self) -> PyAnyMessage: + if self._cached_inner is None: + self._cached_inner = PyAnyMessage( + self._payload, self._headers, self._timestamp, self._schema + ) + return self._cached_inner def deepcopy(self) -> PyMessage[TPayload]: return PyMessage( - deepcopy(self.inner.payload), - deepcopy(self.inner.headers), - self.inner.timestamp, - self.inner.schema, + deepcopy(self._payload), + deepcopy(self._headers), + self._timestamp, + self._schema, ) def __getstate__(self) -> Mapping[str, Any]: @@ -154,17 +170,21 @@ def __getstate__(self) -> Mapping[str, Any]: } def __setstate__(self, state: Mapping[str, Any]) -> None: - self.inner = PyAnyMessage( - state["payload"], - state["headers"], - state["timestamp"], - state.get("schema"), - ) + self._payload = state["payload"] + self._headers = state["headers"] + self._timestamp = state["timestamp"] + self._schema = state.get("schema") + self._cached_inner = None class PyRawMessage(Message[bytes]): """ - A wrapper for the Rust RawMessage so `RawMessage` extends `Messasge`. + A wrapper for the Rust RawMessage so ``RawMessage`` extends ``Message``. + + The Rust ``RawMessage`` is built on first :meth:`to_inner` and then cached, + so constructing or copying this wrapper avoids pyo3 work until a delegate + needs the inner type. Pickle serializes only the Python fields; the cache is + cleared on unpickle so the inner is recreated when needed. """ def __init__( @@ -174,42 +194,54 @@ def __init__( timestamp: float, schema: Optional[str] = None, ) -> None: - self.inner = RawMessage(payload, headers, timestamp, schema) + self._payload = payload + self._headers: Sequence[Tuple[str, bytes]] = headers + self._timestamp = timestamp + self._schema = schema + self._cached_inner: RawMessage | None = None @property def payload(self) -> bytes: - return self.inner.payload + return self._payload @property def headers(self) -> Sequence[Tuple[str, bytes]]: - return self.inner.headers + return self._headers @property def timestamp(self) -> float: - return self.inner.timestamp + return self._timestamp @property def schema(self) -> str | None: - return self.inner.schema + return self._schema def size(self) -> int | None: - return len(self.inner.payload) + return len(self._payload) def __repr__(self) -> str: - return f"RawMessage({self.inner.__repr__()})" + return ( + f"RawMessage(RawMessage(payload={self._payload!r}, " + f"headers={self._headers!r}, timestamp={self._timestamp}, " + f"schema={self._schema!r}))" + ) def __str__(self) -> str: return repr(self) - def to_inner(self) -> RustMessage: - return self.inner + def to_inner(self) -> RawMessage: + if self._cached_inner is None: + self._cached_inner = RawMessage( + self._payload, self._headers, self._timestamp, self._schema + ) + return self._cached_inner def deepcopy(self) -> PyRawMessage: return PyRawMessage( - deepcopy(self.inner.payload), - deepcopy(self.inner.headers), - self.inner.timestamp, - self.inner.schema, + deepcopy(self._payload), + deepcopy(self._headers), + self._timestamp, + self._schema, ) def __getstate__(self) -> Mapping[str, Any]: @@ -221,12 +253,11 @@ def __getstate__(self) -> Mapping[str, Any]: } def __setstate__(self, state: Mapping[str, Any]) -> None: - self.inner = RawMessage( - state["payload"], - state["headers"], - state["timestamp"], - state.get("schema"), - ) + self._payload = state["payload"] + self._headers = state["headers"] + self._timestamp = state["timestamp"] + self._schema = state.get("schema") + self._cached_inner = None def rust_msg_equals(msg: RustMessage, other: RustMessage) -> bool: diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index ab619604..c5b340be 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -132,7 +132,7 @@ class PyAnyMessage: def __init__( self, payload: Any, - headers: Sequence[Tuple[str, bytes]], + headers: Sequence[Tuple[str, bytes]] | None, timestamp: float, schema: str | None, ) -> None: ... @@ -149,7 +149,7 @@ class RawMessage: def __init__( self, payload: bytes, - headers: Sequence[Tuple[str, bytes]], + headers: Sequence[Tuple[str, bytes]] | None, timestamp: float, schema: str | None, ) -> None: ... diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 86ec8799..9c6c6bb2 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -200,14 +200,18 @@ impl PyAnyMessage { #[new] pub fn new( payload: Py, - headers: Py, + headers: Option>, timestamp: f64, schema: Option, py: Python<'_>, ) -> PyResult { Ok(Self { payload, - headers: headers_to_vec(py, headers)?, + // Optimization to avoid running the python code if there are no headers. + headers: match headers { + Some(h) => headers_to_vec(py, h)?, + None => Vec::new(), + }, timestamp, schema, }) @@ -265,14 +269,18 @@ impl RawMessage { #[new] pub fn new( payload: Py, - headers: Py, + headers: Option>, timestamp: f64, schema: Option, py: Python, ) -> PyResult { Ok(Self { payload: payload.as_bytes(py).to_vec(), - headers: headers_to_vec(py, headers)?, + // Optimization to avoid running the python code if there are no headers. + headers: match headers { + Some(h) => headers_to_vec(py, h)?, + None => Vec::new(), + }, timestamp, schema, }) @@ -580,7 +588,7 @@ mod tests { // Create PyAnyMessage let msg = PyAnyMessage::new( payload.clone_ref(py), - py_headers.clone_ref(py), + Some(py_headers.clone_ref(py)), timestamp, schema.clone(), py, @@ -636,7 +644,7 @@ mod tests { // Create RawMessage let msg = RawMessage::new( py_payload.unbind(), - py_headers.clone_ref(py), + Some(py_headers.clone_ref(py)), timestamp, schema.clone(), py, @@ -730,9 +738,14 @@ mod tests { let py_headers = headers_to_sequence(py, &headers).unwrap(); let payload_bytes = vec![100, 101, 102, 103]; let py_payload = PyBytes::new(py, &payload_bytes); - let raw_msg = - RawMessage::new(py_payload.unbind(), py_headers.clone_ref(py), 0., None, py) - .unwrap(); + let raw_msg = RawMessage::new( + py_payload.unbind(), + Some(py_headers.clone_ref(py)), + 0., + None, + py, + ) + .unwrap(); let py_raw_msg = raw_msg.into_pyobject(py).unwrap().unbind(); let msg = PyStreamingMessage::RawMessage { content: py_raw_msg, diff --git a/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py b/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py index ad3424c4..919e42f0 100644 --- a/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py +++ b/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py @@ -108,7 +108,7 @@ def test_integration() -> None: expected = PyMessage( "transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s" - ).inner + ).to_inner() assert ret_msg.payload == expected.payload assert ret_msg.headers == expected.headers diff --git a/sentry_streams/tests/adapters/arroyo/test_steps_chain.py b/sentry_streams/tests/adapters/arroyo/test_steps_chain.py index 59114bd1..5c145523 100644 --- a/sentry_streams/tests/adapters/arroyo/test_steps_chain.py +++ b/sentry_streams/tests/adapters/arroyo/test_steps_chain.py @@ -158,7 +158,7 @@ def transformer(msg: ArroyoMessage[Message[Any]]) -> Message[Any]: expected = PyMessage( "foo_t1_t2", headers=[("h", "v".encode())], timestamp=123, schema="s" - ).inner + ).to_inner() ret_msg = cast(PyAnyMessage, ret_msg) assert ret_msg.payload == expected.payload diff --git a/sentry_streams/tests/pipeline/test_message.py b/sentry_streams/tests/pipeline/test_message.py index bb90ac47..528d39f8 100644 --- a/sentry_streams/tests/pipeline/test_message.py +++ b/sentry_streams/tests/pipeline/test_message.py @@ -44,7 +44,10 @@ def test_message_access( assert str(message) == repr(message) - assert isinstance(message.to_inner(), expected_rust_type) + inner1 = message.to_inner() + inner2 = message.to_inner() + assert isinstance(inner1, expected_rust_type) + assert inner1 is inner2 copy = message.deepcopy() assert id(copy) != id(message) From 54809b86202f628b7bd377d0f5107b44e4765651 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 22 Apr 2026 14:56:33 -0700 Subject: [PATCH 2/2] Revert the change in headers --- .../sentry_streams/rust_streams.pyi | 4 +-- sentry_streams/src/messages.rs | 29 ++++++------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index c5b340be..ab619604 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -132,7 +132,7 @@ class PyAnyMessage: def __init__( self, payload: Any, - headers: Sequence[Tuple[str, bytes]] | None, + headers: Sequence[Tuple[str, bytes]], timestamp: float, schema: str | None, ) -> None: ... @@ -149,7 +149,7 @@ class RawMessage: def __init__( self, payload: bytes, - headers: Sequence[Tuple[str, bytes]] | None, + headers: Sequence[Tuple[str, bytes]], timestamp: float, schema: str | None, ) -> None: ... diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 9c6c6bb2..103e4560 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -200,7 +200,7 @@ impl PyAnyMessage { #[new] pub fn new( payload: Py, - headers: Option>, + headers: Py, timestamp: f64, schema: Option, py: Python<'_>, @@ -208,10 +208,7 @@ impl PyAnyMessage { Ok(Self { payload, // Optimization to avoid running the python code if there are no headers. - headers: match headers { - Some(h) => headers_to_vec(py, h)?, - None => Vec::new(), - }, + headers: headers_to_vec(py, headers)?, timestamp, schema, }) @@ -269,7 +266,7 @@ impl RawMessage { #[new] pub fn new( payload: Py, - headers: Option>, + headers: Py, timestamp: f64, schema: Option, py: Python, @@ -277,10 +274,7 @@ impl RawMessage { Ok(Self { payload: payload.as_bytes(py).to_vec(), // Optimization to avoid running the python code if there are no headers. - headers: match headers { - Some(h) => headers_to_vec(py, h)?, - None => Vec::new(), - }, + headers: headers_to_vec(py, headers)?, timestamp, schema, }) @@ -588,7 +582,7 @@ mod tests { // Create PyAnyMessage let msg = PyAnyMessage::new( payload.clone_ref(py), - Some(py_headers.clone_ref(py)), + py_headers.clone_ref(py), timestamp, schema.clone(), py, @@ -644,7 +638,7 @@ mod tests { // Create RawMessage let msg = RawMessage::new( py_payload.unbind(), - Some(py_headers.clone_ref(py)), + py_headers.clone_ref(py), timestamp, schema.clone(), py, @@ -738,14 +732,9 @@ mod tests { let py_headers = headers_to_sequence(py, &headers).unwrap(); let payload_bytes = vec![100, 101, 102, 103]; let py_payload = PyBytes::new(py, &payload_bytes); - let raw_msg = RawMessage::new( - py_payload.unbind(), - Some(py_headers.clone_ref(py)), - 0., - None, - py, - ) - .unwrap(); + let raw_msg = + RawMessage::new(py_payload.unbind(), py_headers.clone_ref(py), 0., None, py) + .unwrap(); let py_raw_msg = raw_msg.into_pyobject(py).unwrap().unbind(); let msg = PyStreamingMessage::RawMessage { content: py_raw_msg,