Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 72 additions & 41 deletions sentry_streams/sentry_streams/pipeline/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
Tuple,
TypeVar,
Union,
cast,
)

from sentry_streams.rust_streams import PyAnyMessage, PyWatermark, RawMessage
Expand Down Expand Up @@ -96,6 +95,11 @@ class PyMessage(Generic[TPayload], Message[TPayload]):
By making this type generic we can still get a lot of guarantees
by the type checker when wiring python primitives together as
it can ensure primitives are compatible with each other.

The Rust ``PyAnyMessage`` is built on first :meth:`to_inner` and then cached,
so normal use of the Python wrapper avoids pyo3 allocation until a step
needs the inner type. Pickle serializes only the Python fields; the cache is
cleared on unpickle so the inner is recreated when needed.
"""

def __init__(
Expand All @@ -105,44 +109,56 @@ def __init__(
timestamp: float,
schema: Optional[str] = None,
) -> None:
self.inner = PyAnyMessage(payload, headers, timestamp, schema)
self._payload: TPayload = payload
self._headers: Sequence[Tuple[str, bytes]] = headers
self._timestamp = timestamp
self._schema = schema
self._cached_inner: PyAnyMessage | None = None

@property
def payload(self) -> TPayload:
return cast(TPayload, self.inner.payload)
return self._payload

@property
def headers(self) -> Sequence[Tuple[str, bytes]]:
return self.inner.headers
return self._headers

@property
def timestamp(self) -> float:
return self.inner.timestamp
return self._timestamp

@property
def schema(self) -> str | None:
return self.inner.schema
return self._schema

def size(self) -> int | None:
if isinstance(self.inner.payload, (str, bytes)):
return len(self.inner.payload)
if isinstance(self._payload, (str, bytes)):
return len(self._payload)
return None

def __repr__(self) -> str:
return f"PyMessage({self.inner.__repr__()})"
return (
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason not to call to_inner and then repr that inner object directly?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't that trigger the expensive allocation?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the reason. Instantiating the rust object causes memory copy, that is what caused the issue.

f"PyMessage(PyAnyMessage(payload={self._payload!r}, "
f"headers={self._headers!r}, timestamp={self._timestamp}, "
f"schema={self._schema!r}))"
)

def __str__(self) -> str:
return repr(self)

def to_inner(self) -> RustMessage:
return self.inner
def to_inner(self) -> PyAnyMessage:
if self._cached_inner is None:
self._cached_inner = PyAnyMessage(
self._payload, self._headers, self._timestamp, self._schema
)
return self._cached_inner

def deepcopy(self) -> PyMessage[TPayload]:
return PyMessage(
deepcopy(self.inner.payload),
deepcopy(self.inner.headers),
self.inner.timestamp,
self.inner.schema,
deepcopy(self._payload),
deepcopy(self._headers),
self._timestamp,
self._schema,
)

def __getstate__(self) -> Mapping[str, Any]:
Expand All @@ -154,17 +170,21 @@ def __getstate__(self) -> Mapping[str, Any]:
}

def __setstate__(self, state: Mapping[str, Any]) -> None:
self.inner = PyAnyMessage(
state["payload"],
state["headers"],
state["timestamp"],
state.get("schema"),
)
self._payload = state["payload"]
self._headers = state["headers"]
self._timestamp = state["timestamp"]
self._schema = state.get("schema")
self._cached_inner = None


class PyRawMessage(Message[bytes]):
"""
A wrapper for the Rust RawMessage so `RawMessage` extends `Messasge`.
A wrapper for the Rust RawMessage so ``RawMessage`` extends ``Message``.

