168 changes: 157 additions & 11 deletions cognite/client/_api/datapoint_tasks.py

Large diffs are not rendered by default.

152 changes: 138 additions & 14 deletions cognite/client/_api/datapoints.py
@@ -46,7 +46,7 @@
LatestDatapointQuery,
)
from cognite.client.data_classes.data_modeling.ids import NodeId
from cognite.client.data_classes.datapoint_aggregates import Aggregate
from cognite.client.data_classes.datapoint_aggregates import STATE_AGGREGATES_CAMEL, Aggregate
from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError
from cognite.client.utils import _json_extended as _json
from cognite.client.utils._auxiliary import (
@@ -60,6 +60,7 @@
unpack_items_in_payload,
)
from cognite.client.utils._concurrency import AsyncSDKTask, execute_async_tasks
from cognite.client.utils._experimental import FeaturePreviewWarning
from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore
from cognite.client.utils._importing import local_import
from cognite.client.utils._time import (
@@ -533,6 +534,36 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
self._POST_DPS_OBJECTS_LIMIT = 10_000

self.query_validator = _DpsQueryValidator(dps_limit_raw=self._DPS_LIMIT_RAW, dps_limit_agg=self._DPS_LIMIT_AGG)
self._state_time_series_warning = FeaturePreviewWarning(
api_maturity="beta",
sdk_maturity="alpha",
feature_name="State time series",
)

# Warns once per DatapointsAPI instance when state time series features are touched
# via queries (state aggregates), results (returned series of type "state") or inserts
# (handled by DatapointsPoster after validation).
def _maybe_warn_state(
self,
*,
queries: list[DatapointsQuery] | None = None,
results: Datapoints | DatapointsArray | DatapointsList | DatapointsArrayList | None = None,
) -> None:
if queries is not None:
for q in queries:
if q.aggregates and STATE_AGGREGATES_CAMEL.intersection(q.aggs_camel_case):
self._state_time_series_warning.warn()
return
if results is None:
return
if isinstance(results, (Datapoints, DatapointsArray)):
if results.type == "state":
self._state_time_series_warning.warn()
return
for item in results.data:
if item.type == "state":
self._state_time_series_warning.warn()
return

def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore:
from cognite.client import global_config
@@ -1210,7 +1241,9 @@ async def retrieve(
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
self.query_validator(parsed_queries := query.parse_into_queries())
self._maybe_warn_state(queries=parsed_queries)
dps_lst = await self._select_dps_fetch_strategy(parsed_queries)(self, parsed_queries).fetch_all_datapoints()
self._maybe_warn_state(results=dps_lst)

if not query.is_single_identifier:
return dps_lst
@@ -1457,9 +1490,11 @@ async def retrieve_arrays(
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
self.query_validator(parsed_queries := query.parse_into_queries())
self._maybe_warn_state(queries=parsed_queries)
dps_lst = await self._select_dps_fetch_strategy(parsed_queries)(
self, parsed_queries
).fetch_all_datapoints_numpy()
self._maybe_warn_state(results=dps_lst)

if not query.is_single_identifier:
return dps_lst
@@ -1604,10 +1639,12 @@ async def retrieve_dataframe(
treat_uncertain_as_bad=treat_uncertain_as_bad,
)
self.query_validator(parsed_queries := query.parse_into_queries())
self._maybe_warn_state(queries=parsed_queries)
fetcher = self._select_dps_fetch_strategy(parsed_queries)(self, parsed_queries)

if not uniform_index:
result = await fetcher.fetch_all_datapoints_numpy()
self._maybe_warn_state(results=result)
return result.to_pandas(
include_aggregate_name=include_aggregate_name,
include_granularity_name=include_granularity_name,
@@ -1625,6 +1662,7 @@
"OR when timezone is used OR when a calendar granularity is used (e.g. month/quarter/year)"
)
result = await fetcher.fetch_all_datapoints_numpy()
self._maybe_warn_state(results=result)
df = result.to_pandas(
include_aggregate_name=include_aggregate_name,
include_granularity_name=include_granularity_name,
@@ -2303,24 +2341,44 @@ def _select_dps_fetch_strategy(self, queries: list[DatapointsQuery]) -> type[Dps

class _InsertDatapoint(NamedTuple):
ts: int | datetime.datetime
value: str | float
value: str | float | None = None
status_code: int | None = None
status_symbol: str | None = None
kind: Literal["raw", "state"] = "raw"
numeric_value: int | None = None
string_value: str | None = None

@classmethod
def from_dict(cls, dct: dict[str, Any]) -> Self:
if "numericValue" in dct or "stringValue" in dct:
if status := dct.get("status"):
return cls(
dct["timestamp"],
None,
status.get("code"),
status.get("symbol"),
"state",
dct.get("numericValue"),
dct.get("stringValue"),
)
return cls(dct["timestamp"], None, None, None, "state", dct.get("numericValue"), dct.get("stringValue"))
if status := dct.get("status"):
return cls(dct["timestamp"], dct["value"], status.get("code"), status.get("symbol"))
return cls(dct["timestamp"], dct["value"])
return cls(dct["timestamp"], dct["value"], status.get("code"), status.get("symbol"), "raw", None, None)
return cls(dct["timestamp"], dct["value"], None, None, "raw", None, None)

def dump(self) -> dict[str, Any]:
dumped: dict[str, Any] = {"timestamp": timestamp_to_ms(self.ts), "value": self.value}
dumped: dict[str, Any] = {"timestamp": timestamp_to_ms(self.ts)}
if self.kind == "state":
if self.numeric_value is not None:
dumped["numericValue"] = self.numeric_value
if self.string_value is not None:
dumped["stringValue"] = self.string_value
else:
dumped["value"] = _json.convert_nonfinite_float_to_str(self.value)
if self.status_code: # also skip if 0
dumped["status"] = {"code": self.status_code}
if self.status_symbol and self.status_symbol != "Good":
dumped.setdefault("status", {})["symbol"] = self.status_symbol
# Out-of-range float values must be passed as strings:
dumped["value"] = _json.convert_nonfinite_float_to_str(dumped["value"])
return dumped


@@ -2356,6 +2414,8 @@ def _verify_and_prepare_dps_objects(
continue
identifier = validate_user_input_dict_with_identifier(obj, required_keys={"datapoints"})
validated_dps = self._parse_and_validate_dps(obj["datapoints"])
if any(dp.kind == "state" for dp in validated_dps):
self.dps_client._state_time_series_warning.warn()
dps_to_insert[identifier].extend(validated_dps)
return list(dps_to_insert.items())

@@ -2372,14 +2432,25 @@ def _parse_and_validate_dps(self, dps: Datapoints | DatapointsArray | list[tuple
if self._dps_are_insert_ready(dps):
return dps # Internal SDK shortcut to avoid casting
elif self._dps_are_tuples(dps):
return [_InsertDatapoint(*tpl) for tpl in dps]
out: list[_InsertDatapoint] = []
for tpl in dps:
match len(tpl):
case 2:
out.append(_InsertDatapoint(tpl[0], tpl[1]))
case 3:
ts, val, code = tpl
out.append(_InsertDatapoint(ts, val, int(code), None))
case _:
raise TypeError(f"Unsupported datapoint tuple length {len(tpl)}: {tpl!r}")
return out
elif self._dps_are_dicts(dps):
try:
return [_InsertDatapoint.from_dict(dp) for dp in dps]
except KeyError:
except KeyError as e:
raise KeyError(
"A datapoint is missing one or both keys ['value', 'timestamp']. Note: 'status' is optional."
)
"A datapoint dict must include 'timestamp' and either 'value' or at least one of "
"['numericValue', 'stringValue'] for state time series. 'status' is optional."
) from e
raise TypeError(
"Datapoints to be inserted must be of type Datapoints or DatapointsArray (with raw datapoints), "
f"or be a list containing tuples or dicts, not {type(dps[0])}"
@@ -2446,9 +2517,26 @@ def _split_datapoints(lst: list[_T], n_first: int, n: int) -> Iterator[tuple[lis

@staticmethod
def _verify_dps_object_for_insertion(dps: Datapoints | DatapointsArray) -> None:
if dps.value is None:
n_ts = len(dps.timestamp)
if dps.type == "state":
if dps.numeric_value is None and dps.string_value is None:
raise ValueError(
f"Only raw datapoints are supported when inserting data from ``{type(dps).__name__}`` "
"(state series require numeric_value and/or string_value arrays)"
)
if dps.numeric_value is not None and len(dps.numeric_value) != n_ts:
raise ValueError(
f"Number of timestamps ({n_ts}) does not match number of numeric state values "
f"({len(dps.numeric_value)}) to insert"
)
if dps.string_value is not None and len(dps.string_value) != n_ts:
raise ValueError(
f"Number of timestamps ({n_ts}) does not match number of string state values "
f"({len(dps.string_value)}) to insert"
)
elif dps.value is None:
raise ValueError(f"Only raw datapoints are supported when inserting data from ``{type(dps).__name__}``")
if (n_ts := len(dps.timestamp)) != (n_dps := len(dps.value)):
elif n_ts != (n_dps := len(dps.value)):
raise ValueError(f"Number of timestamps ({n_ts}) does not match number of datapoints ({n_dps}) to insert")

if dps.status_code is not None and dps.status_symbol is not None:
@@ -2461,14 +2549,50 @@ def _verify_dps_object_for_insertion(dps: Datapoints | DatapointsArray) -> None:
raise ValueError("One of status code/symbol is missing on datapoints object")

def _extract_raw_data_from_datapoints(self, dps: Datapoints) -> list[_InsertDatapoint]:
if dps.type == "state":
n = len(dps.timestamp)
nums = list(dps.numeric_value) if dps.numeric_value is not None else [None] * n
strs = list(dps.string_value) if dps.string_value is not None else [None] * n
if dps.status_code is None:
return [
_InsertDatapoint(ts, None, None, None, "state", nv, sv)
for ts, nv, sv in zip(dps.timestamp, nums, strs)
]
return [
_InsertDatapoint(ts, None, c, s, "state", nv, sv)
for ts, nv, sv, c, s in zip(dps.timestamp, nums, strs, dps.status_code, dps.status_symbol) # type: ignore [arg-type]
]
if dps.status_code is None:
return list(map(_InsertDatapoint, dps.timestamp, dps.value)) # type: ignore [arg-type]
return list(map(_InsertDatapoint, dps.timestamp, dps.value, dps.status_code)) # type: ignore [arg-type]

def _extract_raw_data_from_datapoints_array(self, dps: DatapointsArray) -> list[_InsertDatapoint]:
# Using `tolist()` converts to the nearest compatible built-in Python type (in C code):
values = dps.value.tolist() # type: ignore [union-attr]
timestamps = dps.timestamp.astype("datetime64[ms]").astype("int64").tolist()
if dps.type == "state":
n = len(timestamps)
nums = dps.numeric_value.tolist() if dps.numeric_value is not None else [None] * n
strs = dps.string_value.tolist() if dps.string_value is not None else [None] * n
if dps.null_timestamps:
nums = [None if ts in dps.null_timestamps else nv for ts, nv in zip(timestamps, nums)]
strs = [None if ts in dps.null_timestamps else sv for ts, sv in zip(timestamps, strs)]
if dps.status_code is None:
return [
_InsertDatapoint(ts, None, None, None, "state", nv, sv)
for ts, nv, sv in zip(timestamps, nums, strs)
]
return [
_InsertDatapoint(ts, None, c, s, "state", nv, sv)
for ts, nv, sv, c, s in zip(
timestamps,
nums,
strs,
dps.status_code.tolist(),
dps.status_symbol.tolist(), # type: ignore [union-attr]
)
]

values = dps.value.tolist() # type: ignore [union-attr]

if dps.null_timestamps:
# 'Missing' and NaN can not be differentiated when we read from numpy arrays:
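
To make the new dict-based insert format concrete, here is a minimal sketch of how `_InsertDatapoint.from_dict` and `dump` (changed above) treat raw versus state datapoints. The timestamp, values and status code are made-up illustration values, and the printed payloads assume the trailing value re-conversion lines in `dump` are the removed side of this diff:

    from cognite.client._api.datapoints import _InsertDatapoint

    # Raw numeric datapoint with an optional status object (pre-existing behaviour):
    raw_dp = _InsertDatapoint.from_dict(
        {"timestamp": 1700000000000, "value": 1.23, "status": {"code": 3145728}}
    )
    assert raw_dp.kind == "raw"

    # State datapoint: presence of 'numericValue' and/or 'stringValue' selects the state branch:
    state_dp = _InsertDatapoint.from_dict(
        {"timestamp": 1700000000000, "numericValue": 3, "stringValue": "RUNNING"}
    )
    assert state_dp.kind == "state"

    # dump() emits camelCase payload keys; only raw datapoints get a 'value' entry:
    print(raw_dp.dump())    # {'timestamp': 1700000000000, 'value': 1.23, 'status': {'code': 3145728}}
    print(state_dp.dump())  # {'timestamp': 1700000000000, 'numericValue': 3, 'stringValue': 'RUNNING'}
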
8 changes: 4 additions & 4 deletions cognite/client/_proto/data_point_insertion_request_pb2.py

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions cognite/client/_proto/data_point_insertion_request_pb2.pyi
@@ -7,18 +7,20 @@ from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Map
DESCRIPTOR: _descriptor.FileDescriptor

class DataPointInsertionItem(_message.Message):
__slots__ = ("id", "externalId", "instanceId", "numericDatapoints", "stringDatapoints")
__slots__ = ("id", "externalId", "instanceId", "numericDatapoints", "stringDatapoints", "stateDatapoints")
ID_FIELD_NUMBER: _ClassVar[int]
EXTERNALID_FIELD_NUMBER: _ClassVar[int]
INSTANCEID_FIELD_NUMBER: _ClassVar[int]
NUMERICDATAPOINTS_FIELD_NUMBER: _ClassVar[int]
STRINGDATAPOINTS_FIELD_NUMBER: _ClassVar[int]
STATEDATAPOINTS_FIELD_NUMBER: _ClassVar[int]
id: int
externalId: str
instanceId: _data_points_pb2.InstanceId
numericDatapoints: _data_points_pb2.NumericDatapoints
stringDatapoints: _data_points_pb2.StringDatapoints
def __init__(self, id: _Optional[int] = ..., externalId: _Optional[str] = ..., instanceId: _Optional[_Union[_data_points_pb2.InstanceId, _Mapping]] = ..., numericDatapoints: _Optional[_Union[_data_points_pb2.NumericDatapoints, _Mapping]] = ..., stringDatapoints: _Optional[_Union[_data_points_pb2.StringDatapoints, _Mapping]] = ...) -> None: ...
stateDatapoints: _data_points_pb2.StateDatapoints
def __init__(self, id: _Optional[int] = ..., externalId: _Optional[str] = ..., instanceId: _Optional[_Union[_data_points_pb2.InstanceId, _Mapping]] = ..., numericDatapoints: _Optional[_Union[_data_points_pb2.NumericDatapoints, _Mapping]] = ..., stringDatapoints: _Optional[_Union[_data_points_pb2.StringDatapoints, _Mapping]] = ..., stateDatapoints: _Optional[_Union[_data_points_pb2.StateDatapoints, _Mapping]] = ...) -> None: ...

class DataPointInsertionRequest(_message.Message):
__slots__ = ("items",)
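
A constructor-level sketch of the new `stateDatapoints` member, using only names visible in the stub above; the external id is hypothetical, the module paths are assumed to follow the SDK's `_proto` package layout, and `StateDatapoints` is left empty because its own fields are not part of this diff:

    from cognite.client._proto import data_point_insertion_request_pb2 as insert_pb2
    from cognite.client._proto import data_points_pb2

    item = insert_pb2.DataPointInsertionItem(
        externalId="my-state-ts",  # hypothetical external id
        stateDatapoints=data_points_pb2.StateDatapoints(),  # new member added here; contents not shown in this diff
    )
    request = insert_pb2.DataPointInsertionRequest(items=[item])
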