From e7719381088e8bc6abca24cb791b21e64c98bc6a Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 22 Mar 2026 09:03:20 +0530 Subject: [PATCH] feat: auto-restart dead prefork children with restart tracking --- crates/taskito-python/src/prefork/mod.rs | 32 ++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/crates/taskito-python/src/prefork/mod.rs b/crates/taskito-python/src/prefork/mod.rs index b4ab40c..e60090b 100644 --- a/crates/taskito-python/src/prefork/mod.rs +++ b/crates/taskito-python/src/prefork/mod.rs @@ -118,11 +118,43 @@ impl WorkerDispatcher for PreforkPool { ); // Dispatch loop: receive jobs from scheduler, send to least-loaded child + let mut restart_count: u64 = 0; while let Some(job) = job_rx.recv().await { if shutdown.load(Ordering::Relaxed) { break; } + // Check for dead children and restart them + for i in 0..process_handles.len() { + if !process_handles[i].is_alive() { + log::warn!("[taskito] prefork child {i} died, restarting"); + restart_count += 1; + match spawn_child(&python, &app_path) { + Ok((writer, mut reader, process)) => { + writers[i] = writer; + process_handles[i] = process; + in_flight[i].store(0, Ordering::Relaxed); + let tx = result_tx.clone(); + let in_flight_counter = in_flight.clone(); + reader_handles.push(thread::spawn(move || { + while let Ok(msg) = reader.read() { + if let Some(job_result) = msg.into_job_result() { + in_flight_counter[i].fetch_sub(1, Ordering::Relaxed); + if tx.send(job_result).is_err() { + break; + } + } + } + })); + log::info!("[taskito] prefork child {i} restarted (total restarts: {restart_count})"); + } + Err(e) => { + log::error!("[taskito] failed to restart child {i}: {e}"); + } + } + } + } + let counts: Vec = in_flight .iter() .map(|c| c.load(Ordering::Relaxed))