From a3d343dc7f3ef2be19fcde380d65dcb2e69d17c0 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Tue, 10 Feb 2026 15:51:27 -0600 Subject: [PATCH 1/4] python(bug): Add pagination for ingestion config/flow functions --- .../_internal/low_level_wrappers/ingestion.py | 128 ++++++++++++++++-- .../low_level_wrappers/test_ingestion.py | 2 +- 2 files changed, 117 insertions(+), 13 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 2793167b6..df2c659d3 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -11,7 +11,7 @@ import hashlib import logging from collections import namedtuple -from typing import TYPE_CHECKING, Iterable, cast +from typing import TYPE_CHECKING, Any, Iterable, cast from sift.ingestion_configs.v2.ingestion_configs_pb2 import ( ListIngestionConfigFlowsRequest, @@ -32,6 +32,9 @@ logger = logging.getLogger(__name__) +DEFAULT_INGESTION_PAGE_SIZE = 100 +"""Default page size for ingestion config and flow list calls (flow configs can be large).""" + if TYPE_CHECKING: from datetime import datetime @@ -94,26 +97,127 @@ def __init__(self, grpc_client: GrpcClient): """ super().__init__(grpc_client=grpc_client) - async def get_ingestion_config_flows(self, ingestion_config_id: str) -> list[FlowConfig]: - """Get the flows for an ingestion config.""" + async def list_ingestion_configs( + self, + filter_query: str, + page_size: int | None = DEFAULT_INGESTION_PAGE_SIZE, + page_token: str | None = None, + order_by: str | None = None, + ) -> tuple[list[IngestionConfig], str]: + """List ingestion configs (single page). + + Args: + filter_query: The CEL filter query. + page_size: Number of results per page. + page_token: Token for the next page. + order_by: Unused; accepted for _handle_pagination compatibility. + + Returns: + A tuple of (list of IngestionConfig, next_page_token). + """ + request_kwargs: dict[str, Any] = { + "filter": filter_query, + "page_token": page_token or "", + } + if page_size is not None: + request_kwargs["page_size"] = page_size + request = ListIngestionConfigsRequest(**request_kwargs) + res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigs( + request + ) + res = cast("ListIngestionConfigsResponse", res) + configs = [IngestionConfig._from_proto(config) for config in res.ingestion_configs] + return configs, res.next_page_token + + async def list_all_ingestion_configs( + self, + filter_query: str, + page_size: int | None = DEFAULT_INGESTION_PAGE_SIZE, + max_results: int | None = None, + ) -> list[IngestionConfig]: + """List all ingestion configs matching the filter, using pagination. + + Args: + filter_query: The CEL filter query. + page_size: Number of results per page. + max_results: Maximum total results to return; None for no limit. + + Returns: + A list of all matching IngestionConfigs. + """ + return await self._handle_pagination( + self.list_ingestion_configs, + kwargs={"filter_query": filter_query}, + page_size=page_size, + max_results=max_results, + ) + + async def list_ingestion_config_flows( + self, + ingestion_config_id: str, + page_size: int | None = DEFAULT_INGESTION_PAGE_SIZE, + page_token: str | None = None, + order_by: str | None = None, + query_filter: str = "", + ) -> tuple[list[FlowConfig], str]: + """List ingestion config flows (single page). + + Args: + ingestion_config_id: The ingestion config ID. + page_size: Number of results per page. + page_token: Token for the next page. + order_by: Unused; accepted for _handle_pagination compatibility. + query_filter: Optional CEL filter for flows. + + Returns: + A tuple of (list of FlowConfig, next_page_token). + """ + request_kwargs: dict[str, Any] = {"ingestion_config_id": ingestion_config_id} + + if page_size is not None: + request_kwargs["page_size"] = page_size + if page_token is not None: + request_kwargs["page_token"] = page_token + if query_filter is not None: + request_kwargs["filter"] = query_filter + if order_by is not None: + request_kwargs["order_by"] = order_by + + request = ListIngestionConfigFlowsRequest(**request_kwargs) res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigFlows( - ListIngestionConfigFlowsRequest(ingestion_config_id=ingestion_config_id) + request ) res = cast("ListIngestionConfigFlowsResponse", res) - return [FlowConfig._from_proto(flow) for flow in res.flows] + flows = [FlowConfig._from_proto(flow) for flow in res.flows] + return flows, res.next_page_token - async def list_ingestion_configs(self, filter_query: str) -> list[IngestionConfig]: - """List ingestion configs.""" - res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigs( - ListIngestionConfigsRequest(filter=filter_query) + async def get_ingestion_config_flows( + self, + ingestion_config_id: str, + page_size: int | None = DEFAULT_INGESTION_PAGE_SIZE, + max_results: int | None = None, + ) -> list[FlowConfig]: + """Get all flows for an ingestion config, using pagination. + + Args: + ingestion_config_id: The ingestion config ID. + page_size: Number of results per page. + max_results: Maximum total results to return; None for no limit. + + Returns: + A list of all FlowConfigs for the ingestion config. + """ + return await self._handle_pagination( + self.list_ingestion_config_flows, + kwargs={"ingestion_config_id": ingestion_config_id}, + page_size=page_size, + max_results=max_results, ) - res = cast("ListIngestionConfigsResponse", res) - return [IngestionConfig._from_proto(config) for config in res.ingestion_configs] async def get_ingestion_config_id_from_client_key(self, client_key: str) -> str | None: """Get the ingestion config id.""" filter_query = cel.equals("client_key", client_key) - ingestion_configs = await self.list_ingestion_configs(filter_query) + ingestion_configs = await self.list_all_ingestion_configs(filter_query) if not ingestion_configs: return None if len(ingestion_configs) > 1: diff --git a/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py index cb6b13d1d..ae22d3c19 100644 --- a/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py +++ b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py @@ -31,7 +31,7 @@ async def test_get_ingestion_config_flows(ingestion_low_level_client, sift_clien """ # First, we need to find an ingestion config to test with # We'll list ingestion configs and use the first one's client_key - ingestion_configs = await ingestion_low_level_client.list_ingestion_configs("") + ingestion_configs = await ingestion_low_level_client.list_all_ingestion_configs("") if not ingestion_configs: pytest.skip("No ingestion configs available for testing") From 1e849fd57e210b026b3b4c771cf84e06c0fdf794 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Tue, 10 Feb 2026 16:42:06 -0600 Subject: [PATCH 2/4] PR feedback: renames --- .../_internal/low_level_wrappers/ingestion.py | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index df2c659d3..075e3f97e 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) -DEFAULT_INGESTION_PAGE_SIZE = 100 +DEFAULT_INGESTION_CONFIG_PAGE_SIZE = 100 """Default page size for ingestion config and flow list calls (flow configs can be large).""" if TYPE_CHECKING: @@ -100,7 +100,7 @@ def __init__(self, grpc_client: GrpcClient): async def list_ingestion_configs( self, filter_query: str, - page_size: int | None = DEFAULT_INGESTION_PAGE_SIZE, + page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, page_token: str | None = None, order_by: str | None = None, ) -> tuple[list[IngestionConfig], str]: @@ -115,12 +115,17 @@ async def list_ingestion_configs( Returns: A tuple of (list of IngestionConfig, next_page_token). """ - request_kwargs: dict[str, Any] = { - "filter": filter_query, - "page_token": page_token or "", - } + request_kwargs: dict[str, Any] = {} + if page_size is not None: request_kwargs["page_size"] = page_size + if page_token is not None: + request_kwargs["page_token"] = page_token + if filter_query is not None: + request_kwargs["filter"] = filter_query + if order_by is not None: + request_kwargs["order_by"] = order_by + request = ListIngestionConfigsRequest(**request_kwargs) res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigs( request @@ -132,7 +137,7 @@ async def list_ingestion_configs( async def list_all_ingestion_configs( self, filter_query: str, - page_size: int | None = DEFAULT_INGESTION_PAGE_SIZE, + page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, max_results: int | None = None, ) -> list[IngestionConfig]: """List all ingestion configs matching the filter, using pagination. @@ -155,10 +160,10 @@ async def list_all_ingestion_configs( async def list_ingestion_config_flows( self, ingestion_config_id: str, - page_size: int | None = DEFAULT_INGESTION_PAGE_SIZE, + page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, page_token: str | None = None, order_by: str | None = None, - query_filter: str = "", + filter_query: str = "", ) -> tuple[list[FlowConfig], str]: """List ingestion config flows (single page). @@ -167,7 +172,7 @@ async def list_ingestion_config_flows( page_size: Number of results per page. page_token: Token for the next page. order_by: Unused; accepted for _handle_pagination compatibility. - query_filter: Optional CEL filter for flows. + filter_query: Optional CEL filter for flows. Returns: A tuple of (list of FlowConfig, next_page_token). @@ -178,8 +183,8 @@ async def list_ingestion_config_flows( request_kwargs["page_size"] = page_size if page_token is not None: request_kwargs["page_token"] = page_token - if query_filter is not None: - request_kwargs["filter"] = query_filter + if filter_query is not None: + request_kwargs["filter"] = filter_query if order_by is not None: request_kwargs["order_by"] = order_by @@ -194,7 +199,7 @@ async def list_ingestion_config_flows( async def get_ingestion_config_flows( self, ingestion_config_id: str, - page_size: int | None = DEFAULT_INGESTION_PAGE_SIZE, + page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, max_results: int | None = None, ) -> list[FlowConfig]: """Get all flows for an ingestion config, using pagination. From 783beae49c3479c0156154861335eb5af2ab17a9 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Wed, 11 Feb 2026 12:06:35 -0600 Subject: [PATCH 3/4] filter_query optional --- .../sift_client/_internal/low_level_wrappers/ingestion.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 075e3f97e..ad248837f 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -99,7 +99,7 @@ def __init__(self, grpc_client: GrpcClient): async def list_ingestion_configs( self, - filter_query: str, + filter_query: str | None = None, page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, page_token: str | None = None, order_by: str | None = None, @@ -136,7 +136,7 @@ async def list_ingestion_configs( async def list_all_ingestion_configs( self, - filter_query: str, + filter_query: str | None = None, page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, max_results: int | None = None, ) -> list[IngestionConfig]: @@ -163,7 +163,7 @@ async def list_ingestion_config_flows( page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, page_token: str | None = None, order_by: str | None = None, - filter_query: str = "", + filter_query: str | None = None, ) -> tuple[list[FlowConfig], str]: """List ingestion config flows (single page). From fec59d532224c2cb2dd4cec42e6533c26cf28b39 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Wed, 11 Feb 2026 12:16:29 -0600 Subject: [PATCH 4/4] And test doesn't need query anymore --- .../_tests/_internal/low_level_wrappers/test_ingestion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py index ae22d3c19..d3ba247fe 100644 --- a/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py +++ b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py @@ -31,7 +31,7 @@ async def test_get_ingestion_config_flows(ingestion_low_level_client, sift_clien """ # First, we need to find an ingestion config to test with # We'll list ingestion configs and use the first one's client_key - ingestion_configs = await ingestion_low_level_client.list_all_ingestion_configs("") + ingestion_configs = await ingestion_low_level_client.list_all_ingestion_configs() if not ingestion_configs: pytest.skip("No ingestion configs available for testing")