Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6829245
add streams api
andersfylling Apr 14, 2026
3225b62
cleanup
andersfylling Apr 14, 2026
193403c
use correct client property during testing
andersfylling Apr 14, 2026
b65d523
fix sync import
andersfylling Apr 14, 2026
7efcb0b
fix sync mock client
andersfylling Apr 14, 2026
cfbca3f
remove incorrect streams attribute
andersfylling Apr 14, 2026
ed6ed65
make resource writeable
andersfylling Apr 14, 2026
5a52db6
fix: set chunking limits for Streams API create and delete operations
andersfylling Apr 20, 2026
1e39c1b
regenerate sync API for Streams after adding chunking limits
andersfylling Apr 20, 2026
b25a668
fix: remove unused imports in sync data modeling API init
andersfylling Apr 20, 2026
04f1ec5
Merge branch 'master' into andersfylling/cognite-sdk/streams-api
andersfylling Apr 20, 2026
71c5d9e
Merge branch 'master' into andersfylling/cognite-sdk/streams-api
andersfylling Apr 23, 2026
2c50de3
refactor(streams): clean up docstrings and remove StreamTemplate.version
andersfylling Apr 23, 2026
8cee723
test: remove version field assertions from StreamTemplate test
andersfylling Apr 23, 2026
ec2959d
refactor(streams): address review feedback from haakonvt
andersfylling Apr 23, 2026
1b71d9d
address review comments
andersfylling May 7, 2026
1e6937c
fix sphinx doc generation
andersfylling May 7, 2026
987b7d1
ensure we run tests on docstrings
andersfylling May 7, 2026
3695a86
Merge branch 'master' into andersfylling/cognite-sdk/streams-api
andersfylling May 7, 2026
934d0e1
remove support for dict type
andersfylling May 7, 2026
6027ab1
use _list helper
andersfylling May 7, 2026
11ce01c
allow limit args
andersfylling May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cognite/client/_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from cognite.client._api.data_modeling.instances import InstancesAPI
from cognite.client._api.data_modeling.spaces import SpacesAPI
from cognite.client._api.data_modeling.statistics import StatisticsAPI
from cognite.client._api.data_modeling.streams import StreamsAPI
from cognite.client._api.data_modeling.views import ViewsAPI
from cognite.client._api_client import APIClient

Expand All @@ -27,6 +28,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self.instances = InstancesAPI(config, api_version, cognite_client)
self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client)
self.statistics = StatisticsAPI(config, api_version, cognite_client)
self.streams = StreamsAPI(config, api_version, cognite_client)

