Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
263 commits
Select commit Hold shift + click to select a range
b313e16
fix: update if operator state access
aglinxinyuan Apr 19, 2026
808a5e7
fix fmt
aglinxinyuan Apr 19, 2026
67c4e24
fix fmt
aglinxinyuan Apr 19, 2026
5cd9657
fix fmt
aglinxinyuan Apr 19, 2026
1173dd4
fix fmt
aglinxinyuan Apr 19, 2026
907dd10
fix fmt
aglinxinyuan Apr 19, 2026
42201cb
fix fmt
aglinxinyuan Apr 19, 2026
94b874c
fix fmt
aglinxinyuan Apr 19, 2026
345fa41
fix fmt
aglinxinyuan Apr 19, 2026
b6d14ee
fix fmt
aglinxinyuan Apr 19, 2026
92905d8
fix fmt
aglinxinyuan Apr 19, 2026
fc8cdf8
init
aglinxinyuan Apr 19, 2026
5c1d369
update
aglinxinyuan Apr 20, 2026
52eace6
Apply xinyuan-loop-feb changes from 157156c onto main
aglinxinyuan Apr 20, 2026
57e0f41
Merge branch 'xinyuan-state-only' into xinyuan-loop-feb
aglinxinyuan Apr 20, 2026
cce8da2
fix fmt
aglinxinyuan Apr 20, 2026
45a51e1
Merge remote-tracking branch 'origin/xinyuan-loop-feb' into xinyuan-l…
aglinxinyuan Apr 20, 2026
6be98d5
Merge branch 'main' into xiaozhen-sync-region-kill
aglinxinyuan Apr 20, 2026
e294684
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-state-only
aglinxinyuan Apr 20, 2026
9da2034
Merge branch 'xinyuan-state-only' into xinyuan-loop-feb
aglinxinyuan Apr 20, 2026
2540c8a
test: add multiple state processing regression
aglinxinyuan Apr 20, 2026
e0bb804
fix fmt
aglinxinyuan Apr 20, 2026
151eb15
fix fmt
aglinxinyuan Apr 20, 2026
30a997b
Merge branch 'xinyuan-state-only' into xinyuan-loop-feb
aglinxinyuan Apr 21, 2026
3c7f46b
Merge branch 'main' into xiaozhen-sync-region-kill
aglinxinyuan Apr 21, 2026
e5d8e50
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-state-only
aglinxinyuan Apr 21, 2026
233c8c8
Merge branch 'xinyuan-state-only' into xinyuan-loop-feb
aglinxinyuan Apr 21, 2026
69bb785
update
aglinxinyuan Apr 21, 2026
555e55d
Add scheduler jump-to-operator support
aglinxinyuan Apr 21, 2026
001a409
Add scheduler jump test
aglinxinyuan Apr 21, 2026
58d851d
update
aglinxinyuan Apr 22, 2026
a4a72cc
update
aglinxinyuan Apr 22, 2026
eafb7f6
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 22, 2026
d695dec
fix fmt
aglinxinyuan Apr 22, 2026
8d3a45c
Merge branch 'main' into xinyuan-remove-passToAllDownstream
chenlica Apr 22, 2026
c4a5f6c
Merge branch 'main' into xinyuan-remove-passToAllDownstream
aglinxinyuan Apr 22, 2026
a0e4e21
Merge branch 'main' into xinyuan-remove-passToAllDownstream
chenlica Apr 22, 2026
abad7f1
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 22, 2026
3926100
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 22, 2026
80afe67
update
aglinxinyuan Apr 23, 2026
84bd376
refactor: keep only state changes on top of main
aglinxinyuan Apr 23, 2026
b570aae
refactor: keep state changes without materialization
aglinxinyuan Apr 23, 2026
3c0c816
feat: add state materialization support
aglinxinyuan Apr 23, 2026
bd1bc28
fix: keep only materialization change in region coordinator
aglinxinyuan Apr 23, 2026
272caff
chore: drop local AGENTS file from branch
aglinxinyuan Apr 23, 2026
e7a0f15
test: cover state materialization round trip
aglinxinyuan Apr 23, 2026
3c4dbb8
test: cover multiple state rows in materialization
aglinxinyuan Apr 23, 2026
0588464
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Apr 23, 2026
72a3c24
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 23, 2026
cb1d9e1
fix fmt
aglinxinyuan Apr 24, 2026
3df8fef
Merge branch 'main' into xinyuan-state-only
aglinxinyuan Apr 24, 2026
9200094
Merge branch 'main' into xinyuan-remove-passToAllDownstream
aglinxinyuan Apr 24, 2026
f4b40a5
Merge branch 'xinyuan-remove-passToAllDownstream' into xinyuan-state-…
aglinxinyuan Apr 24, 2026
193e7d3
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan Apr 24, 2026
09537cb
Merge branch 'xinyuan-state-only' into xinyuan-state-materialization
aglinxinyuan Apr 24, 2026
2484147
Merge branch 'main' into xinyuan-state-only
aglinxinyuan Apr 24, 2026
9c00ac5
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 24, 2026
59e91c9
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 25, 2026
a5b5011
sync control channel cleanup on loop branch
aglinxinyuan Apr 25, 2026
ca43316
remove accidental AGENTS file from loop branch
aglinxinyuan Apr 25, 2026
858539d
fix fmt
aglinxinyuan Apr 25, 2026
f182695
fix fmt
aglinxinyuan Apr 25, 2026
2904bf6
fix fmt
aglinxinyuan Apr 25, 2026
45bcb38
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Apr 25, 2026
f327f0c
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 26, 2026
95bd924
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Apr 26, 2026
20e2ced
Refactor jump state into execution coordinator
aglinxinyuan Apr 26, 2026
4c8cb5f
Simplify coordinator jump scheduling
aglinxinyuan Apr 26, 2026
b7c2e3b
fix fmt
aglinxinyuan Apr 26, 2026
0aa5017
Clarify jump-to-region API
aglinxinyuan Apr 26, 2026
612206f
Restore iterator-style coordinator wiring
aglinxinyuan Apr 26, 2026
86cbdc5
fix fmt
aglinxinyuan Apr 26, 2026
73d0e2f
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 27, 2026
f726337
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 27, 2026
baf5dbb
fix
aglinxinyuan Apr 27, 2026
ba6104f
fix fmt
aglinxinyuan Apr 27, 2026
cff7fb1
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 28, 2026
560b670
update
aglinxinyuan Apr 28, 2026
3579bc1
update
aglinxinyuan Apr 28, 2026
802bdd0
update
aglinxinyuan Apr 28, 2026
9f5feab
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 28, 2026
f5ef447
update
aglinxinyuan Apr 28, 2026
24ac4ed
Merge branch 'main' into xinyuan-state-only
aglinxinyuan Apr 28, 2026
cd6809d
Merge branch 'xinyuan-state-only' into xinyuan-state-materialization
aglinxinyuan Apr 28, 2026
488ed16
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Apr 29, 2026
7a2038a
Merge branch 'main' into xiaozhen-sync-region-kill
aglinxinyuan Apr 29, 2026
f875fab
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Apr 29, 2026
937c6df
Merge branch 'main' into xiaozhen-sync-region-kill
aglinxinyuan Apr 29, 2026
c06c241
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Apr 29, 2026
cf16195
update
aglinxinyuan Apr 29, 2026
488040e
update
aglinxinyuan Apr 29, 2026
c3ac307
update
aglinxinyuan Apr 29, 2026
62d2854
update
aglinxinyuan Apr 29, 2026
8a72f8d
Merge branch 'main' into xinyuan-state-only
aglinxinyuan Apr 29, 2026
ed650ea
fix(amber): keep per-task finally switches, drop run-loop end-of-body…
aglinxinyuan Apr 30, 2026
60db011
fix: address state review comments
aglinxinyuan Apr 30, 2026
74e2b1d
update
aglinxinyuan Apr 30, 2026
2fe3270
update
aglinxinyuan Apr 30, 2026
87330ec
update
aglinxinyuan Apr 30, 2026
0f9a5f8
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 30, 2026
83e6fb5
fix fmt
aglinxinyuan Apr 30, 2026
d545e7d
Merge branch 'main' into xinyuan-state-only
Xiao-zhen-Liu Apr 30, 2026
2716fc5
Merge branch 'main' into xinyuan-state-only
aglinxinyuan Apr 30, 2026
178f59b
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 30, 2026
2ab9e00
test(amber): expand jump-to-operator-region coordinator coverage
aglinxinyuan Apr 30, 2026
9472125
feat(amber): require Schedule level keys to be contiguous from 0
aglinxinyuan Apr 30, 2026
6f40300
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Apr 30, 2026
be4baff
Merge remote-tracking branch 'origin/main' into xinyuan-scheduler-jump
aglinxinyuan Apr 30, 2026
264ca26
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan Apr 30, 2026
28a412b
Merge branch 'main' into xinyuan-state-only
aglinxinyuan Apr 30, 2026
3b8ec8e
fix: make python State runtime type
aglinxinyuan Apr 30, 2026
15a1bc8
Merge branch 'main' into xinyuan-state-only
aglinxinyuan Apr 30, 2026
70d4a9a
Merge branch 'xinyuan-state-only' into xinyuan-state-materialization
aglinxinyuan May 1, 2026
0756972
refactor: drop StateFrame.__post_init__ auto-coerce
aglinxinyuan May 1, 2026
d92ed51
fix fmt
aglinxinyuan May 1, 2026
2a8966e
refactor(scala): move State serializers into the State class
aglinxinyuan May 1, 2026
9be8139
Merge xinyuan-state-only + adapt callers to in-class State API
aglinxinyuan May 1, 2026
7303189
fix fmt
aglinxinyuan May 1, 2026
b7842a0
Merge branch 'xinyuan-state-only' into xinyuan-state-materialization
aglinxinyuan May 1, 2026
a4827cd
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan May 1, 2026
89e2276
Merge xinyuan-state-materialization into xinyuan-loop-feb
aglinxinyuan May 1, 2026
fbd2a4d
Merge branch 'xinyuan-loop-feb' of github.com:apache/texera into xiny…
aglinxinyuan May 1, 2026
528a478
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan May 1, 2026
c0b0309
Merge branch 'main' into xinyuan-state-only
aglinxinyuan May 1, 2026
0f90755
update
aglinxinyuan May 1, 2026
3e86158
fix fmt
aglinxinyuan May 1, 2026
80037ce
fix fmt
aglinxinyuan May 1, 2026
c1d19af
fix: route StateFrame through state.toTuple in PythonProxyClient
aglinxinyuan May 1, 2026
180f481
Merge xinyuan-scheduler-jump into xinyuan-loop-feb
aglinxinyuan May 1, 2026
407cd29
add test cases
aglinxinyuan May 1, 2026
b475f3a
Merge branch 'xinyuan-state-only' into xinyuan-state-materialization
aglinxinyuan May 1, 2026
e7f9d36
Merge branch 'main' into xinyuan-state-only
aglinxinyuan May 1, 2026
0cc2d18
Merge branch 'xinyuan-state-only' into xinyuan-state-materialization
aglinxinyuan May 1, 2026
1d55217
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan May 1, 2026
11d68ef
Merge branch 'xinyuan-state-materialization' into xinyuan-loop-feb
aglinxinyuan May 1, 2026
12bb7e4
refactor: drop redundant JumpToOperator API in favor of JumpToOperato…
aglinxinyuan May 1, 2026
c6aba8b
add test cases
aglinxinyuan May 1, 2026
195fa3b
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 1, 2026
481d757
add test cases
aglinxinyuan May 1, 2026
08c5666
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan May 1, 2026
af1b059
Merge branch 'xinyuan-state-materialization' into xinyuan-loop-feb
aglinxinyuan May 1, 2026
1b7460d
update
aglinxinyuan May 1, 2026
47252b3
update
aglinxinyuan May 1, 2026
df5d347
update
aglinxinyuan May 1, 2026
4d012d1
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan May 1, 2026
e906f5a
update
aglinxinyuan May 1, 2026
869c7ea
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 1, 2026
13c8824
Merge branch 'xinyuan-state-materialization' into xinyuan-loop-feb
aglinxinyuan May 1, 2026
0b761b3
update
aglinxinyuan May 1, 2026
a32c56a
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan May 1, 2026
d2f9c0e
refactor(amber): move schedule-rewrite logic out of Schedule
aglinxinyuan May 1, 2026
837323a
refactor(amber): use Schedule.copy in jumpToRegionContainingOperator
aglinxinyuan May 1, 2026
e1c6433
refactor(amber): simplify jumpToRegionContainingOperator to a cursor …
aglinxinyuan May 1, 2026
b89de48
refactor(amber): slim down Schedule to data + cursor
aglinxinyuan May 2, 2026
0f55e37
refactor(amber): rename Schedule cursor to currentLevel
aglinxinyuan May 2, 2026
b62d7bd
refactor(amber): rename pullNextRegions to getNextRegions
aglinxinyuan May 2, 2026
2fa51e0
refactor(amber): inline jump rewrite in JumpToOperatorRegionHandler
aglinxinyuan May 2, 2026
fc05cbe
refactor(amber): inline coordinator lookup in JumpToOperatorRegionHan…
aglinxinyuan May 2, 2026
8c46d1a
refactor(amber): direct levelSets lookup in Schedule.next
aglinxinyuan May 2, 2026
e47de64
refactor(amber): use Map.isDefinedAt in Schedule.hasNext
aglinxinyuan May 2, 2026
daab8be
refactor(amber): inline coordinator getNextRegions
aglinxinyuan May 2, 2026
9872d8e
refactor(amber): lazy-init WorkflowExecutionCoordinator with the real…
aglinxinyuan May 2, 2026
ec76300
revert(amber): drop lazy-init coordinator, keep direct replaceSchedul…
aglinxinyuan May 2, 2026
010c829
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan May 2, 2026
8444b4c
chore(amber): drop placeholder-comment on ControllerProcessor coordin…
aglinxinyuan May 2, 2026
7d4a00e
update
aglinxinyuan May 2, 2026
a32abcf
update
aglinxinyuan May 2, 2026
6de0e4b
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan May 2, 2026
7d0bd96
feat(amber): record jumps as replay tails appended to the schedule
aglinxinyuan May 2, 2026
664fdd9
refactor(amber): drop executionLevels, use cursor reset for jumps
aglinxinyuan May 2, 2026
5c5b652
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan May 2, 2026
3b4d5f6
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan May 2, 2026
dd9e738
refactor(amber): move target-operator search into JumpToOperatorRegio…
aglinxinyuan May 2, 2026
0c681ed
refactor(amber): drop initialSchedule constructor param on coordinator
aglinxinyuan May 2, 2026
c646d5b
refactor(amber): expose coordinator schedule as a public var
aglinxinyuan May 2, 2026
c0d02f9
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan May 2, 2026
ea3d5f1
update
aglinxinyuan May 2, 2026
61985a2
Merge remote-tracking branch 'origin/xinyuan-scheduler-jump' into xin…
aglinxinyuan May 2, 2026
6f5e725
Merge branch 'main' into xinyuan-scheduler-jump
aglinxinyuan May 2, 2026
9b9aba8
update
aglinxinyuan May 2, 2026
bb839cf
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 2, 2026
580e09c
Merge branch 'xinyuan-state-materialization' into xinyuan-loop-feb
aglinxinyuan May 2, 2026
339f7a8
Merge branch 'xinyuan-scheduler-jump' into xinyuan-loop-feb
aglinxinyuan May 2, 2026
ff3a9d1
fix(amber): IfOpExec uses State.values map after state refactor
aglinxinyuan May 2, 2026
58b1248
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 2, 2026
32c7d84
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 2, 2026
dc74a72
Merge branch 'xinyuan-state-materialization' into xinyuan-loop-feb
aglinxinyuan May 2, 2026
b326684
fix fmt
aglinxinyuan May 2, 2026
37ce61d
fix fmt
aglinxinyuan May 2, 2026
d84314a
fix fmt
aglinxinyuan May 2, 2026
5905737
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 2, 2026
6ad18e8
add tests
aglinxinyuan May 2, 2026
39a4018
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 2, 2026
b8865e0
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 2, 2026
eb1f50d
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 2, 2026
03d8189
fix tests
aglinxinyuan May 2, 2026
fb1e038
test(python): cover state-reader run() block and DocumentFactory routing
aglinxinyuan May 3, 2026
3d160b9
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 3, 2026
9174563
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 5, 2026
cba7fe0
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 5, 2026
ad60adb
fix
aglinxinyuan May 5, 2026
5494240
fix
aglinxinyuan May 5, 2026
7f8376e
fix
aglinxinyuan May 5, 2026
1645717
fix
aglinxinyuan May 5, 2026
10f243f
Merge branch 'main' into xinyuan-state-materialization
aglinxinyuan May 5, 2026
aff5611
Merge branch 'xinyuan-state-materialization' into xinyuan-loop-feb
aglinxinyuan May 5, 2026
425b589
Merge remote-tracking branch 'origin/main' into xinyuan-loop-feb
aglinxinyuan May 15, 2026
f313ee5
fix(pyamber): track _storage_uris so reset_loopend_storage works
aglinxinyuan May 15, 2026
d635c57
update
aglinxinyuan May 15, 2026
8294e96
fix(loop): drop stale uri_from_result_uri / uriFromResultUri helpers
aglinxinyuan May 15, 2026
85e8cc2
init
aglinxinyuan May 15, 2026
08d3834
init
aglinxinyuan May 15, 2026
fffe355
refactor(amber): extract workerIdx once in setupOutputStorageWriterTh…
aglinxinyuan May 15, 2026
0cf9a0f
Revert "refactor(amber): extract workerIdx once in setupOutputStorage…
aglinxinyuan May 15, 2026
7a320f6
init
aglinxinyuan May 15, 2026
171f7d6
fix fmt
aglinxinyuan May 15, 2026
538a821
fix fmt
aglinxinyuan May 15, 2026
4502539
fix fmt
aglinxinyuan May 15, 2026
53e4d79
refactor(storage): replace LoopEnd open-or-create try/catch with expl…
aglinxinyuan May 15, 2026
2aec649
refactor: hoist isLoopEndRegion branch outside the per-URI guards
aglinxinyuan May 15, 2026
d0d35c2
test(pyamber): remove test_output_manager.py
aglinxinyuan May 15, 2026
5a86b97
test(pyamber): remove test_document_factory.py
aglinxinyuan May 15, 2026
240f9e6
test(pyamber): restore test_output_manager.py and test_document_facto…
aglinxinyuan May 15, 2026
19b0cd9
refactor: collapse LoopEnd guards back to per-URI condition
aglinxinyuan May 15, 2026
7aeb5ee
feat(storage): add DocumentFactory.documentExists for absence checks
aglinxinyuan May 16, 2026
3ba674d
chore: remove DocumentFactory.documentExists hunk (now in #5085)
aglinxinyuan May 16, 2026
d760f98
docs: drop try/catch rationale from documentExists comment
aglinxinyuan May 16, 2026
ceff249
refactor(storage): address PR review on documentExists
aglinxinyuan May 16, 2026
771f0eb
refactor(storage): inline catalog.tableExists in documentExists
aglinxinyuan May 16, 2026
8c02359
feat(storage,python): add DocumentFactory.document_exists
aglinxinyuan May 16, 2026
641a8df
Potential fix for pull request finding
aglinxinyuan May 16, 2026
7b174eb
style: scalafmt IcebergDocumentSpec
aglinxinyuan May 16, 2026
f6dbae4
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan May 16, 2026
d121e21
Merge branch 'xinyuan-document-factory-exists' into xinyuan-loop-feb
aglinxinyuan May 16, 2026
850e225
Merge branch 'main' into xinyuan-document-factory-exists
mengw15 May 16, 2026
33dff84
Merge branch 'xinyuan-document-factory-exists' into xinyuan-loop-feb
aglinxinyuan May 16, 2026
caf7ce4
refactor(loop): drop loop_start_id() and inline state cleanup at the …
aglinxinyuan May 16, 2026
5d893dc
refactor(loop): drop generatePythonCode try/catch and inline get_inpu…
aglinxinyuan May 16, 2026
da1aba6
Merge remote-tracking branch 'origin/main' into xinyuan-loop-feb
aglinxinyuan May 16, 2026
a22b95e
refactor(loop): small cleanups in process_input_state, condition, and…
aglinxinyuan May 16, 2026
4a48559
refactor(loop): mark LoopEnd via PhysicalOp.isLoopEnd, not op-id prefix
aglinxinyuan May 16, 2026
245fe4b
refactor(loop): match condition() annotation in generated code with b…
aglinxinyuan May 16, 2026
8db3f75
refactor(loop): collapse per-port storage URI dict to single field
aglinxinyuan May 16, 2026
71ca457
refactor(loop): stash storage_uri_base on PortStorageWriter
aglinxinyuan May 16, 2026
0d3125d
Revert "refactor(loop): stash storage_uri_base on PortStorageWriter"
aglinxinyuan May 16, 2026
833056d
fix fmt
aglinxinyuan May 16, 2026
f7e5dcb
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan May 18, 2026
80469e7
Merge branch 'main' into xinyuan-loop-feb
Xiao-zhen-Liu May 19, 2026
ca12ad0
test(loop): cover LoopStart/LoopEnd codegen + flat/nested runtime beh…
aglinxinyuan May 20, 2026
9ca0cc3
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ def __init__(self, worker_id: str):
PortIdentity, typing.Tuple[Queue, PortStorageWriter, Thread]
] = dict()

