From b0410286eb8d3112628f80c4b5862d910418e69e Mon Sep 17 00:00:00 2001 From: edwardvaneechoud Date: Tue, 23 Jun 2026 23:36:37 +0200 Subject: [PATCH] Implement run flow in flow support --- .../flowfile_core/ai/tools/classification.py | 1 + .../flowfile_core/ai/tools/node_docs.py | 21 + .../flowfile_core/configs/node_store/nodes.py | 13 + .../flowfile_core/flowfile/flow_graph.py | 279 ++++++++++- .../flowfile_core/flowfile/run_flow_guard.py | 41 ++ .../flowfile_core/routes/flow_api.py | 32 ++ flowfile_core/flowfile_core/routes/routes.py | 13 + .../flowfile_core/schemas/flow_api_schema.py | 14 + .../flowfile_core/schemas/input_schema.py | 32 ++ .../flowfile_core/schemas/schemas.py | 1 + .../tests/test_parameter_integration.py | 18 + flowfile_core/tests/test_run_flow_node.py | 435 ++++++++++++++++++ flowfile_frame/flowfile_frame/flow_frame.py | 59 +++ flowfile_frame/flowfile_frame/flow_frame.pyi | 3 + flowfile_frame/tests/test_run_flow.py | 60 +++ .../src/renderer/app/api/flowApi.api.ts | 14 + .../elements/restApiReader/RestApiReader.vue | 7 + .../node-types/elements/runFlow/RunFlow.vue | 243 ++++++++++ .../node-types/elements/runFlow/utils.ts | 16 + .../designer/assets/icons/run_flow.svg | 7 + .../renderer/app/features/designer/utils.ts | 1 + .../src/renderer/app/types/node.types.ts | 14 + 22 files changed, 1323 insertions(+), 1 deletion(-) create mode 100644 flowfile_core/flowfile_core/flowfile/run_flow_guard.py create mode 100644 flowfile_core/tests/test_run_flow_node.py create mode 100644 flowfile_frame/tests/test_run_flow.py create mode 100644 flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/runFlow/RunFlow.vue create mode 100644 flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/runFlow/utils.ts create mode 100644 flowfile_frontend/src/renderer/app/features/designer/assets/icons/run_flow.svg diff --git a/flowfile_core/flowfile_core/ai/tools/classification.py b/flowfile_core/flowfile_core/ai/tools/classification.py index 
7f5b663cf..5eb26c787 100644 --- a/flowfile_core/flowfile_core/ai/tools/classification.py +++ b/flowfile_core/flowfile_core/ai/tools/classification.py @@ -72,6 +72,7 @@ "kafka_source": "source", "google_analytics_reader": "source", "rest_api_reader": "source", + "run_flow": "static", "external_source": "source", "promise": "passthrough", "user_defined": "dynamic", diff --git a/flowfile_core/flowfile_core/ai/tools/node_docs.py b/flowfile_core/flowfile_core/ai/tools/node_docs.py index 8e5cfbf14..df158cc77 100644 --- a/flowfile_core/flowfile_core/ai/tools/node_docs.py +++ b/flowfile_core/flowfile_core/ai/tools/node_docs.py @@ -545,6 +545,14 @@ def sidebar_section_for(node_type: str) -> str: "node — its output is the unmodified primary input. Often used to enforce " "side-effect ordering (write A before reading B)." ), + "run_flow": ( + "Run a saved sub-flow once per input row, mapping input columns to the " + "sub-flow's ${parameters}; the per-row outputs are unioned with a " + "__param_value__ column. Use to fan a pipeline (e.g. a REST API call) " + "across a list of inputs like tickers or ids. The sub-flow must have " + "exactly one API Response node defining its output. Don't use for a single " + "static call (build the flow directly) or when no per-row iteration is needed." + ), } @@ -1034,6 +1042,19 @@ def sidebar_section_for(node_type: str) -> str: "Pitfall: this is for ordering side effects, not data — " "input-1's data is discarded; only its completion is observed." ), + "run_flow": ( + "Settings panel: pick a saved flow (it must have one API Response node), " + "then map each of its ${parameters} to an input column, and optionally set " + "a delay between runs and a max-rows cap. 
Worked example: 'call Finnhub for " + "every ticker in my table' → drag 'Run Flow' from Combine Operations, select " + "your quote sub-flow whose REST API url is " + "https://finnhub.io/api/v1/quote?symbol=${ticker}, map its 'ticker' parameter " + "to your 'symbol' column, set a 1-second delay to respect rate limits, and " + "run — you get one output row per input row plus a __param_value__ column. " + "Pitfall: rows run sequentially (no parallelism yet) and inputs beyond max " + "rows are skipped, so raise max rows for large inputs and expect longer " + "runtimes when a delay is set." + ), } diff --git a/flowfile_core/flowfile_core/configs/node_store/nodes.py b/flowfile_core/flowfile_core/configs/node_store/nodes.py index d86be8eaf..b4304de5d 100644 --- a/flowfile_core/flowfile_core/configs/node_store/nodes.py +++ b/flowfile_core/flowfile_core/configs/node_store/nodes.py @@ -680,6 +680,19 @@ def get_all_standard_nodes() -> tuple[list[NodeTemplate], dict[str, NodeTemplate laziness="eager", tags=[NodeTag.ML, NodeTag.MACHINE_LEARNING, NodeTag.EVALUATE, NodeTag.METRICS, NodeTag.MODEL], ), + NodeTemplate( + name="Run flow", + item="run_flow", + input=1, + output=1, + transform_type="other", + node_type="process", + image="run_flow.svg", + node_group="combine", + drawer_title="Run Flow", + drawer_intro="Run a saved sub-flow once per input row, mapping columns to its parameters", + laziness="eager", + ), NodeTemplate( name="Wait For", item="wait_for", diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 9134e5a0f..7af7ca287 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -10,7 +10,7 @@ from functools import partial from importlib.metadata import PackageNotFoundError, version from pathlib import Path -from time import time +from time import sleep, time from typing import Any, Literal, NamedTuple, Union from uuid import uuid1 @@ 
-83,6 +83,7 @@ find_unresolved_in_model, restore_parameters, ) +from flowfile_core.flowfile.run_flow_guard import guard_sub_flow from flowfile_core.flowfile.schema_callbacks import ( calculate_cross_join_schema, calculate_fuzzy_match_schema, @@ -4246,6 +4247,282 @@ def schema_callback() -> list[FlowfileColumn]: self.add_node_to_starting_list(node) self._node_ids.append(node_rest_api_reader.node_id) + @with_history_capture(HistoryActionType.UPDATE_SETTINGS) + def add_run_flow(self, node_run_flow: input_schema.NodeRunFlow) -> None: + """Adds a node that runs a saved sub-flow once per input row. + + For each input row, the mapped input-column values are injected into the + sub-flow's ``${param}`` references; the sub-flow runs (sequentially, with an + optional inter-run delay) and its single ``api_response`` node's output is + captured. Per-row outputs are unioned (diagonal concat) and a + ``__param_value__`` / ``__param_{param_name}__`` column records the value used. + + Each per-row result is materialised on the worker under a unique ref and + concatenated lazily, so the core never holds full datasets. v1 runs rows + sequentially because sub-flow runs share flow_id-keyed scratch dirs. + """ + + def _func(fl: FlowDataEngine) -> FlowDataEngine: + return self._execute_run_flow_node(node_run_flow, fl) + + def schema_callback() -> list[FlowfileColumn]: + return self._predict_run_flow_schema(node_run_flow) + + self.add_node_step( + node_id=node_run_flow.node_id, + function=_func, + node_type="run_flow", + setting_input=node_run_flow, + schema_callback=schema_callback, + input_node_ids=[node_run_flow.depending_on_id], + ) + + @staticmethod + def _run_flow_param_specs( + settings: input_schema.NodeRunFlow, + ) -> list[tuple[str, str, str]]: + """Active (param_name, input_column, output_column) tuples for the mappings. + + The output column is ``__param_value__`` for a single mapping, else one + ``__param_{param_name}__`` per mapped parameter.
+ """ + mappings = [m for m in settings.parameter_mappings if m.param_name and m.input_column] + single = len(mappings) == 1 + return [ + (m.param_name, m.input_column, "__param_value__" if single else f"__param_{m.param_name}__") + for m in mappings + ] + + @staticmethod + def _resolve_run_flow_path(settings: input_schema.NodeRunFlow) -> str | None: + """Resolve the sub-flow's filesystem path (own ``flow_reference``, else registration).""" + if settings.flow_reference: + return settings.flow_reference + if settings.flow_registration_id is not None: + from flowfile_core.flowfile.catalog_helpers import find_registration_by_registration_id + + snap = find_registration_by_registration_id(settings.flow_registration_id) + if snap is not None: + return snap.flow_path + return None + + @staticmethod + def _assert_no_run_flow_collisions(column_names: list[str], specs: list[tuple[str, str, str]]) -> None: + """Refuse a param output column that clashes with the sub-flow's own output. + + Without this, ``with_columns(...).alias("__param_value__")`` would silently + overwrite a same-named column the sub-flow already produces. + """ + existing = set(column_names) + for _, _, out_col in specs: + if out_col in existing: + raise ValueError( + f"Run-flow sub-flow already emits a column named '{out_col}', which would collide " + "with the parameter-value column. Rename that column in the sub-flow." + ) + + def _predict_run_flow_schema(self, settings: input_schema.NodeRunFlow) -> list[FlowfileColumn]: + """Predict output columns: the sub-flow's api_response schema plus the param column(s). + + Never returns empty — an empty schema_callback makes the engine fall back to + running ``_func`` during prediction, which would execute the sub-flow. 
+ """ + specs = self._run_flow_param_specs(settings) + sub_cols: list[FlowfileColumn] = [] + flow_path = self._resolve_run_flow_path(settings) + if flow_path: + from flowfile_core.flowfile.manage.io_flowfile import open_flow + + # The guard also breaks self-referential flows: predicting the sub-flow's + # api_response schema traverses back into the nested run_flow node, which + # would otherwise open the same flow forever. + try: + with guard_sub_flow(str(Path(flow_path).resolve())): + sub_flow = open_flow(Path(flow_path), user_id=settings.user_id) + api_nodes = [n for n in sub_flow.nodes if n.node_type == "api_response"] + if len(api_nodes) == 1: + predicted = api_nodes[0].get_predicted_schema() + sub_cols = list(predicted) if predicted else [] + except Exception as e: # noqa: BLE001 - incl. recursion; prediction is best-effort + logger.warning(f"Run-flow schema prediction failed: {e}") + existing = {c.name for c in sub_cols} + param_cols = [ + FlowfileColumn.from_input(out_col, "String") for (_, _, out_col) in specs if out_col not in existing + ] + schema = sub_cols + param_cols + if not schema: + schema = [FlowfileColumn.from_input("__param_value__", "String")] + return schema + + def _materialize_run_flow_row(self, lf: pl.LazyFrame, node_id: int | str, index: int) -> pl.LazyFrame: + """Materialise one row's sub-flow output to a stable, uniquely-named location. + + Sequential sub-flow runs reuse the sub-flow's flow_id-keyed Arrow IPC output + path, so a lazy reference captured this iteration would be overwritten by the + next run. Offloading to the worker with a per-iteration ``file_ref`` writes a + distinct file and returns a lazy scan over it (core never collects). Falls + back to an in-core collect only in local mode or when no worker is reachable. 
+ """ + if self.execution_location == "local": + return lf.collect().lazy() + try: + fetcher = ExternalDfFetcher( + lf=lf, + file_ref=f"__runflow_{node_id}_{index}", + flow_id=self.flow_id, + node_id=node_id, + wait_on_completion=True, + ) + except Exception as exc: # noqa: BLE001 - worker unreachable; degrade to in-core + logger.warning("Run-flow worker offload unavailable (%s); materializing row %s in core", exc, index) + return lf.collect().lazy() + if fetcher.has_error: + if fetcher.error_code == -1: + logger.warning("Run-flow worker materialize was killed; materializing row %s in core", index) + return lf.collect().lazy() + raise ValueError(fetcher.error_description or "Run-flow worker materialize failed") + return fetcher.get_result() + + def _load_run_flow_subflow(self, flow_path: str, user_id: int | None) -> tuple["FlowGraph", FlowNode]: + """Open the sub-flow and validate its single-``api_response`` output contract. + + Returns ``(sub_flow, api_node)``; raises ``ValueError`` on zero or multiple + api_response nodes. Sets the sub-flow to run eagerly at this graph's location. + """ + from flowfile_core.flowfile.manage.io_flowfile import open_flow + + sub_flow = open_flow(Path(flow_path), user_id=user_id) + api_nodes = [n for n in sub_flow.nodes if n.node_type == "api_response"] + if len(api_nodes) == 0: + raise ValueError("Run-flow sub-flow has no API Response node to define its output.") + if len(api_nodes) > 1: + raise ValueError("Run-flow sub-flow has more than one API Response node.") + sub_flow.flow_settings.execution_mode = "Performance" + sub_flow.flow_settings.execution_location = self.execution_location + return sub_flow, api_nodes[0] + + @staticmethod + def _build_run_flow_driver( + input_lf: pl.LazyFrame, specs: list[tuple[str, str, str]], max_rows: int + ) -> tuple[pl.DataFrame, bool]: + """Build the bounded driver frame for iteration. 
+ + Selects only the mapped columns (or a synthetic row index when nothing is + mapped) and collects ``head(max_rows + 1)`` so overflow is detected without + materialising the whole input. Returns ``(driver_df capped to max_rows, overflow)``. + """ + mapped_cols = list(dict.fromkeys(input_col for (_, input_col, _) in specs)) + if mapped_cols: + driver_df = input_lf.select(mapped_cols).head(max_rows + 1).collect() + else: + total = input_lf.select(pl.len()).collect().item() + driver_df = pl.DataFrame({"__row__": range(min(total, max_rows + 1))}) + overflow = driver_df.height > max_rows + if overflow: + driver_df = driver_df.head(max_rows) + return driver_df, overflow + + @staticmethod + def _raise_run_flow_no_data(sub_flow: "FlowGraph", api_node: FlowNode, run_info) -> None: + """Raise a clear error when the sub-flow ran but its api_response produced no data.""" + from flowfile_core.flowfile.api_runner import _first_error + + unconfigured = [n.node_id for n in sub_flow.nodes if not n.is_setup] + if unconfigured: + raise ValueError( + f"Run-flow sub-flow has unconfigured node(s) {sorted(unconfigured)} that were " + "saved without settings. Open the sub-flow, finish configuring/connecting every " + "node, run it once to confirm it produces data, then re-save it." + ) + detail = api_node.results.errors or _first_error(run_info) + raise ValueError( + "Run-flow sub-flow API Response node produced no data" + + ( + f": {detail}" + if detail + else ". Check that the API Response node is connected to the branch whose " + "data you want returned, and that the sub-flow runs and produces rows on its own." + ) + ) + + def _run_subflow_for_row( + self, + sub_flow: "FlowGraph", + api_node: FlowNode, + row: dict, + specs: list[tuple[str, str, str]], + params_by_name: dict, + node_id: int | str, + index: int, + ) -> pl.LazyFrame: + """Inject this row's params, run the sub-flow once, annotate and materialise its output. 
+ + Returns a lazy frame (sub-flow output plus the param column(s)), possibly with + zero rows; the engine re-runs the param-referencing nodes because the resolved + ``${param}`` value changes the node hash, so there is no cross-row data bleed. + """ + from flowfile_core.flowfile.api_runner import _first_error, _flow_run_lock + + for param_name, input_col, _ in specs: + param = params_by_name.get(param_name) + if param is not None: + value = row.get(input_col) + param.default_value = "" if value is None else str(value) + with _flow_run_lock(sub_flow.flow_id): + run_info = sub_flow.run_graph() + if run_info is None or not run_info.success: + raise ValueError(_first_error(run_info) or f"Run-flow sub-flow failed on row {index}") + data = api_node.get_resulting_data() + if data is None: + self._raise_run_flow_no_data(sub_flow, api_node, run_info) + row_lf = data.data_frame + if isinstance(row_lf, pl.DataFrame): + row_lf = row_lf.lazy() + if index == 0: + self._assert_no_run_flow_collisions(row_lf.collect_schema().names(), specs) + for _param_name, input_col, out_col in specs: + value = row.get(input_col) + row_lf = row_lf.with_columns(pl.lit("" if value is None else str(value)).alias(out_col)) + return self._materialize_run_flow_row(row_lf, node_id, index) + + def _execute_run_flow_node(self, settings: input_schema.NodeRunFlow, fl: FlowDataEngine) -> FlowDataEngine: + """Run the sub-flow once per input row and return the unioned, lazily-built result.""" + flow_path = self._resolve_run_flow_path(settings) + if not flow_path: + raise ValueError("Run-flow node has no sub-flow selected.") + + specs = self._run_flow_param_specs(settings) + max_rows = max(0, settings.max_rows) + + input_lf = fl.data_frame.lazy() if isinstance(fl.data_frame, pl.DataFrame) else fl.data_frame + driver_df, overflow = self._build_run_flow_driver(input_lf, specs, max_rows) + if overflow: + self.flow_logger.warning( + f"Run-flow node {settings.node_id}: input exceeds max_rows={max_rows}; " + 
f"running the first {max_rows} rows and skipping the rest." + ) + + sub_flow, api_node = self._load_run_flow_subflow(flow_path, settings.user_id) + params_by_name = {p.name: p for p in sub_flow.flow_settings.parameters} + + results: list[pl.LazyFrame] = [] + n_iter = driver_df.height + # v1 is sequential: parallelism would need a per-instance flow_id (the sub-flow + # shares its saved flow_id across runs, keying _flow_run_lock and worker dirs). + with guard_sub_flow(str(Path(flow_path).resolve())): + for i, row in enumerate(driver_df.iter_rows(named=True)): + results.append( + self._run_subflow_for_row(sub_flow, api_node, row, specs, params_by_name, settings.node_id, i) + ) + if n_iter > 1 and ((i + 1) % 25 == 0 or i + 1 == n_iter): + self.flow_logger.info(f"Run-flow node {settings.node_id}: completed {i + 1}/{n_iter} rows") + if settings.delay_seconds and i < n_iter - 1: + sleep(settings.delay_seconds) + + if not results: + return FlowDataEngine.create_from_schema(self._predict_run_flow_schema(settings)) + return FlowDataEngine(pl.concat(results, how="diagonal_relaxed")) + @with_history_capture(HistoryActionType.UPDATE_SETTINGS) def add_cloud_storage_writer(self, node_cloud_storage_writer: input_schema.NodeCloudStorageWriter) -> None: """Adds a node to write data to a cloud storage provider. diff --git a/flowfile_core/flowfile_core/flowfile/run_flow_guard.py b/flowfile_core/flowfile_core/flowfile/run_flow_guard.py new file mode 100644 index 000000000..444a4383d --- /dev/null +++ b/flowfile_core/flowfile_core/flowfile/run_flow_guard.py @@ -0,0 +1,41 @@ +"""Cycle detection for the Run-flow node. + +A sub-flow that (directly or transitively) runs the flow already executing would +loop forever. This tracks the resolved flow paths currently on the execution +stack via a ``ContextVar`` (which nested synchronous ``run_graph`` calls share) +and refuses to re-enter one. 
+""" + +from __future__ import annotations + +import contextvars +from collections.abc import Iterator +from contextlib import contextmanager + +_active_flow_paths: contextvars.ContextVar[frozenset[str]] = contextvars.ContextVar( + "run_flow_active_paths", default=frozenset() +) + + +class RecursiveSubFlowError(Exception): + """Raised when a Run-flow node would re-enter a flow already executing.""" + + +@contextmanager +def guard_sub_flow(flow_path: str) -> Iterator[None]: + """Mark *flow_path* as executing for the duration of the block. + + Raises: + RecursiveSubFlowError: if *flow_path* is already on the execution stack. + """ + active = _active_flow_paths.get() + if flow_path in active: + raise RecursiveSubFlowError( + f"Detected recursive sub-flow execution: '{flow_path}' is already running. " + "A Run-flow node cannot (directly or transitively) run the flow that contains it." + ) + token = _active_flow_paths.set(active | {flow_path}) + try: + yield + finally: + _active_flow_paths.reset(token) diff --git a/flowfile_core/flowfile_core/routes/flow_api.py b/flowfile_core/flowfile_core/routes/flow_api.py index 4a9b12156..7f5b5fa06 100644 --- a/flowfile_core/flowfile_core/routes/flow_api.py +++ b/flowfile_core/flowfile_core/routes/flow_api.py @@ -46,6 +46,7 @@ ApiTestRequest, FlowParamInfo, PublishableFlow, + RunnableFlow, ) _API_RUN_TIMEOUT = float(os.environ.get("FLOWFILE_API_RUN_TIMEOUT_SECONDS", "120")) @@ -297,6 +298,37 @@ def list_publishable_flows( ] +@management_router.get("/runnable-flows", response_model=list[RunnableFlow]) +def list_runnable_flows( + current_user=Depends(get_current_active_user), + db: Session = Depends(get_db), +): + """The current user's API-compatible flows, for the Run-flow node's picker. + + A flow qualifies when it has exactly one ``api_response`` node + (``is_api_compatible``). Unlike ``publishable-flows`` this does not exclude flows + already published as endpoints — a flow can be both an API and a sub-flow. 
+ """ + regs = ( + db.query(db_models.FlowRegistration) + .filter( + db_models.FlowRegistration.owner_id == current_user.id, + db_models.FlowRegistration.is_api_compatible.is_(True), + ) + .order_by(db_models.FlowRegistration.name) + .all() + ) + return [ + RunnableFlow( + registration_id=r.id, + name=r.name, + flow_path=r.flow_path or "", + file_exists=bool(r.flow_path) and Path(r.flow_path).exists(), + ) + for r in regs + ] + + @management_router.get("/flows/{registration_id}/parameters", response_model=list[FlowParamInfo]) def get_flow_parameters( registration_id: int, diff --git a/flowfile_core/flowfile_core/routes/routes.py b/flowfile_core/flowfile_core/routes/routes.py index 575758b90..419449614 100644 --- a/flowfile_core/flowfile_core/routes/routes.py +++ b/flowfile_core/flowfile_core/routes/routes.py @@ -77,6 +77,7 @@ ) from flowfile_core.flowfile.flow_graph import add_connection, delete_connection from flowfile_core.flowfile.flow_node.multi_output import DEFAULT_OUTPUT_HANDLE +from flowfile_core.flowfile.parameter_resolver import apply_parameters_in_place from flowfile_core.flowfile.sources.external_sources.rest_api_source import ( build_rest_api_worker_settings, infer_schema_from_sample, @@ -1201,6 +1202,18 @@ def fetch_rest_api_sample( if flow is None: raise HTTPException(404, "could not find the flow") + # Resolve ${param} references (in url/headers/query/body) using the flow's + # parameter defaults so a parameterized request can be sampled. ``node`` is a + # throwaway parse of the request body, so no restoration is needed; on an + # undefined param apply_parameters_in_place rolls back and raises with the + # offending names. The editor sample path is trusted (the flow owner), so the + # api_runner string-safety gate does not apply here. 
+ params = {p.name: p.default_value for p in flow.flow_settings.parameters} + try: + apply_parameters_in_place(node, params) + except ValueError as e: + raise HTTPException(422, str(e)) from e + # Resolve the credential to an encrypted token (stored secret by name, or an # inline plaintext); the worker decrypts it. Never persist inline plaintext. auth = node.rest_api_settings.auth diff --git a/flowfile_core/flowfile_core/schemas/flow_api_schema.py b/flowfile_core/flowfile_core/schemas/flow_api_schema.py index f809479c7..30e9b987c 100644 --- a/flowfile_core/flowfile_core/schemas/flow_api_schema.py +++ b/flowfile_core/flowfile_core/schemas/flow_api_schema.py @@ -123,6 +123,20 @@ class PublishableFlow(BaseModel): file_exists: bool = True +class RunnableFlow(BaseModel): + """A flow that can be run as a sub-flow (has an api_response node). + + Surfaced by the Run-flow node's picker. Unlike ``PublishableFlow`` this carries the + flow's path (stored as the node's canonical ``flow_reference``) and does not exclude + flows already published as endpoints — a flow can be both an API and a sub-flow. + """ + + registration_id: int + name: str + flow_path: str + file_exists: bool = True + + class ApiKeyCreate(BaseModel): """Request body to mint a new API key for an endpoint.""" diff --git a/flowfile_core/flowfile_core/schemas/input_schema.py b/flowfile_core/flowfile_core/schemas/input_schema.py index 5702c6cb2..ab49bc935 100644 --- a/flowfile_core/flowfile_core/schemas/input_schema.py +++ b/flowfile_core/flowfile_core/schemas/input_schema.py @@ -1261,6 +1261,38 @@ def get_default_description(self) -> str: return " | ".join(pieces) +class ParameterMapping(BaseModel): + """Maps an input column to a sub-flow parameter for the Run-flow node.""" + + param_name: str + input_column: str = "" + + +class NodeRunFlow(NodeSingleInput): + """Settings for a node that runs a saved sub-flow once per input row. 
+ + Each input row supplies string values for the sub-flow's ``${param}`` references + (``parameter_mappings``); the sub-flow runs once per row (sequentially, with an + optional ``delay_seconds`` between runs) and the per-row outputs are unioned. The + sub-flow's single ``api_response`` node defines what data is returned. + + ``flow_reference`` (the saved flow's filesystem path) is canonical; a + ``flow_registration_id`` is resolved to a path at run time when set. + """ + + flow_reference: str | None = None + flow_registration_id: int | None = None + parameter_mappings: list[ParameterMapping] = Field(default_factory=list) + delay_seconds: float = 0.0 + max_rows: int = 1000 + + def get_default_description(self) -> str: + """Describes which sub-flow runs per input row.""" + if self.flow_reference: + return f"Run {Path(self.flow_reference).stem} per input row" + return "Run a sub-flow per input row" + + class NodeFormula(NodeSingleInput): """Settings for a node that applies a formula to create/modify a column.""" diff --git a/flowfile_core/flowfile_core/schemas/schemas.py b/flowfile_core/flowfile_core/schemas/schemas.py index fa421e092..eeefdfbe9 100644 --- a/flowfile_core/flowfile_core/schemas/schemas.py +++ b/flowfile_core/flowfile_core/schemas/schemas.py @@ -54,6 +54,7 @@ "kafka_source": input_schema.NodeKafkaSource, "google_analytics_reader": input_schema.NodeGoogleAnalyticsReader, "rest_api_reader": input_schema.NodeRestApiReader, + "run_flow": input_schema.NodeRunFlow, "external_source": input_schema.NodeExternalSource, "promise": input_schema.NodePromise, "user_defined": input_schema.UserDefinedNode, diff --git a/flowfile_core/tests/test_parameter_integration.py b/flowfile_core/tests/test_parameter_integration.py index 5b4169ad1..c6d8538cc 100644 --- a/flowfile_core/tests/test_parameter_integration.py +++ b/flowfile_core/tests/test_parameter_integration.py @@ -585,3 +585,21 @@ def test_parameter_change_invalidates_predicted_schema(): finally: os.unlink(csv_a) 
os.unlink(csv_b) + + +# REST API node + + +def test_rest_api_url_parameter_reaches_worker_settings(): + """${param} in a REST API url resolves into the settings sent to the worker.""" + from flowfile_core.flowfile.parameter_resolver import apply_parameters_in_place + from flowfile_core.flowfile.sources.external_sources.rest_api_source import build_rest_api_worker_settings + + node = input_schema.NodeRestApiReader( + flow_id=1, + node_id=1, + rest_api_settings=input_schema.RestApiSettings(url="https://finnhub.io/api/v1/quote?symbol=${ticker}"), + ) + apply_parameters_in_place(node, {"ticker": "AAPL"}) + worker_settings = build_rest_api_worker_settings(node, None) + assert worker_settings.url == "https://finnhub.io/api/v1/quote?symbol=AAPL" diff --git a/flowfile_core/tests/test_run_flow_node.py b/flowfile_core/tests/test_run_flow_node.py new file mode 100644 index 000000000..d1f77f2b3 --- /dev/null +++ b/flowfile_core/tests/test_run_flow_node.py @@ -0,0 +1,435 @@ +"""Integration tests for the Run-flow node. + +A Run-flow node runs a saved sub-flow once per input row, mapping input columns to +the sub-flow's ${param} references. These tests build a hermetic sub-flow +(manual_input -> polars_code echoing ${x} -> api_response), save it, then drive it +from a parent flow and assert the unioned per-row output plus the __param_value__ +correlation column. 
+""" + +import logging + +import polars as pl +import pytest + +from flowfile_core.flowfile.flow_graph import add_connection +from flowfile_core.flowfile.handler import FlowfileHandler +from flowfile_core.schemas import input_schema, schemas, transform_schema +from flowfile_core.schemas.schemas import FlowParameter + + +def _make_graph(flow_id: int, execution_location: str = "local"): + handler = FlowfileHandler() + handler.register_flow( + schemas.FlowSettings( + flow_id=flow_id, + name="run_flow_test", + path=".", + execution_mode="Development", + execution_location=execution_location, + ) + ) + return handler.get_flow(flow_id) + + +def _build_and_save_echo_subflow(path, flow_id: int = 7011) -> None: + """manual_input(1 row) -> polars_code echoing ${x} -> api_response, saved to *path*. + + The sub-flow has a single flow parameter ``x``; each run emits one row whose + ``echo`` column equals the value injected for ``x``. + """ + handler = FlowfileHandler() + handler.register_flow( + schemas.FlowSettings( + flow_id=flow_id, + name="echo_subflow", + path=str(path), + execution_mode="Development", + execution_location="local", + ) + ) + graph = handler.get_flow(flow_id) + graph.flow_settings.parameters = [FlowParameter(name="x", default_value="")] + + graph.add_node_promise(input_schema.NodePromise(flow_id=flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=flow_id, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"seed": 1}]), + ) + ) + + graph.add_node_promise(input_schema.NodePromise(flow_id=flow_id, node_id=2, node_type="polars_code")) + graph.add_polars_code( + input_schema.NodePolarsCode( + flow_id=flow_id, + node_id=2, + polars_code_input=transform_schema.PolarsCodeInput( + polars_code="output_df = input_df.with_columns(pl.lit('${x}').alias('echo')).select('echo')" + ), + depending_on_ids=[1], + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 
2)) + + graph.add_node_promise(input_schema.NodePromise(flow_id=flow_id, node_id=3, node_type="api_response")) + graph.add_api_response(input_schema.NodeApiResponse(flow_id=flow_id, node_id=3, depending_on_id=2)) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(2, 3)) + + graph.save_flow(str(path)) + + +def _run(graph): + result = graph.run_graph() + assert result is not None, "run_graph returned None" + return result + + +def test_run_flow_runs_subflow_per_row(execution_location, tmp_path): + """The sub-flow runs once per input row; outputs union with a __param_value__ column.""" + sub_path = tmp_path / "echo.yaml" + _build_and_save_echo_subflow(sub_path) + + graph = _make_graph(7000, execution_location=execution_location) + tickers = ["AAPL", "MSFT", "GOOG"] + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=graph.flow_id, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"ticker": t} for t in tickers]), + ) + ) + + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=2, node_type="run_flow")) + graph.add_run_flow( + input_schema.NodeRunFlow( + flow_id=graph.flow_id, + node_id=2, + depending_on_id=1, + flow_reference=str(sub_path), + parameter_mappings=[input_schema.ParameterMapping(param_name="x", input_column="ticker")], + max_rows=100, + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + + result = _run(graph) + assert result.success, "\n".join(f"node {r.node_id}: {r.error}" for r in result.node_step_result if not r.success) + + out = graph.get_node(2).get_resulting_data().data_frame + df = out.collect() if isinstance(out, pl.LazyFrame) else out + df = df.sort("__param_value__") + assert df.height == len(tickers) + assert df["echo"].to_list() == sorted(tickers) + assert df["__param_value__"].to_list() == sorted(tickers) + + 
+def test_run_flow_max_rows_cap(execution_location, tmp_path, caplog): + """Input larger than max_rows is truncated to max_rows with a logged warning.""" + sub_path = tmp_path / "echo.yaml" + _build_and_save_echo_subflow(sub_path) + + graph = _make_graph(7001, execution_location=execution_location) + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=graph.flow_id, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"ticker": f"T{i}"} for i in range(5)]), + ) + ) + + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=2, node_type="run_flow")) + graph.add_run_flow( + input_schema.NodeRunFlow( + flow_id=graph.flow_id, + node_id=2, + depending_on_id=1, + flow_reference=str(sub_path), + parameter_mappings=[input_schema.ParameterMapping(param_name="x", input_column="ticker")], + max_rows=2, + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + + with caplog.at_level(logging.WARNING): + result = _run(graph) + assert result.success + + out = graph.get_node(2).get_resulting_data().data_frame + df = out.collect() if isinstance(out, pl.LazyFrame) else out + assert df.height == 2 + assert any("max_rows" in rec.message for rec in caplog.records) + + +def test_run_flow_recursion_guard(tmp_path): + """A flow whose Run-flow node references itself fails with a recursion error.""" + self_path = tmp_path / "self.yaml" + + handler = FlowfileHandler() + handler.register_flow( + schemas.FlowSettings( + flow_id=7002, + name="self_ref", + path=str(self_path), + execution_mode="Development", + execution_location="local", + ) + ) + graph = handler.get_flow(7002) + + graph.add_node_promise(input_schema.NodePromise(flow_id=7002, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=7002, + node_id=1, + 
raw_data_format=input_schema.RawData.from_pylist([{"v": 1}]), + ) + ) + graph.add_node_promise(input_schema.NodePromise(flow_id=7002, node_id=2, node_type="run_flow")) + graph.add_run_flow( + input_schema.NodeRunFlow( + flow_id=7002, + node_id=2, + depending_on_id=1, + flow_reference=str(self_path), + max_rows=5, + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + graph.add_node_promise(input_schema.NodePromise(flow_id=7002, node_id=3, node_type="api_response")) + graph.add_api_response(input_schema.NodeApiResponse(flow_id=7002, node_id=3, depending_on_id=2)) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(2, 3)) + graph.save_flow(str(self_path)) + + from flowfile_core.flowfile.manage.io_flowfile import open_flow + + reopened = open_flow(self_path, user_id=1) + result = reopened.run_graph() + node2 = next(r for r in result.node_step_result if r.node_id == 2) + assert node2.success is False + assert "recursive" in (node2.error or "").lower() + + +def test_run_flow_no_subflow_selected_errors(tmp_path): + """A Run-flow node with no sub-flow reference fails clearly.""" + graph = _make_graph(7003, execution_location="local") + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=graph.flow_id, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"ticker": "AAPL"}]), + ) + ) + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=2, node_type="run_flow")) + graph.add_run_flow( + input_schema.NodeRunFlow( + flow_id=graph.flow_id, + node_id=2, + depending_on_id=1, + parameter_mappings=[input_schema.ParameterMapping(param_name="x", input_column="ticker")], + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + + result = graph.run_graph() + node2 = next(r for r in result.node_step_result if r.node_id == 2) 
+ assert node2.success is False + assert "sub-flow" in (node2.error or "").lower() + + +def _build_and_save_filter_subflow(path, flow_id: int = 7401) -> None: + """manual_input -> polars_code filtering on ${x} -> api_response. + + Only the row matching ``${x}`` survives, so an unmatched value yields 0 rows. + """ + handler = FlowfileHandler() + handler.register_flow( + schemas.FlowSettings( + flow_id=flow_id, + name="filter_subflow", + path=str(path), + execution_mode="Development", + execution_location="local", + ) + ) + graph = handler.get_flow(flow_id) + graph.flow_settings.parameters = [FlowParameter(name="x", default_value="")] + + graph.add_node_promise(input_schema.NodePromise(flow_id=flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=flow_id, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"sym": "AAA"}]), + ) + ) + graph.add_node_promise(input_schema.NodePromise(flow_id=flow_id, node_id=2, node_type="polars_code")) + graph.add_polars_code( + input_schema.NodePolarsCode( + flow_id=flow_id, + node_id=2, + polars_code_input=transform_schema.PolarsCodeInput( + polars_code="output_df = input_df.filter(pl.col('sym') == '${x}')" + ), + depending_on_ids=[1], + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + graph.add_node_promise(input_schema.NodePromise(flow_id=flow_id, node_id=3, node_type="api_response")) + graph.add_api_response(input_schema.NodeApiResponse(flow_id=flow_id, node_id=3, depending_on_id=2)) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(2, 3)) + graph.save_flow(str(path)) + + +def test_run_flow_empty_rows_tolerated(execution_location, tmp_path): + """A sub-flow run that yields 0 rows for an input contributes nothing (no failure).""" + sub_path = tmp_path / "filter.yaml" + _build_and_save_filter_subflow(sub_path) + + graph = _make_graph(7400, execution_location=execution_location) + 
graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=graph.flow_id, + node_id=1, + # "AAA" matches (1 row); "ZZZ" matches nothing (0 rows). + raw_data_format=input_schema.RawData.from_pylist([{"ticker": "AAA"}, {"ticker": "ZZZ"}]), + ) + ) + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=2, node_type="run_flow")) + graph.add_run_flow( + input_schema.NodeRunFlow( + flow_id=graph.flow_id, + node_id=2, + depending_on_id=1, + flow_reference=str(sub_path), + parameter_mappings=[input_schema.ParameterMapping(param_name="x", input_column="ticker")], + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + + result = _run(graph) + assert result.success, "\n".join(f"node {r.node_id}: {r.error}" for r in result.node_step_result if not r.success) + + out = graph.get_node(2).get_resulting_data().data_frame + df = out.collect() if isinstance(out, pl.LazyFrame) else out + # Only the matching input contributes a row; the empty run is silently dropped. + assert df.height == 1 + assert df["__param_value__"].to_list() == ["AAA"] + + +def _build_and_save_collision_subflow(path, flow_id: int = 7501) -> None: + """A sub-flow whose api_response already emits a ``__param_value__`` column. + + The column comes straight from a data source (manual_input) rather than from + polars_code, which forbids ``__`` dunder patterns. Used to verify the Run-flow + node refuses to overwrite it with the parameter-value column. 
+ """ + handler = FlowfileHandler() + handler.register_flow( + schemas.FlowSettings( + flow_id=flow_id, + name="collision_subflow", + path=str(path), + execution_mode="Development", + execution_location="local", + ) + ) + graph = handler.get_flow(flow_id) + graph.flow_settings.parameters = [FlowParameter(name="x", default_value="")] + + graph.add_node_promise(input_schema.NodePromise(flow_id=flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=flow_id, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"__param_value__": "preexisting"}]), + ) + ) + graph.add_node_promise(input_schema.NodePromise(flow_id=flow_id, node_id=2, node_type="api_response")) + graph.add_api_response(input_schema.NodeApiResponse(flow_id=flow_id, node_id=2, depending_on_id=1)) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + graph.save_flow(str(path)) + + +def test_run_flow_output_column_collision_errors(tmp_path): + """A sub-flow that already emits the param output column fails with a clear error.""" + sub_path = tmp_path / "collision.yaml" + _build_and_save_collision_subflow(sub_path) + + graph = _make_graph(7500, execution_location="local") + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=graph.flow_id, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"ticker": "AAA"}]), + ) + ) + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=2, node_type="run_flow")) + graph.add_run_flow( + input_schema.NodeRunFlow( + flow_id=graph.flow_id, + node_id=2, + depending_on_id=1, + flow_reference=str(sub_path), + parameter_mappings=[input_schema.ParameterMapping(param_name="x", input_column="ticker")], + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + + result = graph.run_graph() 
+ node2 = next(r for r in result.node_step_result if r.node_id == 2) + assert node2.success is False + assert "collide" in (node2.error or "").lower() + + +def test_run_flow_empty_input_returns_predicted_schema(tmp_path): + """Zero input rows yields an empty result that still advertises the output columns.""" + sub_path = tmp_path / "echo.yaml" + _build_and_save_echo_subflow(sub_path) + + graph = _make_graph(7600, execution_location="local") + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=graph.flow_id, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"ticker": "AAA"}]), + ) + ) + # Empty the input (keep its schema) before the Run-flow node. + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=2, node_type="polars_code")) + graph.add_polars_code( + input_schema.NodePolarsCode( + flow_id=graph.flow_id, + node_id=2, + polars_code_input=transform_schema.PolarsCodeInput(polars_code="output_df = input_df.head(0)"), + depending_on_ids=[1], + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + + graph.add_node_promise(input_schema.NodePromise(flow_id=graph.flow_id, node_id=3, node_type="run_flow")) + graph.add_run_flow( + input_schema.NodeRunFlow( + flow_id=graph.flow_id, + node_id=3, + depending_on_id=2, + flow_reference=str(sub_path), + parameter_mappings=[input_schema.ParameterMapping(param_name="x", input_column="ticker")], + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(2, 3)) + + result = _run(graph) + assert result.success, "\n".join(f"node {r.node_id}: {r.error}" for r in result.node_step_result if not r.success) + + out = graph.get_node(3).get_resulting_data().data_frame + df = out.collect() if isinstance(out, pl.LazyFrame) else out + assert df.height == 0 + assert set(df.columns) == {"echo", "__param_value__"} 
diff --git a/flowfile_frame/flowfile_frame/flow_frame.py b/flowfile_frame/flowfile_frame/flow_frame.py index 50e4131d1..d3456e0a2 100644 --- a/flowfile_frame/flowfile_frame/flow_frame.py +++ b/flowfile_frame/flowfile_frame/flow_frame.py @@ -417,6 +417,65 @@ def _create_child_frame(self, new_node_id, *, precomputed_result=None): except AttributeError: raise ValueError("Could not execute the function") from None + def run_flow( + self, + flow_reference: str | None = None, + *, + flow_registration_id: int | None = None, + parameter_mappings: dict[str, str] | list[Any] | None = None, + delay_seconds: float = 0.0, + max_rows: int = 1000, + description: str | None = None, + ) -> FlowFrame: + """Run a saved sub-flow once per input row, mapping input columns to its ${parameters}. + + For each input row the mapped column values are injected into the sub-flow's + ``${param}`` references; the sub-flow runs (sequentially, with an optional + ``delay_seconds`` between runs) and its single ``api_response`` node's output is + captured. Per-row outputs are unioned and a ``__param_value__`` / + ``__param___`` column records the value(s) used. + + Args: + flow_reference: Filesystem path to the saved sub-flow (``.flowfile`` / + ``.yaml`` / ``.json``). Canonical reference. + flow_registration_id: Catalog registration id of the sub-flow; resolved to a + path at run time when ``flow_reference`` is not given. + parameter_mappings: Maps the sub-flow's ${parameters} to input columns — + either a dict ``{"ticker": "symbol"}`` (param -> column) or a list of + ``{"param_name": ..., "input_column": ...}`` dicts. + delay_seconds: Optional delay between per-row runs (rate-limit friendly). + max_rows: Cap on the number of input rows processed. + description: Optional node description. + + Returns: + FlowFrame: A FlowFrame backed by the Run-flow node's unioned output. 
+ """ + from flowfile_core.schemas.input_schema import NodeRunFlow, ParameterMapping + from flowfile_frame.rest_api import get_current_user_id + + if isinstance(parameter_mappings, dict): + mappings = [ParameterMapping(param_name=str(k), input_column=str(v)) for k, v in parameter_mappings.items()] + elif parameter_mappings: + mappings = [m if isinstance(m, ParameterMapping) else ParameterMapping(**m) for m in parameter_mappings] + else: + mappings = [] + + new_node_id = generate_node_id() + settings = NodeRunFlow( + flow_id=self.flow_graph.flow_id, + node_id=new_node_id, + user_id=get_current_user_id(), + depending_on_id=self.node_id, + description=description, + flow_reference=flow_reference, + flow_registration_id=flow_registration_id, + parameter_mappings=mappings, + delay_seconds=delay_seconds, + max_rows=max_rows, + ) + self.flow_graph.add_run_flow(settings) + return self._create_child_frame(new_node_id) + @staticmethod def _generate_sort_polars_code( pure_sort_expr_strs: list[str], diff --git a/flowfile_frame/flowfile_frame/flow_frame.pyi b/flowfile_frame/flowfile_frame/flow_frame.pyi index c16c9511e..0469d321f 100644 --- a/flowfile_frame/flowfile_frame/flow_frame.pyi +++ b/flowfile_frame/flowfile_frame/flow_frame.pyi @@ -331,6 +331,9 @@ class FlowFrame: # Create rolling groups based on a temporal or integer column. def rolling(self, index_column: IntoExpr, period: str | timedelta, offset: str | timedelta | None = None, closed: ClosedInterval = 'right', group_by: IntoExpr | Iterable[IntoExpr] | None = None, description: Optional[str] = None) -> LazyGroupBy: ... + # Run a saved sub-flow once per input row, mapping input columns to its ${parameters}. + def run_flow(self, flow_reference: str | None = None, flow_registration_id: int | None = None, parameter_mappings: dict[str, str] | list[Any] | None = None, delay_seconds: float = 0.0, max_rows: int = 1000, description: str | None = None) -> 'FlowFrame': ... 
+ # Save the graph def save_graph(self, file_path: str, auto_arrange: bool = True, description: Optional[str] = None) -> Any: ... diff --git a/flowfile_frame/tests/test_run_flow.py b/flowfile_frame/tests/test_run_flow.py new file mode 100644 index 000000000..3714a2bb9 --- /dev/null +++ b/flowfile_frame/tests/test_run_flow.py @@ -0,0 +1,60 @@ +"""Tests for FlowFrame.run_flow — running a saved sub-flow once per input row.""" + +from flowfile_frame.flow_frame_methods import from_dict + + +def _build_and_save_echo_subflow(path) -> None: + """manual_input -> polars_code echoing ${x} -> api_response, saved to *path*.""" + from flowfile_core.flowfile.flow_graph import add_connection + from flowfile_core.flowfile.handler import FlowfileHandler + from flowfile_core.schemas import input_schema, schemas, transform_schema + from flowfile_core.schemas.schemas import FlowParameter + + handler = FlowfileHandler() + handler.register_flow( + schemas.FlowSettings( + flow_id=8101, + name="echo_subflow", + path=str(path), + execution_mode="Development", + execution_location="local", + ) + ) + graph = handler.get_flow(8101) + graph.flow_settings.parameters = [FlowParameter(name="x", default_value="")] + + graph.add_node_promise(input_schema.NodePromise(flow_id=8101, node_id=1, node_type="manual_input")) + graph.add_manual_input( + input_schema.NodeManualInput( + flow_id=8101, + node_id=1, + raw_data_format=input_schema.RawData.from_pylist([{"seed": 1}]), + ) + ) + graph.add_node_promise(input_schema.NodePromise(flow_id=8101, node_id=2, node_type="polars_code")) + graph.add_polars_code( + input_schema.NodePolarsCode( + flow_id=8101, + node_id=2, + polars_code_input=transform_schema.PolarsCodeInput( + polars_code="output_df = input_df.with_columns(pl.lit('${x}').alias('echo')).select('echo')" + ), + depending_on_ids=[1], + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2)) + graph.add_node_promise(input_schema.NodePromise(flow_id=8101, node_id=3, 
node_type="api_response")) + graph.add_api_response(input_schema.NodeApiResponse(flow_id=8101, node_id=3, depending_on_id=2)) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(2, 3)) + graph.save_flow(str(path)) + + +def test_run_flow_per_row(tmp_path): + sub_path = tmp_path / "echo.yaml" + _build_and_save_echo_subflow(sub_path) + + frame = from_dict({"ticker": ["AAPL", "MSFT"]}) + out = frame.run_flow(str(sub_path), parameter_mappings={"x": "ticker"}).collect().sort("__param_value__") + + assert out["echo"].to_list() == ["AAPL", "MSFT"] + assert out["__param_value__"].to_list() == ["AAPL", "MSFT"] diff --git a/flowfile_frontend/src/renderer/app/api/flowApi.api.ts b/flowfile_frontend/src/renderer/app/api/flowApi.api.ts index 6299591e3..c7a11e816 100644 --- a/flowfile_frontend/src/renderer/app/api/flowApi.api.ts +++ b/flowfile_frontend/src/renderer/app/api/flowApi.api.ts @@ -37,6 +37,15 @@ export interface PublishableFlow { file_exists: boolean; } +// An API-compatible flow that can be run as a sub-flow, for the Run-flow node's picker. +// Carries the flow path (stored as the node's flow_reference); includes published flows. 
+export interface RunnableFlow { + registration_id: number; + name: string; + flow_path: string; + file_exists: boolean; +} + export interface ApiTestResult { data: Record[] | Record; row_count: number; @@ -103,6 +112,11 @@ export class FlowApiApi { return res.data; } + static async listRunnableFlows(): Promise { + const res = await axios.get("/flow-api/runnable-flows"); + return res.data; + } + static async getFlowParameters(registrationId: number): Promise { const res = await axios.get(`/flow-api/flows/${registrationId}/parameters`); return res.data; diff --git a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/restApiReader/RestApiReader.vue b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/restApiReader/RestApiReader.vue index 94f6566fe..b7a2fc467 100644 --- a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/restApiReader/RestApiReader.vue +++ b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/restApiReader/RestApiReader.vue @@ -25,6 +25,13 @@ class="form-control" placeholder="https://api.example.com/v1/items" /> +
+ + Supports ${param} references to flow parameters (resolved at run time; + Fetch sample uses each parameter's default value). +
diff --git a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/runFlow/RunFlow.vue b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/runFlow/RunFlow.vue new file mode 100644 index 000000000..990eabb13 --- /dev/null +++ b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/runFlow/RunFlow.vue @@ -0,0 +1,243 @@ + + + + + diff --git a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/runFlow/utils.ts b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/runFlow/utils.ts new file mode 100644 index 000000000..0dbcf9750 --- /dev/null +++ b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/runFlow/utils.ts @@ -0,0 +1,16 @@ +import type { NodeRunFlow } from "../../../../../types/node.types"; + +export const createNodeRunFlow = (flowId: number, nodeId: number): NodeRunFlow => { + return { + flow_id: flowId, + node_id: nodeId, + pos_x: 0, + pos_y: 0, + cache_results: false, + flow_reference: null, + flow_registration_id: null, + parameter_mappings: [], + delay_seconds: 0, + max_rows: 1000, + }; +}; diff --git a/flowfile_frontend/src/renderer/app/features/designer/assets/icons/run_flow.svg b/flowfile_frontend/src/renderer/app/features/designer/assets/icons/run_flow.svg new file mode 100644 index 000000000..627e8a041 --- /dev/null +++ b/flowfile_frontend/src/renderer/app/features/designer/assets/icons/run_flow.svg @@ -0,0 +1,7 @@ + + + + + + + diff --git a/flowfile_frontend/src/renderer/app/features/designer/utils.ts b/flowfile_frontend/src/renderer/app/features/designer/utils.ts index d951a28ad..d5ff10eb9 100644 --- a/flowfile_frontend/src/renderer/app/features/designer/utils.ts +++ b/flowfile_frontend/src/renderer/app/features/designer/utils.ts @@ -49,6 +49,7 @@ const BUILTIN_ICONS = new Set([ "union.svg", "unique.svg", "unpivot.svg", + "run_flow.svg", ]); const DEFAULT_ICON = "user-defined-icon.png"; diff --git 
a/flowfile_frontend/src/renderer/app/types/node.types.ts b/flowfile_frontend/src/renderer/app/types/node.types.ts index e09216416..b22c47f74 100644 --- a/flowfile_frontend/src/renderer/app/types/node.types.ts +++ b/flowfile_frontend/src/renderer/app/types/node.types.ts @@ -1079,6 +1079,20 @@ export interface NodeRestApiReader extends NodeBase { fields?: MinimalFieldInput[] | null; } +// Run-flow node: run a saved sub-flow once per input row, mapping columns to its ${params}. +export interface ParameterMapping { + param_name: string; + input_column: string; +} + +export interface NodeRunFlow extends NodeSingleInput { + flow_reference: string | null; + flow_registration_id: number | null; + parameter_mappings: ParameterMapping[]; + delay_seconds: number; + max_rows: number; +} + // ML Nodes export type MLParamType = "boolean" | "number" | "integer" | "select";