Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/taskito-async/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-async"
version = "0.8.0"
version = "0.9.0"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-core"
version = "0.8.0"
version = "0.9.0"
edition = "2021"

[features]
Expand Down
4 changes: 3 additions & 1 deletion crates/taskito-python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-python"
version = "0.8.0"
version = "0.9.0"
edition = "2021"

[features]
Expand All @@ -23,4 +23,6 @@ uuid = { workspace = true }
async-trait = { workspace = true }
taskito-async = { path = "../taskito-async", optional = true }
serde_json = { workspace = true }
serde = { workspace = true }
base64 = "0.22"
log = { workspace = true }
1 change: 1 addition & 0 deletions crates/taskito-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pyo3::prelude::*;

#[cfg(not(feature = "native-async"))]
mod async_worker;
mod prefork;
mod py_config;
mod py_job;
mod py_queue;
Expand Down
122 changes: 122 additions & 0 deletions crates/taskito-python/src/prefork/child.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//! Child process handle — spawn, write jobs, read results.
//!
//! A child is split into two halves after spawning:
//! - `ChildWriter`: sends jobs to the child's stdin (owned by dispatch thread)
//! - `ChildReader`: reads results from the child's stdout (owned by reader thread)
//! - `ChildProcess`: holds the process handle for lifecycle management

use std::io::{BufRead, BufReader, BufWriter, Write};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};

use super::protocol::{ChildMessage, ParentMessage};

/// Writer half — sends job messages to the child process via stdin.
pub struct ChildWriter {
    writer: BufWriter<ChildStdin>,
}

impl ChildWriter {
    /// Serialize `msg` as one JSON line and flush it to the child's stdin.
    ///
    /// Serialization failures are surfaced as `io::ErrorKind::InvalidData`
    /// so the caller sees a single `io::Result` error channel.
    pub fn send(&mut self, msg: &ParentMessage) -> std::io::Result<()> {
        let encoded = match serde_json::to_string(msg) {
            Ok(json) => json,
            Err(e) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    e.to_string(),
                ));
            }
        };
        self.writer.write_all(encoded.as_bytes())?;
        self.writer.write_all(b"\n")?;
        self.writer.flush()
    }

    /// Best-effort shutdown notification. Errors are silently ignored
    /// because the child may already have exited.
    pub fn send_shutdown(&mut self) {
        let _ = self.send(&ParentMessage::Shutdown);
    }
}

/// Reader half — reads result messages from the child process via stdout.
pub struct ChildReader {
    reader: BufReader<ChildStdout>,
}

impl ChildReader {
    /// Read one newline-delimited JSON message from the child's stdout.
    /// Blocks until a full line is available.
    ///
    /// Errors when the pipe is closed (EOF), on an I/O failure, or when the
    /// line is not a valid `ChildMessage`.
    pub fn read(&mut self) -> Result<ChildMessage, String> {
        let mut line = String::new();
        let n = self
            .reader
            .read_line(&mut line)
            .map_err(|e| format!("failed to read from child stdout: {e}"))?;
        if n == 0 {
            // EOF: the child closed its end of the pipe.
            return Err("child process closed stdout".into());
        }
        serde_json::from_str(&line).map_err(|e| format!("failed to parse child message: {e}"))
    }
}

/// Process handle for lifecycle management.
pub struct ChildProcess {
    process: Child,
}

impl ChildProcess {
    /// Check whether the child process is still running (non-blocking).
    #[allow(dead_code)]
    pub fn is_alive(&mut self) -> bool {
        matches!(self.process.try_wait(), Ok(None))
    }

    /// Wait for the child to exit, polling every 100 ms. If the child has
    /// not exited when `timeout` elapses, it is killed and reaped.
    pub fn wait_or_kill(&mut self, timeout: std::time::Duration) {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            match self.process.try_wait() {
                // Already exited, or the wait itself failed — nothing to do.
                Ok(Some(_)) | Err(_) => return,
                Ok(None) => {
                    if std::time::Instant::now() >= deadline {
                        // Out of patience: force-kill and reap the zombie.
                        let _ = self.process.kill();
                        let _ = self.process.wait();
                        return;
                    }
                    std::thread::sleep(std::time::Duration::from_millis(100));
                }
            }
        }
    }
}

