Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions crates/taskito-core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,11 @@ macro_rules! impl_storage {
resources: Option<&str>,
resource_health: Option<&str>,
threads: i32,
hostname: Option<&str>,
pid: Option<i32>,
pool_type: Option<&str>,
) -> $crate::error::Result<()> {
self.register_worker(worker_id, queues, tags, resources, resource_health, threads)
self.register_worker(worker_id, queues, tags, resources, resource_health, threads, hostname, pid, pool_type)
}
fn heartbeat(
&self,
Expand All @@ -362,17 +365,27 @@ macro_rules! impl_storage {
) -> $crate::error::Result<()> {
self.heartbeat(worker_id, resource_health)
}
// Trait-side `update_worker_status` generated by `impl_storage!`: passes
// `worker_id` and `status` through unchanged and returns the callee's result.
fn update_worker_status(
&self,
worker_id: &str,
status: &str,
) -> $crate::error::Result<()> {
// NOTE(review): this presumably resolves to the backend's inherent method of
// the same name (inherent methods take precedence over trait methods) —
// confirm every backend defines one, otherwise the call would recurse.
self.update_worker_status(worker_id, status)
}
// Trait-side `list_workers` generated by `impl_storage!`: returns whatever the
// `list_workers` method on `self` yields (a vector of `WorkerRow`).
fn list_workers(
&self,
) -> $crate::error::Result<Vec<$crate::storage::models::WorkerRow>> {
self.list_workers()
}
fn reap_dead_workers(&self) -> $crate::error::Result<u64> {
fn reap_dead_workers(&self) -> $crate::error::Result<Vec<String>> {
self.reap_dead_workers()
}
// Trait-side `unregister_worker` generated by `impl_storage!`: forwards
// `worker_id` to the `unregister_worker` method on `self`.
fn unregister_worker(&self, worker_id: &str) -> $crate::error::Result<()> {
self.unregister_worker(worker_id)
}
// Trait-side `list_claims_by_worker` generated by `impl_storage!`: forwards
// `worker_id` and returns the resulting list of strings (job IDs in the
// backends visible in this change).
fn list_claims_by_worker(&self, worker_id: &str) -> $crate::error::Result<Vec<String>> {
self.list_claims_by_worker(worker_id)
}
// Trait-side `pause_queue` generated by `impl_storage!`: forwards `queue_name`
// to the `pause_queue` method on `self`.
fn pause_queue(&self, queue_name: &str) -> $crate::error::Result<()> {
self.pause_queue(queue_name)
}
Expand Down Expand Up @@ -739,6 +752,9 @@ impl Storage for StorageBackend {
resources: Option<&str>,
resource_health: Option<&str>,
threads: i32,
hostname: Option<&str>,
pid: Option<i32>,
pool_type: Option<&str>,
) -> Result<()> {
delegate!(
self,
Expand All @@ -748,21 +764,30 @@ impl Storage for StorageBackend {
tags,
resources,
resource_health,
threads
threads,
hostname,
pid,
pool_type
)
}
// `Storage::heartbeat` for `StorageBackend`: hands `worker_id` and
// `resource_health` to `delegate!` under the method name `heartbeat`.
// NOTE(review): `delegate!` is defined outside this view; presumably it
// dispatches to the active backend variant — confirm.
fn heartbeat(&self, worker_id: &str, resource_health: Option<&str>) -> Result<()> {
delegate!(self, heartbeat, worker_id, resource_health)
}
// `Storage::update_worker_status` for `StorageBackend`: hands `worker_id` and
// `status` to `delegate!` under the method name `update_worker_status`.
// NOTE(review): `delegate!` is defined outside this view; presumably it
// dispatches to the active backend variant — confirm.
fn update_worker_status(&self, worker_id: &str, status: &str) -> Result<()> {
delegate!(self, update_worker_status, worker_id, status)
}
// `Storage::list_workers` for `StorageBackend`: invokes `delegate!` with the
// method name `list_workers` and no extra arguments.
// NOTE(review): `delegate!` is defined outside this view; presumably it
// dispatches to the active backend variant — confirm.
fn list_workers(&self) -> Result<Vec<models::WorkerRow>> {
delegate!(self, list_workers)
}
fn reap_dead_workers(&self) -> Result<u64> {
fn reap_dead_workers(&self) -> Result<Vec<String>> {
delegate!(self, reap_dead_workers)
}
// `Storage::unregister_worker` for `StorageBackend`: hands `worker_id` to
// `delegate!` under the method name `unregister_worker`.
// NOTE(review): `delegate!` is defined outside this view; presumably it
// dispatches to the active backend variant — confirm.
fn unregister_worker(&self, worker_id: &str) -> Result<()> {
delegate!(self, unregister_worker, worker_id)
}
// `Storage::list_claims_by_worker` for `StorageBackend`: hands `worker_id` to
// `delegate!` under the method name `list_claims_by_worker`.
// NOTE(review): `delegate!` is defined outside this view; presumably it
// dispatches to the active backend variant — confirm.
fn list_claims_by_worker(&self, worker_id: &str) -> Result<Vec<String>> {
delegate!(self, list_claims_by_worker, worker_id)
}
// `Storage::pause_queue` for `StorageBackend`: hands `queue_name` to
// `delegate!` under the method name `pause_queue`.
// NOTE(review): `delegate!` is defined outside this view; presumably it
// dispatches to the active backend variant — confirm.
fn pause_queue(&self, queue_name: &str) -> Result<()> {
delegate!(self, pause_queue, queue_name)
}
Expand Down
8 changes: 8 additions & 0 deletions crates/taskito-core/src/storage/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ pub struct WorkerRow {
pub resources: Option<String>,
pub resource_health: Option<String>,
pub threads: i32,
pub started_at: Option<i64>,
pub hostname: Option<String>,
pub pid: Option<i32>,
pub pool_type: Option<String>,
}

#[derive(Insertable, AsChangeset, Debug)]
Expand All @@ -320,6 +324,10 @@ pub struct NewWorkerRow<'a> {
pub resources: Option<&'a str>,
pub resource_health: Option<&'a str>,
pub threads: i32,
pub started_at: Option<i64>,
pub hostname: Option<&'a str>,
pub pid: Option<i32>,
pub pool_type: Option<&'a str>,
}

// ── Queue State ─────────────────────────────────────────────────
Expand Down
18 changes: 18 additions & 0 deletions crates/taskito-core/src/storage/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,24 @@ impl PostgresStorage {
"ALTER TABLE workers ADD COLUMN IF NOT EXISTS threads INTEGER NOT NULL DEFAULT 0",
);

// Migration: add worker discovery metadata columns
migration_alter(
&mut conn,
"ALTER TABLE workers ADD COLUMN IF NOT EXISTS started_at BIGINT",
);
migration_alter(
&mut conn,
"ALTER TABLE workers ADD COLUMN IF NOT EXISTS hostname TEXT",
);
migration_alter(
&mut conn,
"ALTER TABLE workers ADD COLUMN IF NOT EXISTS pid INTEGER",
);
migration_alter(
&mut conn,
"ALTER TABLE workers ADD COLUMN IF NOT EXISTS pool_type TEXT",
);

diesel::sql_query(
"CREATE TABLE IF NOT EXISTS queue_state (
queue_name TEXT PRIMARY KEY,
Expand Down
50 changes: 45 additions & 5 deletions crates/taskito-core/src/storage/postgres/workers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use diesel::prelude::*;

use super::super::models::*;
use super::super::schema::workers;
use super::super::schema::{execution_claims, workers};
use super::PostgresStorage;
use crate::error::Result;
use crate::job::now_millis;
Expand All @@ -11,6 +11,7 @@ const DEAD_WORKER_THRESHOLD_MS: i64 = 30_000;

impl PostgresStorage {
/// Register a new worker or update an existing one.
#[allow(clippy::too_many_arguments)]
pub fn register_worker(
&self,
worker_id: &str,
Expand All @@ -19,6 +20,9 @@ impl PostgresStorage {
resources: Option<&str>,
resource_health: Option<&str>,
threads: i32,
hostname: Option<&str>,
pid: Option<i32>,
pool_type: Option<&str>,
) -> Result<()> {
let mut conn = self.conn()?;
let now = now_millis();
Expand All @@ -32,6 +36,10 @@ impl PostgresStorage {
resources,
resource_health,
threads,
started_at: Some(now),
hostname,
pid,
pool_type,
};

diesel::insert_into(workers::table)
Expand Down Expand Up @@ -60,6 +68,18 @@ impl PostgresStorage {
Ok(())
}

/// Overwrite the `status` column of the worker row identified by `worker_id`.
///
/// The number of affected rows is discarded, so a `worker_id` that matches no
/// row is not reported as an error.
pub fn update_worker_status(&self, worker_id: &str, status: &str) -> Result<()> {
    let mut conn = self.conn()?;

    // Narrow the table to the single worker first, then apply the change.
    let target = workers::table.filter(workers::worker_id.eq(worker_id));
    let _rows_changed = diesel::update(target)
        .set(workers::status.eq(status))
        .execute(&mut conn)?;

    Ok(())
}

/// List all workers with their heartbeat status.
pub fn list_workers(&self) -> Result<Vec<WorkerRow>> {
let mut conn = self.conn()?;
Expand All @@ -72,14 +92,22 @@ impl PostgresStorage {
}

/// Remove workers that haven't sent a heartbeat within the threshold.
pub fn reap_dead_workers(&self) -> Result<u64> {
/// Returns the IDs of the reaped workers.
pub fn reap_dead_workers(&self) -> Result<Vec<String>> {
let mut conn = self.conn()?;
let cutoff = now_millis().saturating_sub(DEAD_WORKER_THRESHOLD_MS);

let affected = diesel::delete(workers::table.filter(workers::last_heartbeat.lt(cutoff)))
.execute(&mut conn)?;
let dead_ids: Vec<String> = workers::table
.filter(workers::last_heartbeat.lt(cutoff))
.select(workers::worker_id)
.load(&mut conn)?;

if !dead_ids.is_empty() {
diesel::delete(workers::table.filter(workers::worker_id.eq_any(&dead_ids)))
.execute(&mut conn)?;
}

Ok(affected as u64)
Ok(dead_ids)
}

/// Unregister a worker (called on shutdown).
Expand All @@ -91,4 +119,16 @@ impl PostgresStorage {

Ok(())
}

/// Collect the `job_id` of every `execution_claims` row whose `worker_id`
/// column equals the given worker.
pub fn list_claims_by_worker(&self, worker_id: &str) -> Result<Vec<String>> {
    let mut conn = self.conn()?;

    let held_by_worker = execution_claims::worker_id.eq(worker_id);
    let claimed = execution_claims::table
        .select(execution_claims::job_id)
        .filter(held_by_worker)
        .load::<String>(&mut conn)?;

    Ok(claimed)
}
}
88 changes: 74 additions & 14 deletions crates/taskito-core/src/storage/redis_backend/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::storage::models::WorkerRow;
const DEAD_WORKER_THRESHOLD_MS: i64 = 30_000;

impl RedisStorage {
#[allow(clippy::too_many_arguments)]
pub fn register_worker(
&self,
worker_id: &str,
Expand All @@ -16,6 +17,9 @@ impl RedisStorage {
resources: Option<&str>,
resource_health: Option<&str>,
threads: i32,
hostname: Option<&str>,
pid: Option<i32>,
pool_type: Option<&str>,
) -> Result<()> {
let mut conn = self.conn()?;
let now = now_millis();
Expand All @@ -30,6 +34,10 @@ impl RedisStorage {
pipe.hset(&wkey, "resources", resources.unwrap_or(""));
pipe.hset(&wkey, "resource_health", resource_health.unwrap_or(""));
pipe.hset(&wkey, "threads", threads);
pipe.hset(&wkey, "started_at", now);
pipe.hset(&wkey, "hostname", hostname.unwrap_or(""));
pipe.hset(&wkey, "pid", pid.unwrap_or(0));
pipe.hset(&wkey, "pool_type", pool_type.unwrap_or(""));
pipe.sadd(&wall, worker_id);
pipe.query::<()>(&mut conn).map_err(map_err)?;

Expand All @@ -49,6 +57,16 @@ impl RedisStorage {
Ok(())
}

/// Write `status` into the `status` field of the worker's hash, whose key is
/// built from `["worker", worker_id]`.
pub fn update_worker_status(&self, worker_id: &str, status: &str) -> Result<()> {
    let mut conn = self.conn()?;
    let worker_hash = self.key(&["worker", worker_id]);

    // The HSET reply is not needed; only a transport/command error matters.
    let written: std::result::Result<(), _> = conn.hset(&worker_hash, "status", status);
    written.map_err(map_err)?;

    Ok(())
}

pub fn list_workers(&self) -> Result<Vec<WorkerRow>> {
let mut conn = self.conn()?;
let wall = self.key(&["workers", "all"]);
Expand Down Expand Up @@ -89,43 +107,46 @@ impl RedisStorage {
.get("threads")
.and_then(|s| s.parse().ok())
.unwrap_or(0),
started_at: data.get("started_at").and_then(|s| s.parse().ok()),
hostname: to_opt("hostname"),
pid: data
.get("pid")
.and_then(|s| s.parse().ok())
.filter(|&v: &i32| v != 0),
pool_type: to_opt("pool_type"),
});
}

Ok(rows)
}

pub fn reap_dead_workers(&self) -> Result<u64> {
pub fn reap_dead_workers(&self) -> Result<Vec<String>> {
let mut conn = self.conn()?;
let cutoff = now_millis().saturating_sub(DEAD_WORKER_THRESHOLD_MS);
let wall = self.key(&["workers", "all"]);

let worker_ids: Vec<String> = conn.smembers(&wall).map_err(map_err)?;

let mut count = 0u64;
let mut reaped = Vec::new();
for wid in worker_ids {
let wkey = self.key(&["worker", &wid]);
let hb: Option<i64> = conn.hget(&wkey, "last_heartbeat").map_err(map_err)?;

if let Some(last_hb) = hb {
if last_hb < cutoff {
let pipe = &mut redis::pipe();
pipe.del(&wkey);
pipe.srem(&wall, &wid);
pipe.query::<()>(&mut conn).map_err(map_err)?;
count += 1;
}
} else {
// No heartbeat data — remove
let is_dead = match hb {
Some(last_hb) => last_hb < cutoff,
None => true,
};

if is_dead {
let pipe = &mut redis::pipe();
pipe.del(&wkey);
pipe.srem(&wall, &wid);
pipe.query::<()>(&mut conn).map_err(map_err)?;
count += 1;
reaped.push(wid);
}
}

Ok(count)
Ok(reaped)
}

pub fn unregister_worker(&self, worker_id: &str) -> Result<()> {
Expand All @@ -140,4 +161,43 @@ impl RedisStorage {

Ok(())
}

/// List the job IDs whose execution claim is currently held by `worker_id`.
///
/// Walks all keys matching `{prefix}exec_claim:*` with `SCAN`, reads each
/// claim value (format `"{worker_id}:{timestamp}"`), and keeps the job ID of
/// every claim whose owner part equals `worker_id` exactly.
///
/// The result is sorted and free of duplicates. Errors from the connection or
/// from any Redis command are mapped through `map_err` and returned.
pub fn list_claims_by_worker(&self, worker_id: &str) -> Result<Vec<String>> {
    let mut conn = self.conn()?;
    let pattern = self.key(&["exec_claim", "*"]);
    // Keys look like "{prefix}exec_claim:{job_id}"; the prefix is the same for
    // every key, so build it once instead of once per scanned key.
    let key_prefix = self.key(&["exec_claim", ""]);

    let mut job_ids = Vec::new();
    let mut cursor: u64 = 0;
    loop {
        let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
            .arg(cursor)
            .arg("MATCH")
            .arg(&pattern)
            .arg("COUNT")
            .arg(100)
            .query(&mut conn)
            .map_err(map_err)?;

        for key in keys {
            // A key without the expected prefix cannot yield a job ID, so skip
            // it before spending a round trip on GET.
            if let Some(job_id) = key.strip_prefix(&key_prefix) {
                // The claim may have expired or been released since SCAN saw it.
                let value: Option<String> = conn.get(&key).map_err(map_err)?;

                // Split at the LAST ':' so the owner is compared in full. A
                // plain prefix test would also match claims held by a worker
                // whose ID is "{worker_id}:something".
                // NOTE(review): assumes the timestamp part never contains ':'.
                let owned = matches!(
                    value.as_deref().and_then(|v| v.rsplit_once(':')),
                    Some((owner, _)) if owner == worker_id
                );
                if owned {
                    job_ids.push(job_id.to_string());
                }
            }
        }

        cursor = next_cursor;
        if cursor == 0 {
            break;
        }
    }

    // SCAN may report the same key more than once during a full iteration.
    job_ids.sort_unstable();
    job_ids.dedup();

    Ok(job_ids)
}
}
4 changes: 4 additions & 0 deletions crates/taskito-core/src/storage/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ diesel::table! {
resources -> Nullable<Text>,
resource_health -> Nullable<Text>,
threads -> Integer,
started_at -> Nullable<BigInt>,
hostname -> Nullable<Text>,
pid -> Nullable<Integer>,
pool_type -> Nullable<Text>,
}
}

Expand Down
Loading
Loading