From edc4353f49adebb99951041313afa780acba9250 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 22 Mar 2026 11:38:16 +0530 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20smart=20scheduling=20=E2=80=94=20ad?= =?UTF-8?q?aptive=20polling=20+=20task=20duration=20cache?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backpressure-aware polling: - Empty queue: exponential backoff (50ms → 100ms → ... → 1s max) - Job dispatched: resets to base interval immediately - Saves CPU on idle systems, faster response under load Per-task duration cache: - In-memory HashMap updated on every handle_result() - Tracks count + sum of wall_time_ns per task name - Foundation for queue aging and weighted dispatch - No DB queries needed — pure scheduler-side intelligence Also adds aging_factor config to SchedulerConfig (not yet wired to dequeue query — prepared for follow-up). --- crates/taskito-core/src/scheduler/mod.rs | 65 +++++++++++++++++-- .../src/scheduler/result_handler.rs | 10 +++ 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index ebe6441..5297f46 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -22,6 +22,9 @@ pub use crate::job::Job; pub struct SchedulerConfig { /// Interval between scheduler poll cycles. pub poll_interval: Duration, + /// Priority boost per second of wait time. `None` disables aging. + /// Example: `Some(1)` boosts priority by 1 per second of wait. + pub aging_factor: Option<i64>, /// Reap stale jobs every N iterations. pub reap_interval: u32, /// Check periodic tasks every N iterations. 
@@ -36,6 +39,7 @@ impl Default for SchedulerConfig { fn default() -> Self { Self { poll_interval: Duration::from_millis(50), + aging_factor: None, reap_interval: 100, periodic_check_interval: 60, cleanup_interval: 1200, @@ -116,6 +120,34 @@ pub struct QueueConfig { pub max_concurrent: Option<u32>, } +/// In-memory cache of average task execution duration. +/// Updated on every `handle_result()` — no DB queries needed. +struct TaskDurationCache { + durations: HashMap<String, (u64, i64)>, // (count, sum_ns) +} + +impl TaskDurationCache { + fn new() -> Self { + Self { + durations: HashMap::new(), + } + } + + fn record(&mut self, task_name: &str, wall_time_ns: i64) { + let entry = self.durations.entry(task_name.to_string()).or_default(); + entry.0 += 1; + entry.1 = entry.1.saturating_add(wall_time_ns); + } + + #[allow(dead_code)] + fn avg_ns(&self, task_name: &str) -> Option<i64> { + self.durations + .get(task_name) + .filter(|(count, _)| *count > 0) + .map(|(count, sum)| sum / *count as i64) + } +} + /// The central scheduler that coordinates job dispatch, retries, rate limiting, and circuit breakers. pub struct Scheduler { storage: StorageBackend, @@ -129,6 +161,7 @@ pub struct Scheduler { shutdown: Arc<Notify>, paused_cache: Mutex<(HashSet<String>, Instant)>, namespace: Option<String>, + duration_cache: Mutex<TaskDurationCache>, } /// Counters for tick-based scheduling of periodic maintenance tasks. @@ -162,6 +195,7 @@ impl Scheduler { shutdown: Arc::new(Notify::new()), paused_cache: Mutex::new((HashSet::new(), Instant::now())), namespace, + duration_cache: Mutex::new(TaskDurationCache::new()), } } @@ -192,24 +226,41 @@ impl Scheduler { /// Run the scheduler loop. Polls for ready jobs and dispatches them /// to the worker pool via the provided channel. + /// + /// Uses adaptive polling: starts at `poll_interval`, backs off + /// exponentially (up to 1s) when no jobs are found, resets immediately + /// when a job is dispatched. 
pub async fn run(&self, job_tx: tokio::sync::mpsc::Sender<Job>) { let mut counters = TickCounters::default(); + let base_interval = self.config.poll_interval; + let max_interval = Duration::from_secs(1); + let mut current_interval = base_interval; loop { tokio::select! { _ = self.shutdown.notified() => break, - _ = tokio::time::sleep(self.config.poll_interval) => {} + _ = tokio::time::sleep(current_interval) => {} } - self.tick(&job_tx, &mut counters); + let dispatched = self.tick(&job_tx, &mut counters); + if dispatched { + current_interval = base_interval; + } else { + current_interval = (current_interval * 2).min(max_interval); + } } } /// Execute one iteration of the scheduler loop. - fn tick(&self, job_tx: &tokio::sync::mpsc::Sender<Job>, counters: &mut TickCounters) { - if let Err(e) = self.try_dispatch(job_tx) { - error!("scheduler error: {e}"); - } + /// Returns true if a job was dispatched. + fn tick(&self, job_tx: &tokio::sync::mpsc::Sender<Job>, counters: &mut TickCounters) -> bool { + let dispatched = match self.try_dispatch(job_tx) { + Ok(d) => d, + Err(e) => { + error!("scheduler error: {e}"); + false + } + }; counters.reap += 1; counters.periodic += 1; @@ -238,6 +289,8 @@ impl Scheduler { error!("auto-cleanup error: {e}"); } } + + dispatched } } diff --git a/crates/taskito-core/src/scheduler/result_handler.rs b/crates/taskito-core/src/scheduler/result_handler.rs index 8f9ebd5..eda6714 100644 --- a/crates/taskito-core/src/scheduler/result_handler.rs +++ b/crates/taskito-core/src/scheduler/result_handler.rs @@ -36,6 +36,11 @@ impl Scheduler { error!("circuit breaker error for {task_name}: {e}"); } + // Update in-memory duration cache for smart scheduling + if let Ok(mut cache) = self.duration_cache.lock() { + cache.record(task_name, wall_time_ns); + } + Ok(ResultOutcome::Success { job_id, task_name: task_name.clone(), @@ -71,6 +76,11 @@ impl Scheduler { log::error!("circuit breaker error for {task_name}: {e}"); } + // Update in-memory duration cache for smart 
scheduling + if let Ok(mut cache) = self.duration_cache.lock() { + cache.record(&task_name, wall_time_ns); + } + // Look up the job to get the queue name for middleware context let queue = self .storage From 1928dc0fb07c5c2cf7072e7a547910c4eacd0fdb Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 22 Mar 2026 11:39:31 +0530 Subject: [PATCH 2/4] feat: weighted least-loaded dispatch for prefork pool Adds weighted_least_loaded() that factors in avg task duration: score = in_flight * avg_duration_ns. A worker with 1 slow job scores higher than one with 3 fast jobs about to finish. Falls back to plain least_loaded when no duration data available. --- crates/taskito-python/src/prefork/dispatch.rs | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/crates/taskito-python/src/prefork/dispatch.rs b/crates/taskito-python/src/prefork/dispatch.rs index 89a896e..23bab75 100644 --- a/crates/taskito-python/src/prefork/dispatch.rs +++ b/crates/taskito-python/src/prefork/dispatch.rs @@ -11,6 +11,26 @@ pub fn least_loaded(in_flight_counts: &[u32]) -> usize { .unwrap_or(0) } +/// Selects the child with the lowest weighted load. +/// +/// Score = in_flight_count * avg_task_duration_ns. A worker with 1 slow job +/// scores higher than one with 3 fast jobs, enabling better load distribution +/// across heterogeneous workloads. +/// +/// Falls back to `least_loaded` when `avg_duration_ns` is 0. 
+#[allow(dead_code)] +pub fn weighted_least_loaded(in_flight_counts: &[u32], avg_duration_ns: i64) -> usize { + if avg_duration_ns <= 0 { + return least_loaded(in_flight_counts); + } + in_flight_counts + .iter() + .enumerate() + .min_by_key(|(_, &count)| count as i64 * avg_duration_ns) + .map(|(idx, _)| idx) + .unwrap_or(0) +} + #[cfg(test)] mod tests { use super::*; @@ -29,4 +49,17 @@ mod tests { fn test_least_loaded_single() { assert_eq!(least_loaded(&[5]), 0); } + + #[test] + fn test_weighted_picks_lowest_score() { + // Worker 0: 2 in-flight * 100ns = 200 + // Worker 1: 1 in-flight * 100ns = 100 ← pick this + // Worker 2: 3 in-flight * 100ns = 300 + assert_eq!(weighted_least_loaded(&[2, 1, 3], 100), 1); + } + + #[test] + fn test_weighted_falls_back_on_zero_duration() { + assert_eq!(weighted_least_loaded(&[3, 0, 2], 0), 1); // same as least_loaded + } } From 51b67af15b84f83fa5b8ef038cbed233c65d75f0 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 22 Mar 2026 11:52:23 +0530 Subject: [PATCH 3/4] fix: reset poll interval after periodic task check Adaptive polling backed off to 1s on empty queues, causing periodic tasks to be picked up late. Now resets interval after check_periodic() runs, so newly enqueued periodic jobs are dispatched promptly. 
--- crates/taskito-core/src/scheduler/mod.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index 5297f46..accc5a3 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -242,8 +242,8 @@ impl Scheduler { _ = tokio::time::sleep(current_interval) => {} } - let dispatched = self.tick(&job_tx, &mut counters); - if dispatched { + let had_work = self.tick(&job_tx, &mut counters); + if had_work { current_interval = base_interval; } else { current_interval = (current_interval * 2).min(max_interval); @@ -252,7 +252,8 @@ } /// Execute one iteration of the scheduler loop. - /// Returns true if a job was dispatched. + /// Returns true if any work was done (job dispatched or periodic task enqueued), + /// which resets the adaptive poll interval. fn tick(&self, job_tx: &tokio::sync::mpsc::Sender<Job>, counters: &mut TickCounters) -> bool { let dispatched = match self.try_dispatch(job_tx) { Ok(d) => d, @@ -262,6 +263,8 @@ } }; + let mut had_maintenance = false; + counters.reap += 1; counters.periodic += 1; counters.cleanup += 1; @@ -276,8 +279,13 @@ .periodic .is_multiple_of(self.config.periodic_check_interval) { - if let Err(e) = self.check_periodic() { - error!("periodic check error: {e}"); + match self.check_periodic() { + Ok(()) => { + // Periodic tasks may have been enqueued — reset polling + // so the next tick picks them up quickly. 
+ had_maintenance = true; + } + Err(e) => error!("periodic check error: {e}"), } } @@ -290,7 +298,7 @@ } } - dispatched + dispatched || had_maintenance } } From deeb0fd35244baad540f8b7d43a31d47434fdeea Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 22 Mar 2026 12:01:37 +0530 Subject: [PATCH 4/4] fix: cap adaptive poll backoff at 200ms to keep periodic checks timely --- crates/taskito-core/src/scheduler/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index accc5a3..a91468f 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -233,7 +233,7 @@ pub async fn run(&self, job_tx: tokio::sync::mpsc::Sender<Job>) { let mut counters = TickCounters::default(); + let base_interval = self.config.poll_interval; - let max_interval = Duration::from_secs(1); + let max_interval = Duration::from_millis(200); let mut current_interval = base_interval; loop {