Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flowfile_core/flowfile_core/ai/tools/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
"kafka_source": "source",
"google_analytics_reader": "source",
"rest_api_reader": "source",
"run_flow": "static",
"external_source": "source",
"promise": "passthrough",
"user_defined": "dynamic",
Expand Down
21 changes: 21 additions & 0 deletions flowfile_core/flowfile_core/ai/tools/node_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,14 @@ def sidebar_section_for(node_type: str) -> str:
"node — its output is the unmodified primary input. Often used to enforce "
"side-effect ordering (write A before reading B)."
),
"run_flow": (
"Run a saved sub-flow once per input row, mapping input columns to the "
"sub-flow's ${parameters}; the per-row outputs are unioned with a "
"__param_value__ column. Use to fan a pipeline (e.g. a REST API call) "
"across a list of inputs like tickers or ids. The sub-flow must have "
"exactly one API Response node defining its output. Don't use for a single "
"static call (build the flow directly) or when no per-row iteration is needed."
),
}


Expand Down Expand Up @@ -1034,6 +1042,19 @@ def sidebar_section_for(node_type: str) -> str:
"Pitfall: this is for ordering side effects, not data — "
"input-1's data is discarded; only its completion is observed."
),
"run_flow": (
"Settings panel: pick a saved flow (it must have one API Response node), "
"then map each of its ${parameters} to an input column, and optionally set "
"a delay between runs and a max-rows cap. Worked example: 'call Finnhub for "
"every ticker in my table' → drag 'Run Flow' from Combine Operations, select "
"your quote sub-flow whose REST API url is "
"https://finnhub.io/api/v1/quote?symbol=${ticker}, map its 'ticker' parameter "
"to your 'symbol' column, set a 1-second delay to respect rate limits, and "
"run — you get one output row per input row plus a __param_value__ column. "
"Pitfall: rows run sequentially (no parallelism yet) and inputs beyond max "
"rows are skipped, so raise max rows for large inputs and expect longer "
"runtimes when a delay is set."
),
}