# Loop-end operators have a single output port; remember its base
# URI so `reset_loopend_storage` can re-provision the iceberg
# tables on each loop iteration.
self._storage_uri_base: typing.Optional[str] = None

def is_missing_output_ports(self):
"""
This method is only used for ensuring correct region execution.
Expand Down Expand Up @@ -133,6 +138,9 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri_base: st
state materialization on the same port. `storage_uri_base` is the
port's base URI; the result and state URIs are derived from it.
"""
# Remember the base URI so `reset_loopend_storage` can re-provision
# the iceberg tables on subsequent loop iterations.
self._storage_uri_base = storage_uri_base
document, _ = DocumentFactory.open_document(
VFSURIFactory.result_uri(storage_uri_base)
)
Expand Down Expand Up @@ -217,6 +225,19 @@ def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
elif port_id in self._port_state_writers:
self._port_state_writers[port_id][0].put(element)

def reset_storage(self) -> None:
Comment thread
aglinxinyuan marked this conversation as resolved.
port_id = self.get_port_ids()[0]
storage_uri_base = self._storage_uri_base
self.close_port_storage_writers()
DocumentFactory.create_document(
VFSURIFactory.result_uri(storage_uri_base),
self._ports[port_id].get_schema(),
)
DocumentFactory.create_document(
VFSURIFactory.state_uri(storage_uri_base), State.SCHEMA
)
self.set_up_port_storage_writer(port_id, storage_uri_base)
Comment thread
aglinxinyuan marked this conversation as resolved.