/// Spawn a child worker process and wait for its `ready` signal.
///
/// Runs `python -m taskito.prefork <app_path>` with piped stdin/stdout;
/// stderr is inherited so child tracebacks reach the parent's terminal.
///
/// Returns the three split halves: writer, reader, and process handle.
///
/// # Errors
/// Fails if the process cannot be spawned, if the child closes stdout
/// before signalling readiness, or if its first message is not `Ready`.
pub fn spawn_child(
    python: &str,
    app_path: &str,
) -> Result<(ChildWriter, ChildReader, ChildProcess), String> {
    let mut process = Command::new(python)
        .args(["-m", "taskito.prefork", app_path])
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit())
        .spawn()
        .map_err(|e| format!("failed to spawn child: {e}"))?;

    // These expects cannot fire: both pipes were requested just above.
    let stdin = process.stdin.take().expect("stdin should be piped");
    let stdout = process.stdout.take().expect("stdout should be piped");

    let mut reader = ChildReader {
        reader: BufReader::new(stdout),
    };

    // Handshake: the child must announce readiness before we hand the
    // halves off to their owning threads.
    match reader.read()? {
        ChildMessage::Ready => {}
        _ => {
            // BUG FIX: the previous diagnostic used
            // `std::any::type_name_of_val(&other)`, which returns the enum's
            // *type* name for every variant — it could never tell the
            // operator which message actually arrived. Report a plain,
            // non-misleading diagnostic instead.
            return Err("expected ready message, got a different child message".to_string());
        }
    }

    Ok((
        ChildWriter {
            writer: BufWriter::new(stdin),
        },
        reader,
        ChildProcess { process },
    ))
}
32 changes: 32 additions & 0 deletions crates/taskito-python/src/prefork/dispatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Job dispatch strategies for distributing work across child processes.

/// Selects the index of the child with the fewest in-flight jobs.
///
/// Ties are broken by position: the lowest-index child among the minima
/// wins. An empty slice yields `0` as a defensive fallback.
pub fn least_loaded(in_flight_counts: &[u32]) -> usize {
    let mut best_idx = 0usize;
    let mut best_count = u32::MAX;
    for (idx, &count) in in_flight_counts.iter().enumerate() {
        // Strict `<` keeps the earliest index on equal counts.
        if count < best_count {
            best_idx = idx;
            best_count = count;
        }
    }
    best_idx
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_least_loaded_picks_idle() {
        assert_eq!(least_loaded(&[3, 0, 2]), 1);
    }

    #[test]
    fn test_least_loaded_picks_first_on_tie() {
        assert_eq!(least_loaded(&[1, 1, 1]), 0);
    }

    #[test]
    fn test_least_loaded_single() {
        assert_eq!(least_loaded(&[5]), 0);
    }

    // The dispatcher indexes `writers` with the returned value, so the
    // empty-slice fallback must never panic. Previously untested.
    #[test]
    fn test_least_loaded_empty_returns_zero() {
        assert_eq!(least_loaded(&[]), 0);
    }

    // Tie-breaking among non-leading minima: lowest index of the minimum
    // wins, not merely index 0. Previously untested.
    #[test]
    fn test_least_loaded_prefers_lowest_index_among_minima() {
        assert_eq!(least_loaded(&[2, 1, 1, 3]), 1);
    }
}
166 changes: 166 additions & 0 deletions crates/taskito-python/src/prefork/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//! Prefork worker pool — dispatches jobs to child Python processes via IPC.
//!
//! Each child is an independent Python interpreter with its own GIL,
//! enabling true parallelism for CPU-bound tasks. The parent process
//! runs the Rust scheduler and dispatches serialized jobs over stdin
//! pipes; children send results back over stdout pipes.
//!
//! Architecture:
//! - One dispatch thread: receives `Job` from scheduler, sends to children via stdin
//! - N reader threads: one per child, reads results from stdout, sends to `result_tx`
//! - Child processes: run `python -m taskito.prefork <app_path>`

mod child;
mod dispatch;
pub mod protocol;

use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

use async_trait::async_trait;
use crossbeam_channel::Sender;

use taskito_core::job::Job;
use taskito_core::scheduler::JobResult;
use taskito_core::worker::WorkerDispatcher;

use child::{spawn_child, ChildWriter};
use protocol::ParentMessage;

