From 27b39ce5c1ada0a831721cce3bb133ff0eeed711 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Wed, 25 Feb 2026 16:49:33 -0600 Subject: [PATCH 1/2] python(feat): Add channel archive/unarchive support --- .../_internal/low_level_wrappers/channels.py | 20 +++++ .../_tests/resources/test_channels.py | 83 +++++++++++++++---- python/lib/sift_client/resources/channels.py | 54 ++++++++++-- 3 files changed, 137 insertions(+), 20 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/channels.py b/python/lib/sift_client/_internal/low_level_wrappers/channels.py index 0eed24ae8..50f3bf71b 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/channels.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/channels.py @@ -4,6 +4,8 @@ from typing import TYPE_CHECKING, Any, cast from sift.channels.v3.channels_pb2 import ( + BatchArchiveChannelsRequest, + BatchUnarchiveChannelsRequest, GetChannelRequest, GetChannelResponse, ListChannelsRequest, @@ -117,3 +119,21 @@ async def list_all_channels( order_by=order_by, max_results=max_results, ) + + async def batch_archive_channels(self, channel_ids: list[str]) -> None: + """Batch archive channels by setting active to false. + + Args: + channel_ids: The channel IDs to archive. + """ + request = BatchArchiveChannelsRequest(channel_ids=channel_ids) + await self._grpc_client.get_stub(ChannelServiceStub).BatchArchiveChannels(request) + + async def batch_unarchive_channels(self, channel_ids: list[str]) -> None: + """Batch unarchive channels by setting active to true. + + Args: + channel_ids: The channel IDs to unarchive. + """ + request = BatchUnarchiveChannelsRequest(channel_ids=channel_ids) + await self._grpc_client.get_stub(ChannelServiceStub).BatchUnarchiveChannels(request) diff --git a/python/lib/sift_client/_tests/resources/test_channels.py b/python/lib/sift_client/_tests/resources/test_channels.py index e369652db..7bd77332e 100644 --- a/python/lib/sift_client/_tests/resources/test_channels.py +++ b/python/lib/sift_client/_tests/resources/test_channels.py @@ -7,7 +7,12 @@ - Error handling and edge cases """ +import asyncio +import uuid +from urllib.parse import urljoin + import pytest +import requests from sift_client import SiftClient from sift_client.resources import ChannelsAPI, ChannelsAPIAsync @@ -183,21 +188,6 @@ async def test_list_with_limit(self, channels_api_async): assert isinstance(channels_3, list) assert len(channels_3) <= 3 - # TODO: active channel test - # @pytest.mark.asyncio - # async def test_list_include_archived(self, channels_api_async): - # """Test channel listing with archived channels included.""" - # # Test without archived channels (default) - # channels_active = await channels_api_async.list_(limit=5, include_archived=False) - # assert isinstance(channels_active, list) - # - # # Test with archived channels included - # channels_all = await channels_api_async.list_(limit=5, include_archived=True) - # assert isinstance(channels_all, list) - # - # # Should have at least as many channels when including archived - # assert len(channels_all) >= len(channels_active) - @pytest.mark.asyncio async def test_list_with_time_filters(self, channels_api_async): """Test channel listing with time-based filters.""" @@ -240,6 +230,69 @@ async def test_find_multiple_raises_error(self, channels_api_async): with pytest.raises(ValueError, match="Multiple"): await channels_api_async.find(name_contains="test", limit=5) + class TestArchive: + """Tests for the async archive method.""" + + @pytest.mark.asyncio + async def test_create_archive_unarchive_flow(self, channels_api_async, test_channel): + """Create a channel via REST schemaless ingest, then archive/unarchive via channels API; verify at each step with find.""" + asset_name = test_channel.asset.name + asset_id = test_channel.asset_id + unique_name = f"archive-test-channel-{uuid.uuid4().hex}" + + rest_client = channels_api_async.client.rest_client + rest_url = urljoin(rest_client.base_url, "api/v2/ingest") + api_key = rest_client._config.api_key + + # Create the channel by ingesting a single data point (schemaless). + # + # This is currently the simplest way to create a channel. Simply + # creating a channel schema is not sufficient since schemaless channels + # that have no data are filtered out of the `ListChannels` response. + payload = { + "asset_name": asset_name, + "data": [ + { + "timestamp": "2024-11-06T10:27:20-07:00", + "values": [ + {"channel": unique_name, "value": 1}, + ], + } + ], + } + resp = requests.post( + rest_url, + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + json=payload, + timeout=30, + ) + resp.raise_for_status() + + # Retry find until the channel is visible. + created = None + for _ in range(20): + created = await channels_api_async.find(name=unique_name, asset=asset_id) + if created is not None: + break + await asyncio.sleep(0.5) + assert created is not None, f"Channel {unique_name} did not appear after ingest" + + await channels_api_async.archive([created]) + found_archived = await channels_api_async.find( + name=unique_name, asset=asset_id, archived=True + ) + assert found_archived is not None + + await channels_api_async.unarchive([created]) + found_active = await channels_api_async.find(name=unique_name, asset=asset_id) + assert found_active is not None + + # Cleanup by archiving the channel again + await channels_api_async.archive([created]) + # TODO: data retrieval tests # class TestGetData: # """Tests for the async get_data method.""" diff --git a/python/lib/sift_client/resources/channels.py b/python/lib/sift_client/resources/channels.py index 5dc09890d..fdc6990e1 100644 --- a/python/lib/sift_client/resources/channels.py +++ b/python/lib/sift_client/resources/channels.py @@ -19,6 +19,30 @@ from sift_client.sift_types.channel import Channel +def _channel_ids_from_list(items: list[str | Channel]) -> list[str]: + """Resolve a list of channel IDs or Channel objects to a list of channel IDs. + + Args: + items: List of channel IDs (str) or Channel objects. + + Returns: + List of channel ID strings. + + Raises: + ValueError: If any Channel object has no id set. + """ + ids: list[str] = [] + for item in items: + if isinstance(item, str): + ids.append(item) + else: + try: + ids.append(item._id_or_error) + except ValueError: + raise ValueError("One or more Channel objects have no id set.") from None + return ids + + class ChannelsAPIAsync(ResourceBase): """High-level API for interacting with channels. @@ -75,7 +99,7 @@ async def list_( run: Run | str | None = None, # common filters description_contains: str | None = None, - include_archived: bool | None = None, + archived: bool | None = None, filter_query: str | None = None, order_by: str | None = None, limit: int | None = None, @@ -96,7 +120,7 @@ async def list_( assets: Filter channels associated with these Assets or asset IDs. run: Filter channels associated with this Run or run ID. description_contains: Partial description of the channel. - include_archived: If True, include archived channels in results. + archived: If True, searches for archived channels. filter_query: Explicit CEL query to filter channels. order_by: Field and direction to order results by. limit: Maximum number of channels to return. If None, returns all matches. @@ -117,7 +141,6 @@ async def list_( *self._build_common_cel_filters( description_contains=description_contains, filter_query=filter_query, - include_archived=include_archived, ), ] if channel_ids: @@ -133,9 +156,10 @@ async def list_( if run is not None: run_id = run.id_ if isinstance(run, Run) else run filter_parts.append(cel.equals("run_id", run_id)) + # This is opposite of usual archived state - if include_archived is not None: - filter_parts.append(cel.equals("active", not include_archived)) + if archived is not None: + filter_parts.append(cel.equals("active", not archived)) query_filter = cel.and_(*filter_parts) @@ -163,6 +187,26 @@ async def find(self, **kwargs) -> Channel | None: return channels[0] return None + async def archive(self, channels: list[str | Channel]) -> None: + """Batch archive channels by setting active to false. + + Args: + channels: List of channel IDs or Channel objects to archive. If a Channel + has no id set, raises ValueError. + """ + channel_ids = _channel_ids_from_list(channels) + await self._low_level_client.batch_archive_channels(channel_ids) + + async def unarchive(self, channels: list[str | Channel]) -> None: + """Batch unarchive channels by setting active to true. + + Args: + channels: List of channel IDs or Channel objects to unarchive. If a Channel + has no id set, raises ValueError. + """ + channel_ids = _channel_ids_from_list(channels) + await self._low_level_client.batch_unarchive_channels(channel_ids) + def _ensure_data_low_level_client(self): """Ensure that the data low level client is initialized. Separated out like this to not require large dependencies (pandas/pyarrow) for the client if not fetching data.""" if self._data_low_level_client is None: From b996a54b4d23ecc93ddb5d8d3c9af0176b078533 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Wed, 25 Feb 2026 17:02:12 -0600 Subject: [PATCH 2/2] Run gen-stubs script --- .../resources/sync_stubs/__init__.pyi | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 56f49cd7b..843a0061f 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -407,6 +407,15 @@ class ChannelsAPI: ... def _run(self, coro): ... + def archive(self, channels: list[str | Channel]) -> None: + """Batch archive channels by setting active to false. + + Args: + channels: List of channel IDs or Channel objects to archive. If a Channel + has no id set, raises ValueError. + """ + ... + def find(self, **kwargs) -> Channel | None: """Find a single channel matching the given query. Takes the same arguments as `list`. If more than one channel is found, raises an error. @@ -484,7 +493,7 @@ class ChannelsAPI: assets: list[str | Asset] | None = None, run: Run | str | None = None, description_contains: str | None = None, - include_archived: bool | None = None, + archived: bool | None = None, filter_query: str | None = None, order_by: str | None = None, limit: int | None = None, @@ -505,7 +514,7 @@ class ChannelsAPI: assets: Filter channels associated with these Assets or asset IDs. run: Filter channels associated with this Run or run ID. description_contains: Partial description of the channel. - include_archived: If True, include archived channels in results. + archived: If True, searches for archived channels. filter_query: Explicit CEL query to filter channels. order_by: Field and direction to order results by. limit: Maximum number of channels to return. If None, returns all matches. @@ -515,6 +524,15 @@ class ChannelsAPI: """ ... + def unarchive(self, channels: list[str | Channel]) -> None: + """Batch unarchive channels by setting active to true. + + Args: + channels: List of channel IDs or Channel objects to unarchive. If a Channel + has no id set, raises ValueError. + """ + ... + class FileAttachmentsAPI: """Sync counterpart to `FileAttachmentsAPIAsync`.