def close_port_storage_writers(self) -> None:
"""
Flush the buffers of port storage writers and wait for all the
Expand Down
27 changes: 27 additions & 0 deletions amber/src/main/python/core/models/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
time, or None.
"""
yield


class LoopStartOperator(TableOperator):
@overrides.final
def process_state(self, state: State, port: int) -> Optional[State]:
if "LoopStartStateURI" in state:
state["loop_counter"] += 1
Comment thread
aglinxinyuan marked this conversation as resolved.
return state
self.state.update(state)
return None

@overrides.final
def produce_state_on_finish(self, port: int) -> State:
from pickle import dumps

self.state["table"] = dumps(Table(self._TableOperator__table_data[port]))
return dict(self.state)
Comment thread
aglinxinyuan marked this conversation as resolved.


class LoopEndOperator(TableOperator):
@overrides.final
def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
yield table

@abstractmethod
def condition(self) -> bool:
pass
49 changes: 47 additions & 2 deletions amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
ECMElement,
InternalQueueElement,
)
from core.models.operator import LoopEndOperator, LoopStartOperator
from core.models.state import State
from core.runnables.data_processor import DataProcessor
from core.storage.document_factory import DocumentFactory
from core.storage.vfs_uri_factory import VFSURIFactory
from core.util import StoppableQueueBlockingRunnable, get_one_of
from core.util.console_message.timestamp import current_time_in_local_timezone
from core.util.customized_queue.queue_base import QueueElement
Expand All @@ -48,6 +51,7 @@
PortIdentity,
ChannelIdentity,
EmbeddedControlMessageIdentity,
OperatorIdentity,
)
from proto.org.apache.texera.amber.engine.architecture.rpc import (
ConsoleMessage,
Expand All @@ -61,6 +65,7 @@
EmbeddedControlMessage,
AsyncRpcContext,
ControlRequest,
JumpToOperatorRegionRequest,
)
from proto.org.apache.texera.amber.engine.architecture.worker import (
WorkerState,
Expand All @@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True, name="data_processor_thread"
).start()

