diff --git a/.github/workflows/python_ci.yaml b/.github/workflows/python_ci.yaml index 7bf425a2d..ff2702b1d 100644 --- a/.github/workflows/python_ci.yaml +++ b/.github/workflows/python_ci.yaml @@ -62,13 +62,13 @@ jobs: run: | pytest -m "not integration" - #- name: Pytest Integration Tests - #env: - #SIFT_GRPC_URI: ${{ vars.SIFT_GRPC_URI }} - #SIFT_REST_URI: ${{ vars.SIFT_REST_URI }} - #SIFT_API_KEY: ${{ secrets.SIFT_API_KEY }} - #run: | - #pytest -m "integration" + - name: Pytest Integration Tests + env: + SIFT_GRPC_URI: ${{ vars.SIFT_GRPC_URI }} + SIFT_REST_URI: ${{ vars.SIFT_REST_URI }} + SIFT_API_KEY: ${{ secrets.SIFT_API_KEY }} + run: | + pytest -m "integration" - name: Sync Stubs Mypy working-directory: python/lib diff --git a/python/lib/sift_client/_internal/low_level_wrappers/assets.py b/python/lib/sift_client/_internal/low_level_wrappers/assets.py index 9c42bd74e..72da1adce 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/assets.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/assets.py @@ -15,6 +15,7 @@ from sift.assets.v1.assets_pb2_grpc import AssetServiceStub from sift_client._internal.low_level_wrappers.base import ( + DEFAULT_PAGE_SIZE, LowLevelClientBase, ) from sift_client.sift_types.asset import Asset, AssetUpdate @@ -46,7 +47,7 @@ async def list_all_assets( query_filter: str | None = None, order_by: str | None = None, max_results: int | None = None, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, ) -> list[Asset]: """List all results matching the given query. @@ -70,7 +71,7 @@ async def list_all_assets( async def list_assets( self, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, diff --git a/python/lib/sift_client/_internal/low_level_wrappers/base.py b/python/lib/sift_client/_internal/low_level_wrappers/base.py index d349e51a2..3b0684307 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/base.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/base.py @@ -3,6 +3,9 @@ from abc import ABC from typing import Any, Callable +DEFAULT_PAGE_SIZE = 1000 +"""Default page size to use for pagination.""" + class LowLevelClientBase(ABC): @staticmethod @@ -35,6 +38,11 @@ async def _handle_pagination( return results if page_token is None: page_token = "" + + # No point in querying more results than needed if limited by max_results. + if max_results is not None and page_size is not None and page_size > max_results: + page_size = max_results + while True: if max_results is not None and len(results) >= max_results: break diff --git a/python/lib/sift_client/_internal/low_level_wrappers/calculated_channels.py b/python/lib/sift_client/_internal/low_level_wrappers/calculated_channels.py index 890146d78..35421f28c 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/calculated_channels.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/calculated_channels.py @@ -17,7 +17,7 @@ ) from sift.calculated_channels.v2.calculated_channels_pb2_grpc import CalculatedChannelServiceStub -from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client.sift_types.calculated_channel import ( CalculatedChannel, CalculatedChannelCreate, @@ -91,7 +91,7 @@ async def list_all_calculated_channels( query_filter: str | None = None, order_by: str | None = None, max_results: int | None = None, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, organization_id: str | None = None, ) -> list[CalculatedChannel]: """List all calculated channels matching the given query. @@ -117,7 +117,7 @@ async def list_all_calculated_channels( async def list_calculated_channels( self, *, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, @@ -198,7 +198,7 @@ async def list_calculated_channel_versions( calculated_channel_id: str | None = None, client_key: str | None = None, organization_id: str | None = None, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, diff --git a/python/lib/sift_client/_internal/low_level_wrappers/channels.py b/python/lib/sift_client/_internal/low_level_wrappers/channels.py index 754a61cd3..0eed24ae8 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/channels.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/channels.py @@ -59,7 +59,7 @@ async def get_channel(self, channel_id: str) -> Channel: async def list_channels( self, *, - page_size: int | None = None, + page_size: int | None = CHANNELS_DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, @@ -97,6 +97,7 @@ async def list_all_channels( *, query_filter: str | None = None, order_by: str | None = None, + page_size: int | None = CHANNELS_DEFAULT_PAGE_SIZE, max_results: int | None = None, ) -> list[Channel]: """List all channels with optional filtering. @@ -109,10 +110,6 @@ async def list_all_channels( Returns: A list of all matching channels. """ - # Channels default page size is 10,000 so lower it if we're passing max_results - page_size = None - if max_results is not None and max_results <= CHANNELS_DEFAULT_PAGE_SIZE: - page_size = max_results return await self._handle_pagination( self.list_channels, kwargs={"query_filter": query_filter}, diff --git a/python/lib/sift_client/_internal/low_level_wrappers/data.py b/python/lib/sift_client/_internal/low_level_wrappers/data.py index d76b9a787..af469ba71 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/data.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/data.py @@ -249,9 +249,6 @@ async def get_channel_data( tasks = [] # Queue up calls for non-cached channels in batches. batch_size = REQUEST_BATCH_SIZE - page_size = None - if max_results is not None and max_results <= CHANNELS_DEFAULT_PAGE_SIZE: - page_size = max_results for i in range(0, len(not_cached_channels), batch_size): # type: ignore batch = not_cached_channels[i : i + batch_size] # type: ignore diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 6af319992..2793167b6 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -14,7 +14,7 @@ from typing import TYPE_CHECKING, Iterable, cast from sift.ingestion_configs.v2.ingestion_configs_pb2 import ( - GetIngestionConfigRequest, + ListIngestionConfigFlowsRequest, ListIngestionConfigFlowsResponse, ListIngestionConfigsRequest, ListIngestionConfigsResponse, @@ -96,8 +96,8 @@ def __init__(self, grpc_client: GrpcClient): async def get_ingestion_config_flows(self, ingestion_config_id: str) -> list[FlowConfig]: """Get the flows for an ingestion config.""" - res = await self._grpc_client.get_stub(IngestionConfigServiceStub).GetIngestionConfig( - GetIngestionConfigRequest(ingestion_config_id=ingestion_config_id) + res = await self._grpc_client.get_stub(IngestionConfigServiceStub).ListIngestionConfigFlows( + ListIngestionConfigFlowsRequest(ingestion_config_id=ingestion_config_id) ) res = cast("ListIngestionConfigFlowsResponse", res) return [FlowConfig._from_proto(flow) for flow in res.flows] diff --git a/python/lib/sift_client/_internal/low_level_wrappers/jobs.py b/python/lib/sift_client/_internal/low_level_wrappers/jobs.py index 59a9b69fd..4fc8013e1 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/jobs.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/jobs.py @@ -19,7 +19,7 @@ ) from sift.jobs.v1.jobs_pb2_grpc import JobServiceStub -from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client.sift_types.job import Job from sift_client.transport import GrpcClient, WithGrpcClient @@ -41,7 +41,7 @@ def __init__(self, grpc_client: GrpcClient): async def list_jobs( self, *, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, organization_id: str | None = None, @@ -83,6 +83,7 @@ async def list_all_jobs( query_filter: str | None = None, organization_id: str | None = None, order_by: str | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, max_results: int | None = None, ) -> list[Job]: """List all jobs, handling pagination automatically. @@ -107,6 +108,7 @@ async def list_all_jobs( kwargs=kwargs, order_by=order_by, max_results=max_results, + page_size=page_size, ) return jobs diff --git a/python/lib/sift_client/_internal/low_level_wrappers/remote_files.py b/python/lib/sift_client/_internal/low_level_wrappers/remote_files.py index b1ef49833..b481e5433 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/remote_files.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/remote_files.py @@ -18,6 +18,7 @@ from sift.remote_files.v1.remote_files_pb2_grpc import RemoteFileServiceStub from sift_client._internal.low_level_wrappers.base import ( + DEFAULT_PAGE_SIZE, LowLevelClientBase, ) from sift_client.transport import GrpcClient, WithGrpcClient @@ -64,7 +65,7 @@ async def list_all_remote_files( self, query_filter: str | None = None, max_results: int | None = None, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, order_by: str | None = None, sift_client: SiftClient | None = None, ) -> list[FileAttachment]: @@ -91,7 +92,7 @@ async def list_all_remote_files( async def list_remote_files( self, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, diff --git a/python/lib/sift_client/_internal/low_level_wrappers/reports.py b/python/lib/sift_client/_internal/low_level_wrappers/reports.py index ab7178166..88884361b 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/reports.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/reports.py @@ -105,6 +105,7 @@ async def list_all_reports( query_filter: str | None = None, organization_id: str | None = None, order_by: str | None = None, + page_size: int | None = None, max_results: int | None = None, ) -> list[Report]: """List all reports with optional filtering. @@ -126,6 +127,7 @@ async def list_all_reports( }, order_by=order_by, max_results=max_results, + page_size=page_size, ) async def rerun_report(self, report_id: str) -> tuple[str, str]: diff --git a/python/lib/sift_client/_internal/low_level_wrappers/rules.py b/python/lib/sift_client/_internal/low_level_wrappers/rules.py index d93959141..08a279b85 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/rules.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/rules.py @@ -37,7 +37,7 @@ ) from sift.rules.v1.rules_pb2_grpc import RuleServiceStub -from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client._internal.low_level_wrappers.reports import ReportsLowLevelClient from sift_client._internal.util.timestamp import to_pb_timestamp from sift_client._internal.util.util import count_non_none @@ -495,7 +495,7 @@ async def list_all_rules( filter_query: str | None = None, order_by: str | None = None, max_results: int | None = None, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, ) -> list[Rule]: """List all rules.""" return await self._handle_pagination( diff --git a/python/lib/sift_client/_internal/low_level_wrappers/runs.py b/python/lib/sift_client/_internal/low_level_wrappers/runs.py index b9c6efb09..12d5b8a75 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/runs.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/runs.py @@ -18,7 +18,7 @@ ) from sift.runs.v2.runs_pb2_grpc import RunServiceStub -from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client._internal.util.timestamp import to_pb_timestamp from sift_client.sift_types.run import Run, RunCreate, RunUpdate from sift_client.transport import WithGrpcClient @@ -66,7 +66,7 @@ async def get_run(self, run_id: str) -> Run: async def list_runs( self, *, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, @@ -104,6 +104,7 @@ async def list_all_runs( *, query_filter: str | None = None, order_by: str | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, max_results: int | None = None, ) -> list[Run]: """List all runs with optional filtering. @@ -121,6 +122,7 @@ async def list_all_runs( kwargs={"query_filter": query_filter}, order_by=order_by, max_results=max_results, + page_size=page_size, ) async def create_run(self, *, create: RunCreate) -> Run: diff --git a/python/lib/sift_client/_internal/low_level_wrappers/tags.py b/python/lib/sift_client/_internal/low_level_wrappers/tags.py index 4ae12a4e4..5688f8c73 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/tags.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/tags.py @@ -11,7 +11,7 @@ ) from sift.tags.v2.tags_pb2_grpc import TagServiceStub -from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client.sift_types.tag import Tag from sift_client.transport import WithGrpcClient @@ -59,7 +59,7 @@ async def create_tag(self, name: str) -> Tag: async def list_tags( self, *, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, @@ -97,6 +97,7 @@ async def list_all_tags( *, query_filter: str | None = None, order_by: str | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, max_results: int | None = None, ) -> list[Tag]: """List all tags with optional filtering. @@ -114,4 +115,5 @@ async def list_all_tags( kwargs={"query_filter": query_filter}, order_by=order_by, max_results=max_results, + page_size=page_size, ) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py index d0cf8b827..94cbc8950 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/test_results.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/test_results.py @@ -33,7 +33,7 @@ ) from sift.test_reports.v1.test_reports_pb2_grpc import TestReportServiceStub -from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase from sift_client.sift_types.test_report import ( TestMeasurement, TestMeasurementCreate, @@ -129,7 +129,7 @@ async def get_test_report(self, test_report_id: str) -> TestReport: async def list_test_reports( self, *, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, @@ -167,6 +167,7 @@ async def list_all_test_reports( *, query_filter: str | None = None, order_by: str | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, max_results: int | None = None, ) -> list[TestReport]: """List all test reports with optional filtering. @@ -184,6 +185,7 @@ async def list_all_test_reports( kwargs={"query_filter": query_filter}, order_by=order_by, max_results=max_results, + page_size=page_size, ) async def update_test_report(self, update: TestReportUpdate) -> TestReport: @@ -274,6 +276,7 @@ async def list_all_test_steps( query_filter: str | None = None, order_by: str | None = None, max_results: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, ) -> list[TestStep]: """List all test steps with optional filtering. @@ -290,6 +293,7 @@ async def list_all_test_steps( kwargs={"query_filter": query_filter}, order_by=order_by, max_results=max_results, + page_size=page_size, ) async def update_test_step(self, update: TestStepUpdate) -> TestStep: @@ -367,7 +371,7 @@ async def create_test_measurements( async def list_test_measurements( self, *, - page_size: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, page_token: str | None = None, query_filter: str | None = None, order_by: str | None = None, @@ -408,6 +412,7 @@ async def list_all_test_measurements( query_filter: str | None = None, order_by: str | None = None, max_results: int | None = None, + page_size: int | None = DEFAULT_PAGE_SIZE, ) -> list[TestMeasurement]: """List all test measurements with optional filtering. @@ -424,6 +429,7 @@ async def list_all_test_measurements( kwargs={"query_filter": query_filter}, order_by=order_by, max_results=max_results, + page_size=page_size, ) async def update_test_measurement(self, update: TestMeasurementUpdate) -> TestMeasurement: diff --git a/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py new file mode 100644 index 000000000..cb6b13d1d --- /dev/null +++ b/python/lib/sift_client/_tests/_internal/low_level_wrappers/test_ingestion.py @@ -0,0 +1,56 @@ +"""Pytest tests for the Ingestion low-level wrapper. + +These tests validate the functionality of the IngestionLowLevelClient including: +- Getting ingestion config flows +- Getting ingestion config ID from client key +- Flow name validation +""" + +import pytest + +from sift_client._internal.low_level_wrappers.ingestion import IngestionLowLevelClient +from sift_client.sift_types.ingestion import FlowConfig + +pytestmark = pytest.mark.integration + + +@pytest.fixture +def ingestion_low_level_client(sift_client): + """Get the ingestion low-level client instance.""" + return IngestionLowLevelClient(grpc_client=sift_client.grpc_client) + + +@pytest.mark.asyncio +async def test_get_ingestion_config_flows(ingestion_low_level_client, sift_client): + """Test that get_ingestion_config_flows returns correct flows. + + This test: + 1. Uses get_ingestion_config_id_from_client_key to get an ingestion config ID + 2. Gets the config flows using get_ingestion_config_flows + 3. Validates the structure and checks flow names for correctness + """ + # First, we need to find an ingestion config to test with + # We'll list ingestion configs and use the first one's client_key + ingestion_configs = await ingestion_low_level_client.list_ingestion_configs("") + + if not ingestion_configs: + pytest.skip("No ingestion configs available for testing") + + # Use the first ingestion config's client_key + ingestion_config_id = ingestion_configs[0].id_ + assert ingestion_config_id is not None + + # Get flows + flows = await ingestion_low_level_client.get_ingestion_config_flows(ingestion_config_id) + + # Verify structure + assert isinstance(flows, list) + assert len(flows) > 0 + + # Verify all items are FlowConfig instances + for flow in flows: + assert isinstance(flow, FlowConfig) + assert flow.name is not None + assert isinstance(flow.name, str) + assert len(flow.name) > 0 + assert len(flow.channels) > 0