diff --git a/cognite/client/_api/datapoint_tasks.py b/cognite/client/_api/datapoint_tasks.py index 0a44d7f1b9..c433d3080f 100644 --- a/cognite/client/_api/datapoint_tasks.py +++ b/cognite/client/_api/datapoint_tasks.py @@ -23,10 +23,12 @@ from cognite.client._constants import NUMPY_IS_AVAILABLE from cognite.client._proto.data_point_list_response_pb2 import DataPointListItem +from cognite.client._proto.data_points_pb2 import StateDatapoint from cognite.client.data_classes.data_modeling import NodeId from cognite.client.data_classes.datapoint_aggregates import ( _INT_AGGREGATES_CAMEL, _OBJECT_AGGREGATES_CAMEL, + STATE_AGGREGATES_CAMEL, Aggregate, ) from cognite.client.data_classes.datapoints import ( @@ -39,6 +41,7 @@ AggregateDatapoints, DatapointsRaw, DpsUnpackFns, + StateDatapoints, _DataContainer, create_aggregates_arrays_from_dps_container, create_aggregates_list_from_dps_container, @@ -74,7 +77,6 @@ DatapointsExternalId, DatapointsId, DatapointsInstanceId, - RawDatapointValue, ) @@ -617,7 +619,14 @@ def _clear_data_containers(self) -> None: # Help gc clear out temporary containers del self.query, self.ts_data, self.dps_data del self.subtasks, self.subtask_outside_points - for container in ("status_code", "status_symbol", "object_data"): + for container in ( + "status_code", + "status_symbol", + "object_data", + "state_numeric_data", + "state_string_data", + "state_agg_data", + ): try: delattr(self, container) except AttributeError: @@ -696,8 +705,13 @@ def split_into_subtasks(self, concurrency_limit: int, n_tot_queries: int) -> lis class BaseRawTaskOrchestrator(BaseTaskOrchestrator): def __init__(self, **kwargs: Any) -> None: - self.dp_outside_start: tuple[int, RawDatapointValue] | None = None - self.dp_outside_end: tuple[int, RawDatapointValue] | None = None + self.dp_outside_start: tuple[int, float | str | tuple[int | None, str | None] | None] | None = None + self.dp_outside_end: tuple[int, float | str | tuple[int | None, str | None] | None] | None = None + # Initialise state containers BEFORE super().__init__ because that call chains into + # _store_ts_info -> _store_first_batch -> _unpack_and_store, which reads these attrs. + # _store_ts_info will swap them to defaultdict(list) for state-typed series. 
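+        # (Illustration only — a hedged, generic sketch of the ordering requirement
+        # described above; the class and attribute names here are placeholders:)
+        #
+        #     class Base:
+        #         def __init__(self) -> None:
+        #             self._store_first_batch()   # template method, may touch subclass attrs
+        #
+        #     class Sub(Base):
+        #         def __init__(self) -> None:
+        #             self.state_numeric_data = None  # must exist...
+        #             super().__init__()              # ...before this chains into the hook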
+ self.state_numeric_data: _DataContainer | None = None + self.state_string_data: _DataContainer | None = None super().__init__(**kwargs) self.dp_outside_status_code_start: int | None = None @@ -705,6 +719,14 @@ def __init__(self, **kwargs: Any) -> None: self.dp_outside_status_symbol_start: str | None = None self.dp_outside_status_symbol_end: str | None = None + def _store_ts_info(self, res: DataPointListItem) -> None: + super()._store_ts_info(res) + if self.ts_info.get("type") == "state": + self.state_numeric_data = defaultdict(list) + self.state_string_data = defaultdict(list) + if self.use_numpy: + self.raw_dtype_numpy = np.object_ + @property def offset_next(self) -> Literal[1]: return 1 # millisecond @@ -714,10 +736,29 @@ def _create_empty_result(self) -> Datapoints | DatapointsArray: if not self.use_numpy: if self.query.include_status: status_cols.update(status_code=[], status_symbol=[]) + if self.ts_info.get("type") == "state": + return Datapoints( + **self.ts_info, + timestamp=[], + value=None, + numeric_value=[], + string_value=[], + **status_cols, + ) return Datapoints(**self.ts_info, timestamp=[], value=[], **status_cols) if self.query.include_status: status_cols.update(status_code=np.array([], dtype=np.int32), status_symbol=np.array([], dtype=np.object_)) + if self.ts_info.get("type") == "state": + return DatapointsArray._load_from_arrays( + { + **self.ts_info, + "timestamp": np.array([], dtype=np.int64), + "numeric_value": np.array([], dtype=np.object_), + "string_value": np.array([], dtype=np.object_), + **status_cols, + } + ) return DatapointsArray._load_from_arrays( { **self.ts_info, @@ -728,7 +769,7 @@ def _create_empty_result(self) -> Datapoints | DatapointsArray: ) def _was_any_data_fetched(self) -> bool: - return any((self.ts_data, self.dp_outside_start, self.dp_outside_end)) + return any((self.ts_data, self.state_numeric_data, self.dp_outside_start, self.dp_outside_end)) def _get_result(self) -> Datapoints | DatapointsArray: if not self._was_any_data_fetched(): @@ -745,6 +786,17 @@ def _get_result(self) -> Datapoints | DatapointsArray: ) if not self.query.ignore_bad_datapoints: status_columns["null_timestamps"] = self.null_timestamps + if self.ts_info.get("type") == "state": + assert self.state_numeric_data is not None and self.state_string_data is not None + return DatapointsArray._load_from_arrays( + { + **self.ts_info, + "timestamp": create_array_from_dps_container(self.ts_data), + "numeric_value": create_array_from_dps_container(self.state_numeric_data), + "string_value": create_array_from_dps_container(self.state_string_data), + **status_columns, + } + ) return DatapointsArray._load_from_arrays( { **self.ts_info, @@ -758,6 +810,16 @@ def _get_result(self) -> Datapoints | DatapointsArray: status_code=create_list_from_dps_container(self.status_code), status_symbol=create_list_from_dps_container(self.status_symbol), ) + if self.ts_info.get("type") == "state": + assert self.state_numeric_data is not None and self.state_string_data is not None + return Datapoints( + **self.ts_info, + timestamp=create_list_from_dps_container(self.ts_data), + value=None, + numeric_value=create_list_from_dps_container(self.state_numeric_data), + string_value=create_list_from_dps_container(self.state_string_data), + **status_columns, + ) return Datapoints( **self.ts_info, timestamp=create_list_from_dps_container(self.ts_data), @@ -775,7 +837,36 @@ def _include_outside_points_in_result(self) -> None: if not dp: continue ts: list[int] | NumpyInt64Array = [dp[0]] - value: list[float | str] | 
NumpyFloat64Array | NumpyObjArray = [dp[1]] + if self.ts_info.get("type") == "state": + assert self.state_numeric_data is not None and self.state_string_data is not None + assert isinstance(dp[1], tuple) + num, s = dp[1] + num_list: list[int | None] = [num] + str_list: list[str | None] = [s] + if self.use_numpy: + ts = np.array(ts, dtype=np.int64) + num_arr = np.array(num_list, dtype=np.object_) + str_arr = np.array(str_list, dtype=np.object_) + if num is None and s is None: + self.null_timestamps.add(dp[0]) + self.ts_data[idx,].append(ts) + self.state_numeric_data[idx,].append(num_arr) + self.state_string_data[idx,].append(str_arr) + else: + self.ts_data[idx,].append(ts) + self.state_numeric_data[idx,].append(num_list) + self.state_string_data[idx,].append(str_list) + + if self.query.include_status: + if self.use_numpy: + status_code = np.array(status_code, dtype=np.uint32) # type: ignore [assignment] + status_symbol = np.array(status_symbol, dtype=np.object_) # type: ignore [assignment] + self.status_code[idx,].append(status_code) + self.status_symbol[idx,].append(status_symbol) + continue + + assert not isinstance(dp[1], tuple) # state branch handled above + value: list[float | str | None] | NumpyFloat64Array | NumpyObjArray = [dp[1]] if self.use_numpy: ts = np.array(ts, dtype=np.int64) value = np.array(value, dtype=self.raw_dtype_numpy) @@ -792,6 +883,25 @@ def _include_outside_points_in_result(self) -> None: self.status_symbol[idx,].append(status_symbol) def _unpack_and_store(self, idx: tuple[float, ...], dps: DatapointsRaw) -> None: # type: ignore [override] + if self.state_numeric_data is not None: + assert self.state_string_data is not None + st_dps = cast(StateDatapoints, dps) + if self.use_numpy: + self.ts_data[idx].append(DpsUnpackFns.extract_timestamps_numpy(st_dps)) + self.state_numeric_data[idx].append(DpsUnpackFns.extract_state_numeric_numpy(st_dps)) + self.state_string_data[idx].append(DpsUnpackFns.extract_state_string_numpy(st_dps)) + if self.query.include_status: + self.status_code[idx].append(DpsUnpackFns.extract_status_code_numpy(st_dps)) + self.status_symbol[idx].append(DpsUnpackFns.extract_status_symbol_numpy(st_dps)) + else: + self.ts_data[idx].append(DpsUnpackFns.extract_timestamps(st_dps)) + self.state_numeric_data[idx].append(DpsUnpackFns.extract_state_numeric(st_dps)) + self.state_string_data[idx].append(DpsUnpackFns.extract_state_string(st_dps)) + if self.query.include_status: + self.status_code[idx].append(DpsUnpackFns.extract_status_code(st_dps)) + self.status_symbol[idx].append(DpsUnpackFns.extract_status_symbol(st_dps)) + return + if self.use_numpy: self.ts_data[idx].append(DpsUnpackFns.extract_timestamps_numpy(dps)) assert self.raw_dtype_numpy is not None @@ -834,14 +944,30 @@ def _extract_outside_points(self, dps: DatapointsRaw) -> None: # We got a dp before `start`, this (and 'after') should not impact our count towards `limit`, # so we pop to remove it from dps: first = dps.pop(0) - if not self.query.ignore_bad_datapoints: + if isinstance(first, StateDatapoint): + self.dp_outside_start = ( + DpsUnpackFns.ts(first), + ( + DpsUnpackFns.nullable_state_numeric(first), + DpsUnpackFns.nullable_state_string(first), + ), + ) + elif not self.query.ignore_bad_datapoints: self.dp_outside_start = DpsUnpackFns.ts(first), DpsUnpackFns.nullable_raw_dp(first) else: self.dp_outside_start = DpsUnpackFns.ts(first), DpsUnpackFns.raw_dp(first) if dps and dps[-1].timestamp >= self.query.end_ms: # >= because `end` is exclusive last = dps.pop(-1) - if not 
self.query.ignore_bad_datapoints: + if isinstance(last, StateDatapoint): + self.dp_outside_end = ( + DpsUnpackFns.ts(last), + ( + DpsUnpackFns.nullable_state_numeric(last), + DpsUnpackFns.nullable_state_string(last), + ), + ) + elif not self.query.ignore_bad_datapoints: self.dp_outside_end = DpsUnpackFns.ts(last), DpsUnpackFns.nullable_raw_dp(last) else: self.dp_outside_end = DpsUnpackFns.ts(last), DpsUnpackFns.raw_dp(last) @@ -906,8 +1032,19 @@ def offset_next(self) -> int: def _set_aggregate_vars(self, aggs_camel_case: list[str], use_numpy: bool, include_status: bool) -> None: # Developer note here: If you ask for datapoints to be returned in JSON, you get `count` as an integer. # Nice. However, when using protobuf, you get `double` xD - self.all_aggregates = aggs_camel_case - self.object_aggs = list(_OBJECT_AGGREGATES_CAMEL.intersection(aggs_camel_case)) + self.request_state_aggregate_details = bool(STATE_AGGREGATES_CAMEL.intersection(aggs_camel_case)) + self.state_agg_data: defaultdict[tuple[float, ...], list[Any]] | None + if self.request_state_aggregate_details: + self.state_agg_data = defaultdict(list) + else: + self.state_agg_data = None + + aggs_without_state = [a for a in aggs_camel_case if a not in STATE_AGGREGATES_CAMEL] + self.all_aggregates = list(aggs_without_state) + if self.request_state_aggregate_details: + self.all_aggregates.append("stateAggregates") + + self.object_aggs = list(_OBJECT_AGGREGATES_CAMEL.intersection(aggs_without_state)) if self.object_aggs: self.object_data: dict[Literal["minDatapoint", "maxDatapoint"], _DataContainer] = { agg: defaultdict(list) for agg in self.object_aggs @@ -915,7 +1052,7 @@ def _set_aggregate_vars(self, aggs_camel_case: list[str], use_numpy: bool, inclu self.object_agg_unpack_fns = [ DpsUnpackFns.extract_fn_min_or_max_dp(agg, include_status) for agg in self.object_aggs ] - self.numeric_aggs = [agg for agg in aggs_camel_case if agg not in self.object_aggs] + self.numeric_aggs = [agg for agg in aggs_without_state if agg not in self.object_aggs] self.n_numeric_aggs = len(self.numeric_aggs) if self.n_numeric_aggs: self.numeric_agg_unpack_fn = DpsUnpackFns.custom_from_aggregates(self.numeric_aggs) @@ -938,6 +1075,8 @@ def _create_empty_result(self) -> Datapoints | DatapointsArray: arr_dct.update({agg: np.array([], dtype=np.int64) for agg in self.int_aggs}) if self.object_aggs: arr_dct.update({agg: np.array([], dtype=np.object_) for agg in self.object_aggs}) + if self.request_state_aggregate_details: + arr_dct["stateAggregates"] = np.array([], dtype=np.object_) return DatapointsArray._load_from_arrays({**self.ts_info, **arr_dct}) lst_dct: dict[str, list] = {agg: [] for agg in self.all_aggregates} @@ -962,6 +1101,8 @@ def _get_result(self) -> Datapoints | DatapointsArray: # (step_)interpolation), count returns nan... which we need float to represent... which we do not want. 
# Thus we convert any NaNs to 0 (which for count - and duration - makes perfect sense): arr_dct[agg] = ensure_int_numpy(arr_dct[agg]) + if self.state_agg_data is not None: + arr_dct["stateAggregates"] = create_object_array_from_container(self.state_agg_data) return DatapointsArray._load_from_arrays({**self.ts_info, **arr_dct}) lst_dct = {"timestamp": create_list_from_dps_container(self.ts_data)} @@ -975,6 +1116,8 @@ def _get_result(self) -> Datapoints | DatapointsArray: lst_dct[agg] = list(map(ensure_int, lst_dct[agg])) if self.object_aggs: lst_dct.update({agg: create_list_from_dps_container(data) for agg, data in self.object_data.items()}) + if self.state_agg_data is not None: + lst_dct["stateAggregates"] = create_list_from_dps_container(self.state_agg_data) return Datapoints(**self.ts_info, **convert_all_keys_to_snake_case(lst_dct)) def _unpack_and_store(self, idx: tuple[float, ...], dps: AggregateDatapoints) -> None: # type: ignore [override] @@ -983,6 +1126,9 @@ def _unpack_and_store(self, idx: tuple[float, ...], dps: AggregateDatapoints) -> for agg, unpack_fn in zip(self.object_aggs, self.object_agg_unpack_fns): self.object_data[agg][idx].append(list(map(unpack_fn, dps))) + if self.state_agg_data is not None: + self.state_agg_data[idx].append(DpsUnpackFns.extract_state_aggregates_rows(dps)) + if self.use_numpy: self._unpack_and_store_numpy(idx, dps) else: diff --git a/cognite/client/_api/datapoints.py b/cognite/client/_api/datapoints.py index 914cedcfe6..3ce655b5e6 100644 --- a/cognite/client/_api/datapoints.py +++ b/cognite/client/_api/datapoints.py @@ -46,7 +46,7 @@ LatestDatapointQuery, ) from cognite.client.data_classes.data_modeling.ids import NodeId -from cognite.client.data_classes.datapoint_aggregates import Aggregate +from cognite.client.data_classes.datapoint_aggregates import STATE_AGGREGATES_CAMEL, Aggregate from cognite.client.exceptions import CogniteAPIError, CogniteNotFoundError from cognite.client.utils import _json_extended as _json from cognite.client.utils._auxiliary import ( @@ -60,6 +60,7 @@ unpack_items_in_payload, ) from cognite.client.utils._concurrency import AsyncSDKTask, execute_async_tasks +from cognite.client.utils._experimental import FeaturePreviewWarning from cognite.client.utils._identifier import Identifier, IdentifierSequence, IdentifierSequenceCore from cognite.client.utils._importing import local_import from cognite.client.utils._time import ( @@ -533,6 +534,36 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client self._POST_DPS_OBJECTS_LIMIT = 10_000 self.query_validator = _DpsQueryValidator(dps_limit_raw=self._DPS_LIMIT_RAW, dps_limit_agg=self._DPS_LIMIT_AGG) + self._state_time_series_warning = FeaturePreviewWarning( + api_maturity="beta", + sdk_maturity="alpha", + feature_name="State time series", + ) + + # Warns once per DatapointsAPI instance when state time series features are touched + # via queries (state aggregates), results (returned series of type "state") or inserts + # (handled by DatapointsPoster after validation). 
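+    # A hedged sketch of how this is driven from the retrieve paths below (the
+    # once-per-instance deduplication is assumed to live in FeaturePreviewWarning.warn):
+    #
+    #     self._maybe_warn_state(queries=parsed_queries)  # before fetching: state aggregates requested?
+    #     self._maybe_warn_state(results=dps_lst)         # after fetching: any series of type "state"?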
+ def _maybe_warn_state( + self, + *, + queries: list[DatapointsQuery] | None = None, + results: Datapoints | DatapointsArray | DatapointsList | DatapointsArrayList | None = None, + ) -> None: + if queries is not None: + for q in queries: + if q.aggregates and STATE_AGGREGATES_CAMEL.intersection(q.aggs_camel_case): + self._state_time_series_warning.warn() + return + if results is None: + return + if isinstance(results, (Datapoints, DatapointsArray)): + if results.type == "state": + self._state_time_series_warning.warn() + return + for item in results.data: + if item.type == "state": + self._state_time_series_warning.warn() + return def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore: from cognite.client import global_config @@ -1210,7 +1241,9 @@ async def retrieve( treat_uncertain_as_bad=treat_uncertain_as_bad, ) self.query_validator(parsed_queries := query.parse_into_queries()) + self._maybe_warn_state(queries=parsed_queries) dps_lst = await self._select_dps_fetch_strategy(parsed_queries)(self, parsed_queries).fetch_all_datapoints() + self._maybe_warn_state(results=dps_lst) if not query.is_single_identifier: return dps_lst @@ -1457,9 +1490,11 @@ async def retrieve_arrays( treat_uncertain_as_bad=treat_uncertain_as_bad, ) self.query_validator(parsed_queries := query.parse_into_queries()) + self._maybe_warn_state(queries=parsed_queries) dps_lst = await self._select_dps_fetch_strategy(parsed_queries)( self, parsed_queries ).fetch_all_datapoints_numpy() + self._maybe_warn_state(results=dps_lst) if not query.is_single_identifier: return dps_lst @@ -1604,10 +1639,12 @@ async def retrieve_dataframe( treat_uncertain_as_bad=treat_uncertain_as_bad, ) self.query_validator(parsed_queries := query.parse_into_queries()) + self._maybe_warn_state(queries=parsed_queries) fetcher = self._select_dps_fetch_strategy(parsed_queries)(self, parsed_queries) if not uniform_index: result = await fetcher.fetch_all_datapoints_numpy() + self._maybe_warn_state(results=result) return result.to_pandas( include_aggregate_name=include_aggregate_name, include_granularity_name=include_granularity_name, @@ -1625,6 +1662,7 @@ async def retrieve_dataframe( "OR when timezone is used OR when a calendar granularity is used (e.g. 
month/quarter/year)" ) result = await fetcher.fetch_all_datapoints_numpy() + self._maybe_warn_state(results=result) df = result.to_pandas( include_aggregate_name=include_aggregate_name, include_granularity_name=include_granularity_name, @@ -2303,24 +2341,44 @@ def _select_dps_fetch_strategy(self, queries: list[DatapointsQuery]) -> type[Dps class _InsertDatapoint(NamedTuple): ts: int | datetime.datetime - value: str | float + value: str | float | None = None status_code: int | None = None status_symbol: str | None = None + kind: Literal["raw", "state"] = "raw" + numeric_value: int | None = None + string_value: str | None = None @classmethod def from_dict(cls, dct: dict[str, Any]) -> Self: + if "numericValue" in dct or "stringValue" in dct: + if status := dct.get("status"): + return cls( + dct["timestamp"], + None, + status.get("code"), + status.get("symbol"), + "state", + dct.get("numericValue"), + dct.get("stringValue"), + ) + return cls(dct["timestamp"], None, None, None, "state", dct.get("numericValue"), dct.get("stringValue")) if status := dct.get("status"): - return cls(dct["timestamp"], dct["value"], status.get("code"), status.get("symbol")) - return cls(dct["timestamp"], dct["value"]) + return cls(dct["timestamp"], dct["value"], status.get("code"), status.get("symbol"), "raw", None, None) + return cls(dct["timestamp"], dct["value"], None, None, "raw", None, None) def dump(self) -> dict[str, Any]: - dumped: dict[str, Any] = {"timestamp": timestamp_to_ms(self.ts), "value": self.value} + dumped: dict[str, Any] = {"timestamp": timestamp_to_ms(self.ts)} + if self.kind == "state": + if self.numeric_value is not None: + dumped["numericValue"] = self.numeric_value + if self.string_value is not None: + dumped["stringValue"] = self.string_value + else: + dumped["value"] = _json.convert_nonfinite_float_to_str(self.value) if self.status_code: # also skip if 0 dumped["status"] = {"code": self.status_code} if self.status_symbol and self.status_symbol != "Good": dumped.setdefault("status", {})["symbol"] = self.status_symbol - # Out-of-range float values must be passed as strings: - dumped["value"] = _json.convert_nonfinite_float_to_str(dumped["value"]) return dumped @@ -2356,6 +2414,8 @@ def _verify_and_prepare_dps_objects( continue identifier = validate_user_input_dict_with_identifier(obj, required_keys={"datapoints"}) validated_dps = self._parse_and_validate_dps(obj["datapoints"]) + if any(dp.kind == "state" for dp in validated_dps): + self.dps_client._state_time_series_warning.warn() dps_to_insert[identifier].extend(validated_dps) return list(dps_to_insert.items()) @@ -2372,14 +2432,25 @@ def _parse_and_validate_dps(self, dps: Datapoints | DatapointsArray | list[tuple if self._dps_are_insert_ready(dps): return dps # Internal SDK shortcut to avoid casting elif self._dps_are_tuples(dps): - return [_InsertDatapoint(*tpl) for tpl in dps] + out: list[_InsertDatapoint] = [] + for tpl in dps: + match len(tpl): + case 2: + out.append(_InsertDatapoint(tpl[0], tpl[1])) + case 3: + ts, val, code = tpl + out.append(_InsertDatapoint(ts, val, int(code), None)) + case _: + raise TypeError(f"Unsupported datapoint tuple length {len(tpl)}: {tpl!r}") + return out elif self._dps_are_dicts(dps): try: return [_InsertDatapoint.from_dict(dp) for dp in dps] - except KeyError: + except KeyError as e: raise KeyError( - "A datapoint is missing one or both keys ['value', 'timestamp']. Note: 'status' is optional." 
- ) + "A datapoint dict must include 'timestamp' and either 'value' or at least one of " + "['numericValue', 'stringValue'] for state time series. 'status' is optional." + ) from e raise TypeError( "Datapoints to be inserted must be of type Datapoints or DatapointsArray (with raw datapoints), " f"or be a list containing tuples or dicts, not {type(dps[0])}" @@ -2446,9 +2517,26 @@ def _split_datapoints(lst: list[_T], n_first: int, n: int) -> Iterator[tuple[lis @staticmethod def _verify_dps_object_for_insertion(dps: Datapoints | DatapointsArray) -> None: - if dps.value is None: + n_ts = len(dps.timestamp) + if dps.type == "state": + if dps.numeric_value is None and dps.string_value is None: + raise ValueError( + f"Only raw datapoints are supported when inserting data from ``{type(dps).__name__}`` " + "(state series require numeric_value and/or string_value arrays)" + ) + if dps.numeric_value is not None and len(dps.numeric_value) != n_ts: + raise ValueError( + f"Number of timestamps ({n_ts}) does not match number of numeric state values " + f"({len(dps.numeric_value)}) to insert" + ) + if dps.string_value is not None and len(dps.string_value) != n_ts: + raise ValueError( + f"Number of timestamps ({n_ts}) does not match number of string state values " + f"({len(dps.string_value)}) to insert" + ) + elif dps.value is None: raise ValueError(f"Only raw datapoints are supported when inserting data from ``{type(dps).__name__}``") - if (n_ts := len(dps.timestamp)) != (n_dps := len(dps.value)): + elif n_ts != (n_dps := len(dps.value)): raise ValueError(f"Number of timestamps ({n_ts}) does not match number of datapoints ({n_dps}) to insert") if dps.status_code is not None and dps.status_symbol is not None: @@ -2461,14 +2549,50 @@ def _verify_dps_object_for_insertion(dps: Datapoints | DatapointsArray) -> None: raise ValueError("One of status code/symbol is missing on datapoints object") def _extract_raw_data_from_datapoints(self, dps: Datapoints) -> list[_InsertDatapoint]: + if dps.type == "state": + n = len(dps.timestamp) + nums = list(dps.numeric_value) if dps.numeric_value is not None else [None] * n + strs = list(dps.string_value) if dps.string_value is not None else [None] * n + if dps.status_code is None: + return [ + _InsertDatapoint(ts, None, None, None, "state", nv, sv) + for ts, nv, sv in zip(dps.timestamp, nums, strs) + ] + return [ + _InsertDatapoint(ts, None, c, s, "state", nv, sv) + for ts, nv, sv, c, s in zip(dps.timestamp, nums, strs, dps.status_code, dps.status_symbol) # type: ignore [arg-type] + ] if dps.status_code is None: return list(map(_InsertDatapoint, dps.timestamp, dps.value)) # type: ignore [arg-type] return list(map(_InsertDatapoint, dps.timestamp, dps.value, dps.status_code)) # type: ignore [arg-type] def _extract_raw_data_from_datapoints_array(self, dps: DatapointsArray) -> list[_InsertDatapoint]: # Using `tolist()` converts to the nearest compatible built-in Python type (in C code): - values = dps.value.tolist() # type: ignore [union-attr] timestamps = dps.timestamp.astype("datetime64[ms]").astype("int64").tolist() + if dps.type == "state": + n = len(timestamps) + nums = dps.numeric_value.tolist() if dps.numeric_value is not None else [None] * n + strs = dps.string_value.tolist() if dps.string_value is not None else [None] * n + if dps.null_timestamps: + nums = [None if ts in dps.null_timestamps else nv for ts, nv in zip(timestamps, nums)] + strs = [None if ts in dps.null_timestamps else sv for ts, sv in zip(timestamps, strs)] + if dps.status_code is None: + return [ + 
_InsertDatapoint(ts, None, None, None, "state", nv, sv) + for ts, nv, sv in zip(timestamps, nums, strs) + ] + return [ + _InsertDatapoint(ts, None, c, s, "state", nv, sv) + for ts, nv, sv, c, s in zip( + timestamps, + nums, + strs, + dps.status_code.tolist(), + dps.status_symbol.tolist(), # type: ignore [union-attr] + ) + ] + + values = dps.value.tolist() # type: ignore [union-attr] if dps.null_timestamps: # 'Missing' and NaN can not be differentiated when we read from numpy arrays: diff --git a/cognite/client/_proto/data_point_insertion_request_pb2.py b/cognite/client/_proto/data_point_insertion_request_pb2.py index 19fca1f776..f385317466 100644 --- a/cognite/client/_proto/data_point_insertion_request_pb2.py +++ b/cognite/client/_proto/data_point_insertion_request_pb2.py @@ -15,7 +15,7 @@ import cognite.client._proto.data_points_pb2 as data__points__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"data_point_insertion_request.proto\x12\x1f\x63om.cognite.v1.timeseries.proto\x1a\x11\x64\x61ta_points.proto\"\xc7\x02\n\x16\x44\x61taPointInsertionItem\x12\x0c\n\x02id\x18\x01 \x01(\x03H\x00\x12\x14\n\nexternalId\x18\x02 \x01(\tH\x00\x12\x41\n\ninstanceId\x18\x05 \x01(\x0b\x32+.com.cognite.v1.timeseries.proto.InstanceIdH\x00\x12O\n\x11numericDatapoints\x18\x03 \x01(\x0b\x32\x32.com.cognite.v1.timeseries.proto.NumericDatapointsH\x01\x12M\n\x10stringDatapoints\x18\x04 \x01(\x0b\x32\x31.com.cognite.v1.timeseries.proto.StringDatapointsH\x01\x42\x15\n\x13timeSeriesReferenceB\x0f\n\rdatapointType\"c\n\x19\x44\x61taPointInsertionRequest\x12\x46\n\x05items\x18\x01 \x03(\x0b\x32\x37.com.cognite.v1.timeseries.proto.DataPointInsertionItemB\x02P\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"data_point_insertion_request.proto\x12\x1f\x63om.cognite.v1.timeseries.proto\x1a\x11\x64\x61ta_points.proto\"\x94\x03\n\x16\x44\x61taPointInsertionItem\x12\x0c\n\x02id\x18\x01 \x01(\x03H\x00\x12\x14\n\nexternalId\x18\x02 \x01(\tH\x00\x12\x41\n\ninstanceId\x18\x05 \x01(\x0b\x32+.com.cognite.v1.timeseries.proto.InstanceIdH\x00\x12O\n\x11numericDatapoints\x18\x03 \x01(\x0b\x32\x32.com.cognite.v1.timeseries.proto.NumericDatapointsH\x01\x12M\n\x10stringDatapoints\x18\x04 \x01(\x0b\x32\x31.com.cognite.v1.timeseries.proto.StringDatapointsH\x01\x12K\n\x0fstateDatapoints\x18\x06 \x01(\x0b\x32\x30.com.cognite.v1.timeseries.proto.StateDatapointsH\x01\x42\x15\n\x13timeSeriesReferenceB\x0f\n\rdatapointType\"c\n\x19\x44\x61taPointInsertionRequest\x12\x46\n\x05items\x18\x01 \x03(\x0b\x32\x37.com.cognite.v1.timeseries.proto.DataPointInsertionItemB\x02P\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -24,7 +24,7 @@ _globals['DESCRIPTOR']._options = None _globals['DESCRIPTOR']._serialized_options = b'P\001' _globals['_DATAPOINTINSERTIONITEM']._serialized_start=91 - _globals['_DATAPOINTINSERTIONITEM']._serialized_end=418 - _globals['_DATAPOINTINSERTIONREQUEST']._serialized_start=420 - _globals['_DATAPOINTINSERTIONREQUEST']._serialized_end=519 + _globals['_DATAPOINTINSERTIONITEM']._serialized_end=495 + _globals['_DATAPOINTINSERTIONREQUEST']._serialized_start=497 + _globals['_DATAPOINTINSERTIONREQUEST']._serialized_end=596 # @@protoc_insertion_point(module_scope) diff --git a/cognite/client/_proto/data_point_insertion_request_pb2.pyi b/cognite/client/_proto/data_point_insertion_request_pb2.pyi index c553056fef..45a7f52b23 100644 --- a/cognite/client/_proto/data_point_insertion_request_pb2.pyi +++ 
b/cognite/client/_proto/data_point_insertion_request_pb2.pyi @@ -7,18 +7,20 @@ from typing import ClassVar as _ClassVar, Iterable as _Iterable, Mapping as _Map DESCRIPTOR: _descriptor.FileDescriptor class DataPointInsertionItem(_message.Message): - __slots__ = ("id", "externalId", "instanceId", "numericDatapoints", "stringDatapoints") + __slots__ = ("id", "externalId", "instanceId", "numericDatapoints", "stringDatapoints", "stateDatapoints") ID_FIELD_NUMBER: _ClassVar[int] EXTERNALID_FIELD_NUMBER: _ClassVar[int] INSTANCEID_FIELD_NUMBER: _ClassVar[int] NUMERICDATAPOINTS_FIELD_NUMBER: _ClassVar[int] STRINGDATAPOINTS_FIELD_NUMBER: _ClassVar[int] + STATEDATAPOINTS_FIELD_NUMBER: _ClassVar[int] id: int externalId: str instanceId: _data_points_pb2.InstanceId numericDatapoints: _data_points_pb2.NumericDatapoints stringDatapoints: _data_points_pb2.StringDatapoints - def __init__(self, id: _Optional[int] = ..., externalId: _Optional[str] = ..., instanceId: _Optional[_Union[_data_points_pb2.InstanceId, _Mapping]] = ..., numericDatapoints: _Optional[_Union[_data_points_pb2.NumericDatapoints, _Mapping]] = ..., stringDatapoints: _Optional[_Union[_data_points_pb2.StringDatapoints, _Mapping]] = ...) -> None: ... + stateDatapoints: _data_points_pb2.StateDatapoints + def __init__(self, id: _Optional[int] = ..., externalId: _Optional[str] = ..., instanceId: _Optional[_Union[_data_points_pb2.InstanceId, _Mapping]] = ..., numericDatapoints: _Optional[_Union[_data_points_pb2.NumericDatapoints, _Mapping]] = ..., stringDatapoints: _Optional[_Union[_data_points_pb2.StringDatapoints, _Mapping]] = ..., stateDatapoints: _Optional[_Union[_data_points_pb2.StateDatapoints, _Mapping]] = ...) -> None: ... class DataPointInsertionRequest(_message.Message): __slots__ = ("items",) diff --git a/cognite/client/_proto/data_point_list_response_pb2.py b/cognite/client/_proto/data_point_list_response_pb2.py index 32af28d94b..de38646777 100644 --- a/cognite/client/_proto/data_point_list_response_pb2.py +++ b/cognite/client/_proto/data_point_list_response_pb2.py @@ -15,7 +15,7 @@ import cognite.client._proto.data_points_pb2 as data__points__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1e\x64\x61ta_point_list_response.proto\x12\x1f\x63om.cognite.v1.timeseries.proto\x1a\x11\x64\x61ta_points.proto\"\x95\x04\n\x11\x44\x61taPointListItem\x12\n\n\x02id\x18\x01 \x01(\x03\x12\x12\n\nexternalId\x18\x02 \x01(\t\x12?\n\ninstanceId\x18\x0b \x01(\x0b\x32+.com.cognite.v1.timeseries.proto.InstanceId\x12\x10\n\x08isString\x18\x06 \x01(\x08\x12\x0e\n\x06isStep\x18\x07 \x01(\x08\x12\x0c\n\x04unit\x18\x08 \x01(\t\x12\x12\n\nnextCursor\x18\t \x01(\t\x12\x16\n\x0eunitExternalId\x18\n \x01(\t\x12=\n\x04type\x18\x0c \x01(\x0e\x32/.com.cognite.v1.timeseries.proto.TimeSeriesType\x12O\n\x11numericDatapoints\x18\x03 \x01(\x0b\x32\x32.com.cognite.v1.timeseries.proto.NumericDatapointsH\x00\x12M\n\x10stringDatapoints\x18\x04 \x01(\x0b\x32\x31.com.cognite.v1.timeseries.proto.StringDatapointsH\x00\x12S\n\x13\x61ggregateDatapoints\x18\x05 \x01(\x0b\x32\x34.com.cognite.v1.timeseries.proto.AggregateDatapointsH\x00\x42\x0f\n\rdatapointType\"Z\n\x15\x44\x61taPointListResponse\x12\x41\n\x05items\x18\x01 \x03(\x0b\x32\x32.com.cognite.v1.timeseries.proto.DataPointListItem*j\n\x0eTimeSeriesType\x12\x1f\n\x1bTIMESERIES_TYPE_UNSPECIFIED\x10\x00\x12\x1b\n\x17TIMESERIES_TYPE_NUMERIC\x10\x01\x12\x1a\n\x16TIMESERIES_TYPE_STRING\x10\x02\x42\x02P\x01\x62\x06proto3') +DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\x1e\x64\x61ta_point_list_response.proto\x12\x1f\x63om.cognite.v1.timeseries.proto\x1a\x11\x64\x61ta_points.proto\"\xe2\x04\n\x11\x44\x61taPointListItem\x12\n\n\x02id\x18\x01 \x01(\x03\x12\x12\n\nexternalId\x18\x02 \x01(\t\x12?\n\ninstanceId\x18\x0b \x01(\x0b\x32+.com.cognite.v1.timeseries.proto.InstanceId\x12\x10\n\x08isString\x18\x06 \x01(\x08\x12\x0e\n\x06isStep\x18\x07 \x01(\x08\x12\x0c\n\x04unit\x18\x08 \x01(\t\x12\x12\n\nnextCursor\x18\t \x01(\t\x12\x16\n\x0eunitExternalId\x18\n \x01(\t\x12=\n\x04type\x18\x0c \x01(\x0e\x32/.com.cognite.v1.timeseries.proto.TimeSeriesType\x12O\n\x11numericDatapoints\x18\x03 \x01(\x0b\x32\x32.com.cognite.v1.timeseries.proto.NumericDatapointsH\x00\x12M\n\x10stringDatapoints\x18\x04 \x01(\x0b\x32\x31.com.cognite.v1.timeseries.proto.StringDatapointsH\x00\x12S\n\x13\x61ggregateDatapoints\x18\x05 \x01(\x0b\x32\x34.com.cognite.v1.timeseries.proto.AggregateDatapointsH\x00\x12K\n\x0fstateDatapoints\x18\r \x01(\x0b\x32\x30.com.cognite.v1.timeseries.proto.StateDatapointsH\x00\x42\x0f\n\rdatapointType\"Z\n\x15\x44\x61taPointListResponse\x12\x41\n\x05items\x18\x01 \x03(\x0b\x32\x32.com.cognite.v1.timeseries.proto.DataPointListItem*\x85\x01\n\x0eTimeSeriesType\x12\x1f\n\x1bTIMESERIES_TYPE_UNSPECIFIED\x10\x00\x12\x1b\n\x17TIMESERIES_TYPE_NUMERIC\x10\x01\x12\x1a\n\x16TIMESERIES_TYPE_STRING\x10\x02\x12\x19\n\x15TIMESERIES_TYPE_STATE\x10\x03\x42\x02P\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -23,10 +23,10 @@ if _descriptor._USE_C_DESCRIPTORS == False: _globals['DESCRIPTOR']._options = None _globals['DESCRIPTOR']._serialized_options = b'P\001' - _globals['_TIMESERIESTYPE']._serialized_start=714 - _globals['_TIMESERIESTYPE']._serialized_end=820 + _globals['_TIMESERIESTYPE']._serialized_start=792 + _globals['_TIMESERIESTYPE']._serialized_end=925 _globals['_DATAPOINTLISTITEM']._serialized_start=87 - _globals['_DATAPOINTLISTITEM']._serialized_end=620 - _globals['_DATAPOINTLISTRESPONSE']._serialized_start=622 - _globals['_DATAPOINTLISTRESPONSE']._serialized_end=712 + _globals['_DATAPOINTLISTITEM']._serialized_end=697 + _globals['_DATAPOINTLISTRESPONSE']._serialized_start=699 + _globals['_DATAPOINTLISTRESPONSE']._serialized_end=789 # @@protoc_insertion_point(module_scope) diff --git a/cognite/client/_proto/data_point_list_response_pb2.pyi b/cognite/client/_proto/data_point_list_response_pb2.pyi index bb817316ca..4baeb6f4bb 100644 --- a/cognite/client/_proto/data_point_list_response_pb2.pyi +++ b/cognite/client/_proto/data_point_list_response_pb2.pyi @@ -12,12 +12,14 @@ class TimeSeriesType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): TIMESERIES_TYPE_UNSPECIFIED: _ClassVar[TimeSeriesType] TIMESERIES_TYPE_NUMERIC: _ClassVar[TimeSeriesType] TIMESERIES_TYPE_STRING: _ClassVar[TimeSeriesType] + TIMESERIES_TYPE_STATE: _ClassVar[TimeSeriesType] TIMESERIES_TYPE_UNSPECIFIED: TimeSeriesType TIMESERIES_TYPE_NUMERIC: TimeSeriesType TIMESERIES_TYPE_STRING: TimeSeriesType +TIMESERIES_TYPE_STATE: TimeSeriesType class DataPointListItem(_message.Message): - __slots__ = ("id", "externalId", "instanceId", "isString", "isStep", "unit", "nextCursor", "unitExternalId", "type", "numericDatapoints", "stringDatapoints", "aggregateDatapoints") + __slots__ = ("id", "externalId", "instanceId", "isString", "isStep", "unit", "nextCursor", "unitExternalId", "type", "numericDatapoints", "stringDatapoints", "aggregateDatapoints", "stateDatapoints") ID_FIELD_NUMBER: _ClassVar[int] 
EXTERNALID_FIELD_NUMBER: _ClassVar[int] INSTANCEID_FIELD_NUMBER: _ClassVar[int] @@ -30,6 +32,7 @@ class DataPointListItem(_message.Message): NUMERICDATAPOINTS_FIELD_NUMBER: _ClassVar[int] STRINGDATAPOINTS_FIELD_NUMBER: _ClassVar[int] AGGREGATEDATAPOINTS_FIELD_NUMBER: _ClassVar[int] + STATEDATAPOINTS_FIELD_NUMBER: _ClassVar[int] id: int externalId: str instanceId: _data_points_pb2.InstanceId @@ -42,7 +45,8 @@ class DataPointListItem(_message.Message): numericDatapoints: _data_points_pb2.NumericDatapoints stringDatapoints: _data_points_pb2.StringDatapoints aggregateDatapoints: _data_points_pb2.AggregateDatapoints - def __init__(self, id: _Optional[int] = ..., externalId: _Optional[str] = ..., instanceId: _Optional[_Union[_data_points_pb2.InstanceId, _Mapping]] = ..., isString: bool = ..., isStep: bool = ..., unit: _Optional[str] = ..., nextCursor: _Optional[str] = ..., unitExternalId: _Optional[str] = ..., type: _Optional[_Union[TimeSeriesType, str]] = ..., numericDatapoints: _Optional[_Union[_data_points_pb2.NumericDatapoints, _Mapping]] = ..., stringDatapoints: _Optional[_Union[_data_points_pb2.StringDatapoints, _Mapping]] = ..., aggregateDatapoints: _Optional[_Union[_data_points_pb2.AggregateDatapoints, _Mapping]] = ...) -> None: ... + stateDatapoints: _data_points_pb2.StateDatapoints + def __init__(self, id: _Optional[int] = ..., externalId: _Optional[str] = ..., instanceId: _Optional[_Union[_data_points_pb2.InstanceId, _Mapping]] = ..., isString: bool = ..., isStep: bool = ..., unit: _Optional[str] = ..., nextCursor: _Optional[str] = ..., unitExternalId: _Optional[str] = ..., type: _Optional[_Union[TimeSeriesType, str]] = ..., numericDatapoints: _Optional[_Union[_data_points_pb2.NumericDatapoints, _Mapping]] = ..., stringDatapoints: _Optional[_Union[_data_points_pb2.StringDatapoints, _Mapping]] = ..., aggregateDatapoints: _Optional[_Union[_data_points_pb2.AggregateDatapoints, _Mapping]] = ..., stateDatapoints: _Optional[_Union[_data_points_pb2.StateDatapoints, _Mapping]] = ...) -> None: ... 
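+# Hedged reader-side sketch (not generated code): ``stateDatapoints`` joins the existing
+# ``datapointType`` oneof, so response handling can dispatch on it, e.g.:
+#
+#     if item.WhichOneof("datapointType") == "stateDatapoints":
+#         for dp in item.stateDatapoints.datapoints:
+#             num = dp.numericValue if dp.HasField("numericValue") else None
+#             text = dp.stringValue if dp.HasField("stringValue") else None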
class DataPointListResponse(_message.Message): __slots__ = ("items",) diff --git a/cognite/client/_proto/data_points_pb2.py b/cognite/client/_proto/data_points_pb2.py index 7d949d9d31..c32eea6102 100644 --- a/cognite/client/_proto/data_points_pb2.py +++ b/cognite/client/_proto/data_points_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11\x64\x61ta_points.proto\x12\x1f\x63om.cognite.v1.timeseries.proto\"&\n\x06Status\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x03\x12\x0e\n\x06symbol\x18\x02 \x01(\t\"\x80\x01\n\x10NumericDatapoint\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\x01\x12\x37\n\x06status\x18\x03 \x01(\x0b\x32\'.com.cognite.v1.timeseries.proto.Status\x12\x11\n\tnullValue\x18\x04 \x01(\x08\"Z\n\x11NumericDatapoints\x12\x45\n\ndatapoints\x18\x01 \x03(\x0b\x32\x31.com.cognite.v1.timeseries.proto.NumericDatapoint\"\x7f\n\x0fStringDatapoint\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t\x12\x37\n\x06status\x18\x03 \x01(\x0b\x32\'.com.cognite.v1.timeseries.proto.Status\x12\x11\n\tnullValue\x18\x04 \x01(\x08\"X\n\x10StringDatapoints\x12\x44\n\ndatapoints\x18\x01 \x03(\x0b\x32\x30.com.cognite.v1.timeseries.proto.StringDatapoint\"\x83\x04\n\x12\x41ggregateDatapoint\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\x0f\n\x07\x61verage\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x12\r\n\x05\x63ount\x18\x05 \x01(\x01\x12\x0b\n\x03sum\x18\x06 \x01(\x01\x12\x15\n\rinterpolation\x18\x07 \x01(\x01\x12\x19\n\x11stepInterpolation\x18\x08 \x01(\x01\x12\x1a\n\x12\x63ontinuousVariance\x18\t \x01(\x01\x12\x18\n\x10\x64iscreteVariance\x18\n \x01(\x01\x12\x16\n\x0etotalVariation\x18\x0b \x01(\x01\x12\x11\n\tcountGood\x18\x0c \x01(\x01\x12\x16\n\x0e\x63ountUncertain\x18\r \x01(\x01\x12\x10\n\x08\x63ountBad\x18\x0e \x01(\x01\x12\x14\n\x0c\x64urationGood\x18\x0f \x01(\x01\x12\x19\n\x11\x64urationUncertain\x18\x10 \x01(\x01\x12\x13\n\x0b\x64urationBad\x18\x11 \x01(\x01\x12G\n\x0cmaxDatapoint\x18\x12 \x01(\x0b\x32\x31.com.cognite.v1.timeseries.proto.NumericDatapoint\x12G\n\x0cminDatapoint\x18\x13 \x01(\x0b\x32\x31.com.cognite.v1.timeseries.proto.NumericDatapoint\"^\n\x13\x41ggregateDatapoints\x12G\n\ndatapoints\x18\x01 \x03(\x0b\x32\x33.com.cognite.v1.timeseries.proto.AggregateDatapoint\"/\n\nInstanceId\x12\r\n\x05space\x18\x01 \x01(\t\x12\x12\n\nexternalId\x18\x02 \x01(\tB\x02P\x01\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x11\x64\x61ta_points.proto\x12\x1f\x63om.cognite.v1.timeseries.proto\"&\n\x06Status\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x03\x12\x0e\n\x06symbol\x18\x02 \x01(\t\"\x80\x01\n\x10NumericDatapoint\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\x01\x12\x37\n\x06status\x18\x03 \x01(\x0b\x32\'.com.cognite.v1.timeseries.proto.Status\x12\x11\n\tnullValue\x18\x04 \x01(\x08\"Z\n\x11NumericDatapoints\x12\x45\n\ndatapoints\x18\x01 \x03(\x0b\x32\x31.com.cognite.v1.timeseries.proto.NumericDatapoint\"\x7f\n\x0fStringDatapoint\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t\x12\x37\n\x06status\x18\x03 \x01(\x0b\x32\'.com.cognite.v1.timeseries.proto.Status\x12\x11\n\tnullValue\x18\x04 \x01(\x08\"X\n\x10StringDatapoints\x12\x44\n\ndatapoints\x18\x01 \x03(\x0b\x32\x30.com.cognite.v1.timeseries.proto.StringDatapoint\"\xb2\x01\n\x0eStateDatapoint\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\x19\n\x0cnumericValue\x18\x02 \x01(\x03H\x00\x88\x01\x01\x12\x18\n\x0bstringValue\x18\x03 
\x01(\tH\x01\x88\x01\x01\x12\x37\n\x06status\x18\x04 \x01(\x0b\x32\'.com.cognite.v1.timeseries.proto.StatusB\x0f\n\r_numericValueB\x0e\n\x0c_stringValue\"V\n\x0fStateDatapoints\x12\x43\n\ndatapoints\x18\x01 \x03(\x0b\x32/.com.cognite.v1.timeseries.proto.StateDatapoint\"\xcd\x04\n\x12\x41ggregateDatapoint\x12\x11\n\ttimestamp\x18\x01 \x01(\x03\x12\x0f\n\x07\x61verage\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x12\r\n\x05\x63ount\x18\x05 \x01(\x01\x12\x0b\n\x03sum\x18\x06 \x01(\x01\x12\x15\n\rinterpolation\x18\x07 \x01(\x01\x12\x19\n\x11stepInterpolation\x18\x08 \x01(\x01\x12\x1a\n\x12\x63ontinuousVariance\x18\t \x01(\x01\x12\x18\n\x10\x64iscreteVariance\x18\n \x01(\x01\x12\x16\n\x0etotalVariation\x18\x0b \x01(\x01\x12\x11\n\tcountGood\x18\x0c \x01(\x01\x12\x16\n\x0e\x63ountUncertain\x18\r \x01(\x01\x12\x10\n\x08\x63ountBad\x18\x0e \x01(\x01\x12\x14\n\x0c\x64urationGood\x18\x0f \x01(\x01\x12\x19\n\x11\x64urationUncertain\x18\x10 \x01(\x01\x12\x13\n\x0b\x64urationBad\x18\x11 \x01(\x01\x12G\n\x0cmaxDatapoint\x18\x12 \x01(\x0b\x32\x31.com.cognite.v1.timeseries.proto.NumericDatapoint\x12G\n\x0cminDatapoint\x18\x13 \x01(\x0b\x32\x31.com.cognite.v1.timeseries.proto.NumericDatapoint\x12H\n\x0fstateAggregates\x18\x14 \x03(\x0b\x32/.com.cognite.v1.timeseries.proto.StateAggregate\"^\n\x13\x41ggregateDatapoints\x12G\n\ndatapoints\x18\x01 \x03(\x0b\x32\x33.com.cognite.v1.timeseries.proto.AggregateDatapoint\"\xda\x01\n\x0eStateAggregate\x12\x14\n\x0cnumericValue\x18\x01 \x01(\x03\x12\x18\n\x0bstringValue\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x17\n\nstateCount\x18\x03 \x01(\x03H\x01\x88\x01\x01\x12\x1d\n\x10stateTransitions\x18\x04 \x01(\x03H\x02\x88\x01\x01\x12\x1a\n\rstateDuration\x18\x05 \x01(\x03H\x03\x88\x01\x01\x42\x0e\n\x0c_stringValueB\r\n\x0b_stateCountB\x13\n\x11_stateTransitionsB\x10\n\x0e_stateDuration\"/\n\nInstanceId\x12\r\n\x05space\x18\x01 \x01(\t\x12\x12\n\nexternalId\x18\x02 \x01(\tB\x02P\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -32,10 +32,16 @@ _globals['_STRINGDATAPOINT']._serialized_end=444 _globals['_STRINGDATAPOINTS']._serialized_start=446 _globals['_STRINGDATAPOINTS']._serialized_end=534 - _globals['_AGGREGATEDATAPOINT']._serialized_start=537 - _globals['_AGGREGATEDATAPOINT']._serialized_end=1052 - _globals['_AGGREGATEDATAPOINTS']._serialized_start=1054 - _globals['_AGGREGATEDATAPOINTS']._serialized_end=1148 - _globals['_INSTANCEID']._serialized_start=1150 - _globals['_INSTANCEID']._serialized_end=1197 + _globals['_STATEDATAPOINT']._serialized_start=537 + _globals['_STATEDATAPOINT']._serialized_end=715 + _globals['_STATEDATAPOINTS']._serialized_start=717 + _globals['_STATEDATAPOINTS']._serialized_end=803 + _globals['_AGGREGATEDATAPOINT']._serialized_start=806 + _globals['_AGGREGATEDATAPOINT']._serialized_end=1395 + _globals['_AGGREGATEDATAPOINTS']._serialized_start=1397 + _globals['_AGGREGATEDATAPOINTS']._serialized_end=1491 + _globals['_STATEAGGREGATE']._serialized_start=1494 + _globals['_STATEAGGREGATE']._serialized_end=1712 + _globals['_INSTANCEID']._serialized_start=1714 + _globals['_INSTANCEID']._serialized_end=1761 # @@protoc_insertion_point(module_scope) diff --git a/cognite/client/_proto/data_points_pb2.pyi b/cognite/client/_proto/data_points_pb2.pyi index c7c0aa71cb..5e78e36d65 100644 --- a/cognite/client/_proto/data_points_pb2.pyi +++ b/cognite/client/_proto/data_points_pb2.pyi @@ -49,8 +49,26 @@ class StringDatapoints(_message.Message): datapoints: 
_containers.RepeatedCompositeFieldContainer[StringDatapoint] def __init__(self, datapoints: _Optional[_Iterable[_Union[StringDatapoint, _Mapping]]] = ...) -> None: ... +class StateDatapoint(_message.Message): + __slots__ = ("timestamp", "numericValue", "stringValue", "status") + TIMESTAMP_FIELD_NUMBER: _ClassVar[int] + NUMERICVALUE_FIELD_NUMBER: _ClassVar[int] + STRINGVALUE_FIELD_NUMBER: _ClassVar[int] + STATUS_FIELD_NUMBER: _ClassVar[int] + timestamp: int + numericValue: int + stringValue: str + status: Status + def __init__(self, timestamp: _Optional[int] = ..., numericValue: _Optional[int] = ..., stringValue: _Optional[str] = ..., status: _Optional[_Union[Status, _Mapping]] = ...) -> None: ... + +class StateDatapoints(_message.Message): + __slots__ = ("datapoints",) + DATAPOINTS_FIELD_NUMBER: _ClassVar[int] + datapoints: _containers.RepeatedCompositeFieldContainer[StateDatapoint] + def __init__(self, datapoints: _Optional[_Iterable[_Union[StateDatapoint, _Mapping]]] = ...) -> None: ... + class AggregateDatapoint(_message.Message): - __slots__ = ("timestamp", "average", "max", "min", "count", "sum", "interpolation", "stepInterpolation", "continuousVariance", "discreteVariance", "totalVariation", "countGood", "countUncertain", "countBad", "durationGood", "durationUncertain", "durationBad", "maxDatapoint", "minDatapoint") + __slots__ = ("timestamp", "average", "max", "min", "count", "sum", "interpolation", "stepInterpolation", "continuousVariance", "discreteVariance", "totalVariation", "countGood", "countUncertain", "countBad", "durationGood", "durationUncertain", "durationBad", "maxDatapoint", "minDatapoint", "stateAggregates") TIMESTAMP_FIELD_NUMBER: _ClassVar[int] AVERAGE_FIELD_NUMBER: _ClassVar[int] MAX_FIELD_NUMBER: _ClassVar[int] @@ -70,6 +88,7 @@ class AggregateDatapoint(_message.Message): DURATIONBAD_FIELD_NUMBER: _ClassVar[int] MAXDATAPOINT_FIELD_NUMBER: _ClassVar[int] MINDATAPOINT_FIELD_NUMBER: _ClassVar[int] + STATEAGGREGATES_FIELD_NUMBER: _ClassVar[int] timestamp: int average: float max: float @@ -89,7 +108,8 @@ class AggregateDatapoint(_message.Message): durationBad: float maxDatapoint: NumericDatapoint minDatapoint: NumericDatapoint - def __init__(self, timestamp: _Optional[int] = ..., average: _Optional[float] = ..., max: _Optional[float] = ..., min: _Optional[float] = ..., count: _Optional[float] = ..., sum: _Optional[float] = ..., interpolation: _Optional[float] = ..., stepInterpolation: _Optional[float] = ..., continuousVariance: _Optional[float] = ..., discreteVariance: _Optional[float] = ..., totalVariation: _Optional[float] = ..., countGood: _Optional[float] = ..., countUncertain: _Optional[float] = ..., countBad: _Optional[float] = ..., durationGood: _Optional[float] = ..., durationUncertain: _Optional[float] = ..., durationBad: _Optional[float] = ..., maxDatapoint: _Optional[_Union[NumericDatapoint, _Mapping]] = ..., minDatapoint: _Optional[_Union[NumericDatapoint, _Mapping]] = ...) -> None: ... 
+ stateAggregates: _containers.RepeatedCompositeFieldContainer[StateAggregate] + def __init__(self, timestamp: _Optional[int] = ..., average: _Optional[float] = ..., max: _Optional[float] = ..., min: _Optional[float] = ..., count: _Optional[float] = ..., sum: _Optional[float] = ..., interpolation: _Optional[float] = ..., stepInterpolation: _Optional[float] = ..., continuousVariance: _Optional[float] = ..., discreteVariance: _Optional[float] = ..., totalVariation: _Optional[float] = ..., countGood: _Optional[float] = ..., countUncertain: _Optional[float] = ..., countBad: _Optional[float] = ..., durationGood: _Optional[float] = ..., durationUncertain: _Optional[float] = ..., durationBad: _Optional[float] = ..., maxDatapoint: _Optional[_Union[NumericDatapoint, _Mapping]] = ..., minDatapoint: _Optional[_Union[NumericDatapoint, _Mapping]] = ..., stateAggregates: _Optional[_Iterable[_Union[StateAggregate, _Mapping]]] = ...) -> None: ... class AggregateDatapoints(_message.Message): __slots__ = ("datapoints",) @@ -97,6 +117,20 @@ class AggregateDatapoints(_message.Message): datapoints: _containers.RepeatedCompositeFieldContainer[AggregateDatapoint] def __init__(self, datapoints: _Optional[_Iterable[_Union[AggregateDatapoint, _Mapping]]] = ...) -> None: ... +class StateAggregate(_message.Message): + __slots__ = ("numericValue", "stringValue", "stateCount", "stateTransitions", "stateDuration") + NUMERICVALUE_FIELD_NUMBER: _ClassVar[int] + STRINGVALUE_FIELD_NUMBER: _ClassVar[int] + STATECOUNT_FIELD_NUMBER: _ClassVar[int] + STATETRANSITIONS_FIELD_NUMBER: _ClassVar[int] + STATEDURATION_FIELD_NUMBER: _ClassVar[int] + numericValue: int + stringValue: str + stateCount: int + stateTransitions: int + stateDuration: int + def __init__(self, numericValue: _Optional[int] = ..., stringValue: _Optional[str] = ..., stateCount: _Optional[int] = ..., stateTransitions: _Optional[int] = ..., stateDuration: _Optional[int] = ...) -> None: ... + class InstanceId(_message.Message): __slots__ = ("space", "externalId") SPACE_FIELD_NUMBER: _ClassVar[int] diff --git a/cognite/client/_sync_api/datapoints.py b/cognite/client/_sync_api/datapoints.py index e19f20118a..3c0b66d00d 100644 --- a/cognite/client/_sync_api/datapoints.py +++ b/cognite/client/_sync_api/datapoints.py @@ -1,6 +1,6 @@ """ =============================================================================== -830266fb5fd8d205e7c1163175ba904e +75a36fd02e14b32a34bc7a21e3415244 This file is auto-generated from the Async API modules, - do not edit manually! 
=============================================================================== """ diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 9d4fab6aad..ac2145bd9c 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -47,6 +47,7 @@ LatestDatapoint, LatestDatapointList, LatestDatapointQuery, + StateAggregate, StatusCode, SyntheticDatapoints, SyntheticDatapointsList, @@ -486,6 +487,7 @@ "SimulationTaskOutput", "SimulationTaskParameters", "SourceFile", + "StateAggregate", "StatusCode", "SubworkflowTaskParameters", "SyntheticDatapoints", diff --git a/cognite/client/data_classes/datapoint_aggregates.py b/cognite/client/data_classes/datapoint_aggregates.py index 96c882e900..543d7b5aa7 100644 --- a/cognite/client/data_classes/datapoint_aggregates.py +++ b/cognite/client/data_classes/datapoint_aggregates.py @@ -19,6 +19,9 @@ "max_datapoint", "min", "min_datapoint", + "state_count", + "state_duration", + "state_transitions", "step_interpolation", "sum", "total_variation", @@ -26,7 +29,12 @@ _OBJECT_AGGREGATES_CAMEL: frozenset[Literal["maxDatapoint", "minDatapoint"]] = frozenset( {"maxDatapoint", "minDatapoint"} ) -OBJECT_AGGREGATES: frozenset[Literal["max_datapoint", "min_datapoint"]] = frozenset({"max_datapoint", "min_datapoint"}) +OBJECT_AGGREGATES: frozenset[Literal["max_datapoint", "min_datapoint", "state_aggregates"]] = frozenset( + {"max_datapoint", "min_datapoint", "state_aggregates"} +) +# Requested via aggregates=... but returned nested under ``stateAggregates`` on each aggregate datapoint: +STATE_AGGREGATES_CAMEL: frozenset[str] = frozenset({"stateCount", "stateTransitions", "stateDuration"}) +AGGREGATE_REQUEST_ONLY_SNAKE = frozenset({"state_count", "state_transitions", "state_duration"}) # Assumption: All INT aggregates should adhere to the following logic: Missing values can be replaced with 0. # Thus, if you add a new aggregate here, and this is no longer the case, a refactor is needed: @@ -42,8 +50,9 @@ } ) INT_AGGREGATES = frozenset(map(to_snake_case, _INT_AGGREGATES_CAMEL)) -ALL_SORTED_DP_AGGS = sorted(typing.get_args(Aggregate)) -_ALL_AGGREGATES = frozenset(ALL_SORTED_DP_AGGS) +_ALL_AGGREGATE_LITERAL_ARGS = typing.get_args(Aggregate) +ALL_SORTED_DP_AGGS = sorted(a for a in _ALL_AGGREGATE_LITERAL_ARGS if a not in AGGREGATE_REQUEST_ONLY_SNAKE) +_ALL_AGGREGATES = frozenset(ALL_SORTED_DP_AGGS) | frozenset({"state_aggregates"}) ALL_SORTED_NUMERIC_DP_AGGS = [agg for agg in ALL_SORTED_DP_AGGS if agg not in ("min_datapoint", "max_datapoint")] # When we add unit info to dataframe columns, we need to know if the physical unit should be included or not.
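The aggregate plumbing above makes the three state aggregates request-only names: you ask for them via ``aggregates=...``, but the response nests them per interval under ``stateAggregates``. A hedged usage sketch — ``retrieve`` and its parameters are the existing public API, while the state-specific behaviour follows this diff and the beta API, so the ``external_id`` and the series being of type "state" are assumptions:

    from cognite.client import CogniteClient

    client = CogniteClient()
    dps = client.time_series.data.retrieve(
        external_id="my-state-ts",  # hypothetical time series of type "state"
        aggregates=["state_count", "state_duration"],
        granularity="1h",
        start="30d-ago",
    )
    # Each interval carries a list of StateAggregate rows, one per distinct state:
    for ts, rows in zip(dps.timestamp, dps.state_aggregates or []):
        for sa in rows:
            print(ts, sa.numeric_value, sa.string_value, sa.state_count, sa.state_duration)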
diff --git a/cognite/client/data_classes/datapoints.py b/cognite/client/data_classes/datapoints.py index e8a522079d..bcc8a32c67 100644 --- a/cognite/client/data_classes/datapoints.py +++ b/cognite/client/data_classes/datapoints.py @@ -61,6 +61,7 @@ import pandas from cognite.client._api.datapoint_tasks import BaseTaskOrchestrator + from cognite.client._proto.data_points_pb2 import StateAggregate as ProtoStateAggregate NumpyDatetime64NSArray: TypeAlias = npt.NDArray[np.datetime64] NumpyUInt32Array: TypeAlias = npt.NDArray[np.uint32] @@ -73,7 +74,7 @@ def numpy_dtype_fix( - element: np.float64 | str | MaxOrMinDatapoint, camel_case: bool = False + element: np.float64 | str | MaxOrMinDatapoint | StateAggregate, camel_case: bool = False ) -> float | str | dict[str, int | float | str]: try: # Using .item() on numpy scalars gives us vanilla python types: @@ -84,6 +85,8 @@ def numpy_dtype_fix( return element elif isinstance(element, MaxOrMinDatapoint): return element.dump(camel_case=camel_case) + elif isinstance(element, StateAggregate): + return element.dump(camel_case=camel_case) raise @@ -216,6 +219,54 @@ def _max_dp_class(dct: dict[str, Any]) -> type[MaxDatapoint | MaxDatapointWithSt return MaxDatapointWithStatus if "statusCode" in dct else MaxDatapoint +@dataclass(slots=True, frozen=True) +class StateAggregate: + """Per-state statistics inside an aggregate datapoint for state time series.""" + + numeric_value: int + string_value: str | None + state_count: int | None + state_transitions: int | None + state_duration: int | None + + @classmethod + def from_proto(cls, msg: ProtoStateAggregate) -> Self: + return cls( + numeric_value=msg.numericValue, + string_value=msg.stringValue if msg.HasField("stringValue") else None, + state_count=msg.stateCount if msg.HasField("stateCount") else None, + state_transitions=msg.stateTransitions if msg.HasField("stateTransitions") else None, + state_duration=msg.stateDuration if msg.HasField("stateDuration") else None, + ) + + @classmethod + def _load(cls, dct: dict[str, Any]) -> Self: + nv = dct.get("numeric_value", dct.get("numericValue")) + if nv is None: + raise KeyError("numeric_value / numericValue is required for StateAggregate") + return cls( + numeric_value=nv, + string_value=dct.get("string_value", dct.get("stringValue")), + state_count=dct.get("state_count", dct.get("stateCount")), + state_transitions=dct.get("state_transitions", dct.get("stateTransitions")), + state_duration=dct.get("state_duration", dct.get("stateDuration")), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + out: dict[str, Any] = { + "numericValue" if camel_case else "numeric_value": self.numeric_value, + } + if self.string_value is not None: + out["stringValue" if camel_case else "string_value"] = self.string_value + if self.state_count is not None: + out["stateCount" if camel_case else "state_count"] = self.state_count + if self.state_transitions is not None: + out["stateTransitions" if camel_case else "state_transitions"] = self.state_transitions + if self.state_duration is not None: + out["stateDuration" if camel_case else "state_duration"] = self.state_duration + return out + + @dataclass class DatapointsQuery: """Represent a user request for datapoints for a single time series""" @@ -465,6 +516,9 @@ class Datapoint(CogniteResource): duration_uncertain (int | None): The duration the aggregate is defined and marked as uncertain (measured in milliseconds). status_code (int | None): The status code for the raw datapoint. 
status_symbol (str | None): The status symbol for the raw datapoint. + numeric_value (int | None): For state time series, the numeric state code (raw datapoints). + string_value (str | None): For state time series, the string state label (raw datapoints). + state_aggregates (list[StateAggregate] | None): For state time series aggregates, nested per-interval rows. timezone (datetime.timezone | ZoneInfo | None): The timezone to use when displaying the datapoint. """ @@ -492,6 +546,9 @@ def __init__( duration_uncertain: int | None = None, status_code: int | None = None, status_symbol: str | None = None, + numeric_value: int | None = None, + string_value: str | None = None, + state_aggregates: list[StateAggregate] | None = None, timezone: datetime.timezone | ZoneInfo | None = None, ) -> None: self.timestamp = timestamp @@ -516,6 +573,9 @@ def __init__( self.duration_uncertain = duration_uncertain self.status_code = status_code self.status_symbol = status_symbol + self.numeric_value = numeric_value + self.string_value = string_value + self.state_aggregates = state_aggregates self.timezone = timezone def __str__(self) -> str: @@ -535,7 +595,7 @@ def to_pandas(self, camel_case: bool = False) -> pandas.DataFrame: # type: igno pd = local_import("pandas") dumped = self.dump(camel_case=camel_case) - for key in iterable_to_case(["min_datapoint", "max_datapoint"], camel_case): + for key in iterable_to_case(["min_datapoint", "max_datapoint", "state_aggregates"], camel_case): if dp := dumped.get(key): dumped[key] = [dp] # make pandas treat this dict as a scalar value @@ -569,6 +629,14 @@ def _load(cls, resource: dict[str, Any]) -> Self: with contextlib.suppress(ValueError): timezone = parse_str_timezone(raw_timezone) + raw_saggs = resource.get("stateAggregates") + state_aggregates_lst: list[StateAggregate] | None = None + if raw_saggs: + if isinstance(raw_saggs[0], StateAggregate): + state_aggregates_lst = list(raw_saggs) + elif isinstance(raw_saggs[0], dict): + state_aggregates_lst = [StateAggregate._load(row) for row in raw_saggs] + return cls( timestamp=resource["timestamp"], value=resource.get("value"), @@ -592,6 +660,9 @@ def _load(cls, resource: dict[str, Any]) -> Self: duration_uncertain=resource.get("durationUncertain"), status_code=resource.get("statusCode"), status_symbol=resource.get("statusSymbol"), + numeric_value=resource.get("numericValue"), + string_value=resource.get("stringValue"), + state_aggregates=state_aggregates_lst, timezone=timezone, ) @@ -599,6 +670,14 @@ def dump(self, camel_case: bool = True, include_timezone: bool = True) -> dict[s dumped = super().dump(camel_case=camel_case) # Keep value even if None (bad status codes support missing): dumped["value"] = self.value # TODO: What if Datapoint represents one or more aggregates? 
+ if self.numeric_value is not None: + dumped["numericValue" if camel_case else "numeric_value"] = self.numeric_value + if self.string_value is not None: + dumped["stringValue" if camel_case else "string_value"] = self.string_value + if self.state_aggregates is not None: + dumped["stateAggregates" if camel_case else "state_aggregates"] = [ + sa.dump(camel_case=camel_case) for sa in self.state_aggregates + ] if self.max_datapoint: dumped["maxDatapoint" if camel_case else "max_datapoint"] = self.max_datapoint.dump(camel_case) if self.min_datapoint: @@ -627,6 +706,8 @@ def __init__( granularity: str | None = None, timestamp: NumpyDatetime64NSArray | None = None, value: NumpyFloat64Array | NumpyObjArray | None = None, + numeric_value: NumpyInt64Array | NumpyObjArray | None = None, + string_value: NumpyObjArray | None = None, average: NumpyFloat64Array | None = None, max: NumpyFloat64Array | None = None, max_datapoint: NumpyObjArray | None = None, @@ -645,6 +726,7 @@ def __init__( duration_bad: NumpyInt64Array | None = None, duration_good: NumpyInt64Array | None = None, duration_uncertain: NumpyInt64Array | None = None, + state_aggregates: NumpyObjArray | None = None, status_code: NumpyUInt32Array | None = None, status_symbol: NumpyObjArray | None = None, null_timestamps: set[int] | None = None, @@ -663,6 +745,8 @@ def __init__( timestamp if timestamp is not None else np.array([], dtype="datetime64[ns]") ) self.value = value + self.numeric_value = numeric_value + self.string_value = string_value self.average = average self.max = max self.max_datapoint = max_datapoint @@ -681,6 +765,7 @@ def __init__( self.duration_bad = duration_bad self.duration_good = duration_good self.duration_uncertain = duration_uncertain + self.state_aggregates = state_aggregates self.status_code = status_code self.status_symbol = status_symbol self.null_timestamps = null_timestamps @@ -726,6 +811,15 @@ def _load( for attr, values in datapoints_by_attr.items(): if attr == "timestamp": array_by_attr[attr] = np.array(values, dtype="datetime64[ms]").astype("datetime64[ns]") + elif attr == "stateAggregates": + parsed_rows: list[list[StateAggregate]] = [] + for row in values: + parsed_rows.append( + [StateAggregate._load(x) if isinstance(x, dict) else x for x in row], + ) + array_by_attr[attr] = np.array(parsed_rows, dtype=np.object_) + elif attr in ("numericValue", "stringValue"): + array_by_attr[attr] = np.array(values, dtype=np.object_) elif attr in _INT_AGGREGATES_CAMEL: array_by_attr[attr] = np.array(values, dtype=np.int64) else: @@ -748,11 +842,13 @@ def _load( is_step=dps_dct["isStep"], is_string=dps_dct["isString"], unit=dps_dct.get("unit"), - type=dps_dct["type"], + type=dps_dct.get("type") or ("string" if dps_dct["isString"] else "numeric"), granularity=dps_dct.get("granularity"), unit_external_id=dps_dct.get("unitExternalId"), timestamp=array_by_attr.get("timestamp"), value=array_by_attr.get("value"), + numeric_value=array_by_attr.get("numericValue"), + string_value=array_by_attr.get("stringValue"), average=array_by_attr.get("average"), max=array_by_attr.get("max"), min=array_by_attr.get("min"), @@ -769,6 +865,7 @@ def _load( duration_bad=array_by_attr.get("durationBad"), duration_good=array_by_attr.get("durationGood"), duration_uncertain=array_by_attr.get("durationUncertain"), + state_aggregates=array_by_attr.get("stateAggregates"), status_code=array_by_attr.get("statusCode"), status_symbol=array_by_attr.get("statusSymbol"), null_timestamps=set(dps_dct["nullTimestamps"]) if "nullTimestamps" in dps_dct else None, 
@@ -800,7 +897,7 @@ def __getitem__(self, item: int | slice) -> Datapoint | DatapointsArray: data: dict[str, float | str | dict | None] = { attr: numpy_dtype_fix(arr[item]) for attr, arr in zip(attrs[1:], arrays[1:]) } - for key in ("min_datapoint", "max_datapoint"): + for key in ("min_datapoint", "max_datapoint", "state_aggregates"): if key in data: data[key] = getattr(self, key)[item] if self.status_code is not None: @@ -829,10 +926,11 @@ def __iter__(self) -> NoReturn: def _data_fields(self) -> tuple[list[str], list[npt.NDArray]]: # Note: Does not return status-related fields + extra = ("numeric_value", "string_value", "state_aggregates") data_field_tuples = [ (attr, arr) - for attr in ("timestamp", "value", *ALL_SORTED_DP_AGGS) # ts must be first - if (arr := getattr(self, attr)) is not None + for attr in ("timestamp", "value", *extra, *ALL_SORTED_DP_AGGS) # ts must be first + if (arr := getattr(self, attr, None)) is not None ] attrs, arrays = map(list, zip(*data_field_tuples)) return attrs, arrays @@ -943,6 +1041,8 @@ class Datapoints(CogniteResource): granularity (str | None): The granularity of the aggregate datapoints (does not apply to raw data) timestamp (list[int] | None): The data timestamps in milliseconds since the epoch (Jan 1, 1970). Can be negative to define a date before 1970. Minimum timestamp is 1900.01.01 00:00:00 UTC value (list[str] | list[float] | None): The raw data values. Can be string or numeric. + numeric_value (list[int | None] | None): For state time series, the per-datapoint numeric state code; ``None`` where the state had no numeric value. + string_value (list[str | None] | None): For state time series, the per-datapoint string state label; ``None`` where the state had no string value. average (list[float] | None): The time-weighted average values per aggregate interval. max (list[float] | None): The maximum values per aggregate interval. max_datapoint (list[MaxDatapoint] | list[MaxDatapointWithStatus] | None): Objects with the maximum values and their timestamps in the aggregate intervals, optionally including status codes and symbols. @@ -961,6 +1061,7 @@ class Datapoints(CogniteResource): duration_bad (list[int] | None): The duration the aggregate is defined and marked as bad (measured in milliseconds). duration_good (list[int] | None): The duration the aggregate is defined and marked as good (measured in milliseconds). duration_uncertain (list[int] | None): The duration the aggregate is defined and marked as uncertain (measured in milliseconds). + state_aggregates (list[list[StateAggregate]] | None): For state time series aggregate queries, per-interval ``StateAggregate`` rows. status_code (list[int] | None): The status codes for the raw datapoints. status_symbol (list[str] | None): The status symbols for the raw datapoints. timezone (datetime.timezone | ZoneInfo | None): The timezone to use when displaying the datapoints. 
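Note: the `Datapoints` changes in the hunks below are easiest to sanity-check as a round-trip. A minimal sketch, mirroring the unit test added in tests/tests_unit/test_data_classes/test_datapoints.py further down (the values here are invented):

```python
# Sparse state columns survive dump()/_load() because dump() writes the
# numericValue/stringValue keys on every datapoint, even where the element is None.
from cognite.client.data_classes import Datapoints

dps = Datapoints(
    id=1,
    timestamp=[1, 2],
    is_string=False,
    is_step=False,
    type="state",
    numeric_value=[0, None],
    string_value=[None, "OPEN"],
)
loaded = Datapoints._load(dps.dump())
assert list(loaded.numeric_value) == [0, None]
assert list(loaded.string_value) == [None, "OPEN"]
```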
@@ -979,6 +1080,8 @@ def __init__( granularity: str | None = None, timestamp: list[int] | None = None, value: list[str] | list[float] | None = None, + numeric_value: list[int | None] | None = None, + string_value: list[str | None] | None = None, average: list[float] | None = None, max: list[float] | None = None, max_datapoint: list[MaxDatapoint] | list[MaxDatapointWithStatus] | None = None, @@ -997,6 +1100,7 @@ def __init__( duration_bad: list[int] | None = None, duration_good: list[int] | None = None, duration_uncertain: list[int] | None = None, + state_aggregates: list[list[StateAggregate]] | None = None, status_code: list[int] | None = None, status_symbol: list[str] | None = None, timezone: datetime.timezone | ZoneInfo | None = None, @@ -1012,6 +1116,8 @@ def __init__( self.granularity = granularity self.timestamp: list[int] = timestamp or [] self.value = value + self.numeric_value = numeric_value + self.string_value = string_value self.average = average self.max = max self.max_datapoint = max_datapoint @@ -1030,6 +1136,7 @@ def __init__( self.duration_bad = duration_bad self.duration_good = duration_good self.duration_uncertain = duration_uncertain + self.state_aggregates = state_aggregates self.status_code = status_code self.status_symbol = status_symbol self.timezone = timezone @@ -1097,6 +1204,17 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: if self.timezone is not None: dumped["timezone"] = convert_timezone_to_str(self.timezone) datapoints = [dp.dump(camel_case=camel_case, include_timezone=False) for dp in self.__get_datapoint_objects()] + if self.type == "state": + # Always emit both numericValue and stringValue keys (even when the element is None) + # so `Datapoints._load(Datapoints.dump())` round-trips sparse state series. Substitute + # an all-None column when one side is missing; zip would otherwise silently emit nothing. + num_key = "numericValue" if camel_case else "numeric_value" + str_key = "stringValue" if camel_case else "string_value" + num_vals = self.numeric_value if self.numeric_value is not None else [None] * len(datapoints) + str_vals = self.string_value if self.string_value is not None else [None] * len(datapoints) + for dp, num, s in zip(datapoints, num_vals, str_vals): + dp[num_key] = num + dp[str_key] = s if self.status_code is not None or self.status_symbol is not None: if ( self.status_code is None @@ -1152,7 +1267,7 @@ def _load( instance_id=NodeId._load_if(dps_object.get("instanceId")), is_string=dps_object["isString"], is_step=dps_object["isStep"], - type=dps_object["type"], + type=dps_object.get("type") or ("string" if dps_object["isString"] else "numeric"), unit=dps_object.get("unit"), unit_external_id=dps_object.get("unitExternalId"), ) @@ -1174,6 +1289,11 @@ def _load( data_lists["minDatapoint"] = list(map(_min_dp_class(min_dp[0])._load, min_dp)) if max_dp := data_lists.get("maxDatapoint"): data_lists["maxDatapoint"] = list(map(_max_dp_class(max_dp[0])._load, max_dp)) + if state_agg := data_lists.get("stateAggregates"): + # Each element is one datapoint's list of per-state rows, so parse one level deeper: + data_lists["stateAggregates"] = [ + [StateAggregate._load(x) if isinstance(x, dict) else x for x in row] for row in state_agg + ] for key, data in data_lists.items(): snake_key = to_snake_case(key) diff --git a/cognite/client/utils/_datapoints.py b/cognite/client/utils/_datapoints.py index 117a45cbab..3ee422cb54 100644 --- a/cognite/client/utils/_datapoints.py +++ b/cognite/client/utils/_datapoints.py @@ -13,13 +13,15 @@ from cognite.client._constants import NUMPY_IS_AVAILABLE from cognite.client._proto.data_point_list_response_pb2 import ( TIMESERIES_TYPE_NUMERIC, + TIMESERIES_TYPE_STATE, TIMESERIES_TYPE_STRING, + TIMESERIES_TYPE_UNSPECIFIED, DataPointListItem, - TimeSeriesType, ) from cognite.client._proto.data_points_pb2 import ( AggregateDatapoint, NumericDatapoint, + StateDatapoint, StringDatapoint, ) from 
cognite.client.data_classes.data_modeling import NodeId @@ -30,6 +32,7 @@ MaxOrMinDatapoint, MinDatapoint, MinDatapointWithStatus, + StateAggregate, ) from cognite.client.utils.useful_types import SequenceNotStr @@ -43,11 +46,13 @@ NumericDatapoints = RepeatedCompositeFieldContainer[NumericDatapoint] StringDatapoints = RepeatedCompositeFieldContainer[StringDatapoint] -DatapointAny = AggregateDatapoint | NumericDatapoint | StringDatapoint -DatapointsAny = AggregateDatapoints | NumericDatapoints | StringDatapoints +StateDatapoints = RepeatedCompositeFieldContainer[StateDatapoint] -DatapointRaw = NumericDatapoint | StringDatapoint -DatapointsRaw = NumericDatapoints | StringDatapoints +DatapointAny = AggregateDatapoint | NumericDatapoint | StringDatapoint | StateDatapoint +DatapointsAny = AggregateDatapoints | NumericDatapoints | StringDatapoints | StateDatapoints + +DatapointRaw = NumericDatapoint | StringDatapoint | StateDatapoint +DatapointsRaw = NumericDatapoints | StringDatapoints | StateDatapoints RawDatapointValue = float | str DatapointsId = int | DatapointsQuery | Sequence[int | DatapointsQuery] @@ -57,7 +62,12 @@ class DpsUnpackFns: ts: Callable[[DatapointAny], int] = op.attrgetter("timestamp") - raw_dp: Callable[[DatapointRaw], RawDatapointValue] = op.attrgetter("value") + + @staticmethod + def raw_dp(dp: DatapointRaw) -> RawDatapointValue: + if isinstance(dp, StateDatapoint): + raise TypeError("raw_dp does not apply to state datapoints") + return dp.value @staticmethod def custom_from_aggregates(lst: list[str]) -> Callable[[AggregateDatapoint], tuple[float, ...]]: @@ -73,9 +83,10 @@ def status_symbol(dp: DatapointRaw) -> str: # When datapoints with bad status codes are not ignored, value may be missing: @staticmethod - def nullable_raw_dp(dp: DatapointRaw) -> float | str: - # We pretend like float is always returned to not break every dps annot. in the entire SDK.. - return dp.value if not dp.nullValue else None # type: ignore [return-value] + def nullable_raw_dp(dp: DatapointRaw) -> float | str | None: + if isinstance(dp, StateDatapoint): + raise TypeError("nullable_raw_dp does not apply to state datapoints") + return dp.value if not dp.nullValue else None # minDatapoint and maxDatapoint are also objects in the response. The proto lookups doesn't fail, # so we must be very careful to only attach status codes if requested. @@ -124,7 +135,7 @@ def extract_raw_dps_numpy(dps: DatapointsRaw, dtype: type[np.float64] | type[np. return np.fromiter(map(DpsUnpackFns.raw_dp, dps), dtype=dtype, count=len(dps)) @staticmethod - def extract_nullable_raw_dps(dps: DatapointsRaw) -> list[float | str]: # actually list of [... 
| None] + def extract_nullable_raw_dps(dps: DatapointsRaw) -> list[float | str | None]: return list(map(DpsUnpackFns.nullable_raw_dp, dps)) @staticmethod @@ -132,12 +143,12 @@ def extract_nullable_raw_dps_numpy( dps: DatapointsRaw, dtype: type[np.float64] | type[np.object_] ) -> tuple[npt.NDArray[Any], list[int]]: # This is a very hot loop, thus we make some ugly optimizations: - values = [None] * len(dps) + values: list[float | str | None] = [None] * len(dps) missing: list[int] = [] add_missing = missing.append for i, dp in enumerate(map(DpsUnpackFns.nullable_raw_dp, dps)): # we use list because of its significantly lower overhead than numpy on single element access: - values[i] = dp # type: ignore [call-overload] + values[i] = dp if dp is None: add_missing(i) arr = np.array(values, dtype=dtype) @@ -159,6 +170,34 @@ def extract_status_symbol(dps: DatapointsRaw) -> list[str]: def extract_status_symbol_numpy(dps: DatapointsRaw) -> npt.NDArray[np.object_]: return np.fromiter(map(DpsUnpackFns.status_symbol, dps), dtype=np.object_, count=len(dps)) + @staticmethod + def nullable_state_numeric(dp: StateDatapoint) -> int | None: + return dp.numericValue if dp.HasField("numericValue") else None + + @staticmethod + def nullable_state_string(dp: StateDatapoint) -> str | None: + return dp.stringValue if dp.HasField("stringValue") else None + + @staticmethod + def extract_state_numeric(dps: StateDatapoints) -> list[int | None]: + return list(map(DpsUnpackFns.nullable_state_numeric, dps)) + + @staticmethod + def extract_state_string(dps: StateDatapoints) -> list[str | None]: + return list(map(DpsUnpackFns.nullable_state_string, dps)) + + @staticmethod + def extract_state_numeric_numpy(dps: StateDatapoints) -> npt.NDArray[np.object_]: + return np.fromiter(map(DpsUnpackFns.nullable_state_numeric, dps), dtype=np.object_, count=len(dps)) + + @staticmethod + def extract_state_string_numpy(dps: StateDatapoints) -> npt.NDArray[np.object_]: + return np.fromiter(map(DpsUnpackFns.nullable_state_string, dps), dtype=np.object_, count=len(dps)) + + @staticmethod + def extract_state_aggregates_rows(dps: AggregateDatapoints) -> list[list[StateAggregate]]: + return [[StateAggregate.from_proto(sa) for sa in dp.stateAggregates] for dp in dps] + @staticmethod def extract_aggregates( dps: AggregateDatapoints, @@ -226,23 +265,38 @@ def get_datapoints_from_proto(res: DataPointListItem) -> DatapointsAny: return cast(DatapointsAny, []) -def proto_type_to_str(ts_type: TimeSeriesType) -> Literal["numeric", "string"]: - if ts_type == TIMESERIES_TYPE_NUMERIC: # 1 +def infer_time_series_type_from_list_item( + res: DataPointListItem, +) -> Literal["numeric", "string", "state", "unknown"]: + t = res.type + if t == TIMESERIES_TYPE_UNSPECIFIED: + # Older API responses do not set the new `type` field; fall back to which datapoint + # variant was returned in the `datapointType` oneof. + match res.WhichOneof("datapointType"): + case "stateDatapoints": + return "state" + case "stringDatapoints": + return "string" + case "numericDatapoints" | "aggregateDatapoints": + return "numeric" + case _: + return "unknown" + if t == TIMESERIES_TYPE_NUMERIC: return "numeric" - elif ts_type == TIMESERIES_TYPE_STRING: # 2 + if t == TIMESERIES_TYPE_STRING: return "string" - elif ts_type >= 3: - from cognite.client._version import __version__ - - warnings.warn( - f"Unknown time series type ({ts_type}) received from the API. 
" - "Please upgrade to a newer version of the Cognite SDK to handle this type " - f"(current version={__version__!r}).", - UserWarning, - stacklevel=3, - ) - # We also return 'unknown' for TIMESERIES_TYPE_UNSPECIFIED (0): - return "unknown" # type: ignore [return-value] + if t == TIMESERIES_TYPE_STATE: + return "state" + from cognite.client._version import __version__ + + warnings.warn( + f"Unknown time series type ({t}) received from the API. " + "Please upgrade to a newer version of the Cognite SDK to handle this type " + f"(current version={__version__!r}).", + UserWarning, + stacklevel=3, + ) + return "unknown" def get_ts_info_from_proto(res: DataPointListItem) -> dict[str, int | str | bool | NodeId | None]: @@ -256,7 +310,7 @@ def get_ts_info_from_proto(res: DataPointListItem) -> dict[str, int | str | bool "external_id": res.externalId, "is_string": res.isString, "is_step": res.isStep, - "type": proto_type_to_str(res.type), + "type": infer_time_series_type_from_list_item(res), "unit": res.unit, "unit_external_id": res.unitExternalId, "instance_id": instance_id, diff --git a/cognite/client/utils/_pandas_helpers.py b/cognite/client/utils/_pandas_helpers.py index abe69a168e..95371e1e69 100644 --- a/cognite/client/utils/_pandas_helpers.py +++ b/cognite/client/utils/_pandas_helpers.py @@ -228,7 +228,17 @@ class _DpsColumnInfo: """ column_id: NodeId | str | int - data: list[float] | list[str] | list[int] | NumpyUInt32Array | NumpyInt64Array | NumpyFloat64Array | NumpyObjArray + data: ( + list[float] + | list[str] + | list[int] + | list[int | None] + | list[str | None] + | NumpyUInt32Array + | NumpyInt64Array + | NumpyFloat64Array + | NumpyObjArray + ) is_string: bool | None = None is_array: bool = False aggregate: str | None = None @@ -259,6 +269,9 @@ def _convert_to_array_for_raw_dps( ) -> npt.NDArray[np.object_] | npt.NDArray[np.float64] | npt.NDArray[np.uint32]: import numpy as np + if self.aggregate in ("numeric_value", "string_value"): + return np.array(self.data, dtype=np.object_) + match self.is_string, self.status_info: case True, None: return np.array(self.data, dtype=np.object_) @@ -297,6 +310,41 @@ def _extract_raw_column_info( is_array: bool, include_status: bool, ) -> list[_DpsColumnInfo]: + if dps.type == "state": + columns: list[_DpsColumnInfo] = [] + if dps.numeric_value is not None: + columns.append( + _DpsColumnInfo( + identifier, + data=dps.numeric_value, + is_string=False, + is_array=is_array, + aggregate="numeric_value", + unit_xid=dps.unit_external_id or None, + ) + ) + if dps.string_value is not None: + columns.append( + _DpsColumnInfo( + identifier, + data=dps.string_value, + is_string=True, + is_array=is_array, + aggregate="string_value", + unit_xid=dps.unit_external_id or None, + ) + ) + if not columns: + raise ValueError("State time series datapoints must have numeric_value and/or string_value arrays set") + if include_status: + if dps.status_code is not None: + columns.append(_DpsColumnInfo(identifier, data=dps.status_code, is_array=is_array, status_info="code")) + if dps.status_symbol is not None: + columns.append( + _DpsColumnInfo(identifier, data=dps.status_symbol, is_array=is_array, status_info="symbol") + ) + return columns + assert dps.value is not None columns = [ _DpsColumnInfo( @@ -342,6 +390,8 @@ def _extract_column_info_from_dps_for_dataframe( identifier = _resolve_ts_identifier_as_df_column_name(dps) is_array = isinstance(dps, DatapointsArray) + if dps.type == "state": + return _extract_raw_column_info(dps, identifier, is_array, include_status) if 
dps.value is not None: return _extract_raw_column_info(dps, identifier, is_array, include_status) return _extract_aggregate_column_info_from_dps(dps, identifier, is_array) diff --git a/tests/tests_integration/test_api/test_state_timeseries.py b/tests/tests_integration/test_api/test_state_timeseries.py new file mode 100644 index 0000000000..6856f001ce --- /dev/null +++ b/tests/tests_integration/test_api/test_state_timeseries.py @@ -0,0 +1,205 @@ +from __future__ import annotations + +import contextlib +import datetime +import os +import sys +import uuid +from collections.abc import Iterator + +import pytest + +from cognite.client import AsyncCogniteClient, ClientConfig, CogniteClient +from cognite.client.data_classes.data_modeling import NodeApply, NodeOrEdgeData, SpaceApply, ViewId +from cognite.client.data_classes.data_modeling.ids import NodeId +from cognite.client.utils._experimental import FeaturePreviewWarning + +SPACE_PREFIX = "sp_python_sdk_state_ts" +STATE_SET_VIEW = ViewId("cdf_cdm", "CogniteStateSet", "v1") +TIME_SERIES_VIEW = ViewId("cdf_cdm", "CogniteTimeSeries", "v1") + +# Fixed timestamps so aggregate assertions stay deterministic. +TS_T0 = 1_609_459_200_000 # 2021-01-01 00:00:00 UTC +TS_T1 = 1_609_462_800_000 # +1h +TS_T2 = 1_609_466_400_000 # +2h +QUERY_END = 1_609_545_600_000 # 2021-01-02 00:00:00 UTC + + +def _shard_id(worker_id: str) -> str: + """Return a unique-per-process suffix, including OS / Python version / xdist worker / random.""" + py = f"py{sys.version_info.major}{sys.version_info.minor}" + osid = "win" if os.name == "nt" else "nix" + return f"{osid}_{py}_{worker_id}_{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="session") +def beta_cognite_client(cognite_client: CogniteClient) -> CogniteClient: + cfg = cognite_client.config + return CogniteClient( + ClientConfig( + client_name=cfg.client_name, + project=cfg.project, + base_url=cfg.base_url, + credentials=cfg.credentials, + api_subversion="beta", + ) + ) + + +@pytest.fixture(scope="class") +def state_ts_resources(beta_cognite_client: CogniteClient, worker_id: str) -> Iterator[tuple[str, str, str, NodeId]]: + shard = _shard_id(worker_id) + space = f"{SPACE_PREFIX}_{shard}" + state_set_xid = f"valve_states_{shard}" + ts_xid = f"valve_001_state_{shard}" + + beta_cognite_client.data_modeling.spaces.apply(SpaceApply(space=space)) + beta_cognite_client.data_modeling.instances.apply( + nodes=[ + NodeApply( + space=space, + external_id=state_set_xid, + sources=[ + NodeOrEdgeData( + source=STATE_SET_VIEW, + properties={ + "name": "Valve Position States", + "description": "Standard position states for industrial valves", + "states": [ + {"numericValue": 0, "stringValue": "CLOSED"}, + {"numericValue": 1, "stringValue": "OPEN"}, + {"numericValue": 2, "stringValue": "TRANSITIONING"}, + ], + }, + ) + ], + ), + NodeApply( + space=space, + external_id=ts_xid, + sources=[ + NodeOrEdgeData( + source=TIME_SERIES_VIEW, + properties={ + "name": "Valve 001 Position", + "description": "Integration test state time series", + "type": "state", + "stateSet": {"space": space, "externalId": state_set_xid}, + }, + ) + ], + ), + ], + replace=True, + ) + + ts_node_id = NodeId(space, ts_xid) + try: + yield space, state_set_xid, ts_xid, ts_node_id + finally: + with contextlib.suppress(Exception): + beta_cognite_client.data_modeling.instances.delete( + nodes=[NodeId(space, ts_xid), NodeId(space, state_set_xid)] + ) + with contextlib.suppress(Exception): + beta_cognite_client.data_modeling.spaces.delete(spaces=[space]) + + +def 
_state_datapoints() -> list[dict[str, int | float | str | datetime.datetime]]: + return [ + {"timestamp": TS_T0, "numericValue": 0, "stringValue": "CLOSED"}, + {"timestamp": TS_T1, "numericValue": 1, "stringValue": "OPEN"}, + {"timestamp": TS_T2, "numericValue": 0, "stringValue": "CLOSED"}, + ] + + +class TestStateTimeSeries: + def test_insert_and_retrieve_raw( + self, + cognite_client: CogniteClient, + state_ts_resources: tuple[str, str, str, NodeId], + ) -> None: + _, _, _, ts_node_id = state_ts_resources + + with pytest.warns(FeaturePreviewWarning, match="State time series"): + cognite_client.time_series.data.insert(_state_datapoints(), instance_id=ts_node_id) + + with pytest.warns(FeaturePreviewWarning, match="State time series"): + dps = cognite_client.time_series.data.retrieve(instance_id=ts_node_id, start=TS_T0, end=QUERY_END) + + assert dps is not None + assert dps.type == "state" + assert list(dps.timestamp) == [TS_T0, TS_T1, TS_T2] + assert dps.numeric_value == [0, 1, 0] + assert dps.string_value == ["CLOSED", "OPEN", "CLOSED"] + + def test_retrieve_arrays_state_raw( + self, + cognite_client: CogniteClient, + state_ts_resources: tuple[str, str, str, NodeId], + ) -> None: + import numpy as np + + _, _, _, ts_node_id = state_ts_resources + + with pytest.warns(FeaturePreviewWarning, match="State time series"): + arr = cognite_client.time_series.data.retrieve_arrays(instance_id=ts_node_id, start=TS_T0, end=QUERY_END) + + assert arr is not None + assert arr.type == "state" + assert arr.numeric_value is not None and arr.string_value is not None + assert arr.numeric_value.dtype == np.object_ + assert arr.string_value.dtype == np.object_ + assert arr.numeric_value.tolist() == [0, 1, 0] + assert arr.string_value.tolist() == ["CLOSED", "OPEN", "CLOSED"] + + def test_retrieve_state_aggregates( + self, + cognite_client: CogniteClient, + state_ts_resources: tuple[str, str, str, NodeId], + ) -> None: + _, _, _, ts_node_id = state_ts_resources + + with pytest.warns(FeaturePreviewWarning, match="State time series"): + dps = cognite_client.time_series.data.retrieve( + instance_id=ts_node_id, + start=TS_T0, + end=QUERY_END, + aggregates=["state_count", "state_transitions", "state_duration"], + granularity="1d", + ) + + assert dps is not None + assert dps.state_aggregates is not None and len(dps.state_aggregates) >= 1 + rows = dps.state_aggregates[0] + by_value = {row.numeric_value: row for row in rows} + + closed, open_ = by_value[0], by_value[1] + assert closed.string_value == "CLOSED" + assert open_.string_value == "OPEN" + assert closed.state_count == 2 + assert open_.state_count == 1 + assert closed.state_transitions == 2 + assert open_.state_transitions == 1 + assert open_.state_duration == 3_600_000 + assert closed.state_duration is not None and closed.state_duration > 0 + + +class TestStateTimeSeriesAsync: + async def test_insert_and_retrieve_raw_async( + self, + async_client: AsyncCogniteClient, + state_ts_resources: tuple[str, str, str, NodeId], + ) -> None: + _, _, _, ts_node_id = state_ts_resources + + with pytest.warns(FeaturePreviewWarning, match="State time series"): + await async_client.time_series.data.insert(_state_datapoints(), instance_id=ts_node_id) + + with pytest.warns(FeaturePreviewWarning, match="State time series"): + dps = await async_client.time_series.data.retrieve(instance_id=ts_node_id, start=TS_T0, end=QUERY_END) + + assert dps is not None + assert dps.type == "state" + assert dps.numeric_value == [0, 1, 0] + assert dps.string_value == ["CLOSED", "OPEN", "CLOSED"] 
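Note: the unit tests below exercise the proto plumbing directly. The key backward-compatibility behavior is the oneof fallback in `infer_time_series_type_from_list_item`: when an older API response leaves `type` at `TIMESERIES_TYPE_UNSPECIFIED`, classification falls back to whichever variant the `datapointType` oneof carries. A hedged sketch of that fallback (proto field names as used in the diff; the message values are invented):

```python
# An old-style response: `type` is unset, but the oneof carries string datapoints,
# so the series is classified as "string" without a warning or an "unknown" result.
from cognite.client._proto.data_point_list_response_pb2 import DataPointListItem
from cognite.client._proto.data_points_pb2 import StringDatapoint, StringDatapoints
from cognite.client.utils._datapoints import infer_time_series_type_from_list_item

legacy = DataPointListItem(
    id=7,
    isString=True,
    stringDatapoints=StringDatapoints(datapoints=[StringDatapoint(timestamp=1, value="a")]),
)
assert infer_time_series_type_from_list_item(legacy) == "string"
```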
diff --git a/tests/tests_unit/test_api/test_datapoints.py b/tests/tests_unit/test_api/test_datapoints.py index f09ac40893..c72680d8b6 100644 --- a/tests/tests_unit/test_api/test_datapoints.py +++ b/tests/tests_unit/test_api/test_datapoints.py @@ -260,7 +260,7 @@ def test_by_external_id(self, cognite_client: CogniteClient, mock_post_datapoint @pytest.mark.parametrize("ts_key, value_key", [("timestamp", "values"), ("timstamp", "value")]) def test_invalid_datapoints_keys(self, cognite_client: CogniteClient, ts_key: str, value_key: str) -> None: dps: list[dict] = [{ts_key: i * 1e11, value_key: i} for i in range(1, 11)] - with pytest.raises(KeyError, match="A datapoint is missing one or both keys"): + with pytest.raises(KeyError, match="A datapoint dict must include"): cognite_client.time_series.data.insert(dps, id=1) def test_insert_datapoints_over_limit( @@ -1099,3 +1099,100 @@ async def override_insert_dps(self: Any, payload: list[dict]) -> None: assert 0 < n_dps <= dps_limit assert 0 < len(call) <= ts_limit assert expected_n_dps == tot_n_dps + + +class TestStateTimeSeriesUtils: + def test_get_datapoints_from_proto_state(self) -> None: + from cognite.client._proto.data_point_list_response_pb2 import ( + TIMESERIES_TYPE_STATE, + DataPointListItem, + ) + from cognite.client._proto.data_points_pb2 import StateDatapoint, StateDatapoints + from cognite.client.utils._datapoints import ( + get_datapoints_from_proto, + get_ts_info_from_proto, + infer_time_series_type_from_list_item, + ) + + item = DataPointListItem( + id=42, + isString=False, + isStep=False, + type=TIMESERIES_TYPE_STATE, + stateDatapoints=StateDatapoints( + datapoints=[ + StateDatapoint(timestamp=10, numericValue=1), + StateDatapoint(timestamp=20, stringValue="open"), + ] + ), + ) + dps = get_datapoints_from_proto(item) + assert len(dps) == 2 + assert infer_time_series_type_from_list_item(item) == "state" + info = get_ts_info_from_proto(item) + assert info["type"] == "state" + assert info["id"] == 42 + + def test_dps_unpack_fns_state_aggregates(self) -> None: + from cognite.client._proto.data_points_pb2 import ( + AggregateDatapoint, + AggregateDatapoints, + ) + from cognite.client._proto.data_points_pb2 import ( + StateAggregate as ProtoStateAggregate, + ) + from cognite.client.utils._datapoints import DpsUnpackFns + + adps = AggregateDatapoints( + datapoints=[ + AggregateDatapoint( + timestamp=1000, + average=1.0, + stateAggregates=[ + ProtoStateAggregate(numericValue=1, stringValue="a", stateCount=2), + ProtoStateAggregate(numericValue=2, stringValue="b", stateTransitions=3), + ], + ) + ] + ) + rows = DpsUnpackFns.extract_state_aggregates_rows(adps.datapoints) + assert len(rows) == 1 + assert len(rows[0]) == 2 + assert rows[0][0].numeric_value == 1 + assert rows[0][0].string_value == "a" + assert rows[0][0].state_count == 2 + assert rows[0][1].state_transitions == 3 + + +@pytest.mark.allow_no_semaphore( + "DatapointsAPI._insert_datapoints holds the semaphore via outer 'async with' for memory " + "backpressure, then passes None to _post to avoid double-acquiring." 
+) +class TestStateTimeSeriesInsert: + def test_insert_state_dict_emits_feature_preview_warning( + self, cognite_client: CogniteClient, mock_post_datapoints: HTTPXMock + ) -> None: + from cognite.client.utils._experimental import FeaturePreviewWarning + + dps: list[dict[str, Any]] = [{"timestamp": 1, "numericValue": 0, "stringValue": "off"}] + with pytest.warns(FeaturePreviewWarning, match="State time series"): + cognite_client.time_series.data.insert(dps, id=1) + req = mock_post_datapoints.get_requests()[0] + body = jsgz_load(req.content) + dumped = body["items"][0]["datapoints"][0] + assert dumped["numericValue"] == 0 + assert dumped["stringValue"] == "off" + assert "value" not in dumped + + def test_datapoints_query_includes_state_aggregate_names(self) -> None: + from cognite.client.data_classes import DatapointsQuery + + q = DatapointsQuery( + id=1, + aggregates=["average", "state_count", "state_transitions"], + granularity="1h", + ) + aggs = q.aggs_camel_case + assert "stateCount" in aggs + assert "stateTransitions" in aggs + assert "average" in aggs diff --git a/tests/tests_unit/test_data_classes/test_datapoints.py b/tests/tests_unit/test_data_classes/test_datapoints.py index c8de7f6419..0b387cf34c 100644 --- a/tests/tests_unit/test_data_classes/test_datapoints.py +++ b/tests/tests_unit/test_data_classes/test_datapoints.py @@ -6,10 +6,10 @@ import pytest -from cognite.client.data_classes import Datapoint, DatapointsArray +from cognite.client.data_classes import Datapoint, Datapoints, DatapointsArray from cognite.client.data_classes._base import CogniteResourceList from cognite.client.data_classes.data_modeling.ids import NodeId -from cognite.client.data_classes.datapoints import DatapointsArrayList, DatapointsList +from cognite.client.data_classes.datapoints import DatapointsArrayList, DatapointsList, StateAggregate class TestDatapoint: @@ -117,3 +117,58 @@ def test_identifier_priority(self, dps_lst_cls: type[CogniteResourceList]) -> No [(123,), ("foo",), (NodeId(space="s", external_id="x"),)], names=["identifier"] ) pd.testing.assert_frame_equal(df, exp_df) + + +class TestStateAggregate: + def test_load_dump_round_trip(self) -> None: + raw = { + "numericValue": 7, + "stringValue": "on", + "stateCount": 2, + "stateTransitions": 1, + "stateDuration": 3600, + } + sa = StateAggregate._load(raw) + assert sa.numeric_value == 7 + assert sa.string_value == "on" + assert sa.state_count == 2 + assert sa.state_transitions == 1 + assert sa.state_duration == 3600 + assert StateAggregate._load(sa.dump(camel_case=False)) == sa + + +@pytest.mark.dsl +class TestDatapointsStateSeries: + def test_dump_load_state_raw(self) -> None: + # State series with sparse columns must round-trip via Datapoints.dump/_load. 
+ dps = Datapoints( + id=1, + timestamp=[1, 2], + is_string=False, + is_step=False, + type="state", + numeric_value=[1, None], + string_value=[None, "b"], + ) + loaded = Datapoints._load(dps.dump()) + assert loaded.type == "state" + assert loaded.numeric_value is not None + assert loaded.string_value is not None + assert list(loaded.numeric_value) == [1, None] + assert list(loaded.string_value) == [None, "b"] + + def test_datapoints_array_state_dump(self) -> None: + import numpy as np + + dps = DatapointsArray( + id=9, + timestamp=np.array([1000, 2000], dtype="datetime64[ms]"), + is_string=False, + is_step=False, + type="state", + numeric_value=np.array([3, 4], dtype=np.int64), + string_value=None, + ) + dumped = dps.dump() + assert dumped["type"] == "state" + assert len(dumped["datapoints"]) == 2 diff --git a/tests/utils.py b/tests/utils.py index 24651c73ad..88a06cc3a0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -421,6 +421,10 @@ def create_instance(self, resource_cls: type[T_Object], skip_defaulted_args: boo else: for raw in ["value", "status_code", "status_symbol", "min_datapoint", "max_datapoint"]: keyword_arguments.pop(raw, None) + # State-series fields are only valid when type == "state"; the random choice above + # never produces that, so drop them to avoid a malformed instance. + for state_field in ("numeric_value", "string_value", "state_aggregates"): + keyword_arguments.pop(state_field, None) elif resource_cls is TransformationSchemaUnknownType: keyword_arguments["raw_schema"]["type"] = "unknown" elif resource_cls is SequenceRows:
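Note: to close, a small sketch of the presence semantics the new `DpsUnpackFns` state helpers in cognite/client/utils/_datapoints.py depend on. The field names and `HasField` checks come straight from the diff; the datapoint itself is invented:

```python
# proto3 presence tracking distinguishes "unset" from falsy values: a state
# datapoint carrying only a numeric code unpacks to (2, None), never (2, "").
from cognite.client._proto.data_points_pb2 import StateDatapoint
from cognite.client.utils._datapoints import DpsUnpackFns

dp = StateDatapoint(timestamp=10, numericValue=2)
assert DpsUnpackFns.nullable_state_numeric(dp) == 2
assert DpsUnpackFns.nullable_state_string(dp) is None  # stringValue was never set
```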