def _attach_loop_start_id(self, output_state: State) -> None:
if "LoopStartId" in output_state:
return
output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
"-main-0", 1
)[0]
Comment thread
aglinxinyuan marked this conversation as resolved.
# The URI lives on the upstream operator's output port (which
# LoopStart's first materialization reader is reading from).
reader_runnables = (
self.context.input_manager.get_input_port_mat_reader_threads()
)
output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
next(iter(reader_runnables.values()))[0].uri
)

def _jump_to_loop_start(
self, executor: LoopEndOperator, controller_interface
) -> None:
state = executor.state
controller_interface.jump_to_operator_region(
JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
)
uri = state["LoopStartStateURI"]
# Strip the per-iteration scratch (`table`, `output`) and the
# loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
# user-visible loop state is written back to LoopStart's input.
for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
state.pop(key, None)
writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
writer.put_one(State(state).to_tuple())
writer.close()
Comment thread
aglinxinyuan marked this conversation as resolved.

def complete(self) -> None:
"""
Complete the DataProcessor, marking state to COMPLETED, and notify the
controller.
"""
# flush the buffered console prints
self._check_and_report_console_messages(force_flush=True)
self.context.executor_manager.executor.close()
controller_interface = self._async_rpc_client.controller_stub()
executor = self.context.executor_manager.executor
if isinstance(executor, LoopEndOperator) and executor.condition():
self._jump_to_loop_start(executor, controller_interface)
Comment thread
aglinxinyuan marked this conversation as resolved.
executor.close()
# stop the data processing thread
self.data_processor.stop()
self.context.state_manager.transit_to(WorkerState.COMPLETED)
self.context.statistics_manager.update_total_execution_time(time.time_ns())
controller_interface = self._async_rpc_client.controller_stub()
controller_interface.worker_execution_completed(EmptyRequest())
self.context.close()

