feat: add loop operators #4206
…jump site
LoopEndOperator.loop_start_id() returned a state value but also had two hidden `del` side effects, a surprising shape for an _id() accessor. The cleanup logic was split across two locations (loop_start_id deleted table/output; _jump_to_loop_start deleted LoopStartId/LoopStartStateURI), so the strip-then-write step wasn't obvious in either place. Inline the four-key strip into _jump_to_loop_start so all the metadata trimming happens right next to the iceberg write that needs it, and the operator class stays purely declarative.
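A minimal sketch of the strip-then-write step described above, assuming the state is a plain dict and that the four keys are spelled as in the commit message. `strip_loop_metadata`, `jump_to_loop_start`, and `write_state` are hypothetical stand-ins, not the PR's actual functions:

```python
from typing import Any, Callable, Dict, List

# Key names are taken from the commit message; how the real state object
# stores them is an assumption.
LOOP_METADATA_KEYS = ("table", "output", "LoopStartId", "LoopStartStateURI")


def strip_loop_metadata(state: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `state` without the four loop bookkeeping keys."""
    return {key: value for key, value in state.items() if key not in LOOP_METADATA_KEYS}


def jump_to_loop_start(
    state: Dict[str, Any], write_state: Callable[[Dict[str, Any]], None]
) -> None:
    """Hypothetical jump site: strip and write in one place, with no hidden `del`."""
    write_state(strip_loop_metadata(state))


written: List[Dict[str, Any]] = []
state = {
    "i": 3,
    "table": "rows",
    "output": "row",
    "LoopStartId": "ls-1",
    "LoopStartStateURI": "uri",
}
jump_to_loop_start(state, written.append)
print(written)  # [{'i': 3}]
```

Building a filtered copy instead of calling `del` leaves the caller's dict untouched, which is one way to keep an accessor free of side effects.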
…t_port_base_uri
generatePythonCode is a pure string interpolation with no failure modes, so the broad try/catch in LoopStartOpDesc and LoopEndOpDesc just defers any future code-gen bug from compile time to a Python syntax error at the worker. Drop the wrappers. InputManager.get_input_port_base_uri had one caller; inline it using the existing public get_input_port_mat_reader_threads accessor.
# Conflicts:
#   amber/src/main/python/core/storage/document_factory.py
#   common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
#   common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
… OperatorInfo
- process_input_state: collapse the two mutually-exclusive isinstance
branches into if/elif over a single bound `executor`. No behavior
change; saves one attribute lookup and makes mutual exclusivity
explicit.
- LoopEndOperator.condition: annotate return as `bool` (was `None`).
Callers and the generated subclass already treat it as a boolean.
- Loop{Start,End}OpDesc: replace the duplicated userFriendlyName /
description with a real one-line description so the UI tooltip
isn't redundant.
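The `process_input_state` change in the first bullet can be sketched as follows. This is an illustration only: the stub classes, the `on_loop_start_state` / `on_loop_end_state` method names, and the `context.executor` attribute are assumptions, not the real Texera code.

```python
from types import SimpleNamespace


class LoopStartOperator:
    """Stub standing in for the real base class."""

    def on_loop_start_state(self, state):  # hypothetical method name
        return ("loop-start", state)


class LoopEndOperator:
    """Stub standing in for the real base class."""

    def on_loop_end_state(self, state):  # hypothetical method name
        return ("loop-end", state)


def process_input_state(context, state):
    # Bind the executor once instead of looking it up in each branch.
    executor = context.executor
    if isinstance(executor, LoopStartOperator):
        return executor.on_loop_start_state(state)
    elif isinstance(executor, LoopEndOperator):
        # `elif` makes it explicit that at most one branch can run.
        return executor.on_loop_end_state(state)
    return ("other", state)


print(process_input_state(SimpleNamespace(executor=LoopEndOperator()), {"i": 1}))
# ('loop-end', {'i': 1})
```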
RegionExecutionCoordinator detected LoopEnd regions by matching on the
auto-generated op-id prefix ("LoopEnd-operator-"). That coupling
silently breaks the moment the op-id format changes or a user names a
non-LoopEnd op with the same prefix.
Add `isLoopEnd: Boolean = false` to PhysicalOp (with a small
`withIsLoopEnd` builder), set it from LoopEndOpDesc, and check it
directly in the scheduler. Matches the type-based pattern already used
in WorkflowExecutionService for LoopStart.
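To illustrate why an explicit flag is more robust than prefix matching, here is a small Python stand-in for the Scala `PhysicalOp`. The dataclass, its snake_case field names, and the example op ids are assumptions made for this sketch:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class PhysicalOp:
    """Python stand-in for the Scala case class; only the fields needed here."""

    op_id: str
    is_loop_end: bool = False

    def with_is_loop_end(self, value: bool) -> "PhysicalOp":
        # Builder in the style of `withIsLoopEnd`: returns a modified copy.
        return replace(self, is_loop_end=value)


def is_loop_end_by_prefix(op: PhysicalOp) -> bool:
    # The old detection: couples the scheduler to the op-id format.
    return op.op_id.startswith("LoopEnd-operator-")


def is_loop_end_by_flag(op: PhysicalOp) -> bool:
    # The new detection: reads the flag set by the operator descriptor.
    return op.is_loop_end


# A non-LoopEnd op whose id happens to share the prefix.
impostor = PhysicalOp("LoopEnd-operator-my-filter")
# A real LoopEnd op under a hypothetical future id format.
renamed = PhysicalOp("loop_end_v2-abc").with_is_loop_end(True)

print(is_loop_end_by_prefix(impostor), is_loop_end_by_flag(impostor))  # True False
print(is_loop_end_by_prefix(renamed), is_loop_end_by_flag(renamed))    # False True
```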
LoopEnd has exactly one output port, so OutputManager only ever stored one entry in _storage_uris. Replace the dict with a single Optional[str] field.
Remove OutputManager._storage_uri_base by carrying the base URI on the result-port PortStorageWriter itself. reset_loopend_storage now reads the URI from the writer it is about to close, so OutputManager no longer holds any loop-specific bookkeeping field.
This reverts commit 71ca457.
Pull request overview
Introduces LoopStart / LoopEnd control-flow operators implementing for-loop semantics (including nesting) over pandas tables. Iteration is implemented out-of-band: the region DAG stays acyclic, and back-edges are realized by a jump_to_operator_region RPC plus a direct iceberg write to LoopStart's input URI, reusing the cross-region state-materialization plumbing from #4490 and the documentExists helper from #5085.
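As a rough mental model of the for-loop semantics, the following plain-Python sketch mirrors the example expressions given in the PR description (`i = 0`, `table.iloc[i]`, `i += 1`, `i < len(table)`). It uses a list of dicts in place of a pandas table, assumes at least one row, and ignores regions, RPCs, and iceberg entirely; `run_loop` and `body` are hypothetical names:

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, Any]


def run_loop(table: List[Row], body: Callable[[Row], Any]) -> Tuple[Any, int]:
    """Emulate LoopStart -> body -> LoopEnd; return (final output, iterations)."""
    i = 0  # LoopStart initialization
    iterations = 0
    while True:
        row = table[i]  # LoopStart output (`table.iloc[i]` on a DataFrame)
        last_output = body(row)  # operators between LoopStart and LoopEnd
        iterations += 1
        i += 1  # LoopEnd update
        if not (i < len(table)):  # LoopEnd condition: False means exit
            break
    # Only the final iteration's output is returned, matching the PR's note
    # that the sink reads exactly the final-iteration output.
    return last_output, iterations


print(run_loop([{"x": 1}, {"x": 2}, {"x": 3}], lambda row: row["x"] * 10))  # (30, 3)
```

Because the condition is checked after the body, this behaves like a do-while loop: the body runs once before the first check.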
Changes:
- Adds Scala op descriptors (`LoopStartOpDesc`, `LoopEndOpDesc`) that codegen Python and registers them in `LogicalOp`; adds an `isLoopEnd` marker on `PhysicalOp` so the region coordinator preserves LoopEnd's iceberg tables across iterations.
- Adds Python base classes (`LoopStartOperator`, `LoopEndOperator`) and wires `MainLoop.complete` to jump back to LoopStart and rewrite its input state on each iteration; `OutputManager.reset_storage` re-provisions iceberg tables per iteration.
- Forces `ExecutionMode.MATERIALIZED` whenever a `LoopStartOpDesc` is present in the logical plan.
Reviewed changes
Copilot reviewed 10 out of 12 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| common/workflow-operator/.../loop/LoopStartOpDesc.scala | New operator descriptor; codegens Python LoopStart class from user initialization/output expressions. |
| common/workflow-operator/.../loop/LoopEndOpDesc.scala | New operator descriptor; codegens Python LoopEnd class from user update/condition, marks physical op as LoopEnd. |
| common/workflow-operator/.../LogicalOp.scala | Registers the two new operator subtypes for JSON polymorphic dispatch. |
| common/workflow-core/.../PhysicalOp.scala | Adds isLoopEnd field and withIsLoopEnd builder. |
| amber/.../web/service/WorkflowExecutionService.scala | Coerces execution mode to MATERIALIZED if any LoopStart op is present. |
| amber/.../scheduling/RegionExecutionCoordinator.scala | Skips re-creating LoopEnd region's iceberg result/state docs if they already exist. |
| amber/src/main/python/pytexera/__init__.py | Exposes LoopStartOperator / LoopEndOperator from the public API. |
| amber/src/main/python/core/models/operator.py | Adds LoopStart/LoopEnd operator base classes implementing loop_counter + state pickling semantics. |
| amber/src/main/python/core/runnables/main_loop.py | Tags emitted state with LoopStart id/URI; on LoopEnd completion, jumps to LoopStart and rewrites its input URI; resets LoopEnd's output storage per iteration. |
| amber/src/main/python/core/architecture/packaging/output_manager.py | Adds reset_storage and remembers _storage_uri_base for re-provisioning iceberg tables. |
@aglinxinyuan Can you upload the workflows used for testing in the PR description?
Updated.
I plan to add test cases on a separate PR. What do you think?
I think it makes more sense to include test cases in this PR. Usually test cases are not good candidates for splitting into a future PR.
Agreed.
Sure, I’ll add test cases in this PR. For reference, before adding the test cases, this PR already contains 302 lines of changes across 12 files.
…avior
Adds three spec files exercising the loop PR's surface:
* LoopStartOpDescSpec: pins operatorInfo, the generated Python code (init / output expression sites, LoopStartOperator subclassing), and PhysicalOp flags (non-parallelizable, single worker, not isLoopEnd).
* LoopEndOpDescSpec: pins operatorInfo, the generated Python code (update / condition sites, condition() returning bool, the nested-loop loop_counter > 0 pass-through branch, the matching-loop pickle round-trip), and PhysicalOp flags including isLoopEnd=true.
* test_loop_operators.py: drives stub subclasses that mirror the generated runtime classes through both single-loop scenarios (first-time merge, produce_state_on_finish pickling, matching branch update/condition) and nested-loop scenarios (loop_counter increments on LoopStart re-entry; decrements on LoopEnd pass-through; depth-symmetric round trip across outer x inner loop).
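The nested-loop bookkeeping exercised by these tests (increment on LoopStart re-entry, decrement on LoopEnd pass-through) can be sketched with plain dicts. The two functions below are hypothetical simplifications, not the generated runtime classes; in particular, returning an `absorbed` flag and treating a missing `loop_counter` as 0 are assumptions of this sketch:

```python
from typing import Any, Dict, Tuple

State = Dict[str, Any]


def loop_start_process_state(state: State) -> State:
    """Inner LoopStart: state already tagged by an outer loop goes one level deeper."""
    passed = dict(state)
    if "LoopStartStateURI" in state:
        passed["loop_counter"] = passed.get("loop_counter", 0) + 1
    return passed


def loop_end_process_state(state: State) -> Tuple[State, bool]:
    """Return (state, absorbed); absorbed is True only for the matching LoopEnd."""
    depth = state.get("loop_counter", 0)
    passed = dict(state)
    if depth > 0:
        # State belongs to an enclosing loop: decrement and pass it through.
        passed["loop_counter"] = depth - 1
        return passed, False
    # depth == 0: this LoopEnd matches the LoopStart that emitted the state.
    return passed, True


outer_state = {"LoopStartStateURI": "outer-uri", "i": 0}
after_inner_start = loop_start_process_state(outer_state)
after_inner_end, absorbed_by_inner = loop_end_process_state(after_inner_start)
_, absorbed_by_outer = loop_end_process_state(after_inner_end)

print(after_inner_start["loop_counter"], after_inner_end["loop_counter"])  # 1 0
print(absorbed_by_inner, absorbed_by_outer)  # False True
```

The counter returns to 0 after passing one inner LoopStart/LoopEnd pair, which is the depth-symmetric round trip the tests pin down.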
What changes were proposed in this PR?
Adds `LoopStart` and `LoopEnd` operators so workflows can iterate over rows of a pandas table with for-loop semantics, including nested loops. The back-edge from `LoopEnd` to `LoopStart` is implemented out-of-band, via a `jump_to_operator_region` RPC plus a direct iceberg write to LoopStart's input port URI, so the region DAG remains acyclic. Per-iteration state is replayed through iceberg, building on the state-materialization machinery from #4490.
Topology
- `LoopStart`: user-supplied `initialization` (e.g. `i = 0`) and per-iteration `output` expression (e.g. `table.iloc[i]`).
- `LoopEnd`: user-supplied `update` (e.g. `i += 1`) and termination `condition` (e.g. `i < len(table)`).
What changed
- `LoopStartOpDesc.scala`, `LoopEndOpDesc.scala`
- `LogicalOp.scala`
- `core/models/operator.py`: `LoopStartOperator`, `LoopEndOperator`; exported via `pytexera/__init__.py`. `loop_counter` as nesting-depth marker; emit/process state across iterations
- `RegionExecutionCoordinator.scala`: `DocumentFactory.documentExists` (#5085); LoopEnd regions skip the otherwise-clobbering `createDocument` call when the document is already there
- `PhysicalOp.scala`: adds `isLoopEnd: Boolean` field + `withIsLoopEnd` builder
- `WorkflowExecutionService` detects `LoopStartOpDesc`
- `MainLoop._attach_loop_start_id`, `_jump_to_loop_start`: `LoopStartId` + the upstream's URI; on LoopEnd completion, jump back + write next state
- `OutputManager.reset_loopend_storage` + `_storage_uris`
- `WorkflowExecutionService.scala`: `MATERIALIZED` when any `LoopStartOpDesc` is present (loops require materialized edges)
- `LoopStart.png`, `LoopEnd.png`
Nesting
`loop_counter` is a depth counter for nested loops:
- `LoopStart.process_state`: `LoopStartStateURI` (outer scope), `loop_counter += 1` and pass through; outer metadata is preserved.
- `LoopEnd.process_state`: `loop_counter > 0`, decrement and return state (don't absorb as terminal).
- `LoopEnd`: `loop_counter == 0` → absorbs state, runs user `update`, evaluates `condition()`, jumps back to its `LoopStart`.
Any related issues, documentation, discussions?
Closes #4442. Builds on #4490 (state materialization across regions) and #5085 (`DocumentFactory.documentExists`).
How was this PR tested?
- Source → LoopStart(`i = 0`, `table.iloc[i]`) → UDF → LoopEnd(`i += 1`, `i < len(table)`) → Sink. Sink reads exactly the final-iteration output.
- `loop_counter` round-trips through both depths.
- `executionMode = PIPELINED` with a LoopStart present is coerced to `MATERIALIZED`.
- `LoopEnd.condition() == False` exits cleanly without firing `jump_to_operator_region`; downstream operators read the final iteration's tuples from LoopEnd's iceberg output.
- `sbt scalafmtCheckAll scalafixAll --check` and `ruff check` clean.
Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF.