Sequential event loop in FE (with not fully deterministic implementation)#548

Open
eabatalov wants to merge 1 commit into main from eugene/fe-deterministic-event-loo
Conversation

@eabatalov
Contributor

@eabatalov commented Mar 4, 2026

A sequential, strictly ordered input/output event stream is implemented. However, the event loop itself does not yet work with user code in a deterministic way. See the many FIXMEs I left; I'll address them in further commits. I'd like to merge this because all tests pass and it's already a large set of changes.

Now there are clearly defined responsibilities:

  • Event Loop: runs customer code deterministically, manages SDK objects like Futures, acts on input events from the input event stream, and produces output events in the output event stream.
  • AllocationRunner: serialization/deserialization, gRPC, replay (input/output events).
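The split above can be sketched as two threads joined by ordered queues. This is a minimal illustration only, with simplified stand-ins for the PR's event types (the real OutputEventCreateFunctionCall / OutputEventFinishAllocation and input events carry more state):

```python
import queue
import threading
from dataclasses import dataclass

# Hypothetical, simplified event types mirroring the ordered streams.
@dataclass
class OutputEventCreateFunctionCall:
    call_id: str

@dataclass
class OutputEventFinishAllocation:
    value: object

@dataclass
class InputEventCallCreated:
    call_id: str

def event_loop(input_q: queue.Queue, output_q: queue.Queue) -> None:
    # Runs "user code": emits a function-call creation request, blocks on
    # the ordered input stream for the ack, then finishes the allocation.
    output_q.put(OutputEventCreateFunctionCall(call_id="call-1"))
    ack = input_q.get()  # blocks until the runner confirms creation
    assert isinstance(ack, InputEventCallCreated)
    output_q.put(OutputEventFinishAllocation(value=42))

def allocation_runner() -> object:
    # Owns I/O concerns; consumes the strictly ordered output stream.
    input_q: queue.Queue = queue.Queue()
    output_q: queue.Queue = queue.Queue()
    t = threading.Thread(target=event_loop, args=(input_q, output_q))
    t.start()
    while True:
        event = output_q.get()
        if isinstance(event, OutputEventCreateFunctionCall):
            # The real runner would submit the call over gRPC, then ack.
            input_q.put(InputEventCallCreated(call_id=event.call_id))
        elif isinstance(event, OutputEventFinishAllocation):
            t.join()
            return event.value

print(allocation_runner())  # prints 42
```

Because every input event is consumed in the order the loop asked for it, replaying the same input stream reproduces the same output stream, which is the property the determinism work builds on.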

Also deduplicated application deployments in the application tests; otherwise they make the tests run very long, which wastes time.

The map-reduce benchmark didn't change:

```
poetry run python ../indexify/benchmarks/map_reduce/main.py --num-requests 5 --maps-count 100
main:

Deploying benchmark application...
Starting map reduce benchmark with maps_count: 100, num_requests: 5
Started request zGIw2a5rn-vY6dbzlmg28 with 100 functions
Started request -xQed7iN6inQYMAma1Qp1 with 100 functions
Started request d_AqeMpEy1FDoW40i6YNS with 100 functions
Started request 7-36sH5Ef7j9HSxhHo-91 with 100 functions
Started request YXmCgx1ZJyGHqfIhEMOtE with 100 functions
All 5 requests started in 0.01 seconds
Waiting for all requests to complete...

Completed 5 requests in 84.60s

Test Configuration:
- Requests: 5
- Map calls per request: 100
- Total function calls: 1000

PR:

All 5 requests started in 0.01 seconds
Waiting for all requests to complete...

Completed 5 requests in 83.96s

Test Configuration:
- Requests: 5
- Map calls per request: 100
- Total function calls: 1000
```

Note

High Risk
Large refactor of core execution/Future lifecycle and durability ID generation plus runtime hook API changes; subtle ordering/concurrency edge cases (noted by FIXMEs) could affect determinism and replay correctness.

Overview
Refactors Function Executor allocation execution into a new AllocationEventLoop that runs user code in a dedicated thread and communicates with AllocationRunner via ordered output events (create function call, add watcher, finish allocation) and input events (call created, watcher result). Future graph registration/durability and wait/await logic move into the event loop.

Updates runtime hook plumbing from run_futures(futures=[...]) to run_future(future=...) across the SDK interface, local runner, and FE service, including improved error propagation on Future.run() failures.
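The shape of the hook change can be illustrated with a minimal sketch; Future, run_futures, and run_future here are simplified stand-ins for the SDK's actual classes, not its real API:

```python
from typing import Any, Callable, List

# Hypothetical stand-in for the SDK's Future.
class Future:
    def __init__(self, fn: Callable[[], Any]):
        self._fn = fn

    def run(self) -> Any:
        return self._fn()

# Old-style hook: batch interface, one call for many futures.
def run_futures(futures: List[Future]) -> List[Any]:
    return [f.run() for f in futures]

# New-style hook: one future per call, so a failure in Future.run()
# can be attributed to, and propagated for, exactly that future.
def run_future(future: Future) -> Any:
    try:
        return future.run()
    except Exception as e:
        raise RuntimeError("Future.run() failed") from e

print(run_future(Future(lambda: 2 + 2)))  # prints 4
```

The per-future signature is what enables the "improved error propagation" mentioned above: with a batch, one raising future obscures which element failed.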

Reworks execution-plan construction: event loop emits function-call refs/collections, and sdk_algorithms now serializes output-event args and converts them into ExecutionPlanUpdates; also extracts durable-ID generation into a dedicated module. Test suites are sped up by deploying apps once per test class (setUpClass).
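The setUpClass speed-up pattern looks roughly like this; deploy_app is a hypothetical stand-in for the suite's real deployment helper:

```python
import unittest

DEPLOY_COUNT = 0

def deploy_app(name: str) -> str:
    # Hypothetical deployment helper; counts invocations so the
    # once-per-class behavior is observable.
    global DEPLOY_COUNT
    DEPLOY_COUNT += 1
    return f"{name}-deployment"

class MapReduceTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Deploy once for the whole class instead of once per test method.
        cls.deployment = deploy_app("map-reduce")

    def test_first(self):
        self.assertTrue(self.deployment.endswith("deployment"))

    def test_second(self):
        self.assertTrue(self.deployment.startswith("map-reduce"))

suite = unittest.defaultTestLoader.loadTestsFromTestCase(MapReduceTests)
unittest.TextTestRunner(verbosity=0).run(suite)
print(DEPLOY_COUNT)  # prints 1 -- a single deployment serves both tests
```

Moving the deploy from setUp to setUpClass trades per-test isolation for speed, which is acceptable when the deployed application is read-only from the tests' perspective.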

Written by Cursor Bugbot for commit 8d63d45. This will update automatically on new commits.

@eabatalov requested a review from diptanu on March 4, 2026 at 18:43
"Unexpected exception in await_future_runtime_hook",
exc_info=e,
)
raise InternalError("Unexpected error while awaiting future")
Lock ineffective for generator-based await hook

Medium Severity

The _runtime_hook_lock in await_future_runtime_hook provides no actual protection. _await_future_runtime_hook is a generator function (it uses yield from), so calling it merely creates a generator object without executing any of its body. The with self._runtime_hook_lock block acquires the lock, creates the generator, returns it, and releases the lock—all before the generator's body (which starts a background thread and accesses shared state like _future_infos and _output_event_queue) ever runs. This contradicts the lock's stated purpose of preventing concurrent hook execution from multiple user threads.
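The failure mode is easy to demonstrate in isolation: calling a generator function only creates the generator object, so a lock held around the call never guards the body. This is a generic sketch, not the SDK's actual code:

```python
import threading

lock = threading.Lock()
body_ran = []

def _await_future_hook():
    # Generator body: does NOT run when the function is called,
    # only when the generator is first iterated.
    body_ran.append(True)
    yield "result"

def await_future_hook():
    # The lock is acquired and released around generator *creation*;
    # by the time it is released, none of the body has executed.
    with lock:
        gen = _await_future_hook()
    return gen

gen = await_future_hook()
print(body_ran)   # prints [] -- lock came and went, body untouched
print(next(gen))  # prints result -- body runs here, outside the lock
print(body_ran)   # prints [True]
```

Any shared state the body touches (here, the stand-in for _future_infos / _output_event_queue) is therefore accessed outside the critical section, from whichever thread happens to iterate the generator.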


Contributor Author (@eabatalov):
Yep, I know, this is not implemented properly yet.

@eabatalov force-pushed the eugene/fe-deterministic-event-loo branch from 677435a to f2ca419 on March 5, 2026 at 10:35
@cursor cursor bot left a comment
Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Server event thread never stopped on early return
    • I widened the __run_allocation cleanup finally to always enqueue the stop event on every return/exception path and only join the server-event thread when it is alive.

Preview (5af7490f0a):
diff --git a/src/tensorlake/function_executor/allocation_runner/allocation_runner.py b/src/tensorlake/function_executor/allocation_runner/allocation_runner.py
--- a/src/tensorlake/function_executor/allocation_runner/allocation_runner.py
+++ b/src/tensorlake/function_executor/allocation_runner/allocation_runner.py
@@ -680,39 +680,40 @@
 
         Raises exception on internal error. Returns AllocationResult on user code error or normal completion.
         """
-        # We need to be very careful who's code we're running here. Exceptions raised in customer
-        # code should be caught here and converted into proper AllocationResult indicating customer code failure.
-        # Exceptions in our internal FE code are just raised here and handled by caller.
+        try:
+            # We need to be very careful who's code we're running here. Exceptions raised in customer
+            # code should be caught here and converted into proper AllocationResult indicating customer code failure.
+            # Exceptions in our internal FE code are just raised here and handled by caller.
 
-        # This is internal FE code.
-        serialized_args: List[SerializedValue] = download_function_arguments(
-            self._allocation, self._blob_store, self._logger
-        )
-        function_call_metadata: FunctionCallMetadata | None = (
-            validate_and_deserialize_function_call_metadata(
-                serialized_function_call_metadata=self._allocation.inputs.function_call_metadata,
-                serialized_args=serialized_args,
-                function=self._function,
-                logger=self._logger,
+            # This is internal FE code.
+            serialized_args: List[SerializedValue] = download_function_arguments(
+                self._allocation, self._blob_store, self._logger
             )
-        )
+            function_call_metadata: FunctionCallMetadata | None = (
+                validate_and_deserialize_function_call_metadata(
+                    serialized_function_call_metadata=self._allocation.inputs.function_call_metadata,
+                    serialized_args=serialized_args,
+                    function=self._function,
+                    logger=self._logger,
+                )
+            )
 
-        self._parse_output_overrides(function_call_metadata)
-        args_or_result: tuple[list[Any], dict[str, Any]] | AllocationResult = (
-            self._parse_function_call_args(function_call_metadata, serialized_args)
-        )
-        if isinstance(args_or_result, AllocationResult):
-            # user code error during argument parsing.
-            return args_or_result
+            self._parse_output_overrides(function_call_metadata)
+            args_or_result: tuple[list[Any], dict[str, Any]] | AllocationResult = (
+                self._parse_function_call_args(function_call_metadata, serialized_args)
+            )
+            if isinstance(args_or_result, AllocationResult):
+                # user code error during argument parsing.
+                return args_or_result
 
-        args, kwargs = args_or_result
-        try:
+            args, kwargs = args_or_result
             self._event_loop.start(args, kwargs)
             return self._process_event_loop_output_events()
         finally:
             self._server_event_queue.put(_ServerEventStopProcessingThread())
-            self._logger.info("waiting for server event processing thread to finish")
-            self._process_server_events_thread.join()
+            if self._process_server_events_thread.is_alive():
+                self._logger.info("waiting for server event processing thread to finish")
+                self._process_server_events_thread.join()
 
     def _process_event_loop_output_events(self) -> AllocationResult:
         """Processes output events from the event loop until allocation completes.

@eabatalov force-pushed the eugene/fe-deterministic-event-loo branch 2 times, most recently from d0e3a86 to 5896027, on March 5, 2026 at 11:29
@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.
)
else:
output_events.append(OutputEventFinishAllocation(value=output))
self._output_event_queue.put(OutputEventBatch(events=output_events))
Non-tail-call output events skip function call creation confirmation

Medium Severity

When the user function's output resolves to a non-Future value (e.g., ReduceOperationFuture with a single plain-value input that has child OutputEventCreateFunctionCall events from the DFS walk), those function call events are sent in the same OutputEventBatch as OutputEventFinishAllocation. The allocation runner processes each event sequentially: _handle_event_loop_output_event_call_function submits to the server but doesn't wait for creation confirmation, then hits OutputEventFinishAllocation and returns the result. Unlike the tail-call path which uses _create_function_calls (with .wait()), this path has no confirmation step, so the allocation may complete before child function calls are acknowledged.
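A minimal sketch of the missing confirmation step, using a hypothetical CreationHandle in place of whatever the tail-call path's _create_function_calls(...).wait() actually returns:

```python
import threading

class CreationHandle:
    """Hypothetical handle the runner could hold per submitted call."""

    def __init__(self):
        self._created = threading.Event()

    def mark_created(self):
        # Called when the server acknowledges the function call.
        self._created.set()

    def wait(self, timeout=None) -> bool:
        # Blocks until the acknowledgement arrives (or the timeout expires).
        return self._created.wait(timeout)

def submit_function_call(handle: CreationHandle) -> None:
    # Simulate the server acknowledging creation asynchronously.
    threading.Timer(0.01, handle.mark_created).start()

handle = CreationHandle()
submit_function_call(handle)
# The fix amounts to blocking here before processing
# OutputEventFinishAllocation, mirroring the tail-call path.
assert handle.wait(timeout=1.0)
print("creation confirmed")
```

Without such a wait on the non-tail-call path, the allocation result can be returned while child-call submissions are still in flight, which is exactly the replay hazard the review flags.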


Sequential event loop in FE (with not fully deterministic implementation)

A sequential, strictly ordered input/output event stream is implemented.
However, the event loop itself does not yet work with user code in a
deterministic way. See the many FIXMEs I left; I'll address them in further
commits. I'd like to merge this because all tests pass and it's already a
large set of changes.

Also deduplicated application deployments in the application tests; otherwise
they make the tests run very long, which wastes time.

The map-reduce benchmark didn't change:

```
poetry run python ../indexify/benchmarks/map_reduce/main.py --num-requests 5 --maps-count 100
main:

Deploying benchmark application...
Starting map reduce benchmark with maps_count: 100, num_requests: 5
Started request zGIw2a5rn-vY6dbzlmg28 with 100 functions
Started request -xQed7iN6inQYMAma1Qp1 with 100 functions
Started request d_AqeMpEy1FDoW40i6YNS with 100 functions
Started request 7-36sH5Ef7j9HSxhHo-91 with 100 functions
Started request YXmCgx1ZJyGHqfIhEMOtE with 100 functions
All 5 requests started in 0.01 seconds
Waiting for all requests to complete...

Completed 5 requests in 84.60s

Test Configuration:
- Requests: 5
- Map calls per request: 100
- Total function calls: 1000

PR:

All 5 requests started in 0.01 seconds
Waiting for all requests to complete...

Completed 5 requests in 83.96s

Test Configuration:
- Requests: 5
- Map calls per request: 100
- Total function calls: 1000
```
@eabatalov force-pushed the eugene/fe-deterministic-event-loo branch from 5896027 to 8d63d45 on March 5, 2026 at 12:01