Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions crates/taskito-python/src/prefork/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,43 @@ impl WorkerDispatcher for PreforkPool {
);

// Dispatch loop: receive jobs from scheduler, send to least-loaded child
let mut restart_count: u64 = 0;
while let Some(job) = job_rx.recv().await {
if shutdown.load(Ordering::Relaxed) {
break;
}

// Check for dead children and restart them
for i in 0..process_handles.len() {
if !process_handles[i].is_alive() {
log::warn!("[taskito] prefork child {i} died, restarting");
restart_count += 1;
match spawn_child(&python, &app_path) {
Ok((writer, mut reader, process)) => {
writers[i] = writer;
process_handles[i] = process;
in_flight[i].store(0, Ordering::Relaxed);
let tx = result_tx.clone();
let in_flight_counter = in_flight.clone();
reader_handles.push(thread::spawn(move || {
while let Ok(msg) = reader.read() {
if let Some(job_result) = msg.into_job_result() {
in_flight_counter[i].fetch_sub(1, Ordering::Relaxed);
if tx.send(job_result).is_err() {
break;
}
}
}
}));
log::info!("[taskito] prefork child {i} restarted (total restarts: {restart_count})");
}
Err(e) => {
log::error!("[taskito] failed to restart child {i}: {e}");
}
}
}
}

let counts: Vec<u32> = in_flight
.iter()
.map(|c| c.load(Ordering::Relaxed))
Expand Down
Loading