Skip to content
Open
4 changes: 3 additions & 1 deletion flowfile_core/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ Central FastAPI backend and DAG execution engine for Flowfile: manages flows as
- `flowfile_core/main.py` — FastAPI app, lifespan (scheduler/kernel/local-model shutdown), CORS (Tauri origin regex + explicit dev/Docker origins), all router mounts, `--run-flow` CLI.
- `flowfile_core/routes/` — REST routers: `routes.py` (editor/transform, JWT-gated), `flow_api.py` (`data_router` API-key data + `management_router` JWT), `auth.py`, `secrets.py`, `catalog.py`, `cloud_connections.py`, `ga_connections.py`, `kafka.py`, `file_manager.py`, `api_consumers.py`, `user_defined_components.py`, `logs.py`, `public.py`. (More routers live under `ai/`, `kernel/`, `artifacts/`, `ml/`.)
- `flowfile_core/flowfile/flow_graph.py` — DAG execution engine (`FlowGraph`, node add/run, worker offload). `flowfile/handler.py` — `FlowfileHandler` in-memory flow registry.
- `flowfile_core/flowfile/execution/` — compute-location seam: `transport.py` (`WorkerTransport`, sole owner of worker URLs/HTTP/WS), `exceptions.py` (typed `WorkerConnectionError` etc.), `handles.py` (`TaskHandle` protocol), `backends/` (`ExecutionBackend` ABC + `LocalBackend`/`RemoteWorkerBackend`, `resolve_backend(location)`). Route new local-vs-remote variance through a backend method, never an inline `execution_location` branch (ratchet test enforces this).
- `flowfile_core/flowfile/node_registry/` — **single source of truth for built-in node types**: one `NodeSpec` per type (`builtin/*.py`) bundling the `NodeTemplate`, settings class, defaults flag, AI classification, and (for simple nodes) the compute factory used by `FlowGraph._add_from_spec`. The legacy catalogs (`get_all_standard_nodes`, `NODE_TYPE_TO_SETTINGS_CLASS`, `nodes_with_defaults`, `_NODE_CLASS_MAP`) are derived views; contract tests in `tests/flowfile/execution/test_node_registry.py` pin them.
- `flowfile_core/flowfile/flow_data_engine/flow_data_engine.py` — per-node Polars compute wrapper (lazy frames, previews; `join/`, `fuzzy_matching/`, `subprocess_operations/` subdirs).
- `flowfile_core/flowfile/sources/external_sources/` — SQL / REST API / Google Analytics / custom source connectors (`factory.py`).
- `flowfile_core/configs/node_store/nodes.py` — node template/default registry (`get_all_standard_nodes`).
- `flowfile_core/configs/node_store/nodes.py` — legacy node template/default accessors (`get_all_standard_nodes`), now derived from `flowfile/node_registry`.
- `flowfile_core/schemas/input_schema.py` — Pydantic node-config models (~90 classes); other request/response schemas alongside.
- `flowfile_core/ai/` — AI subsystem (see patterns); routers under `ai/*_routes.py`, plus `agents/`, `providers/`, `tools/` (incl. `tools/executor/`), `local_model/`, `context/`.
- `flowfile_core/auth/` — JWT (`jwt.py`), API keys (`api_key.py`), passwords (`password.py`).
Expand Down
68 changes: 20 additions & 48 deletions flowfile_core/flowfile_core/ai/tools/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,56 +30,28 @@

from __future__ import annotations

from typing import Final, Literal
from typing import Literal

NodeClass = Literal["static", "dynamic", "source", "passthrough"]

_NODE_CLASS_MAP: Final[dict[str, NodeClass]] = {
"manual_input": "source",
"filter": "static",
"formula": "static",
"select": "static",
"dynamic_rename": "dynamic",
"sort": "static",
"record_id": "static",
"sample": "static",
"random_split": "static",
"unique": "static",
"group_by": "static",
"window_functions": "static",
"pivot": "dynamic",
"unpivot": "dynamic",
"text_to_rows": "dynamic",
"graph_solver": "dynamic",
"python_script": "dynamic",
"polars_code": "dynamic",
"sql_query": "dynamic",
"join": "static",
"cross_join": "static",
"fuzzy_match": "static",
"record_count": "static",
"explore_data": "static",
"union": "static",
"output": "static",
"api_response": "static",
"read": "source",
"database_reader": "source",
"database_writer": "static",
"cloud_storage_reader": "source",
"cloud_storage_writer": "static",
"catalog_reader": "source",
"catalog_writer": "static",
"kafka_source": "source",
"google_analytics_reader": "source",
"rest_api_reader": "source",
"external_source": "source",
"promise": "passthrough",
"user_defined": "dynamic",
"train_model": "static",
"apply_model": "static",
"evaluate_model": "static",
"wait_for": "static",
}
# Lazily populated cache of node type -> AI classification. The data comes
# from the node registry (each NodeSpec carries its ai_classification); the
# registry import is deferred to first use to keep this module import-light.
_node_class_map: dict[str, NodeClass] | None = None


def _get_node_class_map() -> dict[str, NodeClass]:
    """Return the node-type classification map, building it on first call.

    The map is obtained from ``BUILTIN_REGISTRY.ai_classification_map()`` the
    first time this function runs and stored in the module-level
    ``_node_class_map``; subsequent calls return that stored object.
    """
    global _node_class_map
    if _node_class_map is not None:
        return _node_class_map

    # Deferred import: the registry is only loaded when the map is needed.
    from flowfile_core.flowfile.node_registry import BUILTIN_REGISTRY

    _node_class_map = BUILTIN_REGISTRY.ai_classification_map()
    return _node_class_map


def __getattr__(name: str):
    """Module-level attribute hook that keeps ``_NODE_CLASS_MAP`` importable.

    ``_NODE_CLASS_MAP`` is no longer a module constant; when that name is
    requested, the lazily built classification map is returned instead.
    Any other unknown attribute raises ``AttributeError``.
    """
    if name != "_NODE_CLASS_MAP":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    return _get_node_class_map()


def classify_node_type(node_type: str) -> NodeClass:
Expand All @@ -89,7 +61,7 @@ def classify_node_type(node_type: str) -> NodeClass:
through the kernel dry-run path which fails closed (refusal on missing
upstream sample) rather than producing a wrong schema.
"""
return _NODE_CLASS_MAP.get(node_type, "dynamic")
return _get_node_class_map().get(node_type, "dynamic")


def is_predictable_via_mirror(node_type: str) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions flowfile_core/flowfile_core/configs/node_store/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

from flowfile_core.configs.node_store.nodes import get_all_standard_nodes
from flowfile_core.configs.node_store.nodes import get_all_standard_nodes, get_nodes_with_default_settings
from flowfile_core.configs.node_store.user_defined_node_registry import (
get_all_nodes_from_standard_location,
load_single_node_from_file,
Expand Down Expand Up @@ -28,7 +28,7 @@

logger = logging.getLogger(__name__)

nodes_with_defaults = {"sample", "sort", "union", "select", "record_count"}
nodes_with_defaults = get_nodes_with_default_settings()


def register_custom_node(node: NodeTemplate):
Expand Down
Loading
Loading