Skip to content

Commit 8d04800

Browse files
committed
Optimize multiprocessing worker performance and improve state management
1 parent 6e0d213 commit 8d04800

2 files changed

Lines changed: 567 additions & 20 deletions

File tree

datalab/gui/processor/base.py

Lines changed: 166 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import time
1414
import warnings
1515
from dataclasses import dataclass
16+
from enum import Enum, auto
1617
from multiprocessing.pool import Pool
1718
from typing import TYPE_CHECKING, Any, Callable, Generic, Literal, Optional
1819

@@ -94,12 +95,81 @@ def run_with_env(func: Callable, args: tuple, env_json: str) -> CompOut:
9495
return wng_err_func(func, args)
9596

9697

98+
class WorkerState(Enum):
99+
"""Worker states for computation lifecycle."""
100+
101+
IDLE = auto() # Ready to start new computation
102+
STARTING = auto() # Computation starting (prevents race conditions)
103+
RUNNING = auto() # Computation in progress
104+
FINISHED = auto() # Computation completed, result available
105+
106+
107+
class WorkerStateMachine:
108+
"""State machine for managing worker computation lifecycle.
109+
110+
This class handles state transitions for worker computations,
111+
ensuring valid state flow and preventing invalid operations.
112+
"""
113+
114+
def __init__(self) -> None:
115+
"""Initialize the state machine in IDLE state."""
116+
self._current_state = WorkerState.IDLE
117+
118+
@property
119+
def current_state(self) -> WorkerState:
120+
"""Get the current state.
121+
122+
Returns:
123+
Current WorkerState.
124+
"""
125+
return self._current_state
126+
127+
def transition_to(self, target_state: WorkerState) -> None:
128+
"""Transition to the specified target state.
129+
130+
Args:
131+
target_state: The state to transition to.
132+
133+
Raises:
134+
ValueError: If the transition is not valid from the current state.
135+
"""
136+
# Define valid state transitions
137+
valid_transitions = {
138+
WorkerState.IDLE: {WorkerState.STARTING},
139+
WorkerState.STARTING: {WorkerState.RUNNING},
140+
WorkerState.RUNNING: {WorkerState.FINISHED},
141+
WorkerState.FINISHED: {WorkerState.IDLE},
142+
}
143+
144+
# Allow transitions to the same state (no-op)
145+
if target_state == self._current_state:
146+
return
147+
148+
# Check if transition is valid
149+
allowed_targets = valid_transitions.get(self._current_state, set())
150+
if target_state not in allowed_targets:
151+
raise ValueError(
152+
f"Invalid transition from {self._current_state} to {target_state}. "
153+
f"Valid transitions: {allowed_targets}"
154+
)
155+
156+
self._current_state = target_state
157+
158+
def reset_to_idle(self) -> None:
159+
"""Reset state to IDLE unconditionally.
160+
161+
This is used for restart/cancel operations where we need
162+
to force the state back to IDLE regardless of current state.
163+
"""
164+
self._current_state = WorkerState.IDLE
165+
166+
97167
class Worker:
98168
"""Multiprocessing worker, to run long-running tasks in a separate process"""
99169

100170
def __init__(self) -> None:
101171
self.asyncresult: AsyncResult = None
102-
self.result: Any = None
172+
self.state_machine = WorkerStateMachine()
103173

104174
@staticmethod
105175
def create_pool() -> None:
@@ -129,47 +199,128 @@ def terminate_pool(wait: bool = False) -> None:
129199
def restart_pool(self) -> None:
130200
"""Terminate and recreate the pool"""
131201
# Terminate the process and stop the timer
132-
self.terminate_pool(wait=False)
202+
Worker.terminate_pool(wait=False)
133203
# Recreate the pool for the next computation
134-
self.create_pool()
204+
Worker.create_pool()
205+
# Reset worker state after pool restart
206+
self.asyncresult = None
207+
self.state_machine.reset_to_idle()
135208

136209
def run(self, func: Callable, args: tuple[Any]) -> None:
137210
"""Run computation.
138211
139212
Args:
140213
func: function to run
141214
args: arguments
215+
216+
Raises:
217+
ValueError: If not in IDLE state or pool is not available.
142218
"""
219+
# Check if we can start computation
220+
if self.state_machine.current_state != WorkerState.IDLE:
221+
current_state = self.state_machine.current_state
222+
raise ValueError(f"Cannot start computation from {current_state} state")
223+
224+
# Transition to starting state
225+
self.state_machine.transition_to(WorkerState.STARTING)
226+
143227
global POOL # pylint: disable=global-statement,global-variable-not-assigned
144-
assert POOL is not None
228+
if POOL is None:
229+
raise ValueError("Multiprocessing pool is not available")
230+
231+
# Start the computation
145232
env_json = sigima_options.get_env()
146233
self.asyncresult = POOL.apply_async(run_with_env, (func, args, env_json))
147234

