diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 2793167b6..ad248837f 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -11,7 +11,7 @@ import hashlib import logging from collections import namedtuple -from typing import TYPE_CHECKING, Iterable, cast +from typing import TYPE_CHECKING, Any, Iterable, cast from sift.ingestion_configs.v2.ingestion_configs_pb2 import ( ListIngestionConfigFlowsRequest, @@ -32,6 +32,9 @@ logger = logging.getLogger(__name__) +DEFAULT_INGESTION_CONFIG_PAGE_SIZE = 100 +"""Default page size for ingestion config and flow list calls (flow configs can be large).""" + if TYPE_CHECKING: from datetime import datetime @@ -94,26 +97,132 @@ def __init__(self, grpc_client: GrpcClient): """ super().__init__(grpc_client=grpc_client) - async def get_ingestion_config_flows(self, ingestion_config_id: str) -> list[FlowConfig]: - """Get the flows for an ingestion config.""" + async def list_ingestion_configs( + self, + filter_query: str | None = None, + page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, + page_token: str | None = None, + order_by: str | None = None, + ) -> tuple[list[IngestionConfig], str]: + """List ingestion configs (single page). + + Args: + filter_query: The CEL filter query. + page_size: Number of results per page. + page_token: Token for the next page. + order_by: Unused; accepted for _handle_pagination compatibility. + + Returns: + A tuple of (list of IngestionConfig, next_page_token). + """ + request_kwargs: dict[str, Any] = {} + + if page_size is not None: + request_kwargs["page_size"] = page_size + if page_token is not None: + request_kwargs["page_token"] = page_token + if filter_query is not None: + request_kwargs["filter"] = filter_query + if order_by is not None: + request_kwargs["order_by"] = order_by + + request = ListIngestionConfigsRequest(**request_kwargs) + res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigs( + request + ) + res = cast("ListIngestionConfigsResponse", res) + configs = [IngestionConfig._from_proto(config) for config in res.ingestion_configs] + return configs, res.next_page_token + + async def list_all_ingestion_configs( + self, + filter_query: str | None = None, + page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, + max_results: int | None = None, + ) -> list[IngestionConfig]: + """List all ingestion configs matching the filter, using pagination. + + Args: + filter_query: The CEL filter query. + page_size: Number of results per page. + max_results: Maximum total results to return; None for no limit. + + Returns: + A list of all matching IngestionConfigs. + """ + return await self._handle_pagination( + self.list_ingestion_configs, + kwargs={"filter_query": filter_query}, + page_size=page_size, + max_results=max_results, + ) + + async def list_ingestion_config_flows( + self, + ingestion_config_id: str, + page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, + page_token: str | None = None, + order_by: str | None = None, + filter_query: str | None = None, + ) -> tuple[list[FlowConfig], str]: + """List ingestion config flows (single page). + + Args: + ingestion_config_id: The ingestion config ID. + page_size: Number of results per page. + page_token: Token for the next page. + order_by: Unused; accepted for _handle_pagination compatibility. + filter_query: Optional CEL filter for flows. + + Returns: + A tuple of (list of FlowConfig, next_page_token). + """ + request_kwargs: dict[str, Any] = {"ingestion_config_id": ingestion_config_id} + + if page_size is not None: + request_kwargs["page_size"] = page_size + if page_token is not None: + request_kwargs["page_token"] = page_token + if filter_query is not None: + request_kwargs["filter"] = filter_query + if order_by is not None: + request_kwargs["order_by"] = order_by + + request = ListIngestionConfigFlowsRequest(**request_kwargs) res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigFlows( - ListIngestionConfigFlowsRequest(ingestion_config_id=ingestion_config_id) + request ) res = cast("ListIngestionConfigFlowsResponse", res) - return [FlowConfig._from_proto(flow) for flow in res.flows] + flows = [FlowConfig._from_proto(flow) for flow in res.flows] + return flows, res.next_page_token - async def list_ingestion_configs(self, filter_query: str) -> list[IngestionConfig]: - """List ingestion configs.""" - res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigs( - ListIngestionConfigsRequest(filter=filter_query) + async def get_ingestion_config_flows( + self, + ingestion_config_id: str, + page_size: int | None = DEFAULT_INGESTION_CONFIG_PAGE_SIZE, + max_results: int | None = None, + ) -> list[FlowConfig]: + """Get all flows for an ingestion config, using pagination. + + Args: + ingestion_config_id: The ingestion config ID. + page_size: Number of results per page. + max_results: Maximum total results to return; None for no limit. + + Returns: + A list of all FlowConfigs for the ingestion config. + """ + return await self._handle_pagination( + self.list_ingestion_config_flows, + kwargs={"ingestion_config_id": ingestion_config_id}, + page_size=page_size, + max_results=max_results, ) - res = cast("ListIngestionConfigsResponse", res) - return [IngestionConfig._from_proto(config) for config in res.ingestion_configs] async def get_ingestion_config_id_from_client_key(self, client_key: str) -> str | None: """Get the ingestion config id.""" filter_query = cel.equals("client_key", client_key) - ingestion_configs = await self.list_ingestion_configs(filter_query) + ingestion_configs = await self.list_all_ingestion_configs(filter_query) if not ingestion_configs: return None if len(ingestion_configs) > 1: diff --git a/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py index cb6b13d1d..d3ba247fe 100644 --- a/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py +++ b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py @@ -31,7 +31,7 @@ async def test_get_ingestion_config_flows(ingestion_low_level_client, sift_clien """ # First, we need to find an ingestion config to test with # We'll list ingestion configs and use the first one's client_key - ingestion_configs = await ingestion_low_level_client.list_ingestion_configs("") + ingestion_configs = await ingestion_low_level_client.list_all_ingestion_configs() if not ingestion_configs: pytest.skip("No ingestion configs available for testing")