From 816f12324d716b9eca230e25376df537f5a35dad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Fri, 8 May 2026 13:17:02 +0200 Subject: [PATCH 1/7] docs(records): add implementation plan for Records API Co-Authored-By: Claude Sonnet 4.6 --- PLAN.md | 284 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) create mode 100644 PLAN.md diff --git a/PLAN.md b/PLAN.md new file mode 100644 index 0000000000..be307ac4f6 --- /dev/null +++ b/PLAN.md @@ -0,0 +1,284 @@ +# Records API — Implementation Plan + +## Confirmed decisions + +| Decision | Choice | +|---|---| +| Client path | `client.data_modeling.records` | +| Preview warning | `api_maturity="General Availability", sdk_maturity="alpha"` | +| Aggregate DTOs | Full typed tree (recursive) | +| `sync()` shape | `RecordSyncIterator` — iterable, exposes `.cursor` after drain | +| Filter DSL | Reuse `Filter` from `data_classes/filters.py` | +| List method name | `list()` | +| Delete params | `Record | RecordWrite | Sequence[Record | RecordWrite]` | + +--- + +## Files to create (2) + +| File | Contents | +|---|---| +| `cognite/client/data_classes/data_modeling/records.py` | All DTOs | +| `cognite/client/_api/data_modeling/records.py` | `RecordsAPI` | + +## Files to modify (~14) + +| File | Change | +|---|---| +| `cognite/client/data_classes/data_modeling/__init__.py` | Re-export all public DTOs | +| `cognite/client/_api/data_modeling/__init__.py` | Instantiate `RecordsAPI`, add to `DataModelingAPI` | +| `cognite/client/_cognite_client.py` | Docs-time import in `BUILD_COGNITE_SDK_DOCS` block | +| `cognite/client/testing.py` | Mock async + sync `RecordsAPI` (×2) | +| `cognite/client/utils/_url.py` | Add ingest + upsert paths to `NON_RETRYABLE_CREATE_DELETE_RESOURCE_PATHS` | +| `tests/tests_unit/test_api/test_data_modeling/test_records.py` | **New** — unit tests | +| `tests/tests_unit/test_docstring_examples.py` | Register doctest module | +| `tests/tests_unit/test_api_client.py` | Add retry classification entries | +| `docs/source/data_modeling.rst` | `autosummary` + `automodule` sections | +| `cognite/client/_sync_api/data_modeling/records.py` | **Generated** by sync codegen (do not hand-edit) | + +--- + +## DTO design (`data_classes/data_modeling/records.py`) + +### Write-side + +```python +class RecordSourceReference(CogniteObject): # {type, space, externalId} +class RecordSource(CogniteObject): # {source: RecordSourceReference, properties: dict} +class RecordWrite(CogniteResource): # {space, externalId, sources: list[RecordSource]} +class RecordWriteList(CogniteResourceList[RecordWrite]) +``` + +### Read-side + +```python +class Record(WriteableCogniteResource[RecordWrite]): + # {space, externalId, createdTime, lastUpdatedTime, properties: {space: {container: {prop: val}}}} + # as_write() reconstructs RecordWrite from nested properties +class RecordList(WriteableCogniteResourceList[RecordWrite, Record]) +``` + +### Sync-side + +```python +class SyncRecord(Record): # adds status: Literal["created","updated","deleted"] +class SyncRecordList(CogniteResourceList[SyncRecord]) +class RecordSyncIterator: # wraps generator, exposes .cursor after drain +``` + +### Shared request helpers + +```python +class TimeRange(CogniteObject): # gte/gt/lte/lt (str|int) +class SourceSelector(CogniteObject): # {source: RecordSourceReference, properties: list[str]} +class SortSpec(CogniteObject): # {property: list[str], direction} +class TargetUnits(CogniteObject): # unit conversion (unitSystemName or 
properties list) +``` + +### Aggregate DTO tree (full typed, ~150 lines) + +```python +# Metric aggregates (scalar result): +class AvgAggregate, CountAggregate, MinAggregate, MaxAggregate, SumAggregate + +# Bucket aggregates (recursive sub-aggregates): +class UniqueValuesAggregate # property, aggregates? +class TimeHistogramAggregate # property, calendarInterval|fixedInterval, hardBounds?, aggregates? +class NumberHistogramAggregate # property, interval, hardBounds?, aggregates? +class FiltersAggregate # filters: list[Filter], aggregates? +class NumberHistogramHardBounds # min?, max? + +# TypeAlias — avoids ABC overhead: +AggregateSpec: TypeAlias = AvgAggregate | CountAggregate | ... | FiltersAggregate +AggregateResult: TypeAlias = dict[str, Any] # parsed response bucket/scalar +``` + +--- + +## API class — method signatures (`_api/data_modeling/records.py`) + +```python +class RecordsAPI(APIClient): + # No class-level _RESOURCE_PATH — parameterized per call: f"/streams/{stream_id}/records" + # FeaturePreviewWarning(api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records") + + async def ingest(self, stream_id: str, items: RecordWrite | Sequence[RecordWrite]) -> None: + # POST /streams/{id}/records — 202 empty body + # Chunks in 1000 using split_into_chunks + self._post (can't use _create_multiple — no response body) + + async def upsert(self, stream_id: str, items: RecordWrite | Sequence[RecordWrite]) -> None: + # POST /streams/{id}/records/upsert — 202 empty body, mutable only + # Same chunking approach + + async def delete(self, stream_id: str, items: Record | RecordWrite | Sequence[Record | RecordWrite]) -> None: + # POST /streams/{id}/records/delete — 200 empty body, mutable only + # Extract space+externalId from items; chunk in 1000 + + async def list( + self, + stream_id: str, + *, + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + sources: Sequence[SourceSelector] | None = None, + sort: Sequence[SortSpec] | None = None, + limit: int | None = 25, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> RecordList: + # POST /streams/{id}/records/filter — single page, no cursor + + def sync( + self, + stream_id: str, + *, + cursor: str | None = None, + initialize_cursor: str | None = None, + filter: Filter | None = None, + sources: Sequence[SourceSelector] | None = None, + limit: int | None = None, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> RecordSyncIterator: + # Returns RecordSyncIterator (iterable over SyncRecord, has .cursor attribute) + # Internally loops while hasNext=True, then exhausts + + async def aggregate( + self, + stream_id: str, + aggregates: dict[str, AggregateSpec], + *, + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> dict[str, AggregateResult]: + # POST /streams/{id}/records/aggregate +``` + +--- + +## Tricky implementation notes + +1. **`_RESOURCE_PATH` is not a class constant** — every method builds `f"/streams/{stream_id}/records"` inline. This means we don't get `_create_multiple`'s auto-chunking. Use `split_into_chunks` from `cognite/client/utils/_auxiliary.py` + `self._post`. + +2. **Ingest/upsert/delete all return empty bodies** — return `None`. The SDK's normal helpers (`_create_multiple`, `_delete_multiple`) expect response bodies. Use `self._post` directly. + +3. 
**Delete is POST, not HTTP DELETE** — `_delete_multiple` sends HTTP `DELETE`. For records, it's `POST .../delete`. Use `self._post` directly. + +4. **`NON_RETRYABLE_CREATE_DELETE_RESOURCE_PATHS`** — needs entries for paths matching `.*/streams/.*/records$` and `.*/streams/.*/records/upsert$`. Check the existing entry format (streams uses a prefix string; may need regex or a wildcard). + +5. **`RecordSyncIterator.cursor`** — the sync loop consumes pages until `hasNext=False`, then sets `self.cursor = last_cursor`. Caller saves it for the next call. + +6. **`as_write()` on `Record`** — reconstructs `RecordWrite` from `properties: {space: {container: {prop: val}}}` by iterating the nested structure and building `sources: [{source: {type, space, externalId}, properties: {prop: val}}]`. + +7. **`SyncRecord.properties` can be `None`** for `status="deleted"` tombstones — override `_load` to handle missing key. + +8. **Aggregate response** is `{aggregates: {id: bucket_or_metric_result}}` — the result dict value type varies by aggregate type. `AggregateResult: TypeAlias = dict[str, Any]` is the pragmatic call since each bucket type has a completely different shape. + +--- + +## Iteration order (lessons from streams PR #2534) + +1. DTOs first — get types right before API methods +2. API class using `self._post` from the start — no helpers to retrofit later +3. Docstrings + `>>>` examples before unit tests +4. Wiring (DataModelingAPI, `_cognite_client.py`, `testing.py`, `data_modeling.rst`, doctest registration) all together +5. Unit tests +6. Sync codegen **last** — only after all async signatures are stable +7. `NON_RETRYABLE_CREATE_DELETE_RESOURCE_PATHS` + `test_api_client.py` entry + +--- + +## API reference (cleaned from OpenAPI spec) + +### Shared concepts + +- **Path prefix:** `/api/20230101/projects/{project}/streams/{streamId}/records/...` +- **ACLs:** Write endpoints need `StreamRecordsAcl:WRITE`; read endpoints need `StreamRecordsAcl:READ` + `DataModelsAcl:READ` +- **Record identity:** `space` + `externalId` pair +- **Immutable vs mutable streams:** + - Immutable: `lastUpdatedTime` range **required** on list/aggregate; upsert and delete are not allowed (422) + - Mutable: duplicate `space+externalId` within a batch is rejected (422) +- **Property path format:** 3-element array `[space, containerExternalId, propertyId]`; top-level fields use 1-element arrays +- **`sources` selector:** `[{source: {type: "container", space, externalId}, properties: ["*" | propId, ...]}]` (1–10 items) +- **Partial success:** Ingest/upsert/delete are non-transactional; `error.partial` lists per-record outcomes on 500/503 +- **Response size limit:** 20 MB max for list responses + +### 1. Ingest (`POST /streams/{id}/records`) + +- **Request:** `{items: array[1..1000]}` where each item has `space`, `externalId`, `sources` +- **Response:** 202 empty body +- **409:** Some mutable records already exist — `error.partial` lists per-record outcomes +- **422:** Duplicate `space+externalId` pairs in request (mutable), or request > 10 MB +- **Immutable deduplication:** Records where all fields are identical are deduplicated (eventually consistent) + +### 2. Upsert (`POST /streams/{id}/records/upsert`) + +- **Request:** Identical schema to ingest +- **Response:** 202 empty body +- **Mutable only:** 422 if attempted on immutable stream + +### 3. 
Delete (`POST /streams/{id}/records/delete`) + +- **Request:** `{items: array[1..1000]}` where each item has `space`, `externalId` +- **Response:** 200 empty body +- **Mutable only:** 422 if attempted on immutable stream +- **`ignoreUnknownIds=true` implicit:** Unknown IDs do not cause failure +- **Tombstones on sync:** Deleted records appear in sync with `status: "deleted"` for at least 3 days + +### 4. List/filter (`POST /streams/{id}/records/filter`) + +- **Request:** + + | Field | Type | Required | Notes | + |---|---|---|---| + | `lastUpdatedTime` | TimeRange | conditional | Required for immutable streams; needs at least one lower bound | + | `filter` | Filter DSL | no | max 100 nodes, depth 10 | + | `sources` | array[1..10] | no | Which container properties to return | + | `sort` | array[1..5] | no | `{property, direction?}` | + | `limit` | int | no | 1–1000, default 10 | + | `targetUnits` | object | no | Unit conversion | + | `includeTyping` | bool | no | | + +- **Response:** `{items: array[Record], typing?: {...}}` +- **No cursor** — pagination requires chunking by `lastUpdatedTime` range manually +- **Filter DSL operators:** `and`, `or`, `not`, `matchAll`, `equals`, `range`, `in`, `prefix`, `exists`, `hasData`, `containsAll`, `containsAny` + +### 5. Sync (`POST /streams/{id}/records/sync`) + +- **Request:** + + | Field | Type | Required | Notes | + |---|---|---|---| + | `cursor` | string | conditional | Resume from previous cursor | + | `initializeCursor` | string | conditional | Starting point if no cursor (e.g. `"7d-ago"`, `"400h-ago"`) | + | `filter` | Filter DSL | no | | + | `sources` | array[1..10] | no | | + | `limit` | int | no | 1–1000, default 10 | + | `targetUnits` | object | no | | + | `includeTyping` | bool | no | | + +- **Response:** `{items: array[SyncRecord], nextCursor: string, hasNext: bool, typing?: {...}}` +- **`hasNext=true`:** More records available — keep polling with `nextCursor` +- **SyncRecord** has additional `status: "created" | "updated" | "deleted"`; `properties` is absent for deleted records + +### 6. Aggregate (`POST /streams/{id}/records/aggregate`) + +- **Request:** + + | Field | Type | Required | Notes | + |---|---|---|---| + | `aggregates` | dict[id, AggregateSpec] | yes | 1–5 top-level entries | + | `lastUpdatedTime` | TimeRange | conditional | Required for immutable streams | + | `filter` | Filter DSL | no | | + | `targetUnits` | object | no | | + | `includeTyping` | bool | no | | + +- **Metric aggregates:** `avg`, `count`, `min`, `max`, `sum` — each takes `{property: [space, container, propId]}` +- **Bucket aggregates:** `uniqueValues`, `timeHistogram`, `numberHistogram`, `filters` — each supports nested `aggregates` (recursive) +- **`timeHistogram`:** requires `calendarInterval` (`"1s"`, `"1m"`, `"1h"`, `"1d"`, `"1w"`, `"1M"`, `"1q"`, `"1y"`) OR `fixedInterval` (e.g. 
`"42m"`) +- **`numberHistogram`:** requires `interval: float` +- **`filters`:** `{filters: array[1..10 of Filter], aggregates?}` — max 30 filter buckets total +- **Aggregate ID constraints:** no `.`, not `_count` or `_bucket_count`, pattern `^[^\[\]>.]{1,255}$` +- **Response:** `{aggregates: {id: result}}` where result is a scalar or bucket array depending on type From ddd7277523c57848f06b876b6b06361dc7f0076d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Fri, 8 May 2026 13:54:17 +0200 Subject: [PATCH 2/7] feat(records): add SDK support for the Records API Implements all 6 endpoints (ingest, upsert, delete, list, sync, aggregate) for cognite.client.data_modeling.records with full typed DTOs, async generator for sync(), chunked writes, and the complete 16-step wiring (testing mocks, docs, doctest registration, retry classification, sync codegen). Co-Authored-By: Claude Sonnet 4.6 --- cognite/client/_api/data_modeling/__init__.py | 2 + cognite/client/_api/data_modeling/records.py | 412 ++++++++++ .../_sync_api/data_modeling/__init__.py | 17 +- .../client/_sync_api/data_modeling/records.py | 346 +++++++++ .../data_classes/data_modeling/__init__.py | 52 ++ .../data_classes/data_modeling/records.py | 725 ++++++++++++++++++ cognite/client/testing.py | 6 + cognite/client/utils/_url.py | 2 + docs/source/data_modeling.rst | 17 + pyproject.toml | 3 + .../test_data_modeling/test_records.py | 321 ++++++++ tests/tests_unit/test_api_client.py | 7 + tests/tests_unit/test_docstring_examples.py | 2 + 13 files changed, 1911 insertions(+), 1 deletion(-) create mode 100644 cognite/client/_api/data_modeling/records.py create mode 100644 cognite/client/_sync_api/data_modeling/records.py create mode 100644 cognite/client/data_classes/data_modeling/records.py create mode 100644 tests/tests_unit/test_api/test_data_modeling/test_records.py diff --git a/cognite/client/_api/data_modeling/__init__.py b/cognite/client/_api/data_modeling/__init__.py index 32313a417e..200cf033d0 100644 --- a/cognite/client/_api/data_modeling/__init__.py +++ b/cognite/client/_api/data_modeling/__init__.py @@ -7,6 +7,7 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI +from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI from cognite.client._api.data_modeling.streams import StreamsAPI @@ -28,6 +29,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self.instances = InstancesAPI(config, api_version, cognite_client) self.graphql = DataModelingGraphQLAPI(config, api_version, cognite_client) self.statistics = StatisticsAPI(config, api_version, cognite_client) + self.records = RecordsAPI(config, api_version, cognite_client) self.streams = StreamsAPI(config, api_version, cognite_client) def _get_semaphore( diff --git a/cognite/client/_api/data_modeling/records.py b/cognite/client/_api/data_modeling/records.py new file mode 100644 index 0000000000..27f9bb34a2 --- /dev/null +++ b/cognite/client/_api/data_modeling/records.py @@ -0,0 +1,412 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator, Sequence +from typing import TYPE_CHECKING, Any + +from cognite.client._api_client import APIClient +from 
cognite.client.data_classes.data_modeling.records import ( + AggregateResult, + AggregateSpec, + Record, + RecordList, + RecordSortSpec, + RecordSourceSelector, + RecordWrite, + SyncRecord, + TargetUnits, + TimeRange, +) +from cognite.client.data_classes.filters import Filter +from cognite.client.utils._auxiliary import split_into_chunks +from cognite.client.utils._experimental import FeaturePreviewWarning + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + from cognite.client.config import ClientConfig + +_INGEST_LIMIT = 1000 + + +class RecordsAPI(APIClient): + """API for reading and writing records in a stream. + + Records are stored in a stream and their schema is defined by the containers + referenced as sources. Access this API via ``client.data_modeling.records``. + """ + + def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None: + super().__init__(config, api_version, cognite_client) + self._warning = FeaturePreviewWarning( + api_maturity="General Availability", sdk_maturity="alpha", feature_name="Records" + ) + + def _records_url(self, stream_id: str, suffix: str = "") -> str: + return f"/streams/{stream_id}/records{suffix}" + + async def ingest(self, stream_id: str, items: RecordWrite | Sequence[RecordWrite]) -> None: + """`Ingest records into a stream `_. + + Creates new records. For immutable streams, duplicate records (identical + ``space``, ``externalId``, and all property values) are silently discarded. + For mutable streams, duplicate ``space + externalId`` within a single batch + returns a 422. + + Each request accepts up to 1 000 records; larger lists are chunked automatically. + + Args: + stream_id (str): External ID of the stream to ingest into. + items (RecordWrite | Sequence[RecordWrite]): One or more records to ingest. + + Examples: + + Ingest a single record: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import ( + ... RecordWrite, + ... RecordSource, + ... RecordSourceReference, + ... ) + >>> client = CogniteClient() + >>> client.data_modeling.records.ingest( + ... stream_id="my-stream", + ... items=RecordWrite( + ... space="my-space", + ... external_id="rec-1", + ... sources=[ + ... RecordSource( + ... source=RecordSourceReference( + ... space="my-space", external_id="my-container" + ... ), + ... properties={"temperature": 22.5}, + ... ) + ... ], + ... ), + ... ) + """ + self._warning.warn() + item_list: list[RecordWrite] = [items] if isinstance(items, RecordWrite) else list(items) + semaphore = self._get_semaphore("write") + for chunk in split_into_chunks(item_list, _INGEST_LIMIT): + await self._post( + url_path=self._records_url(stream_id), + json={"items": [r.dump() for r in chunk]}, + semaphore=semaphore, + ) + + async def upsert(self, stream_id: str, items: RecordWrite | Sequence[RecordWrite]) -> None: + """`Upsert records into a stream `_. + + Creates or updates records. Only valid for mutable streams (returns 422 on + immutable). Existing records matching ``space + externalId`` are updated at + the property level. + + Each request accepts up to 1 000 records; larger lists are chunked automatically. + + Args: + stream_id (str): External ID of the stream to upsert into. + items (RecordWrite | Sequence[RecordWrite]): One or more records to upsert. + + Examples: + + Upsert a single record: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import ( + ... 
RecordWrite, + ... RecordSource, + ... RecordSourceReference, + ... ) + >>> client = CogniteClient() + >>> client.data_modeling.records.upsert( + ... stream_id="my-stream", + ... items=RecordWrite( + ... space="my-space", + ... external_id="rec-1", + ... sources=[ + ... RecordSource( + ... source=RecordSourceReference( + ... space="my-space", external_id="my-container" + ... ), + ... properties={"temperature": 23.0}, + ... ) + ... ], + ... ), + ... ) + """ + self._warning.warn() + item_list: list[RecordWrite] = [items] if isinstance(items, RecordWrite) else list(items) + semaphore = self._get_semaphore("write") + for chunk in split_into_chunks(item_list, _INGEST_LIMIT): + await self._post( + url_path=self._records_url(stream_id, "/upsert"), + json={"items": [r.dump() for r in chunk]}, + semaphore=semaphore, + ) + + async def delete( + self, + stream_id: str, + items: Record | RecordWrite | Sequence[Record | RecordWrite], + ) -> None: + """`Delete records from a stream `_. + + Only valid for mutable streams (returns 422 on immutable). Unknown + ``space + externalId`` pairs are silently ignored. + + Each request accepts up to 1 000 identifiers; larger lists are chunked automatically. + + Args: + stream_id (str): External ID of the stream to delete from. + items (Record | RecordWrite | Sequence[Record | RecordWrite]): Records to delete. + Only ``space`` and ``external_id`` are used; other fields are ignored. + + Examples: + + Delete records by external ID: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import RecordWrite + >>> client = CogniteClient() + >>> client.data_modeling.records.delete( + ... stream_id="my-stream", + ... items=[ + ... RecordWrite(space="my-space", external_id="rec-1", sources=[]), + ... RecordWrite(space="my-space", external_id="rec-2", sources=[]), + ... ], + ... ) + """ + self._warning.warn() + item_list: list[Record | RecordWrite] = [items] if isinstance(items, (Record, RecordWrite)) else list(items) + semaphore = self._get_semaphore("delete") + for chunk in split_into_chunks(item_list, _INGEST_LIMIT): + await self._post( + url_path=self._records_url(stream_id, "/delete"), + json={"items": [{"space": r.space, "externalId": r.external_id} for r in chunk]}, + semaphore=semaphore, + ) + + async def list( + self, + stream_id: str, + *, + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + sort: Sequence[RecordSortSpec] | None = None, + limit: int | None = 25, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> RecordList: + """`List records from a stream `_. + + Returns records matching the given filters. For immutable streams, ``last_updated_time`` + with at least one lower bound is required. The endpoint has no cursor — to page over + large time windows, issue multiple calls with partitioned ``last_updated_time`` ranges. + + Args: + stream_id (str): External ID of the stream to query. + last_updated_time (TimeRange | None): Filter records by last-updated time. + **Required** for immutable streams (must include a lower bound). + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + sort (Sequence[RecordSortSpec] | None): Sort specifications (up to 5). + limit (int | None): Maximum number of records to return (1-1000). Defaults to 25. 
+ target_units (TargetUnits | None): Unit conversion to apply to numeric properties. + include_typing (bool): Include property type metadata in the response. + + Returns: + RecordList: Matching records. + + Examples: + + List all records updated in the last hour: + + >>> import time + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import TimeRange + >>> client = CogniteClient() + >>> now_ms = int(time.time() * 1000) + >>> res = client.data_modeling.records.list( + ... stream_id="my-stream", + ... last_updated_time=TimeRange(gte=now_ms - 3_600_000), + ... limit=100, + ... ) + """ + self._warning.warn() + body: dict[str, Any] = {} + if last_updated_time is not None: + body["lastUpdatedTime"] = last_updated_time.dump() + if filter is not None: + body["filter"] = filter.dump() + if sources is not None: + body["sources"] = [s.dump() for s in sources] + if sort is not None: + body["sort"] = [s.dump() for s in sort] + if limit is not None: + body["limit"] = limit + if target_units is not None: + body["targetUnits"] = target_units.dump() + if include_typing: + body["includeTyping"] = True + + res = await self._post( + url_path=self._records_url(stream_id, "/filter"), + json=body, + semaphore=self._get_semaphore("read"), + ) + return RecordList._load(res.json()["items"]) + + async def sync( + self, + stream_id: str, + *, + cursor: str | None = None, + initialize_cursor: str | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + limit: int | None = None, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> AsyncIterator[SyncRecord]: + """`Sync records from a stream `_. + + Returns a change feed of new, updated, and deleted records. Provide either + ``cursor`` (to resume a previous sync position) or ``initialize_cursor`` + (to start from a relative time, e.g. ``"7d-ago"``). The generator drains all + available pages (following ``hasNext``) then stops — call ``sync()`` again with + a new ``cursor`` to poll for subsequent changes. + + Args: + stream_id (str): External ID of the stream to sync. + cursor (str | None): Resume from a previous sync cursor. + initialize_cursor (str | None): Starting point if no cursor (e.g. ``"7d-ago"``). + Ignored when ``cursor`` is set. + filter (Filter | None): Filter expression. + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + limit (int | None): Maximum records per API call (1-1000). + target_units (TargetUnits | None): Unit conversion for numeric properties. + include_typing (bool): Include property type metadata. + + Yields: + SyncRecord: Records in change order. Deleted records have ``properties=None``. + + Examples: + + Sync all records changed in the last 7 days: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> for record in client.data_modeling.records.sync( + ... stream_id="my-stream", + ... initialize_cursor="7d-ago", + ... ): + ... 
pass # process record + """ + self._warning.warn() + body: dict[str, Any] = {} + if cursor is not None: + body["cursor"] = cursor + elif initialize_cursor is not None: + body["initializeCursor"] = initialize_cursor + if filter is not None: + body["filter"] = filter.dump() + if sources is not None: + body["sources"] = [s.dump() for s in sources] + if limit is not None: + body["limit"] = limit + if target_units is not None: + body["targetUnits"] = target_units.dump() + if include_typing: + body["includeTyping"] = True + + semaphore = self._get_semaphore("read") + while True: + res = await self._post( + url_path=self._records_url(stream_id, "/sync"), + json=body, + semaphore=semaphore, + ) + data = res.json() + for item in data["items"]: + yield SyncRecord._load(item) + body = {"cursor": data["nextCursor"]} + if not data["hasNext"]: + break + + async def aggregate( + self, + stream_id: str, + aggregates: dict[str, AggregateSpec], + *, + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> dict[str, AggregateResult]: + """`Aggregate records from a stream `_. + + Compute metrics (avg, count, min, max, sum) and bucket aggregations + (unique values, time histogram, number histogram, filters) over records. + Bucket aggregates support nested sub-aggregates. + + For immutable streams, ``last_updated_time`` with at least one lower bound is required. + + Args: + stream_id (str): External ID of the stream to aggregate. + aggregates (dict[str, AggregateSpec]): Mapping of user-defined aggregate IDs + to aggregate specifications (1-5 top-level entries). IDs must not contain + ``"."`` and cannot be ``"_count"`` or ``"_bucket_count"``. + last_updated_time (TimeRange | None): Time range filter. **Required** for immutable streams. + filter (Filter | None): Filter expression applied before aggregating. + target_units (TargetUnits | None): Unit conversion for numeric properties. + include_typing (bool): Include property type metadata in the response. + + Returns: + dict[str, AggregateResult]: Mapping of aggregate IDs to their results. + Metric aggregates return a ``{"avg": value}``-style dict; bucket aggregates + return lists of bucket objects. + + Examples: + + Count records and compute average temperature: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import ( + ... AvgAggregate, + ... CountAggregate, + ... TimeRange, + ... ) + >>> client = CogniteClient() + >>> res = client.data_modeling.records.aggregate( + ... stream_id="my-stream", + ... aggregates={ + ... "total": CountAggregate( + ... property=["my-space", "my-container", "temperature"] + ... ), + ... "avg_temp": AvgAggregate( + ... property=["my-space", "my-container", "temperature"] + ... ), + ... }, + ... last_updated_time=TimeRange(gte="2024-01-01T00:00:00Z"), + ... 
) + """ + self._warning.warn() + body: dict[str, Any] = { + "aggregates": {k: v.dump() for k, v in aggregates.items()}, + } + if last_updated_time is not None: + body["lastUpdatedTime"] = last_updated_time.dump() + if filter is not None: + body["filter"] = filter.dump() + if target_units is not None: + body["targetUnits"] = target_units.dump() + if include_typing: + body["includeTyping"] = True + + res = await self._post( + url_path=self._records_url(stream_id, "/aggregate"), + json=body, + semaphore=self._get_semaphore("read"), + ) + return res.json()["aggregates"] diff --git a/cognite/client/_sync_api/data_modeling/__init__.py b/cognite/client/_sync_api/data_modeling/__init__.py index f0cbe9e923..fca375ba70 100644 --- a/cognite/client/_sync_api/data_modeling/__init__.py +++ b/cognite/client/_sync_api/data_modeling/__init__.py @@ -1,22 +1,36 @@ """ =============================================================================== -584030bc5e2a4b8168f54c101f7f521d +9c0a8f4ac6e240299c816cc09cacd693 This file is auto-generated from the Async API modules, - do not edit manually! =============================================================================== """ from __future__ import annotations +import asyncio +from collections.abc import Coroutine, Iterator +from typing import TYPE_CHECKING, Any, Literal, overload + from cognite.client import AsyncCogniteClient +from cognite.client._api_client import APIClient from cognite.client._sync_api.data_modeling.containers import SyncContainersAPI from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI +from cognite.client._sync_api.data_modeling.records import SyncRecordsAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI from cognite.client._sync_api.data_modeling.streams import SyncStreamsAPI from cognite.client._sync_api.data_modeling.views import SyncViewsAPI from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.utils._async_helpers import SyncIterator, run_sync +from cognite.client.utils._concurrency import _get_event_loop_executor + +if TYPE_CHECKING: + import pandas as pd + + from cognite.client import AsyncCogniteClient +from cognite.client.config import ClientConfig class SyncDataModelingAPI(SyncAPIClient): @@ -31,4 +45,5 @@ def __init__(self, async_client: AsyncCogniteClient) -> None: self.instances = SyncInstancesAPI(async_client) self.graphql = SyncDataModelingGraphQLAPI(async_client) self.statistics = SyncStatisticsAPI(async_client) + self.records = SyncRecordsAPI(async_client) self.streams = SyncStreamsAPI(async_client) diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py new file mode 100644 index 0000000000..a83670c512 --- /dev/null +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -0,0 +1,346 @@ +""" +=============================================================================== +2058e4bb661c1caf5ebdf1598c7ddc36 +This file is auto-generated from the Async API modules, - do not edit manually! 
+=============================================================================== +""" + +from __future__ import annotations + +from collections.abc import Iterator, Sequence +from typing import TYPE_CHECKING + +from cognite.client import AsyncCogniteClient +from cognite.client._sync_api_client import SyncAPIClient +from cognite.client.data_classes.data_modeling.records import ( + AggregateResult, + AggregateSpec, + Record, + RecordList, + RecordSortSpec, + RecordSourceSelector, + RecordWrite, + SyncRecord, + TargetUnits, + TimeRange, +) +from cognite.client.data_classes.filters import Filter +from cognite.client.utils._async_helpers import SyncIterator, run_sync + +if TYPE_CHECKING: + from cognite.client import AsyncCogniteClient + + +class SyncRecordsAPI(SyncAPIClient): + """Auto-generated, do not modify manually.""" + + def __init__(self, async_client: AsyncCogniteClient) -> None: + self.__async_client = async_client + + def ingest(self, stream_id: str, items: RecordWrite | Sequence[RecordWrite]) -> None: + """ + `Ingest records into a stream `_. + + Creates new records. For immutable streams, duplicate records (identical + ``space``, ``externalId``, and all property values) are silently discarded. + For mutable streams, duplicate ``space + externalId`` within a single batch + returns a 422. + + Each request accepts up to 1 000 records; larger lists are chunked automatically. + + Args: + stream_id (str): External ID of the stream to ingest into. + items (RecordWrite | Sequence[RecordWrite]): One or more records to ingest. + + Examples: + + Ingest a single record: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import ( + ... RecordWrite, + ... RecordSource, + ... RecordSourceReference, + ... ) + >>> client = CogniteClient() + >>> client.data_modeling.records.ingest( + ... stream_id="my-stream", + ... items=RecordWrite( + ... space="my-space", + ... external_id="rec-1", + ... sources=[ + ... RecordSource( + ... source=RecordSourceReference( + ... space="my-space", external_id="my-container" + ... ), + ... properties={"temperature": 22.5}, + ... ) + ... ], + ... ), + ... ) + """ + return run_sync(self.__async_client.data_modeling.records.ingest(stream_id=stream_id, items=items)) + + def upsert(self, stream_id: str, items: RecordWrite | Sequence[RecordWrite]) -> None: + """ + `Upsert records into a stream `_. + + Creates or updates records. Only valid for mutable streams (returns 422 on + immutable). Existing records matching ``space + externalId`` are updated at + the property level. + + Each request accepts up to 1 000 records; larger lists are chunked automatically. + + Args: + stream_id (str): External ID of the stream to upsert into. + items (RecordWrite | Sequence[RecordWrite]): One or more records to upsert. + + Examples: + + Upsert a single record: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import ( + ... RecordWrite, + ... RecordSource, + ... RecordSourceReference, + ... ) + >>> client = CogniteClient() + >>> client.data_modeling.records.upsert( + ... stream_id="my-stream", + ... items=RecordWrite( + ... space="my-space", + ... external_id="rec-1", + ... sources=[ + ... RecordSource( + ... source=RecordSourceReference( + ... space="my-space", external_id="my-container" + ... ), + ... properties={"temperature": 23.0}, + ... ) + ... ], + ... ), + ... 
) + """ + return run_sync(self.__async_client.data_modeling.records.upsert(stream_id=stream_id, items=items)) + + def delete(self, stream_id: str, items: Record | RecordWrite | Sequence[Record | RecordWrite]) -> None: + """ + `Delete records from a stream `_. + + Only valid for mutable streams (returns 422 on immutable). Unknown + ``space + externalId`` pairs are silently ignored. + + Each request accepts up to 1 000 identifiers; larger lists are chunked automatically. + + Args: + stream_id (str): External ID of the stream to delete from. + items (Record | RecordWrite | Sequence[Record | RecordWrite]): Records to delete. + Only ``space`` and ``external_id`` are used; other fields are ignored. + + Examples: + + Delete records by external ID: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import RecordWrite + >>> client = CogniteClient() + >>> client.data_modeling.records.delete( + ... stream_id="my-stream", + ... items=[ + ... RecordWrite(space="my-space", external_id="rec-1", sources=[]), + ... RecordWrite(space="my-space", external_id="rec-2", sources=[]), + ... ], + ... ) + """ + return run_sync(self.__async_client.data_modeling.records.delete(stream_id=stream_id, items=items)) + + def list( + self, + stream_id: str, + *, + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + sort: Sequence[RecordSortSpec] | None = None, + limit: int | None = 25, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> RecordList: + """ + `List records from a stream `_. + + Returns records matching the given filters. For immutable streams, ``last_updated_time`` + with at least one lower bound is required. The endpoint has no cursor — to page over + large time windows, issue multiple calls with partitioned ``last_updated_time`` ranges. + + Args: + stream_id (str): External ID of the stream to query. + last_updated_time (TimeRange | None): Filter records by last-updated time. + **Required** for immutable streams (must include a lower bound). + filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`). + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + sort (Sequence[RecordSortSpec] | None): Sort specifications (up to 5). + limit (int | None): Maximum number of records to return (1-1000). Defaults to 25. + target_units (TargetUnits | None): Unit conversion to apply to numeric properties. + include_typing (bool): Include property type metadata in the response. + + Returns: + RecordList: Matching records. + + Examples: + + List all records updated in the last hour: + + >>> import time + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import TimeRange + >>> client = CogniteClient() + >>> now_ms = int(time.time() * 1000) + >>> res = client.data_modeling.records.list( + ... stream_id="my-stream", + ... last_updated_time=TimeRange(gte=now_ms - 3_600_000), + ... limit=100, + ... 
) + """ + return run_sync( + self.__async_client.data_modeling.records.list( + stream_id=stream_id, + last_updated_time=last_updated_time, + filter=filter, + sources=sources, + sort=sort, + limit=limit, + target_units=target_units, + include_typing=include_typing, + ) + ) + + def sync( + self, + stream_id: str, + *, + cursor: str | None = None, + initialize_cursor: str | None = None, + filter: Filter | None = None, + sources: Sequence[RecordSourceSelector] | None = None, + limit: int | None = None, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> Iterator[SyncRecord]: + """ + `Sync records from a stream `_. + + Returns a change feed of new, updated, and deleted records. Provide either + ``cursor`` (to resume a previous sync position) or ``initialize_cursor`` + (to start from a relative time, e.g. ``"7d-ago"``). The generator drains all + available pages (following ``hasNext``) then stops — call ``sync()`` again with + a new ``cursor`` to poll for subsequent changes. + + Args: + stream_id (str): External ID of the stream to sync. + cursor (str | None): Resume from a previous sync cursor. + initialize_cursor (str | None): Starting point if no cursor (e.g. ``"7d-ago"``). + Ignored when ``cursor`` is set. + filter (Filter | None): Filter expression. + sources (Sequence[RecordSourceSelector] | None): Which container properties to return. + limit (int | None): Maximum records per API call (1-1000). + target_units (TargetUnits | None): Unit conversion for numeric properties. + include_typing (bool): Include property type metadata. + + Yields: + SyncRecord: Records in change order. Deleted records have ``properties=None``. + + Examples: + + Sync all records changed in the last 7 days: + + >>> from cognite.client import CogniteClient + >>> client = CogniteClient() + >>> for record in client.data_modeling.records.sync( + ... stream_id="my-stream", + ... initialize_cursor="7d-ago", + ... ): + ... pass # process record + """ # noqa: DOC404 + yield from SyncIterator( + self.__async_client.data_modeling.records.sync( + stream_id=stream_id, + cursor=cursor, + initialize_cursor=initialize_cursor, + filter=filter, + sources=sources, + limit=limit, + target_units=target_units, + include_typing=include_typing, + ) + ) # type: ignore [misc] + + def aggregate( + self, + stream_id: str, + aggregates: dict[str, AggregateSpec], + *, + last_updated_time: TimeRange | None = None, + filter: Filter | None = None, + target_units: TargetUnits | None = None, + include_typing: bool = False, + ) -> dict[str, AggregateResult]: + """ + `Aggregate records from a stream `_. + + Compute metrics (avg, count, min, max, sum) and bucket aggregations + (unique values, time histogram, number histogram, filters) over records. + Bucket aggregates support nested sub-aggregates. + + For immutable streams, ``last_updated_time`` with at least one lower bound is required. + + Args: + stream_id (str): External ID of the stream to aggregate. + aggregates (dict[str, AggregateSpec]): Mapping of user-defined aggregate IDs + to aggregate specifications (1-5 top-level entries). IDs must not contain + ``"."`` and cannot be ``"_count"`` or ``"_bucket_count"``. + last_updated_time (TimeRange | None): Time range filter. **Required** for immutable streams. + filter (Filter | None): Filter expression applied before aggregating. + target_units (TargetUnits | None): Unit conversion for numeric properties. + include_typing (bool): Include property type metadata in the response. 
+ + Returns: + dict[str, AggregateResult]: Mapping of aggregate IDs to their results. + Metric aggregates return a ``{"avg": value}``-style dict; bucket aggregates + return lists of bucket objects. + + Examples: + + Count records and compute average temperature: + + >>> from cognite.client import CogniteClient + >>> from cognite.client.data_classes.data_modeling.records import ( + ... AvgAggregate, + ... CountAggregate, + ... TimeRange, + ... ) + >>> client = CogniteClient() + >>> res = client.data_modeling.records.aggregate( + ... stream_id="my-stream", + ... aggregates={ + ... "total": CountAggregate( + ... property=["my-space", "my-container", "temperature"] + ... ), + ... "avg_temp": AvgAggregate( + ... property=["my-space", "my-container", "temperature"] + ... ), + ... }, + ... last_updated_time=TimeRange(gte="2024-01-01T00:00:00Z"), + ... ) + """ + return run_sync( + self.__async_client.data_modeling.records.aggregate( + stream_id=stream_id, + aggregates=aggregates, + last_updated_time=last_updated_time, + filter=filter, + target_units=target_units, + include_typing=include_typing, + ) + ) diff --git a/cognite/client/data_classes/data_modeling/__init__.py b/cognite/client/data_classes/data_modeling/__init__.py index 0fab81400f..aef8e242d7 100644 --- a/cognite/client/data_classes/data_modeling/__init__.py +++ b/cognite/client/data_classes/data_modeling/__init__.py @@ -118,6 +118,33 @@ Union, UnionAll, ) +from cognite.client.data_classes.data_modeling.records import ( + AggregateResult, + AggregateSpec, + AvgAggregate, + CountAggregate, + FiltersAggregate, + MaxAggregate, + MinAggregate, + NumberHistogramAggregate, + NumberHistogramHardBounds, + Record, + RecordList, + RecordSortSpec, + RecordSource, + RecordSourceReference, + RecordSourceSelector, + RecordWrite, + RecordWriteList, + SumAggregate, + SyncRecord, + SyncRecordList, + TargetUnitProperty, + TargetUnits, + TimeHistogramAggregate, + TimeRange, + UniqueValuesAggregate, +) from cognite.client.data_classes.data_modeling.spaces import Space, SpaceApply, SpaceApplyList, SpaceList from cognite.client.data_classes.data_modeling.streams import ( Stream, @@ -151,8 +178,11 @@ from cognite.client.data_classes.filters import Filter __all__ = [ + "AggregateResult", + "AggregateSpec", "AggregatedValue", "Aggregation", + "AvgAggregate", "BTreeIndex", "BTreeIndexApply", "Boolean", @@ -168,6 +198,7 @@ "ContainerList", "ContainerProperty", "ContainerPropertyApply", + "CountAggregate", "DataModel", "DataModelApply", "DataModelApplyList", @@ -198,6 +229,7 @@ "ExecutionPlan", "FileReference", "Filter", + "FiltersAggregate", "Float32", "Float64", "Index", @@ -218,6 +250,8 @@ "Json", "MappedProperty", "MappedPropertyApply", + "MaxAggregate", + "MinAggregate", "MultiEdgeConnection", "MultiEdgeConnectionApply", "MultiReverseDirectRelation", @@ -234,12 +268,22 @@ "NodeOrEdgeResultSetExpression", "NodeResultSetExpression", "NodeResultSetExpressionSync", + "NumberHistogramAggregate", + "NumberHistogramHardBounds", "PropertyId", "PropertyOptions", "PropertyType", "Query", "QueryResult", "QuerySync", + "Record", + "RecordList", + "RecordSortSpec", + "RecordSource", + "RecordSourceReference", + "RecordSourceSelector", + "RecordWrite", + "RecordWriteList", "RequiresConstraint", "RequiresConstraintApply", "ResultSetExpression", @@ -264,7 +308,14 @@ "StreamTemplateWriteSettings", "StreamWrite", "SubscriptionContext", + "SumAggregate", + "SyncRecord", + "SyncRecordList", + "TargetUnitProperty", + "TargetUnits", "Text", + "TimeHistogramAggregate", + 
"TimeRange", "TimeSeriesReference", "Timestamp", "TranslatedQuery", @@ -274,6 +325,7 @@ "TypedNodeApply", "Union", "UnionAll", + "UniqueValuesAggregate", "UniquenessConstraint", "UniquenessConstraintApply", "VersionedDataModelingId", diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py new file mode 100644 index 0000000000..f3d1ff8f1f --- /dev/null +++ b/cognite/client/data_classes/data_modeling/records.py @@ -0,0 +1,725 @@ +from __future__ import annotations + +from typing import Any, Literal, TypeAlias + +from typing_extensions import Self + +from cognite.client.data_classes._base import ( + CogniteResource, + CogniteResourceList, + ExternalIDTransformerMixin, + WriteableCogniteResource, + WriteableCogniteResourceList, +) +from cognite.client.data_classes.filters import Filter + +# ─── Write helpers ───────────────────────────────────────────────────────────── + + +class RecordSourceReference(CogniteResource): + """Container reference used as a source in a record write. + + Args: + space (str): Space that contains the container. + external_id (str): External ID of the container. + type (str): Must be ``"container"`` (default). + """ + + def __init__(self, space: str, external_id: str, type: str = "container") -> None: + self.space = space + self.external_id = external_id + self.type = type + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + type=resource.get("type", "container"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "type": self.type, + "space": self.space, + "externalId" if camel_case else "external_id": self.external_id, + } + + +class RecordSource(CogniteResource): + """Container source with property values for a record write. + + Args: + source (RecordSourceReference): Reference to the container. + properties (dict[str, Any]): Map of ``{property_id: value}``. + """ + + def __init__(self, source: RecordSourceReference, properties: dict[str, Any]) -> None: + self.source = source + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + source=RecordSourceReference._load(resource["source"]), + properties=resource["properties"], + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "source": self.source.dump(camel_case=camel_case), + "properties": self.properties, + } + + +class RecordWrite(WriteableCogniteResource["RecordWrite"]): + """Write representation of a record, used for ingest and upsert. + + This is the write version of :class:`Record`. + + Args: + space (str): Space the record belongs to. + external_id (str): External ID of the record (1–256 chars, no null bytes). + sources (list[RecordSource]): Container property values to write (1–100 sources). + + Examples: + + Build a record write object: + + >>> from cognite.client.data_classes.data_modeling.records import ( + ... RecordWrite, + ... RecordSource, + ... RecordSourceReference, + ... ) + >>> rec = RecordWrite( + ... space="my-space", + ... external_id="rec-1", + ... sources=[ + ... RecordSource( + ... source=RecordSourceReference( + ... space="my-space", external_id="my-container" + ... ), + ... properties={"temperature": 22.5, "location": "north"}, + ... ) + ... ], + ... 
) + """ + + def __init__(self, space: str, external_id: str, sources: list[RecordSource]) -> None: + self.space = space + self.external_id = external_id + self.sources = sources + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + sources=[RecordSource._load(s) for s in resource.get("sources", [])], + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "space": self.space, + "externalId" if camel_case else "external_id": self.external_id, + "sources": [s.dump(camel_case=camel_case) for s in self.sources], + } + + def as_write(self) -> RecordWrite: + return self + + +class RecordWriteList(CogniteResourceList[RecordWrite]): + """A list of :class:`RecordWrite` objects.""" + + _RESOURCE = RecordWrite + + +# ─── Read DTOs ───────────────────────────────────────────────────────────────── + + +class Record(WriteableCogniteResource[RecordWrite]): + """A record returned from the stream records API. + + This is the read version of :class:`RecordWrite`. + + Args: + space (str): Space the record belongs to. + external_id (str): External ID of the record. + created_time (int): Creation time in milliseconds since epoch. + last_updated_time (int): Last updated time in milliseconds since epoch. + properties (dict[str, dict[str, dict[str, Any]]] | None): Property values keyed by + ``{space: {container_external_id: {property_id: value}}}``. + """ + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + properties: dict[str, dict[str, dict[str, Any]]] | None = None, + ) -> None: + self.space = space + self.external_id = external_id + self.created_time = created_time + self.last_updated_time = last_updated_time + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + d: dict[str, Any] = { + "space": self.space, + "externalId" if camel_case else "external_id": self.external_id, + "createdTime" if camel_case else "created_time": self.created_time, + "lastUpdatedTime" if camel_case else "last_updated_time": self.last_updated_time, + } + if self.properties is not None: + d["properties"] = self.properties + return d + + def as_write(self) -> RecordWrite: + """Convert to :class:`RecordWrite` by grouping read properties back into sources.""" + sources = [ + RecordSource( + source=RecordSourceReference(space=space, external_id=container), + properties=dict(props), + ) + for space, containers in (self.properties or {}).items() + for container, props in containers.items() + ] + return RecordWrite(space=self.space, external_id=self.external_id, sources=sources) + + +class RecordList(WriteableCogniteResourceList[RecordWrite, Record], ExternalIDTransformerMixin): + """A list of :class:`Record` objects.""" + + _RESOURCE = Record + + def as_write(self) -> RecordWriteList: + return RecordWriteList([r.as_write() for r in self]) + + +# ─── Sync DTOs ───────────────────────────────────────────────────────────────── + + +class SyncRecord(Record): + """A record returned by the sync endpoint, annotated with a change status. + + For ``status="deleted"`` tombstones, :attr:`properties` is ``None``. 
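+
+    A minimal sketch of consuming tombstones (``rec`` here is assumed to be a
+    ``SyncRecord`` yielded by ``records.sync(...)``)::
+
+        if rec.status == "deleted":
+            # tombstone: rec.properties is None, so drop local state instead
+            ...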
+ + Args: + space (str): Space the record belongs to. + external_id (str): External ID of the record. + created_time (int): Creation time in milliseconds since epoch. + last_updated_time (int): Last updated time in milliseconds since epoch. + status (Literal["created", "updated", "deleted"]): Change status. + properties (dict[str, dict[str, dict[str, Any]]] | None): Property values (absent for deleted tombstones). + """ + + def __init__( + self, + space: str, + external_id: str, + created_time: int, + last_updated_time: int, + status: Literal["created", "updated", "deleted"], + properties: dict[str, dict[str, dict[str, Any]]] | None = None, + ) -> None: + super().__init__( + space=space, + external_id=external_id, + created_time=created_time, + last_updated_time=last_updated_time, + properties=properties, + ) + self.status = status + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + space=resource["space"], + external_id=resource["externalId"], + created_time=resource["createdTime"], + last_updated_time=resource["lastUpdatedTime"], + status=resource["status"], + properties=resource.get("properties"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + d = super().dump(camel_case=camel_case) + d["status"] = self.status + return d + + +class SyncRecordList(CogniteResourceList[SyncRecord]): + """A list of :class:`SyncRecord` objects.""" + + _RESOURCE = SyncRecord + + +# ─── Request helpers ─────────────────────────────────────────────────────────── + + +class TimeRange(CogniteResource): + """Inclusive/exclusive time range for ``last_updated_time`` filters. + + At least one lower bound (``gte`` or ``gt``) is **required** for immutable streams. + + Args: + gte (str | int | None): Lower bound, inclusive (ISO-8601 string or ms since epoch). + gt (str | int | None): Lower bound, exclusive. + lte (str | int | None): Upper bound, inclusive. + lt (str | int | None): Upper bound, exclusive. + + Examples: + + Create a time range for the last 24 hours: + + >>> import time + >>> from cognite.client.data_classes.data_modeling.records import TimeRange + >>> now_ms = int(time.time() * 1000) + >>> tr = TimeRange(gte=now_ms - 86_400_000) + """ + + def __init__( + self, + gte: str | int | None = None, + gt: str | int | None = None, + lte: str | int | None = None, + lt: str | int | None = None, + ) -> None: + self.gte = gte + self.gt = gt + self.lte = lte + self.lt = lt + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + gte=resource.get("gte"), + gt=resource.get("gt"), + lte=resource.get("lte"), + lt=resource.get("lt"), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + k: v for k, v in {"gte": self.gte, "gt": self.gt, "lte": self.lte, "lt": self.lt}.items() if v is not None + } + + +class RecordSourceSelector(CogniteResource): + """Selects which container properties to include in filter and sync responses. + + Args: + source (RecordSourceReference): The container to select properties from. + properties (list[str]): Property IDs to return. Use ``["*"]`` to return all. 
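+
+    Examples:
+
+        Select two container properties (the space and container identifiers
+        below are illustrative placeholders):
+
+        >>> from cognite.client.data_classes.data_modeling.records import (
+        ...     RecordSourceReference,
+        ...     RecordSourceSelector,
+        ... )
+        >>> selector = RecordSourceSelector(
+        ...     source=RecordSourceReference(
+        ...         space="my-space", external_id="my-container"
+        ...     ),
+        ...     properties=["temperature", "location"],
+        ... )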
+ """ + + def __init__(self, source: RecordSourceReference, properties: list[str]) -> None: + self.source = source + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + source=RecordSourceReference._load(resource["source"]), + properties=resource["properties"], + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "source": self.source.dump(camel_case=camel_case), + "properties": self.properties, + } + + +class RecordSortSpec(CogniteResource): + """Sort specification for record list requests. + + Args: + property (list[str]): Property path. Use 3 segments for container properties + (``[space, container_external_id, property_id]``), or a single segment for + top-level fields (``["space"]``, ``["externalId"]``, etc.). + direction (Literal["ascending", "descending"]): Sort direction (default ``"ascending"``). + """ + + def __init__(self, property: list[str], direction: Literal["ascending", "descending"] = "ascending") -> None: + self.property = property + self.direction = direction + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(property=resource["property"], direction=resource.get("direction", "ascending")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"property": self.property, "direction": self.direction} + + +class TargetUnitProperty(CogniteResource): + """Per-property unit conversion specification for record queries. + + Args: + property (str): Property ID to convert. + external_id (str): External ID of the target unit. + """ + + def __init__(self, property: str, external_id: str) -> None: + self.property = property + self.external_id = external_id + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(property=resource["property"], external_id=resource["unit"]["externalId"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"property": self.property, "unit": {"externalId": self.external_id}} + + +class TargetUnits(CogniteResource): + """Unit conversion specification for record queries. + + Provide either ``unit_system_name`` for a global conversion or ``properties`` + for per-property control — not both. + + Args: + unit_system_name (str | None): Convert all convertible properties to a named unit system. + properties (list[TargetUnitProperty] | None): Per-property unit conversion specifications. + """ + + def __init__( + self, + unit_system_name: str | None = None, + properties: list[TargetUnitProperty] | None = None, + ) -> None: + self.unit_system_name = unit_system_name + self.properties = properties + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls( + unit_system_name=resource.get("unitSystemName"), + properties=( + [TargetUnitProperty._load(p) for p in resource["properties"]] if "properties" in resource else None + ), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + if self.unit_system_name is not None: + return {"unitSystemName" if camel_case else "unit_system_name": self.unit_system_name} + if self.properties is not None: + return {"properties": [p.dump(camel_case=camel_case) for p in self.properties]} + return {} + + +# ─── Aggregate metric types ──────────────────────────────────────────────────── + + +class AvgAggregate(CogniteResource): + """Average metric aggregate. + + Args: + property (list[str]): 3-segment property path ``[space, container_external_id, property_id]``. 
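+
+    Examples:
+
+        Compute the average of a numeric property (the property path below is
+        an illustrative placeholder):
+
+        >>> from cognite.client.data_classes.data_modeling.records import AvgAggregate
+        >>> # ["my-space", "my-container", "temperature"] is a placeholder path
+        >>> avg = AvgAggregate(property=["my-space", "my-container", "temperature"])
+        >>> avg.dump()
+        {'avg': {'property': ['my-space', 'my-container', 'temperature']}}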
+ """ + + def __init__(self, property: list[str]) -> None: + self.property = property + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(property=resource["avg"]["property"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"avg": {"property": self.property}} + + +class CountAggregate(CogniteResource): + """Count of non-null values metric aggregate. + + Args: + property (list[str]): 3-segment property path ``[space, container_external_id, property_id]``. + """ + + def __init__(self, property: list[str]) -> None: + self.property = property + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(property=resource["count"]["property"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"count": {"property": self.property}} + + +class MinAggregate(CogniteResource): + """Minimum value metric aggregate. + + Args: + property (list[str]): 3-segment property path ``[space, container_external_id, property_id]``. + """ + + def __init__(self, property: list[str]) -> None: + self.property = property + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(property=resource["min"]["property"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"min": {"property": self.property}} + + +class MaxAggregate(CogniteResource): + """Maximum value metric aggregate. + + Args: + property (list[str]): 3-segment property path ``[space, container_external_id, property_id]``. + """ + + def __init__(self, property: list[str]) -> None: + self.property = property + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(property=resource["max"]["property"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"max": {"property": self.property}} + + +class SumAggregate(CogniteResource): + """Sum metric aggregate. + + Args: + property (list[str]): 3-segment property path ``[space, container_external_id, property_id]``. + """ + + def __init__(self, property: list[str]) -> None: + self.property = property + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(property=resource["sum"]["property"]) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {"sum": {"property": self.property}} + + +# ─── Aggregate bucket types ──────────────────────────────────────────────────── + + +class NumberHistogramHardBounds(CogniteResource): + """Hard bounds that limit the range of bucket keys in a histogram. + + Args: + min (float | None): Minimum bucket key (inclusive). + max (float | None): Maximum bucket key (inclusive). + """ + + def __init__(self, min: float | None = None, max: float | None = None) -> None: + self.min = min + self.max = max + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + return cls(min=resource.get("min"), max=resource.get("max")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return {k: v for k, v in {"min": self.min, "max": self.max}.items() if v is not None} + + +class UniqueValuesAggregate(CogniteResource): + """Bucket aggregate grouping records by unique property values. + + Args: + property (list[str]): Property path (1–3 segments). + aggregates (dict[str, AggregateSpec] | None): Optional nested aggregates computed per bucket. 
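+
+    Examples:
+
+        Group records by one property and compute a nested average per bucket
+        (illustrative space, container and property names):
+
+            >>> from cognite.client.data_classes.data_modeling.records import (
+            ...     AvgAggregate,
+            ...     UniqueValuesAggregate,
+            ... )
+            >>> agg = UniqueValuesAggregate(
+            ...     property=["sp", "container-x", "site"],
+            ...     aggregates={"avg_temp": AvgAggregate(property=["sp", "container-x", "temp"])},
+            ... )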
+ """ + + def __init__(self, property: list[str], aggregates: dict[str, AggregateSpec] | None = None) -> None: + self.property = property + self.aggregates = aggregates + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + spec = resource["uniqueValues"] + agg_data = spec.get("aggregates") + return cls( + property=spec["property"], + aggregates={k: _load_aggregate_spec(v) for k, v in agg_data.items()} if agg_data else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + spec: dict[str, Any] = {"property": self.property} + if self.aggregates is not None: + spec["aggregates"] = {k: v.dump(camel_case=camel_case) for k, v in self.aggregates.items()} + return {"uniqueValues": spec} + + +class TimeHistogramAggregate(CogniteResource): + """Bucket aggregate grouping records into time-based intervals. + + Provide either ``calendar_interval`` or ``fixed_interval``, not both. + + Args: + property (list[str]): 3-segment path to a timestamp-type property. + calendar_interval (Literal["1s", "1m", "1h", "1d", "1w", "1M", "1q", "1y"] | None): Calendar-aligned bucket width. + fixed_interval (str | None): Fixed bucket width in duration format (e.g. ``"12h"``). + hard_bounds (NumberHistogramHardBounds | None): Limits the range of emitted bucket keys. + aggregates (dict[str, AggregateSpec] | None): Optional nested aggregates computed per bucket. + """ + + def __init__( + self, + property: list[str], + calendar_interval: Literal["1s", "1m", "1h", "1d", "1w", "1M", "1q", "1y"] | None = None, + fixed_interval: str | None = None, + hard_bounds: NumberHistogramHardBounds | None = None, + aggregates: dict[str, AggregateSpec] | None = None, + ) -> None: + self.property = property + self.calendar_interval = calendar_interval + self.fixed_interval = fixed_interval + self.hard_bounds = hard_bounds + self.aggregates = aggregates + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + spec = resource["timeHistogram"] + agg_data = spec.get("aggregates") + hard_bounds_data = spec.get("hardBounds") + return cls( + property=spec["property"], + calendar_interval=spec.get("calendarInterval"), + fixed_interval=spec.get("fixedInterval"), + hard_bounds=NumberHistogramHardBounds._load(hard_bounds_data) if hard_bounds_data else None, + aggregates={k: _load_aggregate_spec(v) for k, v in agg_data.items()} if agg_data else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + spec: dict[str, Any] = {"property": self.property} + if self.calendar_interval is not None: + spec["calendarInterval" if camel_case else "calendar_interval"] = self.calendar_interval + if self.fixed_interval is not None: + spec["fixedInterval" if camel_case else "fixed_interval"] = self.fixed_interval + if self.hard_bounds is not None: + spec["hardBounds" if camel_case else "hard_bounds"] = self.hard_bounds.dump(camel_case=camel_case) + if self.aggregates is not None: + spec["aggregates"] = {k: v.dump(camel_case=camel_case) for k, v in self.aggregates.items()} + return {"timeHistogram" if camel_case else "time_histogram": spec} + + +class NumberHistogramAggregate(CogniteResource): + """Bucket aggregate grouping records into fixed-width numeric intervals. + + Args: + property (list[str]): 3-segment path to a numeric property. + interval (float): Width of each bucket. + hard_bounds (NumberHistogramHardBounds | None): Limits the range of emitted bucket keys. + aggregates (dict[str, AggregateSpec] | None): Optional nested aggregates computed per bucket. 
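+
+    Examples:
+
+        Bucket a numeric property into intervals of width 5, limiting bucket
+        keys to [0, 100] (illustrative names):
+
+            >>> from cognite.client.data_classes.data_modeling.records import (
+            ...     NumberHistogramAggregate,
+            ...     NumberHistogramHardBounds,
+            ... )
+            >>> agg = NumberHistogramAggregate(
+            ...     property=["sp", "container-x", "temp"],
+            ...     interval=5.0,
+            ...     hard_bounds=NumberHistogramHardBounds(min=0.0, max=100.0),
+            ... )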
+ """ + + def __init__( + self, + property: list[str], + interval: float, + hard_bounds: NumberHistogramHardBounds | None = None, + aggregates: dict[str, AggregateSpec] | None = None, + ) -> None: + self.property = property + self.interval = interval + self.hard_bounds = hard_bounds + self.aggregates = aggregates + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + spec = resource["numberHistogram"] + agg_data = spec.get("aggregates") + hard_bounds_data = spec.get("hardBounds") + return cls( + property=spec["property"], + interval=spec["interval"], + hard_bounds=NumberHistogramHardBounds._load(hard_bounds_data) if hard_bounds_data else None, + aggregates={k: _load_aggregate_spec(v) for k, v in agg_data.items()} if agg_data else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + spec: dict[str, Any] = {"property": self.property, "interval": self.interval} + if self.hard_bounds is not None: + spec["hardBounds" if camel_case else "hard_bounds"] = self.hard_bounds.dump(camel_case=camel_case) + if self.aggregates is not None: + spec["aggregates"] = {k: v.dump(camel_case=camel_case) for k, v in self.aggregates.items()} + return {"numberHistogram" if camel_case else "number_histogram": spec} + + +class FiltersAggregate(CogniteResource): + """Bucket aggregate that creates one bucket per filter expression. + + Args: + filters (list[Filter]): One bucket per filter (1–10 filters; max 30 filter + buckets total across all :class:`FiltersAggregate` instances in a request). + aggregates (dict[str, AggregateSpec] | None): Optional nested aggregates computed per bucket. + """ + + def __init__(self, filters: list[Filter], aggregates: dict[str, AggregateSpec] | None = None) -> None: + self.filters = filters + self.aggregates = aggregates + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + spec = resource["filters"] + agg_data = spec.get("aggregates") + return cls( + filters=[Filter.load(f) for f in spec.get("filters", [])], + aggregates={k: _load_aggregate_spec(v) for k, v in agg_data.items()} if agg_data else None, + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + spec: dict[str, Any] = {"filters": [f.dump() for f in self.filters]} + if self.aggregates is not None: + spec["aggregates"] = {k: v.dump(camel_case=camel_case) for k, v in self.aggregates.items()} + return {"filters": spec} + + +AggregateSpec: TypeAlias = ( + AvgAggregate + | CountAggregate + | MinAggregate + | MaxAggregate + | SumAggregate + | UniqueValuesAggregate + | TimeHistogramAggregate + | NumberHistogramAggregate + | FiltersAggregate +) +AggregateResult: TypeAlias = dict[str, Any] + +_AGGREGATE_KEY_MAP: dict[str, type[AggregateSpec]] = { + "avg": AvgAggregate, + "count": CountAggregate, + "min": MinAggregate, + "max": MaxAggregate, + "sum": SumAggregate, + "uniqueValues": UniqueValuesAggregate, + "timeHistogram": TimeHistogramAggregate, + "numberHistogram": NumberHistogramAggregate, + "filters": FiltersAggregate, +} + + +def _load_aggregate_spec(data: dict[str, Any]) -> AggregateSpec: + key = next(iter(data)) + cls = _AGGREGATE_KEY_MAP.get(key) + if cls is None: + raise ValueError(f"Unknown aggregate type key: {key!r}") + return cls._load(data) diff --git a/cognite/client/testing.py b/cognite/client/testing.py index 90ed51e389..bb66f744fe 100644 --- a/cognite/client/testing.py +++ b/cognite/client/testing.py @@ -17,6 +17,7 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import 
DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI +from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI @@ -101,6 +102,7 @@ from cognite.client._sync_api.data_modeling.data_models import SyncDataModelsAPI from cognite.client._sync_api.data_modeling.graphql import SyncDataModelingGraphQLAPI from cognite.client._sync_api.data_modeling.instances import SyncInstancesAPI +from cognite.client._sync_api.data_modeling.records import SyncRecordsAPI from cognite.client._sync_api.data_modeling.space_statistics import SyncSpaceStatisticsAPI from cognite.client._sync_api.data_modeling.spaces import SyncSpacesAPI from cognite.client._sync_api.data_modeling.statistics import SyncStatisticsAPI @@ -226,6 +228,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: dm_views = create_autospec(ViewsAPI, instance=True, spec_set=True) dm_instances = create_autospec(InstancesAPI, instance=True, spec_set=True) dm_graphql = create_autospec(DataModelingGraphQLAPI, instance=True, spec_set=True) + dm_records = create_autospec(RecordsAPI, instance=True, spec_set=True) dm_streams = create_autospec(StreamsAPI, instance=True, spec_set=True) self.data_modeling = create_autospec( DataModelingAPI, @@ -237,6 +240,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: instances=dm_instances, graphql=dm_graphql, statistics=dm_statistics, + records=dm_records, streams=dm_streams, ) flip_spec_set_on(self.data_modeling, dm_statistics) @@ -427,6 +431,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: dm_views = create_autospec(SyncViewsAPI, instance=True, spec_set=True) dm_instances = create_autospec(SyncInstancesAPI, instance=True, spec_set=True) dm_graphql = create_autospec(SyncDataModelingGraphQLAPI, instance=True, spec_set=True) + dm_records = create_autospec(SyncRecordsAPI, instance=True, spec_set=True) dm_streams = create_autospec(SyncStreamsAPI, instance=True, spec_set=True) self.data_modeling = create_autospec( SyncDataModelingAPI, @@ -438,6 +443,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: instances=dm_instances, graphql=dm_graphql, statistics=dm_statistics, + records=dm_records, streams=dm_streams, ) flip_spec_set_on(self.data_modeling, dm_statistics) diff --git a/cognite/client/utils/_url.py b/cognite/client/utils/_url.py index 567a752959..2ca1d1c779 100644 --- a/cognite/client/utils/_url.py +++ b/cognite/client/utils/_url.py @@ -35,6 +35,8 @@ "relationships", "sequences", "streams", + "streams/[^/]+/records", + "streams/[^/]+/records/upsert", "simulators", "simulators/models", "simulators/models/revisions", diff --git a/docs/source/data_modeling.rst b/docs/source/data_modeling.rst index af2bb93b23..86ee926010 100644 --- a/docs/source/data_modeling.rst +++ b/docs/source/data_modeling.rst @@ -286,6 +286,23 @@ Data modeling statistics data classes .. currentmodule:: cognite.client +Records +------- +.. autosummary:: + :methods: + :toctree: generated/ + :template: custom-automethods-template.rst + + AsyncCogniteClient.data_modeling.records + +Records data classes +^^^^^^^^^^^^^^^^^^^^ +.. automodule:: cognite.client.data_classes.data_modeling.records + :members: + :show-inheritance: + +.. currentmodule:: cognite.client + Streams ------- .. 
autosummary:: diff --git a/pyproject.toml b/pyproject.toml index 69a778839a..556b8020dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,9 @@ external = ["DOC"] [tool.ruff.lint.per-file-ignores] # let scripts use print statements "scripts/*" = ["T201"] +# auto-generated sync __init__.py files aggregate sub-APIs; the codegen template adds imports that +# ruff 0.9+ won't auto-remove from __init__.py files (treats them as potential re-exports) +"cognite/client/_sync_api/**/__init__.py" = ["F401"] [tool.ruff.format] # format code examples in docstrings to avoid users scrolling horizontally diff --git a/tests/tests_unit/test_api/test_data_modeling/test_records.py b/tests/tests_unit/test_api/test_data_modeling/test_records.py new file mode 100644 index 0000000000..a028ed6f81 --- /dev/null +++ b/tests/tests_unit/test_api/test_data_modeling/test_records.py @@ -0,0 +1,321 @@ +from __future__ import annotations + +import re +from collections.abc import Callable + +import pytest +from pytest_httpx import HTTPXMock + +from cognite.client import AsyncCogniteClient, CogniteClient +from cognite.client.data_classes.data_modeling.records import ( + AvgAggregate, + CountAggregate, + Record, + RecordList, + RecordSource, + RecordSourceReference, + RecordWrite, + SyncRecord, + SyncRecordList, + TimeRange, +) +from tests.utils import jsgz_load + + +@pytest.fixture +def records_base_url(async_client: AsyncCogniteClient) -> str: + return async_client.data_modeling.records._base_url_with_base_path + "/streams/my-stream/records" + + +@pytest.fixture +def make_record_response() -> Callable[..., dict]: + def _make(external_id: str = "rec-1", space: str = "sp") -> dict: + return { + "space": space, + "externalId": external_id, + "createdTime": 1000, + "lastUpdatedTime": 2000, + "properties": {"sp": {"container-x": {"temp": 22.5}}}, + } + + return _make + + +@pytest.fixture +def record_response(make_record_response: Callable[..., dict]) -> dict: + return make_record_response() + + +@pytest.fixture +def record_list_response(record_response: dict) -> dict: + return {"items": [record_response]} + + +@pytest.fixture +def write_item() -> RecordWrite: + return RecordWrite( + space="sp", + external_id="rec-1", + sources=[ + RecordSource( + source=RecordSourceReference(space="sp", external_id="container-x"), + properties={"temp": 22.5}, + ) + ], + ) + + +class TestRecordsAPIIngest: + def test_ingest_single_posts_correct_body( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + write_item: RecordWrite, + ) -> None: + httpx_mock.add_response(method="POST", url=re.compile(re.escape(records_base_url) + r"$"), status_code=202) + cognite_client.data_modeling.records.ingest("my-stream", write_item) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + body = jsgz_load(requests[0].content) + assert body == { + "items": [ + { + "space": "sp", + "externalId": "rec-1", + "sources": [ + { + "source": {"type": "container", "space": "sp", "externalId": "container-x"}, + "properties": {"temp": 22.5}, + } + ], + } + ] + } + + def test_ingest_chunks_over_1000( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + ) -> None: + url_pattern = re.compile(re.escape(records_base_url) + r"$") + httpx_mock.add_response(method="POST", url=url_pattern, status_code=202) + httpx_mock.add_response(method="POST", url=url_pattern, status_code=202) + items = [ + RecordWrite(space="sp", external_id=f"r-{i}", sources=[]) + for i in range(1001) + ] + 
cognite_client.data_modeling.records.ingest("my-stream", items) + requests = httpx_mock.get_requests() + assert len(requests) == 2 + assert len(jsgz_load(requests[0].content)["items"]) == 1000 + assert len(jsgz_load(requests[1].content)["items"]) == 1 + + +class TestRecordsAPIUpsert: + def test_upsert_posts_to_upsert_path( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + write_item: RecordWrite, + ) -> None: + httpx_mock.add_response( + method="POST", url=re.compile(re.escape(records_base_url) + r"/upsert$"), status_code=202 + ) + cognite_client.data_modeling.records.upsert("my-stream", write_item) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + assert requests[0].url.path.endswith("/records/upsert") + + +class TestRecordsAPIDelete: + def test_delete_posts_space_external_id_pairs( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + write_item: RecordWrite, + ) -> None: + httpx_mock.add_response( + method="POST", url=re.compile(re.escape(records_base_url) + r"/delete$"), status_code=200 + ) + cognite_client.data_modeling.records.delete("my-stream", write_item) + requests = httpx_mock.get_requests() + assert len(requests) == 1 + body = jsgz_load(requests[0].content) + assert body == {"items": [{"space": "sp", "externalId": "rec-1"}]} + + def test_delete_accepts_record_objects( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + record_response: dict, + ) -> None: + httpx_mock.add_response( + method="POST", url=re.compile(re.escape(records_base_url) + r"/delete$"), status_code=200 + ) + record = Record._load(record_response) + cognite_client.data_modeling.records.delete("my-stream", record) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body == {"items": [{"space": "sp", "externalId": "rec-1"}]} + + def test_delete_chunks_over_1000( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + ) -> None: + url_pattern = re.compile(re.escape(records_base_url) + r"/delete$") + httpx_mock.add_response(method="POST", url=url_pattern, status_code=200) + httpx_mock.add_response(method="POST", url=url_pattern, status_code=200) + items = [RecordWrite(space="sp", external_id=f"r-{i}", sources=[]) for i in range(1001)] + cognite_client.data_modeling.records.delete("my-stream", items) + assert len(httpx_mock.get_requests()) == 2 + + +class TestRecordsAPIList: + def test_list_returns_record_list( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + record_list_response: dict, + ) -> None: + httpx_mock.add_response( + method="POST", url=re.compile(re.escape(records_base_url) + r"/filter$"), json=record_list_response + ) + out = cognite_client.data_modeling.records.list("my-stream") + assert isinstance(out, RecordList) + assert out[0].external_id == "rec-1" + assert out[0].properties == {"sp": {"container-x": {"temp": 22.5}}} + + def test_list_sends_last_updated_time( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + record_list_response: dict, + ) -> None: + httpx_mock.add_response( + method="POST", url=re.compile(re.escape(records_base_url) + r"/filter$"), json=record_list_response + ) + cognite_client.data_modeling.records.list( + "my-stream", + last_updated_time=TimeRange(gte=1_000_000), + limit=50, + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["lastUpdatedTime"] == {"gte": 1_000_000} + assert 
body["limit"] == 50 + + +class TestRecordsAPISync: + def test_sync_yields_records_and_follows_has_next( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + ) -> None: + url = re.compile(re.escape(records_base_url) + r"/sync$") + page1 = { + "items": [{"space": "sp", "externalId": "a", "createdTime": 1, "lastUpdatedTime": 2, "status": "created"}], + "nextCursor": "cursor-2", + "hasNext": True, + } + page2 = { + "items": [{"space": "sp", "externalId": "b", "createdTime": 3, "lastUpdatedTime": 4, "status": "updated"}], + "nextCursor": "cursor-end", + "hasNext": False, + } + httpx_mock.add_response(method="POST", url=url, json=page1) + httpx_mock.add_response(method="POST", url=url, json=page2) + + out = list(cognite_client.data_modeling.records.sync("my-stream", initialize_cursor="1d-ago")) + assert len(out) == 2 + assert isinstance(out[0], SyncRecord) + assert out[0].external_id == "a" + assert out[1].external_id == "b" + assert len(httpx_mock.get_requests()) == 2 + + def test_sync_deleted_tombstone_has_no_properties( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(records_base_url) + r"/sync$"), + json={ + "items": [ + {"space": "sp", "externalId": "gone", "createdTime": 1, "lastUpdatedTime": 5, "status": "deleted"} + ], + "nextCursor": "c", + "hasNext": False, + }, + ) + out = list(cognite_client.data_modeling.records.sync("my-stream", cursor="some-cursor")) + assert len(out) == 1 + assert out[0].status == "deleted" + assert out[0].properties is None + + +class TestRecordsAPIAggregate: + def test_aggregate_posts_correct_body( + self, + cognite_client: CogniteClient, + httpx_mock: HTTPXMock, + records_base_url: str, + ) -> None: + httpx_mock.add_response( + method="POST", + url=re.compile(re.escape(records_base_url) + r"/aggregate$"), + json={"aggregates": {"total": {"count": 42}, "avg_t": {"avg": 21.5}}}, + ) + prop = ["sp", "container-x", "temp"] + res = cognite_client.data_modeling.records.aggregate( + "my-stream", + aggregates={ + "total": CountAggregate(property=prop), + "avg_t": AvgAggregate(property=prop), + }, + last_updated_time=TimeRange(gte=0), + ) + body = jsgz_load(httpx_mock.get_requests()[0].content) + assert body["aggregates"] == { + "total": {"count": {"property": prop}}, + "avg_t": {"avg": {"property": prop}}, + } + assert body["lastUpdatedTime"] == {"gte": 0} + assert res == {"total": {"count": 42}, "avg_t": {"avg": 21.5}} + + +class TestRecordDTOs: + def test_record_load_round_trip(self, record_response: dict) -> None: + r = Record._load(record_response) + assert r.space == "sp" + assert r.external_id == "rec-1" + assert r.created_time == 1000 + assert r.last_updated_time == 2000 + assert r.properties == {"sp": {"container-x": {"temp": 22.5}}} + + def test_record_as_write_reconstructs_sources(self, record_response: dict) -> None: + r = Record._load(record_response) + w = r.as_write() + assert isinstance(w, RecordWrite) + assert w.space == "sp" + assert w.external_id == "rec-1" + assert len(w.sources) == 1 + assert w.sources[0].source.space == "sp" + assert w.sources[0].source.external_id == "container-x" + assert w.sources[0].properties == {"temp": 22.5} + + def test_sync_record_deleted_tombstone(self) -> None: + r = SyncRecord._load( + {"space": "sp", "externalId": "x", "createdTime": 1, "lastUpdatedTime": 2, "status": "deleted"} + ) + assert r.status == "deleted" + assert r.properties is None diff --git 
a/tests/tests_unit/test_api_client.py b/tests/tests_unit/test_api_client.py index 0ddd5354d8..404d576502 100644 --- a/tests/tests_unit/test_api_client.py +++ b/tests/tests_unit/test_api_client.py @@ -1841,6 +1841,13 @@ async def test_is_retryable_resource_api_endpoints(self, method: str, path: str, # Streams API ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams", False), ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/delete", False), + # Records API + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/upsert", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/delete", False), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/filter", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/sync", True), + ("POST", "https://api.cognitedata.com/api/v1/projects/bla/streams/my-stream/records/aggregate", True), ] ), ) diff --git a/tests/tests_unit/test_docstring_examples.py b/tests/tests_unit/test_docstring_examples.py index ebc313620f..00251a79bd 100644 --- a/tests/tests_unit/test_docstring_examples.py +++ b/tests/tests_unit/test_docstring_examples.py @@ -34,6 +34,7 @@ data_models, graphql, instances, + records, spaces, statistics, streams, @@ -130,6 +131,7 @@ def test_data_modeling(self) -> None: run_docstring_tests(graphql) run_docstring_tests(statistics) run_docstring_tests(streams) + run_docstring_tests(records) def test_datapoint_subscriptions(self) -> None: run_docstring_tests(datapoints_subscriptions) From 4766c89df948aa3f9ee9189a6900c66b9961880d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Fri, 8 May 2026 14:01:10 +0200 Subject: [PATCH 3/7] run pre-commit --- .../tests_unit/test_api/test_data_modeling/test_records.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/tests_unit/test_api/test_data_modeling/test_records.py b/tests/tests_unit/test_api/test_data_modeling/test_records.py index a028ed6f81..b2fb4e23e3 100644 --- a/tests/tests_unit/test_api/test_data_modeling/test_records.py +++ b/tests/tests_unit/test_api/test_data_modeling/test_records.py @@ -16,7 +16,6 @@ RecordSourceReference, RecordWrite, SyncRecord, - SyncRecordList, TimeRange, ) from tests.utils import jsgz_load @@ -102,10 +101,7 @@ def test_ingest_chunks_over_1000( url_pattern = re.compile(re.escape(records_base_url) + r"$") httpx_mock.add_response(method="POST", url=url_pattern, status_code=202) httpx_mock.add_response(method="POST", url=url_pattern, status_code=202) - items = [ - RecordWrite(space="sp", external_id=f"r-{i}", sources=[]) - for i in range(1001) - ] + items = [RecordWrite(space="sp", external_id=f"r-{i}", sources=[]) for i in range(1001)] cognite_client.data_modeling.records.ingest("my-stream", items) requests = httpx_mock.get_requests() assert len(requests) == 2 From 7f80f96bef633318b32e881800ead4d5d943191a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Fri, 8 May 2026 14:01:55 +0200 Subject: [PATCH 4/7] fix(records): add docs-time import for RecordsAPI in _cognite_client.py Sphinx autosummary requires the class to be importable via BUILD_COGNITE_SDK_DOCS block so it can resolve AsyncCogniteClient.data_modeling.records in the RST directives. 
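Illustrative shape of the change (guard spelling assumed from this commit's
description; the surrounding import block already exists in _cognite_client.py):

    if BUILD_COGNITE_SDK_DOCS:  # assumed guard name
        from cognite.client._api.data_modeling.records import RecordsAPI
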
Co-Authored-By: Claude Sonnet 4.6 --- cognite/client/_cognite_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index c2f6413b79..86ead78279 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -49,6 +49,7 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI + from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI From cf4b9c6723eab5caa567a7f0c8625e90c7cdbf1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Fri, 8 May 2026 14:14:41 +0200 Subject: [PATCH 5/7] run pre-commit --- cognite/client/_cognite_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index 86ead78279..c2f6413b79 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -49,7 +49,6 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI - from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI From eb92f8f22081dadff981bd848a9144561844306f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 11 May 2026 10:55:09 +0200 Subject: [PATCH 6/7] fix(records): resolve CI lint and docs failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace en-dash (–) with hyphen (-) in 4 docstrings (RUF002) - Remove unused type: ignore [misc] comment in sync records API (mypy) - Fix docstring quote style from double to single in Literal types (custom-checks) - Add RecordsAPI to _make_accessors_for_building_docs() so Sphinx can discover data_modeling.records Co-Authored-By: Claude Sonnet 4.6 --- cognite/client/_cognite_client.py | 2 ++ .../client/_sync_api/data_modeling/records.py | 2 +- .../data_classes/data_modeling/records.py | 18 ++++++++---------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index c2f6413b79..f126f189f7 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -52,6 +52,7 @@ from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI + from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.datapoints import DatapointsAPI @@ -426,6 +427,7 @@ def _make_accessors_for_building_docs() -> None: AsyncCogniteClient.data_modeling.graphql = DataModelingGraphQLAPI # type: ignore 
AsyncCogniteClient.data_modeling.statistics = StatisticsAPI # type: ignore AsyncCogniteClient.data_modeling.statistics.spaces = SpaceStatisticsAPI # type: ignore + AsyncCogniteClient.data_modeling.records = RecordsAPI # type: ignore AsyncCogniteClient.data_modeling.streams = StreamsAPI # type: ignore AsyncCogniteClient.documents = DocumentsAPI # type: ignore AsyncCogniteClient.documents.previews = DocumentPreviewAPI # type: ignore diff --git a/cognite/client/_sync_api/data_modeling/records.py b/cognite/client/_sync_api/data_modeling/records.py index a83670c512..52a46bf0b5 100644 --- a/cognite/client/_sync_api/data_modeling/records.py +++ b/cognite/client/_sync_api/data_modeling/records.py @@ -274,7 +274,7 @@ def sync( target_units=target_units, include_typing=include_typing, ) - ) # type: ignore [misc] + ) def aggregate( self, diff --git a/cognite/client/data_classes/data_modeling/records.py b/cognite/client/data_classes/data_modeling/records.py index f3d1ff8f1f..aa9374dbba 100644 --- a/cognite/client/data_classes/data_modeling/records.py +++ b/cognite/client/data_classes/data_modeling/records.py @@ -79,8 +79,8 @@ class RecordWrite(WriteableCogniteResource["RecordWrite"]): Args: space (str): Space the record belongs to. - external_id (str): External ID of the record (1–256 chars, no null bytes). - sources (list[RecordSource]): Container property values to write (1–100 sources). + external_id (str): External ID of the record (1-256 chars, no null bytes). + sources (list[RecordSource]): Container property values to write (1-100 sources). Examples: @@ -222,7 +222,7 @@ class SyncRecord(Record): external_id (str): External ID of the record. created_time (int): Creation time in milliseconds since epoch. last_updated_time (int): Last updated time in milliseconds since epoch. - status (Literal["created", "updated", "deleted"]): Change status. + status (Literal['created', 'updated', 'deleted']): Change status. properties (dict[str, dict[str, dict[str, Any]]] | None): Property values (absent for deleted tombstones). """ @@ -348,10 +348,8 @@ class RecordSortSpec(CogniteResource): """Sort specification for record list requests. Args: - property (list[str]): Property path. Use 3 segments for container properties - (``[space, container_external_id, property_id]``), or a single segment for - top-level fields (``["space"]``, ``["externalId"]``, etc.). - direction (Literal["ascending", "descending"]): Sort direction (default ``"ascending"``). + property (list[str]): Property path. Use 3 segments for container properties (``[space, container_external_id, property_id]``), or a single segment for top-level fields (``["space"]``, ``["externalId"]``, etc.). + direction (Literal['ascending', 'descending']): Sort direction (default ``"ascending"``). """ def __init__(self, property: list[str], direction: Literal["ascending", "descending"] = "ascending") -> None: @@ -542,7 +540,7 @@ class UniqueValuesAggregate(CogniteResource): """Bucket aggregate grouping records by unique property values. Args: - property (list[str]): Property path (1–3 segments). + property (list[str]): Property path (1-3 segments). aggregates (dict[str, AggregateSpec] | None): Optional nested aggregates computed per bucket. """ @@ -573,7 +571,7 @@ class TimeHistogramAggregate(CogniteResource): Args: property (list[str]): 3-segment path to a timestamp-type property. - calendar_interval (Literal["1s", "1m", "1h", "1d", "1w", "1M", "1q", "1y"] | None): Calendar-aligned bucket width. 
+ calendar_interval (Literal['1s', '1m', '1h', '1d', '1w', '1M', '1q', '1y'] | None): Calendar-aligned bucket width. fixed_interval (str | None): Fixed bucket width in duration format (e.g. ``"12h"``). hard_bounds (NumberHistogramHardBounds | None): Limits the range of emitted bucket keys. aggregates (dict[str, AggregateSpec] | None): Optional nested aggregates computed per bucket. @@ -666,7 +664,7 @@ class FiltersAggregate(CogniteResource): """Bucket aggregate that creates one bucket per filter expression. Args: - filters (list[Filter]): One bucket per filter (1–10 filters; max 30 filter + filters (list[Filter]): One bucket per filter (1-10 filters; max 30 filter buckets total across all :class:`FiltersAggregate` instances in a request). aggregates (dict[str, AggregateSpec] | None): Optional nested aggregates computed per bucket. """ From 53241571a842aa0de76d2d72ae282bf5d3ed39eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20=C3=98en=20Fylling?= Date: Mon, 11 May 2026 11:03:19 +0200 Subject: [PATCH 7/7] fix(records): fix import order for RecordsAPI in _cognite_client.py ruff requires imports sorted alphabetically; records must come before space_statistics. Co-Authored-By: Claude Sonnet 4.6 --- cognite/client/_cognite_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite/client/_cognite_client.py b/cognite/client/_cognite_client.py index f126f189f7..76b35dc734 100644 --- a/cognite/client/_cognite_client.py +++ b/cognite/client/_cognite_client.py @@ -49,10 +49,10 @@ from cognite.client._api.data_modeling.data_models import DataModelsAPI from cognite.client._api.data_modeling.graphql import DataModelingGraphQLAPI from cognite.client._api.data_modeling.instances import InstancesAPI + from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.space_statistics import SpaceStatisticsAPI from cognite.client._api.data_modeling.spaces import SpacesAPI from cognite.client._api.data_modeling.statistics import StatisticsAPI - from cognite.client._api.data_modeling.records import RecordsAPI from cognite.client._api.data_modeling.streams import StreamsAPI from cognite.client._api.data_modeling.views import ViewsAPI from cognite.client._api.datapoints import DatapointsAPI