Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import TYPE_CHECKING, Any, cast

from sift.runs.v2.runs_pb2 import (
CreateAdhocRunRequest,
CreateAdhocRunResponse,
CreateAutomaticRunAssociationForAssetsRequest,
CreateRunResponse,
GetRunRequest,
Expand All @@ -17,10 +19,13 @@
from sift.runs.v2.runs_pb2_grpc import RunServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.util.timestamp import to_pb_timestamp
from sift_client.sift_types.run import Run, RunCreate, RunUpdate
from sift_client.transport import WithGrpcClient

if TYPE_CHECKING:
from datetime import datetime

from sift_client.transport.grpc_transport import GrpcClient

# Configure logging
Expand Down Expand Up @@ -169,3 +174,50 @@ async def create_automatic_run_association_for_assets(
await self._grpc_client.get_stub(RunServiceStub).CreateAutomaticRunAssociationForAssets(
request
)

async def create_adhoc_run(
self,
*,
name: str,
description: str | None = None,
asset_ids: list[str],
start_time: datetime | None = None,
stop_time: datetime | None = None,
tag_names: list[str] | None = None,
metadata: dict[str, str | float | bool] | None = None,
client_key: str | None = None,
) -> Run:
"""Create an adhoc run.

Args:
name: The name of the run.
description: Optional description of the run.
asset_ids: List of asset IDs to associate with the run.
start_time: Optional start time of the run.
stop_time: Optional stop time of the run.
tag_names: Optional list of tag names to associate with the run.
metadata: Optional metadata to associate with the run.
client_key: Optional client key for the run.

Returns:
The created Run.

Raises:
ValueError: If name is not provided or if start_time/stop_time are invalid.
"""
from sift_client.util.metadata import metadata_dict_to_proto

request = CreateAdhocRunRequest(
name=name,
description=description or "",
start_time=to_pb_timestamp(start_time) if start_time else None,
stop_time=to_pb_timestamp(stop_time) if stop_time else None,
asset_ids=asset_ids,
tags=tag_names,
metadata=metadata_dict_to_proto(metadata) if metadata else None,
client_key=client_key,
)

response = await self._grpc_client.get_stub(RunServiceStub).CreateAdhocRun(request)
grpc_run = cast("CreateAdhocRunResponse", response).run
return Run._from_proto(grpc_run)
111 changes: 102 additions & 9 deletions python/lib/sift_client/_tests/resources/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from datetime import datetime, timedelta, timezone

import pytest
from grpc.aio import AioRpcError

from sift_client import SiftClient
from sift_client.resources import RunsAPI, RunsAPIAsync
from sift_client.sift_types import Run
from sift_client.sift_types import ChannelConfig, ChannelDataType, Flow, Run
from sift_client.sift_types.run import RunCreate, RunUpdate

pytestmark = pytest.mark.integration
Expand Down Expand Up @@ -491,6 +492,30 @@ async def test_stop_run_with_start_time(self, runs_api_async, new_run):
class TestAssetAssociation:
"""Tests for the async asset association methods."""

async def ingest_data_to_asset(self, sift_client, asset_name):
"""Ingest some data into an asset."""
flow = Flow(
name="test-double-flow",
channels=[
ChannelConfig(name="double-channel", data_type=ChannelDataType.DOUBLE),
],
)

await sift_client.async_.ingestion.create_ingestion_config(
asset_name=asset_name,
flows=[flow],
)

start_time = datetime.now(tz=timezone.utc)
for i in range(10):
timestamp = start_time + timedelta(seconds=i)
flow.ingest(
timestamp=timestamp,
channel_values={"double-channel": float(i)},
)

sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2)

@pytest.mark.asyncio
async def test_create_automatic_association_for_assets(self, runs_api_async, sift_client):
"""Test associating assets with a run for automatic data ingestion."""
Expand All @@ -500,29 +525,97 @@ async def test_create_automatic_association_for_assets(self, runs_api_async, sif
name=run_name,
description="Test run for asset association",
tags=["sift-client-pytest"],
start_time=datetime.now(timezone.utc),
stop_time=datetime.now(timezone.utc) + timedelta(seconds=11),
)
created_run = await runs_api_async.create(run_create)
created_run = None

try:
# Get some assets to associate
assets = await sift_client.async_.assets.list_(limit=2)
assert len(assets) >= 1

asset_names = [asset.name for asset in assets[:2]]
assert len(assets) >= 2

# Associate assets with the run
await runs_api_async.create_automatic_association_for_assets(
run=created_run, asset_names=asset_names
created_run = await runs_api_async.create(
run_create, assets=assets, associate_new_data=True
)

for asset in assets:
await self.ingest_data_to_asset(sift_client, asset.name)
# Verify the association by getting the run and checking asset_ids
updated_run = await runs_api_async.get(run_id=created_run.id_)
updated_run = await runs_api_async.get(run_id=created_run._id_or_error)
assert updated_run.asset_ids is not None
assert len(updated_run.asset_ids) >= len(asset_names)
assert len(updated_run.asset_ids) >= len(assets)
for asset in assets:
assert asset.id_ in updated_run.asset_ids

# Fetching these channels is flaky/slow depending on how long update monitor takes to run.
# channels = await sift_client.async_.channels.list_(run=created_run)
# assert channels is not None
# assert "double-channel" in [channel.name for channel in channels]

finally:
await runs_api_async.archive(created_run)

@pytest.mark.asyncio
async def test_create_adhoc_run_all(
self, runs_api_async, sift_client, test_tag, ci_pytest_tag
):
"""Test creating an adhoc run with associated assets."""
run_name = f"test_adhoc_run_assets_{datetime.now(timezone.utc).isoformat()}"

start_time = datetime.now(timezone.utc) - timedelta(hours=2)
stop_time = datetime.now(timezone.utc) - timedelta(hours=1)
# Get some assets to associate
assets = await sift_client.async_.assets.list_(limit=2)
assert len(assets) == 2
tags = [test_tag, ci_pytest_tag]

run_create = RunCreate(
name=run_name,
description="Test adhoc run",
start_time=start_time,
stop_time=stop_time,
tags=tags,
metadata={"test_key": "test_value", "number": 42.5, "flag": True},
)
created_run = await runs_api_async.create(
run_create, assets=assets, associate_new_data=False
)

try:
assert created_run.name == run_name
assert created_run.is_adhoc is True
assert created_run.asset_ids is not None
assert len(created_run.asset_ids) >= len(assets)
# Verify all requested assets are in the run's asset_ids
for asset in assets:
assert asset.id_ in created_run.asset_ids
assert created_run.metadata is not None
assert created_run.metadata["test_key"] == "test_value"
assert created_run.metadata["number"] == 42.5
assert created_run.metadata["flag"] is True
assert set(created_run.tags) == {tag.name for tag in tags}
finally:
await runs_api_async.archive(created_run)

@pytest.mark.asyncio
async def test_create_adhoc_run_missing_assets(self, runs_api_async):
"""Test creating an adhoc run with missing assets."""
run_name = f"test_adhoc_run_missing_assets_{datetime.now(timezone.utc).isoformat()}"
run_create = RunCreate(
name=run_name,
start_time=datetime.now(timezone.utc),
stop_time=datetime.now(timezone.utc) + timedelta(seconds=11),
)
with pytest.raises(
AioRpcError,
match='invalid argument: invalid input syntax for type uuid: "asset-name-not-id"',
):
await runs_api_async.create(
run_create, assets=["asset-name-not-id"], associate_new_data=False
)


class TestRunsAPISync:
"""Test suite for the synchronous Runs API functionality.
Expand Down
13 changes: 13 additions & 0 deletions python/lib/sift_client/_tests/sift_types/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,16 @@ def test_update_calls_client_and_updates_self(self, mock_run, mock_client):
mock_update.assert_called_once_with(updated_run)
# Verify it returns self
assert result is mock_run

def test_run_stop(self, mock_run, mock_client):
"""Test that stop() calls client.runs.stop and updates self."""
stopped_run = MagicMock()
mock_client.runs.get.return_value = stopped_run

# Mock the _update method to verify it's called
with MagicMock() as mock_update:
mock_run._update = mock_update
result = mock_run.stop()
mock_client.runs.stop.assert_called_once_with(run=mock_run)
mock_update.assert_called_once_with(stopped_run)
assert result is mock_run
1 change: 0 additions & 1 deletion python/lib/sift_client/resources/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ def _build_name_cel_filters(
filter_parts.append(cel.contains("name", name_contains))
if name_regex:
filter_parts.append(cel.match("name", name_regex))
print(filter_parts)
return filter_parts

def _build_time_cel_filters(
Expand Down
13 changes: 10 additions & 3 deletions python/lib/sift_client/resources/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def list_(
modified_before: datetime | None = None,
# channel specific
asset: Asset | str | None = None,
assets: list[str | Asset] | None = None,
run: Run | str | None = None,
# common filters
description_contains: str | None = None,
Expand All @@ -89,11 +90,12 @@ async def list_(
name_contains: Partial name of the channel.
name_regex: Regular expression to filter channels by name.
channel_ids: Filter to channels with any of these IDs.
created_after: Filter channels created after this datetime.
created_before: Filter channels created before this datetime.
created_after: Filter channels created after this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data.
Comment thread
alexluck-sift marked this conversation as resolved.
created_before: Filter channels created before this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data.
modified_after: Filter channels modified after this datetime.
modified_before: Filter channels modified before this datetime.
asset: Filter channels associated with this Asset or asset ID.
assets: Filter channels associated with these Assets or asset IDs.
run: Filter channels associated with this Run or run ID.
description_contains: Partial description of the channel.
include_archived: If True, include archived channels in results.
Expand Down Expand Up @@ -123,8 +125,13 @@ async def list_(
if channel_ids:
filter_parts.append(cel.in_("channel_id", channel_ids))
if asset is not None:
asset_id = asset.id_ if isinstance(asset, Asset) else asset
asset_id = asset._id_or_error if isinstance(asset, Asset) else asset
filter_parts.append(cel.equals("asset_id", asset_id))
if assets:
asset_ids = [
asset._id_or_error if isinstance(asset, Asset) else asset for asset in assets
]
filter_parts.append(cel.in_("asset_id", asset_ids))
if run is not None:
run_id = run.id_ if isinstance(run, Run) else run
filter_parts.append(cel.equals("run_id", run_id))
Expand Down
62 changes: 43 additions & 19 deletions python/lib/sift_client/resources/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from sift_client._internal.low_level_wrappers.runs import RunsLowLevelClient
from sift_client.resources._base import ResourceBase
from sift_client.sift_types.asset import Asset
from sift_client.sift_types.run import Run, RunCreate, RunUpdate
from sift_client.sift_types.tag import Tag
from sift_client.util import cel_utils as cel
Expand All @@ -13,7 +14,6 @@
from datetime import datetime, timedelta

from sift_client.client import SiftClient
from sift_client.sift_types.asset import Asset


class RunsAPIAsync(ResourceBase):
Expand Down Expand Up @@ -208,20 +208,61 @@ async def find(self, **kwargs) -> Run | None:
async def create(
self,
create: RunCreate | dict,
assets: list[str | Asset] | None = None,
Comment thread
ian-sift marked this conversation as resolved.
associate_new_data: bool = False,
) -> Run:
"""Create a new run.

Note on assets: You do not need to provide asset info when creating a run.
If you pass a Run to future ingestion configs associated with assets, the association will happen automatically then.
However, if you do pass assets and set associate_new_data=True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run. This may be useful if you want to capture a time range for a specific asset or won't know about this Run when ingesting to that asset.
If the data has already been ingested, leave associate_new_data=False.

Args:
create: The Run definition to create.
assets: List of assets to associate with the run. Note: if you are associating new data, you must provide assets/asset names.
associate_new_data: If True, future ingested data that falls within the Run's time period will be associated with the Run. Even if that data's timestamp is in the past, if it has not been ingested yet and the Run's timestamp envelopes it, it will be associated with the Run.

Returns:
The created Run.
"""
if isinstance(create, dict):
create = RunCreate.model_validate(create)

asset_names: list[str] = []
if associate_new_data:
if not assets:
raise ValueError(
"Assets/asset names must be provided when associate_new_data is True"
)
asset_names = [asset.name if isinstance(asset, Asset) else asset for asset in assets]
elif assets:
asset_ids = [
asset._id_or_error if isinstance(asset, Asset) else asset for asset in assets
]
tag_names = (
[tag.name if isinstance(tag, Tag) else tag for tag in create.tags]
if create.tags
else None
)
created_run = await self._low_level_client.create_adhoc_run(
name=create.name,
description=create.description,
asset_ids=asset_ids or [],
start_time=create.start_time,
stop_time=create.stop_time,
tag_names=tag_names,
metadata=create.metadata,
client_key=create.client_key,
)
return self._apply_client_to_instance(created_run)
created_run = await self._low_level_client.create_run(create=create)
return self._apply_client_to_instance(created_run)
created_run = self._apply_client_to_instance(created_run)
if associate_new_data:
await self._low_level_client.create_automatic_run_association_for_assets(
run_id=created_run._id_or_error, asset_names=asset_names
)
return created_run

async def update(self, run: str | Run, update: RunUpdate | dict) -> Run:
"""Update a Run.
Expand Down Expand Up @@ -274,20 +315,3 @@ async def stop(
run_id = run._id_or_error if isinstance(run, Run) else run
await self._low_level_client.stop_run(run_id=run_id or "")
return await self.get(run_id=run_id)

async def create_automatic_association_for_assets(
self,
run: str | Run,
*,
asset_names: list[str],
) -> None:
"""Associate assets with a run for automatic data ingestion.

Args:
run: The Run or run ID.
asset_names: List of asset names to associate.
"""
run_id = run._id_or_error or "" if isinstance(run, Run) else run
await self._low_level_client.create_automatic_run_association_for_assets(
run_id=run_id, asset_names=asset_names
)
Loading
Loading