From 3260c0967012c3bd2d6eee16f9a00f4211ba33f8 Mon Sep 17 00:00:00 2001 From: Aditya Puri Date: Sun, 26 Apr 2026 12:51:42 +0000 Subject: [PATCH 1/3] feat(ingestion): add Kestra pipeline connector Adds a new pipeline-service connector for Kestra (kestra.io), closing part of #26656. Maps Kestra flows to OM Pipelines, executions to PipelineStatus, and Flow-trigger relationships to inter-pipeline lineage edges. Highlights: - Static DAG sourced from Kestra's /flows/{ns}/{id}/graph endpoint; transitive closure across synthetic GraphCluster nodes flattens flowable Parallel/Sequential/ForEach to clean OM tasks. - Tenant-aware client supports both pre-tenant Kestra (< 0.18) and tenant-scoped (0.18+) APIs via configurable tenantId. - Three auth modes: no-auth (default), basic, bearer token. - Filter-pattern support; disabled flows skipped. Tests: 14 unit tests (5 source + 9 client) against fixtures captured from a live Kestra v0.20.5 instance. --- ingestion/setup.py | 1 + .../source/pipeline/kestra/__init__.py | 0 .../source/pipeline/kestra/client.py | 150 ++ .../source/pipeline/kestra/connection.py | 65 + .../source/pipeline/kestra/metadata.py | 362 +++++ .../source/pipeline/kestra/models.py | 135 ++ .../source/pipeline/kestra/service_spec.py | 4 + .../resources/datasets/kestra_dataset.json | 1340 +++++++++++++++++ .../unit/topology/pipeline/test_kestra.py | 175 +++ .../topology/pipeline/test_kestra_client.py | 125 ++ .../pipeline/kestraConnection.json | 71 + .../entity/services/pipelineService.json | 9 +- 12 files changed, 2436 insertions(+), 1 deletion(-) create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/kestra/__init__.py create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/kestra/connection.py create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py create mode 100644 
ingestion/src/metadata/ingestion/source/pipeline/kestra/models.py create mode 100644 ingestion/src/metadata/ingestion/source/pipeline/kestra/service_spec.py create mode 100644 ingestion/tests/unit/resources/datasets/kestra_dataset.json create mode 100644 ingestion/tests/unit/topology/pipeline/test_kestra.py create mode 100644 ingestion/tests/unit/topology/pipeline/test_kestra_client.py create mode 100644 openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/kestraConnection.json diff --git a/ingestion/setup.py b/ingestion/setup.py index 5f48df5bcd2a..f0ad21d20f70 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -222,6 +222,7 @@ VERSIONS["geoalchemy2"], "dagster_graphql>=1.8.0", }, + "kestra": set(), # Uses base requests; no extra deps "dbt": { "google-cloud", VERSIONS["boto3"], diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kestra/__init__.py b/ingestion/src/metadata/ingestion/source/pipeline/kestra/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py b/ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py new file mode 100644 index 000000000000..351804afae8e --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py @@ -0,0 +1,150 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Thin REST client over the open-source Kestra API.

Handles both API layouts: the pre-tenant one (`/api/v1/...`, Kestra < 0.18)
and the tenant-scoped one (`/api/v1/{tenantId}/...`, Kestra 0.18+). An empty
or missing `tenantId` selects the non-tenant path, so older deployments keep
working unchanged.
"""

from typing import Iterator, Optional

import requests

from metadata.generated.schema.entity.services.connections.pipeline.kestraConnection import (
    KestraConnection,
)
from metadata.ingestion.source.pipeline.kestra.models import (
    KestraExecution,
    KestraFlow,
    KestraGraph,
    KestraSearchResult,
)
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ometa_logger

logger = ometa_logger()

DEFAULT_PAGE_SIZE = 200
DEFAULT_TIMEOUT = 30


class KestraClient:
    """
    Wrapper to Kestra REST API.

    Auth precedence: token > basic > no-auth. Token wins if both are set.
    """

    def __init__(self, config: KestraConnection):
        self.base = clean_uri(str(config.hostPort))
        # Normalize the tenant: blank/whitespace-only means "no tenant"
        # (pre-0.18 API path).
        tenant = (config.tenantId or "").strip()
        self.tenant_id = tenant or None
        self.timeout = DEFAULT_TIMEOUT
        self._session = requests.Session()
        # Default to verified TLS unless the connection explicitly opts out.
        self._session.verify = (
            True if config.verifySSL is None else bool(config.verifySSL)
        )
        token = self._secret(config.token)
        password = self._secret(config.password)
        if token:
            # Bearer token takes precedence over basic credentials.
            self._session.headers["Authorization"] = f"Bearer {token}"
        elif config.username and password:
            self._session.auth = (config.username, password)

    @staticmethod
    def _secret(field):
        """Unwrap a pydantic secret field into a plain string (or None)."""
        if field is None:
            return None
        if hasattr(field, "get_secret_value"):
            return field.get_secret_value()
        return str(field)

    def _url(self, path: str) -> str:
        """Build a full URL, injecting the tenant segment when configured."""
        prefix = f"/api/v1/{self.tenant_id}" if self.tenant_id else "/api/v1"
        return f"{self.base}{prefix}{path}"

    def _get_json(self, path: str, **params) -> dict:
        """GET `path`, raise on HTTP errors, and decode the JSON body."""
        response = self._session.get(
            self._url(path), params=params, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def _paged(
        self,
        path: str,
        model,
        filters: dict,
        page_size: int,
        max_pages: Optional[int] = None,
    ) -> Iterator:
        """
        Walk a Kestra /search endpoint page by page, yielding parsed models.

        Stops on an empty page, on a short (final) page, or — when
        `max_pages` is given — after that many pages.
        """
        page = 1
        while max_pages is None or page <= max_pages:
            data = self._get_json(path, page=page, size=page_size, **filters)
            envelope = KestraSearchResult.model_validate(data)
            if not envelope.results:
                return
            for raw in envelope.results:
                yield model.model_validate(raw)
            if len(envelope.results) < page_size:
                return
            page += 1

    # --- flows ---

    def search_flows(
        self,
        namespace: Optional[str] = None,
        page_size: int = DEFAULT_PAGE_SIZE,
    ) -> Iterator[KestraFlow]:
        """Yield every visible flow, optionally scoped to one namespace."""
        filters = {"sort": "id:asc"}
        if namespace:
            filters["namespace"] = namespace
        yield from self._paged("/flows/search", KestraFlow, filters, page_size)

    def get_flow(self, namespace: str, flow_id: str) -> KestraFlow:
        """Fetch one flow definition by namespace + id."""
        data = self._get_json(f"/flows/{namespace}/{flow_id}")
        return KestraFlow.model_validate(data)

    def get_flow_graph(self, namespace: str, flow_id: str) -> KestraGraph:
        """Fetch the rendered DAG (nodes + edges) for one flow."""
        data = self._get_json(f"/flows/{namespace}/{flow_id}/graph")
        return KestraGraph.model_validate(data)

    # --- executions ---

    def search_executions(
        self,
        namespace: Optional[str] = None,
        flow_id: Optional[str] = None,
        page_size: int = 50,
        max_pages: int = 5,
    ) -> Iterator[KestraExecution]:
        """Yield recent executions, newest first, capped at `max_pages` pages."""
        filters = {"sort": "state.startDate:desc"}
        if namespace:
            filters["namespace"] = namespace
        if flow_id:
            filters["flowId"] = flow_id
        yield from self._paged(
            "/executions/search", KestraExecution, filters, page_size, max_pages
        )

    def get_execution(self, execution_id: str) -> KestraExecution:
        """Fetch one execution (including its task-run list) by id."""
        data = self._get_json(f"/executions/{execution_id}")
        return KestraExecution.model_validate(data)

    def ping(self) -> bool:
        """Cheap liveness probe used by test_connection."""
        response = self._session.get(
            self._url("/flows/search"),
            params={"size": 1},
            timeout=self.timeout,
        )
        response.raise_for_status()
        return True
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source connection handler
"""
from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.kestraConnection import (
    KestraConnection,
)
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
    TestConnectionResult,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.kestra.client import KestraClient
from metadata.utils.constants import THREE_MIN


def get_connection(connection: KestraConnection) -> KestraClient:
    """Build the Kestra REST client from the connection config."""
    return KestraClient(connection)


