diff --git a/sentry_streams/sentry_streams/pipeline/message.py b/sentry_streams/sentry_streams/pipeline/message.py index 81a15d8a..a884bfa6 100644 --- a/sentry_streams/sentry_streams/pipeline/message.py +++ b/sentry_streams/sentry_streams/pipeline/message.py @@ -11,7 +11,6 @@ Tuple, TypeVar, Union, - cast, ) from sentry_streams.rust_streams import PyAnyMessage, PyWatermark, RawMessage @@ -96,6 +95,11 @@ class PyMessage(Generic[TPayload], Message[TPayload]): By making this type generic we can still get a lot of guarantees by the type checker when wiring python primitives together as it can ensure primitives are compatible with each other. + + The Rust ``PyAnyMessage`` is built on first :meth:`to_inner` and then cached, + so normal use of the Python wrapper avoids pyo3 allocation until a step + needs the inner type. Pickle serializes only the Python fields; the cache is + cleared on unpickle so the inner is recreated when needed. """ def __init__( @@ -105,44 +109,56 @@ def __init__( timestamp: float, schema: Optional[str] = None, ) -> None: - self.inner = PyAnyMessage(payload, headers, timestamp, schema) + self._payload: TPayload = payload + self._headers: Sequence[Tuple[str, bytes]] = headers + self._timestamp = timestamp + self._schema = schema + self._cached_inner: PyAnyMessage | None = None @property def payload(self) -> TPayload: - return cast(TPayload, self.inner.payload) + return self._payload @property def headers(self) -> Sequence[Tuple[str, bytes]]: - return self.inner.headers + return self._headers @property def timestamp(self) -> float: - return self.inner.timestamp + return self._timestamp @property def schema(self) -> str | None: - return self.inner.schema + return self._schema def size(self) -> int | None: - if isinstance(self.inner.payload, (str, bytes)): - return len(self.inner.payload) + if isinstance(self._payload, (str, bytes)): + return len(self._payload) return None def __repr__(self) -> str: - return f"PyMessage({self.inner.__repr__()})" + return ( + f"PyMessage(PyAnyMessage(payload={self._payload!r}, " + f"headers={self._headers!r}, timestamp={self._timestamp}, " + f"schema={self._schema!r}))" + ) def __str__(self) -> str: return repr(self) - def to_inner(self) -> RustMessage: - return self.inner + def to_inner(self) -> PyAnyMessage: + if self._cached_inner is None: + self._cached_inner = PyAnyMessage( + self._payload, self._headers, self._timestamp, self._schema + ) + return self._cached_inner def deepcopy(self) -> PyMessage[TPayload]: return PyMessage( - deepcopy(self.inner.payload), - deepcopy(self.inner.headers), - self.inner.timestamp, - self.inner.schema, + deepcopy(self._payload), + deepcopy(self._headers), + self._timestamp, + self._schema, ) def __getstate__(self) -> Mapping[str, Any]: @@ -154,17 +170,21 @@ def __getstate__(self) -> Mapping[str, Any]: } def __setstate__(self, state: Mapping[str, Any]) -> None: - self.inner = PyAnyMessage( - state["payload"], - state["headers"], - state["timestamp"], - state.get("schema"), - ) + self._payload = state["payload"] + self._headers = state["headers"] + self._timestamp = state["timestamp"] + self._schema = state.get("schema") + self._cached_inner = None class PyRawMessage(Message[bytes]): """ - A wrapper for the Rust RawMessage so `RawMessage` extends `Messasge`. + A wrapper for the Rust RawMessage so ``RawMessage`` extends ``Message``. + + The Rust ``RawMessage`` is built on first :meth:`to_inner` and then cached, + so constructing or copying this wrapper avoids pyo3 work until a delegate + needs the inner type. Pickle serializes only the Python fields; the cache is + cleared on unpickle so the inner is recreated when needed. """ def __init__( @@ -174,42 +194,54 @@ def __init__( timestamp: float, schema: Optional[str] = None, ) -> None: - self.inner = RawMessage(payload, headers, timestamp, schema) + self._payload = payload + self._headers: Sequence[Tuple[str, bytes]] = headers + self._timestamp = timestamp + self._schema = schema + self._cached_inner: RawMessage | None = None @property def payload(self) -> bytes: - return self.inner.payload + return self._payload @property def headers(self) -> Sequence[Tuple[str, bytes]]: - return self.inner.headers + return self._headers @property def timestamp(self) -> float: - return self.inner.timestamp + return self._timestamp @property def schema(self) -> str | None: - return self.inner.schema + return self._schema def size(self) -> int | None: - return len(self.inner.payload) + return len(self._payload) def __repr__(self) -> str: - return f"RawMessage({self.inner.__repr__()})" + return ( + f"RawMessage(RawMessage(payload={self._payload!r}, " + f"headers={self._headers!r}, timestamp={self._timestamp}, " + f"schema={self._schema!r}))" + ) def __str__(self) -> str: return repr(self) - def to_inner(self) -> RustMessage: - return self.inner + def to_inner(self) -> RawMessage: + if self._cached_inner is None: + self._cached_inner = RawMessage( + self._payload, self._headers, self._timestamp, self._schema + ) + return self._cached_inner def deepcopy(self) -> PyRawMessage: return PyRawMessage( - deepcopy(self.inner.payload), - deepcopy(self.inner.headers), - self.inner.timestamp, - self.inner.schema, + deepcopy(self._payload), + deepcopy(self._headers), + self._timestamp, + self._schema, ) def __getstate__(self) -> Mapping[str, Any]: @@ -221,12 +253,11 @@ def __getstate__(self) -> Mapping[str, Any]: } def __setstate__(self, state: Mapping[str, Any]) -> None: - self.inner = RawMessage( - state["payload"], - state["headers"], - state["timestamp"], - state.get("schema"), - ) + self._payload = state["payload"] + self._headers = state["headers"] + self._timestamp = state["timestamp"] + self._schema = state.get("schema") + self._cached_inner = None def rust_msg_equals(msg: RustMessage, other: RustMessage) -> bool: diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 86ec8799..103e4560 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -207,6 +207,7 @@ impl PyAnyMessage { ) -> PyResult { Ok(Self { payload, + // Optimization to avoid running the python code if there are no headers. headers: headers_to_vec(py, headers)?, timestamp, schema, @@ -272,6 +273,7 @@ impl RawMessage { ) -> PyResult { Ok(Self { payload: payload.as_bytes(py).to_vec(), + // Optimization to avoid running the python code if there are no headers. headers: headers_to_vec(py, headers)?, timestamp, schema, diff --git a/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py b/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py index ad3424c4..919e42f0 100644 --- a/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py +++ b/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py @@ -108,7 +108,7 @@ def test_integration() -> None: expected = PyMessage( "transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s" - ).inner + ).to_inner() assert ret_msg.payload == expected.payload assert ret_msg.headers == expected.headers diff --git a/sentry_streams/tests/adapters/arroyo/test_steps_chain.py b/sentry_streams/tests/adapters/arroyo/test_steps_chain.py index 59114bd1..5c145523 100644 --- a/sentry_streams/tests/adapters/arroyo/test_steps_chain.py +++ b/sentry_streams/tests/adapters/arroyo/test_steps_chain.py @@ -158,7 +158,7 @@ def transformer(msg: ArroyoMessage[Message[Any]]) -> Message[Any]: expected = PyMessage( "foo_t1_t2", headers=[("h", "v".encode())], timestamp=123, schema="s" - ).inner + ).to_inner() ret_msg = cast(PyAnyMessage, ret_msg) assert ret_msg.payload == expected.payload diff --git a/sentry_streams/tests/pipeline/test_message.py b/sentry_streams/tests/pipeline/test_message.py index bb90ac47..528d39f8 100644 --- a/sentry_streams/tests/pipeline/test_message.py +++ b/sentry_streams/tests/pipeline/test_message.py @@ -44,7 +44,10 @@ def test_message_access( assert str(message) == repr(message) - assert isinstance(message.to_inner(), expected_rust_type) + inner1 = message.to_inner() + inner2 = message.to_inner() + assert isinstance(inner1, expected_rust_type) + assert inner1 is inner2 copy = message.deepcopy() assert id(copy) != id(message)