-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecuterThreading.py
More file actions
39 lines (31 loc) · 1.48 KB
/
executerThreading.py
File metadata and controls
39 lines (31 loc) · 1.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
import random
def log_status(msg):
    """Print ``msg`` to stdout prefixed with the current local time.

    The emitted line has the form ``[HH:MM:SS] msg``.

    Args:
        msg: Text to print after the timestamp.
    """
    timestamp = time.strftime('%H:%M:%S')
    # This helper is called from several pool worker threads. A bare print()
    # writes the text and the trailing newline as two separate writes, so two
    # threads can end up sharing one output line. Making the newline part of
    # the string keeps each message in a single write, which makes that
    # interleaving far less likely.
    print(f"[{timestamp}] {msg}\n", end="")
def measure_device(device_name, *, min_delay=1.5, max_delay=3.0):
    """Simulate a measurement on ``device_name`` and return a result label.

    Logs a start message, sleeps for a random duration, logs a finish
    message, and returns. Both log lines include the name of the thread
    that runs the call.

    Args:
        device_name: Name of the device; used in the log lines and in the
            returned label.
        min_delay: Lower bound, in seconds, of the simulated measurement
            time. Defaults to the original hard-coded 1.5.
        max_delay: Upper bound, in seconds, of the simulated measurement
            time. Defaults to the original hard-coded 3.0.

    Returns:
        The string ``"Result-<device_name>"``.
    """
    # Look the thread name up once; both log lines run on the same thread.
    worker = threading.current_thread().name
    log_status(f"[{worker}] Starting measurement on {device_name}")
    time.sleep(random.uniform(min_delay, max_delay))
    log_status(f"[{worker}] Finished measurement on {device_name}")
    return f"Result-{device_name}"
def calibrate_device(device_name, *, duration=2):
    """Simulate a calibration of ``device_name`` and return a result label.

    Logs a start message, sleeps for ``duration`` seconds, logs a finish
    message, and returns. Both log lines include the name of the thread
    that runs the call.

    Args:
        device_name: Name of the device; used in the log lines and in the
            returned label.
        duration: Simulated calibration time in seconds. Defaults to the
            original hard-coded 2.

    Returns:
        The string ``"Calibration-<device_name>"``.
    """
    # Look the thread name up once; both log lines run on the same thread.
    worker = threading.current_thread().name
    log_status(f"[{worker}] Starting calibration on {device_name}")
    time.sleep(duration)
    log_status(f"[{worker}] Finished calibration on {device_name}")
    return f"Calibration-{device_name}"
# Demo driver: start three measurements, inject a calibration request one
# second later, then report every task as it finishes.
#
# The pool is used as a context manager so it is always shut down (and its
# worker threads joined) when the block exits, even if something below raises.
# Four workers are enough for the three measurements plus the late calibration
# to run at the same time.
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit initial measurement tasks, remembering which device each future
    # belongs to.
    device_names = ['SMU1', 'LaserA', 'CameraX']
    future_to_device = {
        executor.submit(measure_device, dev): dev for dev in device_names
    }

    # Wait a bit before submitting a new task (simulating a new signal or event)
    time.sleep(1)
    log_status("New calibration request received!")
    cal_future = executor.submit(calibrate_device, 'PiezoY')
    # Add the calibration task to the mapping so it is tracked too
    future_to_device[cal_future] = 'PiezoY'

    # Report tasks in completion order rather than submission order.
    for future in as_completed(future_to_device):
        device = future_to_device[future]
        try:
            result = future.result()
        except Exception as exc:
            # One failing device must not hide the outcome of the others.
            log_status(f"Task failed for {device}: {exc!r}")
        else:
            log_status(f"Task done: {result}")