/// Multi-process worker pool that dispatches jobs to child Python processes.
pub struct PreforkPool {
    // Number of child processes to spawn.
    num_workers: usize,
    // Path passed to `python -m taskito.prefork <app_path>` identifying the app.
    app_path: String,
    // Python interpreter executable; overridable via the TASKITO_PYTHON env var.
    python: String,
    // Set by `shutdown()`; the dispatch loop checks it to stop accepting jobs.
    shutdown: AtomicBool,
}

impl PreforkPool {
    /// Build a pool that will spawn `num_workers` children for the app at
    /// `app_path`. The interpreter defaults to `python` and can be
    /// overridden with the `TASKITO_PYTHON` environment variable.
    pub fn new(num_workers: usize, app_path: String) -> Self {
        let python = match std::env::var("TASKITO_PYTHON") {
            Ok(interp) => interp,
            Err(_) => "python".to_string(),
        };

        Self {
            num_workers,
            app_path,
            python,
            shutdown: AtomicBool::new(false),
        }
    }
}

#[async_trait]
impl WorkerDispatcher for PreforkPool {
async fn run(
&self,
mut job_rx: tokio::sync::mpsc::Receiver<Job>,
result_tx: Sender<JobResult>,
) {
let num_workers = self.num_workers;
let app_path = self.app_path.clone();
let python = self.python.clone();
let shutdown = &self.shutdown;

// Spawn all children and split into writers + readers
let mut writers: Vec<ChildWriter> = Vec::with_capacity(num_workers);
let in_flight: Arc<Vec<AtomicU32>> =
Arc::new((0..num_workers).map(|_| AtomicU32::new(0)).collect());
let mut reader_handles: Vec<thread::JoinHandle<()>> = Vec::new();
let mut process_handles: Vec<child::ChildProcess> = Vec::new();

for i in 0..num_workers {
match spawn_child(&python, &app_path) {
Ok((writer, mut reader, process)) => {
log::info!("[taskito] prefork child {i} ready");
writers.push(writer);
process_handles.push(process);

// Spawn a reader thread for this child
let tx = result_tx.clone();
let in_flight_counter = in_flight.clone();
let child_idx = i;
reader_handles.push(thread::spawn(move || {
loop {
match reader.read() {
Ok(msg) => {
if let Some(job_result) = msg.into_job_result() {
in_flight_counter[child_idx]
.fetch_sub(1, Ordering::Relaxed);
if tx.send(job_result).is_err() {
break; // result channel closed
}
}
}
Err(e) => {
log::warn!(
"[taskito] prefork child {child_idx} reader error: {e}"
);
break;
}
}
}
}));
}
Err(e) => {
log::error!("[taskito] failed to spawn prefork child {i}: {e}");
}
}
}

if writers.is_empty() {
log::error!("[taskito] no prefork children started, aborting");
return;
}

log::info!(
"[taskito] prefork pool running with {} children",
writers.len()
);

// Dispatch loop: receive jobs from scheduler, send to least-loaded child
while let Some(job) = job_rx.recv().await {
if shutdown.load(Ordering::Relaxed) {
break;
}

let counts: Vec<u32> = in_flight
.iter()
.map(|c| c.load(Ordering::Relaxed))
.collect();
let idx = dispatch::least_loaded(&counts);

let msg = ParentMessage::from(&job);
if let Err(e) = writers[idx].send(&msg) {
log::error!(
"[taskito] failed to send job {} to child {idx}: {e}",
job.id
);
// Job will be reaped by the scheduler's stale job reaper
continue;
}
in_flight[idx].fetch_add(1, Ordering::Relaxed);
}

// Graceful shutdown: tell all children to stop
for (i, writer) in writers.iter_mut().enumerate() {
writer.send_shutdown();
log::info!("[taskito] sent shutdown to prefork child {i}");
}

// Wait for children to exit
let drain_timeout = std::time::Duration::from_secs(30);
for (i, process) in process_handles.iter_mut().enumerate() {
process.wait_or_kill(drain_timeout);
log::info!("[taskito] prefork child {i} exited");
}

// Wait for reader threads
for handle in reader_handles {
let _ = handle.join();
}
}

fn shutdown(&self) {
self.shutdown.store(true, Ordering::SeqCst);
}
}
Loading
Loading