def _get_semaphore(
self, operation: Literal["read", "write", "delete", "search", "read_schema", "write_schema"]
Expand Down
160 changes: 160 additions & 0 deletions cognite/client/_api/data_modeling/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, overload

from cognite.client._api_client import APIClient
from cognite.client.data_classes.data_modeling.streams import (
Stream,
StreamList,
StreamWrite,
)
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import Identifier, IdentifierSequence
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient
from cognite.client.config import ClientConfig


class StreamsAPI(APIClient):
_RESOURCE_PATH = "/streams"

def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
super().__init__(config, api_version, cognite_client)
self._CREATE_LIMIT = 1
self._DELETE_LIMIT = 1
Comment thread
andersfylling marked this conversation as resolved.
self._warning = FeaturePreviewWarning(
api_maturity="General Availability", sdk_maturity="alpha", feature_name="Streams"
)

@overload
async def create(self, items: StreamWrite) -> Stream: ...

@overload
async def create(self, items: Sequence[StreamWrite]) -> StreamList: ...

async def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList:
"""`Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_.

Args:
items (StreamWrite | Sequence[StreamWrite]): One or more streams to create.

Returns:
Stream | StreamList: The created stream or streams.
Comment thread
andersfylling marked this conversation as resolved.

Examples:

Create a single stream from a template:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling.streams import (
... StreamWrite,
... StreamTemplate,
... StreamTemplateWriteSettings,
... )
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.create(
... StreamWrite(
... external_id="my-stream",
... settings=StreamTemplateWriteSettings(
... template=StreamTemplate(name="ImmutableTestStream"),
... ),
... )
... )
"""
self._warning.warn()
return await self._create_multiple(
items=items,
list_cls=StreamList,
resource_cls=Stream,
input_resource_cls=StreamWrite,
)

async def list(self) -> StreamList:
"""`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_.

Note:
There is no paging limit parameter: the endpoint returns all streams in the project
(projects are expected to have few streams).

Returns:
StreamList: The streams in the project.
Comment thread
andersfylling marked this conversation as resolved.

Examples:

List all streams in the project:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.list()
"""
self._warning.warn()
return await self._list(method="GET", list_cls=StreamList, resource_cls=Stream)

async def retrieve(self, external_id: str, include_statistics: bool | None = None) -> Stream | None:
"""`Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_.

Args:
external_id (str): External ID of the stream to retrieve.
include_statistics (bool | None): When ``True``, usage statistics will be returned together
with stream settings. Computing statistics can be expensive.

Returns:
Stream | None: The stream metadata (and optionally statistics), or ``None`` if not found.

Examples:

Retrieve a stream by external ID:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.retrieve("my-stream")

Retrieve a stream with usage statistics:

>>> res = client.data_modeling.streams.retrieve(
... "my-stream",
... include_statistics=True,
... )
"""
self._warning.warn()
return await self._retrieve(
cls=Stream,
identifier=Identifier(external_id),
params={"includeStatistics": include_statistics} if include_statistics is not None else None,
)

async def delete(self, external_id: str | SequenceNotStr[str]) -> None:
"""`Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_.

Note:
Deletion is a soft delete that retains capacity for an extended period;
prefer deleting only when necessary.

Args:
external_id (str | SequenceNotStr[str]): External ID or list of external IDs of
streams to delete.

Examples:

Delete a single stream:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> client.data_modeling.streams.delete("my-stream")

Delete multiple streams:

>>> client.data_modeling.streams.delete(["stream-a", "stream-b"])
"""
self._warning.warn()
await self._delete_multiple(
identifiers=IdentifierSequence.load(external_ids=external_id),
wrap_ids=True,
)
2 changes: 2 additions & 0 deletions cognite/client/_cognite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI
from cognite.client._api.data_modeling.spaces import SpacesAPI
from cognite.client._api.data_modeling.statistics import StatisticsAPI
from cognite.client._api.data_modeling.streams import StreamsAPI
from cognite.client._api.data_modeling.views import ViewsAPI
from cognite.client._api.datapoints import DatapointsAPI
from cognite.client._api.datapoints_subscriptions import DatapointsSubscriptionAPI
Expand Down Expand Up @@ -425,6 +426,7 @@ def _make_accessors_for_building_docs() -> None:
AsyncCogniteClient.data_modeling.graphql = DataModelingGraphQLAPI # type: ignore
AsyncCogniteClient.data_modeling.statistics = StatisticsAPI # type: ignore
AsyncCogniteClient.data_modeling.statistics.spaces = SpaceStatisticsAPI # type: ignore
AsyncCogniteClient.data_modeling.streams = StreamsAPI # type: ignore
AsyncCogniteClient.documents = DocumentsAPI # type: ignore
AsyncCogniteClient.documents.previews = DocumentPreviewAPI # type: ignore
AsyncCogniteClient.workflows = WorkflowAPI # type: ignore
Expand Down
9 changes: 3 additions & 6 deletions cognite/client/_sync_api/data_modeling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
"""
===============================================================================
c76b2b9351d2a5eee6a710fa9893bfa4
584030bc5e2a4b8168f54c101f7f521d
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI
from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI
from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI
from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI
from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI
from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI
from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI
from cognite.client._sync_api.data_modeling.views import SyncViewsAPI
from cognite.client._sync_api_client import SyncAPIClient

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient


class SyncDataModelingAPI(SyncAPIClient):
"""Auto-generated, do not modify manually."""
Expand All @@ -35,3 +31,4 @@ def __init__(self, async_client: AsyncCogniteClient) -> None:
self.instances = SyncInstancesAPI(async_client)
self.graphql = SyncDataModelingGraphQLAPI(async_client)
self.statistics = SyncStatisticsAPI(async_client)
self.streams = SyncStreamsAPI(async_client)
149 changes: 149 additions & 0 deletions cognite/client/_sync_api/data_modeling/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
===============================================================================
3bfd805fbceb341bb437635ac632d5ad
This file is auto-generated from the Async API modules, - do not edit manually!
===============================================================================
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import TYPE_CHECKING, overload

from cognite.client import AsyncCogniteClient
from cognite.client._sync_api_client import SyncAPIClient
from cognite.client.data_classes.data_modeling.streams import Stream, StreamList, StreamWrite
from cognite.client.utils._async_helpers import run_sync
from cognite.client.utils.useful_types import SequenceNotStr

if TYPE_CHECKING:
from cognite.client import AsyncCogniteClient


class SyncStreamsAPI(SyncAPIClient):
"""Auto-generated, do not modify manually."""

def __init__(self, async_client: AsyncCogniteClient) -> None:
self.__async_client = async_client

@overload
def create(self, items: StreamWrite) -> Stream: ...

@overload
def create(self, items: Sequence[StreamWrite]) -> StreamList: ...

def create(self, items: StreamWrite | Sequence[StreamWrite]) -> Stream | StreamList:
"""
`Create streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/createStream>`_.

Args:
items (StreamWrite | Sequence[StreamWrite]): One or more streams to create.

Returns:
Stream | StreamList: The created stream or streams.

Examples:

Create a single stream from a template:

>>> from cognite.client import CogniteClient
>>> from cognite.client.data_classes.data_modeling.streams import (
... StreamWrite,
... StreamTemplate,
... StreamTemplateWriteSettings,
... )
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.create(
... StreamWrite(
... external_id="my-stream",
... settings=StreamTemplateWriteSettings(
... template=StreamTemplate(name="ImmutableTestStream"),
... ),
... )
... )
"""
return run_sync(self.__async_client.data_modeling.streams.create(items=items))

def list(self) -> StreamList:
"""
`List streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/listStreams>`_.

Note:
There is no paging limit parameter: the endpoint returns all streams in the project
(projects are expected to have few streams).

Returns:
StreamList: The streams in the project.

Examples:

List all streams in the project:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.list()
"""
return run_sync(self.__async_client.data_modeling.streams.list())

def retrieve(self, external_id: str, include_statistics: bool | None = None) -> Stream | None:
"""
`Retrieve a stream <https://api-docs.cognite.com/20230101/tag/Streams/operation/getStream>`_.

Args:
external_id (str): External ID of the stream to retrieve.
include_statistics (bool | None): When ``True``, usage statistics will be returned together
with stream settings. Computing statistics can be expensive.

Returns:
Stream | None: The stream metadata (and optionally statistics), or ``None`` if not found.

Examples:

Retrieve a stream by external ID:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> res = client.data_modeling.streams.retrieve("my-stream")

Retrieve a stream with usage statistics:

>>> res = client.data_modeling.streams.retrieve(
... "my-stream",
... include_statistics=True,
... )
"""
return run_sync(
self.__async_client.data_modeling.streams.retrieve(
external_id=external_id, include_statistics=include_statistics
)
)

def delete(self, external_id: str | SequenceNotStr[str]) -> None:
"""
`Delete streams <https://api-docs.cognite.com/20230101/tag/Streams/operation/deleteStreams>`_.

Note:
Deletion is a soft delete that retains capacity for an extended period;
prefer deleting only when necessary.

Args:
external_id (str | SequenceNotStr[str]): External ID or list of external IDs of
streams to delete.

Examples:

Delete a single stream:

>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> # async_client = AsyncCogniteClient() # another option
>>> client.data_modeling.streams.delete("my-stream")

Delete multiple streams:

>>> client.data_modeling.streams.delete(["stream-a", "stream-b"])
"""
return run_sync(self.__async_client.data_modeling.streams.delete(external_id=external_id))
Loading
Loading