Sequential event loop in FE (with not fully deterministic implementation)#548
Conversation
| "Unexpected exception in await_future_runtime_hook", | ||
| exc_info=e, | ||
| ) | ||
| raise InternalError("Unexpected error while awaiting future") |
Lock ineffective for generator-based await hook
Medium Severity
The `_runtime_hook_lock` in `await_future_runtime_hook` provides no actual protection. `_await_future_runtime_hook` is a generator function (it uses `yield from`), so calling it merely creates a generator object without executing any of its body. The `with self._runtime_hook_lock` block acquires the lock, creates the generator, returns it, and releases the lock, all before the generator's body (which starts a background thread and accesses shared state like `_future_infos` and `_output_event_queue`) ever runs. This contradicts the lock's stated purpose of preventing concurrent hook execution from multiple user threads.
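The failure mode described here can be reproduced with a few lines of plain Python (the names below are illustrative, not the actual FE code): calling a generator function only builds the generator object, so a lock held around the call protects none of the body's work.

```python
import threading

lock = threading.Lock()
executed = []

def hook_sketch():
    # This body runs only when the generator is iterated,
    # not when hook_sketch() is called.
    executed.append("body ran")
    yield "result"

with lock:
    gen = hook_sketch()  # merely creates the generator object

# The lock is already released here, yet the body has not run.
assert executed == []

value = next(gen)  # the body executes now, outside the lock
assert executed == ["body ran"]
```

Any shared state the generator touches is therefore accessed outside the lock; protecting it would require holding the lock inside the generator body, or around each iteration of the returned generator.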
Yep, I know, this is not implemented properly yet.
src/tensorlake/function_executor/allocation_runner/event_loop/event_loop.py
Force-pushed 677435a to f2ca419.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Server event thread never stopped on early return. I widened the `__run_allocation` cleanup `finally` block to always enqueue the stop event on every return/exception path and to only join the server-event thread when it is alive.
Or push these changes by commenting:
@cursor push 5af7490f0a
Preview (5af7490f0a)
```diff
diff --git a/src/tensorlake/function_executor/allocation_runner/allocation_runner.py b/src/tensorlake/function_executor/allocation_runner/allocation_runner.py
--- a/src/tensorlake/function_executor/allocation_runner/allocation_runner.py
+++ b/src/tensorlake/function_executor/allocation_runner/allocation_runner.py
@@ -680,39 +680,40 @@
         Raises exception on internal error. Returns AllocationResult on user code error or normal completion.
         """
-        # We need to be very careful who's code we're running here. Exceptions raised in customer
-        # code should be caught here and converted into proper AllocationResult indicating customer code failure.
-        # Exceptions in our internal FE code are just raised here and handled by caller.
+        try:
+            # We need to be very careful who's code we're running here. Exceptions raised in customer
+            # code should be caught here and converted into proper AllocationResult indicating customer code failure.
+            # Exceptions in our internal FE code are just raised here and handled by caller.
-        # This is internal FE code.
-        serialized_args: List[SerializedValue] = download_function_arguments(
-            self._allocation, self._blob_store, self._logger
-        )
-        function_call_metadata: FunctionCallMetadata | None = (
-            validate_and_deserialize_function_call_metadata(
-                serialized_function_call_metadata=self._allocation.inputs.function_call_metadata,
-                serialized_args=serialized_args,
-                function=self._function,
-                logger=self._logger,
+            # This is internal FE code.
+            serialized_args: List[SerializedValue] = download_function_arguments(
+                self._allocation, self._blob_store, self._logger
             )
-        )
+            function_call_metadata: FunctionCallMetadata | None = (
+                validate_and_deserialize_function_call_metadata(
+                    serialized_function_call_metadata=self._allocation.inputs.function_call_metadata,
+                    serialized_args=serialized_args,
+                    function=self._function,
+                    logger=self._logger,
+                )
+            )
-        self._parse_output_overrides(function_call_metadata)
-        args_or_result: tuple[list[Any], dict[str, Any]] | AllocationResult = (
-            self._parse_function_call_args(function_call_metadata, serialized_args)
-        )
-        if isinstance(args_or_result, AllocationResult):
-            # user code error during argument parsing.
-            return args_or_result
+            self._parse_output_overrides(function_call_metadata)
+            args_or_result: tuple[list[Any], dict[str, Any]] | AllocationResult = (
+                self._parse_function_call_args(function_call_metadata, serialized_args)
+            )
+            if isinstance(args_or_result, AllocationResult):
+                # user code error during argument parsing.
+                return args_or_result
-        args, kwargs = args_or_result
-        try:
+            args, kwargs = args_or_result
             self._event_loop.start(args, kwargs)
             return self._process_event_loop_output_events()
         finally:
             self._server_event_queue.put(_ServerEventStopProcessingThread())
-            self._logger.info("waiting for server event processing thread to finish")
-            self._process_server_events_thread.join()
+            if self._process_server_events_thread.is_alive():
+                self._logger.info("waiting for server event processing thread to finish")
+                self._process_server_events_thread.join()

     def _process_event_loop_output_events(self) -> AllocationResult:
         """Processes output events from the event loop until allocation completes.
```
src/tensorlake/function_executor/allocation_runner/allocation_runner.py
Force-pushed d0e3a86 to 5896027.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
```python
                )
            else:
                output_events.append(OutputEventFinishAllocation(value=output))
        self._output_event_queue.put(OutputEventBatch(events=output_events))
```
Non-tail-call output events skip function call creation confirmation
Medium Severity
When the user function's output resolves to a non-Future value (e.g., a `ReduceOperationFuture` with a single plain-value input that has child `OutputEventCreateFunctionCall` events from the DFS walk), those function call events are sent in the same `OutputEventBatch` as `OutputEventFinishAllocation`. The allocation runner processes each event sequentially: `_handle_event_loop_output_event_call_function` submits to the server but doesn't wait for creation confirmation, then hits `OutputEventFinishAllocation` and returns the result. Unlike the tail-call path, which uses `_create_function_calls` (with `.wait()`), this path has no confirmation step, so the allocation may complete before child function calls are acknowledged.
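A toy model of this race (all names hypothetical, not the actual runner code): the runner walks the batch in order, submits the child call without waiting for an acknowledgement, and returns as soon as it reaches the finish event.

```python
import threading
import time

acknowledged = []

def submit_create_function_call(name: str) -> None:
    # Models the server acknowledging the call asynchronously, later.
    def ack() -> None:
        time.sleep(0.2)
        acknowledged.append(name)
    threading.Thread(target=ack, daemon=True).start()

# One OutputEventBatch: a child function-call event followed by finish.
batch = [("create_function_call", "child"), ("finish_allocation", "output")]

result = None
for kind, payload in batch:
    if kind == "create_function_call":
        submit_create_function_call(payload)  # no .wait() on this path
    elif kind == "finish_allocation":
        result = payload  # allocation completes immediately

assert result == "output"
assert acknowledged == []  # finished before the child call was acked
```

A confirmation step equivalent to the tail-call path's `.wait()` would block between submission and the finish event, closing the window.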
A sequential strictly ordered input/output event stream is implemented. However, the event loop itself does not yet work with user code in a deterministic way. See the many FIXMEs I left; I'll address this in further commits. I'd like to merge this because all tests pass and it's already a lot of changes.

Also deduped application deployments in application tests, because they otherwise make the tests run very long, which wastes time.

The map-reduce benchmark didn't change:

```
poetry run python ../indexify/benchmarks/map_reduce/main.py --num-requests 5 --maps-count 100

main:
Deploying benchmark application...
Starting map reduce benchmark with maps_count: 100, num_requests: 5
Started request zGIw2a5rn-vY6dbzlmg28 with 100 functions
Started request -xQed7iN6inQYMAma1Qp1 with 100 functions
Started request d_AqeMpEy1FDoW40i6YNS with 100 functions
Started request 7-36sH5Ef7j9HSxhHo-91 with 100 functions
Started request YXmCgx1ZJyGHqfIhEMOtE with 100 functions
All 5 requests started in 0.01 seconds
Waiting for all requests to complete...
Completed 5 requests in 84.60s

Test Configuration:
- Requests: 5
- Map calls per request: 100
- Total function calls: 1000

PR:
All 5 requests started in 0.01 seconds
Waiting for all requests to complete...
Completed 5 requests in 83.96s

Test Configuration:
- Requests: 5
- Map calls per request: 100
- Total function calls: 1000
```
Force-pushed 5896027 to 8d63d45.



Now there are clearly defined responsibilities.
Note
High Risk
Large refactor of core execution/Future lifecycle and durability ID generation plus runtime hook API changes; subtle ordering/concurrency edge cases (noted by FIXMEs) could affect determinism and replay correctness.
Overview
Refactors Function Executor allocation execution to a new `AllocationEventLoop` that runs user code in a dedicated thread and communicates with `AllocationRunner` via ordered output events (create function call, add watcher, finish allocation) and input events (call created, watcher result), moving Future graph registration/durability and wait/await logic into the event loop.

Updates runtime hook plumbing from `run_futures(futures=[...])` to `run_future(future=...)` across the SDK interface, local runner, and FE service, including improved error propagation on `Future.run()` failures.

Reworks execution-plan construction: the event loop emits function-call refs/collections, and `sdk_algorithms` now serializes output-event args and converts them into `ExecutionPlanUpdates`; also extracts durable-ID generation into a dedicated module. Test suites are sped up by deploying apps once per test class (`setUpClass`).

Written by Cursor Bugbot for commit 8d63d45. This will update automatically on new commits.
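The hook-interface change mentioned in the overview, from `run_futures(futures=[...])` to `run_future(future=...)`, can be sketched as follows. The method names follow the PR text; the class bodies and the dict-based `future` are purely illustrative, not the actual SDK code.

```python
from typing import Any, Dict, List

class OldRuntimeHooksSketch:
    """Pre-refactor shape: one call submits a whole batch of futures."""
    def run_futures(self, futures: List[Dict[str, Any]]) -> None:
        for future in futures:
            future["started"] = True

class NewRuntimeHooksSketch:
    """Post-refactor shape: one future per call, so a failure while
    starting it maps cleanly to the single future that caused it."""
    def run_future(self, future: Dict[str, Any]) -> None:
        try:
            future["started"] = True
        except Exception as e:
            # Error propagation is now per-future rather than per-batch.
            future["error"] = e
            raise

f = {"name": "f1"}
NewRuntimeHooksSketch().run_future(future=f)
assert f["started"] is True
```

The per-future signature is what makes the "improved error propagation on `Future.run()` failures" practical: a batch API can only report that some future in the list failed, while a single-future API ties each exception to its originating call.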