diff --git a/python/lib/sift_client/_internal/low_level_wrappers/runs.py b/python/lib/sift_client/_internal/low_level_wrappers/runs.py index 38c020454..b9c6efb09 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/runs.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/runs.py @@ -4,6 +4,8 @@ from typing import TYPE_CHECKING, Any, cast from sift.runs.v2.runs_pb2 import ( + CreateAdhocRunRequest, + CreateAdhocRunResponse, CreateAutomaticRunAssociationForAssetsRequest, CreateRunResponse, GetRunRequest, @@ -17,10 +19,13 @@ from sift.runs.v2.runs_pb2_grpc import RunServiceStub from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.util.timestamp import to_pb_timestamp from sift_client.sift_types.run import Run, RunCreate, RunUpdate from sift_client.transport import WithGrpcClient if TYPE_CHECKING: + from datetime import datetime + from sift_client.transport.grpc_transport import GrpcClient # Configure logging @@ -169,3 +174,50 @@ async def create_automatic_run_association_for_assets( await self._grpc_client.get_stub(RunServiceStub).CreateAutomaticRunAssociationForAssets( request ) + + async def create_adhoc_run( + self, + *, + name: str, + description: str | None = None, + asset_ids: list[str], + start_time: datetime | None = None, + stop_time: datetime | None = None, + tag_names: list[str] | None = None, + metadata: dict[str, str | float | bool] | None = None, + client_key: str | None = None, + ) -> Run: + """Create an adhoc run. + + Args: + name: The name of the run. + description: Optional description of the run. + asset_ids: List of asset IDs to associate with the run. + start_time: Optional start time of the run. + stop_time: Optional stop time of the run. + tag_names: Optional list of tag names to associate with the run. + metadata: Optional metadata to associate with the run. + client_key: Optional client key for the run. + + Returns: + The created Run. + + Raises: + ValueError: If name is not provided or if start_time/stop_time are invalid. + """ + from sift_client.util.metadata import metadata_dict_to_proto + + request = CreateAdhocRunRequest( + name=name, + description=description or "", + start_time=to_pb_timestamp(start_time) if start_time else None, + stop_time=to_pb_timestamp(stop_time) if stop_time else None, + asset_ids=asset_ids, + tags=tag_names, + metadata=metadata_dict_to_proto(metadata) if metadata else None, + client_key=client_key, + ) + + response = await self._grpc_client.get_stub(RunServiceStub).CreateAdhocRun(request) + grpc_run = cast("CreateAdhocRunResponse", response).run + return Run._from_proto(grpc_run) diff --git a/python/lib/sift_client/_tests/resources/test_runs.py b/python/lib/sift_client/_tests/resources/test_runs.py index 40cce6323..133a675b1 100644 --- a/python/lib/sift_client/_tests/resources/test_runs.py +++ b/python/lib/sift_client/_tests/resources/test_runs.py @@ -10,10 +10,11 @@ from datetime import datetime, timedelta, timezone import pytest +from grpc.aio import AioRpcError from sift_client import SiftClient from sift_client.resources import RunsAPI, RunsAPIAsync -from sift_client.sift_types import Run +from sift_client.sift_types import ChannelConfig, ChannelDataType, Flow, Run from sift_client.sift_types.run import RunCreate, RunUpdate pytestmark = pytest.mark.integration @@ -491,6 +492,30 @@ async def test_stop_run_with_start_time(self, runs_api_async, new_run): class TestAssetAssociation: """Tests for the async asset association methods.""" + async def ingest_data_to_asset(self, sift_client, asset_name): + """Ingest some data into an asset.""" + flow = Flow( + name="test-double-flow", + channels=[ + ChannelConfig(name="double-channel", data_type=ChannelDataType.DOUBLE), + ], + ) + + await sift_client.async_.ingestion.create_ingestion_config( + asset_name=asset_name, + flows=[flow], + ) + + start_time = datetime.now(tz=timezone.utc) + for i in range(10): + timestamp = start_time + timedelta(seconds=i) + flow.ingest( + timestamp=timestamp, + channel_values={"double-channel": float(i)}, + ) + + sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) + @pytest.mark.asyncio async def test_create_automatic_association_for_assets(self, runs_api_async, sift_client): """Test associating assets with a run for automatic data ingestion.""" @@ -500,29 +525,97 @@ async def test_create_automatic_association_for_assets(self, runs_api_async, sif name=run_name, description="Test run for asset association", tags=["sift-client-pytest"], + start_time=datetime.now(timezone.utc), + stop_time=datetime.now(timezone.utc) + timedelta(seconds=11), ) - created_run = await runs_api_async.create(run_create) + created_run = None try: # Get some assets to associate assets = await sift_client.async_.assets.list_(limit=2) - assert len(assets) >= 1 - - asset_names = [asset.name for asset in assets[:2]] + assert len(assets) >= 2 # Associate assets with the run - await runs_api_async.create_automatic_association_for_assets( - run=created_run, asset_names=asset_names + created_run = await runs_api_async.create( + run_create, assets=assets, associate_new_data=True ) + for asset in assets: + await self.ingest_data_to_asset(sift_client, asset.name) # Verify the association by getting the run and checking asset_ids - updated_run = await runs_api_async.get(run_id=created_run.id_) + updated_run = await runs_api_async.get(run_id=created_run._id_or_error) assert updated_run.asset_ids is not None - assert len(updated_run.asset_ids) >= len(asset_names) + assert len(updated_run.asset_ids) >= len(assets) + for asset in assets: + assert asset.id_ in updated_run.asset_ids + + # Fetching these channels is flaky/slow depending on how long update monitor takes to run. + # channels = await sift_client.async_.channels.list_(run=created_run) + # assert channels is not None + # assert "double-channel" in [channel.name for channel in channels] + + finally: + await runs_api_async.archive(created_run) + + @pytest.mark.asyncio + async def test_create_adhoc_run_all( + self, runs_api_async, sift_client, test_tag, ci_pytest_tag + ): + """Test creating an adhoc run with associated assets.""" + run_name = f"test_adhoc_run_assets_{datetime.now(timezone.utc).isoformat()}" + + start_time = datetime.now(timezone.utc) - timedelta(hours=2) + stop_time = datetime.now(timezone.utc) - timedelta(hours=1) + # Get some assets to associate + assets = await sift_client.async_.assets.list_(limit=2) + assert len(assets) == 2 + tags = [test_tag, ci_pytest_tag] + run_create = RunCreate( + name=run_name, + description="Test adhoc run", + start_time=start_time, + stop_time=stop_time, + tags=tags, + metadata={"test_key": "test_value", "number": 42.5, "flag": True}, + ) + created_run = await runs_api_async.create( + run_create, assets=assets, associate_new_data=False + ) + + try: + assert created_run.name == run_name + assert created_run.is_adhoc is True + assert created_run.asset_ids is not None + assert len(created_run.asset_ids) >= len(assets) + # Verify all requested assets are in the run's asset_ids + for asset in assets: + assert asset.id_ in created_run.asset_ids + assert created_run.metadata is not None + assert created_run.metadata["test_key"] == "test_value" + assert created_run.metadata["number"] == 42.5 + assert created_run.metadata["flag"] is True + assert set(created_run.tags) == {tag.name for tag in tags} finally: await runs_api_async.archive(created_run) + @pytest.mark.asyncio + async def test_create_adhoc_run_missing_assets(self, runs_api_async): + """Test creating an adhoc run with missing assets.""" + run_name = f"test_adhoc_run_missing_assets_{datetime.now(timezone.utc).isoformat()}" + run_create = RunCreate( + name=run_name, + start_time=datetime.now(timezone.utc), + stop_time=datetime.now(timezone.utc) + timedelta(seconds=11), + ) + with pytest.raises( + AioRpcError, + match='invalid argument: invalid input syntax for type uuid: "asset-name-not-id"', + ): + await runs_api_async.create( + run_create, assets=["asset-name-not-id"], associate_new_data=False + ) + class TestRunsAPISync: """Test suite for the synchronous Runs API functionality. diff --git a/python/lib/sift_client/_tests/sift_types/test_run.py b/python/lib/sift_client/_tests/sift_types/test_run.py index 5f9e6b1a1..a9433f519 100644 --- a/python/lib/sift_client/_tests/sift_types/test_run.py +++ b/python/lib/sift_client/_tests/sift_types/test_run.py @@ -205,3 +205,16 @@ def test_update_calls_client_and_updates_self(self, mock_run, mock_client): mock_update.assert_called_once_with(updated_run) # Verify it returns self assert result is mock_run + + def test_run_stop(self, mock_run, mock_client): + """Test that stop() calls client.runs.stop and updates self.""" + stopped_run = MagicMock() + mock_client.runs.get.return_value = stopped_run + + # Mock the _update method to verify it's called + with MagicMock() as mock_update: + mock_run._update = mock_update + result = mock_run.stop() + mock_client.runs.stop.assert_called_once_with(run=mock_run) + mock_update.assert_called_once_with(stopped_run) + assert result is mock_run diff --git a/python/lib/sift_client/resources/_base.py b/python/lib/sift_client/resources/_base.py index 676dedbca..217e57a72 100644 --- a/python/lib/sift_client/resources/_base.py +++ b/python/lib/sift_client/resources/_base.py @@ -62,7 +62,6 @@ def _build_name_cel_filters( filter_parts.append(cel.contains("name", name_contains)) if name_regex: filter_parts.append(cel.match("name", name_regex)) - print(filter_parts) return filter_parts def _build_time_cel_filters( diff --git a/python/lib/sift_client/resources/channels.py b/python/lib/sift_client/resources/channels.py index c65ade03c..4659eea28 100644 --- a/python/lib/sift_client/resources/channels.py +++ b/python/lib/sift_client/resources/channels.py @@ -73,6 +73,7 @@ async def list_( modified_before: datetime | None = None, # channel specific asset: Asset | str | None = None, + assets: list[str | Asset] | None = None, run: Run | str | None = None, # common filters description_contains: str | None = None, @@ -89,11 +90,12 @@ async def list_( name_contains: Partial name of the channel. name_regex: Regular expression to filter channels by name. channel_ids: Filter to channels with any of these IDs. - created_after: Filter channels created after this datetime. - created_before: Filter channels created before this datetime. + created_after: Filter channels created after this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data. + created_before: Filter channels created before this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data. modified_after: Filter channels modified after this datetime. modified_before: Filter channels modified before this datetime. asset: Filter channels associated with this Asset or asset ID. + assets: Filter channels associated with these Assets or asset IDs. run: Filter channels associated with this Run or run ID. description_contains: Partial description of the channel. include_archived: If True, include archived channels in results. @@ -123,8 +125,13 @@ async def list_( if channel_ids: filter_parts.append(cel.in_("channel_id", channel_ids)) if asset is not None: - asset_id = asset.id_ if isinstance(asset, Asset) else asset + asset_id = asset._id_or_error if isinstance(asset, Asset) else asset filter_parts.append(cel.equals("asset_id", asset_id)) + if assets: + asset_ids = [ + asset._id_or_error if isinstance(asset, Asset) else asset for asset in assets + ] + filter_parts.append(cel.in_("asset_id", asset_ids)) if run is not None: run_id = run.id_ if isinstance(run, Run) else run filter_parts.append(cel.equals("run_id", run_id)) diff --git a/python/lib/sift_client/resources/runs.py b/python/lib/sift_client/resources/runs.py index 844139a7b..d202e3aae 100644 --- a/python/lib/sift_client/resources/runs.py +++ b/python/lib/sift_client/resources/runs.py @@ -4,6 +4,7 @@ from sift_client._internal.low_level_wrappers.runs import RunsLowLevelClient from sift_client.resources._base import ResourceBase +from sift_client.sift_types.asset import Asset from sift_client.sift_types.run import Run, RunCreate, RunUpdate from sift_client.sift_types.tag import Tag from sift_client.util import cel_utils as cel @@ -13,7 +14,6 @@ from datetime import datetime, timedelta from sift_client.client import SiftClient - from sift_client.sift_types.asset import Asset class RunsAPIAsync(ResourceBase): @@ -208,11 +208,20 @@ async def find(self, **kwargs) -> Run | None: async def create( self, create: RunCreate | dict, + assets: list[str | Asset] | None = None, + associate_new_data: bool = False, ) -> Run: """Create a new run. + Note on assets: You do not need to provide asset info when creating a run. + If you pass a Run to future ingestion configs associated with assets, the association will happen automatically then. + However, if you do pass assets and set associate_new_data=True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset. + If the data has already been ingested, leave associate_new_data=False. + Args: create: The Run definition to create. + assets: List of assets to associate with the run. Note: if you are associating new data, you must provide assets/asset names. + associate_new_data: If True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. Returns: The created Run. @@ -220,8 +229,40 @@ async def create( if isinstance(create, dict): create = RunCreate.model_validate(create) + asset_names: list[str] = [] + if associate_new_data: + if not assets: + raise ValueError( + "Assets/asset names must be provided when associate_new_data is True" + ) + asset_names = [asset.name if isinstance(asset, Asset) else asset for asset in assets] + elif assets: + asset_ids = [ + asset._id_or_error if isinstance(asset, Asset) else asset for asset in assets + ] + tag_names = ( + [tag.name if isinstance(tag, Tag) else tag for tag in create.tags] + if create.tags + else None + ) + created_run = await self._low_level_client.create_adhoc_run( + name=create.name, + description=create.description, + asset_ids=asset_ids or [], + start_time=create.start_time, + stop_time=create.stop_time, + tag_names=tag_names, + metadata=create.metadata, + client_key=create.client_key, + ) + return self._apply_client_to_instance(created_run) created_run = await self._low_level_client.create_run(create=create) - return self._apply_client_to_instance(created_run) + created_run = self._apply_client_to_instance(created_run) + if associate_new_data: + await self._low_level_client.create_automatic_run_association_for_assets( + run_id=created_run._id_or_error, asset_names=asset_names + ) + return created_run async def update(self, run: str | Run, update: RunUpdate | dict) -> Run: """Update a Run. @@ -274,20 +315,3 @@ async def stop( run_id = run._id_or_error if isinstance(run, Run) else run await self._low_level_client.stop_run(run_id=run_id or "") return await self.get(run_id=run_id) - - async def create_automatic_association_for_assets( - self, - run: str | Run, - *, - asset_names: list[str], - ) -> None: - """Associate assets with a run for automatic data ingestion. - - Args: - run: The Run or run ID. - asset_names: List of asset names to associate. - """ - run_id = run._id_or_error or "" if isinstance(run, Run) else run - await self._low_level_client.create_automatic_run_association_for_assets( - run_id=run_id, asset_names=asset_names - ) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index cf0ee1247..d2f8a99e6 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -469,6 +469,7 @@ class ChannelsAPI: modified_after: datetime | None = None, modified_before: datetime | None = None, asset: Asset | str | None = None, + assets: list[str | Asset] | None = None, run: Run | str | None = None, description_contains: str | None = None, include_archived: bool | None = None, @@ -484,11 +485,12 @@ class ChannelsAPI: name_contains: Partial name of the channel. name_regex: Regular expression to filter channels by name. channel_ids: Filter to channels with any of these IDs. - created_after: Filter channels created after this datetime. - created_before: Filter channels created before this datetime. + created_after: Filter channels created after this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data. + created_before: Filter channels created before this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data. modified_after: Filter channels modified after this datetime. modified_before: Filter channels modified before this datetime. asset: Filter channels associated with this Asset or asset ID. + assets: Filter channels associated with these Assets or asset IDs. run: Filter channels associated with this Run or run ID. description_contains: Partial description of the channel. include_archived: If True, include archived channels in results. @@ -894,28 +896,29 @@ class RunsAPI: """ ... - def create(self, create: RunCreate | dict) -> Run: + def create( + self, + create: RunCreate | dict, + assets: list[str | Asset] | None = None, + associate_new_data: bool = False, + ) -> Run: """Create a new run. + Note on assets: You do not need to provide asset info when creating a run. + If you pass a Run to future ingestion configs associated with assets, the association will happen automatically then. + However, if you do pass assets and set associate_new_data=True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset. + If the data has already been ingested, leave associate_new_data=False. + Args: create: The Run definition to create. + assets: List of assets to associate with the run. Note: if you are associating new data, you must provide assets/asset names. + associate_new_data: If True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. Returns: The created Run. """ ... - def create_automatic_association_for_assets( - self, run: str | Run, *, asset_names: list[str] - ) -> None: - """Associate assets with a run for automatic data ingestion. - - Args: - run: The Run or run ID. - asset_names: List of asset names to associate. - """ - ... - def find(self, **kwargs) -> Run | None: """Find a single run matching the given query. Takes the same arguments as `list_`. If more than one run is found, raises an error. diff --git a/python/lib/sift_client/sift_types/run.py b/python/lib/sift_client/sift_types/run.py index 2e7816a5b..a8242dc40 100644 --- a/python/lib/sift_client/sift_types/run.py +++ b/python/lib/sift_client/sift_types/run.py @@ -117,6 +117,13 @@ def update(self, update: RunUpdate | dict) -> Run: self._update(updated_run) return self + def stop(self) -> Run: + """Stop the run.""" + self.client.runs.stop(run=self) + updated_run = self.client.runs.get(run_id=self.id_) + self._update(updated_run) + return self + class RunBase(ModelCreateUpdateBase): """Base class for Run create and update models with shared fields and validation."""