From c8a5493c0a63307ec62d8a2b387de859a97ecac1 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Mon, 3 Nov 2025 17:23:15 -0800 Subject: [PATCH 1/4] feat(python): Add create_adhoc_run and utility function for stopping run from Run object. --- .../_internal/low_level_wrappers/runs.py | 52 ++++++++++ .../sift_client/_tests/resources/test_runs.py | 95 +++++++++++++++++-- .../sift_client/_tests/sift_types/test_run.py | 13 +++ python/lib/sift_client/resources/_base.py | 1 - python/lib/sift_client/resources/runs.py | 73 +++++++++++++- .../resources/sync_stubs/__init__.pyi | 41 +++++++- python/lib/sift_client/sift_types/run.py | 7 ++ 7 files changed, 266 insertions(+), 16 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/runs.py b/python/lib/sift_client/_internal/low_level_wrappers/runs.py index 38c020454..b9c6efb09 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/runs.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/runs.py @@ -4,6 +4,8 @@ from typing import TYPE_CHECKING, Any, cast from sift.runs.v2.runs_pb2 import ( + CreateAdhocRunRequest, + CreateAdhocRunResponse, CreateAutomaticRunAssociationForAssetsRequest, CreateRunResponse, GetRunRequest, @@ -17,10 +19,13 @@ from sift.runs.v2.runs_pb2_grpc import RunServiceStub from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.util.timestamp import to_pb_timestamp from sift_client.sift_types.run import Run, RunCreate, RunUpdate from sift_client.transport import WithGrpcClient if TYPE_CHECKING: + from datetime import datetime + from sift_client.transport.grpc_transport import GrpcClient # Configure logging @@ -169,3 +174,50 @@ async def create_automatic_run_association_for_assets( await self._grpc_client.get_stub(RunServiceStub).CreateAutomaticRunAssociationForAssets( request ) + + async def create_adhoc_run( + self, + *, + name: str, + description: str | None = None, + asset_ids: list[str], + start_time: datetime | None = 
None, + stop_time: datetime | None = None, + tag_names: list[str] | None = None, + metadata: dict[str, str | float | bool] | None = None, + client_key: str | None = None, + ) -> Run: + """Create an adhoc run. + + Args: + name: The name of the run. + description: Optional description of the run. + asset_ids: List of asset IDs to associate with the run. + start_time: Optional start time of the run. + stop_time: Optional stop time of the run. + tag_names: Optional list of tag names to associate with the run. + metadata: Optional metadata to associate with the run. + client_key: Optional client key for the run. + + Returns: + The created Run. + + Raises: + ValueError: If name is not provided or if start_time/stop_time are invalid. + """ + from sift_client.util.metadata import metadata_dict_to_proto + + request = CreateAdhocRunRequest( + name=name, + description=description or "", + start_time=to_pb_timestamp(start_time) if start_time else None, + stop_time=to_pb_timestamp(stop_time) if stop_time else None, + asset_ids=asset_ids, + tags=tag_names, + metadata=metadata_dict_to_proto(metadata) if metadata else None, + client_key=client_key, + ) + + response = await self._grpc_client.get_stub(RunServiceStub).CreateAdhocRun(request) + grpc_run = cast("CreateAdhocRunResponse", response).run + return Run._from_proto(grpc_run) diff --git a/python/lib/sift_client/_tests/resources/test_runs.py b/python/lib/sift_client/_tests/resources/test_runs.py index 40cce6323..71e0ae22e 100644 --- a/python/lib/sift_client/_tests/resources/test_runs.py +++ b/python/lib/sift_client/_tests/resources/test_runs.py @@ -10,10 +10,11 @@ from datetime import datetime, timedelta, timezone import pytest +from grpc.aio import AioRpcError from sift_client import SiftClient from sift_client.resources import RunsAPI, RunsAPIAsync -from sift_client.sift_types import Run +from sift_client.sift_types import ChannelConfig, ChannelDataType, Flow, Run from sift_client.sift_types.run import RunCreate, RunUpdate 
pytestmark = pytest.mark.integration @@ -491,6 +492,30 @@ async def test_stop_run_with_start_time(self, runs_api_async, new_run): class TestAssetAssociation: """Tests for the async asset association methods.""" + async def ingest_data_to_asset(self, sift_client, asset_name): + """Ingest some data into an asset.""" + flow = Flow( + name="test-double-flow", + channels=[ + ChannelConfig(name="double-channel", data_type=ChannelDataType.DOUBLE), + ], + ) + + await sift_client.async_.ingestion.create_ingestion_config( + asset_name=asset_name, + flows=[flow], + ) + + start_time = datetime.now(tz=timezone.utc) + for i in range(10): + timestamp = start_time + timedelta(seconds=i) + flow.ingest( + timestamp=timestamp, + channel_values={"double-channel": float(i)}, + ) + + sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) + @pytest.mark.asyncio async def test_create_automatic_association_for_assets(self, runs_api_async, sift_client): """Test associating assets with a run for automatic data ingestion.""" @@ -500,29 +525,85 @@ async def test_create_automatic_association_for_assets(self, runs_api_async, sif name=run_name, description="Test run for asset association", tags=["sift-client-pytest"], + start_time=datetime.now(timezone.utc), + stop_time=datetime.now(timezone.utc) + timedelta(seconds=11), ) created_run = await runs_api_async.create(run_create) try: # Get some assets to associate assets = await sift_client.async_.assets.list_(limit=2) - assert len(assets) >= 1 - - asset_names = [asset.name for asset in assets[:2]] + assert len(assets) >= 2 # Associate assets with the run await runs_api_async.create_automatic_association_for_assets( - run=created_run, asset_names=asset_names + run=created_run, assets=assets ) + for asset in assets: + await self.ingest_data_to_asset(sift_client, asset.name) # Verify the association by getting the run and checking asset_ids - updated_run = await runs_api_async.get(run_id=created_run.id_) + updated_run = await 
runs_api_async.get(run_id=created_run._id_or_error) assert updated_run.asset_ids is not None - assert len(updated_run.asset_ids) >= len(asset_names) + assert len(updated_run.asset_ids) >= len(assets) + for asset in assets: + assert asset.id_ in updated_run.asset_ids + + # Fetching these channels is flaky/slow depending on how long update monitor takes to run. + # channels = await sift_client.async_.channels.list_(run=created_run) + # assert channels is not None + # assert "double-channel" in [channel.name for channel in channels] + + finally: + await runs_api_async.archive(created_run) + @pytest.mark.asyncio + async def test_create_adhoc_run_all( + self, runs_api_async, sift_client, test_tag, ci_pytest_tag + ): + """Test creating an adhoc run with associated assets.""" + run_name = f"test_adhoc_run_assets_{datetime.now(timezone.utc).isoformat()}" + + start_time = datetime.now(timezone.utc) - timedelta(hours=2) + stop_time = datetime.now(timezone.utc) - timedelta(hours=1) + # Get some assets to associate + assets = await sift_client.async_.assets.list_(limit=2) + assert len(assets) == 2 + tags = [test_tag, ci_pytest_tag] + + created_run = await runs_api_async.create_adhoc_run( + name=run_name, + assets=assets, + description="Test adhoc run", + start_time=start_time, + stop_time=stop_time, + tags=tags, + metadata={"test_key": "test_value", "number": 42.5, "flag": True}, + ) + + try: + assert created_run.name == run_name + assert created_run.is_adhoc is True + assert created_run.asset_ids is not None + assert len(created_run.asset_ids) >= len(assets) + # Verify all requested assets are in the run's asset_ids + for asset in assets: + assert asset.id_ in created_run.asset_ids + assert created_run.metadata is not None + assert created_run.metadata["test_key"] == "test_value" + assert created_run.metadata["number"] == 42.5 + assert created_run.metadata["flag"] is True + assert set(created_run.tags) == {tag.name for tag in tags} finally: await 
runs_api_async.archive(created_run) + @pytest.mark.asyncio + async def test_create_adhoc_run_missing_assets(self, runs_api_async): + """Test creating an adhoc run with missing assets.""" + run_name = f"test_adhoc_run_missing_assets_{datetime.now(timezone.utc).isoformat()}" + with pytest.raises(AioRpcError, match="asset_ids: value must contain at least 1 item"): + await runs_api_async.create_adhoc_run(name=run_name, assets=[]) + class TestRunsAPISync: """Test suite for the synchronous Runs API functionality. diff --git a/python/lib/sift_client/_tests/sift_types/test_run.py b/python/lib/sift_client/_tests/sift_types/test_run.py index 5f9e6b1a1..a9433f519 100644 --- a/python/lib/sift_client/_tests/sift_types/test_run.py +++ b/python/lib/sift_client/_tests/sift_types/test_run.py @@ -205,3 +205,16 @@ def test_update_calls_client_and_updates_self(self, mock_run, mock_client): mock_update.assert_called_once_with(updated_run) # Verify it returns self assert result is mock_run + + def test_run_stop(self, mock_run, mock_client): + """Test that stop() calls client.runs.stop and updates self.""" + stopped_run = MagicMock() + mock_client.runs.get.return_value = stopped_run + + # Mock the _update method to verify it's called + with MagicMock() as mock_update: + mock_run._update = mock_update + result = mock_run.stop() + mock_client.runs.stop.assert_called_once_with(run=mock_run) + mock_update.assert_called_once_with(stopped_run) + assert result is mock_run diff --git a/python/lib/sift_client/resources/_base.py b/python/lib/sift_client/resources/_base.py index 676dedbca..217e57a72 100644 --- a/python/lib/sift_client/resources/_base.py +++ b/python/lib/sift_client/resources/_base.py @@ -62,7 +62,6 @@ def _build_name_cel_filters( filter_parts.append(cel.contains("name", name_contains)) if name_regex: filter_parts.append(cel.match("name", name_regex)) - print(filter_parts) return filter_parts def _build_time_cel_filters( diff --git a/python/lib/sift_client/resources/runs.py 
b/python/lib/sift_client/resources/runs.py index 844139a7b..524fd282a 100644 --- a/python/lib/sift_client/resources/runs.py +++ b/python/lib/sift_client/resources/runs.py @@ -4,6 +4,7 @@ from sift_client._internal.low_level_wrappers.runs import RunsLowLevelClient from sift_client.resources._base import ResourceBase +from sift_client.sift_types.asset import Asset from sift_client.sift_types.run import Run, RunCreate, RunUpdate from sift_client.sift_types.tag import Tag from sift_client.util import cel_utils as cel @@ -13,7 +14,6 @@ from datetime import datetime, timedelta from sift_client.client import SiftClient - from sift_client.sift_types.asset import Asset class RunsAPIAsync(ResourceBase): @@ -277,17 +277,80 @@ async def stop( async def create_automatic_association_for_assets( self, - run: str | Run, *, - asset_names: list[str], + run: str | Run, + assets: list[str | Asset], ) -> None: - """Associate assets with a run for automatic data ingestion. + """Associate asset data with a given Run before ingesting it. Any data for the given assets that falls within the given Run's time period will be associated with the Run. For associating data after ingestion, use the create_adhoc_run method. Args: run: The Run or run ID. - asset_names: List of asset names to associate. + assets: List of assets or asset names to associate. 
""" + asset_names = [] + for asset in assets: + if isinstance(asset, Asset): + asset_names.append(asset.name) + else: + asset_names.append(asset) run_id = run._id_or_error or "" if isinstance(run, Run) else run await self._low_level_client.create_automatic_run_association_for_assets( run_id=run_id, asset_names=asset_names ) + + async def create_adhoc_run( + self, + *, + name: str, + description: str | None = None, + assets: list[str | Asset], + start_time: datetime | None = None, + stop_time: datetime | None = None, + tags: list[str | Tag] | None = None, + metadata: dict[str, str | float | bool] | None = None, + client_key: str | None = None, + ) -> Run: + """Create an ad-hoc run. + + These runs act like views onto data in given assets potentially over a specific time period. This can be created after the data has been ingested. + + Args: + name: The name of the run. + description: Optional description of the run. + assets: List of assets to associate with the run. + start_time: Optional start time of the run. + stop_time: Optional stop time of the run. + asset_ids: Optional list of asset IDs to associate with the run. + tags: Optional list of tags or tag names to associate with the run. + metadata: Optional metadata dictionary to associate with the run. + client_key: Optional client key for the run. + + Returns: + The created Run. + + Raises: + ValueError: If name is not provided or if start_time/stop_time are invalid. 
+ """ + asset_ids = [] + for asset in assets: + if isinstance(asset, Asset): + asset_ids.append(asset._id_or_error) + else: + asset_ids.append(asset) + tag_names = [] + for tag in tags or []: + if isinstance(tag, Tag): + tag_names.append(tag.name) + else: + tag_names.append(tag) + created_run = await self._low_level_client.create_adhoc_run( + name=name, + description=description, + asset_ids=asset_ids, + start_time=start_time, + stop_time=stop_time, + tag_names=tag_names, + metadata=metadata, + client_key=client_key, + ) + return self._apply_client_to_instance(created_run) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index cf0ee1247..c9ce106ec 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -905,14 +905,49 @@ class RunsAPI: """ ... + def create_adhoc_run( + self, + *, + name: str, + description: str | None = None, + assets: list[str | Asset], + start_time: datetime | None = None, + stop_time: datetime | None = None, + tags: list[str | Tag] | None = None, + metadata: dict[str, str | float | bool] | None = None, + client_key: str | None = None, + ) -> Run: + """Create an ad-hoc run. + + These runs act like views onto data in given assets potentially over a specific time period. This can be created after the data has been ingested. + + Args: + name: The name of the run. + description: Optional description of the run. + assets: List of assets to associate with the run. + start_time: Optional start time of the run. + stop_time: Optional stop time of the run. + asset_ids: Optional list of asset IDs to associate with the run. + tags: Optional list of tags or tag names to associate with the run. + metadata: Optional metadata dictionary to associate with the run. + client_key: Optional client key for the run. + + Returns: + The created Run. 
+ + Raises: + ValueError: If name is not provided or if start_time/stop_time are invalid. + """ + ... + def create_automatic_association_for_assets( - self, run: str | Run, *, asset_names: list[str] + self, *, run: str | Run, assets: list[str | Asset] ) -> None: - """Associate assets with a run for automatic data ingestion. + """Associate asset data with a given Run before ingesting it. Any data for the given assets that falls within the given Run's time period will be associated with the Run. For associating data after ingestion, use the create_adhoc_run method. Args: run: The Run or run ID. - asset_names: List of asset names to associate. + assets: List of assets or asset names to associate. """ ... diff --git a/python/lib/sift_client/sift_types/run.py b/python/lib/sift_client/sift_types/run.py index 2e7816a5b..a8242dc40 100644 --- a/python/lib/sift_client/sift_types/run.py +++ b/python/lib/sift_client/sift_types/run.py @@ -117,6 +117,13 @@ def update(self, update: RunUpdate | dict) -> Run: self._update(updated_run) return self + def stop(self) -> Run: + """Stop the run.""" + self.client.runs.stop(run=self) + updated_run = self.client.runs.get(run_id=self.id_) + self._update(updated_run) + return self + class RunBase(ModelCreateUpdateBase): """Base class for Run create and update models with shared fields and validation.""" From c9055a654a185ca0626b07555327c54cfc3189db Mon Sep 17 00:00:00 2001 From: Ian Later Date: Wed, 5 Nov 2025 14:41:28 -0800 Subject: [PATCH 2/4] Automatically handle adhoc run creation --- .../sift_client/_tests/resources/test_runs.py | 17 ++- python/lib/sift_client/resources/channels.py | 13 +- python/lib/sift_client/resources/runs.py | 130 +++++++----------- .../resources/sync_stubs/__init__.pyi | 66 +++------ 4 files changed, 87 insertions(+), 139 deletions(-) diff --git a/python/lib/sift_client/_tests/resources/test_runs.py b/python/lib/sift_client/_tests/resources/test_runs.py index 71e0ae22e..0d92be923 100644 --- 
a/python/lib/sift_client/_tests/resources/test_runs.py +++ b/python/lib/sift_client/_tests/resources/test_runs.py @@ -528,7 +528,7 @@ async def test_create_automatic_association_for_assets(self, runs_api_async, sif start_time=datetime.now(timezone.utc), stop_time=datetime.now(timezone.utc) + timedelta(seconds=11), ) - created_run = await runs_api_async.create(run_create) + created_run = None try: # Get some assets to associate @@ -536,8 +536,8 @@ async def test_create_automatic_association_for_assets(self, runs_api_async, sif assert len(assets) >= 2 # Associate assets with the run - await runs_api_async.create_automatic_association_for_assets( - run=created_run, assets=assets + created_run = await runs_api_async.create( + run_create, asset_names=[asset.name for asset in assets] ) for asset in assets: @@ -571,15 +571,17 @@ async def test_create_adhoc_run_all( assert len(assets) == 2 tags = [test_tag, ci_pytest_tag] - created_run = await runs_api_async.create_adhoc_run( + run_create = RunCreate( name=run_name, - assets=assets, description="Test adhoc run", start_time=start_time, stop_time=stop_time, tags=tags, metadata={"test_key": "test_value", "number": 42.5, "flag": True}, ) + created_run = await runs_api_async.create( + run_create, asset_ids=[asset._id_or_error for asset in assets], data_exists=True + ) try: assert created_run.name == run_name @@ -601,8 +603,11 @@ async def test_create_adhoc_run_all( async def test_create_adhoc_run_missing_assets(self, runs_api_async): """Test creating an adhoc run with missing assets.""" run_name = f"test_adhoc_run_missing_assets_{datetime.now(timezone.utc).isoformat()}" + run_create = RunCreate( + name=run_name, + ) with pytest.raises(AioRpcError, match="asset_ids: value must contain at least 1 item"): - await runs_api_async.create_adhoc_run(name=run_name, assets=[]) + await runs_api_async.create(run_create, asset_ids=[], data_exists=True) class TestRunsAPISync: diff --git a/python/lib/sift_client/resources/channels.py 
b/python/lib/sift_client/resources/channels.py index c65ade03c..4659eea28 100644 --- a/python/lib/sift_client/resources/channels.py +++ b/python/lib/sift_client/resources/channels.py @@ -73,6 +73,7 @@ async def list_( modified_before: datetime | None = None, # channel specific asset: Asset | str | None = None, + assets: list[str | Asset] | None = None, run: Run | str | None = None, # common filters description_contains: str | None = None, @@ -89,11 +90,12 @@ async def list_( name_contains: Partial name of the channel. name_regex: Regular expression to filter channels by name. channel_ids: Filter to channels with any of these IDs. - created_after: Filter channels created after this datetime. - created_before: Filter channels created before this datetime. + created_after: Filter channels created after this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data. + created_before: Filter channels created before this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data. modified_after: Filter channels modified after this datetime. modified_before: Filter channels modified before this datetime. asset: Filter channels associated with this Asset or asset ID. + assets: Filter channels associated with these Assets or asset IDs. run: Filter channels associated with this Run or run ID. description_contains: Partial description of the channel. include_archived: If True, include archived channels in results. 
@@ -123,8 +125,13 @@ async def list_( if channel_ids: filter_parts.append(cel.in_("channel_id", channel_ids)) if asset is not None: - asset_id = asset.id_ if isinstance(asset, Asset) else asset + asset_id = asset._id_or_error if isinstance(asset, Asset) else asset filter_parts.append(cel.equals("asset_id", asset_id)) + if assets: + asset_ids = [ + asset._id_or_error if isinstance(asset, Asset) else asset for asset in assets + ] + filter_parts.append(cel.in_("asset_id", asset_ids)) if run is not None: run_id = run.id_ if isinstance(run, Run) else run filter_parts.append(cel.equals("run_id", run_id)) diff --git a/python/lib/sift_client/resources/runs.py b/python/lib/sift_client/resources/runs.py index 524fd282a..03cc151e9 100644 --- a/python/lib/sift_client/resources/runs.py +++ b/python/lib/sift_client/resources/runs.py @@ -4,7 +4,6 @@ from sift_client._internal.low_level_wrappers.runs import RunsLowLevelClient from sift_client.resources._base import ResourceBase -from sift_client.sift_types.asset import Asset from sift_client.sift_types.run import Run, RunCreate, RunUpdate from sift_client.sift_types.tag import Tag from sift_client.util import cel_utils as cel @@ -14,6 +13,7 @@ from datetime import datetime, timedelta from sift_client.client import SiftClient + from sift_client.sift_types.asset import Asset class RunsAPIAsync(ResourceBase): @@ -208,11 +208,22 @@ async def find(self, **kwargs) -> Run | None: async def create( self, create: RunCreate | dict, + asset_names: list[str] | None = None, + asset_ids: list[str] | None = None, + data_exists: bool = False, ) -> Run: """Create a new run. + Note on assets: You do not need to provide asset info when creating a run. + If you pass a Run to future ingestion configs associated with assets, the association will happen automatically then. + However, if you do pass assets, _future_ ingested data that falls within the Run's time period will be associated with the Run. 
Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset. + If the data has already been ingested, it will not be associated with the Run unless you pass data_exists=True. + Args: create: The Run definition to create. + asset_names: List of asset names to associate with the run. + asset_ids: List of asset IDs to associate with the run. + data_exists: If True, the run will be created as an ad-hoc run on the given assets. Returns: The created Run. @@ -220,8 +231,43 @@ async def create( if isinstance(create, dict): create = RunCreate.model_validate(create) + if asset_names and asset_ids: + raise ValueError("Either asset_names or asset_ids must be provided, not both") + + create_association = False + if asset_names or asset_ids or data_exists: + if data_exists: + if asset_names: + assets = self.client.assets.list_(names=asset_names) + asset_ids = [asset._id_or_error for asset in assets] + tag_names = ( + [tag.name if isinstance(tag, Tag) else tag for tag in create.tags] + if create.tags + else None + ) + created_run = await self._low_level_client.create_adhoc_run( + name=create.name, + description=create.description, + asset_ids=asset_ids or [], + start_time=create.start_time, + stop_time=create.stop_time, + tag_names=tag_names, + metadata=create.metadata, + client_key=create.client_key, + ) + return self._apply_client_to_instance(created_run) + else: + if asset_ids: + assets = self.client.assets.list_(asset_ids=asset_ids) + asset_names = [asset.name for asset in assets] + create_association = True created_run = await self._low_level_client.create_run(create=create) - return self._apply_client_to_instance(created_run) + created_run = self._apply_client_to_instance(created_run) + if create_association: + await 
self._low_level_client.create_automatic_run_association_for_assets( + run_id=created_run._id_or_error, asset_names=asset_names or [] + ) + return created_run async def update(self, run: str | Run, update: RunUpdate | dict) -> Run: """Update a Run. @@ -274,83 +320,3 @@ async def stop( run_id = run._id_or_error if isinstance(run, Run) else run await self._low_level_client.stop_run(run_id=run_id or "") return await self.get(run_id=run_id) - - async def create_automatic_association_for_assets( - self, - *, - run: str | Run, - assets: list[str | Asset], - ) -> None: - """Associate asset data with a given Run before ingesting it. Any data for the given assets that falls within the given Run's time period will be associated with the Run. For associating data after ingestion, use the create_adhoc_run method. - - Args: - run: The Run or run ID. - assets: List of assets or asset names to associate. - """ - asset_names = [] - for asset in assets: - if isinstance(asset, Asset): - asset_names.append(asset.name) - else: - asset_names.append(asset) - run_id = run._id_or_error or "" if isinstance(run, Run) else run - await self._low_level_client.create_automatic_run_association_for_assets( - run_id=run_id, asset_names=asset_names - ) - - async def create_adhoc_run( - self, - *, - name: str, - description: str | None = None, - assets: list[str | Asset], - start_time: datetime | None = None, - stop_time: datetime | None = None, - tags: list[str | Tag] | None = None, - metadata: dict[str, str | float | bool] | None = None, - client_key: str | None = None, - ) -> Run: - """Create an ad-hoc run. - - These runs act like views onto data in given assets potentially over a specific time period. This can be created after the data has been ingested. - - Args: - name: The name of the run. - description: Optional description of the run. - assets: List of assets to associate with the run. - start_time: Optional start time of the run. - stop_time: Optional stop time of the run. 
- asset_ids: Optional list of asset IDs to associate with the run. - tags: Optional list of tags or tag names to associate with the run. - metadata: Optional metadata dictionary to associate with the run. - client_key: Optional client key for the run. - - Returns: - The created Run. - - Raises: - ValueError: If name is not provided or if start_time/stop_time are invalid. - """ - asset_ids = [] - for asset in assets: - if isinstance(asset, Asset): - asset_ids.append(asset._id_or_error) - else: - asset_ids.append(asset) - tag_names = [] - for tag in tags or []: - if isinstance(tag, Tag): - tag_names.append(tag.name) - else: - tag_names.append(tag) - created_run = await self._low_level_client.create_adhoc_run( - name=name, - description=description, - asset_ids=asset_ids, - start_time=start_time, - stop_time=stop_time, - tag_names=tag_names, - metadata=metadata, - client_key=client_key, - ) - return self._apply_client_to_instance(created_run) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index c9ce106ec..2a81859f5 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -469,6 +469,7 @@ class ChannelsAPI: modified_after: datetime | None = None, modified_before: datetime | None = None, asset: Asset | str | None = None, + assets: list[str | Asset] | None = None, run: Run | str | None = None, description_contains: str | None = None, include_archived: bool | None = None, @@ -484,11 +485,12 @@ class ChannelsAPI: name_contains: Partial name of the channel. name_regex: Regular expression to filter channels by name. channel_ids: Filter to channels with any of these IDs. - created_after: Filter channels created after this datetime. - created_before: Filter channels created before this datetime. + created_after: Filter channels created after this datetime. 
Note: This is related to the channel creation time, not the timestamp of the underlying data. + created_before: Filter channels created before this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data. modified_after: Filter channels modified after this datetime. modified_before: Filter channels modified before this datetime. asset: Filter channels associated with this Asset or asset ID. + assets: Filter channels associated with these Assets or asset IDs. run: Filter channels associated with this Run or run ID. description_contains: Partial description of the channel. include_archived: If True, include archived channels in results. @@ -894,60 +896,28 @@ class RunsAPI: """ ... - def create(self, create: RunCreate | dict) -> Run: - """Create a new run. - - Args: - create: The Run definition to create. - - Returns: - The created Run. - """ - ... - - def create_adhoc_run( + def create( self, - *, - name: str, - description: str | None = None, - assets: list[str | Asset], - start_time: datetime | None = None, - stop_time: datetime | None = None, - tags: list[str | Tag] | None = None, - metadata: dict[str, str | float | bool] | None = None, - client_key: str | None = None, + create: RunCreate | dict, + asset_names: list[str] | None = None, + asset_ids: list[str] | None = None, + data_exists: bool = False, ) -> Run: - """Create an ad-hoc run. + """Create a new run. - These runs act like views onto data in given assets potentially over a specific time period. This can be created after the data has been ingested. + Note on assets: You do not need to provide asset info when creating a run. + If you pass a Run to future ingestion configs associated with assets, the association will happen automatically then. + However, if you do pass assets, _future_ ingested data that falls within the Run's time period will be associated with the Run. 
Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset. + If the data has already been ingested, it will not be associated with the Run unless you pass data_exists=True. Args: - name: The name of the run. - description: Optional description of the run. - assets: List of assets to associate with the run. - start_time: Optional start time of the run. - stop_time: Optional stop time of the run. - asset_ids: Optional list of asset IDs to associate with the run. - tags: Optional list of tags or tag names to associate with the run. - metadata: Optional metadata dictionary to associate with the run. - client_key: Optional client key for the run. + create: The Run definition to create. + asset_names: List of asset names to associate with the run. + asset_ids: List of asset IDs to associate with the run. + data_exists: If True, the run will be created as an ad-hoc run on the given assets. Returns: The created Run. - - Raises: - ValueError: If name is not provided or if start_time/stop_time are invalid. - """ - ... - - def create_automatic_association_for_assets( - self, *, run: str | Run, assets: list[str | Asset] - ) -> None: - """Associate asset data with a given Run before ingesting it. Any data for the given assets that falls within the given Run's time period will be associated with the Run. For associating data after ingestion, use the create_adhoc_run method. - - Args: - run: The Run or run ID. - assets: List of assets or asset names to associate. """ ... 
From 0bc896066498cf1b62e0309195c345a238a4f928 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Wed, 5 Nov 2025 16:28:08 -0800 Subject: [PATCH 3/4] Rework arguments --- .../sift_client/_tests/resources/test_runs.py | 15 +++- python/lib/sift_client/resources/runs.py | 75 +++++++++---------- 2 files changed, 46 insertions(+), 44 deletions(-) diff --git a/python/lib/sift_client/_tests/resources/test_runs.py b/python/lib/sift_client/_tests/resources/test_runs.py index 0d92be923..133a675b1 100644 --- a/python/lib/sift_client/_tests/resources/test_runs.py +++ b/python/lib/sift_client/_tests/resources/test_runs.py @@ -537,7 +537,7 @@ async def test_create_automatic_association_for_assets(self, runs_api_async, sif # Associate assets with the run created_run = await runs_api_async.create( - run_create, asset_names=[asset.name for asset in assets] + run_create, assets=assets, associate_new_data=True ) for asset in assets: @@ -580,7 +580,7 @@ async def test_create_adhoc_run_all( metadata={"test_key": "test_value", "number": 42.5, "flag": True}, ) created_run = await runs_api_async.create( - run_create, asset_ids=[asset._id_or_error for asset in assets], data_exists=True + run_create, assets=assets, associate_new_data=False ) try: @@ -605,9 +605,16 @@ async def test_create_adhoc_run_missing_assets(self, runs_api_async): run_name = f"test_adhoc_run_missing_assets_{datetime.now(timezone.utc).isoformat()}" run_create = RunCreate( name=run_name, + start_time=datetime.now(timezone.utc), + stop_time=datetime.now(timezone.utc) + timedelta(seconds=11), ) - with pytest.raises(AioRpcError, match="asset_ids: value must contain at least 1 item"): - await runs_api_async.create(run_create, asset_ids=[], data_exists=True) + with pytest.raises( + AioRpcError, + match='invalid argument: invalid input syntax for type uuid: "asset-name-not-id"', + ): + await runs_api_async.create( + run_create, assets=["asset-name-not-id"], associate_new_data=False + ) class TestRunsAPISync: diff --git 
a/python/lib/sift_client/resources/runs.py b/python/lib/sift_client/resources/runs.py index 03cc151e9..d202e3aae 100644 --- a/python/lib/sift_client/resources/runs.py +++ b/python/lib/sift_client/resources/runs.py @@ -4,6 +4,7 @@ from sift_client._internal.low_level_wrappers.runs import RunsLowLevelClient from sift_client.resources._base import ResourceBase +from sift_client.sift_types.asset import Asset from sift_client.sift_types.run import Run, RunCreate, RunUpdate from sift_client.sift_types.tag import Tag from sift_client.util import cel_utils as cel @@ -13,7 +14,6 @@ from datetime import datetime, timedelta from sift_client.client import SiftClient - from sift_client.sift_types.asset import Asset class RunsAPIAsync(ResourceBase): @@ -208,22 +208,20 @@ async def find(self, **kwargs) -> Run | None: async def create( self, create: RunCreate | dict, - asset_names: list[str] | None = None, - asset_ids: list[str] | None = None, - data_exists: bool = False, + assets: list[str | Asset] | None = None, + associate_new_data: bool = False, ) -> Run: """Create a new run. Note on assets: You do not need to provide asset info when creating a run. If you pass a Run to future ingestion configs associated with assets, the association will happen automatically then. - However, if you do pass assets, _future_ ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset. - If the data has already been ingested, it will not be associated with the Run unless you pass data_exists=True. + However, if you do pass assets and set associate_new_data=True, future ingested data that falls within the Run's time period will be associated with the Run. 
Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelops it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset. + If the data has already been ingested, leave associate_new_data=False. Args: create: The Run definition to create. - asset_names: List of asset names to associate with the run. - asset_ids: List of asset IDs to associate with the run. - data_exists: If True, the run will be created as an ad-hoc run on the given assets. + assets: List of assets to associate with the run. Note: if you are associating new data, you must provide assets/asset names; when associate_new_data is False, string entries are treated as asset IDs. + associate_new_data: If True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelops it, it will be associated with the Run. Returns: The created Run. 
@@ -231,41 +229,38 @@ async def create( if isinstance(create, dict): create = RunCreate.model_validate(create) - if asset_names and asset_ids: - raise ValueError("Either asset_names or asset_ids must be provided, not both") - - create_association = False - if asset_names or asset_ids or data_exists: - if data_exists: - if asset_names: - assets = self.client.assets.list_(names=asset_names) - asset_ids = [asset._id_or_error for asset in assets] - tag_names = ( - [tag.name if isinstance(tag, Tag) else tag for tag in create.tags] - if create.tags - else None + asset_names: list[str] = [] + if associate_new_data: + if not assets: + raise ValueError( + "Assets/asset names must be provided when associate_new_data is True" ) - created_run = await self._low_level_client.create_adhoc_run( - name=create.name, - description=create.description, - asset_ids=asset_ids or [], - start_time=create.start_time, - stop_time=create.stop_time, - tag_names=tag_names, - metadata=create.metadata, - client_key=create.client_key, - ) - return self._apply_client_to_instance(created_run) - else: - if asset_ids: - assets = self.client.assets.list_(asset_ids=asset_ids) - asset_names = [asset.name for asset in assets] - create_association = True + asset_names = [asset.name if isinstance(asset, Asset) else asset for asset in assets] + elif assets: + asset_ids = [ + asset._id_or_error if isinstance(asset, Asset) else asset for asset in assets + ] + tag_names = ( + [tag.name if isinstance(tag, Tag) else tag for tag in create.tags] + if create.tags + else None + ) + created_run = await self._low_level_client.create_adhoc_run( + name=create.name, + description=create.description, + asset_ids=asset_ids or [], + start_time=create.start_time, + stop_time=create.stop_time, + tag_names=tag_names, + metadata=create.metadata, + client_key=create.client_key, + ) + return self._apply_client_to_instance(created_run) created_run = await self._low_level_client.create_run(create=create) created_run = 
self._apply_client_to_instance(created_run) - if create_association: + if associate_new_data: await self._low_level_client.create_automatic_run_association_for_assets( - run_id=created_run._id_or_error, asset_names=asset_names or [] + run_id=created_run._id_or_error, asset_names=asset_names ) return created_run From 7a727de385117105a4a99b9c58122bd72899fca4 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Wed, 5 Nov 2025 17:10:57 -0800 Subject: [PATCH 4/4] regen stubs --- .../sift_client/resources/sync_stubs/__init__.pyi | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 2a81859f5..d2f8a99e6 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -899,22 +899,20 @@ class RunsAPI: def create( self, create: RunCreate | dict, - asset_names: list[str] | None = None, - asset_ids: list[str] | None = None, - data_exists: bool = False, + assets: list[str | Asset] | None = None, + associate_new_data: bool = False, ) -> Run: """Create a new run. Note on assets: You do not need to provide asset info when creating a run. If you pass a Run to future ingestion configs associated with assets, the association will happen automatically then. - However, if you do pass assets, _future_ ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset. - If the data has already been ingested, it will not be associated with the Run unless you pass data_exists=True. 
+ However, if you do pass assets and set associate_new_data=True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelops it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset. + If the data has already been ingested, leave associate_new_data=False. Args: create: The Run definition to create. - asset_names: List of asset names to associate with the run. - asset_ids: List of asset IDs to associate with the run. - data_exists: If True, the run will be created as an ad-hoc run on the given assets. + assets: List of assets to associate with the run. Note: if you are associating new data, you must provide assets/asset names; when associate_new_data is False, string entries are treated as asset IDs. + associate_new_data: If True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelops it, it will be associated with the Run. Returns: The created Run.