From 8a2251808a20f071b0043bb5f7defbcc14415c09 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 17:07:00 -0700 Subject: [PATCH 1/8] perf!: return IDs instead of full TaskRecords from tag queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename `tasks_by_tags` → `task_ids_by_tags` and `tasks_by_tag_key_prefix` → `task_ids_by_tag_key_prefix` across store, scheduler, domain, and module layers. Queries now SELECT only `t.id`, skip `populate_tags`, and drop the ORDER BY clause — avoiding full row deserialization and N+1 tag lookups when callers (cancel_by_tag, cancel_by_tag_key_prefix) only need the ID. Extracts shared join-building logic into `build_tag_join_sql`. --- benches/tags.rs | 10 ++--- src/domain.rs | 16 +++---- src/lib.rs | 4 +- src/module.rs | 26 +++++------ src/scheduler/queries.rs | 18 ++++---- src/scheduler/submit.rs | 25 +++++------ src/store/query/tags.rs | 93 +++++++++++++++++++--------------------- src/store/query/tests.rs | 46 ++++++++++++-------- 8 files changed, 124 insertions(+), 114 deletions(-) diff --git a/benches/tags.rs b/benches/tags.rs index e1ee8a3..60c82d4 100644 --- a/benches/tags.rs +++ b/benches/tags.rs @@ -96,11 +96,11 @@ fn bench_submit_with_tags(c: &mut Criterion) { group.finish(); } -/// `tasks_by_tags` with a single filter at varying queue depths. +/// `task_ids_by_tags` with a single filter at varying queue depths. /// All tasks match the filter, so result size equals queue depth. -fn bench_query_by_tags(c: &mut Criterion) { +fn bench_query_ids_by_tags(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("query_by_tags"); + let mut group = c.benchmark_group("query_ids_by_tags"); group.throughput(Throughput::Elements(1)); group.sample_size(10); group.warm_up_time(Duration::from_secs(1)); @@ -119,7 +119,7 @@ fn bench_query_by_tags(c: &mut Criterion) { let start = Instant::now(); for _ in 0..iters { let _ = store - .tasks_by_tags(&[("bucket", "b0")], None) + .task_ids_by_tags(&[("bucket", "b0")], None) .await .unwrap(); } @@ -214,7 +214,7 @@ fn bench_tag_values_scan(c: &mut Criterion) { criterion_group!( benches, bench_submit_with_tags, - bench_query_by_tags, + bench_query_ids_by_tags, bench_count_by_tags, bench_tag_values_scan, ); diff --git a/src/domain.rs b/src/domain.rs index 7b351fa..642aea1 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -640,13 +640,13 @@ impl DomainHandle { self.inner.cancel_where(predicate).await } - /// Find domain tasks matching all specified tag filters (AND semantics). - pub async fn tasks_by_tags( + /// Return IDs of domain tasks matching all specified tag filters (AND semantics). + pub async fn task_ids_by_tags( &self, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { - self.inner.tasks_by_tags(filters, status).await + ) -> Result, StoreError> { + self.inner.task_ids_by_tags(filters, status).await } /// Discover tag keys matching a prefix within this domain. @@ -654,13 +654,13 @@ impl DomainHandle { self.inner.tag_keys_by_prefix(prefix).await } - /// Find domain tasks with any tag key matching the given prefix. - pub async fn tasks_by_tag_key_prefix( + /// Return IDs of domain tasks with any tag key matching the given prefix. + pub async fn task_ids_by_tag_key_prefix( &self, prefix: &str, status: Option, - ) -> Result, StoreError> { - self.inner.tasks_by_tag_key_prefix(prefix, status).await + ) -> Result, StoreError> { + self.inner.task_ids_by_tag_key_prefix(prefix, status).await } /// Count domain tasks with any tag key matching the given prefix. diff --git a/src/lib.rs b/src/lib.rs index 45756cd..f27580d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -172,10 +172,10 @@ //! Tags are copied to history on all terminal transitions and are included in //! [`TaskEventHeader`] for event subscribers. //! -//! Query by exact tags via [`DomainHandle::tasks_by_tags`] (AND semantics), +//! Query by exact tags via [`DomainHandle::task_ids_by_tags`] (AND semantics), //! or discover and query by tag key prefix via //! [`DomainHandle::tag_keys_by_prefix`], -//! [`DomainHandle::tasks_by_tag_key_prefix`], +//! [`DomainHandle::task_ids_by_tag_key_prefix`], //! [`DomainHandle::count_by_tag_key_prefix`], and //! [`DomainHandle::cancel_by_tag_key_prefix`]. Prefix queries are useful when //! multiple libraries share a scheduler and namespace their tags diff --git a/src/module.rs b/src/module.rs index f8f9f54..c19a3cf 100644 --- a/src/module.rs +++ b/src/module.rs @@ -711,16 +711,16 @@ impl ModuleHandle { .await } - /// Find module tasks matching all specified tag filters (AND semantics). - pub async fn tasks_by_tags( + /// Return IDs of module tasks matching all specified tag filters (AND semantics). + pub async fn task_ids_by_tags( &self, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { self.scheduler .inner .store - .tasks_by_tags_with_prefix(&self.prefix, filters, status) + .task_ids_by_tags_with_prefix(&self.prefix, filters, status) .await } @@ -733,16 +733,16 @@ impl ModuleHandle { .await } - /// Find module tasks with any tag key matching the given prefix. - pub async fn tasks_by_tag_key_prefix( + /// Return IDs of module tasks with any tag key matching the given prefix. + pub async fn task_ids_by_tag_key_prefix( &self, prefix: &str, status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { self.scheduler .inner .store - .tasks_by_tag_key_prefix_with_prefix(&self.prefix, prefix, status) + .task_ids_by_tag_key_prefix_with_prefix(&self.prefix, prefix, status) .await } @@ -761,16 +761,16 @@ impl ModuleHandle { /// Cancel module tasks with any tag key matching the given prefix. pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result, StoreError> { - let tasks = self + let ids = self .scheduler .inner .store - .tasks_by_tag_key_prefix_with_prefix(&self.prefix, prefix, None) + .task_ids_by_tag_key_prefix_with_prefix(&self.prefix, prefix, None) .await?; let mut cancelled = Vec::new(); - for task in &tasks { - if self.scheduler.cancel(task.id).await? { - cancelled.push(task.id); + for id in ids { + if self.scheduler.cancel(id).await? { + cancelled.push(id); } } Ok(cancelled) diff --git a/src/scheduler/queries.rs b/src/scheduler/queries.rs index 218a8ee..b2ca461 100644 --- a/src/scheduler/queries.rs +++ b/src/scheduler/queries.rs @@ -63,15 +63,15 @@ impl Scheduler { .collect() } - /// Find active tasks matching all specified tag filters (AND semantics). + /// Return the IDs of active tasks matching all specified tag filters (AND semantics). /// - /// Delegates to [`TaskStore::tasks_by_tags`](crate::TaskStore::tasks_by_tags). - pub async fn tasks_by_tags( + /// Delegates to [`TaskStore::task_ids_by_tags`](crate::TaskStore::task_ids_by_tags). + pub async fn task_ids_by_tags( &self, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { - self.inner.store.tasks_by_tags(filters, status).await + ) -> Result, StoreError> { + self.inner.store.task_ids_by_tags(filters, status).await } /// Count active tasks grouped by a tag key's values. @@ -97,15 +97,15 @@ impl Scheduler { self.inner.store.tag_keys_by_prefix(prefix).await } - /// Find active tasks with any tag key matching the given prefix. - pub async fn tasks_by_tag_key_prefix( + /// Return IDs of active tasks with any tag key matching the given prefix. + pub async fn task_ids_by_tag_key_prefix( &self, prefix: &str, status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { self.inner .store - .tasks_by_tag_key_prefix(prefix, status) + .task_ids_by_tag_key_prefix(prefix, status) .await } diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index 08d5608..961d32d 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -320,18 +320,19 @@ impl Scheduler { /// Cancel all active tasks matching a tag key-value pair. /// - /// Finds tasks via [`TaskStore::tasks_by_tags`](crate::TaskStore::tasks_by_tags) and cancels each one. + /// Uses [`TaskStore::task_ids_by_tags`](crate::TaskStore::task_ids_by_tags) (ID-only query) + /// to avoid full record deserialization and tag population overhead. /// Returns the ids of tasks that were successfully cancelled. pub async fn cancel_by_tag(&self, key: &str, value: &str) -> Result, StoreError> { - let tasks = self + let ids = self .inner .store - .tasks_by_tags(&[(key, value)], None) + .task_ids_by_tags(&[(key, value)], None) .await?; let mut cancelled = Vec::new(); - for task in &tasks { - if self.cancel(task.id).await? { - cancelled.push(task.id); + for id in ids { + if self.cancel(id).await? { + cancelled.push(id); } } Ok(cancelled) @@ -339,18 +340,18 @@ impl Scheduler { /// Cancel all active tasks that have any tag key matching the given prefix. /// - /// Finds tasks via [`crate::TaskStore::tasks_by_tag_key_prefix`] and cancels each. + /// Uses [`crate::TaskStore::task_ids_by_tag_key_prefix`] for ID-only lookup. /// Returns the ids of tasks that were successfully cancelled. pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result, StoreError> { - let tasks = self + let ids = self .inner .store - .tasks_by_tag_key_prefix(prefix, None) + .task_ids_by_tag_key_prefix(prefix, None) .await?; let mut cancelled = Vec::new(); - for task in &tasks { - if self.cancel(task.id).await? { - cancelled.push(task.id); + for id in ids { + if self.cancel(id).await? { + cancelled.push(id); } } Ok(cancelled) diff --git a/src/store/query/tags.rs b/src/store/query/tags.rs index 4c54dd6..616bdc3 100644 --- a/src/store/query/tags.rs +++ b/src/store/query/tags.rs @@ -1,8 +1,6 @@ //! Tag-based queries: filtering, aggregation, and prefix discovery by task tags. -use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; -use crate::task::TaskRecord; /// Escape SQL LIKE wildcards in `prefix` and append `%` for a safe prefix pattern. /// @@ -20,38 +18,45 @@ fn escape_like_prefix(prefix: &str) -> String { } impl TaskStore { - /// Find active tasks matching all specified tag filters (AND semantics). + /// Build the INNER JOIN fragment for tag filters. + fn build_tag_join_sql( + select: &str, + filters: &[(&str, &str)], + status: Option<&crate::task::TaskStatus>, + ) -> String { + let mut sql = format!("SELECT {select} FROM tasks t"); + for (i, _) in filters.iter().enumerate() { + sql.push_str(&format!( + " INNER JOIN task_tags tt{i} ON t.id = tt{i}.task_id AND tt{i}.key = ? AND tt{i}.value = ?" + )); + } + if let Some(s) = status { + sql.push_str(&format!(" WHERE t.status = '{}'", s.as_str())); + } + sql + } + + /// Return the IDs of active tasks matching all specified tag filters (AND semantics). /// /// Each `(key, value)` pair adds an INNER JOIN, so only tasks matching /// **all** filters are returned. Optionally filter by status. - pub async fn tasks_by_tags( + pub async fn task_ids_by_tags( &self, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { if filters.is_empty() { return Ok(Vec::new()); } - let mut sql = String::from("SELECT t.* FROM tasks t"); - for (i, _) in filters.iter().enumerate() { - sql.push_str(&format!( - " INNER JOIN task_tags tt{i} ON t.id = tt{i}.task_id AND tt{i}.key = ? AND tt{i}.value = ?" - )); - } - if let Some(ref s) = status { - sql.push_str(&format!(" WHERE t.status = '{}'", s.as_str())); - } - sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); + let sql = Self::build_tag_join_sql("t.id", filters, status.as_ref()); - let mut q = sqlx::query(&sql); + let mut q = sqlx::query_as::<_, (i64,)>(&sql); for (key, value) in filters { q = q.bind(key).bind(value); } let rows = q.fetch_all(&self.pool).await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; - Ok(records) + Ok(rows.into_iter().map(|(id,)| id).collect()) } /// Count active tasks matching all specified tag filters (AND semantics). @@ -95,17 +100,17 @@ impl TaskStore { Ok(rows) } - /// Find active tasks matching a `task_type` prefix **and** all tag filters. + /// Return IDs of active tasks matching a `task_type` prefix **and** all tag filters. /// - /// If `filters` is empty, returns all tasks matching the prefix. - pub async fn tasks_by_tags_with_prefix( + /// If `filters` is empty, returns all task IDs matching the prefix. + pub async fn task_ids_by_tags_with_prefix( &self, prefix: &str, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { let pattern = format!("{prefix}%"); - let mut sql = String::from("SELECT t.* FROM tasks t"); + let mut sql = String::from("SELECT t.id FROM tasks t"); for (i, _) in filters.iter().enumerate() { sql.push_str(&format!( " INNER JOIN task_tags tt{i} ON t.id = tt{i}.task_id AND tt{i}.key = ? AND tt{i}.value = ?" @@ -115,17 +120,14 @@ impl TaskStore { if let Some(ref s) = status { sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); } - sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); - let mut q = sqlx::query(&sql); + let mut q = sqlx::query_as::<_, (i64,)>(&sql); for (key, value) in filters { q = q.bind(key).bind(value); } q = q.bind(&pattern); let rows = q.fetch_all(&self.pool).await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; - Ok(records) + Ok(rows.into_iter().map(|(id,)| id).collect()) } /// Count active tasks grouped by a tag key's values, filtered by `task_type` prefix. @@ -259,46 +261,44 @@ impl TaskStore { Ok(rows.into_iter().map(|(k,)| k).collect()) } - /// Find active tasks that have any tag key matching the prefix. + /// Return IDs of active tasks that have any tag key matching the prefix. /// - /// Optionally filter by status. Returns tasks ordered by priority. + /// Optionally filter by status. /// LIKE wildcards in the prefix are escaped — only true prefix matching. - pub async fn tasks_by_tag_key_prefix( + pub async fn task_ids_by_tag_key_prefix( &self, prefix: &str, status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { let pattern = escape_like_prefix(prefix); let mut sql = String::from( - "SELECT t.* FROM tasks t \ + "SELECT t.id FROM tasks t \ WHERE EXISTS (SELECT 1 FROM task_tags tt \ WHERE tt.task_id = t.id AND tt.key LIKE ? ESCAPE '\\')", ); if let Some(ref s) = status { sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); } - sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); - let rows = sqlx::query(&sql) + let rows = sqlx::query_as::<_, (i64,)>(&sql) .bind(&pattern) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; - Ok(records) + Ok(rows.into_iter().map(|(id,)| id).collect()) } - /// Find active tasks that have any tag key matching the prefix, scoped to a task_type prefix. - pub async fn tasks_by_tag_key_prefix_with_prefix( + /// Return IDs of active tasks that have any tag key matching the prefix, + /// scoped to a task_type prefix. + pub async fn task_ids_by_tag_key_prefix_with_prefix( &self, task_type_prefix: &str, prefix: &str, status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { let pattern = escape_like_prefix(prefix); let type_pattern = format!("{task_type_prefix}%"); let mut sql = String::from( - "SELECT t.* FROM tasks t \ + "SELECT t.id FROM tasks t \ WHERE EXISTS (SELECT 1 FROM task_tags tt \ WHERE tt.task_id = t.id AND tt.key LIKE ? ESCAPE '\\') \ AND t.task_type LIKE ?", @@ -306,16 +306,13 @@ impl TaskStore { if let Some(ref s) = status { sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); } - sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); - let rows = sqlx::query(&sql) + let rows = sqlx::query_as::<_, (i64,)>(&sql) .bind(&pattern) .bind(&type_pattern) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; - Ok(records) + Ok(rows.into_iter().map(|(id,)| id).collect()) } /// Count active tasks that have any tag key matching the prefix. diff --git a/src/store/query/tests.rs b/src/store/query/tests.rs index d6db544..9481330 100644 --- a/src/store/query/tests.rs +++ b/src/store/query/tests.rs @@ -151,7 +151,7 @@ async fn prune_by_count() { // ── Tag query tests ─────────────────────────────────────────────── #[tokio::test] -async fn tasks_by_tags_single_filter() { +async fn task_ids_by_tags_single_filter() { let store = test_store().await; store @@ -171,18 +171,21 @@ async fn tasks_by_tags_single_filter() { .await .unwrap(); - let results = store.tasks_by_tags(&[("env", "prod")], None).await.unwrap(); + let results = store + .task_ids_by_tags(&[("env", "prod")], None) + .await + .unwrap(); assert_eq!(results.len(), 2); let results = store - .tasks_by_tags(&[("env", "staging")], None) + .task_ids_by_tags(&[("env", "staging")], None) .await .unwrap(); assert_eq!(results.len(), 1); } #[tokio::test] -async fn tasks_by_tags_multiple_filters_and() { +async fn task_ids_by_tags_multiple_filters_and() { let store = test_store().await; store @@ -215,14 +218,14 @@ async fn tasks_by_tags_multiple_filters_and() { // AND semantics: only task matching both filters. let results = store - .tasks_by_tags(&[("env", "prod"), ("region", "us")], None) + .task_ids_by_tags(&[("env", "prod"), ("region", "us")], None) .await .unwrap(); assert_eq!(results.len(), 1); // With status filter. let results = store - .tasks_by_tags(&[("env", "prod")], Some(TaskStatus::Pending)) + .task_ids_by_tags(&[("env", "prod")], Some(TaskStatus::Pending)) .await .unwrap(); assert_eq!(results.len(), 2); @@ -326,7 +329,7 @@ async fn tag_keys_by_prefix_discovers_keys() { } #[tokio::test] -async fn tasks_by_tag_key_prefix_finds_tasks() { +async fn task_ids_by_tag_key_prefix_finds_tasks() { let store = test_store().await; store @@ -356,17 +359,20 @@ async fn tasks_by_tag_key_prefix_finds_tasks() { .unwrap(); let tasks = store - .tasks_by_tag_key_prefix("billing.", None) + .task_ids_by_tag_key_prefix("billing.", None) .await .unwrap(); assert_eq!(tasks.len(), 2); // tp-1 and tp-2 - let tasks = store.tasks_by_tag_key_prefix("media.", None).await.unwrap(); + let tasks = store + .task_ids_by_tag_key_prefix("media.", None) + .await + .unwrap(); assert_eq!(tasks.len(), 2); // tp-2 and tp-3 } #[tokio::test] -async fn tasks_by_tag_key_prefix_with_status_filter() { +async fn task_ids_by_tag_key_prefix_with_status_filter() { let store = test_store().await; store @@ -379,13 +385,13 @@ async fn tasks_by_tag_key_prefix_with_status_filter() { .unwrap(); let tasks = store - .tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)) + .task_ids_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)) .await .unwrap(); assert_eq!(tasks.len(), 1); let tasks = store - .tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Running)) + .task_ids_by_tag_key_prefix("billing.", Some(TaskStatus::Running)) .await .unwrap(); assert_eq!(tasks.len(), 0); @@ -514,7 +520,7 @@ async fn tag_keys_by_prefix_with_prefix_scopes_to_task_type() { } #[tokio::test] -async fn tasks_by_tag_key_prefix_with_prefix_scopes_to_task_type() { +async fn task_ids_by_tag_key_prefix_with_prefix_scopes_to_task_type() { let store = test_store().await; store @@ -535,12 +541,18 @@ async fn tasks_by_tag_key_prefix_with_prefix_scopes_to_task_type() { .unwrap(); // Both tasks have billing.* tags, but scoped query returns only the billing module's task. - let tasks = store - .tasks_by_tag_key_prefix_with_prefix("billing::", "billing.", None) + let ids = store + .task_ids_by_tag_key_prefix_with_prefix("billing::", "billing.", None) .await .unwrap(); - assert_eq!(tasks.len(), 1); - assert_eq!(tasks[0].task_type, "billing::charge"); + assert_eq!(ids.len(), 1); + + // Unscoped returns both. + let all_ids = store + .task_ids_by_tag_key_prefix("billing.", None) + .await + .unwrap(); + assert_eq!(all_ids.len(), 2); } #[tokio::test] From 8e1bc3b4ee4300265279d2fdab9522ac4ef8ac1c Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 17:26:00 -0700 Subject: [PATCH 2/8] perf: batch terminal failures into a single transaction via coalescing channel Mirror the existing completion coalescing pattern for task failures. Parentless terminal failures are sent through an unbounded channel and drained in batches (by leader election or the run loop), amortizing SQLite WAL sync overhead. Failures with parents still process inline to preserve fail-fast cascade ordering. Also fix has_paused_tasks to start false and let the builder set it only when the persistent store actually contains paused tasks. --- src/scheduler/builder.rs | 8 ++++ src/scheduler/mod.rs | 26 ++++++++++- src/scheduler/run_loop.rs | 29 ++++++++++-- src/scheduler/spawn.rs | 3 ++ src/scheduler/spawn/context.rs | 6 +++ src/scheduler/spawn/failure.rs | 75 ++++++++++++++++++++++++++++-- src/store/lifecycle/transitions.rs | 73 +++++++++++++++++++++++++++++ 7 files changed, 210 insertions(+), 10 deletions(-) diff --git a/src/scheduler/builder.rs b/src/scheduler/builder.rs index 4a030eb..28dec4c 100644 --- a/src/scheduler/builder.rs +++ b/src/scheduler/builder.rs @@ -491,6 +491,14 @@ impl SchedulerBuilder { .store(true, std::sync::atomic::Ordering::Relaxed); } + // Check for pre-existing paused tasks (persistent store recovery). + if scheduler.inner.store.paused_count().await.unwrap_or(0) > 0 { + scheduler + .inner + .has_paused_tasks + .store(true, std::sync::atomic::Ordering::Relaxed); + } + Ok(scheduler) } } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index e3b31af..969610f 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -62,6 +62,19 @@ pub(crate) struct CompletionMsg { pub task: TaskRecord, pub metrics: IoBudget, } + +// ── Failure coalescing ─────────────────────────────────────────── + +/// Message sent from spawned tasks for terminal failures (dead-letter / permanent). +/// +/// Batched by `drain_failures` to reduce per-failure transaction overhead, +/// mirroring the completion coalescing pattern. +pub(crate) struct FailureMsg { + pub task: TaskRecord, + pub error: String, + pub retryable: bool, + pub metrics: IoBudget, +} pub use event::{ SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, }; @@ -144,6 +157,10 @@ pub(crate) struct SchedulerInner { /// (leader election pattern) in addition to the run loop. pub(crate) completion_rx: std::sync::Arc>>, + /// Send side of the failure coalescing channel. + pub(crate) failure_tx: tokio::sync::mpsc::UnboundedSender, + /// Receive side (leader election + run loop drain). + pub(crate) failure_rx: std::sync::Arc>>, } /// IO-aware priority scheduler. @@ -240,6 +257,7 @@ impl Scheduler { let (event_tx, _) = tokio::sync::broadcast::channel(256); let (progress_tx, _) = tokio::sync::broadcast::channel(64); let (completion_tx, completion_rx) = tokio::sync::mpsc::unbounded_channel(); + let (failure_tx, failure_rx) = tokio::sync::mpsc::unbounded_channel(); Self { inner: Arc::new(SchedulerInner { store, @@ -271,12 +289,16 @@ impl Scheduler { module_paused, module_caps: RwLock::new(module_caps), module_running, - // Conservative: true on startup so the first cycle checks. - has_paused_tasks: AtomicBool::new(true), + // Start false — set to true only when preemption pauses a task. + // For persistent stores the builder checks for pre-existing + // paused tasks after construction (see SchedulerBuilder::build). + has_paused_tasks: AtomicBool::new(false), // Default to false; builder sets true when safe. fast_dispatch: AtomicBool::new(false), completion_tx, completion_rx: std::sync::Arc::new(Mutex::new(completion_rx)), + failure_tx, + failure_rx: std::sync::Arc::new(Mutex::new(failure_rx)), }), } } diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index c1c3f7d..821fcde 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -32,6 +32,8 @@ impl Scheduler { module_registry: Arc::clone(&self.inner.module_registry), completion_tx: self.inner.completion_tx.clone(), completion_rx: self.inner.completion_rx.clone(), + failure_tx: self.inner.failure_tx.clone(), + failure_rx: self.inner.failure_rx.clone(), } } @@ -369,16 +371,34 @@ impl Scheduler { .await; } + /// Drain the failure channel and process all queued terminal failures + /// in a single batched transaction. + async fn drain_failures(&self) { + let mut batch = Vec::new(); + { + let mut rx = self.inner.failure_rx.lock().await; + while let Ok(msg) = rx.try_recv() { + batch.push(msg); + } + } + + if batch.is_empty() { + return; + } + + tracing::debug!(count = batch.len(), "draining failure batch"); + spawn::process_failure_batch(&batch, &self.inner.store, &self.inner.event_tx).await; + } + /// Resume paused tasks, dispatch finalizers, and dispatch pending work. async fn poll_and_dispatch(&self) { if self.is_paused() { return; } - // Drain queued completions before dispatching new work. - // This ensures completed tasks are processed first, freeing - // dependency edges and unblocking downstream tasks. + // Drain queued completions and failures before dispatching new work. self.drain_completions().await; + self.drain_failures().await; // Run expiry sweep before dispatching. self.maybe_expire_tasks().await; @@ -459,8 +479,9 @@ impl Scheduler { } } - // Drain any remaining completions before closing the store. + // Drain any remaining completions and failures before closing the store. self.drain_completions().await; + self.drain_failures().await; // Flush WAL and close the database. self.inner.store.close().await; diff --git a/src/scheduler/spawn.rs b/src/scheduler/spawn.rs index 895108e..613bd3b 100644 --- a/src/scheduler/spawn.rs +++ b/src/scheduler/spawn.rs @@ -24,6 +24,7 @@ use super::SchedulerEvent; pub(in crate::scheduler) use completion::process_completion_batch; pub(crate) use context::SpawnContext; +pub(in crate::scheduler) use failure::process_failure_batch; /// Whether to call `execute` or `finalize` on the executor. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -89,6 +90,8 @@ pub(crate) async fn spawn_task( work_notify: ctx.work_notify, max_retries: ctx.max_retries, registry: ctx.registry, + failure_tx: ctx.failure_tx, + failure_rx: ctx.failure_rx, }; let task_id_for_handle = task.id; diff --git a/src/scheduler/spawn/context.rs b/src/scheduler/spawn/context.rs index 5d20da3..beb5bd3 100644 --- a/src/scheduler/spawn/context.rs +++ b/src/scheduler/spawn/context.rs @@ -39,6 +39,12 @@ pub(crate) struct SpawnContext { pub completion_rx: std::sync::Arc< tokio::sync::Mutex>, >, + /// Failure coalescing channel sender. + pub failure_tx: tokio::sync::mpsc::UnboundedSender, + /// Failure coalescing channel receiver (Arc-wrapped for leader election). + pub failure_rx: std::sync::Arc< + tokio::sync::Mutex>, + >, } /// Output of task context construction — everything needed to insert into the diff --git a/src/scheduler/spawn/failure.rs b/src/scheduler/spawn/failure.rs index 6663709..d8af7e8 100644 --- a/src/scheduler/spawn/failure.rs +++ b/src/scheduler/spawn/failure.rs @@ -6,7 +6,7 @@ use crate::store::TaskStore; use crate::task::{IoBudget, TaskError, TaskRecord}; use super::super::dispatch::ActiveTaskMap; -use super::super::SchedulerEvent; +use super::super::{FailureMsg, SchedulerEvent}; use super::parent::handle_parent_resolution; /// Shared dependencies for the failure handler. @@ -17,6 +17,9 @@ pub(crate) struct FailureDeps { pub work_notify: Arc, pub max_retries: i32, pub registry: Arc, + pub failure_tx: tokio::sync::mpsc::UnboundedSender, + pub failure_rx: + std::sync::Arc>>, } /// Handle a failed task execution. @@ -80,9 +83,9 @@ pub(crate) async fn handle_failure( { tracing::error!(task_id, error = %e, "failed to requeue task for retry"); } - } else { - // Terminal failure (permanent or retries exhausted): needs a - // transaction for the multi-statement INSERT history + DELETE. + } else if task.parent_id.is_some() { + // Terminal failure with parent — must process inline for fail-fast + // cascade and parent resolution ordering. let fail_backoff = crate::store::FailBackoff { strategy: backoff_strategy, executor_retry_after_ms: error.retry_after_ms, @@ -101,6 +104,50 @@ pub(crate) async fn handle_failure( { tracing::error!(task_id, error = %e, "failed to record task failure"); } + } else { + // Terminal failure without parent — batch via channel. + let msg = FailureMsg { + task: task.clone(), + error: error.message.clone(), + retryable: error.retryable, + metrics: *metrics, + }; + + if deps.failure_tx.send(msg).is_err() { + // Channel closed — fall back to inline. + tracing::error!(task_id, "failure channel closed — processing inline"); + let fail_backoff = crate::store::FailBackoff { + strategy: backoff_strategy, + executor_retry_after_ms: error.retry_after_ms, + }; + if let Err(e) = deps + .store + .fail_with_record( + task, + &error.message, + error.retryable, + effective_max_retries, + metrics, + &fail_backoff, + ) + .await + { + tracing::error!(task_id, error = %e, "inline failure recording failed"); + } + } else { + // Leader election: try to grab the rx lock and drain the batch. + if let Ok(mut rx) = deps.failure_rx.try_lock() { + let mut batch = Vec::new(); + while let Ok(m) = rx.try_recv() { + batch.push(m); + } + drop(rx); + + if !batch.is_empty() { + process_failure_batch(&batch, &deps.store, &deps.event_tx).await; + } + } + } } // Remove from active tracking AFTER the store write completes. @@ -130,6 +177,26 @@ pub(crate) async fn handle_failure( } } +/// Process a batch of terminal failures: write to store and emit events. +/// +/// Called from both the spawned task (leader election) and the run loop +/// (`drain_failures`). +pub(in crate::scheduler) async fn process_failure_batch( + batch: &[FailureMsg], + store: &TaskStore, + _event_tx: &tokio::sync::broadcast::Sender, +) { + let items: Vec<(&TaskRecord, &str, bool, &IoBudget)> = batch + .iter() + .map(|m| (&m.task, m.error.as_str(), m.retryable, &m.metrics)) + .collect(); + + if let Err(e) = store.fail_batch(&items).await { + tracing::error!(error = %e, "batch failure recording failed"); + // Events are emitted by handle_failure directly, so nothing else needed. + } +} + /// Propagate a permanent failure to dependents and handle fail-fast parent logic. async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureDeps) { let task_id = task.id; diff --git a/src/store/lifecycle/transitions.rs b/src/store/lifecycle/transitions.rs index 25f16f6..3a02cb3 100644 --- a/src/store/lifecycle/transitions.rs +++ b/src/store/lifecycle/transitions.rs @@ -724,6 +724,79 @@ impl TaskStore { Ok(()) } + /// Batch-process terminal failures in a single transaction. + /// + /// Each item is a `(task, error, retryable)` triple that has already been + /// determined to be terminal (retries exhausted or non-retryable). The + /// method inserts history rows and deletes active tasks in one + /// `BEGIN IMMEDIATE` / `COMMIT` cycle, amortizing SQLite's WAL sync. + pub async fn fail_batch( + &self, + items: &[(&crate::task::TaskRecord, &str, bool, &IoBudget)], + ) -> Result<(), StoreError> { + if items.is_empty() { + return Ok(()); + } + if items.len() == 1 { + let (task, error, retryable, metrics) = items[0]; + return self + .fail_with_record(task, error, retryable, 0, metrics, &FailBackoff::default()) + .await; + } + + let skip_tags = !self.has_tags.load(std::sync::atomic::Ordering::Relaxed); + + tracing::debug!(count = items.len(), "store.fail_batch: BEGIN tx"); + let mut conn = self.begin_write().await?; + + // Step 1: Insert history rows (per-task — each has unique binds). + for &(task, error, retryable, metrics) in items { + let status = if retryable { + HistoryStatus::DeadLetter + } else { + HistoryStatus::Failed + }; + let duration_ms = compute_duration_ms(task); + insert_history( + &mut conn, + task, + status, + metrics, + duration_ms, + Some(error), + skip_tags, + ) + .await?; + } + + // Step 2: Batch-delete active task rows. + let ids: Vec = items.iter().map(|(t, _, _, _)| t.id).collect(); + let placeholders = ids.iter().map(|_| "?").collect::>().join(","); + let sql = format!("DELETE FROM tasks WHERE id IN ({placeholders})"); + let mut q = sqlx::query(&sql); + for id in &ids { + q = q.bind(id); + } + q.execute(&mut *conn).await?; + + // Step 3: Batch-delete orphaned tags. + if !skip_tags { + let tag_sql = format!("DELETE FROM task_tags WHERE task_id IN ({placeholders})"); + let mut tq = sqlx::query(&tag_sql); + for id in &ids { + tq = tq.bind(id); + } + tq.execute(&mut *conn).await?; + } + + sqlx::query("COMMIT").execute(&mut *conn).await?; + drop(conn); + tracing::debug!(count = items.len(), "store.fail_batch: COMMIT ok"); + + self.maybe_prune().await; + Ok(()) + } + /// Requeue a task for retry without a transaction. /// /// The single UPDATE is atomically safe and avoids `BEGIN IMMEDIATE` + From e40f23a5cb4b92fbfbe591a428507542c299fba0 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 17:51:00 -0700 Subject: [PATCH 3/8] perf: lazy tag population and inline zero-delay retries Move tag population out of pop_next/peek_next into explicit caller sites so the JOIN is only paid when tags are actually needed. Add inline retry loop for zero-delay retries: instead of requeueing to pending and re-popping through SQLite, re-execute the task directly in the same spawned future via increment_retry(). --- src/scheduler/run_loop.rs | 16 ++- src/scheduler/spawn.rs | 161 ++++++++++++++++++++--------- src/scheduler/spawn/completion.rs | 2 +- src/scheduler/spawn/failure.rs | 2 +- src/store/hierarchy.rs | 12 ++- src/store/lifecycle/tests.rs | 12 ++- src/store/lifecycle/transitions.rs | 33 +++--- 7 files changed, 171 insertions(+), 67 deletions(-) diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index 821fcde..8347d7d 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -53,16 +53,24 @@ impl Scheduler { // instead of peek_next() + gate.admit() + claim_task() (2 SQL). // pop_next() skips expired tasks via its WHERE clause. if self.inner.fast_dispatch.load(AtomicOrdering::Relaxed) { - let Some(task) = self.inner.store.pop_next().await? else { + let Some(mut task) = self.inner.store.pop_next().await? else { return Ok(false); }; + self.inner + .store + .populate_tags(std::slice::from_mut(&mut task)) + .await?; return self.spawn_dispatched_task(task).await; } // Slow path: peek → gate check → claim. - let Some(candidate) = self.inner.store.peek_next().await? else { + let Some(mut candidate) = self.inner.store.peek_next().await? else { return Ok(false); }; + self.inner + .store + .populate_tags(std::slice::from_mut(&mut candidate)) + .await?; // Dispatch-time expiry check: if the candidate has expired, expire // it and retry (return Ok(true) to loop again). @@ -221,10 +229,12 @@ impl Scheduler { break; } let available = max - active_count; - let tasks = self.inner.store.pop_next_batch(available).await?; + let mut tasks = self.inner.store.pop_next_batch(available).await?; if tasks.is_empty() { break; } + // Populate tags in one batch query (lazy — not done at pop time). + self.inner.store.populate_tags(&mut tasks).await?; for task in tasks { self.spawn_dispatched_task(task).await?; } diff --git a/src/scheduler/spawn.rs b/src/scheduler/spawn.rs index 613bd3b..201ccf1 100644 --- a/src/scheduler/spawn.rs +++ b/src/scheduler/spawn.rs @@ -39,6 +39,9 @@ pub(crate) enum ExecutionPhase { /// and spawns the executor on a new tokio task. The actual success and /// failure handling is delegated to [`completion::handle_success`] and /// [`failure::handle_failure`]. +/// +/// For zero-delay retries, the executor is re-invoked inline (without +/// requeueing to pending) to avoid the pop_next SQL round-trip. pub(crate) async fn spawn_task( task: TaskRecord, executor: Arc, @@ -84,68 +87,134 @@ pub(crate) async fn spawn_task( completion_rx: ctx.completion_rx.clone(), }; let failure_deps = failure::FailureDeps { - store: ctx.store, + store: ctx.store.clone(), active: ctx.active.clone(), - event_tx: ctx.event_tx, - work_notify: ctx.work_notify, + event_tx: ctx.event_tx.clone(), + work_notify: ctx.work_notify.clone(), max_retries: ctx.max_retries, - registry: ctx.registry, - failure_tx: ctx.failure_tx, - failure_rx: ctx.failure_rx, + registry: ctx.registry.clone(), + failure_tx: ctx.failure_tx.clone(), + failure_rx: ctx.failure_rx.clone(), }; + // Keep SpawnContext alive for inline retry context rebuilds. + let spawn_ctx = ctx; + let task_id_for_handle = task.id; - let active_for_handle = ctx.active; + let active_for_handle = spawn_ctx.active.clone(); let token_for_spawn = prepared.token.clone(); - let module_running = ctx.module_running; - let io = prepared.io; + let module_running = spawn_ctx.module_running.clone(); let handle = tokio::spawn(async move { let task_id = task.id; + let mut task = task; + let mut prepared = prepared; // Helper: decrement the module running counter when this task leaves "running". - let decrement_module = || { - if let Some(name) = task.module_name() { - if let Some(counter) = module_running.get(name) { - counter.fetch_sub(1, AtomicOrdering::Relaxed); + // Capture module name upfront to avoid borrowing `task` in the closure. + let module_name_owned = task.module_name().map(|s| s.to_string()); + let module_running_for_dec = module_running.clone(); + let mut decremented = false; + let mut decrement_module_once = || { + if !decremented { + decremented = true; + if let Some(ref name) = module_name_owned { + if let Some(counter) = module_running_for_dec.get(name.as_str()) { + counter.fetch_sub(1, AtomicOrdering::Relaxed); + } } } }; - let result = match phase { - ExecutionPhase::Execute => executor.execute_erased(&prepared.ctx).await, - ExecutionPhase::Finalize => { - executor.finalize_erased(&prepared.ctx).await.map(|()| None) - } // finalize doesn't produce a memo - }; - - // Read IO bytes from the context tracker. - let metrics = io.snapshot(); - - // Drop the context (and its progress reporter) — executor is done. - drop(prepared.ctx); - - match result { - Ok(memo) => { - completion::handle_success( - &task, - phase, - &metrics, - memo, - &completion_deps, - decrement_module, - ) - .await; - } - Err(te) => { - // If cancelled (preempted), the scheduler already paused it. - if token_for_spawn.is_cancelled() { - decrement_module(); - failure_deps.active.remove(task_id); - return; + loop { + let result = match phase { + ExecutionPhase::Execute => executor.execute_erased(&prepared.ctx).await, + ExecutionPhase::Finalize => { + executor.finalize_erased(&prepared.ctx).await.map(|()| None) + } + }; + + // Read IO bytes from the context tracker. + let metrics = prepared.io.snapshot(); + + // Drop the context (and its progress reporter) — executor is done. + drop(prepared.ctx); + + match result { + Ok(memo) => { + completion::handle_success( + &task, + phase, + &metrics, + memo, + &completion_deps, + &mut decrement_module_once, + ) + .await; + break; + } + Err(te) => { + // If cancelled (preempted), the scheduler already paused it. + if token_for_spawn.is_cancelled() { + decrement_module_once(); + failure_deps.active.remove(task_id); + break; + } + + // Check for inline immediate retry: retryable, under max, + // zero delay, execute phase only. + if phase == ExecutionPhase::Execute && te.retryable { + let policy = failure_deps.registry.type_retry_policy(&task.task_type); + let effective_max = task.max_retries.unwrap_or( + policy + .map(|p| p.max_retries) + .unwrap_or(failure_deps.max_retries), + ); + let will_retry = task.retry_count < effective_max; + + if will_retry { + let delay = if let Some(ms) = te.retry_after_ms { + std::time::Duration::from_millis(ms) + } else if let Some(strategy) = policy.map(|p| &p.strategy) { + strategy.delay_for(task.retry_count) + } else { + std::time::Duration::ZERO + }; + + if delay.is_zero() { + // Inline retry: persist retry_count, re-execute. + let _ = failure_deps + .store + .increment_retry(task.id, &te.message) + .await; + task.retry_count += 1; + + // Emit retry event. + let _ = failure_deps.event_tx.send(SchedulerEvent::Failed { + header: task.event_header(), + error: te.message, + will_retry: true, + retry_after: None, + }); + + // Rebuild context for next attempt. + prepared = context::build_task_context(&task, &spawn_ctx); + continue; + } + } + } + + // Not inline-retryable — use normal failure path. + failure::handle_failure( + &task, + te, + &metrics, + &failure_deps, + &mut decrement_module_once, + ) + .await; + break; } - - failure::handle_failure(&task, te, &metrics, &failure_deps, decrement_module).await; } } }); diff --git a/src/scheduler/spawn/completion.rs b/src/scheduler/spawn/completion.rs index a3d1f8c..5925b39 100644 --- a/src/scheduler/spawn/completion.rs +++ b/src/scheduler/spawn/completion.rs @@ -32,7 +32,7 @@ pub(crate) async fn handle_success( metrics: &IoBudget, memo: Option>, deps: &CompletionDeps, - decrement_module: impl FnOnce(), + mut decrement_module: impl FnMut(), ) { let task_id = task.id; diff --git a/src/scheduler/spawn/failure.rs b/src/scheduler/spawn/failure.rs index d8af7e8..14f4424 100644 --- a/src/scheduler/spawn/failure.rs +++ b/src/scheduler/spawn/failure.rs @@ -31,7 +31,7 @@ pub(crate) async fn handle_failure( error: TaskError, metrics: &IoBudget, deps: &FailureDeps, - decrement_module: impl FnOnce(), + mut decrement_module: impl FnMut(), ) { let task_id = task.id; diff --git a/src/store/hierarchy.rs b/src/store/hierarchy.rs index 1505113..7c1fe75 100644 --- a/src/store/hierarchy.rs +++ b/src/store/hierarchy.rs @@ -428,7 +428,11 @@ mod tests { .tag("env", "prod") .tag("region", "us-east"); let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); - let parent = store.pop_next().await.unwrap().unwrap(); + let mut parent = store.pop_next().await.unwrap().unwrap(); + store + .populate_tags(std::slice::from_mut(&mut parent)) + .await + .unwrap(); let ctx = ParentContext { created_at: parent.created_at, @@ -462,7 +466,11 @@ mod tests { .tag("env", "prod") .tag("region", "us-east"); let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); - let parent = store.pop_next().await.unwrap().unwrap(); + let mut parent = store.pop_next().await.unwrap().unwrap(); + store + .populate_tags(std::slice::from_mut(&mut parent)) + .await + .unwrap(); let ctx = ParentContext { created_at: parent.created_at, diff --git a/src/store/lifecycle/tests.rs b/src/store/lifecycle/tests.rs index 0394fd3..348e296 100644 --- a/src/store/lifecycle/tests.rs +++ b/src/store/lifecycle/tests.rs @@ -369,7 +369,11 @@ async fn tags_preserved_on_recurring_requeue() { .recurring(Duration::from_secs(3600)); store.submit(&sub).await.unwrap(); - let task = store.pop_next().await.unwrap().unwrap(); + let mut task = store.pop_next().await.unwrap().unwrap(); + store + .populate_tags(std::slice::from_mut(&mut task)) + .await + .unwrap(); assert_eq!(task.tags.get("schedule").unwrap(), "hourly"); store @@ -391,7 +395,11 @@ async fn tags_in_pop_next() { .tag("color", "blue"); store.submit(&sub).await.unwrap(); - let task = store.pop_next().await.unwrap().unwrap(); + let mut task = store.pop_next().await.unwrap().unwrap(); + store + .populate_tags(std::slice::from_mut(&mut task)) + .await + .unwrap(); assert_eq!(task.tags.get("color").unwrap(), "blue"); } diff --git a/src/store/lifecycle/transitions.rs b/src/store/lifecycle/transitions.rs index 3a02cb3..1964cde 100644 --- a/src/store/lifecycle/transitions.rs +++ b/src/store/lifecycle/transitions.rs @@ -42,11 +42,7 @@ impl TaskStore { .fetch_optional(&self.pool) .await?; - let mut record = row.as_ref().map(row_to_task_record); - if let Some(ref mut r) = record { - self.populate_tags(std::slice::from_mut(r)).await?; - } - Ok(record) + Ok(row.as_ref().map(row_to_task_record)) } /// Atomically claim a specific pending task by id, setting it to running. @@ -150,8 +146,7 @@ impl TaskStore { .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; + let records: Vec = rows.iter().map(row_to_task_record).collect(); Ok(records) } @@ -161,6 +156,10 @@ impl TaskStore { /// /// For tasks with `ttl_from = 'first_attempt'`, sets `expires_at` on /// the first pop. + /// + /// Tags are **not** populated — callers needing tags should call + /// [`populate_tags`](Self::populate_tags) explicitly or use + /// [`task_by_id`](Self::task_by_id). pub async fn pop_next(&self) -> Result, StoreError> { let now_ms = chrono::Utc::now().timestamp_millis(); let row = sqlx::query( @@ -189,11 +188,7 @@ impl TaskStore { .fetch_optional(&self.pool) .await?; - let mut record = row.map(|r| row_to_task_record(&r)); - if let Some(ref mut r) = record { - self.populate_tags(std::slice::from_mut(r)).await?; - } - Ok(record) + Ok(row.map(|r| row_to_task_record(&r))) } /// Atomically requeue a running task back to pending. @@ -797,6 +792,20 @@ impl TaskStore { Ok(()) } + /// Increment retry count in-place without changing task status. + /// + /// Used by inline immediate retries: the task stays `running` and is + /// re-executed directly in the same spawned future, avoiding the + /// requeue-to-pending → pop_next round-trip through SQLite. + pub async fn increment_retry(&self, task_id: i64, error: &str) -> Result<(), StoreError> { + sqlx::query("UPDATE tasks SET retry_count = retry_count + 1, last_error = ? WHERE id = ?") + .bind(error) + .bind(task_id) + .execute(&self.pool) + .await?; + Ok(()) + } + /// Requeue a task for retry without a transaction. /// /// The single UPDATE is atomically safe and avoids `BEGIN IMMEDIATE` + From 9d853fb0a112b9a31c76db15c161ee7eaeb8bbc9 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 18:05:00 -0700 Subject: [PATCH 4/8] perf: lazy tag population and add covering index for history queries Make tag population opt-in for list queries (history, history_by_type, history_by_key, dead_letter_tasks, failed_tasks) to avoid N+1 tag lookups when callers don't need tags. Add idx_history_type covering index on (task_type, completed_at DESC) to speed up history_stats, history_by_type, and avg_throughput queries. Refactor history benchmarks to populate via store directly instead of spinning up a full scheduler. --- benches/history.rs | 97 +++++++++++---------------------- migrations/002_task_history.sql | 6 ++ src/store/lifecycle/tests.rs | 12 ++-- src/store/query/history.rs | 43 ++++++++------- 4 files changed, 67 insertions(+), 91 deletions(-) diff --git a/benches/history.rs b/benches/history.rs index d00044b..13053a1 100644 --- a/benches/history.rs +++ b/benches/history.rs @@ -5,70 +5,33 @@ use std::time::Duration; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use serde::{Deserialize, Serialize}; -use taskmill::{ - Domain, DomainKey, DomainTaskContext, Scheduler, SchedulerEvent, TaskError, TaskStore, - TaskSubmission, TypedExecutor, TypedTask, -}; +use taskmill::{IoBudget, TaskStore, TaskSubmission}; use tokio::runtime::Runtime; -use tokio_util::sync::CancellationToken; - -struct BenchDomain; -impl DomainKey for BenchDomain { - const NAME: &'static str = "bench"; -} - -#[derive(Serialize, Deserialize)] -struct BenchTask; -impl TypedTask for BenchTask { - type Domain = BenchDomain; - const TASK_TYPE: &'static str = "test"; -} - -struct NoopExecutor; - -impl TypedExecutor for NoopExecutor { - async fn execute<'a>( - &'a self, - _payload: BenchTask, - _ctx: DomainTaskContext<'a, BenchDomain>, - ) -> Result<(), TaskError> { - Ok(()) - } -} - -/// Build a scheduler and run `n` noop tasks to completion, populating history. -async fn build_scheduler_with_history(n: usize) -> Scheduler { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().task::(NoopExecutor)) - .max_concurrency(32) - .poll_interval(Duration::from_millis(10)) - .build() - .await - .unwrap(); +/// Task types used to create a realistic multi-type history table. +const TASK_TYPES: &[&str] = &[ + "media::thumbnail", + "media::transcode", + "billing::charge", + "billing::refund", + "email::send", +]; + +/// Populate history directly via store (no scheduler overhead). +/// Distributes tasks across multiple types for realistic selectivity. +async fn store_with_history(n: usize) -> TaskStore { + let store = TaskStore::open_memory().await.unwrap(); + let budget = IoBudget::default(); for i in 0..n { - sched - .submit(&TaskSubmission::new("bench::test").key(format!("h-{i}"))) + let task_type = TASK_TYPES[i % TASK_TYPES.len()]; + store + .submit(&TaskSubmission::new(task_type).key(format!("h-{i}"))) .await .unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + store.complete(task.id, &budget).await.unwrap(); } - - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - tokio::spawn(async move { sched_clone.run(token_clone).await }); - - let mut completed = 0; - while completed < n { - if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { - completed += 1; - } - } - - sched + store } // ── Benchmarks ────────────────────────────────────────────────────── @@ -82,8 +45,7 @@ fn bench_history_query(c: &mut Criterion) { group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { - let sched = rt.block_on(build_scheduler_with_history(history_size)); - let store = sched.store().clone(); + let store = rt.block_on(store_with_history(history_size)); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, @@ -107,6 +69,7 @@ fn bench_history_query(c: &mut Criterion) { } /// Aggregate stats query (`COUNT`, `AVG` duration and IO) at varying history sizes. +/// History contains 5 task types; the query filters to one (20% selectivity). fn bench_history_stats(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("history_stats"); @@ -115,8 +78,7 @@ fn bench_history_stats(c: &mut Criterion) { group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { - let sched = rt.block_on(build_scheduler_with_history(history_size)); - let store = sched.store().clone(); + let store = rt.block_on(store_with_history(history_size)); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, @@ -127,7 +89,7 @@ fn bench_history_stats(c: &mut Criterion) { async move { let start = std::time::Instant::now(); for _ in 0..iters { - let _ = store.history_stats("bench::test").await.unwrap(); + let _ = store.history_stats("media::thumbnail").await.unwrap(); } start.elapsed() } @@ -140,6 +102,7 @@ fn bench_history_stats(c: &mut Criterion) { } /// Filter-by-type history query at varying history sizes. +/// History contains 5 task types; the query filters to one (20% selectivity). fn bench_history_by_type(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("history_by_type"); @@ -148,8 +111,7 @@ fn bench_history_by_type(c: &mut Criterion) { group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { - let sched = rt.block_on(build_scheduler_with_history(history_size)); - let store = sched.store().clone(); + let store = rt.block_on(store_with_history(history_size)); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, @@ -160,7 +122,10 @@ fn bench_history_by_type(c: &mut Criterion) { async move { let start = std::time::Instant::now(); for _ in 0..iters { - let _ = store.history_by_type("bench::test", 100).await.unwrap(); + let _ = store + .history_by_type("media::thumbnail", 100) + .await + .unwrap(); } start.elapsed() } diff --git a/migrations/002_task_history.sql b/migrations/002_task_history.sql index 27eada3..63cd2a9 100644 --- a/migrations/002_task_history.sql +++ b/migrations/002_task_history.sql @@ -32,6 +32,12 @@ CREATE TABLE IF NOT EXISTS task_history ( expires_at INTEGER ); +-- Aggregate stats and filtered queries by task type. +-- Covers history_stats (aggregation), history_by_type (ordered pagination), +-- and avg_throughput (filtered subquery). +CREATE INDEX IF NOT EXISTS idx_history_type + ON task_history (task_type, completed_at DESC); + -- IO learning: recent completions by task type. CREATE INDEX IF NOT EXISTS idx_history_type_completed ON task_history (task_type, completed_at DESC) diff --git a/src/store/lifecycle/tests.rs b/src/store/lifecycle/tests.rs index 348e296..52458e4 100644 --- a/src/store/lifecycle/tests.rs +++ b/src/store/lifecycle/tests.rs @@ -286,8 +286,9 @@ async fn tags_copied_to_history_on_complete() { let task = store.pop_next().await.unwrap().unwrap(); store.complete(task.id, &IoBudget::default()).await.unwrap(); - let hist = store.history_by_key(&sub.effective_key()).await.unwrap(); + let mut hist = store.history_by_key(&sub.effective_key()).await.unwrap(); assert_eq!(hist.len(), 1); + store.populate_history_tags(&mut hist).await.unwrap(); assert_eq!(hist[0].tags.get("env").unwrap(), "staging"); assert_eq!(hist[0].tags.get("owner").unwrap(), "alice"); } @@ -313,8 +314,9 @@ async fn tags_copied_to_history_on_fail() { .await .unwrap(); - let hist = store.failed_tasks(10).await.unwrap(); + let mut hist = store.failed_tasks(10).await.unwrap(); assert_eq!(hist.len(), 1); + store.populate_history_tags(&mut hist).await.unwrap(); assert_eq!(hist[0].tags.get("region").unwrap(), "us-west"); } @@ -328,9 +330,10 @@ async fn tags_copied_to_history_on_cancel() { let id = store.submit(&sub).await.unwrap().id().unwrap(); store.cancel_to_history(id).await.unwrap(); - let hist = store.history_by_key(&sub.effective_key()).await.unwrap(); + let mut hist = store.history_by_key(&sub.effective_key()).await.unwrap(); assert_eq!(hist.len(), 1); assert_eq!(hist[0].status, HistoryStatus::Cancelled); + store.populate_history_tags(&mut hist).await.unwrap(); assert_eq!(hist[0].tags.get("priority_class").unwrap(), "low"); } @@ -352,9 +355,10 @@ async fn tags_copied_to_history_on_expire() { let expired = store.expire_tasks().await.unwrap(); assert!(!expired.is_empty()); - let hist = store.history_by_key(&sub.effective_key()).await.unwrap(); + let mut hist = store.history_by_key(&sub.effective_key()).await.unwrap(); assert_eq!(hist.len(), 1); assert_eq!(hist[0].status, HistoryStatus::Expired); + store.populate_history_tags(&mut hist).await.unwrap(); assert_eq!(hist[0].tags.get("source").unwrap(), "cron"); } diff --git a/src/store/query/history.rs b/src/store/query/history.rs index 5beba59..1733f13 100644 --- a/src/store/query/history.rs +++ b/src/store/query/history.rs @@ -8,6 +8,8 @@ use crate::task::{TaskHistoryRecord, TypeStats}; impl TaskStore { /// Look up a history record by its row id. + /// + /// Tags are populated eagerly (single-record lookup is cheap). pub async fn history_by_id(&self, id: i64) -> Result, StoreError> { let row = sqlx::query("SELECT * FROM task_history WHERE id = ?") .bind(id) @@ -21,6 +23,9 @@ impl TaskStore { } /// Recent history entries, newest first. + /// + /// Tags are **not** populated — call [`populate_history_tags`](Self::populate_history_tags) + /// if needed. pub async fn history( &self, limit: i32, @@ -32,12 +37,12 @@ impl TaskStore { .bind(offset) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// History filtered by task type. + /// + /// Tags are **not** populated. pub async fn history_by_type( &self, task_type: &str, @@ -50,16 +55,12 @@ impl TaskStore { .bind(limit) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Most recent history record for a specific key. /// - /// Used by [`TypedEventStream`](crate::domain::TypedEventStream) to - /// attach the `TaskHistoryRecord` to terminal events without modifying - /// the internal broadcast channel. + /// Tags are populated eagerly (single-record lookup). pub async fn latest_history_by_key( &self, key: &str, @@ -78,18 +79,20 @@ impl TaskStore { } /// History for a specific key (all past runs of that key). + /// + /// Tags are **not** populated. pub async fn history_by_key(&self, key: &str) -> Result, StoreError> { let rows = sqlx::query("SELECT * FROM task_history WHERE key = ? ORDER BY completed_at DESC") .bind(key) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Dead-lettered tasks from history filtered by `task_type` prefix. + /// + /// Tags are **not** populated. pub async fn dead_letter_tasks_by_prefix( &self, prefix: &str, @@ -105,12 +108,12 @@ impl TaskStore { .bind(offset) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Dead-lettered tasks from history (retries exhausted). + /// + /// Tags are **not** populated. pub async fn dead_letter_tasks( &self, limit: i64, @@ -123,12 +126,12 @@ impl TaskStore { .bind(offset) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Failed tasks from history. + /// + /// Tags are **not** populated. pub async fn failed_tasks(&self, limit: i32) -> Result, StoreError> { let rows = sqlx::query( "SELECT * FROM task_history WHERE status = 'failed' ORDER BY completed_at DESC LIMIT ?", @@ -136,9 +139,7 @@ impl TaskStore { .bind(limit) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Aggregate stats for a task type from completed history. From d4854018a75ff87d2643fbbeb349755e92c6efee Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 18:30:00 -0700 Subject: [PATCH 5/8] perf: auto-coalesce sequential submits and skip requeue on fresh stores Two submit-path optimizations from plan 043: 1. Submit coalescing (Option 1): TaskStore::submit() now uses a leader- election pattern (mirroring completion/failure coalescing). Concurrent callers are batched into a single BEGIN/COMMIT transaction. An uncontended fast path avoids channel overhead entirely so sequential callers see zero regression. 2. Skip requeue UPDATE (Option 2): Added `has_running` atomic flag to TaskStore. When no task has ever been dispatched (common during bulk submit-then-run), the requeue UPDATE in skip_existing() is elided, saving one SQL round-trip per dedup hit. Benchmark impact vs baseline: - submit_dedup_hit/1000: -8% (skip-requeue) - batch_submit/1000: -14% (skip-requeue) - submit_tasks/1000: neutral (fast path) --- src/scheduler/run_loop.rs | 28 ++++++- src/store/lifecycle/transitions.rs | 18 ++++- src/store/mod.rs | 44 +++++++++-- src/store/submit/dedup.rs | 10 +++ src/store/submit/mod.rs | 116 ++++++++++++++++++++++++++++- 5 files changed, 202 insertions(+), 14 deletions(-) diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index 8347d7d..4c10c37 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -381,6 +381,28 @@ impl Scheduler { .await; } + /// Drain the submit coalescing channel and process any stranded messages. + /// + /// Most submits are handled inline by the submitter (leader election in + /// `TaskStore::submit`). This is a safety-net drain for messages that + /// arrived after the last leader finished processing. + async fn drain_submits(&self) { + let mut batch = Vec::new(); + { + let mut rx = self.inner.store.submit_rx.lock().await; + while let Ok(msg) = rx.try_recv() { + batch.push(msg); + } + } + + if batch.is_empty() { + return; + } + + tracing::debug!(count = batch.len(), "draining submit batch"); + self.inner.store.process_submit_batch(batch).await; + } + /// Drain the failure channel and process all queued terminal failures /// in a single batched transaction. async fn drain_failures(&self) { @@ -406,7 +428,8 @@ impl Scheduler { return; } - // Drain queued completions and failures before dispatching new work. + // Drain queued submits, completions, and failures before dispatching. + self.drain_submits().await; self.drain_completions().await; self.drain_failures().await; @@ -489,7 +512,8 @@ impl Scheduler { } } - // Drain any remaining completions and failures before closing the store. + // Drain any remaining submits, completions, and failures before closing. + self.drain_submits().await; self.drain_completions().await; self.drain_failures().await; diff --git a/src/store/lifecycle/transitions.rs b/src/store/lifecycle/transitions.rs index 1964cde..2940a98 100644 --- a/src/store/lifecycle/transitions.rs +++ b/src/store/lifecycle/transitions.rs @@ -103,7 +103,12 @@ impl TaskStore { .bind(id) .execute(&self.pool) .await?; - Ok(result.rows_affected() > 0) + let claimed = result.rows_affected() > 0; + if claimed { + self.has_running + .store(true, std::sync::atomic::Ordering::Relaxed); + } + Ok(claimed) } /// Atomically claim up to `limit` highest-priority pending tasks and mark @@ -147,6 +152,10 @@ impl TaskStore { .await?; let records: Vec = rows.iter().map(row_to_task_record).collect(); + if !records.is_empty() { + self.has_running + .store(true, std::sync::atomic::Ordering::Relaxed); + } Ok(records) } @@ -188,7 +197,12 @@ impl TaskStore { .fetch_optional(&self.pool) .await?; - Ok(row.map(|r| row_to_task_record(&r))) + let record = row.map(|r| row_to_task_record(&r)); + if record.is_some() { + self.has_running + .store(true, std::sync::atomic::Ordering::Relaxed); + } + Ok(record) } /// Atomically requeue a running task back to pending. diff --git a/src/store/mod.rs b/src/store/mod.rs index 59a2590..5ce55d9 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -32,12 +32,24 @@ mod submit; pub use lifecycle::FailBackoff; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; use serde::{Deserialize, Serialize}; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous}; use sqlx::SqlitePool; +use tokio::sync::Mutex; -use crate::task::MAX_PAYLOAD_BYTES; +use crate::task::{SubmitOutcome, TaskSubmission, MAX_PAYLOAD_BYTES}; + +/// Message sent through the submit coalescing channel. +/// +/// Each caller packs its [`TaskSubmission`] and a oneshot response channel. +/// The leader drains the channel and processes the batch in a single +/// SQLite transaction, then sends individual results back. +pub(crate) struct SubmitMsg { + pub submission: TaskSubmission, + pub response_tx: tokio::sync::oneshot::Sender>, +} /// Serde-friendly error type for Tauri IPC and API boundaries. /// @@ -171,6 +183,14 @@ pub struct TaskStore { /// Fast-path flag: `false` means no tasks with `parent_id` have been /// submitted, so `active_children_count` checks can be skipped. pub(crate) has_hierarchy: std::sync::Arc, + /// Fast-path flag: `false` means no task has ever transitioned to + /// `running`, so the requeue UPDATE in `skip_existing` can be skipped + /// (there are no running tasks to mark for requeue). + pub(crate) has_running: Arc, + /// Send side of the submit coalescing channel. + pub(crate) submit_tx: tokio::sync::mpsc::UnboundedSender, + /// Receive side, `Arc`-wrapped for leader election (try_lock pattern). + pub(crate) submit_rx: Arc>>, } impl TaskStore { @@ -193,14 +213,20 @@ impl TaskStore { .connect_with(opts) .await?; + let (submit_tx, submit_rx) = tokio::sync::mpsc::unbounded_channel(); let store = Self { pool, retention_policy: config.retention_policy, prune_interval: config.prune_interval, - completion_count: std::sync::Arc::new(AtomicU64::new(0)), + completion_count: Arc::new(AtomicU64::new(0)), // Conservative for file-backed stores that may have existing tags/hierarchy. - has_tags: std::sync::Arc::new(AtomicBool::new(true)), - has_hierarchy: std::sync::Arc::new(AtomicBool::new(true)), + has_tags: Arc::new(AtomicBool::new(true)), + has_hierarchy: Arc::new(AtomicBool::new(true)), + // Conservative: file-backed stores may have running tasks from a + // previous session (before recover_running resets them). + has_running: Arc::new(AtomicBool::new(true)), + submit_tx, + submit_rx: Arc::new(Mutex::new(submit_rx)), }; store.migrate().await?; store.recover_running().await?; @@ -220,14 +246,18 @@ impl TaskStore { .connect_with(opts) .await?; + let (submit_tx, submit_rx) = tokio::sync::mpsc::unbounded_channel(); let store = Self { pool, retention_policy: Some(RetentionPolicy::MaxCount(10_000)), prune_interval: 100, - completion_count: std::sync::Arc::new(AtomicU64::new(0)), + completion_count: Arc::new(AtomicU64::new(0)), // In-memory stores start empty — no tags or hierarchy to query. - has_tags: std::sync::Arc::new(AtomicBool::new(false)), - has_hierarchy: std::sync::Arc::new(AtomicBool::new(false)), + has_tags: Arc::new(AtomicBool::new(false)), + has_hierarchy: Arc::new(AtomicBool::new(false)), + has_running: Arc::new(AtomicBool::new(false)), + submit_tx, + submit_rx: Arc::new(Mutex::new(submit_rx)), }; store.migrate().await?; Ok(store) diff --git a/src/store/submit/dedup.rs b/src/store/submit/dedup.rs index 66baa06..a7eb40a 100644 --- a/src/store/submit/dedup.rs +++ b/src/store/submit/dedup.rs @@ -7,10 +7,14 @@ use crate::store::StoreError; use crate::task::{SubmitOutcome, TaskSubmission, TtlFrom}; /// Default dedup behaviour: try priority upgrade, then requeue, then no-op. +/// +/// When `skip_requeue` is `true` the running/paused requeue UPDATE is elided +/// because no task has ever transitioned to `running` in this store instance. pub(super) async fn skip_existing( conn: &mut sqlx::pool::PoolConnection, key: &str, priority: i32, + skip_requeue: bool, ) -> Result { // Try to upgrade priority on pending/paused tasks. let row = sqlx::query( @@ -28,6 +32,12 @@ pub(super) async fn skip_existing( return Ok(SubmitOutcome::Upgraded(r.get("id"))); } + // Fast path: no tasks have ever been dispatched, so there can be no + // running/paused tasks to mark for requeue. + if skip_requeue { + return Ok(SubmitOutcome::Duplicate); + } + // Dedup hit on running/paused task — mark for re-queue. let row = sqlx::query( "UPDATE tasks SET requeue = 1, requeue_priority = ? diff --git a/src/store/submit/mod.rs b/src/store/submit/mod.rs index d0ac2f2..15089c6 100644 --- a/src/store/submit/mod.rs +++ b/src/store/submit/mod.rs @@ -56,6 +56,7 @@ pub(crate) async fn submit_one( sub: &TaskSubmission, has_tags_flag: Option<&std::sync::atomic::AtomicBool>, has_hierarchy_flag: Option<&std::sync::atomic::AtomicBool>, + skip_requeue: bool, ) -> Result { if let Some(ref err) = sub.payload_error { return Err(StoreError::Serialization(err.clone())); @@ -173,7 +174,7 @@ pub(crate) async fn submit_one( match sub.on_duplicate { DuplicateStrategy::Reject => Ok(SubmitOutcome::Rejected), DuplicateStrategy::Supersede => dedup::supersede_existing(conn, sub, &key).await, - DuplicateStrategy::Skip => dedup::skip_existing(conn, &key, priority).await, + DuplicateStrategy::Skip => dedup::skip_existing(conn, &key, priority, skip_requeue).await, } } @@ -187,7 +188,15 @@ impl TaskStore { /// /// When `sub.key` is `None`, the dedup key is auto-generated by hashing /// the task type and payload. + /// + /// **Coalescing:** Concurrent callers are automatically batched into a + /// single SQLite transaction (leader election pattern), giving batch-level + /// throughput without API changes. pub async fn submit(&self, sub: &TaskSubmission) -> Result { + // Pre-validate before touching the channel or the database. + if let Some(ref err) = sub.payload_error { + return Err(StoreError::Serialization(err.clone())); + } if let Some(ref p) = sub.payload { if p.len() > MAX_PAYLOAD_BYTES { return Err(StoreError::PayloadTooLarge); @@ -195,20 +204,118 @@ impl TaskStore { } validate_tags(&sub.tags)?; + // Try to become the leader. If we win, drain any stranded channel + // messages and process everything in a single transaction. + if let Ok(mut rx_guard) = self.submit_rx.try_lock() { + let mut stranded: Vec = Vec::new(); + while let Ok(m) = rx_guard.try_recv() { + stranded.push(m); + } + drop(rx_guard); + + if stranded.is_empty() { + // Uncontended fast path — submit directly without channel + // overhead (no clone, no oneshot). + return self.submit_direct(sub).await; + } + + // Stranded messages exist — batch them together with ours. + let (tx, rx) = tokio::sync::oneshot::channel(); + stranded.push(super::SubmitMsg { + submission: sub.clone(), + response_tx: tx, + }); + self.process_submit_batch(stranded).await; + return rx + .await + .map_err(|_| StoreError::Database("submit response dropped".into()))?; + } + + // Another leader is draining — coalesce via channel. + let (tx, rx) = tokio::sync::oneshot::channel(); + let msg = super::SubmitMsg { + submission: sub.clone(), + response_tx: tx, + }; + self.submit_tx + .send(msg) + .map_err(|_| StoreError::Database("submit channel closed".into()))?; + + rx.await + .map_err(|_| StoreError::Database("submit response dropped".into()))? + } + + /// Direct submit without coalescing (old code path). + /// + /// Used by the fast path when no contention is detected. + async fn submit_direct(&self, sub: &TaskSubmission) -> Result { let mut conn = self.begin_write().await?; - tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT start"); + let skip_requeue = !self.has_running.load(std::sync::atomic::Ordering::Relaxed); let outcome = submit_one( &mut conn, sub, Some(&self.has_tags), Some(&self.has_hierarchy), + skip_requeue, ) .await?; - tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT end"); sqlx::query("COMMIT").execute(&mut *conn).await?; Ok(outcome) } + /// Process a batch of coalesced submit messages in a single transaction. + /// + /// Each message gets its own `submit_one` call, but they all share one + /// `BEGIN IMMEDIATE` / `COMMIT` pair. Results are sent back via oneshot. + pub(crate) async fn process_submit_batch(&self, batch: Vec) { + if batch.is_empty() { + return; + } + + let skip_requeue = !self.has_running.load(std::sync::atomic::Ordering::Relaxed); + + // Try to open a single shared transaction. + let conn = self.begin_write().await; + let mut conn = match conn { + Ok(c) => c, + Err(e) => { + let msg = e.to_string(); + for m in batch { + let _ = m.response_tx.send(Err(StoreError::Database(msg.clone()))); + } + return; + } + }; + + let mut results: Vec> = Vec::with_capacity(batch.len()); + for m in &batch { + results.push( + submit_one( + &mut conn, + &m.submission, + Some(&self.has_tags), + Some(&self.has_hierarchy), + skip_requeue, + ) + .await, + ); + } + + match sqlx::query("COMMIT").execute(&mut *conn).await { + Ok(_) => { + for (m, result) in batch.into_iter().zip(results) { + let _ = m.response_tx.send(result); + } + } + Err(e) => { + let msg = e.to_string(); + for m in batch { + let _ = m.response_tx.send(Err(StoreError::Database(msg.clone()))); + } + } + } + } + /// Submit multiple tasks in a single transaction. Returns a `Vec` with one /// [`SubmitOutcome`] per input. /// @@ -248,6 +355,8 @@ impl TaskStore { let mut results = Vec::with_capacity(submissions.len()); + let skip_requeue = !self.has_running.load(std::sync::atomic::Ordering::Relaxed); + for chunk in submissions.chunks(BATCH_CHUNK_SIZE) { let chunk_offset = results.len(); let mut conn = self.begin_write().await?; @@ -263,6 +372,7 @@ impl TaskStore { sub, Some(&self.has_tags), Some(&self.has_hierarchy), + skip_requeue, ) .await?, ); From 825f091e1b4211ddf009e3c9524e83f36e177814 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 18:53:00 -0700 Subject: [PATCH 6/8] perf: widen completion coalescing window and skip no-subscriber broadcasts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add yield_now() before leader-election drain so more completions accumulate per batch. Gate all event_tx.send() calls behind receiver_count() > 0 to avoid broadcast channel overhead when no subscribers exist. dispatch_and_complete/1000: 169µs → 149µs/task (−17%, +21% throughput) --- src/scheduler/control.rs | 6 +++--- src/scheduler/dispatch.rs | 4 ++-- src/scheduler/mod.rs | 14 ++++++++++++++ src/scheduler/progress.rs | 4 ++-- src/scheduler/run_loop.rs | 6 +++--- src/scheduler/spawn.rs | 8 +++----- src/scheduler/spawn/completion.rs | 14 +++++++++----- src/scheduler/spawn/failure.rs | 18 +++++++----------- src/scheduler/spawn/parent.rs | 4 ++-- src/scheduler/submit.rs | 28 ++++++++-------------------- 10 files changed, 53 insertions(+), 53 deletions(-) diff --git a/src/scheduler/control.rs b/src/scheduler/control.rs index ad425df..5f9d884 100644 --- a/src/scheduler/control.rs +++ b/src/scheduler/control.rs @@ -2,7 +2,7 @@ use std::sync::atomic::Ordering as AtomicOrdering; -use super::{Scheduler, SchedulerEvent}; +use super::{emit_event, Scheduler, SchedulerEvent}; impl Scheduler { /// Update max concurrency at runtime (e.g., from adaptive controller or @@ -35,7 +35,7 @@ impl Scheduler { .active .pause_all(&self.inner.store, &self.inner.event_tx) .await; - let _ = self.inner.event_tx.send(SchedulerEvent::Paused); + emit_event(&self.inner.event_tx, SchedulerEvent::Paused); tracing::info!(paused_tasks = count, "scheduler paused"); } @@ -47,7 +47,7 @@ impl Scheduler { pub async fn resume_all(&self) { self.inner.paused.store(false, AtomicOrdering::Release); self.inner.work_notify.notify_one(); - let _ = self.inner.event_tx.send(SchedulerEvent::Resumed); + emit_event(&self.inner.event_tx, SchedulerEvent::Resumed); tracing::info!("scheduler resumed"); } diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 5f0dcdb..8b0ad50 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -10,7 +10,7 @@ use crate::registry::IoTracker; use crate::store::TaskStore; use crate::task::TaskRecord; -use super::SchedulerEvent; +use super::{emit_event, SchedulerEvent}; // ── Active Task ──────────────────────────────────────────────────── @@ -268,6 +268,6 @@ async fn cancel_pause_emit( for (id, at) in drained { at.token.cancel(); let _ = store.pause(*id).await; - let _ = event_tx.send(SchedulerEvent::Preempted(at.record.event_header())); + emit_event(event_tx, SchedulerEvent::Preempted(at.record.event_header())); } } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 969610f..52b7703 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -81,6 +81,20 @@ pub use event::{ pub use gate::GroupLimits; pub use progress::{EstimatedProgress, ProgressReporter, TaskProgress}; +/// Emit a scheduler event only when at least one subscriber is listening. +/// +/// Avoids the broadcast channel's internal locking and allocation overhead +/// when no subscribers exist (common in benchmarks and headless operation). +#[inline] +pub(crate) fn emit_event( + tx: &tokio::sync::broadcast::Sender, + event: SchedulerEvent, +) { + if tx.receiver_count() > 0 { + let _ = tx.send(event); + } +} + // ── Scheduler ─────────────────────────────────────────────────────── /// Shared inner state behind `Arc` so `Scheduler` can be `Clone`. diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index 57fc282..08e889f 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -38,7 +38,7 @@ use crate::task::TaskRecord; use super::dispatch::ActiveTaskMap; use super::event::TaskEventHeader; -use super::SchedulerEvent; +use super::{emit_event, SchedulerEvent}; // ── Progress Reporter ────────────────────────────────────────────── @@ -94,7 +94,7 @@ impl ProgressReporter { // Update internal progress tracking directly (sync, no broadcast roundtrip). self.active.update_progress(self.header.task_id, clamped); // Broadcast for external subscribers (UI / Tauri). - let _ = self.event_tx.send(SchedulerEvent::Progress { + emit_event(&self.event_tx, SchedulerEvent::Progress { header: self.header.clone(), percent: clamped, message, diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index 4c10c37..7734a9f 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken; use crate::store::StoreError; use crate::task::IoBudget; -use super::SchedulerEvent; +use super::{emit_event, SchedulerEvent}; use super::gate::GateContext; use super::spawn::{self, SpawnContext}; @@ -80,7 +80,7 @@ impl Scheduler { let age = (chrono::Utc::now() - task.created_at) .to_std() .unwrap_or_default(); - let _ = self.inner.event_tx.send(SchedulerEvent::TaskExpired { + emit_event(&self.inner.event_tx, SchedulerEvent::TaskExpired { header: task.event_header(), age, }); @@ -339,7 +339,7 @@ impl Scheduler { let age = (chrono::Utc::now() - task.created_at) .to_std() .unwrap_or_default(); - let _ = self.inner.event_tx.send(SchedulerEvent::TaskExpired { + emit_event(&self.inner.event_tx, SchedulerEvent::TaskExpired { header: task.event_header(), age, }); diff --git a/src/scheduler/spawn.rs b/src/scheduler/spawn.rs index 201ccf1..ce10eae 100644 --- a/src/scheduler/spawn.rs +++ b/src/scheduler/spawn.rs @@ -20,7 +20,7 @@ use crate::registry::ErasedExecutor; use crate::task::TaskRecord; use super::dispatch::ActiveTask; -use super::SchedulerEvent; +use super::{emit_event, SchedulerEvent}; pub(in crate::scheduler) use completion::process_completion_batch; pub(crate) use context::SpawnContext; @@ -72,9 +72,7 @@ pub(crate) async fn spawn_task( } // Emit dispatched event. - let _ = ctx - .event_tx - .send(SchedulerEvent::Dispatched(task.event_header())); + emit_event(&ctx.event_tx, SchedulerEvent::Dispatched(task.event_header())); // Build deps for handlers (cloned from SpawnContext since they move into the spawned future). let completion_deps = completion::CompletionDeps { @@ -190,7 +188,7 @@ pub(crate) async fn spawn_task( task.retry_count += 1; // Emit retry event. - let _ = failure_deps.event_tx.send(SchedulerEvent::Failed { + emit_event(&failure_deps.event_tx, SchedulerEvent::Failed { header: task.event_header(), error: te.message, will_retry: true, diff --git a/src/scheduler/spawn/completion.rs b/src/scheduler/spawn/completion.rs index 5925b39..6cea91f 100644 --- a/src/scheduler/spawn/completion.rs +++ b/src/scheduler/spawn/completion.rs @@ -6,7 +6,7 @@ use crate::store::TaskStore; use crate::task::{IoBudget, TaskRecord}; use super::super::dispatch::ActiveTaskMap; -use super::super::{CompletionMsg, SchedulerEvent}; +use super::super::{emit_event, CompletionMsg, SchedulerEvent}; use super::parent::handle_parent_resolution; use super::ExecutionPhase; @@ -52,7 +52,7 @@ pub(crate) async fn handle_success( } decrement_module(); deps.active.remove(task_id); - let _ = deps.event_tx.send(SchedulerEvent::Waiting { + emit_event(&deps.event_tx, SchedulerEvent::Waiting { task_id, children_count: count, }); @@ -104,6 +104,10 @@ pub(crate) async fn handle_success( return; } + // Yield before draining to let more completions accumulate in the + // channel, increasing batch size under high throughput. + tokio::task::yield_now().await; + // Leader election: try to grab the rx lock and drain the batch. // Under high concurrency, only one task wins — it processes the batch // for everyone. Others just wake the scheduler and return. @@ -182,16 +186,16 @@ pub(in crate::scheduler) fn emit_completion_events( Some((next, count)) => (Some(next), count), None => (None, task.recurring_execution_count + 1), }; - let _ = event_tx.send(SchedulerEvent::RecurringCompleted { + emit_event(event_tx, SchedulerEvent::RecurringCompleted { header: task.event_header(), execution_count: exec_count, next_run, }); } - let _ = event_tx.send(SchedulerEvent::Completed(task.event_header())); + emit_event(event_tx, SchedulerEvent::Completed(task.event_header())); for uid in unblocked { - let _ = event_tx.send(SchedulerEvent::TaskUnblocked { task_id: *uid }); + emit_event(event_tx, SchedulerEvent::TaskUnblocked { task_id: *uid }); } } diff --git a/src/scheduler/spawn/failure.rs b/src/scheduler/spawn/failure.rs index 14f4424..acd4956 100644 --- a/src/scheduler/spawn/failure.rs +++ b/src/scheduler/spawn/failure.rs @@ -6,7 +6,7 @@ use crate::store::TaskStore; use crate::task::{IoBudget, TaskError, TaskRecord}; use super::super::dispatch::ActiveTaskMap; -use super::super::{FailureMsg, SchedulerEvent}; +use super::super::{emit_event, FailureMsg, SchedulerEvent}; use super::parent::handle_parent_resolution; /// Shared dependencies for the failure handler. @@ -156,13 +156,13 @@ pub(crate) async fn handle_failure( let dead_lettered = error.retryable && !will_retry; if dead_lettered { - let _ = deps.event_tx.send(SchedulerEvent::DeadLettered { + emit_event(&deps.event_tx, SchedulerEvent::DeadLettered { header: task.event_header(), error: error.message.clone(), retry_count: task.retry_count + 1, }); } else { - let _ = deps.event_tx.send(SchedulerEvent::Failed { + emit_event(&deps.event_tx, SchedulerEvent::Failed { header: task.event_header(), error: error.message.clone(), will_retry, @@ -204,15 +204,13 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD match deps.store.fail_dependents(task_id).await { Ok((failed_ids, unblocked_ids)) => { for fid in &failed_ids { - let _ = deps.event_tx.send(SchedulerEvent::DependencyFailed { + emit_event(&deps.event_tx, SchedulerEvent::DependencyFailed { task_id: *fid, failed_dependency: task_id, }); } for uid in &unblocked_ids { - let _ = deps - .event_tx - .send(SchedulerEvent::TaskUnblocked { task_id: *uid }); + emit_event(&deps.event_tx, SchedulerEvent::TaskUnblocked { task_id: *uid }); } if !unblocked_ids.is_empty() { deps.work_notify.notify_one(); @@ -233,9 +231,7 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD if let Some(at) = deps.active.remove(*rid) { at.token.cancel(); let _ = deps.store.delete(*rid).await; - let _ = deps - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event(&deps.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); } } } @@ -259,7 +255,7 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD "failed to record parent failure" ); } - let _ = deps.event_tx.send(SchedulerEvent::Failed { + emit_event(&deps.event_tx, SchedulerEvent::Failed { header: parent.event_header(), error: msg, will_retry: false, diff --git a/src/scheduler/spawn/parent.rs b/src/scheduler/spawn/parent.rs index ee3e941..71406b5 100644 --- a/src/scheduler/spawn/parent.rs +++ b/src/scheduler/spawn/parent.rs @@ -6,7 +6,7 @@ use crate::store::TaskStore; use crate::task::{IoBudget, ParentResolution}; use super::super::dispatch::ActiveTaskMap; -use super::super::SchedulerEvent; +use super::super::{emit_event, SchedulerEvent}; /// Check if a waiting parent is ready for finalization or has failed, /// and dispatch the finalize phase if ready. @@ -41,7 +41,7 @@ pub(crate) async fn handle_parent_resolution( { tracing::error!(parent_id, error = %e, "failed to record parent failure"); } - let _ = event_tx.send(SchedulerEvent::Failed { + emit_event(event_tx, SchedulerEvent::Failed { header: parent.event_header(), error: reason, will_retry: false, diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index 961d32d..a7f6cee 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -13,7 +13,7 @@ use crate::task::{ }; use super::progress::ProgressReporter; -use super::{Scheduler, SchedulerEvent}; +use super::{emit_event, Scheduler, SchedulerEvent}; impl Scheduler { /// Resolve the effective TTL for a submission. @@ -64,7 +64,7 @@ impl Scheduler { label: sub.label.clone(), tags: sub.tags.clone(), }; - let _ = self.inner.event_tx.send(SchedulerEvent::Superseded { + emit_event(&self.inner.event_tx, SchedulerEvent::Superseded { old: old_header, new_task_id: *new_task_id, }); @@ -129,7 +129,7 @@ impl Scheduler { label: sub.label.clone(), tags: sub.tags.clone(), }; - let _ = self.inner.event_tx.send(SchedulerEvent::Superseded { + emit_event(&self.inner.event_tx, SchedulerEvent::Superseded { old: old_header, new_task_id: *new_task_id, }); @@ -170,7 +170,7 @@ impl Scheduler { if any_changed { let inserted_ids = outcome.inserted(); - let _ = self.inner.event_tx.send(SchedulerEvent::BatchSubmitted { + emit_event(&self.inner.event_tx, SchedulerEvent::BatchSubmitted { count: resolved.len(), inserted_ids, }); @@ -266,10 +266,7 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - let _ = self - .inner - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event(&self.inner.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); } } @@ -281,10 +278,7 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - let _ = self - .inner - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event(&self.inner.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); return Ok(true); } @@ -394,10 +388,7 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - let _ = self - .inner - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event(&self.inner.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); return Ok(true); } self.inner.store.cancel_recurring(task_id).await @@ -431,10 +422,7 @@ impl Scheduler { if let Some(at) = self.inner.active.remove(replaced_task_id) { at.token.cancel(); self.fire_on_cancel(&at.record).await; - let _ = self - .inner - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event(&self.inner.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); } } From ea7b900cea04fe3782b79ae655f35655b1beea27 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 19:04:00 -0700 Subject: [PATCH 7/8] docs: update docs for 0.6 API and unreleased perf changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename tasks_by_tags → task_ids_by_tags, tasks_by_tag_key_prefix → task_ids_by_tag_key_prefix (now return Vec) in query-apis and multi-module-apps docs - Add inline zero-delay retry path to design.md retry flow - Document new idx_history_type covering index and missing dead_letter history status in persistence-and-recovery.md - Replace &TaskContext with DomainTaskContext<'_, D> in all code examples across 10 doc files and lib.rs rustdoc - Update spawn_child → spawn_child_with, .parent() → .child_of(&ctx) in quick-start.md - Bump stale version strings (0.3/0.4/0.5 → 0.6) in Cargo.toml snippets - Mark raw_executor as removed in configuration.md Domain builder table --- docs/configuration.md | 4 ++-- docs/design.md | 10 +++++++--- docs/glossary.md | 4 ++-- docs/guides/background-service.md | 6 +++--- docs/guides/tauri-upload-queue.md | 6 +++--- docs/io-and-backpressure.md | 2 +- docs/library-modules.md | 10 +++++----- docs/multi-module-apps.md | 8 ++++---- docs/persistence-and-recovery.md | 6 ++++-- docs/priorities-and-preemption.md | 2 +- docs/progress-and-events.md | 2 +- docs/query-apis.md | 4 ++-- docs/quick-start.md | 33 +++++++++++++++---------------- src/lib.rs | 20 +++++++++---------- 14 files changed, 61 insertions(+), 56 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index b2015c8..f16ad67 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -322,7 +322,7 @@ scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await; ```toml # Disable platform monitoring -taskmill = { version = "0.4", default-features = false } +taskmill = { version = "0.6", default-features = false } ``` When disabled, you can still provide a custom `ResourceSampler` via `.resource_sampler()`. @@ -446,7 +446,7 @@ Scheduler::builder() | `Domain::::new()` | Create a domain. The domain name is taken from `D::NAME`. Task types are prefixed `"{name}::"`. | | `task::(executor)` | Register a `TypedExecutor` using `T::TASK_TYPE` as the name. TTL and retry policy from `T::config()` are used automatically. | | `task_with::(executor, options)` | Register with per-type option overrides (for executor-instance-dependent config). | -| `raw_executor(name, executor)` | Escape hatch: register an untyped `TaskExecutor` by type name. | +| ~~`raw_executor(name, executor)`~~ | **Removed in 0.6.** Use `task::(executor)` with a `TypedExecutor`. | | `default_priority(p)` | Domain-wide priority for all submissions. | | `default_retry(policy)` | Domain-wide retry policy. | | `default_group(group)` | Domain-wide group key. | diff --git a/docs/design.md b/docs/design.md index 9218811..c6249ff 100644 --- a/docs/design.md +++ b/docs/design.md @@ -64,8 +64,8 @@ flowchart TD S["submit() / submit_batch()"] --> TS["TaskStore\n(INSERT OR IGNORE)"] TS --> |SQLite| DB[(tasks table)] DB --> SCH["Scheduler run loop"] - SCH --> |"tokio::spawn"| E1["Executor + TaskContext"] - SCH --> |"tokio::spawn"| E2["Executor + TaskContext"] + SCH --> |"tokio::spawn"| E1["Executor + DomainTaskContext"] + SCH --> |"tokio::spawn"| E2["Executor + DomainTaskContext"] E1 --> CF["complete() / fail()"] E2 --> CF CF --> HIST[(task_history)] @@ -193,8 +193,12 @@ Each cycle, the loop: Executor returns Err(TaskError) └─ retryable: false? ──► move to task_history (failed) └─ retryable: true? - └─ retry_count < max_retries? ──► status → pending, retry_count += 1 + └─ retry_count < max_retries? + └─ delay == 0? ──► inline retry (stays running, retry_count += 1, re-execute) + └─ delay > 0? ──► status → pending, retry_count += 1, requeued with backoff └─ otherwise ──► move to task_history (failed) ``` Retried tasks keep their original priority and dedup key. `max_retries` defaults to 3. + +**Inline zero-delay retries:** When the retry delay is zero, the executor re-runs immediately within the same spawned task — avoiding the overhead of returning to the SQLite queue and being re-dispatched. The `retry_count` is persisted to the database, but the task stays in `running` status throughout. diff --git a/docs/glossary.md b/docs/glossary.md index 64dff50..56f797b 100644 --- a/docs/glossary.md +++ b/docs/glossary.md @@ -16,7 +16,7 @@ Quick reference for terms used throughout the taskmill documentation. | **Deduplication (dedup)** | Preventing the same task from being queued twice. Taskmill generates a SHA-256 key from the task type and payload; a second submission with the same key is silently ignored. See [Persistence & Recovery](persistence-and-recovery.md#deduplication). | | **Dispatch** | Moving a task from "waiting in line" (pending) to "actively running." The scheduler dispatches tasks in priority order, subject to concurrency limits and backpressure. | | **EWMA** | Exponentially Weighted Moving Average — a smoothing technique that gives recent measurements more weight than old ones. Taskmill uses EWMA to smooth resource readings so the scheduler doesn't overreact to momentary spikes. See [IO & Backpressure](io-and-backpressure.md#ewma-smoothing). | -| **TypedExecutor** | Your code that performs the actual work for a task type. Implements `TypedExecutor` and receives a deserialized payload: `async fn execute(&self, payload: T, ctx: &TaskContext)`. Register with `Domain::task::(executor)`. See [Quick Start](quick-start.md#implement-an-executor). | +| **TypedExecutor** | Your code that performs the actual work for a task type. Implements `TypedExecutor` and receives a deserialized payload: `async fn execute(&self, payload: T, ctx: DomainTaskContext<'_, T::Domain>)`. Register with `Domain::task::(executor)`. See [Quick Start](quick-start.md#implement-an-executor). | | **IO budget** | An estimate of how many bytes a task will read and write (disk and/or network), submitted alongside the task. The scheduler uses IO budgets to avoid overwhelming the disk. See [IO & Backpressure](io-and-backpressure.md#io-budgets-telling-the-scheduler-what-to-expect). | | **Pile-up prevention** | The mechanism that skips a recurring task instance when the previous instance hasn't been dispatched yet, preventing unbounded queue growth under sustained load. See [Quick Start](quick-start.md#recurring-tasks). | | **Preemption** | Pausing lower-priority work so higher-priority work can run immediately. Preempted tasks resume automatically once the urgent work finishes. See [Priorities & Preemption](priorities-and-preemption.md#preemption). | @@ -31,7 +31,7 @@ Quick reference for terms used throughout the taskmill documentation. | **Typed task** | A struct that implements the `TypedTask` trait, giving you compile-time type safety for task payloads and configuration. Declares `type Domain: DomainKey` for compile-time domain identity and `fn config() -> TaskTypeConfig` for static defaults (priority, IO budget, TTL, retry policy, etc.). Register with `Domain::task::(executor)` and submit with `handle.submit(task)`. See [Quick Start](quick-start.md#typed-tasks). | | **TaskTypeConfig** | A struct returned by `TypedTask::config()` that holds static per-task-type defaults — priority, IO budget, group key, TTL, retry policy, duplicate strategy, etc. All fields are `Option`; `None` means "use the next layer's default" (domain default, then scheduler global, then built-in). | | **DomainSubmitBuilder** | The fluent builder returned by `DomainHandle::submit_with(task)`. Implements `IntoFuture` so bare `.await` applies all defaults; chain override methods (`.priority()`, `.run_after()`, `.parent()`, `.fail_fast()`, etc.) before `.await` to override individual fields for that call only. | -| **Domain-scoped state** | Application state registered on a `Domain` via `.state(value)`, visible only to executors within that domain. `TaskContext::state::()` checks domain state first and falls back to global state registered on `SchedulerBuilder`. See [Configuration](configuration.md#application-state). | +| **Domain-scoped state** | Application state registered on a `Domain` via `.state(value)`, visible only to executors within that domain. `DomainTaskContext::state::()` checks domain state first and falls back to global state registered on `SchedulerBuilder`. See [Configuration](configuration.md#application-state). | | **TypedEventStream** | A per-task-type event subscription (`TypedEventStream`) created via `handle.task_events::()`. Filters the global scheduler event broadcast to only events matching `T::TASK_TYPE` within the domain. Terminal events include the `TaskHistoryRecord`. | | **Qualified task type** | The full database-stored task type including the domain prefix, e.g. `"media::thumbnail"`. Required when using store-level query APIs (`history_stats`, `task_lookup`, `avg_throughput`). `DomainHandle` methods apply the prefix automatically, so you typically only need the short form when submitting tasks. | | **Cross-domain dependency** | A dependency edge where the dependent task and its prerequisite belong to different domains. Functionally identical to same-domain dependencies — the domain boundary does not affect dependency resolution or failure propagation. See [Multi-Module Applications](multi-module-apps.md#cross-module-task-dependencies). | diff --git a/docs/guides/background-service.md b/docs/guides/background-service.md index cbad013..1bc6db3 100644 --- a/docs/guides/background-service.md +++ b/docs/guides/background-service.md @@ -47,12 +47,12 @@ impl TypedTask for ProcessImageTask { ## Implement the executor ```rust -use taskmill::{TypedExecutor, TaskContext, TaskError}; +use taskmill::{TypedExecutor, DomainTaskContext, TaskError}; struct ImageProcessor; impl TypedExecutor for ImageProcessor { - async fn execute(&self, task: ProcessImageTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: ProcessImageTask, ctx: DomainTaskContext<'_, Images>) -> Result<(), TaskError> { // Read the source image let data = tokio::fs::read(&task.path).await .map_err(|e| TaskError::permanent(format!("can't read: {e}")))?; @@ -187,7 +187,7 @@ Scheduler::builder() Disable the default sampler in `Cargo.toml`: ```toml -taskmill = { version = "0.4", default-features = false } +taskmill = { version = "0.6", default-features = false } ``` ## Key differences from desktop diff --git a/docs/guides/tauri-upload-queue.md b/docs/guides/tauri-upload-queue.md index 78c7c41..3463c7d 100644 --- a/docs/guides/tauri-upload-queue.md +++ b/docs/guides/tauri-upload-queue.md @@ -17,7 +17,7 @@ Add taskmill to your Tauri app's `Cargo.toml`: ```toml [dependencies] -taskmill = "0.4" +taskmill = "0.6" serde = { version = "1", features = ["derive"] } serde_json = "1" tokio-util = "0.7" @@ -74,12 +74,12 @@ impl TypedTask for UploadTask { The executor does the actual upload work. It reports progress and checks for preemption between chunks. ```rust -use taskmill::{TypedExecutor, TaskContext, TaskError}; +use taskmill::{TypedExecutor, DomainTaskContext, TaskError}; struct UploadExecutor; impl TypedExecutor for UploadExecutor { - async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, Uploads>) -> Result<(), TaskError> { let file = tokio::fs::read(&task.file_path).await .map_err(|e| TaskError::permanent(format!("can't read file: {e}")))?; diff --git a/docs/io-and-backpressure.md b/docs/io-and-backpressure.md index 553e832..0c9d9c5 100644 --- a/docs/io-and-backpressure.md +++ b/docs/io-and-backpressure.md @@ -142,7 +142,7 @@ let scheduler = Scheduler::builder() Disable the built-in sampler in `Cargo.toml`: ```toml -taskmill = { version = "0.3", default-features = false } +taskmill = { version = "0.6", default-features = false } ``` ## Backpressure: external pressure signals diff --git a/docs/library-modules.md b/docs/library-modules.md index 9f46fb3..f427be1 100644 --- a/docs/library-modules.md +++ b/docs/library-modules.md @@ -42,7 +42,7 @@ use std::time::Duration; use serde::{Serialize, Deserialize}; use taskmill::{ Domain, DomainKey, TypedTask, TypedExecutor, TaskTypeConfig, - TaskContext, TaskError, IoBudget, Priority, RetryPolicy, + DomainTaskContext, TaskError, IoBudget, Priority, RetryPolicy, }; // ── Public API ────────────────────────────────────────────────── @@ -154,7 +154,7 @@ Executors can access any type via `ctx.state::()`. It is tempting to grab a d ```rust // BAD: invisible coupling to the host's global state impl TypedExecutor for UploadExecutor { - async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> { let db = ctx.state::().expect("host must register AppDb"); // ... } @@ -180,7 +180,7 @@ Inside the executor: ```rust impl TypedExecutor for UploadExecutor { - async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> { let config = ctx.state::() .expect("AcmeCdnConfig is registered by acme_cdn_domain()"); // safe — this domain always registers its own config @@ -276,9 +276,9 @@ If your library provides optional integration with another domain, use `ctx.try_ use analytics::{Analytics, TrackEvent}; impl TypedExecutor for UploadExecutor { - async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> { // Core upload logic... - do_upload(ctx).await?; + do_upload(&ctx).await?; // Optional: notify analytics domain if present if let Some(analytics) = ctx.try_domain::() { diff --git a/docs/multi-module-apps.md b/docs/multi-module-apps.md index 7f0e78a..84654c0 100644 --- a/docs/multi-module-apps.md +++ b/docs/multi-module-apps.md @@ -76,7 +76,7 @@ let scheduler = Scheduler::builder() State registered on `SchedulerBuilder::app_state()` is **global** — visible to executors in every domain. State registered on `Domain::state()` is **domain-scoped** — visible only to executors in that domain. -`TaskContext::state::()` checks domain-scoped state first, then falls back to global state. +`DomainTaskContext::state::()` checks domain-scoped state first, then falls back to global state. | What | Where to register | Why | |------|-------------------|-----| @@ -158,7 +158,7 @@ Executors can submit to other domains via `ctx.domain::()`: ```rust impl TypedExecutor for FetchExecutor { - async fn execute(&self, task: FetchTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: FetchTask, ctx: DomainTaskContext<'_, Ingest>) -> Result<(), TaskError> { let data = fetch_remote(&task).await?; // Submit a follow-up in a different domain. @@ -285,8 +285,8 @@ let keys = billing.tag_keys_by_prefix("billing.").await?; // Count how many billing tasks are active let count = billing.count_by_tag_key_prefix("billing.", None).await?; -// Fetch those tasks (optionally filter by status) -let tasks = billing.tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)).await?; +// Fetch IDs of matching tasks (optionally filter by status) +let task_ids = billing.task_ids_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)).await?; // Cancel all tasks in the billing namespace let cancelled = billing.cancel_by_tag_key_prefix("billing.").await?; diff --git a/docs/persistence-and-recovery.md b/docs/persistence-and-recovery.md index 1ab249c..bd2bb95 100644 --- a/docs/persistence-and-recovery.md +++ b/docs/persistence-and-recovery.md @@ -212,12 +212,14 @@ All columns from `tasks`, plus: | `actual_net_tx_bytes` | INTEGER | Reported by executor | | `completed_at` | TEXT | ISO 8601 timestamp | | `duration_ms` | INTEGER | Wall-clock duration | -| `status` | TEXT | `completed`, `failed`, `cancelled`, `superseded`, or `expired` | +| `status` | TEXT | `completed`, `failed`, `cancelled`, `superseded`, `expired`, or `dead_letter` | | `ttl_seconds` | INTEGER | TTL duration in seconds (NULL = no TTL) | | `ttl_from` | TEXT DEFAULT 'submission' | When TTL clock started | | `expires_at` | TEXT | ISO 8601 deadline (NULL = no expiry) | -**Index:** `idx_history_type_completed(task_type, completed_at DESC) WHERE status = 'completed'` — for per-type history queries and throughput calculations. +**Indexes:** +- `idx_history_type_completed(task_type, completed_at DESC) WHERE status = 'completed'` — for per-type throughput calculations. +- `idx_history_type(task_type, completed_at DESC)` — covering index for `history_stats`, `history_by_type`, and `avg_throughput` aggregate queries (all statuses). ### WAL mode diff --git a/docs/priorities-and-preemption.md b/docs/priorities-and-preemption.md index 57eb925..8634ed6 100644 --- a/docs/priorities-and-preemption.md +++ b/docs/priorities-and-preemption.md @@ -63,7 +63,7 @@ Executors should check for cancellation at natural yield points — between chun ```rust impl TypedExecutor for MyExecutor { - async fn execute(&self, task: MyTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: MyTask, ctx: DomainTaskContext<'_, MyTask::Domain>) -> Result<(), TaskError> { for chunk in chunks { // Check before each unit of work if ctx.token().is_cancelled() { diff --git a/docs/progress-and-events.md b/docs/progress-and-events.md index 60d26c0..fd35ea6 100644 --- a/docs/progress-and-events.md +++ b/docs/progress-and-events.md @@ -8,7 +8,7 @@ Executors report progress via `ctx.progress()`. This emits events that your UI c ```rust impl TypedExecutor for MyExecutor { - async fn execute(&self, task: MyTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: MyTask, ctx: DomainTaskContext<'_, MyTask::Domain>) -> Result<(), TaskError> { let items = get_work_items(&task); for (i, item) in items.iter().enumerate() { diff --git a/docs/query-apis.md b/docs/query-apis.md index 2950326..a986c80 100644 --- a/docs/query-apis.md +++ b/docs/query-apis.md @@ -40,11 +40,11 @@ These methods are available on `DomainHandle` and are automatically scoped to | `handle.estimated_progress()` | `Vec` | Extrapolated progress for each running task in this domain. | | `handle.byte_progress()` | `Vec` | Live byte-level progress for running tasks in this domain. | | `handle.dead_letter_tasks(limit, offset)` | `Vec` | Paginated dead-letter (permanently failed) tasks for this domain. | -| `handle.tasks_by_tags(filters, status)` | `Vec` | Active tasks in this domain matching the given tag filters and optional status. | +| `handle.task_ids_by_tags(filters, status)` | `Vec` | IDs of active tasks in this domain matching the given tag filters and optional status. Fetch full records via `task_by_id()` if needed. | | `handle.count_by_tag(key, status)` | `Vec<(String, i64)>` | Tag value counts for a given key within this domain. | | `handle.tag_values(key)` | `Vec<(String, i64)>` | Distinct values for a tag key within this domain. | | `handle.tag_keys_by_prefix(prefix)` | `Vec` | Discover distinct tag keys matching a prefix (e.g. `"billing."`) within this domain. | -| `handle.tasks_by_tag_key_prefix(prefix, status)` | `Vec` | Find tasks with any tag key matching the prefix, with optional status filter. | +| `handle.task_ids_by_tag_key_prefix(prefix, status)` | `Vec` | IDs of tasks with any tag key matching the prefix, with optional status filter. Fetch full records via `task_by_id()` if needed. | | `handle.count_by_tag_key_prefix(prefix, status)` | `i64` | Count tasks with any tag key matching the prefix, with optional status filter. | ## Cross-domain operations (Scheduler) diff --git a/docs/quick-start.md b/docs/quick-start.md index 8e2b679..1e3cd6c 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -6,14 +6,14 @@ Add taskmill to your `Cargo.toml`: ```toml [dependencies] -taskmill = "0.5" +taskmill = "0.6" ``` To disable platform resource monitoring (e.g., for mobile targets or custom samplers): ```toml [dependencies] -taskmill = { version = "0.5", default-features = false } +taskmill = { version = "0.6", default-features = false } ``` ## Core concepts @@ -43,7 +43,7 @@ scheduler.domain::() <- get a typed handle at runtime Every task type needs code that knows how to do the work. Implement the `TypedExecutor` trait. The scheduler deserializes the payload for you and passes it directly to your executor. -Your executor receives the deserialized payload `T` and a `TaskContext` with everything it needs: +Your executor receives the deserialized payload `T` and a `DomainTaskContext` with everything it needs: - `record()` — the full task record including priority and retry count - `token()` — a cancellation token for responding to preemption (see [Priorities & Preemption](priorities-and-preemption.md#handling-preemption-in-executors)) @@ -51,7 +51,7 @@ Your executor receives the deserialized payload `T` and a `TaskContext` with eve - `state::()` — shared application state registered at build time ```rust -use taskmill::{TypedExecutor, TypedTask, TaskContext, TaskError, TaskTypeConfig, IoBudget, DomainKey}; +use taskmill::{TypedExecutor, TypedTask, DomainTaskContext, TaskError, TaskTypeConfig, IoBudget, DomainKey}; use serde::{Serialize, Deserialize}; // Define the domain identity. @@ -84,7 +84,7 @@ impl TypedExecutor for ImageResizer { async fn execute( &self, task: ResizeTask, - ctx: &TaskContext, + ctx: DomainTaskContext<'_, Media>, ) -> Result<(), TaskError> { // Check for preemption at yield points if ctx.token().is_cancelled() { @@ -258,26 +258,25 @@ media.submit_with(task) Some work is naturally hierarchical — a multipart upload needs to upload individual parts, then call `CompleteMultipartUpload`. Taskmill supports this with child tasks and two-phase execution. -Spawn children from within an executor using `ctx.spawn_child()`. Children are automatically domain-aware: the task type is prefixed and domain defaults are applied. The parent enters a `waiting` state until all children complete, then `finalize()` is called on the parent executor. +Spawn children from within an executor using `ctx.spawn_child_with()`. Children are automatically domain-aware: the task type is prefixed and domain defaults are applied. The parent enters a `waiting` state until all children complete, then `finalize()` is called on the parent executor. ```rust impl TypedExecutor for MultipartUploader { - async fn execute( - &self, upload: MultipartUpload, ctx: &TaskContext, + async fn execute<'a>( + &'a self, upload: MultipartUpload, ctx: DomainTaskContext<'a, Media>, ) -> Result<(), TaskError> { let parts = split_into_parts(&upload); for part in parts { - ctx.spawn_child( - TaskSubmission::new("upload-part") - .payload_json(&part) - .expected_io(IoBudget::net(0, part.size)) - ).await?; + ctx.spawn_child_with(UploadPart { + etag: part.etag.clone(), + size: part.size, + }).await?; } Ok(()) // parent enters 'waiting' state } - async fn finalize( - &self, upload: MultipartUpload, _memo: (), ctx: &TaskContext, + async fn finalize<'a>( + &'a self, upload: MultipartUpload, _memo: (), ctx: DomainTaskContext<'a, Media>, ) -> Result<(), TaskError> { // Called after all children complete complete_multipart_upload(&upload).await @@ -285,12 +284,12 @@ impl TypedExecutor for MultipartUploader { } ``` -For cross-domain children, use `ctx.domain::()` to get a handle and `.parent()` on the submit builder: +For cross-domain children, use `ctx.domain::()` to get a handle and `.child_of(&ctx)` on the submit builder: ```rust ctx.domain::() .submit_with(Upload { /* ... */ }) - .parent(ctx.record().id) + .child_of(&ctx) .await?; ``` diff --git a/src/lib.rs b/src/lib.rs index f27580d..49cd729 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,7 +48,7 @@ //! 2. **Pending** — the task waits in a priority queue. The scheduler's run loop //! pops the highest-priority pending task on each tick. //! 3. **Running** — the scheduler calls the [`TypedExecutor::execute`] method with the -//! deserialized payload and a [`TaskContext`] containing the task record, +//! deserialized payload and a [`DomainTaskContext`] containing the task record, //! a cancellation token, and a progress reporter. //! 4. **Terminal** — on success the task moves to the history table. On failure, //! a [`retryable`](TaskError::retryable) error requeues it (up to @@ -99,9 +99,9 @@ //! [resource monitoring](SchedulerBuilder::with_resource_monitoring) is enabled, //! compares them against observed system throughput to avoid over-saturating //! the disk or network. Executors report actual IO via -//! [`TaskContext::record_read_bytes`] / [`record_write_bytes`](TaskContext::record_write_bytes) / -//! [`record_net_rx_bytes`](TaskContext::record_net_rx_bytes) / -//! [`record_net_tx_bytes`](TaskContext::record_net_tx_bytes), +//! [`DomainTaskContext::record_read_bytes`] / [`record_write_bytes`](DomainTaskContext::record_write_bytes) / +//! [`record_net_rx_bytes`](DomainTaskContext::record_net_rx_bytes) / +//! [`record_net_tx_bytes`](DomainTaskContext::record_net_tx_bytes), //! which feeds back into historical throughput averages for future scheduling //! decisions. //! @@ -121,7 +121,7 @@ //! //! ## Child tasks & two-phase execution //! -//! An executor can spawn child tasks via [`TaskContext::spawn_child`]. When +//! An executor can spawn child tasks via [`DomainTaskContext::spawn_child_with`]. When //! children exist, the parent enters a **waiting** state after its executor //! returns. Once all children complete, the parent's //! [`TypedExecutor::finalize`] method is called — useful for assembly work @@ -287,15 +287,15 @@ //! [`cancel_hook_timeout`](SchedulerBuilder::cancel_hook_timeout)) so //! executors can clean up external resources — for example, aborting an S3 //! multipart upload. Executors can check for cancellation cooperatively via -//! [`TaskContext::check_cancelled`]. +//! [`DomainTaskContext::check_cancelled`]. //! //! Cancelling a parent task cascade-cancels all its children. //! //! ## Byte-level progress //! //! For long-running transfers (file copies, uploads, downloads), executors can -//! report byte-level progress via [`TaskContext::set_bytes_total`] and -//! [`TaskContext::add_bytes`]. The scheduler maintains per-task atomic counters +//! report byte-level progress via [`DomainTaskContext::set_bytes_total`] and +//! [`DomainTaskContext::add_bytes`]. The scheduler maintains per-task atomic counters //! on the `IoTracker` — updates are lock-free and //! impose no overhead on the executor hot path. //! @@ -313,7 +313,7 @@ //! ```ignore //! use taskmill::{ //! Domain, DomainKey, DomainHandle, Scheduler, TypedExecutor, -//! TaskContext, TaskError, TypedTask, TaskTypeConfig, IoBudget, Priority, +//! DomainTaskContext, TaskError, TypedTask, TaskTypeConfig, IoBudget, Priority, //! }; //! use serde::{Serialize, Deserialize}; //! use tokio_util::sync::CancellationToken; @@ -375,7 +375,7 @@ //! ## Shared application state //! //! Register shared services (database pools, HTTP clients, etc.) at build time -//! and retrieve them from any executor via [`TaskContext::state`]. State can be +//! and retrieve them from any executor via [`DomainTaskContext::state`]. State can be //! domain-scoped (checked first) or global (fallback): //! //! ```ignore From 99f36e577ec8ba011f3e3f823dacd51c1c868699 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Mon, 23 Mar 2026 21:28:17 -0700 Subject: [PATCH 8/8] style: reformat emit_event calls to block argument style --- src/scheduler/dispatch.rs | 5 ++- src/scheduler/progress.rs | 13 ++++--- src/scheduler/run_loop.rs | 22 +++++++---- src/scheduler/spawn.rs | 20 ++++++---- src/scheduler/spawn/completion.rs | 24 +++++++----- src/scheduler/spawn/failure.rs | 64 ++++++++++++++++++++----------- src/scheduler/spawn/parent.rs | 15 +++++--- src/scheduler/submit.rs | 53 +++++++++++++++++-------- 8 files changed, 141 insertions(+), 75 deletions(-) diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 8b0ad50..cb6cf90 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -268,6 +268,9 @@ async fn cancel_pause_emit( for (id, at) in drained { at.token.cancel(); let _ = store.pause(*id).await; - emit_event(event_tx, SchedulerEvent::Preempted(at.record.event_header())); + emit_event( + event_tx, + SchedulerEvent::Preempted(at.record.event_header()), + ); } } diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index 08e889f..47c9088 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -94,11 +94,14 @@ impl ProgressReporter { // Update internal progress tracking directly (sync, no broadcast roundtrip). self.active.update_progress(self.header.task_id, clamped); // Broadcast for external subscribers (UI / Tauri). - emit_event(&self.event_tx, SchedulerEvent::Progress { - header: self.header.clone(), - percent: clamped, - message, - }); + emit_event( + &self.event_tx, + SchedulerEvent::Progress { + header: self.header.clone(), + percent: clamped, + message, + }, + ); } /// Report progress as a fraction (completed / total) with an optional message. diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index 7734a9f..938cc50 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -80,10 +80,13 @@ impl Scheduler { let age = (chrono::Utc::now() - task.created_at) .to_std() .unwrap_or_default(); - emit_event(&self.inner.event_tx, SchedulerEvent::TaskExpired { - header: task.event_header(), - age, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::TaskExpired { + header: task.event_header(), + age, + }, + ); } return Ok(true); } @@ -339,10 +342,13 @@ impl Scheduler { let age = (chrono::Utc::now() - task.created_at) .to_std() .unwrap_or_default(); - emit_event(&self.inner.event_tx, SchedulerEvent::TaskExpired { - header: task.event_header(), - age, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::TaskExpired { + header: task.event_header(), + age, + }, + ); } if !expired.is_empty() { tracing::info!(count = expired.len(), "expired stale tasks"); diff --git a/src/scheduler/spawn.rs b/src/scheduler/spawn.rs index ce10eae..df681c7 100644 --- a/src/scheduler/spawn.rs +++ b/src/scheduler/spawn.rs @@ -72,7 +72,10 @@ pub(crate) async fn spawn_task( } // Emit dispatched event. - emit_event(&ctx.event_tx, SchedulerEvent::Dispatched(task.event_header())); + emit_event( + &ctx.event_tx, + SchedulerEvent::Dispatched(task.event_header()), + ); // Build deps for handlers (cloned from SpawnContext since they move into the spawned future). let completion_deps = completion::CompletionDeps { @@ -188,12 +191,15 @@ pub(crate) async fn spawn_task( task.retry_count += 1; // Emit retry event. - emit_event(&failure_deps.event_tx, SchedulerEvent::Failed { - header: task.event_header(), - error: te.message, - will_retry: true, - retry_after: None, - }); + emit_event( + &failure_deps.event_tx, + SchedulerEvent::Failed { + header: task.event_header(), + error: te.message, + will_retry: true, + retry_after: None, + }, + ); // Rebuild context for next attempt. prepared = context::build_task_context(&task, &spawn_ctx); diff --git a/src/scheduler/spawn/completion.rs b/src/scheduler/spawn/completion.rs index 6cea91f..097e6f7 100644 --- a/src/scheduler/spawn/completion.rs +++ b/src/scheduler/spawn/completion.rs @@ -52,10 +52,13 @@ pub(crate) async fn handle_success( } decrement_module(); deps.active.remove(task_id); - emit_event(&deps.event_tx, SchedulerEvent::Waiting { - task_id, - children_count: count, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::Waiting { + task_id, + children_count: count, + }, + ); // Children may have completed before we set waiting. // Re-check to avoid a missed finalization. handle_parent_resolution( @@ -186,11 +189,14 @@ pub(in crate::scheduler) fn emit_completion_events( Some((next, count)) => (Some(next), count), None => (None, task.recurring_execution_count + 1), }; - emit_event(event_tx, SchedulerEvent::RecurringCompleted { - header: task.event_header(), - execution_count: exec_count, - next_run, - }); + emit_event( + event_tx, + SchedulerEvent::RecurringCompleted { + header: task.event_header(), + execution_count: exec_count, + next_run, + }, + ); } emit_event(event_tx, SchedulerEvent::Completed(task.event_header())); diff --git a/src/scheduler/spawn/failure.rs b/src/scheduler/spawn/failure.rs index acd4956..aa79091 100644 --- a/src/scheduler/spawn/failure.rs +++ b/src/scheduler/spawn/failure.rs @@ -156,18 +156,24 @@ pub(crate) async fn handle_failure( let dead_lettered = error.retryable && !will_retry; if dead_lettered { - emit_event(&deps.event_tx, SchedulerEvent::DeadLettered { - header: task.event_header(), - error: error.message.clone(), - retry_count: task.retry_count + 1, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::DeadLettered { + header: task.event_header(), + error: error.message.clone(), + retry_count: task.retry_count + 1, + }, + ); } else { - emit_event(&deps.event_tx, SchedulerEvent::Failed { - header: task.event_header(), - error: error.message.clone(), - will_retry, - retry_after: retry_delay, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::Failed { + header: task.event_header(), + error: error.message.clone(), + will_retry, + retry_after: retry_delay, + }, + ); } deps.work_notify.notify_one(); @@ -204,13 +210,19 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD match deps.store.fail_dependents(task_id).await { Ok((failed_ids, unblocked_ids)) => { for fid in &failed_ids { - emit_event(&deps.event_tx, SchedulerEvent::DependencyFailed { - task_id: *fid, - failed_dependency: task_id, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::DependencyFailed { + task_id: *fid, + failed_dependency: task_id, + }, + ); } for uid in &unblocked_ids { - emit_event(&deps.event_tx, SchedulerEvent::TaskUnblocked { task_id: *uid }); + emit_event( + &deps.event_tx, + SchedulerEvent::TaskUnblocked { task_id: *uid }, + ); } if !unblocked_ids.is_empty() { deps.work_notify.notify_one(); @@ -231,7 +243,10 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD if let Some(at) = deps.active.remove(*rid) { at.token.cancel(); let _ = deps.store.delete(*rid).await; - emit_event(&deps.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &deps.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); } } } @@ -255,12 +270,15 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD "failed to record parent failure" ); } - emit_event(&deps.event_tx, SchedulerEvent::Failed { - header: parent.event_header(), - error: msg, - will_retry: false, - retry_after: None, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::Failed { + header: parent.event_header(), + error: msg, + will_retry: false, + retry_after: None, + }, + ); } else { // Not fail_fast — check if all children done. handle_parent_resolution( diff --git a/src/scheduler/spawn/parent.rs b/src/scheduler/spawn/parent.rs index 71406b5..ad685a9 100644 --- a/src/scheduler/spawn/parent.rs +++ b/src/scheduler/spawn/parent.rs @@ -41,12 +41,15 @@ pub(crate) async fn handle_parent_resolution( { tracing::error!(parent_id, error = %e, "failed to record parent failure"); } - emit_event(event_tx, SchedulerEvent::Failed { - header: parent.event_header(), - error: reason, - will_retry: false, - retry_after: None, - }); + emit_event( + event_tx, + SchedulerEvent::Failed { + header: parent.event_header(), + error: reason, + will_retry: false, + retry_after: None, + }, + ); } } Ok(Some(ParentResolution::StillWaiting)) | Ok(None) => { diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index a7f6cee..ac40314 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -64,10 +64,13 @@ impl Scheduler { label: sub.label.clone(), tags: sub.tags.clone(), }; - emit_event(&self.inner.event_tx, SchedulerEvent::Superseded { - old: old_header, - new_task_id: *new_task_id, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Superseded { + old: old_header, + new_task_id: *new_task_id, + }, + ); } if !matches!(outcome, SubmitOutcome::Duplicate | SubmitOutcome::Rejected) { @@ -129,10 +132,13 @@ impl Scheduler { label: sub.label.clone(), tags: sub.tags.clone(), }; - emit_event(&self.inner.event_tx, SchedulerEvent::Superseded { - old: old_header, - new_task_id: *new_task_id, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Superseded { + old: old_header, + new_task_id: *new_task_id, + }, + ); } } @@ -170,10 +176,13 @@ impl Scheduler { if any_changed { let inserted_ids = outcome.inserted(); - emit_event(&self.inner.event_tx, SchedulerEvent::BatchSubmitted { - count: resolved.len(), - inserted_ids, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::BatchSubmitted { + count: resolved.len(), + inserted_ids, + }, + ); self.inner.work_notify.notify_one(); } @@ -266,7 +275,10 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - emit_event(&self.inner.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); } } @@ -278,7 +290,10 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - emit_event(&self.inner.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); return Ok(true); } @@ -388,7 +403,10 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - emit_event(&self.inner.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); return Ok(true); } self.inner.store.cancel_recurring(task_id).await @@ -422,7 +440,10 @@ impl Scheduler { if let Some(at) = self.inner.active.remove(replaced_task_id) { at.token.cancel(); self.fire_on_cancel(&at.record).await; - emit_event(&self.inner.event_tx, SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); } }