Migrate node execution to ExecutionBackend seam; establish node registry #562

Open
Edwardvaneechoud wants to merge 7 commits into main from claude/core-abstraction-flowgraph-001hrn
@Edwardvaneechoud

Owner

Summary

This PR establishes a single source of truth for built-in node types via a new NodeRegistry and NodeSpec abstraction, and introduces an ExecutionBackend seam to decouple where node compute runs (local vs. remote worker) from node logic itself.

Previously, node metadata (templates, settings classes, AI classifications, defaults) lived in four hand-maintained catalogs scattered across the codebase. Node execution logic branched inline on execution_location == "local" throughout flow_graph.py and flow_node.py. This PR consolidates both concerns into declarative, composable structures.

Key Changes

Node Registry & Specs

  • flowfile_core/flowfile/node_registry/ (new): Central registry holding one NodeSpec per built-in node type
    • spec.py: NodeSpec dataclass bundles NodeTemplate, settings class, AI classification, defaults, and optional compute factory
    • registry.py: NodeRegistry class with derived views (get_all_standard_nodes(), NODE_TYPE_TO_SETTINGS_CLASS, nodes_with_defaults, AI classification map)
    • builtin/: Node specs grouped by domain (simple.py, io_nodes.py, database.py, scripting.py, ml.py, streaming_sources.py, special.py)
  • Legacy catalog views (configs/node_store/nodes.py, schemas.NODE_TYPE_TO_SETTINGS_CLASS, ai/tools/classification._NODE_CLASS_MAP) now derive from the registry, preserving backward compatibility for existing importers
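A minimal sketch of the registry idea described above, assuming a much smaller NodeSpec than the real one. The field names, the two placeholder settings classes, the "transform" classification value, and the view method names are all hypothetical; only the shape (one spec per node type, legacy catalogs computed as views) follows the description.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Optional


class FilterSettings:
    """Hypothetical stand-in for a real settings model."""


class SampleSettings:
    """Hypothetical stand-in for a real settings model."""


@dataclass(frozen=True)
class NodeSpec:
    # Everything known about one node type lives in a single record.
    node_type: str
    settings_class: type
    ai_classification: str
    has_default_settings: bool = False
    compute_factory: Optional[Callable[..., Any]] = None


class NodeRegistry:
    def __init__(self) -> None:
        self._specs: dict[str, NodeSpec] = {}

    def register(self, spec: NodeSpec) -> None:
        # Refuse duplicates so each node type has exactly one source of truth.
        if spec.node_type in self._specs:
            raise ValueError(f"duplicate node type: {spec.node_type}")
        self._specs[spec.node_type] = spec

    def get(self, node_type: str) -> NodeSpec:
        return self._specs[node_type]

    # Derived views: the former hand-maintained catalogs become projections.
    def settings_class_map(self) -> dict[str, type]:
        return {name: spec.settings_class for name, spec in self._specs.items()}

    def nodes_with_defaults(self) -> frozenset[str]:
        return frozenset(
            name for name, spec in self._specs.items() if spec.has_default_settings
        )

    def classification_map(self) -> dict[str, str]:
        return {name: spec.ai_classification for name, spec in self._specs.items()}


registry = NodeRegistry()
registry.register(NodeSpec("filter", FilterSettings, "transform"))
registry.register(
    NodeSpec("sample", SampleSettings, "transform", has_default_settings=True)
)
```

Because every view is computed from the same records, adding a node type means adding one spec rather than editing several catalogs in step.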

Execution Backend Seam

  • flowfile_core/flowfile/execution/ (new): Abstractions for compute location
    • backends/base.py: ExecutionBackend ABC with typed operations (run_lazyframe, run_external_read, etc.)
    • backends/local.py: LocalBackend runs compute in-process (flowfile_frame, scheduler, CLI)
    • backends/worker.py: RemoteWorkerBackend ships work to flowfile_worker service
    • backends/__init__.py: resolve_backend(location) factory
    • transport.py: WorkerTransport client (single owner of worker URLs and request plumbing)
    • handles.py: TaskHandle protocol unifying in-process and worker-backed results
    • exceptions.py: Typed errors (WorkerError, WorkerConnectionError, WorkerTaskError)
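The seam can be pictured roughly as follows. This is a sketch under stated assumptions, not the PR's code: the single run_task operation and the submit callable are hypothetical stand-ins (the real ABC exposes typed operations such as run_lazyframe, whose signatures are not shown here), and the remote side is simulated in-process.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable

Task = Callable[[], Any]


class ExecutionBackend(ABC):
    """Where compute runs; node logic only ever talks to this interface."""

    @abstractmethod
    def run_task(self, task: Task) -> Any:
        """Hypothetical single operation standing in for the typed ones."""


class LocalBackend(ExecutionBackend):
    def run_task(self, task: Task) -> Any:
        # In-process: just call the task.
        return task()


class RemoteWorkerBackend(ExecutionBackend):
    def __init__(self, submit: Callable[[Task], Any]) -> None:
        # `submit` is an assumed hook that ships work to a worker service.
        self._submit = submit

    def run_task(self, task: Task) -> Any:
        return self._submit(task)


def resolve_backend(
    location: str, submit: Callable[[Task], Any] = lambda task: task()
) -> ExecutionBackend:
    # One place maps the location string to a backend, so callers never branch.
    if location == "local":
        return LocalBackend()
    if location == "remote":
        return RemoteWorkerBackend(submit)
    raise ValueError(f"unknown execution location: {location!r}")


# Usage: the caller's code is identical for both locations.
shipped = []


def fake_submit(task: Task) -> Any:
    shipped.append(task)  # record that the work left the process
    return task()


local_result = resolve_backend("local").run_task(lambda: 2 + 2)
remote_result = resolve_backend("remote", fake_submit).run_task(lambda: 2 + 2)
```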

Node Execution Migration

  • flow_graph.py: Refactored to use _add_from_spec() for declarative nodes
    • Migrated add_filter(), add_sort(), add_record_count(), add_sample(), add_union() to spec-driven path
    • Removed inline execution_location branches; compute factories in specs handle location dispatch
    • Removed direct imports of fetchers (ExternalDatabaseFetcher, ExternalOutputWriter, etc.); backends own them
  • flow_node.py: Simplified to use ExecutionBackend instead of inline fetcher construction
    • Removed ExternalDatabaseFetcher, ExternalDatabaseWriter, ExternalCloudWriter, ExternalSampler direct usage
    • Calls resolve_backend(execution_location) once per node execution
  • subprocess_operations.py: Refactored to accept optional WorkerTransport parameter
    • trigger_df_operation(), trigger_sample_operation(), trigger_fuzzy_match_operation() now take transport kwarg
    • Raises typed WorkerTaskError instead of generic Exception
    • Removed direct WORKER_URL formatting; delegates to transport
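To illustrate the spec-driven path, here is a toy version of the pattern in which a builder method delegates to a generic _add_from_spec, the spec's compute factory returns the closure, and the spec's arity constrains the inputs. MiniGraph, the multi_input flag, the settings dictionaries, and the list-based "data" are assumptions for illustration; the real FlowGraph works on FlowDataEngine objects and has a richer signature.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Sequence, Tuple


@dataclass(frozen=True)
class NodeSpec:
    node_type: str
    multi_input: bool  # hypothetical arity flag
    compute_factory: Callable[[dict], Callable[..., List[int]]]


def filter_factory(settings: dict) -> Callable[..., List[int]]:
    threshold = settings["min_value"]

    def _func(rows: List[int]) -> List[int]:
        return [row for row in rows if row >= threshold]

    return _func


def union_factory(settings: dict) -> Callable[..., List[int]]:
    def _func(*inputs: List[int]) -> List[int]:
        return [row for rows in inputs for row in rows]

    return _func


FILTER_SPEC = NodeSpec("filter", multi_input=False, compute_factory=filter_factory)
UNION_SPEC = NodeSpec("union", multi_input=True, compute_factory=union_factory)


class MiniGraph:
    def __init__(self) -> None:
        self.nodes: Dict[int, Tuple[Callable[..., List[int]], Tuple[int, ...]]] = {}

    def _add_from_spec(
        self, spec: NodeSpec, node_id: int, settings: dict, input_ids: Sequence[int]
    ) -> None:
        # The spec's arity decides how many inputs are acceptable.
        if not spec.multi_input and len(input_ids) != 1:
            raise ValueError(f"{spec.node_type} takes exactly one input")
        self.nodes[node_id] = (spec.compute_factory(settings), tuple(input_ids))

    # Public builders stay as thin delegations with stable signatures.
    def add_filter(self, node_id: int, min_value: int, input_id: int) -> None:
        self._add_from_spec(FILTER_SPEC, node_id, {"min_value": min_value}, [input_id])

    def add_union(self, node_id: int, input_ids: Sequence[int]) -> None:
        self._add_from_spec(UNION_SPEC, node_id, {}, input_ids)

    def run(self, node_id: int, sources: Dict[int, List[int]]) -> List[int]:
        func, input_ids = self.nodes[node_id]
        inputs = [
            sources[i] if i in sources else self.run(i, sources) for i in input_ids
        ]
        return func(*inputs)


graph = MiniGraph()
graph.add_filter(node_id=2, min_value=3, input_id=1)
graph.add_union(node_id=3, input_ids=[1, 2])
result = graph.run(3, {1: [1, 2, 3, 4]})
```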

Streaming & Transport

  • streaming.py: Refactored to use WorkerTransport instead of hardcoded WORKER_URL
    • streaming_receive() now accepts optional transport parameter

Tests

  • **`tests/

https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq

claude added 6 commits July 2, 2026 01:57
Move URL formatting, HTTP plumbing and WS streaming out of the fetchers
and trigger functions into execution/transport.py. BaseFetcher and all
fetcher subclasses accept an injected WorkerTransport (defaulting to the
settings-driven singleton), and NodeExecutor detects worker outages via
the typed WorkerConnectionError instead of substring-matching error text.

Groundwork for the ExecutionBackend seam (local vs worker compute).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq

Introduce execution/backends: an ExecutionBackend ABC with LocalBackend
(in-process) and RemoteWorkerBackend (wraps the fetchers over a
WorkerTransport), plus a TaskHandle protocol unifying worker fetchers and
in-process results. NodeExecutor resolves the backend from run_location
and passes it to FlowNode._do_execute_remote/_do_execute_local_with_sampling,
which now call backend.run_lazyframe/sample/count_records instead of
constructing ExternalDfFetcher/ExternalSampler inline. FlowGraph exposes
execution_backend, resolved per call so closures honor the location at
run time.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq

Replace the inline 'if execution_location == "local"' branches in the
add_output, add_database_reader and add_random_split node builders with a
single call on the flow's execution backend. The local/remote variance
moves into LocalBackend/RemoteWorkerBackend (write_output, read_database,
random_split ops); both database paths now consume the same rendered
DatabaseExternalReadSettings payload.

Adds location-parity tests and a ratchet guard that pins the number of
remaining inline location comparisons in flow_graph.py (13 -> 10).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq

Introduce flowfile/node_registry: one NodeSpec per built-in node type
bundling the NodeTemplate, settings class, default-settings flag and AI
classification that previously lived in four hand-maintained catalogs.
The legacy views are now derived from the registry:

- configs/node_store/nodes.py get_all_standard_nodes() (template literals
  moved to node_registry/builtin/)
- schemas.NODE_TYPE_TO_SETTINGS_CLASS (lazy module __getattr__)
- the duplicated nodes_with_defaults sets
- ai/tools/classification._NODE_CLASS_MAP

routes.get_node_model resolves through the registry first, keeping the
reflective input_schema scan only as fallback for legacy types
(datasource). Contract tests pin the derived views to frozen snapshots
of the previous literals.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq

FlowGraph._add_from_spec builds a node from its NodeSpec: the spec's
compute_factory produces the closure and the input node ids derive from
the spec's arity. add_sort, add_filter, add_record_count, add_sample and
add_union become thin delegations with their signatures, docstrings and
history capture unchanged (flowfile_frame and the routes getattr dispatch
are unaffected; stubs verified via make check_stubs). Closure bodies move
to node_registry/builtin/simple.py as compute factories, preserving the
_func naming convention add_node_step relies on.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq

@netlify

netlify Bot commented Jul 2, 2026


Deploy Preview for flowfile-wasm canceled.

Name Link
🔨 Latest commit c17b387
🔍 Latest deploy log https://app.netlify.com/projects/flowfile-wasm/deploys/6a46680d0bef6300086592a9

@claude

claude Bot commented Jul 2, 2026


Claude finished @Edwardvaneechoud's task in 7m 51s.


Claude's Review

Tasks

  • Read repo root CLAUDE.md and flowfile_core/CLAUDE.md
  • Review diff against base commit (origin/main was unreachable in this sandbox — fetch is blocked; used the PR's merge-base commit a3ae528 instead, which matches the stated origin/main)
  • Check correctness (no .collect() regressions, execution backend seam, node registry)
  • Check conventions (Polars, Pydantic v2, lean comments)
  • Check security (secrets, worker URL handling)
  • Check tests / migrations
  • Post final review

Summary

This is a clean, well-executed refactor that establishes two new seams (NodeRegistry/NodeSpec and ExecutionBackend) without changing observable behavior. I spot-checked the migrated logic against the pre-PR code and additionally ran an independent agent to do an exhaustive field-by-field comparison of every node type's NodeTemplate, settings-class mapping, and AI classification between the old hand-maintained catalogs and the new registry specs — no discrepancies found (all laziness, tags, multi, prod_ready, can_be_start, output_names, has_default_settings values match exactly, including implicit defaults).

Strengths

  • flowfile_core/tests/flowfile/execution/test_node_registry.py pins frozen snapshots of the old NODE_TYPE_TO_SETTINGS_CLASS/_NODE_CLASS_MAP/nodes_with_defaults catalogs and asserts the derived views still match — a strong regression guard for exactly the kind of silent-drift risk this refactor introduces.
  • test_no_inline_location_branches.py is a good ratchet test (inline execution_location == "local" comparisons can only shrink, migrated builders are pinned branch-free). Current count in flow_graph.py is exactly at the declared ceiling (10).
  • WorkerConnectionError deliberately subclasses both the new WorkerError and requests.exceptions.ConnectionError, so pre-existing except requests.RequestException call sites keep working — nice attention to backward compatibility, and covered by test_worker_connection_error_is_requests_exception.
  • executor.py's error classification now checks isinstance(error, WorkerConnectionError) instead of the old fragile "Connection refused" in error_str and "/submit_query/" in error_str string match — a genuine robustness improvement.
  • test_backend_parity.py and test_add_from_spec.py exercise real run_graph() calls in both local/remote execution locations rather than mocking, consistent with this repo's testing conventions.
  • LocalBackend.sample/count_records use bounded .head()/pl.len() collects, consistent with the core-never-collects-on-the-hot-path contract documented in flowfile_core/CLAUDE.md.
  • Secret handling in LocalBackend.read_database decrypts via the existing secret_manager.decrypt_secret, and the encrypted password flows through DatabaseExternalReadSettings.connection.password identically to how the old inline local branch did it — no behavior change, no secret handling regression.
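The dual-inheritance point about WorkerConnectionError can be shown with a small sketch. To keep it dependency-free, RequestException and RequestsConnectionError below are local stand-ins that mimic the hierarchy of requests.exceptions (where ConnectionError derives from RequestException, itself an IOError); classify_error and its return labels are hypothetical.

```python
class RequestException(IOError):
    """Stand-in for requests.exceptions.RequestException."""


class RequestsConnectionError(RequestException):
    """Stand-in for requests.exceptions.ConnectionError."""


class WorkerError(Exception):
    """Base class for worker-related failures."""


class WorkerConnectionError(WorkerError, RequestsConnectionError):
    """Typed outage error that legacy `except RequestException` sites still catch."""


def classify_error(error: BaseException) -> str:
    # Type check instead of substring-matching the error text.
    if isinstance(error, WorkerConnectionError):
        return "worker_unavailable"
    return "node_error"


def legacy_call_site() -> str:
    # Pre-existing handlers written against the HTTP library keep working.
    try:
        raise WorkerConnectionError("worker is not reachable")
    except RequestException:
        return "caught by legacy handler"


outage = classify_error(WorkerConnectionError("worker is not reachable"))
other = classify_error(ValueError("Connection refused"))
legacy = legacy_call_site()
```

Note that the ValueError whose message happens to contain "Connection refused" is not misclassified, which is the robustness gain over matching on error strings.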

Minor observations (non-blocking)

  • In flow_graph.py's add_database_reader, the unified path now unconditionally sets fl.lazy = True after constructing FlowDataEngine(reader_handle.get_result()), whereas the old remote branch didn't set it explicitly. This is a no-op in practice (FlowDataEngine.__init__ already sets _lazy = True when constructed from a LazyFrame, and the lazy setter is a no-op when the value is unchanged), so no behavior change — just flagging that I checked it.
  • test_local_backend_has_no_result_cache (test_backends.py:63) asserts pytest.raises(Exception) for get_cached_lazyframe; could tighten to WorkerTaskError for a more precise contract, but low value.

Verification limits in this sandboxed review

  • This environment blocks network access (git fetch and running poetry/pytest both required approval that wasn't grantable), so I could not execute the new test suite (flowfile_core/tests/flowfile/execution/) or the linter directly. I instead did a careful line-by-line reading of the diff plus an independent agent-driven exhaustive field comparison against the pre-PR catalogs. Given the CI job referenced in this PR's checks, I'd defer to those results for actual execution confirmation.

No correctness, security, or convention issues found that need fixing before merge.

@claude

claude Bot commented Jul 2, 2026


Claude finished @Edwardvaneechoud's task in 4m 23s.


Claude's Review

Tasks

  • Read repo root CLAUDE.md and flowfile_core/CLAUDE.md
  • Resolve diff base (3857aced6 = current origin/main; network fetch blocked in sandbox, used local refs — diff stat matches the PR's stated 37 files / +2976/-1112 exactly)
  • Check correctness (execution backend seam, node registry, .collect() usage)
  • Check conventions (Polars, Pydantic v2, lean comments)
  • Check security (secrets, worker URL handling)
  • Check tests / migrations
  • Post final review

Summary

I independently re-verified this diff (same content Claude reviewed in the earlier pass on this PR — no new commits landed since). Spot-checked the riskiest seams line-by-line rather than re-running the same field comparison:

  • LocalBackend.read_database (execution/backends/local.py) drops the old local branch's table_name/schema_name/fields args to SqlSource and only passes the resolved query. Traced this through add_database_reader (flow_graph.py:3799-3819): the caller pre-resolves table_name → "SELECT * FROM schema.table" via BaseSqlSource before building DatabaseExternalReadSettings, and SqlSource.get_pl_df() only ever reads self.query regardless of query_mode. No behavior change: the dropped args only affected query_mode-dependent methods (get_sample, get_flow_file_columns) that LocalBackend.read_database never calls.
  • flow_node.py/executor.py: backend is threaded from NodeExecutor.execute() → _execute_with_strategy → _do_local_with_sampling/_do_remote on every real call path; the backend or resolve_backend("remote") defaults in flow_node.py are only a fallback for direct/test calls and correctly preserve old behavior (sampling and _do_execute_remote were always worker-bound).
  • subprocess_operations.py: every fetcher class (ExternalDfFetcher, ExternalSampler, ExternalDatabaseFetcher, ExternalKafkaFetcher, etc.) now threads an optional transport through to BaseFetcher, including the previously-missed cancel_task call in BaseFetcher.stop() (self._transport.cancel_task(...) instead of the module-level global) — this closes a gap where a custom transport wouldn't have been honored on cancellation.
  • test_no_inline_location_branches.py ratchet: flow_graph.py currently has exactly 10 inline execution_location == "local" comparisons, matching the declared ceiling — confirmed with a direct grep, not just trusting the test.
  • node_registry/builtin/simple.py: spot-checked the filter, sort, record_count, sample, union compute factories against the removed inline closures they replaced — identical logic, and filter's renew_schema=False (needed since filtering doesn't change the schema) carried over correctly to the NodeSpec.
  • schemas.py/classification.py: legacy NODE_TYPE_TO_SETTINGS_CLASS/_NODE_CLASS_MAP are now lazily computed via module __getattr__, cached in a module-level singleton dict derived from BUILTIN_REGISTRY. Behaves the same as the old plain dict for existing mutation-based test patterns, and avoids a schemas ↔ node_registry import cycle.
  • WorkerConnectionError in executor.py's error classification replaces the old fragile "Connection refused" in error_str and "/submit_query/" in error_str string match with isinstance(error, WorkerConnectionError) — confirmed WorkerTransport._request raises this typed error on requests.exceptions.ConnectionError, and it's also a requests.exceptions.ConnectionError subclass so pre-existing except requests.RequestException sites still catch it.
  • routes.py: get_node_model now resolves through the registry first (get_node_spec(node_type).settings_class) with a fallback to the legacy reflective module scan for types outside the registry (e.g. datasource) — correct layering, no loss of coverage.
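For readers unfamiliar with ratchet tests, the idea behind test_no_inline_location_branches.py can be approximated as follows. This is a guess at the mechanism rather than the actual test: the regular expression, the function names, and the sample source are assumptions, and the real test reads flow_graph.py instead of an inline string.

```python
import re

# Matches comparisons such as: execution_location == "local"
INLINE_BRANCH = re.compile(r"""execution_location\s*==\s*["']local["']""")


def count_inline_location_branches(source: str) -> int:
    return len(INLINE_BRANCH.findall(source))


def check_ratchet(source: str, ceiling: int) -> int:
    """Fail if the count grew; the ceiling is lowered by hand as code migrates."""
    count = count_inline_location_branches(source)
    if count > ceiling:
        raise AssertionError(
            f"{count} inline location comparisons exceed the ceiling of {ceiling}"
        )
    return count


SAMPLE_SOURCE = '''
if self.execution_location == "local":
    run_in_process()
elif execution_location=='local':
    run_in_process()
else:
    ship_to_worker()
'''

sample_count = check_ratchet(SAMPLE_SOURCE, ceiling=2)
```

A ceiling equal to the current count, as reported above for flow_graph.py, means any newly added inline comparison fails the check immediately.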

No correctness, security, or convention issues found. This mirrors the outcome of the earlier review pass on this PR; nothing has changed in the diff to revisit that conclusion.

Verification limits in this sandboxed review

  • Network access is blocked (git fetch origin main and poetry/pytest all require approval unavailable here), so I could not execute the test suite directly. I instead resolved the correct diff base from local refs (3857aced6, confirmed to reproduce the PR's exact stated file/line stats) and did a targeted manual trace of the highest-risk code paths instead of relying solely on the existing test coverage.

Labels

None yet

Projects

None yet

2 participants