Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ def rust_to_arroyo_msg(
for partition, offset in committable.items()
}
if isinstance(message, PyAnyMessage):
to_send: Message[Any] = PyMessage(
message.payload, message.headers, message.timestamp, message.schema
)
to_send: Message[Any] = PyMessage(message.payload, [], message.timestamp, message.schema)
elif isinstance(message, RawMessage):
to_send = PyRawMessage(message.payload, message.headers, message.timestamp, message.schema)
to_send = PyRawMessage(message.payload, [], message.timestamp, message.schema)

msg = ArroyoMessage(
Value(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ def rust_msg_to_arroyo_reduce(
}

if isinstance(message, PyAnyMessage):
to_send: Message[Any] = PyMessage(
message.payload, message.headers, message.timestamp, message.schema
)
to_send: Message[Any] = PyMessage(message.payload, [], message.timestamp, message.schema)
elif isinstance(message, RawMessage):
to_send = PyRawMessage(message.payload, message.headers, message.timestamp, message.schema)
to_send = PyRawMessage(message.payload, [], message.timestamp, message.schema)

msg = ArroyoMessage(
Value(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ def transform(chain: Sequence[Map[Any, Any]], message: Message[Any]) -> Message[
# Thus TMapOut = bytes.
next_msg = PyRawMessage(
payload=ret,
headers=next_msg.headers,
headers=[],
timestamp=next_msg.timestamp,
schema=next_msg.schema,
)
else:
next_msg = PyMessage(
payload=ret,
headers=next_msg.headers,
headers=[],
timestamp=next_msg.timestamp,
schema=next_msg.schema,
)
Expand Down
12 changes: 4 additions & 8 deletions sentry_streams/sentry_streams/pipeline/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def __init__(
schema: Optional[str] = None,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The PyMessage and PyRawMessage constructors accept a headers parameter but silently ignore it, always initializing headers to an empty list.
Severity: MEDIUM

Suggested Fix

The headers parameter should be used to initialize the self._headers attribute in the PyMessage and PyRawMessage constructors. If the intention is to permanently remove header handling, the headers parameter should be removed from the constructor signatures and all call sites to prevent silent data loss.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: sentry_streams/sentry_streams/pipeline/message.py#L110

Potential issue: The constructors for `PyMessage` and `PyRawMessage` accept a `headers`
parameter but the implementation ignores it, always initializing the internal `_headers`
attribute to an empty list. This means any headers passed when creating these message
objects are silently discarded. This violates the API contract and can lead to
unexpected behavior or silent failures in any code that relies on headers being
preserved across processing steps, such as custom filters or monitoring logic.

Did we get this right? 👍 / 👎 to inform future reviews.

) -> None:
self._payload: TPayload = payload
self._headers: Sequence[Tuple[str, bytes]] = headers
self._headers: Sequence[Tuple[str, bytes]] = []
self._timestamp = timestamp
self._schema = schema
self._cached_inner: PyAnyMessage | None = None
Expand Down Expand Up @@ -148,9 +148,7 @@ def __str__(self) -> str:

def to_inner(self) -> PyAnyMessage:
if self._cached_inner is None:
self._cached_inner = PyAnyMessage(
self._payload, self._headers, self._timestamp, self._schema
)
self._cached_inner = PyAnyMessage(self._payload, [], self._timestamp, self._schema)
return self._cached_inner

def deepcopy(self) -> PyMessage[TPayload]:
Expand Down Expand Up @@ -195,7 +193,7 @@ def __init__(
schema: Optional[str] = None,
) -> None:
self._payload = payload
self._headers: Sequence[Tuple[str, bytes]] = headers
self._headers: Sequence[Tuple[str, bytes]] = []
self._timestamp = timestamp
self._schema = schema
self._cached_inner: RawMessage | None = None
Expand Down Expand Up @@ -231,9 +229,7 @@ def __str__(self) -> str:

def to_inner(self) -> RawMessage:
if self._cached_inner is None:
self._cached_inner = RawMessage(
self._payload, self._headers, self._timestamp, self._schema
)
self._cached_inner = RawMessage(self._payload, [], self._timestamp, self._schema)
return self._cached_inner

def deepcopy(self) -> PyRawMessage:
Expand Down
32 changes: 19 additions & 13 deletions sentry_streams/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ use sentry_arroyo::types::Partition;
use crate::committable::{convert_committable_to_py, convert_py_committable};
use crate::utils::traced_with_gil;

// Used from `mod tests` only; the library does not read headers from the Python constructor.
#[allow(dead_code)]
pub fn headers_to_vec(py: Python<'_>, headers: Py<PySequence>) -> PyResult<Vec<(String, Vec<u8>)>> {
// Converts the Python consumable representation of the Message headers into
// the Rust native representation (which is a Vec<(String, Vec<u8>)>).
Expand Down Expand Up @@ -203,12 +205,11 @@ impl PyAnyMessage {
headers: Py<PySequence>,
timestamp: f64,
schema: Option<String>,
py: Python<'_>,
) -> PyResult<Self> {
Ok(Self {
payload,
// Optimization to avoid running the python code if there are no headers.
headers: headers_to_vec(py, headers)?,
// Kafka headers are not read from the constructor; keep an empty vec.
headers: Vec::new(),
timestamp,
schema,
})
Expand Down Expand Up @@ -273,8 +274,8 @@ impl RawMessage {
) -> PyResult<Self> {
Ok(Self {
payload: payload.as_bytes(py).to_vec(),
// Optimization to avoid running the python code if there are no headers.
headers: headers_to_vec(py, headers)?,
// Kafka headers are not read from the constructor; keep an empty vec.
headers: Vec::new(),
timestamp,
schema,
})
Expand Down Expand Up @@ -585,7 +586,6 @@ mod tests {
py_headers.clone_ref(py),
timestamp,
schema.clone(),
py,
)
.unwrap();

Expand All @@ -597,8 +597,8 @@ mod tests {
let payload_val: String = msg.payload.bind(py).extract().unwrap();
assert_eq!(payload_val, "payload");

// Check headers
assert_eq!(msg.headers, headers);
// Check headers (constructor ignores the argument; always empty)
assert!(msg.headers.is_empty());

let new_msg = msg.replace_payload("new_payload".into_py_any(py).unwrap());

Expand All @@ -613,7 +613,10 @@ mod tests {
let repr = pymsg.call_method0(py, "__repr__").unwrap();
let expected_repr = format!(
"PyAnyMessage(payload='{}', headers={:?}, timestamp={}, schema={:?})",
payload_val, headers, timestamp, schema
payload_val,
Vec::<(String, Vec<u8>)>::new(),
timestamp,
schema
);
assert_eq!(repr.extract::<String>(py).unwrap(), expected_repr);
});
Expand Down Expand Up @@ -652,8 +655,8 @@ mod tests {
// Check payload
assert_eq!(msg.payload, payload_bytes);

// Check headers
assert_eq!(msg.headers, headers);
// Check headers (constructor ignores the argument; always empty)
assert!(msg.headers.is_empty());

// Test payload getter
let py_payload_val = msg.payload(py).unwrap();
Expand All @@ -663,7 +666,7 @@ mod tests {
// Test headers getter
let py_headers_val = msg.headers(py).unwrap();
let headers_val = headers_to_vec(py, py_headers_val).unwrap();
assert_eq!(headers_val, headers);
assert!(headers_val.is_empty());

// Replace payload via python
let new_payload_bytes = vec![200, 201, 202];
Expand All @@ -680,7 +683,10 @@ mod tests {
let repr = pymsg.call_method0(py, "__repr__").unwrap();
let expected_repr = format!(
"RawMessage(payload={:?}, headers={:?}, timestamp={}, schema={:?})",
payload_bytes, headers, timestamp, schema
payload_bytes,
Vec::<(String, Vec<u8>)>::new(),
timestamp,
schema
);
assert_eq!(repr.extract::<String>(py).unwrap(), expected_repr);
});
Expand Down
4 changes: 1 addition & 3 deletions sentry_streams/tests/adapters/arroyo/test_delegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ def _process_message(self, msg: RustMessage, committable: Committable) -> RustMe

def test_rust_step() -> None:
def make_msg(payload: str) -> RustMessage:
return PyAnyMessage(
payload=payload, headers=[("head", "val".encode())], timestamp=0, schema=None
)
return PyAnyMessage(payload=payload, headers=[], timestamp=0, schema=None)

step = SingleMessageTransformer()
# Transform one message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ def test_process_message() -> None:

result = process_message(transformer, msg)

assert result == PyMessage(
"transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s"
)
assert result == PyMessage("transformed foo", headers=[], timestamp=123, schema="s")


def test_mapped_msg_none() -> None:
Expand Down Expand Up @@ -79,7 +77,7 @@ def test_rust_to_arroyo_msg_with_pyanymessage() -> None:
assert isinstance(arroyo_msg.payload, PyMessage)

assert arroyo_msg.payload.payload == "payload"
assert arroyo_msg.payload.headers == [("h", "v".encode())]
assert arroyo_msg.payload.headers == []
assert arroyo_msg.payload.timestamp == 123
assert arroyo_msg.payload.schema == "s"
assert Partition(Topic("topic"), 0) in arroyo_msg.committable
Expand All @@ -106,9 +104,7 @@ def test_integration() -> None:
ret_msg, _ = ret[0]
ret_msg = cast(PyAnyMessage, ret_msg)

expected = PyMessage(
"transformed foo", headers=[("h", "v".encode())], timestamp=123, schema="s"
).to_inner()
expected = PyMessage("transformed foo", headers=[], timestamp=123, schema="s").to_inner()

assert ret_msg.payload == expected.payload
assert ret_msg.headers == expected.headers
Expand Down
4 changes: 1 addition & 3 deletions sentry_streams/tests/adapters/arroyo/test_steps_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ def transformer(msg: ArroyoMessage[Message[Any]]) -> Message[Any]:
assert len(ret) == 1
ret_msg, _ = ret[0]

expected = PyMessage(
"foo_t1_t2", headers=[("h", "v".encode())], timestamp=123, schema="s"
).to_inner()
expected = PyMessage("foo_t1_t2", headers=[], timestamp=123, schema="s").to_inner()

ret_msg = cast(PyAnyMessage, ret_msg)
assert ret_msg.payload == expected.payload
Expand Down
3 changes: 2 additions & 1 deletion sentry_streams/tests/pipeline/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def test_message_access(
message: Union[PyRawMessage, PyMessage[str]],
expected_rust_type: Union[Type[PyAnyMessage], Type[RawMessage]],
) -> None:
assert message.headers == [("header1", "test".encode())]
# Python wrappers do not surface Kafka headers; inner Rust messages get [].
assert message.headers == []
assert message.timestamp == 10.0
assert message.schema == "schema"

Expand Down
Loading