def test_connection(
    metadata: OpenMetadata,
    client: KestraClient,
    service_connection: KestraConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
    timeout_seconds: Optional[int] = THREE_MIN,
) -> TestConnectionResult:
    """
    Test connection. This can be executed either as part of a metadata
    workflow or during an Automation Workflow.
    """

    def _probe_flows() -> None:
        # Pull at most one flow: proves reachability, auth, and payload shape
        # in a single cheap request.
        next(iter(client.search_flows(page_size=1)), None)

    # `type` may be an Enum (use .value) or already a plain string.
    connection_type = service_connection.type
    service_type = getattr(connection_type, "value", None) or str(connection_type)

    return test_connection_steps(
        metadata=metadata,
        test_fn={"GetPipelines": _probe_flows},
        service_type=service_type,
        automation_workflow=automation_workflow,
        timeout_seconds=timeout_seconds,
    )
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Kestra source module — ingests Kestra flows, executions, and lineage
into OpenMetadata.
"""
import traceback
from typing import Any, Iterable, Iterator, List, Optional

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
    Pipeline,
    PipelineStatus,
    StatusType,
    Task,
    TaskStatus,
)
from metadata.generated.schema.entity.services.connections.pipeline.kestraConnection import (
    KestraConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import (
    EntityName,
    Markdown,
    SourceUrl,
    Timestamp,
)
from metadata.generated.schema.type.entityLineage import (
    EntitiesEdge,
    LineageDetails,
    Source as LineageSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.kestra.client import KestraClient
from metadata.ingestion.source.pipeline.kestra.models import (
    KestraExecution,
    KestraFlow,
    KestraGraph,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.filters import filter_by_pipeline
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


# Kestra State -> OM StatusType. Any state not listed here (e.g. CREATED,
# RUNNING, PAUSED) deliberately falls back to Pending in _map_state.
STATE_MAP = {
    "SUCCESS": StatusType.Successful,
    "WARNING": StatusType.Successful,
    "FAILED": StatusType.Failed,
    "KILLED": StatusType.Failed,
    "CANCELLED": StatusType.Failed,
    "RETRIED": StatusType.Skipped,
}


def _map_state(state_current: Optional[str]) -> StatusType:
    """Map a Kestra execution state name to an OM StatusType (default Pending)."""
    if not state_current:
        return StatusType.Pending
    return STATE_MAP.get(state_current.upper(), StatusType.Pending)


def _flow_fqn(flow: KestraFlow) -> str:
    """Kestra-side identifier for a flow: `<namespace>.<flowId>`."""
    return f"{flow.namespace}.{flow.id}"


def _ms(dt) -> Optional[int]:
    """Convert a datetime to epoch milliseconds; pass None through."""
    if dt is None:
        return None
    return int(dt.timestamp() * 1000)


class KestraSource(PipelineServiceSource):
    """Ingest Kestra flow + execution metadata into OpenMetadata."""

    config: WorkflowSource

    @classmethod
    def create(
        cls,
        config_dict: dict,
        metadata: OpenMetadata,
        pipeline_name: Optional[str] = None,
    ) -> "KestraSource":
        """Validate the workflow config and build the source, or raise."""
        config: WorkflowSource = WorkflowSource.model_validate(config_dict)
        connection: KestraConnection = config.serviceConnection.root.config
        if not isinstance(connection, KestraConnection):
            raise InvalidSourceException(
                f"Expected KestraConnection, got {type(connection).__name__}"
            )
        return cls(config, metadata)

    # ---------- required: list pipeline_details ----------

    def get_pipelines_list(self) -> Iterable[KestraFlow]:
        """
        Yield every non-disabled flow that survives the pipeline filter
        pattern. Listing errors are logged and swallowed (best-effort).
        """
        try:
            for flow in self.client.search_flows():
                if flow.disabled:
                    logger.debug("Skipping disabled flow %s", _flow_fqn(flow))
                    continue
                if filter_by_pipeline(
                    self.source_config.pipelineFilterPattern, _flow_fqn(flow)
                ):
                    self.status.filter(_flow_fqn(flow), "Filtered out")
                    continue
                yield flow
        except Exception as exc:  # noqa: BLE001
            logger.warning("Kestra list flows failed: %s", exc)
            logger.debug(traceback.format_exc())

    def get_pipeline_name(self, pipeline_details: KestraFlow) -> str:
        """Pipeline name used by the topology context: `<namespace>.<id>`."""
        return _flow_fqn(pipeline_details)

    # ---------- required: emit Pipeline + Status + Lineage ----------

    def yield_pipeline(
        self, pipeline_details: KestraFlow
    ) -> Iterator[Either[CreatePipelineRequest]]:
        """
        Emit a CreatePipelineRequest for one flow, with tasks derived from
        the /graph endpoint and the schedule from any Schedule trigger.
        """
        try:
            graph = self.client.get_flow_graph(
                pipeline_details.namespace, pipeline_details.id
            )
            tasks = self._tasks_from_graph(graph)
            service_name = self.context.get().pipeline_service
            host_port = str(self.service_connection.hostPort).rstrip("/")
            # Deep link into the Kestra UI flow editor.
            source_url = (
                f"{host_port}/ui/flows/edit/"
                f"{pipeline_details.namespace}/{pipeline_details.id}"
            )

            request = CreatePipelineRequest(
                name=EntityName(pipeline_details.id),
                displayName=pipeline_details.id,
                description=(
                    Markdown(pipeline_details.description)
                    if pipeline_details.description
                    else None
                ),
                sourceUrl=SourceUrl(source_url),
                tasks=tasks,
                scheduleInterval=self._schedule_from_triggers(
                    pipeline_details.triggers
                ),
                service=service_name,
            )
            yield Either(right=request)
        except Exception as exc:  # noqa: BLE001
            yield Either(
                left=self._error(
                    f"Error ingesting Kestra flow {_flow_fqn(pipeline_details)}", exc
                )
            )

    def yield_pipeline_status(
        self, pipeline_details: KestraFlow
    ) -> Iterator[Either[OMetaPipelineStatus]]:
        """
        Emit one PipelineStatus per recent execution (first page only).
        Each summary is re-fetched in full to get the task-run list.
        """
        try:
            summaries = list(
                self.client.search_executions(
                    namespace=pipeline_details.namespace,
                    flow_id=pipeline_details.id,
                    page_size=50,
                    max_pages=1,
                )
            )
        except Exception as exc:  # noqa: BLE001
            yield Either(
                left=self._error(
                    f"List executions failed for {_flow_fqn(pipeline_details)}", exc
                )
            )
            return

        pipeline_fqn = self._pipeline_fqn(pipeline_details)
        for summary in summaries:
            try:
                detail = self.client.get_execution(summary.id)
                ts = _ms(detail.state.startDate or detail.state.endDate)
                pipeline_status = PipelineStatus(
                    timestamp=Timestamp(ts) if ts else None,
                    executionStatus=_map_state(detail.state.current),
                    taskStatus=self._task_statuses(detail),
                )
                yield Either(
                    right=OMetaPipelineStatus(
                        pipeline_fqn=pipeline_fqn, pipeline_status=pipeline_status
                    )
                )
            except Exception as exc:  # noqa: BLE001
                yield Either(
                    left=self._error(
                        f"Status for execution {summary.id} failed", exc
                    )
                )

    def yield_pipeline_lineage_details(
        self, pipeline_details: KestraFlow
    ) -> Iterator[Either[AddLineageRequest]]:
        """
        Emit pipeline->pipeline lineage for Flow triggers: each upstream flow
        named in an ExecutionFlowCondition becomes an edge into this pipeline.
        Edges are skipped silently when either end is not yet in OM.
        """
        triggers = pipeline_details.triggers or []
        if not triggers:
            return

        this_fqn = self._pipeline_fqn(pipeline_details)
        this_pipe = self.metadata.get_by_name(entity=Pipeline, fqn=this_fqn)
        if not this_pipe:
            return

        service_name = self.context.get().pipeline_service
        for trig in triggers:
            t = self._as_trigger_dict(trig)
            if not str(t.get("type", "")).endswith("trigger.Flow"):
                continue
            for cond in t.get("conditions") or []:
                ctype = str(cond.get("type", ""))
                if not ctype.endswith("ExecutionFlowCondition"):
                    continue
                up_ns = cond.get("namespace")
                up_id = cond.get("flowId")
                if not (up_ns and up_id):
                    continue
                up_fqn = f"{service_name}.{up_ns}.{up_id}"
                up_pipe = self.metadata.get_by_name(entity=Pipeline, fqn=up_fqn)
                if not up_pipe:
                    continue
                try:
                    yield Either(
                        right=AddLineageRequest(
                            edge=EntitiesEdge(
                                fromEntity=EntityReference(
                                    id=up_pipe.id,
                                    type="pipeline",
                                ),
                                toEntity=EntityReference(
                                    id=this_pipe.id,
                                    type="pipeline",
                                ),
                                lineageDetails=LineageDetails(
                                    source=LineageSource.PipelineLineage,
                                    description=f"Kestra Flow trigger: {t.get('id')}",
                                ),
                            )
                        )
                    )
                except Exception as exc:  # noqa: BLE001
                    yield Either(
                        left=self._error(
                            f"Lineage edge {up_fqn} -> {this_fqn} failed", exc
                        )
                    )

    # ---------- helpers ----------

    @staticmethod
    def _as_trigger_dict(trig) -> dict:
        """Normalize a trigger (pydantic model or raw mapping) into a plain dict."""
        if hasattr(trig, "model_dump"):
            return trig.model_dump(by_alias=True)
        return dict(trig)

    @staticmethod
    def _tasks_from_graph(graph: KestraGraph) -> List[Task]:
        """
        Convert a Kestra /graph response (nodes + edges, including synthetic
        cluster wrapper nodes) to a flat list of OM `Task` entities with
        downstream task ids.

        Kestra emits synthetic `GraphClusterRoot` / `GraphClusterEnd` /
        `GraphTrigger` nodes (no `task.id`) around real tasks. We skip them and
        compute transitive closure across them so a real task's downstream set
        contains real-task ids only.
        """
        # Adjacency list over node uids.
        adj: dict[str, list[str]] = {}
        for edge in graph.edges:
            adj.setdefault(edge.source, []).append(edge.target)

        # uid -> task id for real tasks (synthetic nodes excluded).
        uid_to_taskid: dict[str, str] = {}
        for n in graph.nodes:
            if n.task and n.task.id:
                uid_to_taskid[n.uid] = n.task.id

        def downstream_tasks(start_uid: str) -> List[str]:
            """BFS through synthetic nodes; stop at each real task encountered."""
            start_tid = uid_to_taskid.get(start_uid)  # loop-invariant, hoisted
            seen: set[str] = set()
            out: List[str] = []
            queue = list(adj.get(start_uid, []))
            # Index cursor instead of list.pop(0): pop(0) shifts the whole
            # list and makes the BFS O(n^2) on large graphs.
            cursor = 0
            while cursor < len(queue):
                uid = queue[cursor]
                cursor += 1
                if uid in seen:
                    continue
                seen.add(uid)
                tid = uid_to_taskid.get(uid)
                if tid:
                    if tid != start_tid and tid not in out:
                        out.append(tid)
                    # don't traverse past a real task
                    continue
                queue.extend(adj.get(uid, []))
            return out

        tasks: List[Task] = []
        emitted: set[str] = set()
        for node in graph.nodes:
            task_id = uid_to_taskid.get(node.uid)
            if not task_id or task_id in emitted:
                continue
            emitted.add(task_id)
            tasks.append(
                Task(
                    name=task_id,
                    displayName=task_id,
                    taskType=node.task.type if node.task else None,
                    downstreamTasks=downstream_tasks(node.uid) or None,
                )
            )
        return tasks

    @staticmethod
    def _schedule_from_triggers(triggers: Optional[list]) -> Optional[str]:
        """Return the cron string of the first Schedule trigger, if any."""
        if not triggers:
            return None
        for trig in triggers:
            t = KestraSource._as_trigger_dict(trig)
            if str(t.get("type", "")).endswith("Schedule") and t.get("cron"):
                return str(t["cron"])
        return None

    def _task_statuses(self, detail: KestraExecution) -> List[TaskStatus]:
        """Map an execution's task runs to OM TaskStatus rows."""
        rows: List[TaskStatus] = []
        for tr in detail.taskRunList or []:
            ts = _ms(tr.state.startDate or tr.timestamp)
            rows.append(
                TaskStatus(
                    name=tr.taskId,
                    executionStatus=_map_state(tr.state.current),
                    startTime=Timestamp(ts) if ts else None,
                )
            )
        return rows

    def _pipeline_fqn(self, flow: KestraFlow) -> str:
        """OM-side fully-qualified name: `<service>.<namespace>.<flowId>`."""
        service_name = self.context.get().pipeline_service
        return f"{service_name}.{flow.namespace}.{flow.id}"

    def _error(self, msg: str, exc: Exception) -> StackTraceError:
        """Log a failure and wrap it as a StackTraceError for the Either left."""
        # Format the traceback once; reuse it for both the debug log and
        # the structured error.
        trace = traceback.format_exc()
        logger.warning("%s: %s", msg, exc)
        logger.debug(trace)
        return StackTraceError(name=msg, error=str(exc), stackTrace=trace)
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pydantic models for Kestra REST payloads consumed by the connector.

