From db19bdeb5ba7055d46394302bc96c23ed736d1aa Mon Sep 17 00:00:00 2001 From: Saraj Manes Date: Mon, 27 Apr 2026 12:13:00 -0400 Subject: [PATCH 1/3] feat(EAP): Update the data export endpoint to respect routing decision Update the export logic to respect routing decision to avoid scanning too much data and avoid timeouts --- .../web/rpc/v1/endpoint_export_trace_items.py | 280 ++++++++++++++---- .../v1/test_endpoint_export_trace_items.py | 5 +- 2 files changed, 223 insertions(+), 62 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_export_trace_items.py b/snuba/web/rpc/v1/endpoint_export_trace_items.py index c982430ed1b..5ab68dfcc07 100644 --- a/snuba/web/rpc/v1/endpoint_export_trace_items.py +++ b/snuba/web/rpc/v1/endpoint_export_trace_items.py @@ -2,13 +2,15 @@ from datetime import datetime from typing import Any, Dict, Iterable, NamedTuple, Type, cast +import sentry_sdk from google.protobuf.json_format import MessageToDict from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig from sentry_protos.snuba.v1.endpoint_trace_items_pb2 import ( ExportTraceItemsRequest, ExportTraceItemsResponse, ) -from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemType +from sentry_protos.snuba.v1.request_common_pb2 import PageToken, RequestMeta, TraceItemType from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, TraceItem from snuba import state @@ -36,6 +38,7 @@ ) from snuba.web.rpc.common.debug_info import setup_trace_query_settings from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException +from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import RoutingDecision _DEFAULT_PAGE_SIZE = 10_000 @@ -48,76 +51,172 @@ TraceItemFilter, ) +FLEX_WIN_START = "sentry__time_window.start_timestamp" +FLEX_WIN_END = "sentry__time_window.end_timestamp" + + +def _flex_time_window_filters(start_sec: int, end_sec: int) -> list[TraceItemFilter]: + 
return [ + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name=FLEX_WIN_START), + op=ComparisonFilter.OP_GREATER_THAN_OR_EQUALS, + value=AttributeValue(val_int=start_sec), + ) + ), + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name=FLEX_WIN_END), + op=ComparisonFilter.OP_LESS_THAN, + value=AttributeValue(val_int=end_sec), + ) + ), + ] + + +def _parse_flex_window_from_filters(filters: list[TraceItemFilter]) -> tuple[int, int] | None: + start_sec: int | None = None + end_sec: int | None = None + for filt in filters: + if not filt.HasField("comparison_filter"): + continue + k = filt.comparison_filter.key.name + if k == FLEX_WIN_START and filt.comparison_filter.value.HasField("val_int"): + start_sec = filt.comparison_filter.value.val_int + elif k == FLEX_WIN_END and filt.comparison_filter.value.HasField("val_int"): + end_sec = filt.comparison_filter.value.val_int + if start_sec is not None and end_sec is not None: + return (start_sec, end_sec) + if start_sec is not None or end_sec is not None: + raise ValueError("Invalid flex time window in page token") + return None + + +def _parse_last_seen_tuple( + filters: list[TraceItemFilter], +) -> tuple[int, TraceItemType.ValueType, float, str, str]: + """Parse the 5 'last seen' key equality filters""" + if len(filters) != 5: + raise ValueError("Invalid last_seen in page token") + if not ( + filters[0].comparison_filter.key.name == "last_seen_project_id" + and filters[0].comparison_filter.key.type == AttributeKey.Type.TYPE_INT + ): + raise ValueError("Invalid project id") + last_seen_project_id = filters[0].comparison_filter.value.val_int + if not ( + filters[1].comparison_filter.key.name == "last_seen_item_type" + and filters[1].comparison_filter.key.type == AttributeKey.Type.TYPE_INT + ): + raise ValueError("Invalid item type") + last_seen_item_type = filters[1].comparison_filter.value.val_int + if not ( + filters[2].comparison_filter.key.name == "last_seen_timestamp" + 
and filters[2].comparison_filter.key.type == AttributeKey.Type.TYPE_DOUBLE + ): + raise ValueError("Invalid timestamp") + last_seen_timestamp = filters[2].comparison_filter.value.val_double + if not ( + filters[3].comparison_filter.key.name == "last_seen_trace_id" + and filters[3].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING + ): + raise ValueError("Invalid trace id") + last_seen_trace_id = filters[3].comparison_filter.value.val_str + if not ( + filters[4].comparison_filter.key.name == "last_seen_item_id" + and filters[4].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING + ): + raise ValueError("Invalid item id") + last_seen_item_id = filters[4].comparison_filter.value.val_str + return ( + last_seen_project_id, + cast(TraceItemType.ValueType, last_seen_item_type), + last_seen_timestamp, + last_seen_trace_id, + last_seen_item_id, + ) + class ExportTraceItemsPageToken: + """Page token: always encodes the active [start,end) in unix seconds, shared with flex routing. + + * **2 filters:** that window only — move to the next time slice (flex) with no keyset cursor. + * **7 filters:** same 2 time-window fields plus 5 equality fields — continue after the + last row within that window (keyset / tuple seek). + + The first request has an empty `page_token`. Every continuation request must resend + `RequestMeta` (user range) and send a token so routing can read + `sentry__time_window.*` and align with the slice being scanned. 
+ """ + def __init__( self, - last_seen_project_id: int, - last_seen_item_type: TraceItemType.ValueType, - last_seen_timestamp: float, - last_seen_trace_id: str, - last_seen_item_id: str, + *, + window_start_sec: int, + window_end_sec: int, + last_seen_project_id: int = 0, + last_seen_item_type: TraceItemType.ValueType = TraceItemType.TRACE_ITEM_TYPE_UNSPECIFIED, + last_seen_timestamp: float = 0.0, + last_seen_trace_id: str = "", + last_seen_item_id: str = "", + include_last_seen: bool = False, ): + self.window_start_sec = window_start_sec + self.window_end_sec = window_end_sec self.last_seen_project_id = last_seen_project_id self.last_seen_item_type = last_seen_item_type self.last_seen_timestamp = last_seen_timestamp self.last_seen_trace_id = last_seen_trace_id self.last_seen_item_id = last_seen_item_id + self.include_last_seen = include_last_seen + + @property + def has_last_seen(self) -> bool: + return self.include_last_seen @classmethod def from_protobuf(cls, page_token: PageToken) -> Optional["ExportTraceItemsPageToken"]: if page_token == PageToken(): return None - filters = page_token.filter_offset.and_filter.filters - if len(filters) != 5: + if not page_token.filter_offset.HasField("and_filter"): raise ValueError("Invalid page token") - - if not ( - filters[0].comparison_filter.key.name == "last_seen_project_id" - and filters[0].comparison_filter.key.type == AttributeKey.Type.TYPE_INT - ): - raise ValueError("Invalid project id") - last_seen_project_id = filters[0].comparison_filter.value.val_int - if not ( - filters[1].comparison_filter.key.name == "last_seen_item_type" - and filters[1].comparison_filter.key.type == AttributeKey.Type.TYPE_INT - ): - raise ValueError("Invalid item type") - last_seen_item_type = filters[1].comparison_filter.value.val_int - - if not ( - filters[2].comparison_filter.key.name == "last_seen_timestamp" - and filters[2].comparison_filter.key.type == AttributeKey.Type.TYPE_DOUBLE - ): - raise ValueError("Invalid timestamp") - 
last_seen_timestamp = filters[2].comparison_filter.value.val_double - - if not ( - filters[3].comparison_filter.key.name == "last_seen_trace_id" - and filters[3].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING - ): - raise ValueError("Invalid trace id") - last_seen_trace_id = filters[3].comparison_filter.value.val_str - - if not ( - filters[4].comparison_filter.key.name == "last_seen_item_id" - and filters[4].comparison_filter.key.type == AttributeKey.Type.TYPE_STRING - ): - raise ValueError("Invalid item id") - last_seen_item_id = filters[4].comparison_filter.value.val_str - - return cls( - last_seen_project_id, - cast(TraceItemType.ValueType, last_seen_item_type), - last_seen_timestamp, - last_seen_trace_id, - last_seen_item_id, - ) + filters = list(page_token.filter_offset.and_filter.filters) + n = len(filters) + if n == 2: + win = _parse_flex_window_from_filters(filters) + if win is None: + raise ValueError("Invalid page token") + w0, w1 = win + return cls( + window_start_sec=w0, + window_end_sec=w1, + include_last_seen=False, + ) + if n == 7: + win = _parse_flex_window_from_filters(filters[:2]) + if win is None: + raise ValueError("Invalid page token") + w0, w1 = win + pid, ity, lsts, ltr, lid = _parse_last_seen_tuple(filters[2:]) + return cls( + window_start_sec=w0, + window_end_sec=w1, + last_seen_project_id=pid, + last_seen_item_type=ity, + last_seen_timestamp=lsts, + last_seen_trace_id=ltr, + last_seen_item_id=lid, + include_last_seen=True, + ) + raise ValueError("Invalid page token: expected 2 or 7 filter clauses") def to_protobuf(self) -> PageToken: - filters = TraceItemFilter( - and_filter=AndFilter( - filters=[ + and_filters: list[TraceItemFilter] = list( + _flex_time_window_filters(self.window_start_sec, self.window_end_sec) + ) + if self.has_last_seen: + and_filters.extend( + [ TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( @@ -165,12 +264,41 @@ def to_protobuf(self) -> PageToken: ), ] ) + if not and_filters: + 
raise ValueError("empty export page token") + return PageToken( + filter_offset=TraceItemFilter( + and_filter=AndFilter(filters=and_filters), + ) ) - return PageToken(filter_offset=filters) + + +def _is_flextime_export(in_msg: ExportTraceItemsRequest) -> bool: + if not in_msg.meta.HasField("downsampled_storage_config"): + return False + return ( + in_msg.meta.downsampled_storage_config.mode + == DownsampledStorageConfig.Mode.MODE_HIGHEST_ACCURACY_FLEXTIME + ) + + +def _export_query_meta( + in_msg: ExportTraceItemsRequest, routing_decision: RoutingDecision +) -> RequestMeta: + if routing_decision.time_window is None: + return in_msg.meta + meta = RequestMeta() + meta.CopyFrom(in_msg.meta) + meta.start_timestamp.CopyFrom(routing_decision.time_window.start_timestamp) + meta.end_timestamp.CopyFrom(routing_decision.time_window.end_timestamp) + return meta def _build_query( - in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None + in_msg: ExportTraceItemsRequest, + limit: int, + page_token: ExportTraceItemsPageToken | None = None, + query_meta: RequestMeta | None = None, ) -> Query: selected_columns = [ SelectedExpression("timestamp", f.toUnixTimestamp(column("timestamp"), alias="timestamp")), @@ -244,13 +372,14 @@ def _build_query( ), ) ] - if page_token is not None + if page_token is not None and page_token.has_last_seen else [] ) + meta = query_meta if query_meta is not None else in_msg.meta query = Query( from_clause=entity, selected_columns=selected_columns, - condition=base_conditions_and(in_msg.meta, *(page_token_filter)), + condition=base_conditions_and(meta, *(page_token_filter)), order_by=[ # we add organization_id and project_id to the order by to optimize data reading # https://clickhouse.com/docs/sql-reference/statements/select/order-by#optimization-of-data-reading @@ -269,14 +398,28 @@ def _build_query( def _build_snuba_request( - in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | 
None = None + in_msg: ExportTraceItemsRequest, + routing_decision: RoutingDecision, + limit: int, + page_token: ExportTraceItemsPageToken | None = None, ) -> SnubaRequest: query_settings = setup_trace_query_settings() if in_msg.meta.debug else HTTPQuerySettings() + try: + routing_decision.strategy.merge_clickhouse_settings(routing_decision, query_settings) + query_settings.set_sampling_tier(routing_decision.tier) + except Exception as e: + sentry_sdk.capture_message(f"Error merging clickhouse settings: {e}") + query_settings.set_skip_transform_order_by(True) return SnubaRequest( id=uuid.UUID(in_msg.meta.request_id), original_body=MessageToDict(in_msg), - query=_build_query(in_msg, limit, page_token), + query=_build_query( + in_msg, + limit, + page_token, + query_meta=_export_query_meta(in_msg, routing_decision), + ), query_settings=query_settings, attribution_info=AttributionInfo( referrer=in_msg.meta.referrer, @@ -425,22 +568,39 @@ def _execute(self, in_msg: ExportTraceItemsRequest) -> ExportTraceItemsResponse: page_token = ExportTraceItemsPageToken.from_protobuf(in_msg.page_token) results = run_query( dataset=PluggableDataset(name="eap", all_entities=[]), - request=_build_snuba_request(in_msg, limit, page_token), + request=_build_snuba_request(in_msg, self.routing_decision, limit, page_token), timer=self._timer, ) rows = results.result.get("data", []) processed_results = _convert_rows(rows) + is_flex = _is_flextime_export(in_msg) + orig_start = in_msg.meta.start_timestamp.seconds + routed = self.routing_decision.time_window + + w_start = in_msg.meta.start_timestamp.seconds + w_end = in_msg.meta.end_timestamp.seconds + if routed is not None: + w_start, w_end = routed.start_timestamp.seconds, routed.end_timestamp.seconds next_token: PageToken | None = None if len(processed_results.items) >= limit: next_token = ExportTraceItemsPageToken( + window_start_sec=w_start, + window_end_sec=w_end, + include_last_seen=True, 
last_seen_project_id=processed_results.last_seen_project_id, last_seen_item_type=processed_results.last_seen_item_type, last_seen_trace_id=processed_results.last_seen_trace_id, last_seen_timestamp=processed_results.last_seen_timestamp, last_seen_item_id=processed_results.last_seen_item_id, ).to_protobuf() + elif is_flex and routed is not None and routed.start_timestamp.seconds > orig_start: + next_token = ExportTraceItemsPageToken( + window_start_sec=orig_start, + window_end_sec=routed.start_timestamp.seconds, + include_last_seen=False, + ).to_protobuf() else: next_token = PageToken(end_pagination=True) diff --git a/tests/web/rpc/v1/test_endpoint_export_trace_items.py b/tests/web/rpc/v1/test_endpoint_export_trace_items.py index 605f196342a..79b340a4e46 100644 --- a/tests/web/rpc/v1/test_endpoint_export_trace_items.py +++ b/tests/web/rpc/v1/test_endpoint_export_trace_items.py @@ -118,9 +118,10 @@ def test_with_pagination(self, setup_teardown: Any) -> None: response = EndpointExportTraceItems().execute(message) items.extend(response.trace_items) if len(response.trace_items) == 20: - assert response.page_token.end_pagination == False + assert response.page_token.end_pagination is False + assert len(response.page_token.filter_offset.and_filter.filters) == 7 else: - assert response.page_token.end_pagination == True + assert response.page_token.end_pagination, "End Pagination token mismatch" break message.page_token.CopyFrom(response.page_token) From a16822f6ec6d8f4d7f6b80016fbd59dcfd981cb3 Mon Sep 17 00:00:00 2001 From: Saraj Manes Date: Mon, 27 Apr 2026 12:20:18 -0400 Subject: [PATCH 2/3] comment updates --- snuba/web/rpc/v1/endpoint_export_trace_items.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_export_trace_items.py b/snuba/web/rpc/v1/endpoint_export_trace_items.py index 5ab68dfcc07..fd1cb2c00f8 100644 --- a/snuba/web/rpc/v1/endpoint_export_trace_items.py +++ b/snuba/web/rpc/v1/endpoint_export_trace_items.py 
@@ -140,13 +140,9 @@ def _parse_last_seen_tuple( class ExportTraceItemsPageToken: """Page token: always encodes the active [start,end) in unix seconds, shared with flex routing. - * **2 filters:** that window only — move to the next time slice (flex) with no keyset cursor. - * **7 filters:** same 2 time-window fields plus 5 equality fields — continue after the + 2 filters: that window only — move to the next time slice (flex) with no keyset cursor. + 7 filters: same 2 time-window fields plus 5 equality fields — continue after the last row within that window (keyset / tuple seek). - - The first request has an empty `page_token`. Every continuation request must resend - `RequestMeta` (user range) and send a token so routing can read - `sentry__time_window.*` and align with the slice being scanned. """ def __init__( From b46663ef03f99912b540ea796c8d728f1fffdd9a Mon Sep 17 00:00:00 2001 From: Saraj Manes Date: Tue, 28 Apr 2026 16:55:01 -0400 Subject: [PATCH 3/3] Simplify the page token a little bit --- snuba/web/rpc/v1/endpoint_export_trace_items.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/snuba/web/rpc/v1/endpoint_export_trace_items.py b/snuba/web/rpc/v1/endpoint_export_trace_items.py index fd1cb2c00f8..e252d30f0b2 100644 --- a/snuba/web/rpc/v1/endpoint_export_trace_items.py +++ b/snuba/web/rpc/v1/endpoint_export_trace_items.py @@ -140,13 +140,9 @@ def _parse_last_seen_tuple( class ExportTraceItemsPageToken: """Page token: always encodes the active [start,end) in unix seconds, shared with flex routing. - 2 filters: that window only — move to the next time slice (flex) with no keyset cursor. - 7 filters: same 2 time-window fields plus 5 equality fields — continue after the + 2 filters: window only; move to the next time slice (flex) with no keyset cursor. + 7 filters: same 2 time-window fields plus 5 equality fields; continue after the last row within that window (keyset / tuple seek). 
""" @@ -155,7 +155,6 @@ def __init__( last_seen_timestamp: float = 0.0, last_seen_trace_id: str = "", last_seen_item_id: str = "", - include_last_seen: bool = False, ): self.window_start_sec = window_start_sec self.window_end_sec = window_end_sec @@ -164,11 +163,10 @@ def __init__( self.last_seen_timestamp = last_seen_timestamp self.last_seen_trace_id = last_seen_trace_id self.last_seen_item_id = last_seen_item_id - self.include_last_seen = include_last_seen @property def has_last_seen(self) -> bool: - return self.include_last_seen + return self.last_seen_item_id != "" @classmethod def from_protobuf(cls, page_token: PageToken) -> Optional["ExportTraceItemsPageToken"]: @@ -186,7 +184,6 @@ def from_protobuf(cls, page_token: PageToken) -> Optional["ExportTraceItemsPageT return cls( window_start_sec=w0, window_end_sec=w1, - include_last_seen=False, ) if n == 7: win = _parse_flex_window_from_filters(filters[:2]) @@ -202,7 +199,6 @@ def from_protobuf(cls, page_token: PageToken) -> Optional["ExportTraceItemsPageT last_seen_timestamp=lsts, last_seen_trace_id=ltr, last_seen_item_id=lid, - include_last_seen=True, ) raise ValueError("Invalid page token: expected 2 or 7 filter clauses") @@ -584,7 +580,6 @@ def _execute(self, in_msg: ExportTraceItemsRequest) -> ExportTraceItemsResponse: next_token = ExportTraceItemsPageToken( window_start_sec=w_start, window_end_sec=w_end, - include_last_seen=True, last_seen_project_id=processed_results.last_seen_project_id, last_seen_item_type=processed_results.last_seen_item_type, last_seen_trace_id=processed_results.last_seen_trace_id, @@ -595,7 +590,6 @@ def _execute(self, in_msg: ExportTraceItemsRequest) -> ExportTraceItemsResponse: next_token = ExportTraceItemsPageToken( window_start_sec=orig_start, window_end_sec=routed.start_timestamp.seconds, - include_last_seen=False, ).to_protobuf() else: next_token = PageToken(end_pagination=True)