From 85b85024e623b3b706f2f902f5f313890ab57ec6 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Wed, 6 May 2026 15:58:50 +0530 Subject: [PATCH 01/18] fix(workflows): support functionApp task type and unknown task fallbacks Parse functionApp parameters and execution output as first-class types. Route unrecognized task types through Unknown parameters/output so listing workflow versions no longer fails when the API adds new task kinds. Co-authored-by: Cursor --- cognite/client/data_classes/__init__.py | 8 ++ cognite/client/data_classes/workflows.py | 123 ++++++++++++++++-- .../test_data_classes/test_workflows.py | 35 +++++ 3 files changed, 157 insertions(+), 9 deletions(-) diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 9d4fab6aad..3c8b55e3c7 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -279,6 +279,8 @@ CDFTaskParameters, DynamicTaskOutput, DynamicTaskParameters, + FunctionAppTaskOutput, + FunctionAppTaskParameters, FunctionTaskOutput, FunctionTaskParameters, SimulationTaskOutput, @@ -286,6 +288,8 @@ SubworkflowTaskParameters, TransformationTaskOutput, TransformationTaskParameters, + UnknownWorkflowTaskOutput, + UnknownWorkflowTaskParameters, Workflow, WorkflowDefinition, WorkflowDefinitionUpsert, @@ -414,6 +418,8 @@ "FileMetadataWriteList", "FileMultipartUploadSession", "Function", + "FunctionAppTaskOutput", + "FunctionAppTaskParameters", "FunctionCall", "FunctionCallList", "FunctionCallLog", @@ -544,6 +550,8 @@ "TransformationUpdate", "TransformationWrite", "TransformationWriteList", + "UnknownWorkflowTaskOutput", + "UnknownWorkflowTaskParameters", "UserProfile", "UserProfileList", "Workflow", diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 6c2e03283f..226ea5d56d 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -23,7 +23,7 @@ SimulationInputOverride, ) from cognite.client.utils._experimental import FeaturePreviewWarning -from cognite.client.utils._text import convert_all_keys_to_camel_case, to_snake_case +from cognite.client.utils._text import convert_all_keys_recursive, convert_all_keys_to_camel_case, to_snake_case TaskStatus: TypeAlias = Literal[ "in_progress", @@ -148,7 +148,15 @@ def as_write(self) -> WorkflowUpsertList: return WorkflowUpsertList([workflow.as_write() for workflow in self.data]) -ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow", "simulation"] +ValidTaskType = Literal[ + "function", + "transformation", + "cdf", + "dynamic", + "subworkflow", + "simulation", + "functionApp", +] class WorkflowTaskParameters(CogniteResource, ABC): @@ -166,6 +174,8 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: if type_ == "function": return FunctionTaskParameters._load(parameters) + elif type_ == "functionApp": + return FunctionAppTaskParameters._load(parameters) elif type_ == "transformation": return TransformationTaskParameters._load(parameters) elif type_ == "cdf": @@ -179,7 +189,7 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: elif type_ == "simulation": return SimulationTaskParameters._load(parameters) else: - raise ValueError(f"Unknown task type: {type_}. Expected {ValidTaskType}") + return UnknownWorkflowTaskParameters(type_ if isinstance(type_, str) else "unknown", parameters) class FunctionTaskParameters(WorkflowTaskParameters): @@ -257,6 +267,63 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return output +class FunctionAppTaskParameters(WorkflowTaskParameters): + task_type = "functionApp" + + def __init__( + self, + external_id: str, + data: dict | str | None = None, + is_async_complete: bool | None = None, + ) -> None: + self.external_id = external_id + self.data = data + self.is_async_complete = is_async_complete + + @classmethod + def _load(cls, resource: dict[str, Any]) -> FunctionAppTaskParameters: + function_app: dict[str, Any] = resource["functionApp"] + + return cls( + external_id=function_app["externalId"], + data=function_app.get("data"), + is_async_complete=resource.get("isAsyncComplete", resource.get("asyncComplete")), + ) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + function_app: dict[str, Any] = { + ("externalId" if camel_case else "external_id"): self.external_id, + } + if self.data: + function_app["data"] = self.data + + output: dict[str, Any] = { + "functionApp": function_app, + } + if self.is_async_complete is not None: + output["isAsyncComplete" if camel_case else "is_async_complete"] = self.is_async_complete + return output + + +class UnknownWorkflowTaskParameters(WorkflowTaskParameters): + def __init__(self, dynamic_task_type: str, parameters: dict[str, Any]) -> None: + self.dynamic_task_type = dynamic_task_type + self._parameters = parameters + + @classmethod + def _load(cls, resource: dict[str, Any]) -> Self: + type_ = resource.get("type", resource.get("taskType")) + inner = resource.get("parameters", resource.get("input")) or {} + return cls(type_ if isinstance(type_, str) else "unknown", inner) + + @property + def task_type(self) -> str: # type: ignore[override] + return self.dynamic_task_type + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_recursive(self._parameters, camel_case=camel_case) + + _SIMULATORS_WARNING = FeaturePreviewWarning( api_maturity="General Availability", sdk_maturity="alpha", feature_name="Simulators" ) @@ -562,8 +629,8 @@ def __init__( self.depends_on = depends_on @property - def type(self) -> ValidTaskType: - return self.parameters.task_type + def type(self) -> str: + return cast(str, self.parameters.task_type) @classmethod def _load(cls, resource: dict) -> WorkflowTask: @@ -625,8 +692,10 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput: return SubworkflowTaskOutput.load(data) elif task_type == "simulation": return SimulationTaskOutput.load(data) + elif task_type == "functionApp": + return FunctionAppTaskOutput.load(data) else: - raise ValueError(f"Unknown task type: {task_type}") + return UnknownWorkflowTaskOutput.load(data) @abstractmethod def dump(self, camel_case: bool = True) -> dict[str, Any]: @@ -664,6 +733,42 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } +class FunctionAppTaskOutput(WorkflowTaskOutput): + task_type: ClassVar[str] = "functionApp" + + def __init__(self, call_id: int | None, function_id: int | None, response: dict | None) -> None: + self.call_id = call_id + self.function_id = function_id + self.response = response + + @classmethod + def load(cls, data: dict[str, Any]) -> FunctionAppTaskOutput: + output = data["output"] + return cls(output.get("callId"), output.get("functionId"), output.get("response")) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return { + "callId" if camel_case else "call_id": self.call_id, + "functionId" if camel_case else "function_id": self.function_id, + "response": self.response, + } + + +class UnknownWorkflowTaskOutput(WorkflowTaskOutput): + task_type: ClassVar[str] = "" + + def __init__(self, output: dict[str, Any]) -> None: + self._output = output + + @classmethod + def load(cls, data: dict[str, Any]) -> Self: + raw = data.get("output") + return cls(raw if isinstance(raw, dict) else {}) + + def dump(self, camel_case: bool = True) -> dict[str, Any]: + return convert_all_keys_recursive(self._output, camel_case=camel_case) + + class SimulationTaskOutput(WorkflowTaskOutput): """ The class represent the output of Simulation execution. @@ -828,8 +933,8 @@ def __init__( self.reason_for_incompletion = reason_for_incompletion @property - def task_type(self) -> ValidTaskType: - return self.input.task_type + def task_type(self) -> str: + return cast(str, self.input.task_type) @classmethod def _load(cls, resource: dict) -> WorkflowTaskExecution: @@ -851,7 +956,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: output["status"] = self.status.upper() output[("taskType" if camel_case else "task_type")] = self.task_type # API uses isAsyncComplete and asyncComplete inconsistently: - if self.task_type == "function": + if self.task_type in ("function", "functionApp"): if (is_async_complete := output["input"].get("isAsyncComplete")) is not None: output["input"]["asyncComplete"] = is_async_complete del output["input"]["isAsyncComplete"] diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index 98d986ed51..15736eab18 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -11,11 +11,13 @@ CDFTaskOutput, DynamicTaskOutput, DynamicTaskParameters, + FunctionAppTaskOutput, FunctionTaskOutput, FunctionTaskParameters, SimulationTaskParameters, TransformationTaskOutput, TransformationTaskParameters, + UnknownWorkflowTaskOutput, WorkflowDefinition, WorkflowDefinitionUpsert, WorkflowExecutionDetailed, @@ -245,8 +247,41 @@ class TestWorkflowTask: }, }, ), + ( + { + "externalId": "taskApp", + "type": "functionApp", + "parameters": {"functionApp": {"externalId": "myApp"}}, + }, + ), + ( + { + "externalId": "taskFuture", + "type": "futureWorkflowTaskType", + "parameters": {"futureWorkflowTaskType": {"alpha": 1}}, + }, + ), ], ) def test_serialization(self, raw: dict[str, Any]) -> None: loaded = WorkflowTask._load(raw) assert loaded.dump() == raw + + +class TestWorkflowTaskOutputDispatch: + @pytest.mark.parametrize( + ["data", "expected_type"], + [ + ( + {"taskType": "functionApp", "output": {"callId": 1, "functionId": 2, "response": {"k": "v"}}}, + FunctionAppTaskOutput, + ), + ( + {"taskType": "customWorkflowOutput", "output": {"customField": 99}}, + UnknownWorkflowTaskOutput, + ), + ], + ) + def test_load_output(self, data: dict[str, Any], expected_type: type[WorkflowTaskOutput]) -> None: + loaded = WorkflowTaskOutput.load_output(data) + assert isinstance(loaded, expected_type) From e031d8bbe0ccb871c362a41ba1366cd5ebcc3374 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Wed, 6 May 2026 16:18:34 +0530 Subject: [PATCH 02/18] Update cognite/client/data_classes/workflows.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- cognite/client/data_classes/workflows.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 226ea5d56d..7bfa3f49f0 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -306,7 +306,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class UnknownWorkflowTaskParameters(WorkflowTaskParameters): - def __init__(self, dynamic_task_type: str, parameters: dict[str, Any]) -> None: + def __init__(self, dynamic_task_type: str, parameters: Any) -> None: self.dynamic_task_type = dynamic_task_type self._parameters = parameters @@ -321,6 +321,8 @@ def task_type(self) -> str: # type: ignore[override] return self.dynamic_task_type def dump(self, camel_case: bool = True) -> dict[str, Any]: + if not isinstance(self._parameters, dict): + return self._parameters # type: ignore[return-value] return convert_all_keys_recursive(self._parameters, camel_case=camel_case) From ff2e4dd7ac401c9f6932c0f3f681aa372358f1d1 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Wed, 6 May 2026 16:24:20 +0530 Subject: [PATCH 03/18] Workflows: UnknownWorkflowTaskOutput stores task type and handles non-dict output Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 7bfa3f49f0..8bd661034c 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -322,7 +322,7 @@ def task_type(self) -> str: # type: ignore[override] def dump(self, camel_case: bool = True) -> dict[str, Any]: if not isinstance(self._parameters, dict): - return self._parameters # type: ignore[return-value] + return self._parameters return convert_all_keys_recursive(self._parameters, camel_case=camel_case) @@ -757,17 +757,23 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class UnknownWorkflowTaskOutput(WorkflowTaskOutput): - task_type: ClassVar[str] = "" - - def __init__(self, output: dict[str, Any]) -> None: + def __init__(self, task_type: str, output: Any) -> None: + self.dynamic_task_type = task_type self._output = output @classmethod def load(cls, data: dict[str, Any]) -> Self: - raw = data.get("output") - return cls(raw if isinstance(raw, dict) else {}) + task_type = data.get("taskType", "unknown") + output = data.get("output") + return cls(task_type, output if isinstance(output, dict) else {}) + + @property + def task_type(self) -> str: # type: ignore[override] + return self.dynamic_task_type def dump(self, camel_case: bool = True) -> dict[str, Any]: + if not isinstance(self._output, dict): + return self._output return convert_all_keys_recursive(self._output, camel_case=camel_case) From 345eae01c2971c516f2bfc2da77e7ceb804ca9a2 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Wed, 6 May 2026 16:37:04 +0530 Subject: [PATCH 04/18] Workflows: always dict-shaped unknown task dump; function-only asyncComplete normalize Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 8bd661034c..69e44d7d07 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -321,9 +321,10 @@ def task_type(self) -> str: # type: ignore[override] return self.dynamic_task_type def dump(self, camel_case: bool = True) -> dict[str, Any]: - if not isinstance(self._parameters, dict): - return self._parameters - return convert_all_keys_recursive(self._parameters, camel_case=camel_case) + return convert_all_keys_recursive( + self._parameters if isinstance(self._parameters, dict) else {}, + camel_case=camel_case, + ) _SIMULATORS_WARNING = FeaturePreviewWarning( @@ -772,9 +773,10 @@ def task_type(self) -> str: # type: ignore[override] return self.dynamic_task_type def dump(self, camel_case: bool = True) -> dict[str, Any]: - if not isinstance(self._output, dict): - return self._output - return convert_all_keys_recursive(self._output, camel_case=camel_case) + return convert_all_keys_recursive( + self._output if isinstance(self._output, dict) else {}, + camel_case=camel_case, + ) class SimulationTaskOutput(WorkflowTaskOutput): @@ -964,7 +966,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: output["status"] = self.status.upper() output[("taskType" if camel_case else "task_type")] = self.task_type # API uses isAsyncComplete and asyncComplete inconsistently: - if self.task_type in ("function", "functionApp"): + if self.task_type == "function": if (is_async_complete := output["input"].get("isAsyncComplete")) is not None: output["input"]["asyncComplete"] = is_async_complete del output["input"]["isAsyncComplete"] From 0288a36f07236f511c02b9979c6cf8d286681ff5 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Thu, 7 May 2026 11:12:29 +0530 Subject: [PATCH 05/18] Workflows: drop FunctionApp task classes; use unknown param/output for functionApp Co-authored-by: Cursor --- cognite/client/data_classes/__init__.py | 4 -- cognite/client/data_classes/workflows.py | 63 ------------------- .../test_data_classes/test_workflows.py | 26 +------- 3 files changed, 3 insertions(+), 90 deletions(-) diff --git a/cognite/client/data_classes/__init__.py b/cognite/client/data_classes/__init__.py index 3c8b55e3c7..c824ffcdf4 100644 --- a/cognite/client/data_classes/__init__.py +++ b/cognite/client/data_classes/__init__.py @@ -279,8 +279,6 @@ CDFTaskParameters, DynamicTaskOutput, DynamicTaskParameters, - FunctionAppTaskOutput, - FunctionAppTaskParameters, FunctionTaskOutput, FunctionTaskParameters, SimulationTaskOutput, @@ -418,8 +416,6 @@ "FileMetadataWriteList", "FileMultipartUploadSession", "Function", - "FunctionAppTaskOutput", - "FunctionAppTaskParameters", "FunctionCall", "FunctionCallList", "FunctionCallLog", diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 69e44d7d07..22251a4cdf 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -174,8 +174,6 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: if type_ == "function": return FunctionTaskParameters._load(parameters) - elif type_ == "functionApp": - return FunctionAppTaskParameters._load(parameters) elif type_ == "transformation": return TransformationTaskParameters._load(parameters) elif type_ == "cdf": @@ -267,44 +265,6 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: return output -class FunctionAppTaskParameters(WorkflowTaskParameters): - task_type = "functionApp" - - def __init__( - self, - external_id: str, - data: dict | str | None = None, - is_async_complete: bool | None = None, - ) -> None: - self.external_id = external_id - self.data = data - self.is_async_complete = is_async_complete - - @classmethod - def _load(cls, resource: dict[str, Any]) -> FunctionAppTaskParameters: - function_app: dict[str, Any] = resource["functionApp"] - - return cls( - external_id=function_app["externalId"], - data=function_app.get("data"), - is_async_complete=resource.get("isAsyncComplete", resource.get("asyncComplete")), - ) - - def dump(self, camel_case: bool = True) -> dict[str, Any]: - function_app: dict[str, Any] = { - ("externalId" if camel_case else "external_id"): self.external_id, - } - if self.data: - function_app["data"] = self.data - - output: dict[str, Any] = { - "functionApp": function_app, - } - if self.is_async_complete is not None: - output["isAsyncComplete" if camel_case else "is_async_complete"] = self.is_async_complete - return output - - class UnknownWorkflowTaskParameters(WorkflowTaskParameters): def __init__(self, dynamic_task_type: str, parameters: Any) -> None: self.dynamic_task_type = dynamic_task_type @@ -695,8 +655,6 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput: return SubworkflowTaskOutput.load(data) elif task_type == "simulation": return SimulationTaskOutput.load(data) - elif task_type == "functionApp": - return FunctionAppTaskOutput.load(data) else: return UnknownWorkflowTaskOutput.load(data) @@ -736,27 +694,6 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: } -class FunctionAppTaskOutput(WorkflowTaskOutput): - task_type: ClassVar[str] = "functionApp" - - def __init__(self, call_id: int | None, function_id: int | None, response: dict | None) -> None: - self.call_id = call_id - self.function_id = function_id - self.response = response - - @classmethod - def load(cls, data: dict[str, Any]) -> FunctionAppTaskOutput: - output = data["output"] - return cls(output.get("callId"), output.get("functionId"), output.get("response")) - - def dump(self, camel_case: bool = True) -> dict[str, Any]: - return { - "callId" if camel_case else "call_id": self.call_id, - "functionId" if camel_case else "function_id": self.function_id, - "response": self.response, - } - - class UnknownWorkflowTaskOutput(WorkflowTaskOutput): def __init__(self, task_type: str, output: Any) -> None: self.dynamic_task_type = task_type diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index 15736eab18..f44b309b72 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -11,7 +11,6 @@ CDFTaskOutput, DynamicTaskOutput, DynamicTaskParameters, - FunctionAppTaskOutput, FunctionTaskOutput, FunctionTaskParameters, SimulationTaskParameters, @@ -247,13 +246,6 @@ class TestWorkflowTask: }, }, ), - ( - { - "externalId": "taskApp", - "type": "functionApp", - "parameters": {"functionApp": {"externalId": "myApp"}}, - }, - ), ( { "externalId": "taskFuture", @@ -269,19 +261,7 @@ def test_serialization(self, raw: dict[str, Any]) -> None: class TestWorkflowTaskOutputDispatch: - @pytest.mark.parametrize( - ["data", "expected_type"], - [ - ( - {"taskType": "functionApp", "output": {"callId": 1, "functionId": 2, "response": {"k": "v"}}}, - FunctionAppTaskOutput, - ), - ( - {"taskType": "customWorkflowOutput", "output": {"customField": 99}}, - UnknownWorkflowTaskOutput, - ), - ], - ) - def test_load_output(self, data: dict[str, Any], expected_type: type[WorkflowTaskOutput]) -> None: + def test_load_output_unknown_task_type(self) -> None: + data: dict[str, Any] = {"taskType": "customWorkflowOutput", "output": {"customField": 99}} loaded = WorkflowTaskOutput.load_output(data) - assert isinstance(loaded, expected_type) + assert isinstance(loaded, UnknownWorkflowTaskOutput) From f34bac0881e67edca6b9934855d99a3c31803218 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Fri, 8 May 2026 11:05:51 +0530 Subject: [PATCH 06/18] Workflows: delegate unknown load to _load; drop recursive key convert on unknown dumps Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 40 +++++++++++++++--------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 22251a4cdf..cccb0d8f3f 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -1,8 +1,10 @@ from __future__ import annotations +import warnings from abc import ABC, abstractmethod from collections import UserList from collections.abc import Collection, Sequence +from copy import deepcopy from dataclasses import dataclass from typing import Any, ClassVar, Literal, TypeAlias, cast, final from zoneinfo import ZoneInfo @@ -23,7 +25,7 @@ SimulationInputOverride, ) from cognite.client.utils._experimental import FeaturePreviewWarning -from cognite.client.utils._text import convert_all_keys_recursive, convert_all_keys_to_camel_case, to_snake_case +from cognite.client.utils._text import convert_all_keys_to_camel_case, to_snake_case TaskStatus: TypeAlias = Literal[ "in_progress", @@ -187,7 +189,7 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: elif type_ == "simulation": return SimulationTaskParameters._load(parameters) else: - return UnknownWorkflowTaskParameters(type_ if isinstance(type_, str) else "unknown", parameters) + return UnknownWorkflowTaskParameters._load(data) class FunctionTaskParameters(WorkflowTaskParameters): @@ -277,14 +279,18 @@ def _load(cls, resource: dict[str, Any]) -> Self: return cls(type_ if isinstance(type_, str) else "unknown", inner) @property - def task_type(self) -> str: # type: ignore[override] - return self.dynamic_task_type + def task_type(self) -> ValidTaskType: # type: ignore[override] + return cast(ValidTaskType, self.dynamic_task_type) def dump(self, camel_case: bool = True) -> dict[str, Any]: - return convert_all_keys_recursive( - self._parameters if isinstance(self._parameters, dict) else {}, - camel_case=camel_case, - ) + if not camel_case: + warnings.warn( + "camel_case=False is ignored for UnknownWorkflowTaskParameters.dump(); API payloads use camelCase keys.", + UserWarning, + stacklevel=2, + ) + params = self._parameters if isinstance(self._parameters, dict) else {} + return deepcopy(params) _SIMULATORS_WARNING = FeaturePreviewWarning( @@ -706,14 +712,18 @@ def load(cls, data: dict[str, Any]) -> Self: return cls(task_type, output if isinstance(output, dict) else {}) @property - def task_type(self) -> str: # type: ignore[override] - return self.dynamic_task_type + def task_type(self) -> ValidTaskType: # type: ignore[override] + return cast(ValidTaskType, self.dynamic_task_type) def dump(self, camel_case: bool = True) -> dict[str, Any]: - return convert_all_keys_recursive( - self._output if isinstance(self._output, dict) else {}, - camel_case=camel_case, - ) + if not camel_case: + warnings.warn( + "camel_case=False is ignored for UnknownWorkflowTaskOutput.dump(); API payloads use camelCase keys.", + UserWarning, + stacklevel=2, + ) + out = self._output if isinstance(self._output, dict) else {} + return deepcopy(out) class SimulationTaskOutput(WorkflowTaskOutput): @@ -881,7 +891,7 @@ def __init__( @property def task_type(self) -> str: - return cast(str, self.input.task_type) + return self.input.task_type @classmethod def _load(cls, resource: dict) -> WorkflowTaskExecution: From 1ccc16397d782f3e1e2676d1dfc1929dfd86224f Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Fri, 8 May 2026 12:52:49 +0530 Subject: [PATCH 07/18] Align WorkflowTask.type with ValidTaskType from parameters Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index cccb0d8f3f..46b4bea43d 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -598,8 +598,8 @@ def __init__( self.depends_on = depends_on @property - def type(self) -> str: - return cast(str, self.parameters.task_type) + def type(self) -> ValidTaskType: + return self.parameters.task_type @classmethod def _load(cls, resource: dict) -> WorkflowTask: From 48921165626d95ff3cd134f0e4d43dd7ffe01817 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Fri, 8 May 2026 12:56:51 +0530 Subject: [PATCH 08/18] Type WorkflowTaskExecution.task_type as ValidTaskType Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 46b4bea43d..81aaa0680a 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -890,7 +890,7 @@ def __init__( self.reason_for_incompletion = reason_for_incompletion @property - def task_type(self) -> str: + def task_type(self) -> ValidTaskType: return self.input.task_type @classmethod From ee0d026617c01ae0c111b24d3317be22333ba86b Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Fri, 8 May 2026 16:20:44 +0530 Subject: [PATCH 09/18] Address review: unknown workflow task params/output dump and load Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 22 +++++++++++-------- .../test_data_classes/test_workflows.py | 13 +++++++++++ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 81aaa0680a..102d2ee75e 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -268,14 +268,20 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class UnknownWorkflowTaskParameters(WorkflowTaskParameters): - def __init__(self, dynamic_task_type: str, parameters: Any) -> None: + def __init__(self, dynamic_task_type: str, parameters: dict[str, Any]) -> None: self.dynamic_task_type = dynamic_task_type self._parameters = parameters @classmethod def _load(cls, resource: dict[str, Any]) -> Self: type_ = resource.get("type", resource.get("taskType")) - inner = resource.get("parameters", resource.get("input")) or {} + raw_inner = resource.get("parameters", resource.get("input")) + if raw_inner is None: + inner: dict[str, Any] = {} + elif isinstance(raw_inner, dict): + inner = raw_inner + else: + inner = {} return cls(type_ if isinstance(type_, str) else "unknown", inner) @property @@ -289,8 +295,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: UserWarning, stacklevel=2, ) - params = self._parameters if isinstance(self._parameters, dict) else {} - return deepcopy(params) + return deepcopy(self._parameters) _SIMULATORS_WARNING = FeaturePreviewWarning( @@ -708,22 +713,21 @@ def __init__(self, task_type: str, output: Any) -> None: @classmethod def load(cls, data: dict[str, Any]) -> Self: task_type = data.get("taskType", "unknown") - output = data.get("output") - return cls(task_type, output if isinstance(output, dict) else {}) + raw_output = data.get("output") + return cls(task_type, {} if raw_output is None else raw_output) @property def task_type(self) -> ValidTaskType: # type: ignore[override] return cast(ValidTaskType, self.dynamic_task_type) - def dump(self, camel_case: bool = True) -> dict[str, Any]: + def dump(self, camel_case: bool = True) -> Any: if not camel_case: warnings.warn( "camel_case=False is ignored for UnknownWorkflowTaskOutput.dump(); API payloads use camelCase keys.", UserWarning, stacklevel=2, ) - out = self._output if isinstance(self._output, dict) else {} - return deepcopy(out) + return deepcopy(self._output) class SimulationTaskOutput(WorkflowTaskOutput): diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index f44b309b72..bfd0bca90f 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -265,3 +265,16 @@ def test_load_output_unknown_task_type(self) -> None: data: dict[str, Any] = {"taskType": "customWorkflowOutput", "output": {"customField": 99}} loaded = WorkflowTaskOutput.load_output(data) assert isinstance(loaded, UnknownWorkflowTaskOutput) + + @pytest.mark.parametrize( + "output_payload", + [ + {"customField": 99}, + [1, 2, 3], + ], + ) + def test_unknown_output_dump_returns_stored_payload(self, output_payload: dict[str, Any] | list[int]) -> None: + data: dict[str, Any] = {"taskType": "customWorkflowOutput", "output": output_payload} + loaded = WorkflowTaskOutput.load_output(data) + assert isinstance(loaded, UnknownWorkflowTaskOutput) + assert loaded.dump() == output_payload From 0febb2974be4b3526605bbff913c48293c335ef7 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Fri, 8 May 2026 16:30:49 +0530 Subject: [PATCH 10/18] Skip generic CogniteResource round-trips for UnknownWorkflowTaskParameters Co-authored-by: Cursor --- tests/tests_unit/test_base.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/tests/tests_unit/test_base.py b/tests/tests_unit/test_base.py index eb54d0307c..168ee7412d 100644 --- a/tests/tests_unit/test_base.py +++ b/tests/tests_unit/test_base.py @@ -46,6 +46,7 @@ from cognite.client.data_classes.postgres_gateway import TableList, User, UserCreated, UserCreatedList, UserList from cognite.client.data_classes.sequences import SequenceUpdate from cognite.client.data_classes.time_series import TimeSeries, TimeSeriesList +from cognite.client.data_classes.workflows import UnknownWorkflowTaskParameters from cognite.client.exceptions import CogniteMissingClientError from cognite.client.testing import CogniteClientMock from cognite.client.utils import _json_extended as _json @@ -175,7 +176,10 @@ class TestCogniteResource: "cog_res_subclass", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") - for cls in all_concrete_subclasses(CogniteResource, exclude={SyntheticDatapoints, SubscriptionDatapoints}) + for cls in all_concrete_subclasses( + CogniteResource, + exclude={SyntheticDatapoints, SubscriptionDatapoints, UnknownWorkflowTaskParameters}, + ) ], ) def test_json_serialize( @@ -198,7 +202,13 @@ def test_json_serialize( # Agent._load requires runtimeVersion/ownerId (always sent by the API), # but Agent.__init__ keeps them optional for SDK back-compat. The # minimal-args round-trip therefore can't satisfy both contracts. - for cls in all_concrete_subclasses(CogniteResource, exclude={SubscriptionDatapoints, Agent}) + # UnknownWorkflowTaskParameters.dump() is only the inner parameters object + # (see WorkflowTask); it omits task type, so generic CogniteResource + # load/dump round-trips do not apply. + for cls in all_concrete_subclasses( + CogniteResource, + exclude={SubscriptionDatapoints, Agent, UnknownWorkflowTaskParameters}, + ) ], ) def test_dump_load_only_required( @@ -295,7 +305,9 @@ def test_writable_list_as_write( "cog_res_subclass", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") - for cls in all_concrete_subclasses(CogniteResource, exclude={SubscriptionDatapoints}) + for cls in all_concrete_subclasses( + CogniteResource, exclude={SubscriptionDatapoints, UnknownWorkflowTaskParameters} + ) ], ) def test_load_has_no_side_effects( @@ -325,7 +337,9 @@ def test_load_has_no_side_effects( "cog_res_subclass", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") - for cls in all_concrete_subclasses(CogniteResource, exclude={SubscriptionDatapoints}) + for cls in all_concrete_subclasses( + CogniteResource, exclude={SubscriptionDatapoints, UnknownWorkflowTaskParameters} + ) ], ) def test_handle_unknown_arguments_when_loading( @@ -368,7 +382,9 @@ def _for_all_nested_dicts(obj: dict, func: Callable[[dict], None]) -> None: "cog_res_subclass", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") - for cls in all_concrete_subclasses(CogniteResource, exclude={SubscriptionDatapoints}) + for cls in all_concrete_subclasses( + CogniteResource, exclude={SubscriptionDatapoints, UnknownWorkflowTaskParameters} + ) ], ) def test_yaml_serialize( From 3a7d59f2e382e4b6ccef9bdf7764a19913c3efcb Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Fri, 8 May 2026 16:36:30 +0530 Subject: [PATCH 11/18] Empty commit to retrigger CI Co-authored-by: Cursor From ebf3a2d0c8cdf27a3822e1dbe68a1f031ea43cb5 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Fri, 8 May 2026 16:36:58 +0530 Subject: [PATCH 12/18] Empty commit to trigger CI Co-authored-by: Cursor From 19e70c519985dc698aed0ad882bf75f994a3ebaf Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Fri, 8 May 2026 17:03:11 +0530 Subject: [PATCH 13/18] Type UnknownWorkflowTaskOutput as dict; align workflow tests Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 4 ++-- tests/tests_unit/test_data_classes/test_workflows.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 102d2ee75e..c48920807d 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -706,7 +706,7 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class UnknownWorkflowTaskOutput(WorkflowTaskOutput): - def __init__(self, task_type: str, output: Any) -> None: + def __init__(self, task_type: str, output: dict[str, Any]) -> None: self.dynamic_task_type = task_type self._output = output @@ -720,7 +720,7 @@ def load(cls, data: dict[str, Any]) -> Self: def task_type(self) -> ValidTaskType: # type: ignore[override] return cast(ValidTaskType, self.dynamic_task_type) - def dump(self, camel_case: bool = True) -> Any: + def dump(self, camel_case: bool = True) -> dict[str, Any]: if not camel_case: warnings.warn( "camel_case=False is ignored for UnknownWorkflowTaskOutput.dump(); API payloads use camelCase keys.", diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index bfd0bca90f..717fe6bba3 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -270,10 +270,10 @@ def test_load_output_unknown_task_type(self) -> None: "output_payload", [ {"customField": 99}, - [1, 2, 3], + {"otherKey": "value"}, ], ) - def test_unknown_output_dump_returns_stored_payload(self, output_payload: dict[str, Any] | list[int]) -> None: + def test_unknown_output_dump_returns_stored_payload(self, output_payload: dict[str, Any]) -> None: data: dict[str, Any] = {"taskType": "customWorkflowOutput", "output": output_payload} loaded = WorkflowTaskOutput.load_output(data) assert isinstance(loaded, UnknownWorkflowTaskOutput) From 4dd0d138743e4ae350a0810cf7c6c14587457bd9 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Mon, 11 May 2026 11:33:21 +0530 Subject: [PATCH 14/18] Workflows: unknown task output/parameters tests and base test pointer Merge origin/master. Align UnknownWorkflowTaskOutput typing with raw load. Add TestUnknownWorkflowTaskParametersCogniteResourceParity for envelope round-trips. Shorten test_base exclude comment for UnknownWorkflowTaskParameters. Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 18 ++--- tests/tests_unit/test_base.py | 7 +- .../test_data_classes/test_workflows.py | 71 +++++++++++++++++++ 3 files changed, 80 insertions(+), 16 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index c48920807d..eb6a5d052b 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -276,12 +276,7 @@ def __init__(self, dynamic_task_type: str, parameters: dict[str, Any]) -> None: def _load(cls, resource: dict[str, Any]) -> Self: type_ = resource.get("type", resource.get("taskType")) raw_inner = resource.get("parameters", resource.get("input")) - if raw_inner is None: - inner: dict[str, Any] = {} - elif isinstance(raw_inner, dict): - inner = raw_inner - else: - inner = {} + inner = raw_inner if isinstance(raw_inner, dict) else {} return cls(type_ if isinstance(type_, str) else "unknown", inner) @property @@ -706,21 +701,18 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class UnknownWorkflowTaskOutput(WorkflowTaskOutput): - def __init__(self, task_type: str, output: dict[str, Any]) -> None: - self.dynamic_task_type = task_type + def __init__(self, output: Any) -> None: self._output = output @classmethod def load(cls, data: dict[str, Any]) -> Self: - task_type = data.get("taskType", "unknown") - raw_output = data.get("output") - return cls(task_type, {} if raw_output is None else raw_output) + return cls(data.get("output")) @property def task_type(self) -> ValidTaskType: # type: ignore[override] - return cast(ValidTaskType, self.dynamic_task_type) + return cast(ValidTaskType, "unknown") - def dump(self, camel_case: bool = True) -> dict[str, Any]: + def dump(self, camel_case: bool = True) -> Any: if not camel_case: warnings.warn( "camel_case=False is ignored for UnknownWorkflowTaskOutput.dump(); API payloads use camelCase keys.", diff --git a/tests/tests_unit/test_base.py b/tests/tests_unit/test_base.py index 168ee7412d..e602d4e48a 100644 --- a/tests/tests_unit/test_base.py +++ b/tests/tests_unit/test_base.py @@ -170,6 +170,10 @@ def cognite_mock_client() -> CogniteClientMock: return CogniteClientMock() +# UnknownWorkflowTaskParameters is excluded from several parametrized CogniteResource tests below +# because dump() returns only the inner "parameters" blob while CogniteResource.load() for that +# class is exercised with a full {"type", "parameters"} envelope (see +# tests.tests_unit.test_data_classes.test_workflows.TestUnknownWorkflowTaskParametersCogniteResourceParity). class TestCogniteResource: @pytest.mark.dsl @pytest.mark.parametrize( @@ -202,9 +206,6 @@ def test_json_serialize( # Agent._load requires runtimeVersion/ownerId (always sent by the API), # but Agent.__init__ keeps them optional for SDK back-compat. The # minimal-args round-trip therefore can't satisfy both contracts. - # UnknownWorkflowTaskParameters.dump() is only the inner parameters object - # (see WorkflowTask); it omits task type, so generic CogniteResource - # load/dump round-trips do not apply. for cls in all_concrete_subclasses( CogniteResource, exclude={SubscriptionDatapoints, Agent, UnknownWorkflowTaskParameters}, diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index 717fe6bba3..7397717bb9 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -1,10 +1,13 @@ from __future__ import annotations import json +from collections.abc import Callable +from copy import deepcopy from pathlib import Path from typing import Any import pytest +import yaml from cognite.client.data_classes.simulators.runs import SimulationInputOverride from cognite.client.data_classes.workflows import ( @@ -17,6 +20,7 @@ TransformationTaskOutput, TransformationTaskParameters, UnknownWorkflowTaskOutput, + UnknownWorkflowTaskParameters, WorkflowDefinition, WorkflowDefinitionUpsert, WorkflowExecutionDetailed, @@ -260,6 +264,73 @@ def test_serialization(self, raw: dict[str, Any]) -> None: assert loaded.dump() == raw +class TestUnknownWorkflowTaskParametersCogniteResourceParity: + @staticmethod + def _envelope(instance: UnknownWorkflowTaskParameters) -> dict[str, Any]: + return {"type": instance.dynamic_task_type, "parameters": instance.dump(camel_case=True)} + + @staticmethod + def _for_all_nested_dicts(obj: dict[str, Any], func: Callable[[dict[str, Any]], None]) -> None: + to_check: list[Any] = [obj] + while to_check: + case = to_check.pop() + if isinstance(case, dict): + to_check.extend(case.values()) + func(case) + elif isinstance(case, list): + to_check.extend(case) + + @pytest.mark.dsl + def test_json_serialize(self) -> None: + instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"k": 1, "nested": {"x": 2}}) + envelope = self._envelope(instance) + loaded = UnknownWorkflowTaskParameters.load(json.dumps(envelope)) + assert loaded.dump(camel_case=True) == instance.dump(camel_case=True) + assert loaded.dynamic_task_type == instance.dynamic_task_type + + @pytest.mark.dsl + def test_dump_load_dict(self) -> None: + instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"k": 1}) + envelope = self._envelope(instance) + loaded = UnknownWorkflowTaskParameters.load(envelope) + assert loaded.dump() == instance.dump() + assert loaded.dynamic_task_type == instance.dynamic_task_type + + @pytest.mark.dsl + def test_load_has_no_side_effects(self) -> None: + instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"a": 1}) + dumped = self._envelope(instance) + original_dumped = deepcopy(dumped) + _ = UnknownWorkflowTaskParameters.load(dumped) + assert dumped == original_dumped + + @pytest.mark.dsl + def test_handle_unknown_arguments_when_loading(self) -> None: + instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"a": 1, "b": {"c": 2}}) + dumped = self._envelope(instance) + + def _add_unknown_key(obj: dict[str, Any]) -> None: + other_value = next(iter(obj.values())) if len(obj) > 0 else None + obj["some-new-unknown-key"] = other_value + + def _remove_unknown_key(obj: dict[str, Any]) -> None: + obj.pop("some-new-unknown-key", None) + + self._for_all_nested_dicts(dumped, _add_unknown_key) + loaded = UnknownWorkflowTaskParameters.load(dumped) + loaded_dump = loaded.dump(camel_case=True) + self._for_all_nested_dicts(loaded_dump, _remove_unknown_key) + assert loaded_dump == instance.dump(camel_case=True) + + @pytest.mark.dsl + def test_yaml_serialize(self) -> None: + instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"y": 3}) + envelope = self._envelope(instance) + yaml_serialised = yaml.safe_dump(envelope, sort_keys=False) + loaded = UnknownWorkflowTaskParameters.load(yaml_serialised) + assert loaded.dump(camel_case=True) == instance.dump(camel_case=True) + + class TestWorkflowTaskOutputDispatch: def test_load_output_unknown_task_type(self) -> None: data: dict[str, Any] = {"taskType": "customWorkflowOutput", "output": {"customField": 99}} From 8f4df5ce990a030470c3a2623b868706f09b05cf Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Mon, 11 May 2026 13:24:47 +0530 Subject: [PATCH 15/18] Fix UnknownWorkflowTaskParameters init; exclude from json serialize test; trim workflow tests Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 10 ++- tests/tests_unit/test_base.py | 18 ++--- .../test_data_classes/test_workflows.py | 71 ------------------- 3 files changed, 8 insertions(+), 91 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index eb6a5d052b..65a07b57f1 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -268,16 +268,14 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class UnknownWorkflowTaskParameters(WorkflowTaskParameters): - def __init__(self, dynamic_task_type: str, parameters: dict[str, Any]) -> None: + def __init__(self, dynamic_task_type: Any, parameters: Any) -> None: self.dynamic_task_type = dynamic_task_type self._parameters = parameters @classmethod def _load(cls, resource: dict[str, Any]) -> Self: - type_ = resource.get("type", resource.get("taskType")) - raw_inner = resource.get("parameters", resource.get("input")) - inner = raw_inner if isinstance(raw_inner, dict) else {} - return cls(type_ if isinstance(type_, str) else "unknown", inner) + t = resource.get("type", resource.get("taskType")) + return cls(t, deepcopy(resource)) @property def task_type(self) -> ValidTaskType: # type: ignore[override] @@ -706,7 +704,7 @@ def __init__(self, output: Any) -> None: @classmethod def load(cls, data: dict[str, Any]) -> Self: - return cls(data.get("output")) + return cls(data.get("output", {})) @property def task_type(self) -> ValidTaskType: # type: ignore[override] diff --git a/tests/tests_unit/test_base.py b/tests/tests_unit/test_base.py index e602d4e48a..79757991aa 100644 --- a/tests/tests_unit/test_base.py +++ b/tests/tests_unit/test_base.py @@ -170,10 +170,6 @@ def cognite_mock_client() -> CogniteClientMock: return CogniteClientMock() -# UnknownWorkflowTaskParameters is excluded from several parametrized CogniteResource tests below -# because dump() returns only the inner "parameters" blob while CogniteResource.load() for that -# class is exercised with a full {"type", "parameters"} envelope (see -# tests.tests_unit.test_data_classes.test_workflows.TestUnknownWorkflowTaskParametersCogniteResourceParity). class TestCogniteResource: @pytest.mark.dsl @pytest.mark.parametrize( @@ -208,7 +204,7 @@ def test_json_serialize( # minimal-args round-trip therefore can't satisfy both contracts. for cls in all_concrete_subclasses( CogniteResource, - exclude={SubscriptionDatapoints, Agent, UnknownWorkflowTaskParameters}, + exclude={SubscriptionDatapoints, Agent}, ) ], ) @@ -306,9 +302,7 @@ def test_writable_list_as_write( "cog_res_subclass", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") - for cls in all_concrete_subclasses( - CogniteResource, exclude={SubscriptionDatapoints, UnknownWorkflowTaskParameters} - ) + for cls in all_concrete_subclasses(CogniteResource, exclude={SubscriptionDatapoints}) ], ) def test_load_has_no_side_effects( @@ -338,9 +332,7 @@ def test_load_has_no_side_effects( "cog_res_subclass", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") - for cls in all_concrete_subclasses( - CogniteResource, exclude={SubscriptionDatapoints, UnknownWorkflowTaskParameters} - ) + for cls in all_concrete_subclasses(CogniteResource, exclude={SubscriptionDatapoints}) ], ) def test_handle_unknown_arguments_when_loading( @@ -383,9 +375,7 @@ def _for_all_nested_dicts(obj: dict, func: Callable[[dict], None]) -> None: "cog_res_subclass", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") - for cls in all_concrete_subclasses( - CogniteResource, exclude={SubscriptionDatapoints, UnknownWorkflowTaskParameters} - ) + for cls in all_concrete_subclasses(CogniteResource, exclude={SubscriptionDatapoints}) ], ) def test_yaml_serialize( diff --git a/tests/tests_unit/test_data_classes/test_workflows.py b/tests/tests_unit/test_data_classes/test_workflows.py index 7397717bb9..717fe6bba3 100644 --- a/tests/tests_unit/test_data_classes/test_workflows.py +++ b/tests/tests_unit/test_data_classes/test_workflows.py @@ -1,13 +1,10 @@ from __future__ import annotations import json -from collections.abc import Callable -from copy import deepcopy from pathlib import Path from typing import Any import pytest -import yaml from cognite.client.data_classes.simulators.runs import SimulationInputOverride from cognite.client.data_classes.workflows import ( @@ -20,7 +17,6 @@ TransformationTaskOutput, TransformationTaskParameters, UnknownWorkflowTaskOutput, - UnknownWorkflowTaskParameters, WorkflowDefinition, WorkflowDefinitionUpsert, WorkflowExecutionDetailed, @@ -264,73 +260,6 @@ def test_serialization(self, raw: dict[str, Any]) -> None: assert loaded.dump() == raw -class TestUnknownWorkflowTaskParametersCogniteResourceParity: - @staticmethod - def _envelope(instance: UnknownWorkflowTaskParameters) -> dict[str, Any]: - return {"type": instance.dynamic_task_type, "parameters": instance.dump(camel_case=True)} - - @staticmethod - def _for_all_nested_dicts(obj: dict[str, Any], func: Callable[[dict[str, Any]], None]) -> None: - to_check: list[Any] = [obj] - while to_check: - case = to_check.pop() - if isinstance(case, dict): - to_check.extend(case.values()) - func(case) - elif isinstance(case, list): - to_check.extend(case) - - @pytest.mark.dsl - def test_json_serialize(self) -> None: - instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"k": 1, "nested": {"x": 2}}) - envelope = self._envelope(instance) - loaded = UnknownWorkflowTaskParameters.load(json.dumps(envelope)) - assert loaded.dump(camel_case=True) == instance.dump(camel_case=True) - assert loaded.dynamic_task_type == instance.dynamic_task_type - - @pytest.mark.dsl - def test_dump_load_dict(self) -> None: - instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"k": 1}) - envelope = self._envelope(instance) - loaded = UnknownWorkflowTaskParameters.load(envelope) - assert loaded.dump() == instance.dump() - assert loaded.dynamic_task_type == instance.dynamic_task_type - - @pytest.mark.dsl - def test_load_has_no_side_effects(self) -> None: - instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"a": 1}) - dumped = self._envelope(instance) - original_dumped = deepcopy(dumped) - _ = UnknownWorkflowTaskParameters.load(dumped) - assert dumped == original_dumped - - @pytest.mark.dsl - def test_handle_unknown_arguments_when_loading(self) -> None: - instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"a": 1, "b": {"c": 2}}) - dumped = self._envelope(instance) - - def _add_unknown_key(obj: dict[str, Any]) -> None: - other_value = next(iter(obj.values())) if len(obj) > 0 else None - obj["some-new-unknown-key"] = other_value - - def _remove_unknown_key(obj: dict[str, Any]) -> None: - obj.pop("some-new-unknown-key", None) - - self._for_all_nested_dicts(dumped, _add_unknown_key) - loaded = UnknownWorkflowTaskParameters.load(dumped) - loaded_dump = loaded.dump(camel_case=True) - self._for_all_nested_dicts(loaded_dump, _remove_unknown_key) - assert loaded_dump == instance.dump(camel_case=True) - - @pytest.mark.dsl - def test_yaml_serialize(self) -> None: - instance = UnknownWorkflowTaskParameters("customWorkflowTaskType", {"y": 3}) - envelope = self._envelope(instance) - yaml_serialised = yaml.safe_dump(envelope, sort_keys=False) - loaded = UnknownWorkflowTaskParameters.load(yaml_serialised) - assert loaded.dump(camel_case=True) == instance.dump(camel_case=True) - - class TestWorkflowTaskOutputDispatch: def test_load_output_unknown_task_type(self) -> None: data: dict[str, Any] = {"taskType": "customWorkflowOutput", "output": {"customField": 99}} From acd49438cec61d7f6d91cdbaa796f75ba0e395c5 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Mon, 11 May 2026 17:22:37 +0530 Subject: [PATCH 16/18] fix(workflows): unknown task parameters dump/load and load_parameters wiring Wrap non-dict payloads in an opaque dict so CogniteResource.load round-trips. Build unknown parameters from task type plus parameters blob, not full task dict. Narrow dynamic_task_type to str | None; document opaque wrap/unwrap in comments. Keep UnknownWorkflowTaskParameters in generic CogniteResource base tests. Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 18 +++++++++++++++--- tests/tests_unit/test_base.py | 3 +-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 65a07b57f1..89bdb4ff3b 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -189,7 +189,7 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters: elif type_ == "simulation": return SimulationTaskParameters._load(parameters) else: - return UnknownWorkflowTaskParameters._load(data) + return UnknownWorkflowTaskParameters(type_, deepcopy(parameters)) class FunctionTaskParameters(WorkflowTaskParameters): @@ -268,12 +268,21 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: class UnknownWorkflowTaskParameters(WorkflowTaskParameters): - def __init__(self, dynamic_task_type: Any, parameters: Any) -> None: + # CogniteResource.load parses strings to a dict before _load; a bare scalar from dump() + # would still parse to a non-dict and fail. Wrapping non-dict payloads keeps JSON/YAML + # round-trips (e.g. test_base) valid while still storing the original value in _parameters. + _OPAQUE_WRAPPER_KEY: ClassVar[str] = "__cogniteSdkUnknownWorkflowTaskParametersOpaque__" + + def __init__(self, dynamic_task_type: str | None, parameters: Any) -> None: self.dynamic_task_type = dynamic_task_type self._parameters = parameters @classmethod def _load(cls, resource: dict[str, Any]) -> Self: + # dump() wraps non-dict _parameters in a one-key dict so load() always receives a dict; + # here we reverse that and store the inner value again (no task type on that path). + if len(resource) == 1 and next(iter(resource), None) == cls._OPAQUE_WRAPPER_KEY: + return cls(None, deepcopy(resource[cls._OPAQUE_WRAPPER_KEY])) t = resource.get("type", resource.get("taskType")) return cls(t, deepcopy(resource)) @@ -288,7 +297,10 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]: UserWarning, stacklevel=2, ) - return deepcopy(self._parameters) + p = deepcopy(self._parameters) + if isinstance(p, dict): + return p + return {self._OPAQUE_WRAPPER_KEY: p} _SIMULATORS_WARNING = FeaturePreviewWarning( diff --git a/tests/tests_unit/test_base.py b/tests/tests_unit/test_base.py index 79757991aa..a31af12ffe 100644 --- a/tests/tests_unit/test_base.py +++ b/tests/tests_unit/test_base.py @@ -46,7 +46,6 @@ from cognite.client.data_classes.postgres_gateway import TableList, User, UserCreated, UserCreatedList, UserList from cognite.client.data_classes.sequences import SequenceUpdate from cognite.client.data_classes.time_series import TimeSeries, TimeSeriesList -from cognite.client.data_classes.workflows import UnknownWorkflowTaskParameters from cognite.client.exceptions import CogniteMissingClientError from cognite.client.testing import CogniteClientMock from cognite.client.utils import _json_extended as _json @@ -178,7 +177,7 @@ class TestCogniteResource: pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") for cls in all_concrete_subclasses( CogniteResource, - exclude={SyntheticDatapoints, SubscriptionDatapoints, UnknownWorkflowTaskParameters}, + exclude={SyntheticDatapoints, SubscriptionDatapoints}, ) ], ) From 28353ac9e836a659a2ca54f6a47241ba87e46f23 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Tue, 12 May 2026 10:49:11 +0530 Subject: [PATCH 17/18] chore(workflows): drop functionApp from ValidTaskType literal Co-authored-by: Cursor --- cognite/client/data_classes/workflows.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cognite/client/data_classes/workflows.py b/cognite/client/data_classes/workflows.py index 89bdb4ff3b..65e4917955 100644 --- a/cognite/client/data_classes/workflows.py +++ b/cognite/client/data_classes/workflows.py @@ -157,7 +157,6 @@ def as_write(self) -> WorkflowUpsertList: "dynamic", "subworkflow", "simulation", - "functionApp", ] From 63ad6ab12c0c7168b2772b2e06ed1bab3f390111 Mon Sep 17 00:00:00 2001 From: Sachin Bhagwat Date: Tue, 12 May 2026 11:53:13 +0530 Subject: [PATCH 18/18] test: restore test_base.py to match master UnknownWorkflowTaskParameters round-trips are covered by workflows opaque wrapper. Co-authored-by: Cursor --- tests/tests_unit/test_base.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/tests/tests_unit/test_base.py b/tests/tests_unit/test_base.py index a31af12ffe..eb54d0307c 100644 --- a/tests/tests_unit/test_base.py +++ b/tests/tests_unit/test_base.py @@ -175,10 +175,7 @@ class TestCogniteResource: "cog_res_subclass", [ pytest.param(cls, id=f"{cls.__name__} in {cls.__module__}") - for cls in all_concrete_subclasses( - CogniteResource, - exclude={SyntheticDatapoints, SubscriptionDatapoints}, - ) + for cls in all_concrete_subclasses(CogniteResource, exclude={SyntheticDatapoints, SubscriptionDatapoints}) ], ) def test_json_serialize( @@ -201,10 +198,7 @@ def test_json_serialize( # Agent._load requires runtimeVersion/ownerId (always sent by the API), # but Agent.__init__ keeps them optional for SDK back-compat. The # minimal-args round-trip therefore can't satisfy both contracts. - for cls in all_concrete_subclasses( - CogniteResource, - exclude={SubscriptionDatapoints, Agent}, - ) + for cls in all_concrete_subclasses(CogniteResource, exclude={SubscriptionDatapoints, Agent}) ], ) def test_dump_load_only_required(