diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 8fd8f2b9d6..1eaa11c3bb 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -288,6 +288,8 @@ SubworkflowTaskParameters, TransformationTaskOutput, TransformationTaskParameters, + UnknownWorkflowTaskOutput, + UnknownWorkflowTaskParameters, Workflow, WorkflowDefinition, WorkflowDefinitionUpsert, @@ -548,6 +550,8 @@ "TransformationUpdate", "TransformationWrite", "TransformationWriteList", + "UnknownWorkflowTaskOutput", + "UnknownWorkflowTaskParameters", "UserProfile", "UserProfileList", "Workflow", diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 6c2e03283f..65e4917955 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -1,8 +1,10 @@ from __future__ import annotations +import warnings from abc import ABC, abstractmethod from collections import UserList from collections.abc import Collection, Sequence +from copy import deepcopy from dataclasses import dataclass from typing import Any, ClassVar, Literal, TypeAlias, cast, final from zoneinfo import ZoneInfo @@ -148,7 +150,14 @@ def as_write(self) -> WorkflowUpsertList: return WorkflowUpsertList([workflow.as_write() for workflow in self.data]) -ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow", "simulation"] +ValidTaskType = Literal[ + "function", + "transformation", + "cdf", + "dynamic", + "subworkflow", + "simulation", +] class WorkflowTaskParameters(CogniteResource, ABC): @@ -179,7 +188,7 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: elif type_ == "simulation": return SimulationTaskParameters._load(parameters) else: - raise ValueError(f"Unknown task type: {type_}. Expected {ValidTaskType}") + return UnknownWorkflowTaskParameters(type_, deepcopy(parameters)) class FunctionTaskParameters(WorkflowTaskParameters): @@ -257,6 +266,42 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return output +class UnknownWorkflowTaskParameters(WorkflowTaskParameters): + # CogniteResource.load parses strings to a dict before _load; a bare scalar from dump() + # would still parse to a non-dict and fail. Wrapping non-dict payloads keeps JSON/YAML + # round-trips (e.g. test_base) valid while still storing the original value in _parameters. + _OPAQUE_WRAPPER_KEY: ClassVar[str] = "__cogniteSdkUnknownWorkflowTaskParametersOpaque__" + + def __init__(self, dynamic_task_type: str | None, parameters: Any) -> None: + self.dynamic_task_type = dynamic_task_type + self._parameters = parameters + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + # dump() wraps non-dict _parameters in a one-key dict so load() always receives a dict; + # here we reverse that and store the inner value again (no task type on that path). + if len(resource) == 1 and next(iter(resource), None) == cls._OPAQUE_WRAPPER_KEY: + return cls(None, deepcopy(resource[cls._OPAQUE_WRAPPER_KEY])) + t = resource.get("type", resource.get("taskType")) + return cls(t, deepcopy(resource)) + + @property + def task_type(self) -> ValidTaskType: # type: ignore[override] + return cast(ValidTaskType, self.dynamic_task_type) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + if not camel_case: + warnings.warn( + "camel_case=False is ignored for UnknownWorkflowTaskParameters.dump(); API payloads use camelCase keys.", + UserWarning, + stacklevel=2, + ) + p = deepcopy(self._parameters) + if isinstance(p, dict): + return p + return {self._OPAQUE_WRAPPER_KEY: p} + + _SIMULATORS_WARNING = FeaturePreviewWarning( api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" ) @@ -626,7 +671,7 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput: elif task_type == "simulation": return SimulationTaskOutput.load(data) else: - raise ValueError(f"Unknown task type: {task_type}") + return UnknownWorkflowTaskOutput.load(data) @abstractmethod def dump(self, camel_case: bool = True) -> dict[str, Any]: @@ -664,6 +709,28 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } +class UnknownWorkflowTaskOutput(WorkflowTaskOutput): + def __init__(self, output: Any) -> None: + self._output = output + + @classmethod + def load(cls, data: dict[str, Any]) -> Self: + return cls(data.get("output", {})) + + @property + def task_type(self) -> ValidTaskType: # type: ignore[override] + return cast(ValidTaskType, "unknown") + + def dump(self, camel_case: bool = True) -> Any: + if not camel_case: + warnings.warn( + "camel_case=False is ignored for UnknownWorkflowTaskOutput.dump(); API payloads use camelCase keys.", + UserWarning, + stacklevel=2, + ) + return deepcopy(self._output) + + class SimulationTaskOutput(WorkflowTaskOutput): """ The class represent the output of Simulation execution. diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index 98d986ed51..717fe6bba3 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -16,6 +16,7 @@ SimulationTaskParameters, TransformationTaskOutput, TransformationTaskParameters, + UnknownWorkflowTaskOutput, WorkflowDefinition, WorkflowDefinitionUpsert, WorkflowExecutionDetailed, @@ -245,8 +246,35 @@ class TestWorkflowTask: }, }, ), + ( + { + "externalId": "taskFuture", + "type": "futureWorkflowTaskType", + "parameters": {"futureWorkflowTaskType": {"alpha": 1}}, + }, + ), ], ) def test_serialization(self, raw: dict[str, Any]) -> None: loaded = WorkflowTask._load(raw) assert loaded.dump() == raw + + +class TestWorkflowTaskOutputDispatch: + def test_load_output_unknown_task_type(self) -> None: + data: dict[str, Any] = {"taskType": "customWorkflowOutput", "output": {"customField": 99}} + loaded = WorkflowTaskOutput.load_output(data) + assert isinstance(loaded, UnknownWorkflowTaskOutput) + + @pytest.mark.parametrize( + "output_payload", + [ + {"customField": 99}, + {"otherKey": "value"}, + ], + ) + def test_unknown_output_dump_returns_stored_payload(self, output_payload: dict[str, Any]) -> None: + data: dict[str, Any] = {"taskType": "customWorkflowOutput", "output": output_payload} + loaded = WorkflowTaskOutput.load_output(data) + assert isinstance(loaded, UnknownWorkflowTaskOutput) + assert loaded.dump() == output_payload