From 4f77640ed01be6753f1f376fdf6006a0a86f9230 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 22 Apr 2026 13:57:26 -0700 Subject: [PATCH 1/4] Optimize message --- .../sentry_streams/rust_streams.pyi | 4 ++-- sentry_streams/src/messages.rs | 19 ++++++++++++------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index ab619604..c5b340be 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -132,7 +132,7 @@ class PyAnyMessage: def __init__( self, payload: Any, - headers: Sequence[Tuple[str, bytes]], + headers: Sequence[Tuple[str, bytes]] | None, timestamp: float, schema: str | None, ) -> None: ... @@ -149,7 +149,7 @@ class RawMessage: def __init__( self, payload: bytes, - headers: Sequence[Tuple[str, bytes]], + headers: Sequence[Tuple[str, bytes]] | None, timestamp: float, schema: str | None, ) -> None: ... diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 103e4560..480d6e0c 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -200,7 +200,7 @@ impl PyAnyMessage { #[new] pub fn new( payload: Py, - headers: Py, + headers: Option>, timestamp: f64, schema: Option, py: Python<'_>, @@ -266,7 +266,7 @@ impl RawMessage { #[new] pub fn new( payload: Py, - headers: Py, + headers: Option>, timestamp: f64, schema: Option, py: Python, @@ -582,7 +582,7 @@ mod tests { // Create PyAnyMessage let msg = PyAnyMessage::new( payload.clone_ref(py), - py_headers.clone_ref(py), + Some(py_headers.clone_ref(py)), timestamp, schema.clone(), py, @@ -638,7 +638,7 @@ mod tests { // Create RawMessage let msg = RawMessage::new( py_payload.unbind(), - py_headers.clone_ref(py), + Some(py_headers.clone_ref(py)), timestamp, schema.clone(), py, @@ -732,9 +732,14 @@ mod tests { let py_headers = headers_to_sequence(py, &headers).unwrap(); let payload_bytes = vec![100, 101, 102, 103]; let py_payload = PyBytes::new(py, &payload_bytes); - let raw_msg = - RawMessage::new(py_payload.unbind(), py_headers.clone_ref(py), 0., None, py) - .unwrap(); + let raw_msg = RawMessage::new( + py_payload.unbind(), + Some(py_headers.clone_ref(py)), + 0., + None, + py, + ) + .unwrap(); let py_raw_msg = raw_msg.into_pyobject(py).unwrap().unbind(); let msg = PyStreamingMessage::RawMessage { content: py_raw_msg, From dec471ae053b5134297489a35b67f87ec3013d73 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 22 Apr 2026 14:56:33 -0700 Subject: [PATCH 2/4] Revert the change in headers --- .../sentry_streams/rust_streams.pyi | 4 ++-- sentry_streams/src/messages.rs | 19 +++++++------------ 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi index c5b340be..ab619604 100644 --- a/sentry_streams/sentry_streams/rust_streams.pyi +++ b/sentry_streams/sentry_streams/rust_streams.pyi @@ -132,7 +132,7 @@ class PyAnyMessage: def __init__( self, payload: Any, - headers: Sequence[Tuple[str, bytes]] | None, + headers: Sequence[Tuple[str, bytes]], timestamp: float, schema: str | None, ) -> None: ... @@ -149,7 +149,7 @@ class RawMessage: def __init__( self, payload: bytes, - headers: Sequence[Tuple[str, bytes]] | None, + headers: Sequence[Tuple[str, bytes]], timestamp: float, schema: str | None, ) -> None: ... diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 480d6e0c..103e4560 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -200,7 +200,7 @@ impl PyAnyMessage { #[new] pub fn new( payload: Py, - headers: Option>, + headers: Py, timestamp: f64, schema: Option, py: Python<'_>, @@ -266,7 +266,7 @@ impl RawMessage { #[new] pub fn new( payload: Py, - headers: Option>, + headers: Py, timestamp: f64, schema: Option, py: Python, @@ -582,7 +582,7 @@ mod tests { // Create PyAnyMessage let msg = PyAnyMessage::new( payload.clone_ref(py), - Some(py_headers.clone_ref(py)), + py_headers.clone_ref(py), timestamp, schema.clone(), py, @@ -638,7 +638,7 @@ mod tests { // Create RawMessage let msg = RawMessage::new( py_payload.unbind(), - Some(py_headers.clone_ref(py)), + py_headers.clone_ref(py), timestamp, schema.clone(), py, @@ -732,14 +732,9 @@ mod tests { let py_headers = headers_to_sequence(py, &headers).unwrap(); let payload_bytes = vec![100, 101, 102, 103]; let py_payload = PyBytes::new(py, &payload_bytes); - let raw_msg = RawMessage::new( - py_payload.unbind(), - Some(py_headers.clone_ref(py)), - 0., - None, - py, - ) - .unwrap(); + let raw_msg = + RawMessage::new(py_payload.unbind(), py_headers.clone_ref(py), 0., None, py) + .unwrap(); let py_raw_msg = raw_msg.into_pyobject(py).unwrap().unbind(); let msg = PyStreamingMessage::RawMessage { content: py_raw_msg, From 9e174dc98e7474563752c1375cdf55640fa5476a Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 22 Apr 2026 14:19:09 -0700 Subject: [PATCH 3/4] Disable headers --- .../adapters/arroyo/multi_process_delegate.py | 6 ++---- .../adapters/arroyo/reduce_delegate.py | 6 ++---- .../sentry_streams/adapters/arroyo/steps_chain.py | 4 ++-- sentry_streams/sentry_streams/pipeline/message.py | 12 ++++-------- 4 files changed, 10 insertions(+), 18 deletions(-) diff --git a/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py b/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py index 24af795a..fe1f8159 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py @@ -75,11 +75,9 @@ def rust_to_arroyo_msg( for partition, offset in committable.items() } if isinstance(message, PyAnyMessage): - to_send: Message[Any] = PyMessage( - message.payload, message.headers, message.timestamp, message.schema - ) + to_send: Message[Any] = PyMessage(message.payload, [], message.timestamp, message.schema) elif isinstance(message, RawMessage): - to_send = PyRawMessage(message.payload, message.headers, message.timestamp, message.schema) + to_send = PyRawMessage(message.payload, [], message.timestamp, message.schema) msg = ArroyoMessage( Value( diff --git a/sentry_streams/sentry_streams/adapters/arroyo/reduce_delegate.py b/sentry_streams/sentry_streams/adapters/arroyo/reduce_delegate.py index d30a9ad9..c753b41e 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/reduce_delegate.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/reduce_delegate.py @@ -39,11 +39,9 @@ def rust_msg_to_arroyo_reduce( } if isinstance(message, PyAnyMessage): - to_send: Message[Any] = PyMessage( - message.payload, message.headers, message.timestamp, message.schema - ) + to_send: Message[Any] = PyMessage(message.payload, [], message.timestamp, message.schema) elif isinstance(message, RawMessage): - to_send = PyRawMessage(message.payload, message.headers, message.timestamp, message.schema) + to_send = PyRawMessage(message.payload, [], message.timestamp, message.schema) msg = ArroyoMessage( Value( diff --git a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py index 278c9bb0..92d7e937 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py @@ -26,14 +26,14 @@ def transform(chain: Sequence[Map[Any, Any]], message: Message[Any]) -> Message[ # Thus TMapOut = bytes. next_msg = PyRawMessage( payload=ret, - headers=next_msg.headers, + headers=[], timestamp=next_msg.timestamp, schema=next_msg.schema, ) else: next_msg = PyMessage( payload=ret, - headers=next_msg.headers, + headers=[], timestamp=next_msg.timestamp, schema=next_msg.schema, ) diff --git a/sentry_streams/sentry_streams/pipeline/message.py b/sentry_streams/sentry_streams/pipeline/message.py index a884bfa6..48f400bb 100644 --- a/sentry_streams/sentry_streams/pipeline/message.py +++ b/sentry_streams/sentry_streams/pipeline/message.py @@ -110,7 +110,7 @@ def __init__( schema: Optional[str] = None, ) -> None: self._payload: TPayload = payload - self._headers: Sequence[Tuple[str, bytes]] = headers + self._headers: Sequence[Tuple[str, bytes]] = [] self._timestamp = timestamp self._schema = schema self._cached_inner: PyAnyMessage | None = None @@ -148,9 +148,7 @@ def __str__(self) -> str: def to_inner(self) -> PyAnyMessage: if self._cached_inner is None: - self._cached_inner = PyAnyMessage( - self._payload, self._headers, self._timestamp, self._schema - ) + self._cached_inner = PyAnyMessage(self._payload, None, self._timestamp, self._schema) return self._cached_inner def deepcopy(self) -> PyMessage[TPayload]: @@ -195,7 +193,7 @@ def __init__( schema: Optional[str] = None, ) -> None: self._payload = payload - self._headers: Sequence[Tuple[str, bytes]] = headers + self._headers: Sequence[Tuple[str, bytes]] = [] self._timestamp = timestamp self._schema = schema self._cached_inner: RawMessage | None = None @@ -231,9 +229,7 @@ def __str__(self) -> str: def to_inner(self) -> RawMessage: if self._cached_inner is None: - self._cached_inner = RawMessage( - self._payload, self._headers, self._timestamp, self._schema - ) + self._cached_inner = RawMessage(self._payload, None, self._timestamp, self._schema) return self._cached_inner def deepcopy(self) -> PyRawMessage: From fb8d2bb12d960fcedd22225802d4de37f1d92062 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 22 Apr 2026 14:38:26 -0700 Subject: [PATCH 4/4] Remove headers --- .../sentry_streams/pipeline/message.py | 4 +-- sentry_streams/src/messages.rs | 32 +++++++++++-------- .../tests/adapters/arroyo/test_delegate.py | 4 +-- .../arroyo/test_multi_process_delegate.py | 10 ++---- .../tests/adapters/arroyo/test_steps_chain.py | 4 +-- sentry_streams/tests/pipeline/test_message.py | 3 +- 6 files changed, 28 insertions(+), 29 deletions(-) diff --git a/sentry_streams/sentry_streams/pipeline/message.py b/sentry_streams/sentry_streams/pipeline/message.py index 48f400bb..196c1a11 100644 --- a/sentry_streams/sentry_streams/pipeline/message.py +++ b/sentry_streams/sentry_streams/pipeline/message.py @@ -148,7 +148,7 @@ def __str__(self) -> str: def to_inner(self) -> PyAnyMessage: if self._cached_inner is None: - self._cached_inner = PyAnyMessage(self._payload, None, self._timestamp, self._schema) + self._cached_inner = PyAnyMessage(self._payload, [], self._timestamp, self._schema) return self._cached_inner def deepcopy(self) -> PyMessage[TPayload]: @@ -229,7 +229,7 @@ def __str__(self) -> str: def to_inner(self) -> RawMessage: if self._cached_inner is None: - self._cached_inner = RawMessage(self._payload, None, self._timestamp, self._schema) + self._cached_inner = RawMessage(self._payload, [], self._timestamp, self._schema) return self._cached_inner def deepcopy(self) -> PyRawMessage: diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 103e4560..4cf246b9 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -44,6 +44,8 @@ use sentry_arroyo::types::Partition; use crate::committable::{convert_committable_to_py, convert_py_committable}; use crate::utils::traced_with_gil; +// Used from `mod tests` only; the library does not read headers from the Python constructor. +#[allow(dead_code)] pub fn headers_to_vec(py: Python<'_>, headers: Py) -> PyResult)>> { // Converts the Python consumable representation of the Message headers into // the Rust native representation (which is a Vec<(String, Vec)>). @@ -203,12 +205,11 @@ impl PyAnyMessage { headers: Py, timestamp: f64, schema: Option, - py: Python<'_>, ) -> PyResult { Ok(Self { payload, - // Optimization to avoid running the python code if there are no headers. - headers: headers_to_vec(py, headers)?, + // Kafka headers are not read from the constructor; keep an empty vec. + headers: Vec::new(), timestamp, schema, }) @@ -273,8 +274,8 @@ impl RawMessage { ) -> PyResult { Ok(Self { payload: payload.as_bytes(py).to_vec(), - // Optimization to avoid running the python code if there are no headers. - headers: headers_to_vec(py, headers)?, + // Kafka headers are not read from the constructor; keep an empty vec. + headers: Vec::new(), timestamp, schema, }) @@ -585,7 +586,6 @@ mod tests { py_headers.clone_ref(py), timestamp, schema.clone(), - py, ) .unwrap(); @@ -597,8 +597,8 @@ mod tests { let payload_val: String = msg.payload.bind(py).extract().unwrap(); assert_eq!(payload_val, "payload"); - // Check headers - assert_eq!(msg.headers, headers); + // Check headers (constructor ignores the argument; always empty) + assert!(msg.headers.is_empty()); let new_msg = msg.replace_payload("new_payload".into_py_any(py).unwrap()); @@ -613,7 +613,10 @@ mod tests { let repr = pymsg.call_method0(py, "__repr__").unwrap(); let expected_repr = format!( "PyAnyMessage(payload='{}', headers={:?}, timestamp={}, schema={:?})", - payload_val, headers, timestamp, schema + payload_val, + Vec::<(String, Vec)>::new(), + timestamp, + schema ); assert_eq!(repr.extract::(py).unwrap(), expected_repr); }); @@ -652,8 +655,8 @@ mod tests { // Check payload assert_eq!(msg.payload, payload_bytes); - // Check headers - assert_eq!(msg.headers, headers); + // Check headers (constructor ignores the argument; always empty) + assert!(msg.headers.is_empty()); // Test payload getter let py_payload_val = msg.payload(py).unwrap(); @@ -663,7 +666,7 @@ mod tests { // Test headers getter let py_headers_val = msg.headers(py).unwrap(); let headers_val = headers_to_vec(py, py_headers_val).unwrap(); - assert_eq!(headers_val, headers); + assert!(headers_val.is_empty()); // Replace payload via python let new_payload_bytes = vec![200, 201, 202]; @@ -680,7 +683,10 @@ mod tests { let repr = pymsg.call_method0(py, "__repr__").unwrap(); let expected_repr = format!( "RawMessage(payload={:?}, headers={:?}, timestamp={}, schema={:?})", - payload_bytes, headers, timestamp, schema + payload_bytes, + Vec::<(String, Vec)>::new(), + timestamp, + schema ); assert_eq!(repr.extract::(py).unwrap(), expected_repr); }); diff --git a/sentry_streams/tests/adapters/arroyo/test_delegate.py b/sentry_streams/tests/adapters/arroyo/test_delegate.py index 81951e48..4d560ce2 100644 --- a/sentry_streams/tests/adapters/arroyo/test_delegate.py +++ b/sentry_streams/tests/adapters/arroyo/test_delegate.py @@ -47,9 +47,7 @@ def _process_message(self, msg: RustMessage, committable: Committable) -> RustMe def test_rust_step() -> None: def make_msg(payload: str) -> RustMessage: - return PyAnyMessage( - payload=payload, headers=[("head", "val".encode())], timestamp=0, schema=None - ) + return PyAnyMessage(payload=payload, headers=[], timestamp=0, schema=None) step = SingleMessageTransformer() # Transform one message diff --git a/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py b/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py index 919e42f0..06516f6f 100644 --- a/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py +++ b/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py @@ -41,9 +41,7 @@ def test_process_message() -> None: result = process_message(transformer, msg) - assert result == PyMessage( - "transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s" - ) + assert result == PyMessage("transformed foo", headers=[], timestamp=123, schema="s") def test_mapped_msg_none() -> None: @@ -79,7 +77,7 @@ def test_rust_to_arroyo_msg_with_pyanymessage() -> None: assert isinstance(arroyo_msg.payload, PyMessage) assert arroyo_msg.payload.payload == "payload" - assert arroyo_msg.payload.headers == [("h", "v".encode())] + assert arroyo_msg.payload.headers == [] assert arroyo_msg.payload.timestamp == 123 assert arroyo_msg.payload.schema == "s" assert Partition(Topic("topic"), 0) in arroyo_msg.committable @@ -106,9 +104,7 @@ def test_integration() -> None: ret_msg, _ = ret[0] ret_msg = cast(PyAnyMessage, ret_msg) - expected = PyMessage( - "transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s" - ).to_inner() + expected = PyMessage("transformed foo", headers=[], timestamp=123, schema="s").to_inner() assert ret_msg.payload == expected.payload assert ret_msg.headers == expected.headers diff --git a/sentry_streams/tests/adapters/arroyo/test_steps_chain.py b/sentry_streams/tests/adapters/arroyo/test_steps_chain.py index 5c145523..24679cfd 100644 --- a/sentry_streams/tests/adapters/arroyo/test_steps_chain.py +++ b/sentry_streams/tests/adapters/arroyo/test_steps_chain.py @@ -156,9 +156,7 @@ def transformer(msg: ArroyoMessage[Message[Any]]) -> Message[Any]: assert len(ret) == 1 ret_msg, _ = ret[0] - expected = PyMessage( - "foo_t1_t2", headers=[("h", "v".encode())], timestamp=123, schema="s" - ).to_inner() + expected = PyMessage("foo_t1_t2", headers=[], timestamp=123, schema="s").to_inner() ret_msg = cast(PyAnyMessage, ret_msg) assert ret_msg.payload == expected.payload diff --git a/sentry_streams/tests/pipeline/test_message.py b/sentry_streams/tests/pipeline/test_message.py index 528d39f8..8cf85d2f 100644 --- a/sentry_streams/tests/pipeline/test_message.py +++ b/sentry_streams/tests/pipeline/test_message.py @@ -38,7 +38,8 @@ def test_message_access( message: Union[PyRawMessage, PyMessage[str]], expected_rust_type: Union[Type[PyAnyMessage], Type[RawMessage]], ) -> None: - assert message.headers == [("header1", "test".encode())] + # Python wrappers do not surface Kafka headers; inner Rust messages get []. + assert message.headers == [] assert message.timestamp == 10.0 assert message.schema == "schema"