From b058c374d15026b99ebf01ea9c3afed4e94f9814 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Tue, 24 Mar 2026 07:02:04 -0700 Subject: [PATCH 1/2] docs: add migration guide for 0.7.0 --- docs/migrating-to-0.7.md | 351 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 351 insertions(+) create mode 100644 docs/migrating-to-0.7.md diff --git a/docs/migrating-to-0.7.md b/docs/migrating-to-0.7.md new file mode 100644 index 0000000..2a56889 --- /dev/null +++ b/docs/migrating-to-0.7.md @@ -0,0 +1,351 @@ +# Migrating from 0.6.x to 0.7.0 + +0.7.0 adds group-level pause/resume, token-bucket rate limiting, priority +aging, weighted fair scheduling, and typed executor memos. It also consolidates +the migration files and normalizes all timestamps to epoch milliseconds, +**requiring a fresh database**. + +--- + +### 1. Database recreation required + +Migrations were consolidated from 9 chronological files into 4 object-oriented +files, and all timestamp columns were converted from `TEXT` to epoch millisecond +`INTEGER`. Existing SQLite databases **cannot be upgraded in-place** — delete +the database file and let the scheduler recreate it on startup. + +This is acceptable for a pre-1.0 release: the `tasks` table is a transient work +queue that drains, and `task_history` is diagnostic. + +--- + +### 2. Executor return type: typed Memo + +`execute()` now returns `Result` instead of `Result<(), TaskError>`, +and `finalize()` receives the memo between the payload and context arguments. +The `Memo` type parameter defaults to `()`, so existing executors only need a +minimal update. + +**Before:** + +```rust +impl TypedExecutor for ThumbnailExec { + async fn execute<'a>( + &'a self, + thumb: Thumbnail, + ctx: DomainTaskContext<'a, Media>, + ) -> Result<(), TaskError> { + // ... + Ok(()) + } + + async fn finalize<'a>( + &'a self, + thumb: Thumbnail, + ctx: DomainTaskContext<'a, Media>, + ) -> Result<(), TaskError> { + Ok(()) + } +} +``` + +**After (no memo):** + +```rust +impl TypedExecutor for ThumbnailExec { + async fn execute<'a>( + &'a self, + thumb: Thumbnail, + ctx: DomainTaskContext<'a, Media>, + ) -> Result<(), TaskError> { + // ... + Ok(()) + } + + async fn finalize<'a>( + &'a self, + thumb: Thumbnail, + _memo: (), // ← new parameter + ctx: DomainTaskContext<'a, Media>, + ) -> Result<(), TaskError> { + Ok(()) + } +} +``` + +**After (with memo):** + +```rust +#[derive(Serialize, Deserialize)] +struct ScanMemo { scan_start_ns: i64, batch_id: u64 } + +impl TypedExecutor for ThumbnailExec { + async fn execute<'a>( + &'a self, + thumb: Thumbnail, + ctx: DomainTaskContext<'a, Media>, + ) -> Result { // ← returns memo + Ok(ScanMemo { scan_start_ns: 42, batch_id: 1 }) + } + + async fn finalize<'a>( + &'a self, + thumb: Thumbnail, + memo: ScanMemo, // ← receives memo + ctx: DomainTaskContext<'a, Media>, + ) -> Result<(), TaskError> { + println!("batch {}", memo.batch_id); + Ok(()) + } +} +``` + +Register memo-bearing executors with `task_memo` instead of `task`: + +```rust +// No memo (unchanged): +domain.task::(ThumbnailExec) + +// With memo: +domain.task_memo(ThumbnailExec) // Memo type inferred from the impl +``` + +--- + +### 3. `SubmitOutcome::Inserted` is now a struct variant + +**Before:** + +```rust +match outcome { + SubmitOutcome::Inserted(id) => { /* ... */ } + // ... +} +``` + +**After:** + +```rust +match outcome { + SubmitOutcome::Inserted { id, group_paused } => { + if group_paused { + println!("task {id} submitted to a paused group"); + } + } + // ... +} +``` + +--- + +### 4. `DispatchGate::admit()` returns `Admission` enum + +Custom dispatch gates must return `Admission` instead of `bool`. + +**Before:** + +```rust +impl DispatchGate for MyGate { + fn admit<'a>( + &'a self, + task: &'a TaskRecord, + ctx: &'a GateContext<'a>, + ) -> BoxFuture<'a, Result> { + Box::pin(async move { Ok(true) }) + } +} +``` + +**After:** + +```rust +use taskmill::Admission; + +impl DispatchGate for MyGate { + fn admit<'a>( + &'a self, + task: &'a TaskRecord, + ctx: &'a GateContext<'a>, + ) -> BoxFuture<'a, Result> { + Box::pin(async move { Ok(Admission::Admit) }) + } +} +``` + +| Old return | New return | +|---|---| +| `Ok(true)` | `Ok(Admission::Admit)` | +| `Ok(false)` | `Ok(Admission::Deny)` | +| — | `Ok(Admission::RateLimited(instant))` | + +--- + +### 5. Tag query methods renamed and return `Vec` + +Four tag-based query methods were renamed and now return task IDs instead of +full `TaskRecord`s: + +| Before | After | +|---|---| +| `tasks_by_tags()` → `Vec` | `task_ids_by_tags()` → `Vec` | +| `tasks_by_tags_with_prefix()` → `Vec` | `task_ids_by_tags_with_prefix()` → `Vec` | +| `tasks_by_tag_key_prefix()` → `Vec` | `task_ids_by_tag_key_prefix()` → `Vec` | +| `tasks_by_tag_key_prefix_with_prefix()` → `Vec` | `task_ids_by_tag_key_prefix_with_prefix()` → `Vec` | + +Fetch full records with `task_by_id()` if needed. + +--- + +### 6. New feature: group pause / resume + +Pause and resume all tasks in a group at runtime. Tasks submitted to a paused +group are inserted as `Paused` and dispatched when the group is resumed. + +```rust +scheduler.pause_group("s3-uploads").await?; +scheduler.resume_group("s3-uploads").await?; + +// Auto-resume after a deadline (~5 s latency): +scheduler.pause_group_until("s3-uploads", deadline).await?; + +// Query state: +let paused: bool = scheduler.is_group_paused("s3-uploads"); +let all: Vec = scheduler.paused_groups(); +``` + +Available on `Scheduler`, `DomainHandle`, and `ModuleHandle`. + +New events: `SchedulerEvent::GroupPaused { .. }` and `GroupResumed { .. }`. + +--- + +### 7. New feature: token-bucket rate limiting + +Apply per-task-type or per-group rate limits. Rate-limited tasks are deferred +(via `run_after`) until the next token is available, avoiding head-of-line +blocking. + +```rust +use taskmill::RateLimit; + +let sched = Scheduler::builder() + .rate_limit("media::upload", RateLimit::per_second(5)) + .group_rate_limit("s3-bucket", RateLimit::per_second(10).with_burst(20)) + .build() + .await?; + +// Reconfigure at runtime: +scheduler.set_rate_limit("media::upload", RateLimit::per_second(20)); +scheduler.remove_rate_limit("media::upload"); + +scheduler.set_group_rate_limit("s3-bucket", RateLimit::per_minute(100)); +scheduler.remove_group_rate_limit("s3-bucket"); +``` + +Available on `Scheduler`, `DomainHandle`, and `ModuleHandle`. + +--- + +### 8. New feature: priority aging + +Prevent task starvation by automatically promoting the effective priority of +long-waiting tasks. The stored priority is never mutated — effective priority is +a pure dispatch-time computation. + +```rust +use taskmill::AgingConfig; + +Scheduler::builder() + .priority_aging(AgingConfig { + grace_period: Duration::from_secs(300), // 5 min before aging starts + aging_interval: Duration::from_secs(60), // one promotion per minute + max_effective_priority: Priority::HIGH, // ceiling + urgent_threshold: None, // optional: bypass weights + }) + .build() + .await?; +``` + +Pause duration is excluded from the aging clock. + +--- + +### 9. New feature: weighted fair scheduling + +Allocate dispatch capacity proportionally across groups. The scheduler is +work-conserving — unused slots are redistributed by global priority order. + +```rust +Scheduler::builder() + .group_weight("api-calls", 3) // 75% of capacity + .group_weight("background", 1) // 25% of capacity + .default_group_weight(1) + .group_minimum_slots("alerts", 2) // guaranteed floor + .build() + .await?; + +// Runtime adjustment: +scheduler.set_group_weight("api-calls", 5); +scheduler.remove_group_weight("api-calls"); +scheduler.set_group_minimum_slots("alerts", 4); +``` + +When combined with `urgent_threshold` in `AgingConfig`, severely aged tasks +bypass group allocation as a safety valve. + +--- + +### 10. New feature: `fail_fast()` on submit builders + +Control whether a parent task fails immediately when any child fails (default), +or waits for all children to complete first. + +```rust +// Wait for all children before resolving the parent: +scheduler.submit( + &TaskSubmission::new("pipeline::assemble") + .fail_fast(false) +).await?; + +// Typed domain API: +media.submit_with(AssembleTask { .. }) + .fail_fast(false) + .await?; +``` + +--- + +### 11. New feature: tag key prefix queries + +Discover tag keys and find tasks by tag key namespace, with proper SQL wildcard +escaping. + +```rust +// Discover tag keys by prefix: +let keys = store.tag_keys_by_prefix("billing.").await?; +// → ["billing.customer_id", "billing.plan"] + +// Count tasks with matching tag keys: +let count = store.count_by_tag_key_prefix("billing.", None).await?; + +// Get task IDs: +let ids = store.task_ids_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)).await?; + +// Domain-scoped: +let keys = billing_handle.tag_keys_by_prefix("billing.").await?; +let cancelled = billing_handle.cancel_by_tag_key_prefix("billing.").await?; +``` + +--- + +### 12. Import changes + +```rust +// New types you may need: +use taskmill::{ + Admission, // DispatchGate return type + AgingConfig, // priority aging configuration + PauseReasons, // bitmask for pause sources + RateLimit, // token-bucket rate limit config +}; +``` From 0d9800070f05e4f4ea0bc6b7597f9ec646f45f38 Mon Sep 17 00:00:00 2001 From: DJ Majumdar Date: Tue, 24 Mar 2026 07:09:40 -0700 Subject: [PATCH 2/2] Update migrating-to-0.7.mdremoved sections that are not public --- docs/migrating-to-0.7.md | 59 ++++++---------------------------------- 1 file changed, 8 insertions(+), 51 deletions(-) diff --git a/docs/migrating-to-0.7.md b/docs/migrating-to-0.7.md index 2a56889..08e4581 100644 --- a/docs/migrating-to-0.7.md +++ b/docs/migrating-to-0.7.md @@ -138,49 +138,7 @@ match outcome { --- -### 4. `DispatchGate::admit()` returns `Admission` enum - -Custom dispatch gates must return `Admission` instead of `bool`. - -**Before:** - -```rust -impl DispatchGate for MyGate { - fn admit<'a>( - &'a self, - task: &'a TaskRecord, - ctx: &'a GateContext<'a>, - ) -> BoxFuture<'a, Result> { - Box::pin(async move { Ok(true) }) - } -} -``` - -**After:** - -```rust -use taskmill::Admission; - -impl DispatchGate for MyGate { - fn admit<'a>( - &'a self, - task: &'a TaskRecord, - ctx: &'a GateContext<'a>, - ) -> BoxFuture<'a, Result> { - Box::pin(async move { Ok(Admission::Admit) }) - } -} -``` - -| Old return | New return | -|---|---| -| `Ok(true)` | `Ok(Admission::Admit)` | -| `Ok(false)` | `Ok(Admission::Deny)` | -| — | `Ok(Admission::RateLimited(instant))` | - ---- - -### 5. Tag query methods renamed and return `Vec` +### 4. Tag query methods renamed and return `Vec` Four tag-based query methods were renamed and now return task IDs instead of full `TaskRecord`s: @@ -196,7 +154,7 @@ Fetch full records with `task_by_id()` if needed. --- -### 6. New feature: group pause / resume +### 5. New feature: group pause / resume Pause and resume all tasks in a group at runtime. Tasks submitted to a paused group are inserted as `Paused` and dispatched when the group is resumed. @@ -219,7 +177,7 @@ New events: `SchedulerEvent::GroupPaused { .. }` and `GroupResumed { .. }`. --- -### 7. New feature: token-bucket rate limiting +### 6. New feature: token-bucket rate limiting Apply per-task-type or per-group rate limits. Rate-limited tasks are deferred (via `run_after`) until the next token is available, avoiding head-of-line @@ -246,7 +204,7 @@ Available on `Scheduler`, `DomainHandle`, and `ModuleHandle`. --- -### 8. New feature: priority aging +### 7. New feature: priority aging Prevent task starvation by automatically promoting the effective priority of long-waiting tasks. The stored priority is never mutated — effective priority is @@ -270,7 +228,7 @@ Pause duration is excluded from the aging clock. --- -### 9. New feature: weighted fair scheduling +### 8. New feature: weighted fair scheduling Allocate dispatch capacity proportionally across groups. The scheduler is work-conserving — unused slots are redistributed by global priority order. @@ -295,7 +253,7 @@ bypass group allocation as a safety valve. --- -### 10. New feature: `fail_fast()` on submit builders +### 9. New feature: `fail_fast()` on submit builders Control whether a parent task fails immediately when any child fails (default), or waits for all children to complete first. @@ -315,7 +273,7 @@ media.submit_with(AssembleTask { .. }) --- -### 11. New feature: tag key prefix queries +### 10. New feature: tag key prefix queries Discover tag keys and find tasks by tag key namespace, with proper SQL wildcard escaping. @@ -338,12 +296,11 @@ let cancelled = billing_handle.cancel_by_tag_key_prefix("billing.").await?; --- -### 12. Import changes +### 11. Import changes ```rust // New types you may need: use taskmill::{ - Admission, // DispatchGate return type AgingConfig, // priority aging configuration PauseReasons, // bitmask for pause sources RateLimit, // token-bucket rate limit config