diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index c81a2dd745f..48080c513c3 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -55,6 +55,13 @@ fn process_eap_item(msg: KafkaPayload, config: &ProcessorConfig) -> anyhow::Resu "sentry._internal.ingested_at".into(), Utc::now().timestamp_millis(), ); + // we are using this to compare to outcomes which stores the timestamp as seconds + if let Some(received_at) = origin_timestamp { + eap_item.attributes.insert_int( + "sentry._internal.received_at".into(), + received_at.timestamp(), + ); + } let mut item_type_metrics = ItemTypeMetrics::new(); item_type_metrics.record_item(item_type, payload.len()); @@ -653,6 +660,107 @@ mod tests { assert!(batch.origin_timestamp.is_none()); } + #[test] + fn test_received_at_attribute_is_set() { + let item_id = Uuid::new_v4(); + let trace_item = generate_trace_item(item_id); + // generate_trace_item sets received.seconds = 1745562493 + let expected_s: i64 = 1745562493; + + let mut payload_bytes = Vec::new(); + trace_item.encode(&mut payload_bytes).unwrap(); + + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + + // JSON path + let json_batch = process_message( + KafkaPayload::new(None, None, Some(payload_bytes.clone())), + meta.clone(), + &ProcessorConfig::default(), + ) + .expect("The message should be processed"); + + #[derive(Deserialize)] + struct Item { + #[serde(default)] + attributes_int: HashMap, + } + let item: Item = serde_json::from_slice(&json_batch.rows.encoded_rows).unwrap(); + assert_eq!( + item.attributes_int.get("sentry._internal.received_at"), + Some(&expected_s) + ); + + // RowBinary path + let rb_batch = process_message_row_binary( + KafkaPayload::new(None, None, Some(payload_bytes)), + meta, + &ProcessorConfig::default(), + ) + .expect("The message should be processed"); + + let row = &rb_batch.rows[0]; + let received_at = row + .attributes_int + .iter() + .find(|(k, _)| k == "sentry._internal.received_at") + .map(|(_, v)| *v); + assert_eq!(received_at, Some(expected_s)); + } + + #[test] + fn test_received_at_attribute_absent_when_received_none() { + let item_id = Uuid::new_v4(); + let mut trace_item = generate_trace_item(item_id); + trace_item.received = None; + + let mut payload_bytes = Vec::new(); + trace_item.encode(&mut payload_bytes).unwrap(); + + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + + // JSON path + let json_batch = process_message( + KafkaPayload::new(None, None, Some(payload_bytes.clone())), + meta.clone(), + &ProcessorConfig::default(), + ) + .expect("The message should be processed"); + + #[derive(Deserialize)] + struct Item { + #[serde(default)] + attributes_int: HashMap, + } + let item: Item = serde_json::from_slice(&json_batch.rows.encoded_rows).unwrap(); + assert!(!item + .attributes_int + .contains_key("sentry._internal.received_at")); + + // RowBinary path + let rb_batch = process_message_row_binary( + KafkaPayload::new(None, None, Some(payload_bytes)), + meta, + &ProcessorConfig::default(), + ) + .expect("The message should be processed"); + + let row = &rb_batch.rows[0]; + let has_received_at = row + .attributes_int + .iter() + .any(|(k, _)| k == "sentry._internal.received_at"); + assert!(!has_received_at); + } + #[test] fn test_row_binary_basic_processing() { let item_id = Uuid::new_v4(); diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs index 53b7e41f4c4..aff261e325c 100644 --- a/rust_snuba/src/processors/mod.rs +++ b/rust_snuba/src/processors/mod.rs @@ -161,6 +161,10 @@ mod tests { ".*.*[\"sentry._internal.ingested_at\"]", "", ); + settings.add_redaction( + ".*.*[\"sentry._internal.received_at\"]", + "", + ); } // This payload is protobuf (so binary), not JSON (so text). diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap index bf570a6e3c1..562af2539f1 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-items-EAPItemsProcessor-snuba-items__1__basic.protobuf.snap @@ -10,6 +10,9 @@ expression: snapshot_payload "attributes_float_3": { "another_attribute": 1238.0 }, + "attributes_float_35": { + "sentry._internal.received_at": "" + }, "attributes_float_6": { "some_bool": 1.0 }, @@ -21,7 +24,8 @@ expression: snapshot_payload }, "attributes_int": { "another_attribute": 1238, - "sentry._internal.ingested_at": "" + "sentry._internal.ingested_at": "", + "sentry._internal.received_at": "" }, "attributes_string_32": { "one_attribute": "blah" diff --git a/tests/web/rpc/v1/test_endpoint_export_trace_items.py b/tests/web/rpc/v1/test_endpoint_export_trace_items.py index 605f196342a..50acf98a566 100644 --- a/tests/web/rpc/v1/test_endpoint_export_trace_items.py +++ b/tests/web/rpc/v1/test_endpoint_export_trace_items.py @@ -63,6 +63,7 @@ def _assert_attributes_keys(trace_items: list[TraceItem]) -> None: "sentry.start_timestamp_precise", "start_timestamp_ms", "sentry._internal.ingested_at", + "sentry._internal.received_at", } ) assert actual_keys == expected_keys @@ -118,9 +119,9 @@ def test_with_pagination(self, setup_teardown: Any) -> None: response = EndpointExportTraceItems().execute(message) items.extend(response.trace_items) if len(response.trace_items) == 20: - assert response.page_token.end_pagination == False + assert not response.page_token.end_pagination else: - assert response.page_token.end_pagination == True + assert response.page_token.end_pagination break message.page_token.CopyFrom(response.page_token)