Migrate node execution to ExecutionBackend seam; establish node registry#562
Migrate node execution to ExecutionBackend seam; establish node registry#562Edwardvaneechoud wants to merge 7 commits into
Conversation
Move URL formatting, HTTP plumbing and WS streaming out of the fetchers and trigger functions into execution/transport.py. BaseFetcher and all fetcher subclasses accept an injected WorkerTransport (defaulting to the settings-driven singleton), and NodeExecutor detects worker outages via the typed WorkerConnectionError instead of substring-matching error text. Groundwork for the ExecutionBackend seam (local vs worker compute). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq
Introduce execution/backends: an ExecutionBackend ABC with LocalBackend (in-process) and RemoteWorkerBackend (wraps the fetchers over a WorkerTransport), plus a TaskHandle protocol unifying worker fetchers and in-process results. NodeExecutor resolves the backend from run_location and passes it to FlowNode._do_execute_remote/_do_execute_local_with_sampling, which now call backend.run_lazyframe/sample/count_records instead of constructing ExternalDfFetcher/ExternalSampler inline. FlowGraph exposes execution_backend, resolved per call so closures honor the location at run time. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq
Replace the inline 'if execution_location == "local"' branches in the add_output, add_database_reader and add_random_split node builders with a single call on the flow's execution backend. The local/remote variance moves into LocalBackend/RemoteWorkerBackend (write_output, read_database, random_split ops); both database paths now consume the same rendered DatabaseExternalReadSettings payload. Adds location-parity tests and a ratchet guard that pins the number of remaining inline location comparisons in flow_graph.py (13 -> 10). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq
Introduce flowfile/node_registry: one NodeSpec per built-in node type bundling the NodeTemplate, settings class, default-settings flag and AI classification that previously lived in four hand-maintained catalogs. The legacy views are now derived from the registry: - configs/node_store/nodes.py get_all_standard_nodes() (template literals moved to node_registry/builtin/) - schemas.NODE_TYPE_TO_SETTINGS_CLASS (lazy module __getattr__) - the duplicated nodes_with_defaults sets - ai/tools/classification._NODE_CLASS_MAP routes.get_node_model resolves through the registry first, keeping the reflective input_schema scan only as fallback for legacy types (datasource). Contract tests pin the derived views to frozen snapshots of the previous literals. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq
FlowGraph._add_from_spec builds a node from its NodeSpec: the spec's compute_factory produces the closure and the input node ids derive from the spec's arity. add_sort, add_filter, add_record_count, add_sample and add_union become thin delegations with their signatures, docstrings and history capture unchanged (flowfile_frame and the routes getattr dispatch are unaffected; stubs verified via make check_stubs). Closure bodies move to node_registry/builtin/simple.py as compute factories, preserving the _func naming convention add_node_step relies on. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq
✅ Deploy Preview for flowfile-wasm canceled.
|
|
Claude finished @Edwardvaneechoud's task in 7m 51s —— View job Claude's ReviewTasks
SummaryThis is a clean, well-executed refactor that establishes two new seams ( Strengths
Minor observations (non-blocking)
Verification limits in this sandboxed review
No correctness, security, or convention issues found that need fixing before merge. |
|
Claude finished @Edwardvaneechoud's task in 4m 23s —— View job Claude's ReviewTasks
SummaryI independently re-verified this diff (same content Claude reviewed in the earlier pass on this PR — no new commits landed since). Spot-checked the riskiest seams line-by-line rather than re-running the same field comparison:
No correctness, security, or convention issues found. This mirrors the outcome of the earlier review pass on this PR; nothing has changed in the diff to revisit that conclusion. Verification limits in this sandboxed review
|
Summary
This PR establishes a single source of truth for built-in node types via a new
NodeRegistryandNodeSpecabstraction, and introduces anExecutionBackendseam to decouple where node compute runs (local vs. remote worker) from node logic itself.Previously, node metadata (templates, settings classes, AI classifications, defaults) lived in four hand-maintained catalogs scattered across the codebase. Node execution logic branched inline on
execution_location == "local"throughoutflow_graph.pyandflow_node.py. This PR consolidates both concerns into declarative, composable structures.Key Changes
Node Registry & Specs
flowfile_core/flowfile/node_registry/(new): Central registry holding oneNodeSpecper built-in node typespec.py:NodeSpecdataclass bundlesNodeTemplate, settings class, AI classification, defaults, and optional compute factoryregistry.py:NodeRegistryclass with derived views (get_all_standard_nodes(),NODE_TYPE_TO_SETTINGS_CLASS,nodes_with_defaults, AI classification map)builtin/: Node specs grouped by domain (simple.py,io_nodes.py,database.py,scripting.py,ml.py,streaming_sources.py,special.py)configs/node_store/nodes.py,schemas.NODE_TYPE_TO_SETTINGS_CLASS,ai/tools/classification._NODE_CLASS_MAP) now derive from the registry, preserving backward compatibility for existing importersExecution Backend Seam
flowfile_core/flowfile/execution/(new): Abstractions for compute locationbackends/base.py:ExecutionBackendABC with typed operations (run_lazyframe,run_external_read, etc.)backends/local.py:LocalBackendruns compute in-process (flowfile_frame, scheduler, CLI)backends/worker.py:RemoteWorkerBackendships work to flowfile_worker servicebackends/__init__.py:resolve_backend(location)factorytransport.py:WorkerTransportclient (single owner of worker URLs and request plumbing)handles.py:TaskHandleprotocol unifying in-process and worker-backed resultsexceptions.py: Typed errors (WorkerError,WorkerConnectionError,WorkerTaskError)Node Execution Migration
flow_graph.py: Refactored to use_add_from_spec()for declarative nodesadd_filter(),add_sort(),add_record_count(),add_sample(),add_union()to spec-driven pathexecution_locationbranches; compute factories in specs handle location dispatchExternalDatabaseFetcher,ExternalOutputWriter, etc.); backends own themflow_node.py: Simplified to useExecutionBackendinstead of inline fetcher constructionExternalDatabaseFetcher,ExternalDatabaseWriter,ExternalCloudWriter,ExternalSamplerdirect usageresolve_backend(execution_location)once per node executionsubprocess_operations.py: Refactored to accept optionalWorkerTransportparametertrigger_df_operation(),trigger_sample_operation(),trigger_fuzzy_match_operation()now taketransportkwargWorkerTaskErrorinstead of genericExceptionWORKER_URLformatting; delegates to transportStreaming & Transport
streaming.py: Refactored to useWorkerTransportinstead of hardcodedWORKER_URLstreaming_receive()now accepts optionaltransportparameterTests
https://claude.ai/code/session_018wuo4YBi8gTb1aXY3vH4xq