Expand Down
13 changes: 13 additions & 0 deletions flowfile_core/flowfile_core/configs/node_store/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,19 @@ def get_all_standard_nodes() -> tuple[list[NodeTemplate], dict[str, NodeTemplate
laziness="eager",
tags=[NodeTag.ML, NodeTag.MACHINE_LEARNING, NodeTag.EVALUATE, NodeTag.METRICS, NodeTag.MODEL],
),
NodeTemplate(
name="Run flow",
item="run_flow",
input=1,
output=1,
transform_type="other",
node_type="process",
image="run_flow.svg",
node_group="combine",
drawer_title="Run Flow",
drawer_intro="Run a saved sub-flow once per input row, mapping columns to its parameters",
laziness="eager",
),
NodeTemplate(
name="Wait For",
item="wait_for",
Expand Down
279 changes: 278 additions & 1 deletion flowfile_core/flowfile_core/flowfile/flow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from functools import partial
from importlib.metadata import PackageNotFoundError, version
from pathlib import Path
from time import time
from time import sleep, time
from typing import Any, Literal, NamedTuple, Union
from uuid import uuid1

Expand Down Expand Up @@ -83,6 +83,7 @@
find_unresolved_in_model,
restore_parameters,
)
from flowfile_core.flowfile.run_flow_guard import guard_sub_flow
from flowfile_core.flowfile.schema_callbacks import (
calculate_cross_join_schema,
calculate_fuzzy_match_schema,
Expand Down Expand Up @@ -4246,6 +4247,282 @@ def schema_callback() -> list[FlowfileColumn]:
self.add_node_to_starting_list(node)
self._node_ids.append(node_rest_api_reader.node_id)

@with_history_capture(HistoryActionType.UPDATE_SETTINGS)
def add_run_flow(self, node_run_flow: input_schema.NodeRunFlow) -> None:
"""Adds a node that runs a saved sub-flow once per input row.

For each input row, the mapped input-column values are injected into the
sub-flow's ``${param}`` references; the sub-flow runs (sequentially, with an
optional inter-run delay) and its single ``api_response`` node's output is
captured. Per-row outputs are unioned (diagonal concat) and a
``__param_value__`` / ``__param_<name>__`` column records the value used.

Each per-row result is materialised on the worker under a unique ref and
concatenated lazily, so the core never holds full datasets. v1 runs rows
sequentially because sub-flow runs share flow_id-keyed scratch dirs.
"""

def _func(fl: FlowDataEngine) -> FlowDataEngine:
return self._execute_run_flow_node(node_run_flow, fl)

def schema_callback() -> list[FlowfileColumn]:
return self._predict_run_flow_schema(node_run_flow)

self.add_node_step(
node_id=node_run_flow.node_id,
function=_func,
node_type="run_flow",
setting_input=node_run_flow,
schema_callback=schema_callback,
input_node_ids=[node_run_flow.depending_on_id],
)

@staticmethod
def _run_flow_param_specs(
settings: input_schema.NodeRunFlow,
) -> list[tuple[str, str, str]]:
"""Active (param_name, input_column, output_column) tuples for the mappings.

The output column is ``__param_value__`` for a single mapping, else one
``__param_<name>__`` per mapped parameter.
"""
mappings = [m for m in settings.parameter_mappings if m.param_name and m.input_column]
single = len(mappings) == 1
return [
(m.param_name, m.input_column, "__param_value__" if single else f"__param_{m.param_name}__")
for m in mappings
]

@staticmethod
def _resolve_run_flow_path(settings: input_schema.NodeRunFlow) -> str | None:
"""Resolve the sub-flow's filesystem path (own ``flow_reference``, else registration)."""
if settings.flow_reference:
return settings.flow_reference
if settings.flow_registration_id is not None:
from flowfile_core.flowfile.catalog_helpers import find_registration_by_registration_id

snap = find_registration_by_registration_id(settings.flow_registration_id)
if snap is not None:
return snap.flow_path
return None

@staticmethod
def _assert_no_run_flow_collisions(column_names: list[str], specs: list[tuple[str, str, str]]) -> None:
"""Refuse a param output column that clashes with the sub-flow's own output.

Without this, ``with_columns(...).alias("__param_value__")`` would silently
overwrite a same-named column the sub-flow already produces.
"""
existing = set(column_names)
for _, _, out_col in specs:
if out_col in existing:
raise ValueError(
f"Run-flow sub-flow already emits a column named '{out_col}', which would collide "
"with the parameter-value column. Rename that column in the sub-flow."
)

def _predict_run_flow_schema(self, settings: input_schema.NodeRunFlow) -> list[FlowfileColumn]:
"""Predict output columns: the sub-flow's api_response schema plus the param column(s).

Never returns empty — an empty schema_callback makes the engine fall back to
running ``_func`` during prediction, which would execute the sub-flow.
"""
specs = self._run_flow_param_specs(settings)
sub_cols: list[FlowfileColumn] = []
flow_path = self._resolve_run_flow_path(settings)
if flow_path:
from flowfile_core.flowfile.manage.io_flowfile import open_flow

# The guard also breaks self-referential flows: predicting the sub-flow's
# api_response schema traverses back into the nested run_flow node, which
# would otherwise open the same flow forever.
try:
with guard_sub_flow(str(Path(flow_path).resolve())):
sub_flow = open_flow(Path(flow_path), user_id=settings.user_id)
api_nodes = [n for n in sub_flow.nodes if n.node_type == "api_response"]
if len(api_nodes) == 1:
predicted = api_nodes[0].get_predicted_schema()
sub_cols = list(predicted) if predicted else []
except Exception as e: # noqa: BLE001 - incl. recursion; prediction is best-effort
logger.warning(f"Run-flow schema prediction failed: {e}")
existing = {c.name for c in sub_cols}
param_cols = [
FlowfileColumn.from_input(out_col, "String") for (_, _, out_col) in specs if out_col not in existing
]
schema = sub_cols + param_cols
if not schema:
schema = [FlowfileColumn.from_input("__param_value__", "String")]
return schema

def _materialize_run_flow_row(self, lf: pl.LazyFrame, node_id: int | str, index: int) -> pl.LazyFrame:
"""Materialise one row's sub-flow output to a stable, uniquely-named location.

Sequential sub-flow runs reuse the sub-flow's flow_id-keyed Arrow IPC output
path, so a lazy reference captured this iteration would be overwritten by the
next run. Offloading to the worker with a per-iteration ``file_ref`` writes a
distinct file and returns a lazy scan over it (core never collects). Falls
back to an in-core collect only in local mode or when no worker is reachable.
"""
if self.execution_location == "local":
return lf.collect().lazy()
try:
fetcher = ExternalDfFetcher(
lf=lf,
file_ref=f"__runflow_{node_id}_{index}",
flow_id=self.flow_id,
node_id=node_id,
wait_on_completion=True,
)
except Exception as exc: # noqa: BLE001 - worker unreachable; degrade to in-core
logger.warning("Run-flow worker offload unavailable (%s); materializing row %s in core", exc, index)
return lf.collect().lazy()
if fetcher.has_error:
if fetcher.error_code == -1:
logger.warning("Run-flow worker materialize was killed; materializing row %s in core", index)
return lf.collect().lazy()
raise ValueError(fetcher.error_description or "Run-flow worker materialize failed")
return fetcher.get_result()

def _load_run_flow_subflow(self, flow_path: str, user_id: int | None) -> tuple["FlowGraph", FlowNode]:
"""Open the sub-flow and validate its single-``api_response`` output contract.

Returns ``(sub_flow, api_node)``; raises ``ValueError`` on zero or multiple
api_response nodes. Sets the sub-flow to run eagerly at this graph's location.
"""
from flowfile_core.flowfile.manage.io_flowfile import open_flow

sub_flow = open_flow(Path(flow_path), user_id=user_id)
api_nodes = [n for n in sub_flow.nodes if n.node_type == "api_response"]
if len(api_nodes) == 0:
raise ValueError("Run-flow sub-flow has no API Response node to define its output.")
if len(api_nodes) > 1:
raise ValueError("Run-flow sub-flow has more than one API Response node.")
sub_flow.flow_settings.execution_mode = "Performance"
sub_flow.flow_settings.execution_location = self.execution_location
return sub_flow, api_nodes[0]

@staticmethod
def _build_run_flow_driver(
input_lf: pl.LazyFrame, specs: list[tuple[str, str, str]], max_rows: int
) -> tuple[pl.DataFrame, bool]:
"""Build the bounded driver frame for iteration.

Selects only the mapped columns (or a synthetic row index when nothing is
mapped) and collects ``head(max_rows + 1)`` so overflow is detected without
materialising the whole input. Returns ``(driver_df capped to max_rows, overflow)``.
"""
mapped_cols = list(dict.fromkeys(input_col for (_, input_col, _) in specs))
if mapped_cols:
driver_df = input_lf.select(mapped_cols).head(max_rows + 1).collect()
else:
total = input_lf.select(pl.len()).collect().item()
driver_df = pl.DataFrame({"__row__": range(min(total, max_rows + 1))})
overflow = driver_df.height > max_rows
if overflow:
driver_df = driver_df.head(max_rows)
return driver_df, overflow

@staticmethod
def _raise_run_flow_no_data(sub_flow: "FlowGraph", api_node: FlowNode, run_info) -> None:
"""Raise a clear error when the sub-flow ran but its api_response produced no data."""
from flowfile_core.flowfile.api_runner import _first_error

unconfigured = [n.node_id for n in sub_flow.nodes if not n.is_setup]
if unconfigured:
raise ValueError(
f"Run-flow sub-flow has unconfigured node(s) {sorted(unconfigured)} that were "
"saved without settings. Open the sub-flow, finish configuring/connecting every "
"node, run it once to confirm it produces data, then re-save it."
)
detail = api_node.results.errors or _first_error(run_info)
raise ValueError(
"Run-flow sub-flow API Response node produced no data"
+ (
f": {detail}"
if detail
else ". Check that the API Response node is connected to the branch whose "
"data you want returned, and that the sub-flow runs and produces rows on its own."
)
)

def _run_subflow_for_row(
self,
sub_flow: "FlowGraph",
api_node: FlowNode,
row: dict,
specs: list[tuple[str, str, str]],
params_by_name: dict,
node_id: int | str,
index: int,
) -> pl.LazyFrame:
"""Inject this row's params, run the sub-flow once, annotate and materialise its output.

Returns a lazy frame (sub-flow output plus the param column(s)), possibly with
zero rows; the engine re-runs the param-referencing nodes because the resolved
``${param}`` value changes the node hash, so there is no cross-row data bleed.
"""
from flowfile_core.flowfile.api_runner import _first_error, _flow_run_lock

for param_name, input_col, _ in specs:
param = params_by_name.get(param_name)
if param is not None:
value = row.get(input_col)
param.default_value = "" if value is None else str(value)
with _flow_run_lock(sub_flow.flow_id):
run_info = sub_flow.run_graph()
if run_info is None or not run_info.success:
raise ValueError(_first_error(run_info) or f"Run-flow sub-flow failed on row {index}")
data = api_node.get_resulting_data()
if data is None:
self._raise_run_flow_no_data(sub_flow, api_node, run_info)
row_lf = data.data_frame
if isinstance(row_lf, pl.DataFrame):
row_lf = row_lf.lazy()
if index == 0:
self._assert_no_run_flow_collisions(row_lf.collect_schema().names(), specs)
for _param_name, input_col, out_col in specs:
value = row.get(input_col)
row_lf = row_lf.with_columns(pl.lit("" if value is None else str(value)).alias(out_col))
return self._materialize_run_flow_row(row_lf, node_id, index)

def _execute_run_flow_node(self, settings: input_schema.NodeRunFlow, fl: FlowDataEngine) -> FlowDataEngine:
"""Run the sub-flow once per input row and return the unioned, lazily-built result."""
flow_path = self._resolve_run_flow_path(settings)
if not flow_path:
raise ValueError("Run-flow node has no sub-flow selected.")

specs = self._run_flow_param_specs(settings)
max_rows = max(0, settings.max_rows)

input_lf = fl.data_frame.lazy() if isinstance(fl.data_frame, pl.DataFrame) else fl.data_frame
driver_df, overflow = self._build_run_flow_driver(input_lf, specs, max_rows)
if overflow:
self.flow_logger.warning(
f"Run-flow node {settings.node_id}: input exceeds max_rows={max_rows}; "
f"running the first {max_rows} rows and skipping the rest."
)

sub_flow, api_node = self._load_run_flow_subflow(flow_path, settings.user_id)
params_by_name = {p.name: p for p in sub_flow.flow_settings.parameters}

results: list[pl.LazyFrame] = []
n_iter = driver_df.height
# v1 is sequential: parallelism would need a per-instance flow_id (the sub-flow
# shares its saved flow_id across runs, keying _flow_run_lock and worker dirs).
with guard_sub_flow(str(Path(flow_path).resolve())):
for i, row in enumerate(driver_df.iter_rows(named=True)):
results.append(
self._run_subflow_for_row(sub_flow, api_node, row, specs, params_by_name, settings.node_id, i)
)
if n_iter > 1 and ((i + 1) % 25 == 0 or i + 1 == n_iter):
self.flow_logger.info(f"Run-flow node {settings.node_id}: completed {i + 1}/{n_iter} rows")
if settings.delay_seconds and i < n_iter - 1:
sleep(settings.delay_seconds)

if not results:
return FlowDataEngine.create_from_schema(self._predict_run_flow_schema(settings))
return FlowDataEngine(pl.concat(results, how="diagonal_relaxed"))

@with_history_capture(HistoryActionType.UPDATE_SETTINGS)
def add_cloud_storage_writer(self, node_cloud_storage_writer: input_schema.NodeCloudStorageWriter) -> None:
"""Adds a node to write data to a cloud storage provider.
Expand Down
Loading
Loading