Skip to content

feat: add loop operators#4206

Open
aglinxinyuan wants to merge 263 commits into
mainfrom
xinyuan-loop-feb
Open

feat: add loop operators#4206
aglinxinyuan wants to merge 263 commits into
mainfrom
xinyuan-loop-feb

Conversation

@aglinxinyuan
Copy link
Copy Markdown
Contributor

@aglinxinyuan aglinxinyuan commented Feb 11, 2026

What changes were proposed in this PR?

Adds LoopStart and LoopEnd operators so workflows can iterate over rows of a pandas table with for-loop semantics, including nested loops. The back-edge from LoopEnd to LoopStart is implemented out-of-band — via a jump_to_operator_region RPC plus a direct iceberg write to LoopStart's input port URI — so the region DAG remains acyclic. Per-iteration state is replayed through iceberg, building on the state-materialization machinery from #4490.

Topology

   Upstream Table
        │
        ▼
   ┌──────────┐    state  ┌──────────┐                 ┌─────────┐
   │ LoopStart├──{counter,├─loop body├────────────────►│ LoopEnd │
   │          │  table,   │          │                 │         │
   └──────────┘  ...}     └──────────┘                 └────┬────┘
        ▲                                                   │
        │  jump_to_operator_region(LoopStartId)             │
        │  + write next-iteration state to LoopStart's      │
        │    input URI in iceberg                           │
        └───────────────────────────────────────────────────┘
                          condition()==True
  • LoopStart: user-supplied initialization (e.g. i = 0) and per-iteration output expression (e.g. table.iloc[i]).
  • LoopEnd: user-supplied update (e.g. i += 1) and termination condition (e.g. i < len(table)).

What changed

Area File Purpose
Operator definitions LoopStartOpDesc.scala, LoopEndOpDesc.scala Generate Python operator code from user expressions; register under LogicalOp.scala
Operator runtime base core/models/operator.pyLoopStartOperator, LoopEndOperator; exported via pytexera/__init__.py Maintain loop_counter as nesting-depth marker; emit/process state across iterations
Scheduler RegionExecutionCoordinator.scala Preserve LoopEnd's iceberg output across iterations using DocumentFactory.documentExists (#5085) — LoopEnd regions skip the otherwise-clobbering createDocument call when the document is already there
Type-based marker PhysicalOp.scala — adds isLoopEnd: Boolean field + withIsLoopEnd builder LoopEnd flag for the scheduler; consistent with how WorkflowExecutionService detects LoopStartOpDesc
Worker runtime MainLoop._attach_loop_start_id, _jump_to_loop_start Tag emitted state with LoopStartId + the upstream's URI; on LoopEnd completion, jump back + write next state
Worker output OutputManager.reset_loopend_storage + _storage_uris Truncate LoopEnd's iceberg tables at each iteration boundary so downstream sees only the latest iteration's data
Execution mode WorkflowExecutionService.scala Coerce to MATERIALIZED when any LoopStartOpDesc is present (loops require materialized edges)
Frontend LoopStart.png, LoopEnd.png Operator icons

Nesting

loop_counter is a depth counter for nested loops:

