From b27b25beacc1df896f2e4b1e759662f7438f77b9 Mon Sep 17 00:00:00 2001 From: Meredith Heller Date: Wed, 29 Apr 2026 12:09:09 -0700 Subject: [PATCH 1/4] ref(eap-outcomes): add sentry._internal.received_at attribute --- rust_snuba/src/processors/eap_items.rs | 108 +++++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index c81a2dd745..48080c513c 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -55,6 +55,13 @@ fn process_eap_item(msg: KafkaPayload, config: &ProcessorConfig) -> anyhow::Resu "sentry._internal.ingested_at".into(), Utc::now().timestamp_millis(), ); + // we are using this to compare to outcomes which stores the timestamp as seconds + if let Some(received_at) = origin_timestamp { + eap_item.attributes.insert_int( + "sentry._internal.received_at".into(), + received_at.timestamp(), + ); + } let mut item_type_metrics = ItemTypeMetrics::new(); item_type_metrics.record_item(item_type, payload.len()); @@ -653,6 +660,107 @@ mod tests { assert!(batch.origin_timestamp.is_none()); } + #[test] + fn test_received_at_attribute_is_set() { + let item_id = Uuid::new_v4(); + let trace_item = generate_trace_item(item_id); + // generate_trace_item sets received.seconds = 1745562493 + let expected_s: i64 = 1745562493; + + let mut payload_bytes = Vec::new(); + trace_item.encode(&mut payload_bytes).unwrap(); + + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + + // JSON path + let json_batch = process_message( + KafkaPayload::new(None, None, Some(payload_bytes.clone())), + meta.clone(), + &ProcessorConfig::default(), + ) + .expect("The message should be processed"); + + #[derive(Deserialize)] + struct Item { + #[serde(default)] + attributes_int: HashMap, + } + let item: Item = serde_json::from_slice(&json_batch.rows.encoded_rows).unwrap(); + assert_eq!( + item.attributes_int.get("sentry._internal.received_at"), + Some(&expected_s) + ); + + // RowBinary path + let rb_batch = process_message_row_binary( + KafkaPayload::new(None, None, Some(payload_bytes)), + meta, + &ProcessorConfig::default(), + ) + .expect("The message should be processed"); + + let row = &rb_batch.rows[0]; + let received_at = row + .attributes_int + .iter() + .find(|(k, _)| k == "sentry._internal.received_at") + .map(|(_, v)| *v); + assert_eq!(received_at, Some(expected_s)); + } + + #[test] + fn test_received_at_attribute_absent_when_received_none() { + let item_id = Uuid::new_v4(); + let mut trace_item = generate_trace_item(item_id); + trace_item.received = None; + + let mut payload_bytes = Vec::new(); + trace_item.encode(&mut payload_bytes).unwrap(); + + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + + // JSON path + let json_batch = process_message( + KafkaPayload::new(None, None, Some(payload_bytes.clone())), + meta.clone(), + &ProcessorConfig::default(), + ) + .expect("The message should be processed"); + + #[derive(Deserialize)] + struct Item { + #[serde(default)] + attributes_int: HashMap, + } + let item: Item = serde_json::from_slice(&json_batch.rows.encoded_rows).unwrap(); + assert!(!item + .attributes_int + .contains_key("sentry._internal.received_at")); + + // RowBinary path + let rb_batch = process_message_row_binary( + KafkaPayload::new(None, None, Some(payload_bytes)), + meta, + &ProcessorConfig::default(), + ) + .expect("The message should be processed"); + + let row = &rb_batch.rows[0]; + let has_received_at = row + .attributes_int + .iter() + .any(|(k, _)| k == "sentry._internal.received_at"); + assert!(!has_received_at); + } + #[test] fn test_row_binary_basic_processing() { let item_id = Uuid::new_v4(); From 1e41187ae9c7a8a8a4e8588894e66d52d870131e Mon Sep 17 00:00:00 2001 From: Meredith Heller Date: Wed, 29 Apr 2026 13:02:41 -0700 Subject: [PATCH 2/4] fix test --- rust_snuba/src/processors/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs index 53b7e41f4c..aff261e325 100644 --- a/rust_snuba/src/processors/mod.rs +++ b/rust_snuba/src/processors/mod.rs @@ -161,6 +161,10 @@ mod tests { ".*.*[\"sentry._internal.ingested_at\"]", "", ); + settings.add_redaction( + ".*.*[\"sentry._internal.received_at\"]", + "", + ); } // This payload is protobuf (so binary), not JSON (so text). From 983eae0f09cd7e5487b91228ef141b2db4a96a7b Mon Sep 17 00:00:00 2001 From: Meredith Heller Date: Wed, 29 Apr 2026 16:50:50 -0700 Subject: [PATCH 3/4] cargo insta review --- ...ms-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap index bf570a6e3c..562af2539f 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap @@ -10,6 +10,9 @@ expression: snapshot_payload "attributes_float_3": { "another_attribute": 1238.0 }, + "attributes_float_35": { + "sentry._internal.received_at": "" + }, "attributes_float_6": { "some_bool": 1.0 }, @@ -21,7 +24,8 @@ expression: snapshot_payload }, "attributes_int": { "another_attribute": 1238, - "sentry._internal.ingested_at": "" + "sentry._internal.ingested_at": "", + "sentry._internal.received_at": "" }, "attributes_string_32": { "one_attribute": "blah" From 44a90e38b0b82f8913b8ebd9a7c463bf3fdc5033 Mon Sep 17 00:00:00 2001 From: Meredith Heller Date: Wed, 29 Apr 2026 18:19:04 -0700 Subject: [PATCH 4/4] fix page tooken and other --- tests/web/rpc/v1/test_endpoint_export_trace_items.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/web/rpc/v1/test_endpoint_export_trace_items.py b/tests/web/rpc/v1/test_endpoint_export_trace_items.py index 605f196342..50acf98a56 100644 --- a/tests/web/rpc/v1/test_endpoint_export_trace_items.py +++ b/tests/web/rpc/v1/test_endpoint_export_trace_items.py @@ -63,6 +63,7 @@ def _assert_attributes_keys(trace_items: list[TraceItem]) -> None: "sentry.start_timestamp_precise", "start_timestamp_ms", "sentry._internal.ingested_at", + "sentry._internal.received_at", } ) assert actual_keys == expected_keys @@ -118,9 +119,9 @@ def test_with_pagination(self, setup_teardown: Any) -> None: response = EndpointExportTraceItems().execute(message) items.extend(response.trace_items) if len(response.trace_items) == 20: - assert response.page_token.end_pagination == False + assert not response.page_token.end_pagination else: - assert response.page_token.end_pagination == True + assert response.page_token.end_pagination break message.page_token.CopyFrom(response.page_token)