All models use `extra="allow"` so unknown fields from newer Kestra versions
pass through without validation errors.
"""

from datetime import datetime
from typing import Any, List, Optional

from pydantic import BaseModel, ConfigDict, Field


class KestraLabel(BaseModel):
    """A single key/value label attached to a flow."""

    model_config = ConfigDict(extra="allow")

    key: str
    value: Optional[str] = None


class KestraTrigger(BaseModel):
    """A trigger on a flow: schedule, webhook, or flow-of-flow."""

    model_config = ConfigDict(extra="allow")

    id: str
    type: str
    # Present only on Schedule triggers.
    cron: Optional[str] = None
    # Present only on Flow triggers (e.g. ExecutionFlowCondition entries).
    conditions: Optional[List[dict]] = None


class KestraTask(BaseModel):
    """A task as it appears nested in flow.tasks[] (includes flowable children)."""

    model_config = ConfigDict(extra="allow")

    id: str
    type: str
    description: Optional[str] = None
    # Flowable children: Sequential/Parallel/WorkingDirectory use `tasks`,
    # If uses `then`/`else`. `else` is a Python keyword, hence the alias.
    tasks: Optional[List["KestraTask"]] = None
    then: Optional[List["KestraTask"]] = None
    else_: Optional[List["KestraTask"]] = Field(default=None, alias="else")


class KestraFlow(BaseModel):
    """A flow definition as returned by /flows and /flows/search."""

    model_config = ConfigDict(extra="allow")

    id: str
    namespace: str
    revision: Optional[int] = None
    description: Optional[str] = None
    labels: Optional[List[KestraLabel]] = None
    disabled: Optional[bool] = False
    tasks: List[KestraTask] = Field(default_factory=list)
    triggers: Optional[List[KestraTrigger]] = None


class KestraGraphTask(BaseModel):
    """Task payload embedded in a /graph node; both fields may be absent."""

    model_config = ConfigDict(extra="allow")

    id: Optional[str] = None
    type: Optional[str] = None


class KestraGraphNode(BaseModel):
    """Node in the /graph endpoint."""

    model_config = ConfigDict(extra="allow")

    uid: str
    type: Optional[str] = None
    # None for synthetic cluster/trigger nodes; set for real tasks.
    task: Optional[KestraGraphTask] = None


class KestraGraphEdge(BaseModel):
    """Directed edge (by node uid) in the /graph endpoint."""

    model_config = ConfigDict(extra="allow")

    source: str
    target: str
    relation: Optional[dict] = None


class KestraGraph(BaseModel):
    """Full DAG payload from /flows/{ns}/{id}/graph."""

    model_config = ConfigDict(extra="allow")

    nodes: List[KestraGraphNode] = Field(default_factory=list)
    edges: List[KestraGraphEdge] = Field(default_factory=list)


class KestraTaskRunState(BaseModel):
    """State envelope shared by executions and task runs."""

    model_config = ConfigDict(extra="allow")

    current: str
    startDate: Optional[datetime] = None
    endDate: Optional[datetime] = None


class KestraTaskRun(BaseModel):
    """One task run inside an execution's taskRunList."""

    model_config = ConfigDict(extra="allow")

    id: str
    taskId: str
    state: KestraTaskRunState
    timestamp: Optional[datetime] = None
    attempts: Optional[List[dict]] = None


class KestraExecution(BaseModel):
    """One flow execution, as returned by /executions endpoints."""

    model_config = ConfigDict(extra="allow")

    id: str
    namespace: str
    flowId: str
    flowRevision: Optional[int] = None
    state: KestraTaskRunState
    # Only populated on the detail endpoint, not in search summaries.
    taskRunList: Optional[List[KestraTaskRun]] = None


class KestraSearchResult(BaseModel):
    """Generic envelope used by all /search endpoints."""

    model_config = ConfigDict(extra="allow")

    results: List[Any] = Field(default_factory=list)
    total: int = 0


# Resolve the self-referencing KestraTask forward references.
KestraTask.model_rebuild()


# --- service_spec.py (separate module in this patch) ---
from metadata.ingestion.source.pipeline.kestra.metadata import KestraSource
from metadata.utils.service_spec import BaseSpec