Where Behavior
Inner LoopStart.process_state If incoming state already has LoopStartStateURI (outer scope), loop_counter += 1 and pass through; outer metadata is preserved.
Inner LoopEnd.process_state If loop_counter > 0, decrement and return state (don't absorb as terminal).
Outermost LoopEnd loop_counter == 0 → absorbs state, runs user update, evaluates condition(), jumps back to its LoopStart.

Any related issues, documentation, discussions?

Closes #4442. Builds on #4490 (state materialization across regions) and #5085 (DocumentFactory.documentExists).

How was this PR tested?

  • Single-level loop: Source → LoopStart(i = 0, table.iloc[i]) → UDF → LoopEnd(i += 1, i < len(table)) → Sink. Sink reads exactly the final-iteration output.
  • Nested loop (two levels): inner loop runs to completion before outer advances; loop_counter round-trips through both depths.
  • User-set executionMode = PIPELINED with a LoopStart present is coerced to MATERIALIZED.
  • LoopEnd.condition() == False exits cleanly without firing jump_to_operator_region; downstream operators read the final iteration's tuples from LoopEnd's iceberg output.
  • sbt scalafmtCheckAll scalafixAll --check and ruff check clean.
  • Testing workflows:

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.7 in compliance with ASF.

@github-actions github-actions Bot added engine frontend Changes related to the frontend GUI common labels Feb 11, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 11, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 11, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 12, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 14, 2026
@aglinxinyuan aglinxinyuan reopened this Feb 14, 2026
@aglinxinyuan aglinxinyuan changed the base branch from main to xiaozhen-sync-region-kill February 14, 2026 00:30
@aglinxinyuan aglinxinyuan self-assigned this Feb 28, 2026
@github-actions github-actions Bot added the python label Mar 1, 2026
@aglinxinyuan aglinxinyuan changed the base branch from xiaozhen-sync-region-kill to xinyuan-state-only April 19, 2026 08:28
@aglinxinyuan aglinxinyuan changed the base branch from main to xinyuan-document-factory-exists May 16, 2026 01:38
aglinxinyuan and others added 5 commits May 15, 2026 18:38
…jump site

LoopEndOperator.loop_start_id() returned a state value but also had two
hidden `del` side effects — a surprising shape for an _id() accessor.
The cleanup logic was split across two locations (loop_start_id deleted
table/output; _jump_to_loop_start deleted LoopStartId/LoopStartStateURI),
so the strip-then-write step wasn't obvious in either place.

Inline the four-key strip into _jump_to_loop_start so all the metadata
trimming happens right next to the iceberg write that needs it, and the
operator class stays purely declarative.
…t_port_base_uri

generatePythonCode is a pure string interpolation with no failure modes,
so the broad try/catch in LoopStartOpDesc and LoopEndOpDesc just defers
any future code-gen bug from compile time to a Python syntax error at
the worker. Drop the wrappers.

InputManager.get_input_port_base_uri had one caller; inline it using
the existing public get_input_port_mat_reader_threads accessor.
Base automatically changed from xinyuan-document-factory-exists to main May 16, 2026 04:35
# Conflicts:
#	amber/src/main/python/core/storage/document_factory.py
#	common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
#	common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
… OperatorInfo

- process_input_state: collapse the two mutually-exclusive isinstance
  branches into if/elif over a single bound `executor`. No behavior
  change; saves one attribute lookup and makes mutual exclusivity
  explicit.
- LoopEndOperator.condition: annotate return as `bool` (was `None`).
  Callers and the generated subclass already treat it as a boolean.
- Loop{Start,End}OpDesc: replace the duplicated userFriendlyName /
  description with a real one-line description so the UI tooltip
  isn't redundant.
RegionExecutionCoordinator detected LoopEnd regions by matching on the
auto-generated op-id prefix ("LoopEnd-operator-"). That coupling
silently breaks the moment the op-id format changes or a user names a
non-LoopEnd op with the same prefix.

Add `isLoopEnd: Boolean = false` to PhysicalOp (with a small
`withIsLoopEnd` builder), set it from LoopEndOpDesc, and check it
directly in the scheduler. Matches the type-based pattern already used
in WorkflowExecutionService for LoopStart.
LoopEnd has exactly one output port, so OutputManager only ever stored
one entry in _storage_uris. Replace the dict with a single Optional[str]
field.
Remove OutputManager._storage_uri_base by carrying the base URI on the
result-port PortStorageWriter itself. reset_loopend_storage now reads
the URI from the writer it is about to close, so OutputManager no
longer holds any loop-specific bookkeeping field.
@aglinxinyuan aglinxinyuan marked this pull request as ready for review May 18, 2026 21:52
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Introduces LoopStart / LoopEnd control-flow operators implementing for-loop semantics (including nesting) over pandas tables. Iteration is implemented out-of-band: the region DAG stays acyclic, and back-edges are realized by a jump_to_operator_region RPC plus a direct iceberg write to LoopStart's input URI, reusing the cross-region state-materialization plumbing from #4490 and the documentExists helper from #5085.

Changes:

  • Adds Scala op descriptors (LoopStartOpDesc, LoopEndOpDesc) that codegen Python and registers them in LogicalOp; adds an isLoopEnd marker on PhysicalOp so the region coordinator preserves LoopEnd's iceberg tables across iterations.
  • Adds Python base classes (LoopStartOperator, LoopEndOperator) and wires MainLoop.complete to jump back to LoopStart and rewrite its input state on each iteration; OutputManager.reset_storage re-provisions iceberg tables per iteration.
  • Forces ExecutionMode.MATERIALIZED whenever a LoopStartOpDesc is present in the logical plan.

Reviewed changes

Copilot reviewed 10 out of 12 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
common/workflow-operator/.../loop/LoopStartOpDesc.scala New operator descriptor; codegens Python LoopStart class from user initialization/output expressions.
common/workflow-operator/.../loop/LoopEndOpDesc.scala New operator descriptor; codegens Python LoopEnd class from user update/condition, marks physical op as LoopEnd.
common/workflow-operator/.../LogicalOp.scala Registers the two new operator subtypes for JSON polymorphic dispatch.
common/workflow-core/.../PhysicalOp.scala Adds isLoopEnd field and withIsLoopEnd builder.
amber/.../web/service/WorkflowExecutionService.scala Coerces execution mode to MATERIALIZED if any LoopStart op is present.
amber/.../scheduling/RegionExecutionCoordinator.scala Skips re-creating LoopEnd region's iceberg result/state docs if they already exist.
amber/src/main/python/pytexera/init.py Exposes LoopStartOperator / LoopEndOperator from the public API.
amber/src/main/python/core/models/operator.py Adds LoopStart/LoopEnd operator base classes implementing loop_counter + state pickling semantics.
amber/src/main/python/core/runnables/main_loop.py Tags emitted state with LoopStart id/URI; on LoopEnd completion, jumps to LoopStart and rewrites its input URI; resets LoopEnd's output storage per iteration.
amber/src/main/python/core/architecture/packaging/output_manager.py Adds reset_storage and remembers _storage_uri_base for re-provisioning iceberg tables.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread amber/src/main/python/core/runnables/main_loop.py
Comment thread amber/src/main/python/core/runnables/main_loop.py
Comment thread amber/src/main/python/core/models/operator.py
Comment thread amber/src/main/python/core/models/operator.py
Comment thread amber/src/main/python/core/architecture/packaging/output_manager.py
Comment thread amber/src/main/python/core/architecture/packaging/output_manager.py
Comment thread amber/src/main/python/core/runnables/main_loop.py
@Xiao-zhen-Liu
Copy link
Copy Markdown
Contributor

@aglinxinyuan Can you upload the workflows used for testing in the PR description?

@aglinxinyuan
Copy link
Copy Markdown
Contributor Author

@aglinxinyuan Can you upload the workflows used for testing in the PR description?

Updated.

@aglinxinyuan
Copy link
Copy Markdown
Contributor Author

aglinxinyuan commented May 19, 2026

I plan to add test cases on a separate PR. What do you think?

@Xiao-zhen-Liu
Copy link
Copy Markdown
Contributor

I plan to add test cases on a separate PR. What do you think?

I think it makes more sense to include test cases in this PR. Usually test cases are not good candidates for splitting into a future PR.

@chenlica
Copy link
Copy Markdown
Contributor

I plan to add test cases on a separate PR. What do you think?

I think it makes more sense to include test cases in this PR. Usually test cases are not good candidates for splitting into a future PR.

Agreed.

@aglinxinyuan
Copy link
Copy Markdown
Contributor Author

Sure, I’ll add test cases in this PR.

For reference, before adding the test cases, this PR already contains 302 lines of changes across 12 files.

…avior

Adds three spec files exercising the loop PR's surface:

* LoopStartOpDescSpec: pins operatorInfo, the generated Python code
  (init / output expression sites, LoopStartOperator subclassing),
  and PhysicalOp flags (non-parallelizable, single worker, not
  isLoopEnd).

* LoopEndOpDescSpec: pins operatorInfo, the generated Python code
  (update / condition sites, condition() returning bool, the nested-
  loop loop_counter > 0 pass-through branch, the matching-loop pickle
  round-trip), and PhysicalOp flags including isLoopEnd=true.

* test_loop_operators.py: drives stub subclasses that mirror the
  generated runtime classes through both single-loop scenarios
  (first-time merge, produce_state_on_finish pickling, matching
  branch update/condition) and nested-loop scenarios (loop_counter
  increments on LoopStart re-entry; decrements on LoopEnd pass-
  through; depth-symmetric round trip across outer x inner loop).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common engine frontend Changes related to the frontend GUI python

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Introduce for loop

6 participants