The Rust ``RawMessage`` is built on first :meth:`to_inner` and then cached,
so constructing or copying this wrapper avoids pyo3 work until a delegate
needs the inner type. Pickle serializes only the Python fields; the cache is
cleared on unpickle so the inner is recreated when needed.
"""

def __init__(
Expand All @@ -174,42 +194,54 @@ def __init__(
timestamp: float,
schema: Optional[str] = None,
) -> None:
self.inner = RawMessage(payload, headers, timestamp, schema)
self._payload = payload
self._headers: Sequence[Tuple[str, bytes]] = headers
self._timestamp = timestamp
self._schema = schema
self._cached_inner: RawMessage | None = None

@property
def payload(self) -> bytes:
return self.inner.payload
return self._payload

@property
def headers(self) -> Sequence[Tuple[str, bytes]]:
return self.inner.headers
return self._headers

@property
def timestamp(self) -> float:
return self.inner.timestamp
return self._timestamp

@property
def schema(self) -> str | None:
return self.inner.schema
return self._schema

def size(self) -> int | None:
return len(self.inner.payload)
return len(self._payload)

def __repr__(self) -> str:
return f"RawMessage({self.inner.__repr__()})"
return (
f"RawMessage(RawMessage(payload={self._payload!r}, "
f"headers={self._headers!r}, timestamp={self._timestamp}, "
f"schema={self._schema!r}))"
)

def __str__(self) -> str:
return repr(self)

def to_inner(self) -> RustMessage:
return self.inner
def to_inner(self) -> RawMessage:
if self._cached_inner is None:
self._cached_inner = RawMessage(
self._payload, self._headers, self._timestamp, self._schema
)
return self._cached_inner

def deepcopy(self) -> PyRawMessage:
return PyRawMessage(
deepcopy(self.inner.payload),
deepcopy(self.inner.headers),
self.inner.timestamp,
self.inner.schema,
deepcopy(self._payload),
deepcopy(self._headers),
self._timestamp,
self._schema,
)

def __getstate__(self) -> Mapping[str, Any]:
Expand All @@ -221,12 +253,11 @@ def __getstate__(self) -> Mapping[str, Any]:
}

def __setstate__(self, state: Mapping[str, Any]) -> None:
self.inner = RawMessage(
state["payload"],
state["headers"],
state["timestamp"],
state.get("schema"),
)
self._payload = state["payload"]
self._headers = state["headers"]
self._timestamp = state["timestamp"]
self._schema = state.get("schema")
self._cached_inner = None


def rust_msg_equals(msg: RustMessage, other: RustMessage) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions sentry_streams/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl PyAnyMessage {
) -> PyResult<Self> {
Ok(Self {
payload,
// Optimization to avoid running the python code if there are no headers.
headers: headers_to_vec(py, headers)?,
timestamp,
schema,
Expand Down Expand Up @@ -272,6 +273,7 @@ impl RawMessage {
) -> PyResult<Self> {
Ok(Self {
payload: payload.as_bytes(py).to_vec(),
// Optimization to avoid running the python code if there are no headers.
headers: headers_to_vec(py, headers)?,
timestamp,
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def test_integration() -> None:

expected = PyMessage(
"transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s"
).inner
).to_inner()

assert ret_msg.payload == expected.payload
assert ret_msg.headers == expected.headers
Expand Down
2 changes: 1 addition & 1 deletion sentry_streams/tests/adapters/arroyo/test_steps_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def transformer(msg: ArroyoMessage[Message[Any]]) -> Message[Any]:

expected = PyMessage(
"foo_t1_t2", headers=[("h", "v".encode())], timestamp=123, schema="s"
).inner
).to_inner()

ret_msg = cast(PyAnyMessage, ret_msg)
assert ret_msg.payload == expected.payload
Expand Down
5 changes: 4 additions & 1 deletion sentry_streams/tests/pipeline/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ def test_message_access(

assert str(message) == repr(message)

assert isinstance(message.to_inner(), expected_rust_type)
inner1 = message.to_inner()
inner2 = message.to_inner()
assert isinstance(inner1, expected_rust_type)
assert inner1 is inner2

copy = message.deepcopy()
assert id(copy) != id(message)
Expand Down
Loading