# Registers KestraSource as the metadata source for the Kestra service type.
ServiceSpec = BaseSpec(metadata_source_class=KestraSource)
"https://github.com/kestra-io/dbt-demo", + "branch": "main" + }, + { + "id": "dbt_build", + "type": "io.kestra.plugin.dbt.cli.DbtCLI", + "taskRunner": { + "type": "io.kestra.plugin.scripts.runner.docker.Docker" + }, + "commands": [ + "dbt deps", + "dbt build" + ], + "profiles": "jaffle_shop:\n outputs:\n dev:\n type: duckdb\n path: dbt.duckdb\n extensions: \n - parquet\n fixed_retries: 1\n threads: 16\n timeout_seconds: 300\n target: dev \n", + "containerImage": "ghcr.io/kestra-io/dbt-duckdb:latest" + }, + { + "id": "python", + "type": "io.kestra.plugin.scripts.python.Script", + "taskRunner": { + "type": "io.kestra.plugin.scripts.runner.docker.Docker" + }, + "outputFiles": [ + "*.csv" + ], + "containerImage": "ghcr.io/kestra-io/duckdb:latest", + "script": "import duckdb\nimport pandas as pd\n\nconn = duckdb.connect(database='dbt.duckdb', read_only=False)\n\ntables_query = \"SELECT table_name FROM information_schema.tables WHERE table_schema = 'main';\"\n\ntables = conn.execute(tables_query).fetchall()\n\n# Export each table to CSV, excluding tables that start with 'raw' or\n'stg'\n\nfor table_name in tables:\n table_name = table_name[0]\n # Skip tables with names starting with 'raw' or 'stg'\n if not table_name.startswith('raw') and not table_name.startswith('stg'):\n query = f\"SELECT * FROM {table_name}\"\n df = conn.execute(query).fetchdf()\n df.to_csv(f\"{table_name}.csv\", index=False)\n\nconn.close()\n" + } + ] + } + ] + }, + { + "id": "file-processing", + "namespace": "tutorial", + "revision": 1, + "disabled": false, + "deleted": false, + "description": "File Processing", + "variables": { + "file_id": 202403 + }, + "tasks": [ + { + "id": "get_zipfile", + "type": "io.kestra.plugin.core.http.Download", + "uri": "https://divvy-tripdata.s3.amazonaws.com/{{ render(vars.file_id) }}-divvy-tripdata.zip" + }, + { + "id": "unzip", + "type": "io.kestra.plugin.compress.ArchiveDecompress", + "algorithm": "ZIP", + "from": "{{ outputs.get_zipfile.uri }}" + }, + { + "id": 
"csv_to_ion", + "type": "io.kestra.plugin.serdes.csv.CsvToIon", + "from": "{{outputs.unzip.files[render(vars.file_id) ~ '-divvy-tripdata.csv']}}" + }, + { + "id": "to_parquet", + "type": "io.kestra.plugin.serdes.avro.IonToAvro", + "schema": "{\n \"type\": \"record\",\n \"name\": \"Ride\",\n \"namespace\": \"com.example.bikeshare\",\n \"fields\": [\n {\"name\": \"ride_id\", \"type\": \"string\"},\n {\"name\": \"rideable_type\", \"type\": \"string\"},\n {\"name\": \"started_at\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}},\n {\"name\": \"ended_at\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}},\n {\"name\": \"start_station_name\", \"type\": \"string\"},\n {\"name\": \"start_station_id\", \"type\": \"string\"},\n {\"name\": \"end_station_name\", \"type\": \"string\"},\n {\"name\": \"end_station_id\", \"type\": \"string\"},\n {\"name\": \"start_lat\", \"type\": \"double\"},\n {\"name\": \"start_lng\", \"type\": \"double\"},\n {\n \"name\": \"end_lat\", \n \"type\": [\"null\", \"double\"],\n \"default\": null\n },\n {\n \"name\": \"end_lng\",\n \"type\": [\"null\", \"double\"],\n \"default\": null\n },\n {\"name\": \"member_casual\", \"type\": \"string\"}\n ]\n}\n", + "datetimeFormat": "yyyy-MM-dd' 'HH:mm:ss", + "from": "{{ outputs.csv_to_ion.uri }}" + } + ] + }, + { + "id": "hello-world", + "namespace": "tutorial", + "revision": 1, + "inputs": [ + { + "id": "user", + "type": "STRING", + "required": true, + "defaults": "Rick Astley" + } + ], + "disabled": false, + "deleted": false, + "description": "Hello World", + "tasks": [ + { + "id": "first_task", + "type": "io.kestra.plugin.core.debug.Return", + "format": "thrilled" + }, + { + "id": "second_task", + "type": "io.kestra.plugin.scripts.shell.Commands", + "taskRunner": { + "type": "io.kestra.plugin.scripts.runner.docker.Docker" + }, + "commands": [ + "sleep 0.42", + "echo '::{\"outputs\":{\"returned_data\":\"mydata\"}}::'" + ] + }, + { + "id": "hello_world", + 
"type": "io.kestra.plugin.core.log.Log", + "message": "Welcome to Kestra, {{ inputs.user }}! \nWe are {{ outputs.first_task.value}} to have You here!\n" + } + ], + "triggers": [ + { + "id": "daily", + "type": "io.kestra.plugin.core.trigger.Schedule", + "disabled": true, + "cron": "0 9 * * *" + } + ] + }, + { + "id": "infrastructure-automation", + "namespace": "tutorial", + "revision": 1, + "inputs": [ + { + "id": "docker_image", + "type": "STRING", + "required": true, + "defaults": "kestra/myimage:latest" + } + ], + "disabled": false, + "deleted": false, + "description": "Infrastructure Automation", + "tasks": [ + { + "id": "build_image", + "type": "io.kestra.plugin.docker.Build", + "credentials": { + "registry": "https://index.docker.io/v1/", + "username": "{{ secret('DOCKERHUB_USERNAME') }}", + "password": "{{ secret('DOCKERHUB_PASSWORD') }}" + }, + "dockerfile": "FROM python:3.11-alpine\nRUN pip install --no-cache-dir kestra\n", + "tags": [ + "{{ inputs.docker_image }}" + ] + }, + { + "id": "run_container", + "type": "io.kestra.plugin.docker.Run", + "containerImage": "{{ inputs.docker_image }}", + "pullPolicy": "NEVER", + "commands": [ + "pip", + "show", + "kestra" + ] + }, + { + "id": "run_terraform", + "type": "io.kestra.plugin.terraform.cli.TerraformCLI", + "beforeCommands": [ + "terraform init" + ], + "commands": [ + "terraform plan 2>&1 | tee plan_output.txt", + "terraform apply -auto-approve 2>&1 | tee apply_output.txt" + ], + "taskRunner": { + "type": "io.kestra.plugin.scripts.runner.docker.Docker" + }, + "inputFiles": { + "main.tf": "terraform {\n required_providers {\n http = {\n source = \"hashicorp/http\"\n }\n local = {\n source = \"hashicorp/local\"\n }\n }\n}\n\nprovider \"http\" {}\nprovider \"local\" {}\n\nvariable \"pokemon_names\" {\n type = list(string)\n default = [\"pikachu\", \"psyduck\", \"charmander\", \"bulbasaur\"]\n}\n\ndata \"http\" \"pokemon\" {\n count = length(var.pokemon_names)\n url = 
\"https://pokeapi.co/api/v2/pokemon/${var.pokemon_names[count.index]}\"\n}\n\nlocals {\n pokemon_details = [for i in range(length(var.pokemon_names)) : {\n name = jsondecode(data.http.pokemon[i].response_body)[\"name\"]\n types = join(\", \", [for type in jsondecode(data.http.pokemon[i].response_body)[\"types\"] : type[\"type\"][\"name\"]])\n }]\n\n file_content = join(\"\\n\\n\", [for detail in local.pokemon_details : \"Name: ${detail.name}\\nTypes: ${detail.types}\"])\n}\n\nresource \"local_file\" \"pokemon_details_file\" {\n filename = \"${path.module}/pokemon.txt\"\n content = local.file_content\n}\n\noutput \"file_path\" {\n value = local_file.pokemon_details_file.filename\n}\n" + }, + "outputFiles": [ + "*.txt" + ] + }, + { + "id": "log_pokemon", + "type": "io.kestra.plugin.core.log.Log", + "message": "{{ read(outputs.run_terraform.outputFiles['pokemon.txt']) }}" + } + ] + }, + { + "id": "microservices-and-apis", + "namespace": "tutorial", + "revision": 1, + "inputs": [ + { + "id": "server_uri", + "type": "URI", + "required": true, + "defaults": "https://kestra.io" + }, + { + "id": "slack_webhook_uri", + "type": "URI", + "required": true, + "defaults": "https://reqres.in/api/slack" + } + ], + "disabled": false, + "deleted": false, + "description": "Microservices and APIs", + "tasks": [ + { + "id": "http_status_check", + "type": "io.kestra.plugin.core.flow.AllowFailure", + "errors": [ + { + "id": "server_unreachable", + "type": "io.kestra.plugin.notifications.slack.SlackIncomingWebhook", + "url": "{{ inputs.slack_webhook_uri }}", + "payload": "{\n \"channel\": \"#alerts\",\n \"text\": \"The server {{ inputs.server_uri }} is unreachable!\"\n}\n" + } + ], + "tasks": [ + { + "id": "http_request", + "type": "io.kestra.plugin.core.http.Request", + "uri": "{{ inputs.server_uri }}" + }, + { + "id": "check_status", + "type": "io.kestra.plugin.core.flow.If", + "condition": "{{ outputs.http_request.code != 200 }}", + "then": [ + { + "id": "unhealthy", + "type": 
"io.kestra.plugin.core.log.Log", + "message": "Server is unhealthy! Response {{ outputs.http_request.body }}" + }, + { + "id": "send_slack_alert", + "type": "io.kestra.plugin.notifications.slack.SlackIncomingWebhook", + "url": "{{ inputs.slack_webhook_uri }}", + "payload": "{\n \"channel\": \"#alerts\",\n \"text\": \"The server {{ inputs.server_uri }} is down!\"\n}\n" + } + ], + "else": [ + { + "id": "healthy", + "type": "io.kestra.plugin.core.log.Log", + "message": "Everything is fine!" + } + ] + } + ] + } + ], + "triggers": [ + { + "id": "daily", + "type": "io.kestra.plugin.core.trigger.Schedule", + "disabled": true, + "cron": "0 9 * * *" + } + ] + }, + { + "id": "business-automation", + "namespace": "tutorial", + "revision": 1, + "disabled": false, + "deleted": false, + "description": "Business Automation", + "tasks": [ + { + "id": "working_directory", + "type": "io.kestra.plugin.core.flow.WorkingDirectory", + "tasks": [ + { + "id": "query", + "type": "io.kestra.plugin.jdbc.sqlite.Queries", + "url": "jdbc:sqlite:kestra.db", + "sql": "CREATE TABLE IF NOT EXISTS features (\n id INTEGER PRIMARY KEY,\n name TEXT NOT NULL,\n description TEXT NOT NULL,\n release_version TEXT NOT NULL,\n edition TEXT NOT NULL\n);\n\nDELETE FROM features;\n\nINSERT INTO features (name, description, release_version, edition)\nVALUES \n ('Worker Groups', 'Allows targeting specific tasks or triggers to run on specific remote workers for better scalability and resource management.', '0.10', 'Enterprise'),\n ('Realtime Triggers', 'Supports triggering event-driven workflows in real-time.', '0.17', 'Open-Source'),\n ('Task Runners', 'Provides on-demand remote execution environments for running tasks.', '0.16', 'Open-Source'),\n ('KV Store', 'Adds key-value storage for persisting data across workflow executions.', '0.18', 'Open-Source'),\n ('SCIM Directory Sync', 'Allows synchronization of users and groups from Identity Providers.', '0.18', 'Enterprise'); \n \nSELECT * FROM features\nORDER BY 
release_version;\n", + "store": true, + "fetchType": "STORE" + }, + { + "id": "to_csv", + "type": "io.kestra.plugin.serdes.csv.IonToCsv", + "from": "{{ outputs.query.outputs[0].uri }}" + }, + { + "id": "to_excel", + "type": "io.kestra.plugin.serdes.excel.IonToExcel", + "from": "{{ outputs.query.outputs[0].uri }}" + } + ] + } + ] + }, + { + "id": "business-processes", + "namespace": "tutorial", + "revision": 1, + "inputs": [ + { + "id": "request.name", + "type": "STRING", + "required": true, + "defaults": "Rick Astley" + }, + { + "id": "request.start_date", + "type": "DATE", + "required": true, + "defaults": "2024-07-01" + }, + { + "id": "request.end_date", + "type": "DATE", + "required": true, + "defaults": "2024-07-07" + }, + { + "id": "slack_webhook_uri", + "type": "URI", + "required": true, + "defaults": "https://reqres.in/api/slack" + } + ], + "disabled": false, + "deleted": false, + "description": "Business Processes", + "tasks": [ + { + "id": "send_approval_request", + "type": "io.kestra.plugin.notifications.slack.SlackIncomingWebhook", + "url": "{{ inputs.slack_webhook_uri }}", + "payload": "{\n \"channel\": \"#vacation\",\n \"text\": \"Validate holiday request for {{ inputs.request.name }}. 
To approve the request, click on the `Resume` button here http://localhost:8080/ui/executions/{{flow.namespace}}/{{flow.id}}/{{execution.id}}\"\n}\n" + }, + { + "id": "wait_for_approval", + "type": "io.kestra.plugin.core.flow.Pause" + }, + { + "id": "process_request", + "type": "io.kestra.plugin.core.http.Request", + "uri": "https://reqres.in/api/products", + "method": "POST", + "body": "{{ inputs.request }}", + "contentType": "application/json" + } + ] + }, + { + "id": "data-engineering-pipeline", + "namespace": "tutorial", + "revision": 1, + "inputs": [ + { + "id": "columns_to_keep", + "type": "ARRAY", + "required": true, + "defaults": [ + "brand", + "price" + ], + "itemType": "STRING" + } + ], + "disabled": false, + "deleted": false, + "description": "Data Engineering Pipelines", + "tasks": [ + { + "id": "extract", + "type": "io.kestra.plugin.core.http.Download", + "uri": "https://dummyjson.com/products" + }, + { + "id": "transform", + "type": "io.kestra.plugin.scripts.python.Script", + "taskRunner": { + "type": "io.kestra.plugin.scripts.runner.docker.Docker" + }, + "env": { + "COLUMNS_TO_KEEP": "{{ inputs.columns_to_keep }}" + }, + "inputFiles": { + "data.json": "{{ outputs.extract.uri }}" + }, + "outputFiles": [ + "*.json" + ], + "containerImage": "python:3.11-alpine", + "script": "import json\nimport os\n\ncolumns_to_keep_str = os.getenv(\"COLUMNS_TO_KEEP\")\ncolumns_to_keep = json.loads(columns_to_keep_str)\n\nwith open(\"data.json\", \"r\") as file:\n data = json.load(file)\n\nfiltered_data = [\n {column: product.get(column, \"N/A\") for column in columns_to_keep}\n for product in data[\"products\"]\n]\n\nwith open(\"products.json\", \"w\") as file:\n json.dump(filtered_data, file, indent=4)\n" + }, + { + "id": "query", + "type": "io.kestra.plugin.jdbc.duckdb.Query", + "sql": "INSTALL json;\nLOAD json;\nSELECT brand, round(avg(price), 2) as avg_price\nFROM read_json_auto('{{ workingDir }}/products.json')\nGROUP BY brand\nORDER BY avg_price DESC;\n", + 
"store": true, + "fetchType": "STORE", + "inputFiles": { + "products.json": "{{ outputs.transform.outputFiles['products.json'] }}" + } + } + ] + }, + { + "id": "cron_etl", + "namespace": "hackathon.demo", + "revision": 1, + "disabled": false, + "deleted": false, + "description": "Daily ETL pipeline — extract, transform, load.", + "labels": [ + { + "key": "team", + "value": "data-engineering" + }, + { + "key": "owner", + "value": "aditya" + } + ], + "tasks": [ + { + "id": "extract", + "type": "io.kestra.plugin.core.log.Log", + "message": "extracting source data" + }, + { + "id": "transform", + "type": "io.kestra.plugin.core.log.Log", + "message": "transforming {{ outputs.extract }}" + }, + { + "id": "load", + "type": "io.kestra.plugin.core.log.Log", + "message": "loading into warehouse" + } + ], + "triggers": [ + { + "id": "nightly", + "type": "io.kestra.plugin.core.trigger.Schedule", + "cron": "0 2 * * *" + } + ] + }, + { + "id": "downstream_consumer", + "namespace": "hackathon.demo", + "revision": 1, + "disabled": false, + "deleted": false, + "description": "Fires when cron_etl completes.", + "tasks": [ + { + "id": "consume", + "type": "io.kestra.plugin.core.log.Log", + "message": "consuming upstream outputs" + } + ], + "triggers": [ + { + "id": "on_cron_etl", + "type": "io.kestra.plugin.core.trigger.Flow", + "conditions": [ + { + "type": "io.kestra.plugin.core.condition.ExecutionFlowCondition", + "namespace": "hackathon.demo", + "flowId": "cron_etl" + } + ] + } + ] + }, + { + "id": "parallel_job", + "namespace": "hackathon.demo", + "revision": 1, + "disabled": false, + "deleted": false, + "description": "Parallel fan-out with a join.", + "tasks": [ + { + "id": "fanout", + "type": "io.kestra.plugin.core.flow.Parallel", + "tasks": [ + { + "id": "branch_a", + "type": "io.kestra.plugin.core.log.Log", + "message": "A" + }, + { + "id": "branch_b", + "type": "io.kestra.plugin.core.log.Log", + "message": "B" + } + ] + }, + { + "id": "join", + "type": 
"io.kestra.plugin.core.log.Log", + "message": "joined" + } + ] + } + ], + "total": 11 + }, + "cronFlow": { + "id": "cron_etl", + "namespace": "hackathon.demo", + "revision": 1, + "disabled": false, + "deleted": false, + "description": "Daily ETL pipeline — extract, transform, load.", + "labels": [ + { + "key": "team", + "value": "data-engineering" + }, + { + "key": "owner", + "value": "aditya" + } + ], + "tasks": [ + { + "id": "extract", + "type": "io.kestra.plugin.core.log.Log", + "message": "extracting source data" + }, + { + "id": "transform", + "type": "io.kestra.plugin.core.log.Log", + "message": "transforming {{ outputs.extract }}" + }, + { + "id": "load", + "type": "io.kestra.plugin.core.log.Log", + "message": "loading into warehouse" + } + ], + "triggers": [ + { + "id": "nightly", + "type": "io.kestra.plugin.core.trigger.Schedule", + "cron": "0 2 * * *" + } + ] + }, + "cronGraph": { + "nodes": [ + { + "uid": "root.6zX0LWlBipcXmEDq83HJdx", + "type": "io.kestra.core.models.hierarchies.GraphClusterRoot", + "error": false + }, + { + "uid": "root.33a6F4AiFKHXyCfJwQI1ma", + "type": "io.kestra.core.models.hierarchies.GraphClusterEnd", + "error": false + }, + { + "uid": "root.Triggers.2OxiYbMlFkWZHXu7PjCmBO", + "type": "io.kestra.core.models.hierarchies.GraphClusterRoot", + "error": false + }, + { + "uid": "root.Triggers.Thrplyd7njcovWr2Qwmqi", + "type": "io.kestra.core.models.hierarchies.GraphClusterEnd", + "error": false + }, + { + "triggerDeclaration": { + "id": "nightly", + "type": "io.kestra.plugin.core.trigger.Schedule", + "cron": "0 2 * * *" + }, + "trigger": { + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "triggerId": "nightly", + "date": "2026-04-26T07:20:11Z", + "nextExecutionDate": "2026-04-27T02:00:00Z", + "disabled": false + }, + "uid": "root.Triggers.nightly", + "type": "io.kestra.core.models.hierarchies.GraphTrigger", + "error": false + }, + { + "uid": "root.extract", + "type": "io.kestra.core.models.hierarchies.GraphTask", + "error": 
false, + "task": { + "id": "extract", + "type": "io.kestra.plugin.core.log.Log", + "message": "extracting source data" + }, + "relationType": "SEQUENTIAL" + }, + { + "uid": "root.transform", + "type": "io.kestra.core.models.hierarchies.GraphTask", + "error": false, + "task": { + "id": "transform", + "type": "io.kestra.plugin.core.log.Log", + "message": "transforming {{ outputs.extract }}" + }, + "relationType": "SEQUENTIAL" + }, + { + "uid": "root.load", + "type": "io.kestra.core.models.hierarchies.GraphTask", + "error": false, + "task": { + "id": "load", + "type": "io.kestra.plugin.core.log.Log", + "message": "loading into warehouse" + }, + "relationType": "SEQUENTIAL" + } + ], + "edges": [ + { + "source": "root.Triggers.Thrplyd7njcovWr2Qwmqi", + "target": "root.6zX0LWlBipcXmEDq83HJdx", + "relation": {} + }, + { + "source": "root.6zX0LWlBipcXmEDq83HJdx", + "target": "root.extract", + "relation": {} + }, + { + "source": "root.extract", + "target": "root.transform", + "relation": { + "relationType": "SEQUENTIAL" + } + }, + { + "source": "root.transform", + "target": "root.load", + "relation": { + "relationType": "SEQUENTIAL" + } + }, + { + "source": "root.load", + "target": "root.33a6F4AiFKHXyCfJwQI1ma", + "relation": {} + }, + { + "source": "root.Triggers.nightly", + "target": "root.Triggers.Thrplyd7njcovWr2Qwmqi", + "relation": {} + }, + { + "source": "root.Triggers.2OxiYbMlFkWZHXu7PjCmBO", + "target": "root.Triggers.nightly", + "relation": {} + } + ], + "clusters": [ + { + "cluster": { + "uid": "cluster_root.Triggers", + "type": "io.kestra.core.models.hierarchies.GraphCluster", + "error": false + }, + "nodes": [ + "root.Triggers.2OxiYbMlFkWZHXu7PjCmBO", + "root.Triggers.Thrplyd7njcovWr2Qwmqi", + "root.Triggers.nightly" + ], + "parents": [], + "start": "root.Triggers.2OxiYbMlFkWZHXu7PjCmBO", + "end": "root.Triggers.Thrplyd7njcovWr2Qwmqi" + } + ] + }, + "downstreamFlow": { + "id": "downstream_consumer", + "namespace": "hackathon.demo", + "revision": 1, + 
"disabled": false, + "deleted": false, + "description": "Fires when cron_etl completes.", + "tasks": [ + { + "id": "consume", + "type": "io.kestra.plugin.core.log.Log", + "message": "consuming upstream outputs" + } + ], + "triggers": [ + { + "id": "on_cron_etl", + "type": "io.kestra.plugin.core.trigger.Flow", + "conditions": [ + { + "type": "io.kestra.plugin.core.condition.ExecutionFlowCondition", + "namespace": "hackathon.demo", + "flowId": "cron_etl" + } + ] + } + ] + }, + "parallelFlow": { + "id": "parallel_job", + "namespace": "hackathon.demo", + "revision": 1, + "disabled": false, + "deleted": false, + "description": "Parallel fan-out with a join.", + "tasks": [ + { + "id": "fanout", + "type": "io.kestra.plugin.core.flow.Parallel", + "tasks": [ + { + "id": "branch_a", + "type": "io.kestra.plugin.core.log.Log", + "message": "A" + }, + { + "id": "branch_b", + "type": "io.kestra.plugin.core.log.Log", + "message": "B" + } + ] + }, + { + "id": "join", + "type": "io.kestra.plugin.core.log.Log", + "message": "joined" + } + ] + }, + "parallelGraph": { + "nodes": [ + { + "uid": "root.7Uud90FKuEhOL10OZAVtLQ", + "type": "io.kestra.core.models.hierarchies.GraphClusterRoot", + "error": false + }, + { + "uid": "root.QMnkadDYJa8LndEbplQYT", + "type": "io.kestra.core.models.hierarchies.GraphClusterEnd", + "error": false + }, + { + "uid": "root.fanout.4teE0rNdXtiQO9XIpl0yE0", + "type": "io.kestra.core.models.hierarchies.GraphClusterRoot", + "error": false + }, + { + "uid": "root.fanout.466kvIRI982XkOwqJ2KZrD", + "type": "io.kestra.core.models.hierarchies.GraphClusterEnd", + "error": false + }, + { + "uid": "root.fanout", + "type": "io.kestra.core.models.hierarchies.GraphTask", + "error": false, + "task": { + "id": "fanout", + "type": "io.kestra.plugin.core.flow.Parallel", + "tasks": [ + { + "id": "branch_a", + "type": "io.kestra.plugin.core.log.Log", + "message": "A" + }, + { + "id": "branch_b", + "type": "io.kestra.plugin.core.log.Log", + "message": "B" + } + ] + }, + 
"relationType": "PARALLEL" + }, + { + "uid": "root.fanout.branch_a", + "type": "io.kestra.core.models.hierarchies.GraphTask", + "error": false, + "task": { + "id": "branch_a", + "type": "io.kestra.plugin.core.log.Log", + "message": "A" + }, + "relationType": "PARALLEL" + }, + { + "uid": "root.fanout.branch_b", + "type": "io.kestra.core.models.hierarchies.GraphTask", + "error": false, + "task": { + "id": "branch_b", + "type": "io.kestra.plugin.core.log.Log", + "message": "B" + }, + "relationType": "PARALLEL" + }, + { + "uid": "root.join", + "type": "io.kestra.core.models.hierarchies.GraphTask", + "error": false, + "task": { + "id": "join", + "type": "io.kestra.plugin.core.log.Log", + "message": "joined" + }, + "relationType": "SEQUENTIAL" + } + ], + "edges": [ + { + "source": "root.7Uud90FKuEhOL10OZAVtLQ", + "target": "root.fanout.4teE0rNdXtiQO9XIpl0yE0", + "relation": {} + }, + { + "source": "root.join", + "target": "root.QMnkadDYJa8LndEbplQYT", + "relation": {} + }, + { + "source": "root.fanout.466kvIRI982XkOwqJ2KZrD", + "target": "root.join", + "relation": { + "relationType": "SEQUENTIAL" + } + }, + { + "source": "root.fanout", + "target": "root.fanout.branch_a", + "relation": { + "relationType": "PARALLEL" + } + }, + { + "source": "root.fanout", + "target": "root.fanout.branch_b", + "relation": { + "relationType": "PARALLEL" + } + }, + { + "source": "root.fanout.branch_b", + "target": "root.fanout.466kvIRI982XkOwqJ2KZrD", + "relation": {} + }, + { + "source": "root.fanout.branch_a", + "target": "root.fanout.466kvIRI982XkOwqJ2KZrD", + "relation": {} + }, + { + "source": "root.fanout.4teE0rNdXtiQO9XIpl0yE0", + "target": "root.fanout", + "relation": {} + } + ], + "clusters": [ + { + "cluster": { + "uid": "cluster_root.fanout", + "type": "io.kestra.core.models.hierarchies.GraphCluster", + "error": false, + "relationType": "PARALLEL", + "taskNode": { + "uid": "root.fanout", + "type": "io.kestra.core.models.hierarchies.GraphTask", + "error": false, + "task": { + "id": 
"fanout", + "type": "io.kestra.plugin.core.flow.Parallel", + "tasks": [ + { + "id": "branch_a", + "type": "io.kestra.plugin.core.log.Log", + "message": "A" + }, + { + "id": "branch_b", + "type": "io.kestra.plugin.core.log.Log", + "message": "B" + } + ] + }, + "relationType": "PARALLEL" + } + }, + "nodes": [ + "root.fanout.4teE0rNdXtiQO9XIpl0yE0", + "root.fanout.466kvIRI982XkOwqJ2KZrD", + "root.fanout", + "root.fanout.branch_a", + "root.fanout.branch_b" + ], + "parents": [], + "start": "root.fanout.4teE0rNdXtiQO9XIpl0yE0", + "end": "root.fanout.466kvIRI982XkOwqJ2KZrD" + } + ] + }, + "executions": { + "results": [ + { + "id": "BQlmx4ZF2Ii253ulgTACv", + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "flowRevision": 1, + "taskRunList": [ + { + "id": "6bicjOJGrqXWoXNM7VhAOB", + "executionId": "BQlmx4ZF2Ii253ulgTACv", + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "taskId": "extract", + "attempts": [ + { + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:05.373Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:05.373Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:05.384Z" + } + ], + "endDate": "2026-04-26T07:25:05.384Z", + "startDate": "2026-04-26T07:25:05.373Z", + "duration": "PT0.011S" + } + } + ], + "outputs": {}, + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:04.403Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:05.366Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:05.385Z" + } + ], + "endDate": "2026-04-26T07:25:05.385Z", + "startDate": "2026-04-26T07:25:04.403Z", + "duration": "PT0.982S" + } + }, + { + "id": "71DFQHHfxufTxU38wrlIhN", + "executionId": "BQlmx4ZF2Ii253ulgTACv", + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "taskId": "transform", + "attempts": [ + { + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": 
"2026-04-26T07:25:06.423Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:06.423Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.455Z" + } + ], + "endDate": "2026-04-26T07:25:06.455Z", + "startDate": "2026-04-26T07:25:06.423Z", + "duration": "PT0.032S" + } + } + ], + "outputs": {}, + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:06.397Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:06.421Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.455Z" + } + ], + "endDate": "2026-04-26T07:25:06.455Z", + "startDate": "2026-04-26T07:25:06.397Z", + "duration": "PT0.058S" + } + }, + { + "id": "2RAgoFUAgoXfEZPBLgXiYA", + "executionId": "BQlmx4ZF2Ii253ulgTACv", + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "taskId": "load", + "attempts": [ + { + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:06.580Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:06.580Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.582Z" + } + ], + "endDate": "2026-04-26T07:25:06.582Z", + "startDate": "2026-04-26T07:25:06.580Z", + "duration": "PT0.002S" + } + } + ], + "outputs": {}, + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:06.527Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:06.578Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.582Z" + } + ], + "endDate": "2026-04-26T07:25:06.582Z", + "startDate": "2026-04-26T07:25:06.527Z", + "duration": "PT0.055S" + } + } + ], + "labels": [ + { + "key": "team", + "value": "data-engineering" + }, + { + "key": "owner", + "value": "aditya" + }, + { + "key": "system.correlationId", + "value": "BQlmx4ZF2Ii253ulgTACv" + } + ], + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:03.372Z" + }, + { + "state": "RUNNING", 
+ "date": "2026-04-26T07:25:04.414Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.621Z" + } + ], + "endDate": "2026-04-26T07:25:06.621Z", + "startDate": "2026-04-26T07:25:03.372Z", + "duration": "PT3.249S" + }, + "originalId": "BQlmx4ZF2Ii253ulgTACv", + "deleted": false, + "metadata": { + "attemptNumber": 1, + "originalCreatedDate": "2026-04-26T07:25:03.372Z" + } + } + ], + "total": 1 + }, + "executionDetail": { + "id": "BQlmx4ZF2Ii253ulgTACv", + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "flowRevision": 1, + "taskRunList": [ + { + "id": "6bicjOJGrqXWoXNM7VhAOB", + "executionId": "BQlmx4ZF2Ii253ulgTACv", + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "taskId": "extract", + "attempts": [ + { + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:05.373Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:05.373Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:05.384Z" + } + ], + "endDate": "2026-04-26T07:25:05.384Z", + "startDate": "2026-04-26T07:25:05.373Z", + "duration": "PT0.011S" + } + } + ], + "outputs": {}, + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:04.403Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:05.366Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:05.385Z" + } + ], + "endDate": "2026-04-26T07:25:05.385Z", + "startDate": "2026-04-26T07:25:04.403Z", + "duration": "PT0.982S" + } + }, + { + "id": "71DFQHHfxufTxU38wrlIhN", + "executionId": "BQlmx4ZF2Ii253ulgTACv", + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "taskId": "transform", + "attempts": [ + { + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:06.423Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:06.423Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.455Z" + } + ], + "endDate": 
"2026-04-26T07:25:06.455Z", + "startDate": "2026-04-26T07:25:06.423Z", + "duration": "PT0.032S" + } + } + ], + "outputs": {}, + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:06.397Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:06.421Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.455Z" + } + ], + "endDate": "2026-04-26T07:25:06.455Z", + "startDate": "2026-04-26T07:25:06.397Z", + "duration": "PT0.058S" + } + }, + { + "id": "2RAgoFUAgoXfEZPBLgXiYA", + "executionId": "BQlmx4ZF2Ii253ulgTACv", + "namespace": "hackathon.demo", + "flowId": "cron_etl", + "taskId": "load", + "attempts": [ + { + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:06.580Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:06.580Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.582Z" + } + ], + "endDate": "2026-04-26T07:25:06.582Z", + "startDate": "2026-04-26T07:25:06.580Z", + "duration": "PT0.002S" + } + } + ], + "outputs": {}, + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:06.527Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:06.578Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.582Z" + } + ], + "endDate": "2026-04-26T07:25:06.582Z", + "startDate": "2026-04-26T07:25:06.527Z", + "duration": "PT0.055S" + } + } + ], + "labels": [ + { + "key": "team", + "value": "data-engineering" + }, + { + "key": "owner", + "value": "aditya" + }, + { + "key": "system.correlationId", + "value": "BQlmx4ZF2Ii253ulgTACv" + } + ], + "state": { + "current": "SUCCESS", + "histories": [ + { + "state": "CREATED", + "date": "2026-04-26T07:25:03.372Z" + }, + { + "state": "RUNNING", + "date": "2026-04-26T07:25:04.414Z" + }, + { + "state": "SUCCESS", + "date": "2026-04-26T07:25:06.621Z" + } + ], + "endDate": "2026-04-26T07:25:06.621Z", + "startDate": 
"2026-04-26T07:25:03.372Z", + "duration": "PT3.249S" + }, + "originalId": "BQlmx4ZF2Ii253ulgTACv", + "deleted": false, + "metadata": { + "attemptNumber": 1, + "originalCreatedDate": "2026-04-26T07:25:03.372Z" + } + } +} diff --git a/ingestion/tests/unit/topology/pipeline/test_kestra.py b/ingestion/tests/unit/topology/pipeline/test_kestra.py new file mode 100644 index 000000000000..f5aee45f860a --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_kestra.py @@ -0,0 +1,175 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Unit tests for KestraSource.yield_pipeline / yield_pipeline_status / +yield_pipeline_lineage_details, plus get_pipelines_list edge cases. 
+""" + +import json +import pathlib +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest + +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.entity.data.pipeline import StatusType +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.ingestion.ometa.utils import model_str +from metadata.ingestion.source.pipeline.kestra.metadata import KestraSource +from metadata.ingestion.source.pipeline.kestra.models import ( + KestraExecution, + KestraFlow, + KestraGraph, + KestraSearchResult, +) + + +FIXTURE = ( + pathlib.Path(__file__).resolve().parents[3] + / "unit" + / "resources" + / "datasets" + / "kestra_dataset.json" +) +DATA = json.loads(FIXTURE.read_text()) + + +MOCK_CONFIG = { + "source": { + "type": "Kestra", + "serviceName": "kestra_test", + "serviceConnection": { + "config": { + "type": "Kestra", + "hostPort": "http://localhost:8080", + "tenantId": "main", + } + }, + "sourceConfig": {"config": {"type": "PipelineMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": {"jwtToken": "test"}, + } + }, +} + + +@pytest.fixture +def source() -> KestraSource: + """Build a KestraSource via .create() with base test_connection patched.""" + config = OpenMetadataWorkflowConfig.model_validate(MOCK_CONFIG) + with patch( + "metadata.ingestion.source.pipeline.pipeline_service." 
+ "PipelineServiceSource.test_connection" + ): + src = KestraSource.create( + MOCK_CONFIG["source"], + config.workflowConfig.openMetadataServerConfig, + ) + src.client = MagicMock() + src.context = MagicMock() + src.context.get.return_value = SimpleNamespace(pipeline_service="kestra_test") + src.metadata = MagicMock() + src.status = MagicMock() + return src + + +def test_yield_pipeline_builds_tasks_from_graph(source): + flow = KestraFlow.model_validate(DATA["cronFlow"]) + graph = KestraGraph.model_validate(DATA["cronGraph"]) + source.client.get_flow_graph.return_value = graph + + results = list(source.yield_pipeline(flow)) + + assert results + rights = [r.right for r in results if r.right is not None] + assert rights, "expected at least one CreatePipelineRequest" + req: CreatePipelineRequest = rights[0] + task_names = {model_str(t.name) for t in req.tasks} + assert {"extract", "transform", "load"}.issubset(task_names) + # Cron schedule preserved + assert req.scheduleInterval == "0 2 * * *" + + +def test_yield_pipeline_status_emits_successful_runs(source): + flow = KestraFlow.model_validate(DATA["cronFlow"]) + exec_list = KestraSearchResult.model_validate(DATA["executions"]) + exec_detail = KestraExecution.model_validate(DATA["executionDetail"]) + + source.client.search_executions.return_value = iter( + KestraExecution.model_validate(x) for x in exec_list.results + ) + source.client.get_execution.return_value = exec_detail + + statuses = list(source.yield_pipeline_status(flow)) + rights = [s.right for s in statuses if s.right is not None] + assert rights + assert rights[0].pipeline_status.executionStatus == StatusType.Successful + # 3 task statuses reported (extract, transform, load) + assert len(rights[0].pipeline_status.taskStatus) == 3 + + +def test_yield_pipeline_lineage_for_flow_trigger(source): + import uuid + flow = KestraFlow.model_validate(DATA["downstreamFlow"]) + + upstream_pipeline = SimpleNamespace( + id=uuid.uuid4(), + 
fullyQualifiedName=SimpleNamespace( + root="kestra_test.hackathon.demo.cron_etl" + ), + ) + this_pipeline = SimpleNamespace( + id=uuid.uuid4(), + fullyQualifiedName=SimpleNamespace( + root="kestra_test.hackathon.demo.downstream_consumer" + ), + ) + source.metadata.get_by_name.side_effect = [this_pipeline, upstream_pipeline] + + edges = list(source.yield_pipeline_lineage_details(flow)) + assert any(e.right is not None for e in edges) + + +def test_yield_pipeline_handles_parallel_flowable(source): + """Ensure Parallel flowable + its branches all surface as tasks.""" + flow = KestraFlow.model_validate(DATA["parallelFlow"]) + graph = KestraGraph.model_validate(DATA["parallelGraph"]) + source.client.get_flow_graph.return_value = graph + + req = next( + r.right for r in source.yield_pipeline(flow) if r.right is not None + ) + names = {model_str(t.name) for t in req.tasks} + # All three task IDs from the YAML are surfaced (fanout, branch_a, branch_b, join) + # plus possibly a synthetic "root" or container node + assert {"branch_a", "branch_b", "join"}.issubset(names) + + +def test_get_pipelines_list_skips_disabled(source): + enabled = KestraFlow.model_validate( + {"id": "ok", "namespace": "p", "tasks": [], "disabled": False} + ) + disabled = KestraFlow.model_validate( + {"id": "skipme", "namespace": "p", "tasks": [], "disabled": True} + ) + source.client.search_flows.return_value = iter([enabled, disabled]) + source.source_config = SimpleNamespace(pipelineFilterPattern=None) + + result = list(source.get_pipelines_list()) + assert [f.id for f in result] == ["ok"] diff --git a/ingestion/tests/unit/topology/pipeline/test_kestra_client.py b/ingestion/tests/unit/topology/pipeline/test_kestra_client.py new file mode 100644 index 000000000000..348b2a1fb2d3 --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_kestra_client.py @@ -0,0 +1,125 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this 
file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Unit tests for KestraClient HTTP wrapper. +""" + +import json +import pathlib +from unittest.mock import MagicMock, patch + +from metadata.generated.schema.entity.services.connections.pipeline.kestraConnection import ( + KestraConnection, +) +from metadata.ingestion.models.custom_pydantic import CustomSecretStr +from metadata.ingestion.source.pipeline.kestra.client import KestraClient + + +FIXTURE = ( + pathlib.Path(__file__).resolve().parents[3] + / "unit" + / "resources" + / "datasets" + / "kestra_dataset.json" +) +DATA = json.loads(FIXTURE.read_text()) + + +def _config(**overrides) -> KestraConnection: + base = {"hostPort": "http://kestra.test:8080", "tenantId": "main"} + base.update(overrides) + return KestraConnection(**base) + + +def _mock_get(url, **_kwargs): + m = MagicMock() + m.raise_for_status.return_value = None + if url.endswith("/flows/search"): + m.json.return_value = DATA["flows"] + elif url.endswith("/flows/hackathon.demo/cron_etl/graph"): + m.json.return_value = DATA["cronGraph"] + elif url.endswith("/flows/hackathon.demo/cron_etl"): + m.json.return_value = DATA["cronFlow"] + elif url.endswith("/executions/search"): + m.json.return_value = DATA["executions"] + elif "/executions/" in url: + m.json.return_value = DATA["executionDetail"] + else: + raise AssertionError(f"unexpected URL: {url}") + return m + + +def test_search_flows_returns_results(): + client = KestraClient(_config()) + with patch.object(client._session, "get", side_effect=_mock_get): + flows = 
list(client.search_flows(page_size=50)) + assert len(flows) == DATA["flows"]["total"] + fqns = {f"{f.namespace}.{f.id}" for f in flows} + assert "hackathon.demo.cron_etl" in fqns + assert "hackathon.demo.downstream_consumer" in fqns + assert "hackathon.demo.parallel_job" in fqns + + +def test_get_flow_graph_returns_nodes_and_edges(): + client = KestraClient(_config()) + with patch.object(client._session, "get", side_effect=_mock_get): + graph = client.get_flow_graph("hackathon.demo", "cron_etl") + assert len(graph.nodes) > 0 + assert len(graph.edges) > 0 + + +def test_get_execution_returns_taskrunlist(): + client = KestraClient(_config()) + exec_id = DATA["executionDetail"]["id"] + with patch.object(client._session, "get", side_effect=_mock_get): + ex = client.get_execution(exec_id) + assert ex.state.current == "SUCCESS" + assert ex.taskRunList and len(ex.taskRunList) >= 3 + + +def test_basic_auth_when_username_and_password_set(): + client = KestraClient( + _config(username="admin", password=CustomSecretStr("secret")) + ) + assert client._session.auth == ("admin", "secret") + + +def test_token_auth_when_token_set(): + client = KestraClient(_config(token=CustomSecretStr("tok"))) + assert client._session.headers.get("Authorization") == "Bearer tok" + + +def test_no_auth_when_credentials_empty(): + client = KestraClient(_config()) + assert client._session.auth is None + assert "Authorization" not in client._session.headers + + +def test_token_wins_over_basic(): + client = KestraClient( + _config( + username="admin", + password=CustomSecretStr("secret"), + token=CustomSecretStr("tok"), + ) + ) + assert client._session.headers.get("Authorization") == "Bearer tok" + assert client._session.auth is None + + +def test_url_uses_tenant_when_set(): + client = KestraClient(_config(tenantId="main")) + assert client._url("/flows/search") == "http://kestra.test:8080/api/v1/main/flows/search" + + +def test_url_omits_tenant_when_empty(): + client = KestraClient(_config(tenantId="")) 
+ assert client._url("/flows/search") == "http://kestra.test:8080/api/v1/flows/search" diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/kestraConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/kestraConnection.json new file mode 100644 index 000000000000..6428b6cb0a01 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/kestraConnection.json @@ -0,0 +1,71 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/kestraConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "KestraConnection", + "description": "Kestra Metadata Pipeline Connection Config", + "type": "object", + "javaType": "org.openmetadata.schema.services.connections.pipeline.KestraConnection", + "definitions": { + "KestraType": { + "description": "Service type.", + "type": "string", + "enum": ["Kestra"], + "default": "Kestra" + } + }, + "properties": { + "type": { + "title": "Service Type", + "description": "Service Type", + "$ref": "#/definitions/KestraType", + "default": "Kestra" + }, + "hostPort": { + "expose": true, + "title": "Host And Port", + "description": "Kestra host and port (scheme required, e.g. http://localhost:8080).", + "type": "string", + "format": "uri", + "default": "http://localhost:8080" + }, + "tenantId": { + "title": "Tenant Id", + "description": "Kestra tenant id. OSS default 'main'.", + "type": "string", + "default": "main" + }, + "username": { + "title": "Username", + "description": "Username for Basic authentication. 
Leave empty for no-auth or token auth.", + "type": "string" + }, + "password": { + "title": "Password", + "description": "Password for Basic authentication.", + "type": "string", + "format": "password" + }, + "token": { + "title": "API Token", + "description": "Bearer token for Kestra EE/Cloud.", + "type": "string", + "format": "password" + }, + "verifySSL": { + "title": "Verify SSL", + "type": "boolean", + "default": true + }, + "pipelineFilterPattern": { + "title": "Default Pipeline Filter Pattern", + "description": "Regex include/exclude on flow FQN (namespace.flowId).", + "$ref": "../../../../type/filterPattern.json#/definitions/filterPattern" + }, + "supportsMetadataExtraction": { + "title": "Supports Metadata Extraction", + "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + } + }, + "additionalProperties": false, + "required": ["hostPort"] +} diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json index 649839fe1aed..4fa67209ec8f 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/pipelineService.json @@ -40,7 +40,8 @@ "SSIS", "Snowplow", "Mulesoft", - "MicrosoftFabricPipeline" + "MicrosoftFabricPipeline", + "Kestra" ], "javaEnums": [ { @@ -114,6 +115,9 @@ }, { "name": "MicrosoftFabricPipeline" + }, + { + "name": "Kestra" } ] }, @@ -199,6 +203,9 @@ }, { "$ref": "./connections/pipeline/microsoftFabricPipelineConnection.json" + }, + { + "$ref": "./connections/pipeline/kestraConnection.json" } ] } From c640631c961eb16ab004393cc084483aa6e33346 Mon Sep 17 00:00:00 2001 From: Aditya Puri Date: Sun, 26 Apr 2026 14:13:11 +0000 Subject: [PATCH 2/3] fix(kestra): preserve namespace in Pipeline FQN; close session; typed trigger access Three review fixes on the connector: 1. 
CRITICAL: Pipeline FQN was registered as service.flowId (1-part) but _pipeline_fqn() builds service.namespace.flowId (3-part), so metadata.get_by_name silently returned None for any namespaced flow, suppressing all status and lineage emission. Pipeline names now include the Kestra namespace; lookups agree. 2. HIGH: KestraClient leaked its requests.Session on every ingestion run. Added close() and context-manager protocol. 3. HIGH: yield_pipeline_lineage_details and _schedule_from_triggers round-tripped trigger objects through model_dump(by_alias=True), then read them back as dicts. Replaced with direct typed-attribute access (trig.type, trig.cron, trig.conditions, trig.id). Tests: 14/14 still pass; added an FQN regression assertion. --- .../ingestion/source/pipeline/kestra/client.py | 10 ++++++++++ .../ingestion/source/pipeline/kestra/metadata.py | 14 ++++++-------- .../tests/unit/topology/pipeline/test_kestra.py | 3 +++ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py b/ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py index 351804afae8e..e28c5b5875d8 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kestra/client.py @@ -148,3 +148,13 @@ def ping(self) -> bool: ) resp.raise_for_status() return True + + def close(self) -> None: + """Release the underlying HTTP session.""" + self._session.close() + + def __enter__(self) -> "KestraClient": + return self + + def __exit__(self, *_args) -> None: + self.close() diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py index ab4eb32c991d..3f8a9cc979c5 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py @@ -148,7 +148,7 @@ def yield_pipeline( ) request = 
CreatePipelineRequest( - name=EntityName(pipeline_details.id), + name=EntityName(_flow_fqn(pipeline_details)), displayName=pipeline_details.id, description=( Markdown(pipeline_details.description) @@ -226,10 +226,9 @@ def yield_pipeline_lineage_details( service_name = self.context.get().pipeline_service for trig in triggers: - t = trig.model_dump(by_alias=True) if hasattr(trig, "model_dump") else dict(trig) - if not str(t.get("type", "")).endswith("trigger.Flow"): + if not trig.type.endswith("trigger.Flow"): continue - for cond in t.get("conditions") or []: + for cond in trig.conditions or []: ctype = str(cond.get("type", "")) if not ctype.endswith("ExecutionFlowCondition"): continue @@ -255,7 +254,7 @@ def yield_pipeline_lineage_details( ), lineageDetails=LineageDetails( source=LineageSource.PipelineLineage, - description=f"Kestra Flow trigger: {t.get('id')}", + description=f"Kestra Flow trigger: {trig.id}", ), ) ) @@ -332,9 +331,8 @@ def _schedule_from_triggers(triggers: Optional[list]) -> Optional[str]: if not triggers: return None for trig in triggers: - t = trig.model_dump(by_alias=True) if hasattr(trig, "model_dump") else dict(trig) - if str(t.get("type", "")).endswith("Schedule") and t.get("cron"): - return str(t["cron"]) + if trig.type.endswith("Schedule") and trig.cron: + return str(trig.cron) return None def _task_statuses(self, detail: KestraExecution) -> List[TaskStatus]: diff --git a/ingestion/tests/unit/topology/pipeline/test_kestra.py b/ingestion/tests/unit/topology/pipeline/test_kestra.py index f5aee45f860a..9dd07d21efe0 100644 --- a/ingestion/tests/unit/topology/pipeline/test_kestra.py +++ b/ingestion/tests/unit/topology/pipeline/test_kestra.py @@ -104,6 +104,9 @@ def test_yield_pipeline_builds_tasks_from_graph(source): assert {"extract", "transform", "load"}.issubset(task_names) # Cron schedule preserved assert req.scheduleInterval == "0 2 * * *" + # FQN preserves Kestra namespace so metadata.get_by_name works + # at lineage emission time + assert 
model_str(req.name) == "hackathon.demo.cron_etl" def test_yield_pipeline_status_emits_successful_runs(source): From b4802898b6f6cfb02762c2fdd135c4665820f696 Mon Sep 17 00:00:00 2001 From: Aditya Puri Date: Sun, 26 Apr 2026 15:50:11 +0000 Subject: [PATCH 3/3] fix(kestra): quote namespace-dotted Pipeline FQN in lookups Pipeline names contain dots (Kestra namespace) and OM s FQN builder wraps such names in double quotes when forming the FQN. Our _pipeline_fqn() and the upstream-flow lookup in yield_pipeline_lineage_details() were building unquoted FQNs, so the metadata.get_by_name() calls during status + lineage emission missed every namespaced pipeline. Found during a real ingestion against OM 1.12.0-SNAPSHOT: server stores kestra_demo."hackathon.demo.cron_etl" but the connector was looking up kestra_demo.hackathon.demo.cron_etl. Tests: existing 14/14 still green; live ingestion now emits with 100% workflow success. --- .../metadata/ingestion/source/pipeline/kestra/metadata.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py index 3f8a9cc979c5..42c934515378 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/kestra/metadata.py @@ -236,7 +236,7 @@ def yield_pipeline_lineage_details( up_id = cond.get("flowId") if not (up_ns and up_id): continue - up_fqn = f"{service_name}.{up_ns}.{up_id}" + up_fqn = f'{service_name}."{up_ns}.{up_id}"' up_pipe = self.metadata.get_by_name(entity=Pipeline, fqn=up_fqn) if not up_pipe: continue @@ -350,7 +350,10 @@ def _task_statuses(self, detail: KestraExecution) -> List[TaskStatus]: def _pipeline_fqn(self, flow: KestraFlow) -> str: service_name = self.context.get().pipeline_service - return f"{service_name}.{flow.namespace}.{flow.id}" + # The Pipeline name contains dots (Kestra namespace), so OM 
wraps it + # in quotes when forming the FQN. Mirror that here so get_by_name() + # resolves during lineage and status emission. + return f'{service_name}."{_flow_fqn(flow)}"' def _error(self, msg: str, exc: Exception) -> StackTraceError: logger.warning("%s: %s", msg, exc)