diff --git a/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py b/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py index 24af795a..fe1f8159 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py @@ -75,11 +75,9 @@ def rust_to_arroyo_msg( for partition, offset in committable.items() } if isinstance(message, PyAnyMessage): - to_send: Message[Any] = PyMessage( - message.payload, message.headers, message.timestamp, message.schema - ) + to_send: Message[Any] = PyMessage(message.payload, [], message.timestamp, message.schema) elif isinstance(message, RawMessage): - to_send = PyRawMessage(message.payload, message.headers, message.timestamp, message.schema) + to_send = PyRawMessage(message.payload, [], message.timestamp, message.schema) msg = ArroyoMessage( Value( diff --git a/sentry_streams/sentry_streams/adapters/arroyo/reduce_delegate.py b/sentry_streams/sentry_streams/adapters/arroyo/reduce_delegate.py index d30a9ad9..c753b41e 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/reduce_delegate.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/reduce_delegate.py @@ -39,11 +39,9 @@ def rust_msg_to_arroyo_reduce( } if isinstance(message, PyAnyMessage): - to_send: Message[Any] = PyMessage( - message.payload, message.headers, message.timestamp, message.schema - ) + to_send: Message[Any] = PyMessage(message.payload, [], message.timestamp, message.schema) elif isinstance(message, RawMessage): - to_send = PyRawMessage(message.payload, message.headers, message.timestamp, message.schema) + to_send = PyRawMessage(message.payload, [], message.timestamp, message.schema) msg = ArroyoMessage( Value( diff --git a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py index 278c9bb0..92d7e937 100644 --- a/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py +++ b/sentry_streams/sentry_streams/adapters/arroyo/steps_chain.py @@ -26,14 +26,14 @@ def transform(chain: Sequence[Map[Any, Any]], message: Message[Any]) -> Message[ # Thus TMapOut = bytes. next_msg = PyRawMessage( payload=ret, - headers=next_msg.headers, + headers=[], timestamp=next_msg.timestamp, schema=next_msg.schema, ) else: next_msg = PyMessage( payload=ret, - headers=next_msg.headers, + headers=[], timestamp=next_msg.timestamp, schema=next_msg.schema, ) diff --git a/sentry_streams/sentry_streams/pipeline/message.py b/sentry_streams/sentry_streams/pipeline/message.py index a884bfa6..196c1a11 100644 --- a/sentry_streams/sentry_streams/pipeline/message.py +++ b/sentry_streams/sentry_streams/pipeline/message.py @@ -110,7 +110,7 @@ def __init__( schema: Optional[str] = None, ) -> None: self._payload: TPayload = payload - self._headers: Sequence[Tuple[str, bytes]] = headers + self._headers: Sequence[Tuple[str, bytes]] = [] self._timestamp = timestamp self._schema = schema self._cached_inner: PyAnyMessage | None = None @@ -148,9 +148,7 @@ def __str__(self) -> str: def to_inner(self) -> PyAnyMessage: if self._cached_inner is None: - self._cached_inner = PyAnyMessage( - self._payload, self._headers, self._timestamp, self._schema - ) + self._cached_inner = PyAnyMessage(self._payload, [], self._timestamp, self._schema) return self._cached_inner def deepcopy(self) -> PyMessage[TPayload]: @@ -195,7 +193,7 @@ def __init__( schema: Optional[str] = None, ) -> None: self._payload = payload - self._headers: Sequence[Tuple[str, bytes]] = headers + self._headers: Sequence[Tuple[str, bytes]] = [] self._timestamp = timestamp self._schema = schema self._cached_inner: RawMessage | None = None @@ -231,9 +229,7 @@ def __str__(self) -> str: def to_inner(self) -> RawMessage: if self._cached_inner is None: - self._cached_inner = RawMessage( - self._payload, self._headers, self._timestamp, self._schema - ) + self._cached_inner = RawMessage(self._payload, [], self._timestamp, self._schema) return self._cached_inner def deepcopy(self) -> PyRawMessage: diff --git a/sentry_streams/src/messages.rs b/sentry_streams/src/messages.rs index 103e4560..4cf246b9 100644 --- a/sentry_streams/src/messages.rs +++ b/sentry_streams/src/messages.rs @@ -44,6 +44,8 @@ use sentry_arroyo::types::Partition; use crate::committable::{convert_committable_to_py, convert_py_committable}; use crate::utils::traced_with_gil; +// Used from `mod tests` only; the library does not read headers from the Python constructor. +#[allow(dead_code)] pub fn headers_to_vec(py: Python<'_>, headers: Py) -> PyResult)>> { // Converts the Python consumable representation of the Message headers into // the Rust native representation (which is a Vec<(String, Vec)>). @@ -203,12 +205,11 @@ impl PyAnyMessage { headers: Py, timestamp: f64, schema: Option, - py: Python<'_>, ) -> PyResult { Ok(Self { payload, - // Optimization to avoid running the python code if there are no headers. - headers: headers_to_vec(py, headers)?, + // Kafka headers are not read from the constructor; keep an empty vec. + headers: Vec::new(), timestamp, schema, }) @@ -273,8 +274,8 @@ impl RawMessage { ) -> PyResult { Ok(Self { payload: payload.as_bytes(py).to_vec(), - // Optimization to avoid running the python code if there are no headers. - headers: headers_to_vec(py, headers)?, + // Kafka headers are not read from the constructor; keep an empty vec. + headers: Vec::new(), timestamp, schema, }) @@ -585,7 +586,6 @@ mod tests { py_headers.clone_ref(py), timestamp, schema.clone(), - py, ) .unwrap(); @@ -597,8 +597,8 @@ mod tests { let payload_val: String = msg.payload.bind(py).extract().unwrap(); assert_eq!(payload_val, "payload"); - // Check headers - assert_eq!(msg.headers, headers); + // Check headers (constructor ignores the argument; always empty) + assert!(msg.headers.is_empty()); let new_msg = msg.replace_payload("new_payload".into_py_any(py).unwrap()); @@ -613,7 +613,10 @@ mod tests { let repr = pymsg.call_method0(py, "__repr__").unwrap(); let expected_repr = format!( "PyAnyMessage(payload='{}', headers={:?}, timestamp={}, schema={:?})", - payload_val, headers, timestamp, schema + payload_val, + Vec::<(String, Vec)>::new(), + timestamp, + schema ); assert_eq!(repr.extract::(py).unwrap(), expected_repr); }); @@ -652,8 +655,8 @@ mod tests { // Check payload assert_eq!(msg.payload, payload_bytes); - // Check headers - assert_eq!(msg.headers, headers); + // Check headers (constructor ignores the argument; always empty) + assert!(msg.headers.is_empty()); // Test payload getter let py_payload_val = msg.payload(py).unwrap(); @@ -663,7 +666,7 @@ mod tests { // Test headers getter let py_headers_val = msg.headers(py).unwrap(); let headers_val = headers_to_vec(py, py_headers_val).unwrap(); - assert_eq!(headers_val, headers); + assert!(headers_val.is_empty()); // Replace payload via python let new_payload_bytes = vec![200, 201, 202]; @@ -680,7 +683,10 @@ mod tests { let repr = pymsg.call_method0(py, "__repr__").unwrap(); let expected_repr = format!( "RawMessage(payload={:?}, headers={:?}, timestamp={}, schema={:?})", - payload_bytes, headers, timestamp, schema + payload_bytes, + Vec::<(String, Vec)>::new(), + timestamp, + schema ); assert_eq!(repr.extract::(py).unwrap(), expected_repr); }); diff --git a/sentry_streams/tests/adapters/arroyo/test_delegate.py b/sentry_streams/tests/adapters/arroyo/test_delegate.py index 81951e48..4d560ce2 100644 --- a/sentry_streams/tests/adapters/arroyo/test_delegate.py +++ b/sentry_streams/tests/adapters/arroyo/test_delegate.py @@ -47,9 +47,7 @@ def _process_message(self, msg: RustMessage, committable: Committable) -> RustMe def test_rust_step() -> None: def make_msg(payload: str) -> RustMessage: - return PyAnyMessage( - payload=payload, headers=[("head", "val".encode())], timestamp=0, schema=None - ) + return PyAnyMessage(payload=payload, headers=[], timestamp=0, schema=None) step = SingleMessageTransformer() # Transform one message diff --git a/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py b/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py index 919e42f0..06516f6f 100644 --- a/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py +++ b/sentry_streams/tests/adapters/arroyo/test_multi_process_delegate.py @@ -41,9 +41,7 @@ def test_process_message() -> None: result = process_message(transformer, msg) - assert result == PyMessage( - "transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s" - ) + assert result == PyMessage("transformed foo", headers=[], timestamp=123, schema="s") def test_mapped_msg_none() -> None: @@ -79,7 +77,7 @@ def test_rust_to_arroyo_msg_with_pyanymessage() -> None: assert isinstance(arroyo_msg.payload, PyMessage) assert arroyo_msg.payload.payload == "payload" - assert arroyo_msg.payload.headers == [("h", "v".encode())] + assert arroyo_msg.payload.headers == [] assert arroyo_msg.payload.timestamp == 123 assert arroyo_msg.payload.schema == "s" assert Partition(Topic("topic"), 0) in arroyo_msg.committable @@ -106,9 +104,7 @@ def test_integration() -> None: ret_msg, _ = ret[0] ret_msg = cast(PyAnyMessage, ret_msg) - expected = PyMessage( - "transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s" - ).to_inner() + expected = PyMessage("transformed foo", headers=[], timestamp=123, schema="s").to_inner() assert ret_msg.payload == expected.payload assert ret_msg.headers == expected.headers diff --git a/sentry_streams/tests/adapters/arroyo/test_steps_chain.py b/sentry_streams/tests/adapters/arroyo/test_steps_chain.py index 5c145523..24679cfd 100644 --- a/sentry_streams/tests/adapters/arroyo/test_steps_chain.py +++ b/sentry_streams/tests/adapters/arroyo/test_steps_chain.py @@ -156,9 +156,7 @@ def transformer(msg: ArroyoMessage[Message[Any]]) -> Message[Any]: assert len(ret) == 1 ret_msg, _ = ret[0] - expected = PyMessage( - "foo_t1_t2", headers=[("h", "v".encode())], timestamp=123, schema="s" - ).to_inner() + expected = PyMessage("foo_t1_t2", headers=[], timestamp=123, schema="s").to_inner() ret_msg = cast(PyAnyMessage, ret_msg) assert ret_msg.payload == expected.payload diff --git a/sentry_streams/tests/pipeline/test_message.py b/sentry_streams/tests/pipeline/test_message.py index 528d39f8..8cf85d2f 100644 --- a/sentry_streams/tests/pipeline/test_message.py +++ b/sentry_streams/tests/pipeline/test_message.py @@ -38,7 +38,8 @@ def test_message_access( message: Union[PyRawMessage, PyMessage[str]], expected_rust_type: Union[Type[PyAnyMessage], Type[RawMessage]], ) -> None: - assert message.headers == [("header1", "test".encode())] + # Python wrappers do not surface Kafka headers; inner Rust messages get []. + assert message.headers == [] assert message.timestamp == 10.0 assert message.schema == "schema"