diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index b85e3e39bf1..7ef0ca804a2 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -92,6 +92,11 @@ def __init__(self, worker_id: str): PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread] ] = dict() + # Loop-end operators have a single output port; remember its base + # URI so `reset_storage` can re-provision the iceberg + # tables on each loop iteration. + self._storage_uri_base: typing.Optional[str] = None + def is_missing_output_ports(self): """ This method is only used for ensuring correct region execution. @@ -133,6 +138,9 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri_base: st state materialization on the same port. `storage_uri_base` is the port's base URI; the result and state URIs are derived from it. """ + # Remember the base URI so `reset_storage` can re-provision + # the iceberg tables on subsequent loop iterations. 
+ self._storage_uri_base = storage_uri_base document, _ = DocumentFactory.open_document( VFSURIFactory.result_uri(storage_uri_base) ) @@ -217,6 +225,19 @@ def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: elif port_id in self._port_state_writers: self._port_state_writers[port_id][0].put(element) + def reset_storage(self) -> None: + port_id = self.get_port_ids()[0] + storage_uri_base = self._storage_uri_base + self.close_port_storage_writers() + DocumentFactory.create_document( + VFSURIFactory.result_uri(storage_uri_base), + self._ports[port_id].get_schema(), + ) + DocumentFactory.create_document( + VFSURIFactory.state_uri(storage_uri_base), State.SCHEMA + ) + self.set_up_port_storage_writer(port_id, storage_uri_base) + def close_port_storage_writers(self) -> None: """ Flush the buffers of port storage writers and wait for all the diff --git a/amber/src/main/python/core/models/operator.py b/amber/src/main/python/core/models/operator.py index 71da570c7f4..b51e284c55b 100644 --- a/amber/src/main/python/core/models/operator.py +++ b/amber/src/main/python/core/models/operator.py @@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike] time, or None. 
""" yield + + +class LoopStartOperator(TableOperator): + @overrides.final + def process_state(self, state: State, port: int) -> Optional[State]: + if "LoopStartStateURI" in state: + state["loop_counter"] += 1 + return state + self.state.update(state) + return None + + @overrides.final + def produce_state_on_finish(self, port: int) -> State: + from pickle import dumps + + self.state["table"] = dumps(Table(self._TableOperator__table_data[port])) + return dict(self.state) + + +class LoopEndOperator(TableOperator): + @overrides.final + def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + yield table + + @abstractmethod + def condition(self) -> bool: + pass diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index 1334af12bfe..faa57b2c31b 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -38,8 +38,11 @@ ECMElement, InternalQueueElement, ) +from core.models.operator import LoopEndOperator, LoopStartOperator from core.models.state import State from core.runnables.data_processor import DataProcessor +from core.storage.document_factory import DocumentFactory +from core.storage.vfs_uri_factory import VFSURIFactory from core.util import StoppableQueueBlockingRunnable, get_one_of from core.util.console_message.timestamp import current_time_in_local_timezone from core.util.customized_queue.queue_base import QueueElement @@ -48,6 +51,7 @@ PortIdentity, ChannelIdentity, EmbeddedControlMessageIdentity, + OperatorIdentity, ) from proto.org.apache.texera.amber.engine.architecture.rpc import ( ConsoleMessage, @@ -61,6 +65,7 @@ EmbeddedControlMessage, AsyncRpcContext, ControlRequest, + JumpToOperatorRegionRequest, ) from proto.org.apache.texera.amber.engine.architecture.worker import ( WorkerState, @@ -87,6 +92,38 @@ def __init__( target=self.data_processor.run, daemon=True, name="data_processor_thread" ).start() + def 
_attach_loop_start_id(self, output_state: State) -> None: + if "LoopStartId" in output_state: + return + output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit( + "-main-0", 1 + )[0] + # The URI lives on the upstream operator's output port (which + # LoopStart's first materialization reader is reading from). + reader_runnables = ( + self.context.input_manager.get_input_port_mat_reader_threads() + ) + output_state["LoopStartStateURI"] = VFSURIFactory.state_uri( + next(iter(reader_runnables.values()))[0].uri + ) + + def _jump_to_loop_start( + self, executor: LoopEndOperator, controller_interface + ) -> None: + state = executor.state + controller_interface.jump_to_operator_region( + JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"])) + ) + uri = state["LoopStartStateURI"] + # Strip the per-iteration scratch (`table`, `output`) and the + # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the + # user-visible loop state is written back to LoopStart's input. 
+ for key in ("table", "output", "LoopStartId", "LoopStartStateURI"): + state.pop(key, None) + writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0") + writer.put_one(State(state).to_tuple()) + writer.close() + def complete(self) -> None: """ Complete the DataProcessor, marking state to COMPLETED, and notify the @@ -94,12 +131,15 @@ def complete(self) -> None: """ # flush the buffered console prints self._check_and_report_console_messages(force_flush=True) - self.context.executor_manager.executor.close() + controller_interface = self._async_rpc_client.controller_stub() + executor = self.context.executor_manager.executor + if isinstance(executor, LoopEndOperator) and executor.condition(): + self._jump_to_loop_start(executor, controller_interface) + executor.close() # stop the data processing thread self.data_processor.stop() self.context.state_manager.transit_to(WorkerState.COMPLETED) self.context.statistics_manager.update_total_execution_time(time.time_ns()) - controller_interface = self._async_rpc_client.controller_stub() controller_interface.worker_execution_completed(EmptyRequest()) self.context.close() @@ -193,6 +233,11 @@ def process_input_state(self) -> None: self._switch_context() output_state = self.context.state_processing_manager.get_output_state() if output_state is not None: + executor = self.context.executor_manager.executor + if isinstance(executor, LoopEndOperator): + self.context.output_manager.reset_storage() + elif isinstance(executor, LoopStartOperator): + self._attach_loop_start_id(output_state) for to, batch in self.context.output_manager.emit_state(output_state): self._output_queue.put( DataElement( diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index e40d1a43fe0..c6001667380 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -19,6 +19,7 @@ from overrides import overrides from typing import Iterator, Optional, Union 
+from core.models.operator import LoopStartOperator, LoopEndOperator from pyamber import * from .storage.dataset_file_document import DatasetFileDocument from .storage.large_binary_input_stream import LargeBinaryInputStream @@ -43,6 +44,8 @@ "UDFTableOperator", "UDFBatchOperator", "UDFSourceOperator", + "LoopStartOperator", + "LoopEndOperator", "DatasetFileDocument", "largebinary", "LargeBinaryInputStream", diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 5a9df11b589..6d083fa6d6b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -576,8 +576,17 @@ class RegionExecutionCoordinator( region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) - DocumentFactory.createDocument(resultURI, schema) - DocumentFactory.createDocument(stateURI, State.schema) + // LoopEnd operators may re-execute the region multiple times; on + // subsequent iterations the result/state documents already exist, + // and `createDocument` (overrideIfExists=true) would clobber them. + // Skip the create call when the document is already there. 
+ val isLoopEndRegion = region.getOperators.exists(_.isLoopEnd) + if (!isLoopEndRegion || !DocumentFactory.documentExists(resultURI)) { + DocumentFactory.createDocument(resultURI, schema) + } + if (!isLoopEndRegion || !DocumentFactory.documentExists(stateURI)) { + DocumentFactory.createDocument(stateURI, State.schema) + } if (!isRestart) { val (_, eid, _, _) = decodeURI(resultURI) WorkflowExecutionsResource.insertOperatorPortResultUri( diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala index 741687e02c9..c6bbbafc88d 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala @@ -21,13 +21,14 @@ package org.apache.texera.web.service import com.typesafe.scalalogging.LazyLogging import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} -import org.apache.texera.amber.core.workflow.WorkflowContext +import org.apache.texera.amber.core.workflow.{ExecutionMode, WorkflowContext} import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, Workflow} import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState._ import org.apache.texera.amber.engine.common.Utils import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore +import org.apache.texera.amber.operator.loop.LoopStartOpDesc import org.apache.texera.web.model.websocket.event.{ TexeraWebSocketEvent, WorkflowErrorEvent, @@ -66,7 +67,12 @@ class WorkflowExecutionService( ) extends SubscriptionManager with LazyLogging { - workflowContext.workflowSettings = request.workflowSettings + workflowContext.workflowSettings = 
+ if (request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc])) { + request.workflowSettings.copy(executionMode = ExecutionMode.MATERIALIZED) + } else { + request.workflowSettings + } val wsInput = new WebsocketInput(errorHandler) addSubscription( diff --git a/amber/src/test/python/core/models/test_loop_operators.py b/amber/src/test/python/core/models/test_loop_operators.py new file mode 100644 index 00000000000..8fee19f09f2 --- /dev/null +++ b/amber/src/test/python/core/models/test_loop_operators.py @@ -0,0 +1,424 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Unit tests for the loop runtime: LoopStartOperator and LoopEndOperator. + +These exercise the abstract base classes in operator.py that the +generated `ProcessLoopStartOperator` / `ProcessLoopEndOperator` classes +extend. The tests use minimal stub subclasses that mirror what +`LoopStartOpDesc.generatePythonCode` / `LoopEndOpDesc.generatePythonCode` +emit so the behavior covered here is the same shape that ships at +runtime. + +Single-loop coverage: + - LoopStart's first-time state observation (merge into self.state). + - LoopEnd's process_table is the identity yield. + - End-to-end one-iteration loop driven through the matching-loop branch. 
+ +Nested-loop coverage: + - LoopStart.process_state with `LoopStartStateURI` already present + must increment `loop_counter` and pass the state through downstream + (this is what makes inner LoopStart not consume outer-loop state). + - LoopEnd's generated process_state, when `loop_counter > 0`, must + decrement and return the state unchanged so the outer LoopEnd is + the one that runs the user's update / condition. + - Round-trip outer × inner loop preserves the nesting invariant + (loop_counter is symmetric across LoopStart/LoopEnd traversals). +""" + +from pickle import loads +from typing import Iterator, Optional + +import pytest + +from core.models import State, Table, TableLike, Tuple +from core.models.operator import LoopEndOperator, LoopStartOperator + + +# --------------------------------------------------------------------------- +# Stub subclasses that mirror the generated Python in +# LoopStart/LoopEnd OpDesc. Keeping them here (rather than reusing the +# real generator) lets the test pin behavior without spinning up a Scala +# runtime to produce code. +# --------------------------------------------------------------------------- + + +class _StubLoopStart(LoopStartOperator): + """Mirrors `ProcessLoopStartOperator` from LoopStartOpDesc codegen. + + open() seeds `loop_counter` to 0 and runs the user's `initialization`. + process_table runs the user's `output` expression and yields the + result for downstream. 
+ """ + + def __init__(self, initialization="i = 0", output_expr="table.iloc[i]"): + super().__init__() + self._initialization = initialization + self._output_expr = output_expr + + def open(self) -> None: + self.state = {"loop_counter": 0} + exec(self._initialization, {}, self.state) + + def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + self.state["table"] = table + exec(f"output = {self._output_expr}", {}, self.state) + yield self.state["output"] + + +class _StubLoopEnd(LoopEndOperator): + """Mirrors `ProcessLoopEndOperator` from LoopEndOpDesc codegen. + + process_state recognises the nested-loop pass-through path + (`loop_counter > 0`) and decrements; on the matching-loop branch + it stashes the state, deserializes the pickled table, and runs the + user's `update`. condition() returns the boolean result of the + user's `condition` expression evaluated in self.state. + """ + + def __init__(self, update="i += 1", condition_expr="i < 3"): + super().__init__() + self._update = update + self._condition_expr = condition_expr + self.state = {} + + def process_state(self, state: State, port: int) -> Optional[State]: + loop_counter = int(state.get("loop_counter", 0)) + if loop_counter > 0: + state["loop_counter"] = loop_counter - 1 + return state + self.state = dict(state) + self.state["table"] = loads(self.state["table"]) + exec(self._update, {}, self.state) + return None + + def condition(self) -> bool: + exec(f"output = {self._condition_expr}", {}, self.state) + return self.state["output"] + + +# --------------------------------------------------------------------------- +# LoopStartOperator — process_state +# --------------------------------------------------------------------------- + + +class TestLoopStartProcessState: + def test_first_time_state_is_merged_into_self_state_and_none_is_returned(self): + # First entry: state from upstream (no LoopStartStateURI). 
The + # base class must merge it into self.state and return None so + # nothing flows downstream of LoopStart until the table is in. + op = _StubLoopStart() + op.open() + op.state["i"] = 0 # simulate the user's initialization + + result = op.process_state(State({"upstream_key": "v"}), port=0) + + assert result is None, "first-time state must not be forwarded" + assert op.state["upstream_key"] == "v", "state was not merged into self.state" + # loop_counter is left at its open() value (0) on first entry. + assert op.state["loop_counter"] == 0 + + def test_reentry_state_with_loop_start_uri_increments_loop_counter(self): + # Re-entry from this LoopStart's own LoopEnd: the state carries + # LoopStartStateURI, so the base class must INCREMENT + # loop_counter and PASS THROUGH the state downstream. This is + # what main_loop's _attach_loop_start_id relies on. + op = _StubLoopStart() + op.open() + incoming = State({"LoopStartStateURI": "vfs:///x", "loop_counter": 0, "i": 2}) + + result = op.process_state(incoming, port=0) + + assert result is not None, "re-entry state must be returned for downstream" + assert result["loop_counter"] == 1 + # The user variable rides along. + assert result["i"] == 2 + + def test_reentry_at_nested_loop_counter_bumps_one(self): + # Nested loop: an outer loop's re-entry state passes through this + # inner LoopStart with a loop_counter already > 0 (because the + # outer LoopStart bumped it on its own re-entry first). The + # invariant is that we only ever +1, never reset. 
+ op = _StubLoopStart() + op.open() + incoming = State({"LoopStartStateURI": "vfs:///outer", "loop_counter": 5}) + + result = op.process_state(incoming, port=0) + + assert result["loop_counter"] == 6 + + +# --------------------------------------------------------------------------- +# LoopStartOperator — produce_state_on_finish +# --------------------------------------------------------------------------- + + +class TestLoopStartProduceStateOnFinish: + def test_pickles_buffered_table_into_state_table_field(self): + # produce_state_on_finish must serialize the buffered table via + # pickle (so the cross-region state stream can carry a heavy + # pandas DataFrame as bytes). The receiving LoopEnd unpickles + # it on the matching-loop branch. + op = _StubLoopStart() + op.open() + # Drive a couple of tuples through to populate the per-port buffer. + list(op.process_tuple(Tuple({"v": 1}), port=0)) + list(op.process_tuple(Tuple({"v": 2}), port=0)) + + produced = op.produce_state_on_finish(port=0) + + assert isinstance(produced, dict) + assert "table" in produced + assert isinstance(produced["table"], bytes), "table must be pickled bytes" + # Round-trip through pickle.loads must give back our two tuples. + unpickled = loads(produced["table"]) + assert isinstance(unpickled, Table) + rows = list(unpickled.as_tuples()) + assert rows == [Tuple({"v": 1}), Tuple({"v": 2})] + + def test_user_state_fields_survive_into_produced_state(self): + # Any vars the user set in open() (e.g. i, accumulators) must + # ride along in the produced state so LoopEnd can run the user's + # `update` expression against them. 
+ op = _StubLoopStart(initialization="i = 0; acc = []") + op.open() + list(op.process_tuple(Tuple({"v": 1}), port=0)) + + produced = op.produce_state_on_finish(port=0) + + assert produced["i"] == 0 + assert produced["acc"] == [] + assert produced["loop_counter"] == 0 + + +# --------------------------------------------------------------------------- +# LoopEndOperator — base class behaviour +# --------------------------------------------------------------------------- + + +class TestLoopEndBase: + def test_process_table_yields_input_table_unchanged(self): + # The base class finalizes process_table to a single identity + # yield. The user only ever overrides condition() and (via + # codegen) process_state. + op = _StubLoopEnd() + in_table = Table([Tuple({"x": 1}), Tuple({"x": 2})]) + out = list(op.process_table(in_table, port=0)) + assert out == [in_table] + + def test_condition_is_abstract_on_base_class(self): + # A class that extends LoopEndOperator without supplying + # condition() must be uninstantiable. This is what stops a + # user from shipping a loop with an empty exit condition. + class _Missing(LoopEndOperator): + pass + + with pytest.raises(TypeError, match="condition"): + _Missing() + + +# --------------------------------------------------------------------------- +# Generated-style LoopEnd — single-loop matching branch +# --------------------------------------------------------------------------- + + +class TestLoopEndMatchingBranch: + def test_loop_counter_zero_runs_user_update_and_returns_none(self): + # The matching-loop branch (loop_counter == 0) is where the user's + # update expression runs. process_state must return None so no + # state flows downstream; the actual loop-back is driven by + # main_loop.complete() reading executor.state. + op = _StubLoopEnd(update="i += 1") + # Simulate LoopStart's produced state arriving here. 
+ from pickle import dumps + + incoming = State( + { + "loop_counter": 0, + "i": 2, + "table": dumps(Table([Tuple({"v": 1})])), + "LoopStartId": "outer-loop", + "LoopStartStateURI": "vfs:///outer", + } + ) + + result = op.process_state(incoming, port=0) + + assert result is None, "matching-loop branch must not emit state downstream" + assert op.state["i"] == 3, "user's update did not run on the matching branch" + # The table is unpickled in-place so condition() can see it as + # a real Table without a second round of deserialization. + assert isinstance(op.state["table"], Table) + # Loop metadata is preserved so _jump_to_loop_start can read it. + assert op.state["LoopStartId"] == "outer-loop" + assert op.state["LoopStartStateURI"] == "vfs:///outer" + + def test_condition_evaluates_user_expression_against_stashed_state(self): + op = _StubLoopEnd(update="i += 1", condition_expr="i < 3") + from pickle import dumps + + # Drive process_state once so self.state is populated. + op.process_state( + State( + { + "loop_counter": 0, + "i": 1, + "table": dumps(Table([Tuple({"v": 1})])), + } + ), + port=0, + ) + assert op.condition() is True # i became 2, 2 < 3 + + # Run another iteration to push i past the threshold. + op.process_state( + State( + { + "loop_counter": 0, + "i": 2, + "table": dumps(Table([Tuple({"v": 1})])), + } + ), + port=0, + ) + assert op.condition() is False # i became 3, 3 < 3 is False + + +# --------------------------------------------------------------------------- +# Nested loops — LoopEnd pass-through branch +# --------------------------------------------------------------------------- + + +class TestLoopEndNestedPassThrough: + def test_loop_counter_positive_decrements_and_passes_state_through(self): + # When the inner LoopEnd receives state with loop_counter > 0, + # the state belongs to an OUTER loop. 
The inner LoopEnd must + # decrement loop_counter and return the state for downstream + # routing (which eventually reaches the outer LoopEnd at + # loop_counter == 0). + op = _StubLoopEnd(update="i += 1") + op.state = {"sentinel": "must_not_be_overwritten"} + + incoming = State({"loop_counter": 2, "outer_var": "v"}) + result = op.process_state(incoming, port=0) + + assert result is not None, "pass-through branch must emit state downstream" + assert result["loop_counter"] == 1 + assert result["outer_var"] == "v" + # The pass-through branch must NOT overwrite self.state — the + # inner LoopEnd's own matching-loop state from a previous inner + # iteration must be preserved. + assert op.state == {"sentinel": "must_not_be_overwritten"} + + def test_pass_through_chain_collapses_to_matching_branch_at_zero(self): + # Walk a state with loop_counter=3 through three levels of + # nested LoopEnds: each strips one level, and the fourth + # (loop_counter == 0) is the matching loop that runs the + # user's update. This pins the depth-symmetric invariant + # nested-for-loop scheduling depends on. + from pickle import dumps + + outer = _StubLoopEnd(update="i += 10") + middle = _StubLoopEnd(update="i += 100") + inner = _StubLoopEnd(update="i += 1000") + match = _StubLoopEnd(update="i += 1") + + state = State( + { + "loop_counter": 3, + "i": 0, + "table": dumps(Table([Tuple({"v": 1})])), + } + ) + + # Each outer→inner hop decrements once. + state = outer.process_state(state, port=0) + assert state["loop_counter"] == 2 + state = middle.process_state(state, port=0) + assert state["loop_counter"] == 1 + state = inner.process_state(state, port=0) + assert state["loop_counter"] == 0 + # At loop_counter == 0 the matching LoopEnd consumes the state + # and runs ITS user update — not any of the pass-through ops'. 
+ result = match.process_state(state, port=0) + assert result is None + assert match.state["i"] == 1, "only the matching LoopEnd's update should fire" + + +# --------------------------------------------------------------------------- +# Nested loops — round trip +# --------------------------------------------------------------------------- + + +class TestNestedLoopRoundTrip: + def test_outer_then_inner_loop_state_keeps_counters_symmetric(self): + # Simulate the state flow for one outer iteration that itself + # triggers one inner iteration: + # + # outer LoopStart re-entry → loop_counter 0 → 1 + # inner LoopStart re-entry → loop_counter 1 → 2 + # inner LoopEnd → loop_counter 2 → 1 + # outer LoopEnd → loop_counter 1 → 0 (matching branch) + # + # The matching branch on the outer LoopEnd is reached iff every + # increment is mirrored by exactly one decrement. A bug in + # either side would land us in the wrong branch. + outer_start = _StubLoopStart() + inner_start = _StubLoopStart() + inner_end = _StubLoopEnd(update="outer_i += 100") + outer_end = _StubLoopEnd(update="outer_i += 1") + outer_start.open() + inner_start.open() + + from pickle import dumps + + # outer LoopEnd jumped back to outer LoopStart with this state. + state_in = State( + { + "LoopStartStateURI": "vfs:///outer", + "loop_counter": 0, + "outer_i": 0, + "table": dumps(Table([Tuple({"v": 1})])), + } + ) + + # outer LoopStart re-entry: +1 + state_after_outer_start = outer_start.process_state(state_in, port=0) + assert state_after_outer_start["loop_counter"] == 1 + # inner LoopStart sees the same passing state and +1 again. + state_after_inner_start = inner_start.process_state( + state_after_outer_start, port=0 + ) + assert state_after_inner_start["loop_counter"] == 2 + # inner LoopEnd: pass-through branch (-1). 
+ state_after_inner_end = inner_end.process_state(state_after_inner_start, port=0) + assert state_after_inner_end is not None + assert state_after_inner_end["loop_counter"] == 1 + # outer LoopEnd: loop_counter is still 1 on arrival, so this hop takes + # the pass-through branch: it decrements the counter to 0 and returns + # the state instead of consuming it. + result = outer_end.process_state(state_after_inner_end, port=0) + # The counter reaches 0 only on the way out of this hop, so the NEXT + # LoopEnd to receive the state is the one that runs the matching branch. + assert result is not None + assert result["loop_counter"] == 0 + + # Final landing on the matching branch consumes the state and + # runs the outer update (outer_i += 1). + matching = _StubLoopEnd(update="outer_i += 1") + assert matching.process_state(result, port=0) is None + assert matching.state["outer_i"] == 1 diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala index 44125045c97..650c73d2c7b 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala @@ -198,6 +198,7 @@ case class PhysicalOp( // schema propagation function propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas), isOneToManyOp: Boolean = false, + isLoopEnd: Boolean = false, // hint for number of workers suggestedWorkerNum: Option[Int] = None, // name of the PVE to execute within @@ -316,6 +317,14 @@ case class PhysicalOp( def withIsOneToManyOp(isOneToManyOp: Boolean): PhysicalOp = this.copy(isOneToManyOp = isOneToManyOp) + /** + * Creates a copy marked as a LoopEnd operator. Used by the region + * scheduler to preserve this operator's iceberg output across loop + * iterations instead of overwriting it on every region invocation. 
+ */ + def withIsLoopEnd(isLoopEnd: Boolean): PhysicalOp = + this.copy(isLoopEnd = isLoopEnd) + /** * Creates a copy of the PhysicalOp with the schema of a specified input port updated. * The schema can either be a successful schema definition or an error represented as a Throwable. diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index 4e9d6c6e2cd..44e71a379bb 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -144,6 +144,7 @@ import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallCh import org.apache.texera.amber.operator.visualization.windRoseChart.WindRoseChartOpDesc import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder} +import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc import org.apache.texera.amber.operator.source.scan.file.{FileScanOpDesc, FileScanSourceOpDesc} import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc @@ -220,6 +221,8 @@ trait StateTransferFunc new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 00000000000..db4af48fc90 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
  * Logical descriptor of the "Loop End" control operator.
  *
  * The operator is realised as a generated Python class (see
  * [[generatePythonCode]]) that subclasses `LoopEndOperator` from `pytexera`.
  * Both user-facing properties are snippets of Python source:
  *
  *  - `update`    – a statement executed against the loop state
  *                  (default `i += 1`).
  *  - `condition` – an expression whose value the generated `condition()`
  *                  method returns (default `i < len(table)`).
  */
class LoopEndOpDesc extends LogicalOp {
  @JsonProperty(required = true, defaultValue = "i += 1")
  @JsonSchemaTitle("Update")
  var update: String = _

  @JsonProperty(required = true, defaultValue = "i < len(table)")
  @JsonSchemaTitle("Condition")
  var condition: String = _

  /**
    * Builds the physical operator: a one-to-one Python operator carrying the
    * generated code, with a single suggested worker, marked as
    * non-parallelizable and flagged as a loop end.
    */
  override def getPhysicalOp(
      workflowId: WorkflowIdentity,
      executionId: ExecutionIdentity
  ): PhysicalOp =
    PhysicalOp
      .oneToOnePhysicalOp(
        workflowId,
        executionId,
        operatorIdentifier,
        OpExecWithCode(generatePythonCode(), "python")
      )
      .withInputPorts(operatorInfo.inputPorts)
      .withOutputPorts(operatorInfo.outputPorts)
      .withSuggestedWorkerNum(1)
      .withParallelizable(false)
      .withIsLoopEnd(true)

  /** Operator metadata: one input port, one output port, Control group. */
  override def operatorInfo: OperatorInfo =
    OperatorInfo(
      "Loop End",
      "Close a loop body and decide whether to iterate again based on a condition; pairs with Loop Start.",
      OperatorGroupConstants.CONTROL_GROUP,
      inputPorts = List(InputPort()),
      outputPorts = List(OutputPort())
    )

  /**
    * Renders `source` as a double-quoted Python string literal.
    *
    * Backslashes, double quotes and line breaks are escaped so that the
    * literal always stays on one physical line and evaluates, in Python, to
    * exactly `source`. Text without any of those characters is emitted
    * unchanged between the quotes.
    */
  private def pythonStringLiteral(source: String): String = {
    val literal = new StringBuilder("\"")
    source.foreach {
      case '\\' => literal.append("\\\\")
      case '"'  => literal.append("\\\"")
      case '\n' => literal.append("\\n")
      case '\r' => literal.append("\\r")
      case ch   => literal.append(ch)
    }
    literal.append('"').toString
  }

  /**
    * Generates the Python source of the executor class.
    *
    * `process_state`: when the incoming state carries a positive
    * `loop_counter`, the counter is decremented and the state is passed on
    * unchanged otherwise. If the counter is not positive, the state is copied
    * into `self.state`, its pickled `table` entry is unpickled, and the user's
    * `update` statement is executed with `self.state` as its local namespace.
    *
    * `condition`: evaluates the user's `condition` expression into
    * `self.state["output"]` and returns that value.
    *
    * The user snippets are embedded as escaped Python string literals, so
    * quotes, backslashes or line breaks in them cannot break the generated
    * source.
    *
    * NOTE(review): the generated code runs user-supplied Python via `exec`
    * and calls `pickle.loads` on a state entry. This looks intentional for a
    * user-programmable operator, but confirm the state can only originate
    * from a trusted upstream operator.
    */
  def generatePythonCode(): String = {
    // Build the statements first (a null field renders as "null", as plain
    // string interpolation would), then escape them as Python literals.
    val updateStatement = pythonStringLiteral(s"$update")
    val conditionStatement = pythonStringLiteral(s"output = $condition")
    s"""
       |from pytexera import *
       |class ProcessLoopEndOperator(LoopEndOperator):
       |    @overrides
       |    def process_state(self, state: State, port: int) -> Optional[State]:
       |        loop_counter = int(state.get("loop_counter", 0))
       |        if loop_counter > 0:
       |            state["loop_counter"] = loop_counter - 1
       |            return state
       |        self.state = dict(state)
       |        from pickle import loads
       |        self.state["table"] = loads(self.state["table"])
       |        exec($updateStatement, {}, self.state)
       |        return None
       |
       |    @overrides
       |    def condition(self) -> bool:
       |        exec($conditionStatement, {}, self.state)
       |        return self.state["output"]
       |""".stripMargin
  }
}
/**
  * Logical descriptor of the "Loop Start" control operator.
  *
  * The operator is realised as a generated Python class (see
  * [[generatePythonCode]]) that subclasses `LoopStartOperator` from
  * `pytexera`. Both user-facing properties are snippets of Python source:
  *
  *  - `initialization` – a statement executed once in `open()` to seed the
  *                       loop state (default `i = 0`).
  *  - `output`         – an expression evaluated in `process_table()` whose
  *                       value is yielded downstream
  *                       (default `table.iloc[i]`).
  */
class LoopStartOpDesc extends LogicalOp {
  @JsonProperty(required = true, defaultValue = "i = 0")
  @JsonSchemaTitle("Initialization")
  var initialization: String = _

  @JsonProperty(required = true, defaultValue = "table.iloc[i]")
  @JsonSchemaTitle("Output")
  var output: String = _

  /**
    * Builds the physical operator: a one-to-one Python operator carrying the
    * generated code, with a single suggested worker and marked as
    * non-parallelizable.
    */
  override def getPhysicalOp(
      workflowId: WorkflowIdentity,
      executionId: ExecutionIdentity
  ): PhysicalOp =
    PhysicalOp
      .oneToOnePhysicalOp(
        workflowId,
        executionId,
        operatorIdentifier,
        OpExecWithCode(generatePythonCode(), "python")
      )
      .withInputPorts(operatorInfo.inputPorts)
      .withOutputPorts(operatorInfo.outputPorts)
      .withSuggestedWorkerNum(1)
      .withParallelizable(false)

  /** Operator metadata: one input port, one output port, Control group. */
  override def operatorInfo: OperatorInfo =
    OperatorInfo(
      "Loop Start",
      "Begin a loop that iterates over rows of the input table; pairs with Loop End.",
      OperatorGroupConstants.CONTROL_GROUP,
      inputPorts = List(InputPort()),
      outputPorts = List(OutputPort())
    )

  /**
    * Renders `source` as a double-quoted Python string literal.
    *
    * Backslashes, double quotes and line breaks are escaped so that the
    * literal always stays on one physical line and evaluates, in Python, to
    * exactly `source`. Text without any of those characters is emitted
    * unchanged between the quotes.
    */
  private def pythonStringLiteral(source: String): String = {
    val literal = new StringBuilder("\"")
    source.foreach {
      case '\\' => literal.append("\\\\")
      case '"'  => literal.append("\\\"")
      case '\n' => literal.append("\\n")
      case '\r' => literal.append("\\r")
      case ch   => literal.append(ch)
    }
    literal.append('"').toString
  }

  /**
    * Generates the Python source of the executor class.
    *
    * `open`: initialises `self.state` with `loop_counter = 0` and executes
    * the user's `initialization` statement with `self.state` as its local
    * namespace.
    *
    * `process_table`: stores the incoming table under `self.state["table"]`,
    * evaluates the user's `output` expression into `self.state["output"]`
    * and yields that value.
    *
    * The user snippets are embedded as escaped Python string literals, so
    * quotes, backslashes or line breaks in them cannot break the generated
    * source.
    *
    * NOTE(review): the generated code runs user-supplied Python via `exec`.
    * This looks intentional for a user-programmable operator; confirm.
    */
  def generatePythonCode(): String = {
    // Build the statements first (a null field renders as "null", as plain
    // string interpolation would), then escape them as Python literals.
    val initStatement = pythonStringLiteral(s"$initialization")
    val outputStatement = pythonStringLiteral(s"output = $output")
    s"""
       |from pytexera import *
       |class ProcessLoopStartOperator(LoopStartOperator):
       |    @overrides
       |    def open(self):
       |        self.state = {"loop_counter": 0}
       |        exec($initStatement, {}, self.state)
       |
       |    @overrides
       |    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
       |        self.state["table"] = table
       |        exec($outputStatement, {}, self.state)
       |        yield self.state["output"]
       |""".stripMargin
  }
}
/**
  * Unit tests for [[LoopEndOpDesc]]: operator metadata, the shape of the
  * generated Python executor class, and the resulting `PhysicalOp`.
  */
class LoopEndOpDescSpec extends AnyFlatSpec with Matchers {

  private val workflowId = WorkflowIdentity(1L)
  private val executionId = ExecutionIdentity(1L)

  /** Builds a descriptor with the given (default-valued) user snippets. */
  private def desc(
      update: String = "i += 1",
      condition: String = "i < len(table)"
  ): LoopEndOpDesc = {
    val d = new LoopEndOpDesc()
    d.update = update
    d.condition = condition
    d
  }

  "LoopEndOpDesc.operatorInfo" should "advertise the user-friendly name and Control group" in {
    val info = desc().operatorInfo
    info.userFriendlyName shouldBe "Loop End"
    info.operatorGroupName shouldBe OperatorGroupConstants.CONTROL_GROUP
    info.operatorDescription should include("loop")
  }

  it should "expose exactly one input port and one output port" in {
    val info = desc().operatorInfo
    info.inputPorts should have length 1
    info.outputPorts should have length 1
  }

  "LoopEndOpDesc.generatePythonCode" should "embed the user-supplied update and condition expressions" in {
    // Distinct sentinels, asserted together with their `exec(...)` call-site
    // prefix, so we know the codegen wires the right user field into the
    // right site. A bare `include("UPDATE_SENT")` would still pass if
    // `condition` were accidentally pasted in place of `update` (and vice
    // versa); pinning the surrounding call shape forces the asymmetry.
    val code = desc(update = "UPDATE_SENT", condition = "COND_SENT").generatePythonCode()
    code should include("exec(\"UPDATE_SENT\"")
    code should include("exec(\"output = COND_SENT\"")
  }

  it should "subclass LoopEndOperator from pytexera" in {
    // Runtime branch `isinstance(executor, LoopEndOperator)` in main_loop
    // gates the loop-end reset path; a rename of either side must break
    // this assertion.
    val code = desc().generatePythonCode()
    code should include("from pytexera import *")
    code should include("class ProcessLoopEndOperator(LoopEndOperator)")
  }

  it should "declare condition() as returning bool, matching the abstract base" in {
    // The abstract base in operator.py was fixed to `-> bool`; the
    // generator template must agree. A `-> None` slip here would produce
    // a class that disagrees with the abstract contract.
    val code = desc().generatePythonCode()
    code should include("def condition(self) -> bool:")
  }

  it should "decrement loop_counter and pass state through when loop_counter > 0 (nested-loop case)" in {
    // For nested loops, the inner LoopEnd sees state belonging to an
    // outer loop. The generated process_state recognises this by a
    // positive loop_counter and just decrements + returns the state,
    // leaving the actual loop-control work to the outer LoopEnd.
    // This branch is critical for nested-for-loop correctness so pin
    // its shape explicitly.
    val code = desc().generatePythonCode()
    code should include("loop_counter = int(state.get(\"loop_counter\", 0))")
    code should include("if loop_counter > 0:")
    code should include("state[\"loop_counter\"] = loop_counter - 1")
  }

  it should "stash state, deserialize the pickled table, and run the user update on the matching-loop branch" in {
    val code = desc(update = "i = i + 7").generatePythonCode()
    // The matching-loop branch is the path the user's `update` expression
    // runs on. Pin the pickle round-trip and the exec call so a refactor
    // of either is intentional.
    code should include("self.state = dict(state)")
    code should include("from pickle import loads")
    code should include("self.state[\"table\"] = loads(self.state[\"table\"])")
    code should include("exec(\"i = i + 7\"")
  }

  it should "evaluate the user condition in process-shared state" in {
    val code = desc(condition = "i < 3").generatePythonCode()
    // condition() must read from self.state (populated by the matching-
    // loop branch above) and assign into self.state["output"] before
    // returning it. Pinning both the exec format and the assignment
    // keeps a future "just return the expr" refactor from silently
    // dropping the state side-effect main_loop.complete() depends on.
    code should include("exec(\"output = i < 3\"")
    code should include("self.state[\"output\"]")
  }

  "LoopEndOpDesc.getPhysicalOp" should "produce a non-parallelizable PhysicalOp pinned to a single worker" in {
    // Same reasoning as LoopStart: the loop body's per-iteration state
    // is per-instance, and the accumulated table must be a single buffer.
    val physical = desc().getPhysicalOp(workflowId, executionId)
    physical.parallelizable shouldBe false
    physical.suggestedWorkerNum shouldBe Some(1)
  }

  it should "be tagged as a loop end so RegionExecutionCoordinator skips iceberg recreation" in {
    // The isLoopEnd flag drives the
    // `if (!isLoopEndRegion || !DocumentFactory.documentExists(...))`
    // branch in RegionExecutionCoordinator. Without the tag, every loop
    // iteration would unconditionally recreate the result/state tables
    // and lose accumulated data. The flag must be set.
    val physical = desc().getPhysicalOp(workflowId, executionId)
    physical.isLoopEnd shouldBe true
  }

  it should "carry the generated Python code via OpExecWithCode" in {
    val physical = desc().getPhysicalOp(workflowId, executionId)
    physical.opExecInitInfo match {
      case OpExecWithCode(code, language) =>
        language shouldBe "python"
        code should include("class ProcessLoopEndOperator(LoopEndOperator)")
      case other =>
        fail(s"expected OpExecWithCode, got $other")
    }
  }

  it should "carry forward the operatorInfo input/output ports onto the PhysicalOp" in {
    val physical = desc().getPhysicalOp(workflowId, executionId)
    physical.inputPorts.size shouldBe desc().operatorInfo.inputPorts.size
    physical.outputPorts.size shouldBe desc().operatorInfo.outputPorts.size
  }
}
/**
  * Unit tests for [[LoopStartOpDesc]]: operator metadata, the shape of the
  * generated Python executor class, and the resulting `PhysicalOp`.
  */
class LoopStartOpDescSpec extends AnyFlatSpec with Matchers {

  private val workflowId = WorkflowIdentity(1L)
  private val executionId = ExecutionIdentity(1L)

  /** Builds a descriptor with the given (default-valued) user snippets. */
  private def desc(init: String = "i = 0", out: String = "table.iloc[i]"): LoopStartOpDesc = {
    val d = new LoopStartOpDesc()
    d.initialization = init
    d.output = out
    d
  }

  "LoopStartOpDesc.operatorInfo" should "advertise the user-friendly name and Control group" in {
    val info = desc().operatorInfo
    info.userFriendlyName shouldBe "Loop Start"
    info.operatorGroupName shouldBe OperatorGroupConstants.CONTROL_GROUP
    info.operatorDescription should include("loop")
  }

  it should "expose exactly one input port and one output port" in {
    val info = desc().operatorInfo
    info.inputPorts should have length 1
    info.outputPorts should have length 1
  }

  "LoopStartOpDesc.generatePythonCode" should "embed the user-supplied initialization and output expressions" in {
    // The init + output strings are interpolated directly into the generated
    // class so the Python `exec` calls at runtime see the user-provided code.
    // Distinct sentinels are asserted together with their `exec(...)`
    // call-site prefix: a bare `include("INIT_SENT")` would still pass if the
    // two fields were accidentally swapped (e.g. init pasted in place of
    // output), whereas the call shape differs between the two sites.
    val code = desc(init = "INIT_SENT", out = "OUT_SENT").generatePythonCode()
    code should include("exec(\"INIT_SENT\"")
    code should include("exec(\"output = OUT_SENT\"")
  }

  it should "subclass LoopStartOperator from pytexera" in {
    // The generated class must extend the base LoopStartOperator (defined
    // in core.models.operator) so the runtime's
    // `isinstance(executor, LoopStartOperator)` branch in main_loop fires
    // for it. A rename of either side should break this assertion.
    val code = desc().generatePythonCode()
    code should include("from pytexera import *")
    code should include("class ProcessLoopStartOperator(LoopStartOperator)")
  }

  it should "wire the initialization expression into open() and the output expression into process_table()" in {
    // The user's `initialization` runs once in `open()` to seed self.state
    // (specifically self.state['loop_counter'] = 0 plus user vars); the
    // user's `output` runs in `process_table()` against the buffered table.
    // Pin both call sites so a future refactor that swaps the two doesn't
    // silently produce a runnable-looking class that loops over nothing.
    val code = desc(init = "i = 0", out = "table.iloc[i]").generatePythonCode()
    code should include("def open(self)")
    code should include("\"loop_counter\": 0")
    code should include("exec(\"i = 0\"")
    code should include("def process_table(self, table: Table, port: int)")
    code should include("exec(\"output = table.iloc[i]\"")
  }

  "LoopStartOpDesc.getPhysicalOp" should "produce a non-parallelizable PhysicalOp pinned to a single worker" in {
    // LoopStart must run on exactly one worker because the loop state
    // (self.state, the accumulated table) is per-instance, not distributed.
    // Parallelizing it would fan-out the table and break the loop body's
    // per-iteration invariants.
    val physical = desc().getPhysicalOp(workflowId, executionId)
    physical.parallelizable shouldBe false
    physical.suggestedWorkerNum shouldBe Some(1)
  }

  it should "not be tagged as a loop end" in {
    // The isLoopEnd flag is consumed by RegionExecutionCoordinator to skip
    // recreating result/state tables across loop iterations. LoopStart
    // must NOT carry the flag — only LoopEnd does.
    val physical = desc().getPhysicalOp(workflowId, executionId)
    physical.isLoopEnd shouldBe false
  }

  it should "carry the generated Python code via OpExecWithCode" in {
    val physical = desc().getPhysicalOp(workflowId, executionId)
    physical.opExecInitInfo match {
      case OpExecWithCode(code, language) =>
        language shouldBe "python"
        code should include("class ProcessLoopStartOperator(LoopStartOperator)")
      case other =>
        fail(s"expected OpExecWithCode, got $other")
    }
  }

  it should "carry forward the operatorInfo input/output ports onto the PhysicalOp" in {
    val physical = desc().getPhysicalOp(workflowId, executionId)
    physical.inputPorts.size shouldBe desc().operatorInfo.inputPorts.size
    physical.outputPorts.size shouldBe desc().operatorInfo.outputPorts.size
  }
}