Expand Down Expand Up @@ -193,6 +233,11 @@ def process_input_state(self) -> None:
self._switch_context()
output_state = self.context.state_processing_manager.get_output_state()
if output_state is not None:
executor = self.context.executor_manager.executor
if isinstance(executor, LoopEndOperator):
self.context.output_manager.reset_storage()
elif isinstance(executor, LoopStartOperator):
self._attach_loop_start_id(output_state)
for to, batch in self.context.output_manager.emit_state(output_state):
self._output_queue.put(
DataElement(
Expand Down
3 changes: 3 additions & 0 deletions amber/src/main/python/pytexera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from overrides import overrides
from typing import Iterator, Optional, Union

from core.models.operator import LoopStartOperator, LoopEndOperator
from pyamber import *
from .storage.dataset_file_document import DatasetFileDocument
from .storage.large_binary_input_stream import LargeBinaryInputStream
Expand All @@ -43,6 +44,8 @@
"UDFTableOperator",
"UDFBatchOperator",
"UDFSourceOperator",
"LoopStartOperator",
"LoopEndOperator",
"DatasetFileDocument",
"largebinary",
"LargeBinaryInputStream",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,17 @@ class RegionExecutionCoordinator(
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
DocumentFactory.createDocument(resultURI, schema)
DocumentFactory.createDocument(stateURI, State.schema)
// LoopEnd operators may re-execute the region multiple times; on
// subsequent iterations the result/state documents already exist,
// and `createDocument` (overrideIfExists=true) would clobber them.
// Skip the create call when the document is already there.
val isLoopEndRegion = region.getOperators.exists(_.isLoopEnd)
if (!isLoopEndRegion || !DocumentFactory.documentExists(resultURI)) {
DocumentFactory.createDocument(resultURI, schema)
}
if (!isLoopEndRegion || !DocumentFactory.documentExists(stateURI)) {
DocumentFactory.createDocument(stateURI, State.schema)
}
if (!isRestart) {
val (_, eid, _, _) = decodeURI(resultURI)
WorkflowExecutionsResource.insertOperatorPortResultUri(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ package org.apache.texera.web.service

import com.typesafe.scalalogging.LazyLogging
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import org.apache.texera.amber.core.workflow.WorkflowContext
import org.apache.texera.amber.core.workflow.{ExecutionMode, WorkflowContext}
import org.apache.texera.amber.engine.architecture.controller.{ControllerConfig, Workflow}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState._
import org.apache.texera.amber.engine.common.Utils
import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.common.executionruntimestate.ExecutionMetadataStore
import org.apache.texera.amber.operator.loop.LoopStartOpDesc
import org.apache.texera.web.model.websocket.event.{
TexeraWebSocketEvent,
WorkflowErrorEvent,
Expand Down Expand Up @@ -66,7 +67,12 @@ class WorkflowExecutionService(
) extends SubscriptionManager
with LazyLogging {

workflowContext.workflowSettings = request.workflowSettings
workflowContext.workflowSettings =
if (request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc])) {
request.workflowSettings.copy(executionMode = ExecutionMode.MATERIALIZED)
} else {
request.workflowSettings
}
Comment thread
aglinxinyuan marked this conversation as resolved.
val wsInput = new WebsocketInput(errorHandler)

addSubscription(
Expand Down
Loading
Loading