235+
# Transition to running state
236+
self.state_machine.transition_to(WorkerState.RUNNING)
237+
238+
def restart(self) -> None:
239+
"""Restart/cancel current computation"""
240+
current_state = self.state_machine.current_state
241+
242+
if current_state == WorkerState.IDLE:
243+
return # Already idle, nothing to restart
244+
elif current_state == WorkerState.STARTING:
245+
# If we're still starting, just go back to idle
246+
self.asyncresult = None
247+
elif current_state == WorkerState.RUNNING:
248+
# Cancel the running computation - use restart_pool for consistency
249+
self.restart_pool()
250+
return # restart_pool already handles state reset
251+
elif current_state == WorkerState.FINISHED:
252+
# Clean up and go to idle
253+
self.asyncresult = None
254+
255+
# Let state machine handle the transition to idle
256+
self.state_machine.reset_to_idle()
257+
148258
def close(self) -> None:
149259
"""Close worker: close pool properly and wait for all tasks to finish"""
150260
# Close multiprocessing Pool properly, but only if no computation is running,
151261
# to avoid blocking the GUI at exit (so, when wait=True, we wait for the
152262
# task to finish before closing the pool but there is actually no task running,
153263
# so the pool is closed immediately but *properly*)
154-
self.terminate_pool(wait=self.asyncresult is None)
264+
Worker.terminate_pool(wait=self.asyncresult is None)
155265

156266
def is_computation_finished(self) -> bool:
157267
"""Return True if computation is finished.
158268
159269
Returns:
160270
bool: True if computation is finished
161271
"""
162-
return self.asyncresult.ready()
272+
current_state = self.state_machine.current_state
273+
274+
if current_state == WorkerState.IDLE:
275+
return True # No computation has been started
276+
elif current_state == WorkerState.STARTING:
277+
return False # Computation is starting, not finished yet
278+
elif current_state == WorkerState.FINISHED:
279+
return True # Already finished
280+
elif current_state == WorkerState.RUNNING:
281+
if self.asyncresult is None:
282+
return False # Should not happen, but defensive
283+
finished = self.asyncresult.ready()
284+
if finished:
285+
# Transition to finished state
286+
self.state_machine.transition_to(WorkerState.FINISHED)
287+
return finished
288+
else:
289+
raise ValueError(f"Invalid worker state: {current_state}")
163290

164291
def get_result(self) -> CompOut:
165292
"""Return computation result.
166293
167294
Returns:
168295
CompOut: computation result
296+
297+
Raises:
298+
ValueError: If not in FINISHED state or no result available.
169299
"""
170-
self.result = self.asyncresult.get()
171-
self.asyncresult = None
172-
return self.result
300+
# Check if we can get result
301+
if self.state_machine.current_state != WorkerState.FINISHED:
302+
current_state = self.state_machine.current_state
303+
raise ValueError(f"Cannot get result from {current_state} state")
304+
305+
if self.asyncresult is None:
306+
raise ValueError("No result available")
307+
308+
# Get result and clean up (ensure cleanup happens even if exception occurs)
309+
try:
310+
result = self.asyncresult.get()
311+
return result
312+
finally:
313+
# Always clean up, even if get() raises an exception
314+
self.asyncresult = None
315+
self.state_machine.transition_to(WorkerState.IDLE)
316+
317+
def has_result_available(self) -> bool:
318+
"""Check if computation finished successfully and result is available.
319+
320+
Returns:
321+
True if computation completed successfully and result can be retrieved.
322+
"""
323+
return self.state_machine.current_state == WorkerState.FINISHED
173324

174325

175326
def is_pairwise_mode() -> bool:
@@ -268,7 +419,7 @@ def set_process_isolation_enabled(self, enabled: bool) -> None:
268419
if enabled:
269420
if self.worker is None:
270421
self.worker = Worker()
271-
self.worker.create_pool()
422+
Worker.create_pool()
272423
else:
273424
if self.worker is not None:
274425
self.worker.terminate_pool()
@@ -469,19 +620,14 @@ def __exec_func(
469620
return wng_err_func(func, args)
470621
# Process isolation: run function in a separate process
471622
self.worker.run(func, args)
472-
# Adaptive sleep to balance responsiveness and performance
473-
sleep_duration = 0.001 # Start with 1ms
474-
max_sleep = 0.01 # Cap at 10ms
475-
sleep_increment = 0.001 # Increase by 1ms each iteration
476623
while not self.worker.is_computation_finished():
477-
QW.QApplication.processEvents() # Keep UI responsive
478-
time.sleep(sleep_duration) # Adaptive sleep to avoid busy waiting
479-
# Gradually increase sleep time for longer computations
480-
sleep_duration = min(sleep_duration + sleep_increment, max_sleep)
624+
QW.QApplication.processEvents()
625+
time.sleep(0) # Just yields to other threads - no forced delay
481626
if progress.wasCanceled(): # User canceled the operation
482-
self.worker.restart_pool() # Terminate and recreate the pool
627+
self.worker.restart() # Cancel computation and reset to idle
483628
break
484-
if self.worker.is_computation_finished():
629+
# Only get result if computation actually finished (not canceled)
630+
if self.worker.has_result_available():
485631
return self.worker.get_result()
486632
return None
487633

0 commit comments

Comments
 (0)