diff --git a/crates/taskito-core/src/storage/mod.rs b/crates/taskito-core/src/storage/mod.rs index df68f4c..ff80dff 100644 --- a/crates/taskito-core/src/storage/mod.rs +++ b/crates/taskito-core/src/storage/mod.rs @@ -352,8 +352,11 @@ macro_rules! impl_storage { resources: Option<&str>, resource_health: Option<&str>, threads: i32, + hostname: Option<&str>, + pid: Option<i32>, + pool_type: Option<&str>, ) -> $crate::error::Result<()> { - self.register_worker(worker_id, queues, tags, resources, resource_health, threads) + self.register_worker(worker_id, queues, tags, resources, resource_health, threads, hostname, pid, pool_type) } fn heartbeat( &self, @@ -362,17 +365,27 @@ macro_rules! impl_storage { ) -> $crate::error::Result<()> { self.heartbeat(worker_id, resource_health) } + fn update_worker_status( + &self, + worker_id: &str, + status: &str, + ) -> $crate::error::Result<()> { + self.update_worker_status(worker_id, status) + } fn list_workers( &self, ) -> $crate::error::Result<Vec<$crate::storage::models::WorkerRow>> { self.list_workers() } - fn reap_dead_workers(&self) -> $crate::error::Result<u64> { + fn reap_dead_workers(&self) -> $crate::error::Result<Vec<String>> { self.reap_dead_workers() } fn unregister_worker(&self, worker_id: &str) -> $crate::error::Result<()> { self.unregister_worker(worker_id) } + fn list_claims_by_worker(&self, worker_id: &str) -> $crate::error::Result<Vec<String>> { + self.list_claims_by_worker(worker_id) + } fn pause_queue(&self, queue_name: &str) -> $crate::error::Result<()> { self.pause_queue(queue_name) } @@ -739,6 +752,9 @@ impl Storage for StorageBackend { resources: Option<&str>, resource_health: Option<&str>, threads: i32, + hostname: Option<&str>, + pid: Option<i32>, + pool_type: Option<&str>, ) -> Result<()> { delegate!( self, register_worker, worker_id, queues, tags, resources, resource_health, - threads + threads, + hostname, + pid, + pool_type ) } fn heartbeat(&self, worker_id: &str, resource_health: Option<&str>) -> Result<()> { delegate!(self, heartbeat, worker_id, 
resource_health) } + fn update_worker_status( + &self, + worker_id: &str, + status: &str, + ) -> Result<()> { + delegate!(self, update_worker_status, worker_id, status) + } fn list_workers(&self) -> Result<Vec<WorkerRow>> { delegate!(self, list_workers) } - fn reap_dead_workers(&self) -> Result<u64> { + fn reap_dead_workers(&self) -> Result<Vec<String>> { delegate!(self, reap_dead_workers) } fn unregister_worker(&self, worker_id: &str) -> Result<()> { delegate!(self, unregister_worker, worker_id) } + fn list_claims_by_worker(&self, worker_id: &str) -> Result<Vec<String>> { + delegate!(self, list_claims_by_worker, worker_id) + } fn pause_queue(&self, queue_name: &str) -> Result<()> { delegate!(self, pause_queue, queue_name) } diff --git a/crates/taskito-core/src/storage/models.rs b/crates/taskito-core/src/storage/models.rs index 91f38d6..ce64bf3 100644 --- a/crates/taskito-core/src/storage/models.rs +++ b/crates/taskito-core/src/storage/models.rs @@ -307,6 +307,10 @@ pub struct WorkerRow { pub resources: Option<String>, pub resource_health: Option<String>, pub threads: i32, + pub started_at: Option<i64>, + pub hostname: Option<String>, + pub pid: Option<i32>, + pub pool_type: Option<String>, } #[derive(Insertable, AsChangeset, Debug)] @@ -320,6 +324,10 @@ pub struct NewWorkerRow<'a> { pub resources: Option<&'a str>, pub resource_health: Option<&'a str>, pub threads: i32, + pub started_at: Option<i64>, + pub hostname: Option<&'a str>, + pub pid: Option<i32>, + pub pool_type: Option<&'a str>, } // ── Queue State ───────────────────────────────────────────────── diff --git a/crates/taskito-core/src/storage/postgres/mod.rs b/crates/taskito-core/src/storage/postgres/mod.rs index 458eecb..6999063 100644 --- a/crates/taskito-core/src/storage/postgres/mod.rs +++ b/crates/taskito-core/src/storage/postgres/mod.rs @@ -359,6 +359,24 @@ impl PostgresStorage { "ALTER TABLE workers ADD COLUMN IF NOT EXISTS threads INTEGER NOT NULL DEFAULT 0", ); + // Migration: add worker discovery metadata columns + migration_alter( + &mut conn, + "ALTER TABLE workers ADD COLUMN IF NOT EXISTS 
started_at BIGINT", ); + migration_alter( + &mut conn, + "ALTER TABLE workers ADD COLUMN IF NOT EXISTS hostname TEXT", ); + migration_alter( + &mut conn, + "ALTER TABLE workers ADD COLUMN IF NOT EXISTS pid INTEGER", ); + migration_alter( + &mut conn, + "ALTER TABLE workers ADD COLUMN IF NOT EXISTS pool_type TEXT", ); + diesel::sql_query( "CREATE TABLE IF NOT EXISTS queue_state ( queue_name TEXT PRIMARY KEY, diff --git a/crates/taskito-core/src/storage/postgres/workers.rs b/crates/taskito-core/src/storage/postgres/workers.rs index c8b7eeb..8b498b9 100644 --- a/crates/taskito-core/src/storage/postgres/workers.rs +++ b/crates/taskito-core/src/storage/postgres/workers.rs @@ -1,7 +1,7 @@ use diesel::prelude::*; use super::super::models::*; -use super::super::schema::workers; +use super::super::schema::{execution_claims, workers}; use super::PostgresStorage; use crate::error::Result; use crate::job::now_millis; @@ -11,6 +11,7 @@ const DEAD_WORKER_THRESHOLD_MS: i64 = 30_000; impl PostgresStorage { /// Register a new worker or update an existing one. + #[allow(clippy::too_many_arguments)] pub fn register_worker( &self, worker_id: &str, @@ -19,6 +20,9 @@ impl PostgresStorage { resources: Option<&str>, resource_health: Option<&str>, threads: i32, + hostname: Option<&str>, + pid: Option<i32>, + pool_type: Option<&str>, ) -> Result<()> { let mut conn = self.conn()?; let now = now_millis(); @@ -32,6 +36,10 @@ impl PostgresStorage { resources, resource_health, threads, + started_at: Some(now), + hostname, + pid, + pool_type, }; diesel::insert_into(workers::table) @@ -60,6 +68,18 @@ impl PostgresStorage { Ok(()) } + /// Update the status of a worker. + pub fn update_worker_status(&self, worker_id: &str, status: &str) -> Result<()> { + let mut conn = self.conn()?; + + diesel::update(workers::table) + .filter(workers::worker_id.eq(worker_id)) + .set(workers::status.eq(status)) + .execute(&mut conn)?; + + Ok(()) + } + /// List all workers with their heartbeat status. 
pub fn list_workers(&self) -> Result<Vec<WorkerRow>> { let mut conn = self.conn()?; @@ -72,14 +92,22 @@ impl PostgresStorage { } /// Remove workers that haven't sent a heartbeat within the threshold. - pub fn reap_dead_workers(&self) -> Result<u64> { + /// Returns the IDs of the reaped workers. + pub fn reap_dead_workers(&self) -> Result<Vec<String>> { let mut conn = self.conn()?; let cutoff = now_millis().saturating_sub(DEAD_WORKER_THRESHOLD_MS); - let affected = diesel::delete(workers::table.filter(workers::last_heartbeat.lt(cutoff))) - .execute(&mut conn)?; + let dead_ids: Vec<String> = workers::table + .filter(workers::last_heartbeat.lt(cutoff)) + .select(workers::worker_id) + .load(&mut conn)?; + + if !dead_ids.is_empty() { + diesel::delete(workers::table.filter(workers::worker_id.eq_any(&dead_ids))) + .execute(&mut conn)?; + } - Ok(affected as u64) + Ok(dead_ids) } /// Unregister a worker (called on shutdown). @@ -91,4 +119,16 @@ impl PostgresStorage { Ok(()) } + + /// List all job IDs currently claimed by a worker. + pub fn list_claims_by_worker(&self, worker_id: &str) -> Result<Vec<String>> { + let mut conn = self.conn()?; + + let job_ids: Vec<String> = execution_claims::table + .filter(execution_claims::worker_id.eq(worker_id)) + .select(execution_claims::job_id) + .load(&mut conn)?; + + Ok(job_ids) + } } diff --git a/crates/taskito-core/src/storage/redis_backend/workers.rs b/crates/taskito-core/src/storage/redis_backend/workers.rs index 45a6dee..5dfcf76 100644 --- a/crates/taskito-core/src/storage/redis_backend/workers.rs +++ b/crates/taskito-core/src/storage/redis_backend/workers.rs @@ -8,6 +8,7 @@ use crate::storage::models::WorkerRow; const DEAD_WORKER_THRESHOLD_MS: i64 = 30_000; impl RedisStorage { + #[allow(clippy::too_many_arguments)] pub fn register_worker( &self, worker_id: &str, @@ -16,6 +17,9 @@ impl RedisStorage { resources: Option<&str>, resource_health: Option<&str>, threads: i32, + hostname: Option<&str>, + pid: Option<i32>, + pool_type: Option<&str>, ) -> Result<()> { let mut conn = self.conn()?; let now 
= now_millis(); @@ -30,6 +34,10 @@ impl RedisStorage { pipe.hset(&wkey, "resources", resources.unwrap_or("")); pipe.hset(&wkey, "resource_health", resource_health.unwrap_or("")); pipe.hset(&wkey, "threads", threads); + pipe.hset(&wkey, "started_at", now); + pipe.hset(&wkey, "hostname", hostname.unwrap_or("")); + pipe.hset(&wkey, "pid", pid.unwrap_or(0)); + pipe.hset(&wkey, "pool_type", pool_type.unwrap_or("")); pipe.sadd(&wall, worker_id); pipe.query::<()>(&mut conn).map_err(map_err)?; @@ -49,6 +57,16 @@ impl RedisStorage { Ok(()) } + pub fn update_worker_status(&self, worker_id: &str, status: &str) -> Result<()> { + let mut conn = self.conn()?; + let wkey = self.key(&["worker", worker_id]); + + conn.hset::<_, _, _, ()>(&wkey, "status", status) + .map_err(map_err)?; + + Ok(()) + } + pub fn list_workers(&self) -> Result<Vec<WorkerRow>> { let mut conn = self.conn()?; let wall = self.key(&["workers", "all"]); @@ -89,43 +107,46 @@ impl RedisStorage { .get("threads") .and_then(|s| s.parse().ok()) .unwrap_or(0), + started_at: data.get("started_at").and_then(|s| s.parse().ok()), + hostname: to_opt("hostname"), + pid: data + .get("pid") + .and_then(|s| s.parse().ok()) + .filter(|&v: &i32| v != 0), + pool_type: to_opt("pool_type"), }); } Ok(rows) } - pub fn reap_dead_workers(&self) -> Result<u64> { + pub fn reap_dead_workers(&self) -> Result<Vec<String>> { let mut conn = self.conn()?; let cutoff = now_millis().saturating_sub(DEAD_WORKER_THRESHOLD_MS); let wall = self.key(&["workers", "all"]); let worker_ids: Vec<String> = conn.smembers(&wall).map_err(map_err)?; - let mut count = 0u64; + let mut reaped = Vec::new(); for wid in worker_ids { let wkey = self.key(&["worker", &wid]); let hb: Option<i64> = conn.hget(&wkey, "last_heartbeat").map_err(map_err)?; - if let Some(last_hb) = hb { - if last_hb < cutoff { - let pipe = &mut redis::pipe(); - pipe.del(&wkey); - pipe.srem(&wall, &wid); - pipe.query::<()>(&mut conn).map_err(map_err)?; - count += 1; - } - } else { - // No heartbeat data — remove + let is_dead = match hb { 
Some(last_hb) => last_hb < cutoff, + None => true, + }; + + if is_dead { let pipe = &mut redis::pipe(); pipe.del(&wkey); pipe.srem(&wall, &wid); pipe.query::<()>(&mut conn).map_err(map_err)?; - count += 1; + reaped.push(wid); } } - Ok(count) + Ok(reaped) } pub fn unregister_worker(&self, worker_id: &str) -> Result<()> { @@ -140,4 +161,43 @@ impl RedisStorage { Ok(()) } + + pub fn list_claims_by_worker(&self, worker_id: &str) -> Result<Vec<String>> { + let mut conn = self.conn()?; + let pattern = self.key(&["exec_claim", "*"]); + + let mut job_ids = Vec::new(); + let mut cursor: u64 = 0; + loop { + let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN") + .arg(cursor) + .arg("MATCH") + .arg(&pattern) + .arg("COUNT") + .arg(100) + .query(&mut conn) + .map_err(map_err)?; + + for key in keys { + let value: Option<String> = conn.get(&key).map_err(map_err)?; + if let Some(val) = value { + // Value format is "{worker_id}:{timestamp}" + if val.starts_with(worker_id) && val[worker_id.len()..].starts_with(':') { + // Extract job_id from key: "{prefix}exec_claim:{job_id}" + let prefix = self.key(&["exec_claim", ""]); + if let Some(job_id) = key.strip_prefix(&prefix) { + job_ids.push(job_id.to_string()); + } + } + } + } + + cursor = next_cursor; + if cursor == 0 { + break; + } + } + + Ok(job_ids) + } } diff --git a/crates/taskito-core/src/storage/schema.rs b/crates/taskito-core/src/storage/schema.rs index bd2fa57..27d982c 100644 --- a/crates/taskito-core/src/storage/schema.rs +++ b/crates/taskito-core/src/storage/schema.rs @@ -153,6 +153,10 @@ diesel::table! 
{ resources -> Nullable<Text>, resource_health -> Nullable<Text>, threads -> Integer, + started_at -> Nullable<BigInt>, + hostname -> Nullable<Text>, + pid -> Nullable<Integer>, + pool_type -> Nullable<Text>, } } diff --git a/crates/taskito-core/src/storage/sqlite/mod.rs b/crates/taskito-core/src/storage/sqlite/mod.rs index 0dccf32..dc847e1 100644 --- a/crates/taskito-core/src/storage/sqlite/mod.rs +++ b/crates/taskito-core/src/storage/sqlite/mod.rs @@ -356,6 +356,15 @@ impl SqliteStorage { "ALTER TABLE workers ADD COLUMN threads INTEGER NOT NULL DEFAULT 0", ); + // Migration: add worker discovery metadata columns + migration_alter( + &mut conn, + "ALTER TABLE workers ADD COLUMN started_at INTEGER", + ); + migration_alter(&mut conn, "ALTER TABLE workers ADD COLUMN hostname TEXT"); + migration_alter(&mut conn, "ALTER TABLE workers ADD COLUMN pid INTEGER"); + migration_alter(&mut conn, "ALTER TABLE workers ADD COLUMN pool_type TEXT"); + // ── Queue State ────────────────────────────────── diesel::sql_query( "CREATE TABLE IF NOT EXISTS queue_state ( diff --git a/crates/taskito-core/src/storage/sqlite/workers.rs b/crates/taskito-core/src/storage/sqlite/workers.rs index 4fa093b..34a923f 100644 --- a/crates/taskito-core/src/storage/sqlite/workers.rs +++ b/crates/taskito-core/src/storage/sqlite/workers.rs @@ -1,7 +1,7 @@ use diesel::prelude::*; use super::super::models::*; -use super::super::schema::workers; +use super::super::schema::{execution_claims, workers}; use super::SqliteStorage; use crate::error::Result; use crate::job::now_millis; @@ -11,6 +11,7 @@ const DEAD_WORKER_THRESHOLD_MS: i64 = 30_000; impl SqliteStorage { /// Register a new worker or update an existing one. 
+ #[allow(clippy::too_many_arguments)] pub fn register_worker( &self, worker_id: &str, @@ -19,6 +20,9 @@ impl SqliteStorage { resources: Option<&str>, resource_health: Option<&str>, threads: i32, + hostname: Option<&str>, + pid: Option<i32>, + pool_type: Option<&str>, ) -> Result<()> { let mut conn = self.conn()?; let now = now_millis(); @@ -32,6 +36,10 @@ impl SqliteStorage { resources, resource_health, threads, + started_at: Some(now), + hostname, + pid, + pool_type, }; diesel::replace_into(workers::table) @@ -57,6 +65,18 @@ impl SqliteStorage { Ok(()) } + /// Update the status of a worker. + pub fn update_worker_status(&self, worker_id: &str, status: &str) -> Result<()> { + let mut conn = self.conn()?; + + diesel::update(workers::table) + .filter(workers::worker_id.eq(worker_id)) + .set(workers::status.eq(status)) + .execute(&mut conn)?; + + Ok(()) + } + /// List all workers with their heartbeat status. pub fn list_workers(&self) -> Result<Vec<WorkerRow>> { let mut conn = self.conn()?; @@ -69,14 +89,22 @@ impl SqliteStorage { } /// Remove workers that haven't sent a heartbeat within the threshold. - pub fn reap_dead_workers(&self) -> Result<u64> { + /// Returns the IDs of the reaped workers. + pub fn reap_dead_workers(&self) -> Result<Vec<String>> { let mut conn = self.conn()?; let cutoff = now_millis().saturating_sub(DEAD_WORKER_THRESHOLD_MS); - let affected = diesel::delete(workers::table.filter(workers::last_heartbeat.lt(cutoff))) - .execute(&mut conn)?; + let dead_ids: Vec<String> = workers::table + .filter(workers::last_heartbeat.lt(cutoff)) + .select(workers::worker_id) + .load(&mut conn)?; + + if !dead_ids.is_empty() { + diesel::delete(workers::table.filter(workers::worker_id.eq_any(&dead_ids))) + .execute(&mut conn)?; + } - Ok(affected as u64) + Ok(dead_ids) } /// Unregister a worker (called on shutdown). @@ -88,4 +116,16 @@ impl SqliteStorage { Ok(()) } + + /// List all job IDs currently claimed by a worker. 
+ pub fn list_claims_by_worker(&self, worker_id: &str) -> Result<Vec<String>> { + let mut conn = self.conn()?; + + let job_ids: Vec<String> = execution_claims::table + .filter(execution_claims::worker_id.eq(worker_id)) + .select(execution_claims::job_id) + .load(&mut conn)?; + + Ok(job_ids) + } } diff --git a/crates/taskito-core/src/storage/traits.rs b/crates/taskito-core/src/storage/traits.rs index 6f08874..ded6ffb 100644 --- a/crates/taskito-core/src/storage/traits.rs +++ b/crates/taskito-core/src/storage/traits.rs @@ -119,6 +119,7 @@ pub trait Storage: Send + Sync + Clone { // ── Worker operations ─────────────────────────────────────────── + #[allow(clippy::too_many_arguments)] fn register_worker( &self, worker_id: &str, @@ -127,11 +128,16 @@ pub trait Storage: Send + Sync + Clone { resources: Option<&str>, resource_health: Option<&str>, threads: i32, + hostname: Option<&str>, + pid: Option<i32>, + pool_type: Option<&str>, ) -> Result<()>; fn heartbeat(&self, worker_id: &str, resource_health: Option<&str>) -> Result<()>; + fn update_worker_status(&self, worker_id: &str, status: &str) -> Result<()>; fn list_workers(&self) -> Result<Vec<WorkerRow>>; - fn reap_dead_workers(&self) -> Result<u64>; + fn reap_dead_workers(&self) -> Result<Vec<String>>; fn unregister_worker(&self, worker_id: &str) -> Result<()>; + fn list_claims_by_worker(&self, worker_id: &str) -> Result<Vec<String>>; // ── Queue pause/resume ─────────────────────────────────────── diff --git a/crates/taskito-core/tests/rust/storage_tests.rs b/crates/taskito-core/tests/rust/storage_tests.rs index 2ba970b..d8e9ce1 100644 --- a/crates/taskito-core/tests/rust/storage_tests.rs +++ b/crates/taskito-core/tests/rust/storage_tests.rs @@ -165,8 +165,18 @@ fn test_workers(s: &impl Storage) { let resources = Some(r#"["db","redis"]"#); let health = Some(r#"{"db":"healthy","redis":"healthy"}"#); - s.register_worker("w-test-1", "q-workers", None, resources, health, 4) - .unwrap(); + s.register_worker( + "w-test-1", + "q-workers", + None, + resources, + health, + 4, 
Some("test-host"), + Some(12345), + Some("thread"), + ) + .unwrap(); s.heartbeat("w-test-1", Some(r#"{"db":"unhealthy","redis":"healthy"}"#)) .unwrap(); @@ -176,6 +186,16 @@ fn test_workers(s: &impl Storage) { assert_eq!(w.threads, 4); assert!(w.resources.as_deref().unwrap().contains("db")); assert!(w.resource_health.as_deref().unwrap().contains("unhealthy")); + assert_eq!(w.hostname.as_deref(), Some("test-host")); + assert_eq!(w.pid, Some(12345)); + assert_eq!(w.pool_type.as_deref(), Some("thread")); + assert!(w.started_at.is_some()); + + // Test update_worker_status + s.update_worker_status("w-test-1", "draining").unwrap(); + let workers = s.list_workers().unwrap(); + let w = workers.iter().find(|w| w.worker_id == "w-test-1").unwrap(); + assert_eq!(w.status, "draining"); s.unregister_worker("w-test-1").unwrap(); } diff --git a/crates/taskito-python/Cargo.toml b/crates/taskito-python/Cargo.toml index dfbec66..e8dbd41 100644 --- a/crates/taskito-python/Cargo.toml +++ b/crates/taskito-python/Cargo.toml @@ -26,3 +26,4 @@ serde_json = { workspace = true } serde = { workspace = true } base64 = "0.22" log = { workspace = true } +gethostname = "1.1.0" diff --git a/crates/taskito-python/src/py_queue/inspection.rs b/crates/taskito-python/src/py_queue/inspection.rs index cc922dd..5d1b46a 100644 --- a/crates/taskito-python/src/py_queue/inspection.rs +++ b/crates/taskito-python/src/py_queue/inspection.rs @@ -377,6 +377,10 @@ impl PyQueue { dict.set_item("resources", r.resources)?; dict.set_item("resource_health", r.resource_health)?; dict.set_item("threads", r.threads)?; + dict.set_item("started_at", r.started_at)?; + dict.set_item("hostname", r.hostname)?; + dict.set_item("pid", r.pid)?; + dict.set_item("pool_type", r.pool_type)?; result.push(dict.into()); } Ok(result) diff --git a/crates/taskito-python/src/py_queue/worker.rs b/crates/taskito-python/src/py_queue/worker.rs index 1656922..e7c4c0a 100644 --- a/crates/taskito-python/src/py_queue/worker.rs +++ 
b/crates/taskito-python/src/py_queue/worker.rs @@ -333,6 +333,8 @@ impl PyQueue { // Generate or use the provided worker ID and register let worker_id = worker_id.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()); + let hostname = gethostname::gethostname().to_string_lossy().to_string(); + let pid = std::process::id() as i32; let _ = self.storage.register_worker( &worker_id, &queues_str, @@ -340,6 +342,9 @@ impl PyQueue { resources.as_deref(), None, threads, + Some(&hostname), + Some(pid), + pool.as_deref(), ); // Create the async executor for native async tasks (if feature enabled) @@ -510,12 +515,24 @@ impl PyQueue { } /// Update the heartbeat for a running worker. Called from Python every 5s. + /// Returns a list of worker IDs that were reaped as dead. #[pyo3(signature = (worker_id, resource_health=None))] - pub fn worker_heartbeat(&self, worker_id: &str, resource_health: Option<&str>) -> PyResult<()> { + pub fn worker_heartbeat( + &self, + worker_id: &str, + resource_health: Option<&str>, + ) -> PyResult<Vec<String>> { self.storage .heartbeat(worker_id, resource_health) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; - let _ = self.storage.reap_dead_workers(); - Ok(()) + let reaped = self.storage.reap_dead_workers().unwrap_or_default(); + Ok(reaped) + } + + /// Update the status of a worker. + pub fn set_worker_status(&self, worker_id: &str, status: &str) -> PyResult<()> { + self.storage + .update_worker_status(worker_id, status) + .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string())) } } diff --git a/docs/api/queue.md b/docs/api/queue.md index 1b4c84e..aea2f2e 100644 --- a/docs/api/queue.md +++ b/docs/api/queue.md @@ -539,7 +539,22 @@ Start the worker loop. **Blocks** until interrupted. queue.workers() -> list[dict] ``` -Return live state of all registered workers: ID, hostname, started_at, last_heartbeat, current job. +Return live state of all registered workers. 
Each dict contains: + +| Key | Type | Description | +|-----|------|-------------| +| `worker_id` | `str` | Unique worker ID | +| `hostname` | `str` | OS hostname | +| `pid` | `int` | Process ID | +| `status` | `str` | `"active"` or `"draining"` | +| `pool_type` | `str` | `"thread"`, `"prefork"`, or `"native-async"` | +| `started_at` | `int` | Registration timestamp (ms since epoch) | +| `last_heartbeat` | `int` | Last heartbeat timestamp (ms) | +| `queues` | `str` | Comma-separated queue names | +| `threads` | `int` | Worker thread/process count | +| `tags` | `str \| None` | Worker specialization tags | +| `resources` | `str \| None` | Registered resource names (JSON) | +| `resource_health` | `str \| None` | Per-resource health status (JSON) | ### `await queue.aworkers()` diff --git a/docs/changelog.md b/docs/changelog.md index ed83064..674b641 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -7,13 +7,20 @@ All notable changes to taskito are documented here. ### Features - **Prefork worker pool** -- `queue.run_worker(pool="prefork", app="myapp:queue")` spawns child Python processes with independent GILs for true CPU parallelism; each child imports the app module, builds its own task registry, and executes tasks in a read-execute-write loop over JSON Lines IPC; the parent Rust scheduler dequeues jobs and dispatches to the least-loaded child via stdin pipes; reader threads parse child stdout and feed results back to the scheduler; graceful shutdown sends shutdown messages to children and waits with timeout before killing +- **Worker discovery** -- `queue.workers()` now returns `hostname`, `pid`, `pool_type`, and `started_at` for each worker, giving operators visibility into multi-machine deployments +- **Worker lifecycle events** -- three new event types: `WORKER_ONLINE` (registered in storage), `WORKER_OFFLINE` (dead worker reaped), `WORKER_UNHEALTHY` (resource health degraded); subscribe via `queue.on_event(EventType.WORKER_OFFLINE, callback)` +- **Worker 
status transitions** -- workers report `active → draining → stopped` status; shutdown signal sets status to `"draining"` before drain timeout, visible in `queue.workers()` and the dashboard +- **Orphan rescue prep** -- `list_claims_by_worker` storage method enables future orphaned job rescue when dead workers are detected ### Internal - New Rust module `crates/taskito-python/src/prefork/` with 4 files: `mod.rs` (PreforkPool + WorkerDispatcher impl), `child.rs` (ChildWriter/ChildReader/ChildProcess split handles), `protocol.rs` (ParentMessage/ChildMessage JSON serialization), `dispatch.rs` (least-loaded dispatcher) - New Python package `py_src/taskito/prefork/` with `child.py` (child process main loop), `__init__.py` (PreforkConfig), `__main__.py` (entry point) -- `base64` crate added to `taskito-python` dependencies for payload encoding over JSON +- `base64` and `gethostname` crates added to `taskito-python` dependencies - `run_worker()` gains `pool` and `app_path` parameters in both Rust (`py_queue/worker.rs`) and Python (`app.py`) +- `workers` table gains 4 columns: `started_at`, `hostname`, `pid`, `pool_type` (all backends + migrations) +- `reap_dead_workers` returns `Vec<String>` (reaped worker IDs) instead of `u64`; enables `WORKER_OFFLINE` event emission +- New storage methods: `update_worker_status`, `list_claims_by_worker` across all 3 backends --- diff --git a/docs/guide/advanced.md b/docs/guide/advanced.md index d9f9e41..da2195d 100644 --- a/docs/guide/advanced.md +++ b/docs/guide/advanced.md @@ -135,15 +135,28 @@ jobs = process.map([(i,) for i in range(1000)]) ### `queue.enqueue_many()` ```python +# Basic batch — same options for all jobs jobs = queue.enqueue_many( task_name="myapp.process", args_list=[(i,) for i in range(1000)], - kwargs_list=None, # Optional per-job kwargs - priority=5, # Same priority for all - queue="processing", # Same queue for all + priority=5, + queue="processing", +) + +# Full parity with enqueue() — per-job overrides +jobs = 
queue.enqueue_many( + task_name="myapp.process", + args_list=[(i,) for i in range(100)], + delay=5.0, # uniform 5s delay for all + unique_keys=[f"item-{i}" for i in range(100)], # per-job dedup + metadata='{"source": "batch"}', # uniform metadata + expires=3600.0, # expire after 1 hour + result_ttl=600, # keep results for 10 minutes ) ``` +Per-job lists (`delay_list`, `metadata_list`, `expires_list`, `result_ttl_list`) override uniform values when both are provided. See the [API reference](../api/queue.md#queueenqueue_many) for the full parameter list. + ## Queue Pause/Resume Temporarily pause job processing on a queue without stopping the worker: diff --git a/docs/guide/events-webhooks.md b/docs/guide/events-webhooks.md index dbd9605..b59ff02 100644 --- a/docs/guide/events-webhooks.md +++ b/docs/guide/events-webhooks.md @@ -16,6 +16,9 @@ The `EventType` enum defines all available lifecycle events: | `JOB_CANCELLED` | A job is cancelled | `job_id`, `task_name` | | `WORKER_STARTED` | A worker process/thread comes online | `worker_id`, `hostname` | | `WORKER_STOPPED` | A worker process/thread shuts down | `worker_id`, `hostname` | +| `WORKER_ONLINE` | Worker registered in storage (visible to fleet) | `worker_id`, `queues`, `pool` | +| `WORKER_OFFLINE` | Dead worker reaped (no heartbeat for 30s) | `worker_id` | +| `WORKER_UNHEALTHY` | Resource health transitions to unhealthy | `worker_id`, `resources` | | `QUEUE_PAUSED` | A named queue is paused | `queue` | | `QUEUE_RESUMED` | A paused queue is resumed | `queue` | diff --git a/docs/guide/workers.md b/docs/guide/workers.md index 9d4c42c..df203c9 100644 --- a/docs/guide/workers.md +++ b/docs/guide/workers.md @@ -163,6 +163,65 @@ $ taskito worker --app myapp:queue queue._inner.request_shutdown() ``` +## Worker Discovery + +Inspect live workers across all machines: + +```python +for w in queue.workers(): + print(f"{w['worker_id']} on {w['hostname']} (pid {w['pid']}, {w['status']})") +``` + +Each worker entry includes: + 
+| Field | Type | Description | +|-------|------|-------------| +| `worker_id` | `str` | Unique ID (UUIDv7) | +| `hostname` | `str` | OS hostname | +| `pid` | `int` | Process ID | +| `status` | `str` | `"active"`, `"draining"`, or deleted on exit | +| `pool_type` | `str` | `"thread"`, `"prefork"`, or `"native-async"` | +| `started_at` | `int` | Registration timestamp (ms) | +| `queues` | `str` | Comma-separated queue names | +| `threads` | `int` | Worker thread/process count | +| `last_heartbeat` | `int` | Last heartbeat timestamp (ms) | + +### Status Lifecycle + +```mermaid +stateDiagram-v2 + [*] --> active: register + active --> draining: shutdown signal + draining --> [*]: clean exit + active --> [*]: crash (reaped after 30s) +``` + +### Lifecycle Events + +Subscribe to worker lifecycle changes: + +```python +from taskito import EventType + +@queue.on_event(EventType.WORKER_ONLINE) +def on_online(event_type, payload): + print(f"Worker {payload['worker_id']} joined") + +@queue.on_event(EventType.WORKER_OFFLINE) +def on_offline(event_type, payload): + print(f"Worker {payload['worker_id']} went away") + +@queue.on_event(EventType.WORKER_UNHEALTHY) +def on_unhealthy(event_type, payload): + print(f"Worker {payload['worker_id']} unhealthy: {payload['resources']}") +``` + +| Event | Fires when | Payload | +|-------|-----------|---------| +| `WORKER_ONLINE` | Worker registered in storage | `worker_id`, `queues`, `pool` | +| `WORKER_OFFLINE` | Dead worker reaped (no heartbeat for 30s) | `worker_id` | +| `WORKER_UNHEALTHY` | Resource health transitions to unhealthy | `worker_id`, `resources` | + ## Async Tasks `async def` task functions are dispatched natively — they run on a dedicated event loop thread, not wrapped in `asyncio.run()` on a worker thread. 
diff --git a/py_src/taskito/_taskito.pyi b/py_src/taskito/_taskito.pyi index 7099780..7a056d7 100644 --- a/py_src/taskito/_taskito.pyi +++ b/py_src/taskito/_taskito.pyi @@ -179,12 +179,15 @@ class PyQueue: threads: int = 1, async_concurrency: int = 100, queue_configs: str | None = None, + pool: str | None = None, + app_path: str | None = None, ) -> None: ... def worker_heartbeat( self, worker_id: str, resource_health: str | None = None, - ) -> None: ... + ) -> list[str]: ... + def set_worker_status(self, worker_id: str, status: str) -> None: ... def pause_queue(self, queue_name: str) -> None: ... def resume_queue(self, queue_name: str) -> None: ... def list_paused_queues(self) -> list[str]: ... diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py index 6eb0ec4..d6f8d90 100644 --- a/py_src/taskito/app.py +++ b/py_src/taskito/app.py @@ -1175,6 +1175,8 @@ def run_worker( def shutdown_handler(signum: int, frame: Any) -> None: logger.info("Warm shutdown (waiting for running tasks to finish)...") + with contextlib.suppress(Exception): + self._inner.set_worker_status(worker_id, "draining") self._inner.request_shutdown() # Restore original handlers so a second signal force-kills signal.signal(signal.SIGINT, original_sigint) @@ -1219,6 +1221,10 @@ def sighup_handler(signum: int, frame: Any) -> None: EventType.WORKER_STARTED, {"worker_id": worker_id, "queues": worker_queues}, ) + self._emit_event( + EventType.WORKER_ONLINE, + {"worker_id": worker_id, "queues": worker_queues, "pool": pool}, + ) try: queue_configs_json = json.dumps(self._queue_configs) if self._queue_configs else None @@ -1277,12 +1283,32 @@ def _run_heartbeat( stop_event: threading.Event, ) -> None: """Send periodic heartbeats to storage with current resource health.""" + prev_unhealthy: set[str] = set() while not stop_event.is_set(): resource_health = self._build_resource_health_json() try: - self._inner.worker_heartbeat(worker_id, resource_health) + reaped_ids = self._inner.worker_heartbeat(worker_id, 
resource_health) + # Emit WORKER_OFFLINE events for reaped dead workers + for rid in reaped_ids: + self._emit_event(EventType.WORKER_OFFLINE, {"worker_id": rid}) except Exception: logger.debug("Heartbeat failed", exc_info=True) + + # Detect health transitions → emit WORKER_UNHEALTHY + runtime = self._resource_runtime + if runtime is not None: + current_unhealthy = set(runtime._unhealthy) + new_unhealthy = current_unhealthy - prev_unhealthy + if new_unhealthy: + self._emit_event( + EventType.WORKER_UNHEALTHY, + { + "worker_id": worker_id, + "resources": sorted(new_unhealthy), + }, + ) + prev_unhealthy = current_unhealthy + stop_event.wait(timeout=5.0) # -- Resource Status -- diff --git a/py_src/taskito/events.py b/py_src/taskito/events.py index d8dff03..215e00c 100644 --- a/py_src/taskito/events.py +++ b/py_src/taskito/events.py @@ -23,6 +23,9 @@ class EventType(enum.Enum): JOB_CANCELLED = "job.cancelled" WORKER_STARTED = "worker.started" WORKER_STOPPED = "worker.stopped" + WORKER_ONLINE = "worker.online" + WORKER_OFFLINE = "worker.offline" + WORKER_UNHEALTHY = "worker.unhealthy" QUEUE_PAUSED = "queue.paused" QUEUE_RESUMED = "queue.resumed" diff --git a/tests/python/test_events.py b/tests/python/test_events.py index e8277fb..bc13ae5 100644 --- a/tests/python/test_events.py +++ b/tests/python/test_events.py @@ -83,6 +83,9 @@ def test_all_event_types_exist() -> None: "job.cancelled", "worker.started", "worker.stopped", + "worker.online", + "worker.offline", + "worker.unhealthy", "queue.paused", "queue.resumed", }