Skip to content
Merged
97 changes: 31 additions & 66 deletions benches/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,70 +5,33 @@
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use serde::{Deserialize, Serialize};
use taskmill::{
Domain, DomainKey, DomainTaskContext, Scheduler, SchedulerEvent, TaskError, TaskStore,
TaskSubmission, TypedExecutor, TypedTask,
};
use taskmill::{IoBudget, TaskStore, TaskSubmission};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;

/// Marker type identifying the task domain used by this benchmark.
struct BenchDomain;
impl DomainKey for BenchDomain {
/// Domain name; task types in this domain are stored with a `bench::` prefix.
const NAME: &'static str = "bench";
}

/// Field-less task type registered with the benchmark scheduler.
#[derive(Serialize, Deserialize)]
struct BenchTask;
impl TypedTask for BenchTask {
/// Ties this task to the domain declared by `BenchDomain`.
type Domain = BenchDomain;
/// Short task type name; with the `bench` domain prefix this corresponds to
/// the `"bench::test"` type submitted by the benchmark.
const TASK_TYPE: &'static str = "test";
}

/// Executor that performs no work and always succeeds.
struct NoopExecutor;

impl TypedExecutor<BenchTask> for NoopExecutor {
/// Ignores both the payload and the task context and returns `Ok(())`.
async fn execute<'a>(
&'a self,
_payload: BenchTask,
_ctx: DomainTaskContext<'a, BenchDomain>,
) -> Result<(), TaskError> {
Ok(())
}
}

/// Build a scheduler and run `n` noop tasks to completion, populating history.
async fn build_scheduler_with_history(n: usize) -> Scheduler {
let sched = Scheduler::builder()
.store(TaskStore::open_memory().await.unwrap())
.domain(Domain::<BenchDomain>::new().task::<BenchTask>(NoopExecutor))
.max_concurrency(32)
.poll_interval(Duration::from_millis(10))
.build()
.await
.unwrap();

/// Task types used to create a realistic multi-type history table.
///
/// Each entry is a qualified `domain::type` name. `store_with_history` picks
/// them round-robin (`i % TASK_TYPES.len()`), so each of the five types gets
/// roughly one fifth of the records. The filtered benchmarks query
/// `"media::thumbnail"`, which is where the stated 20% selectivity comes from.
const TASK_TYPES: &[&str] = &[
"media::thumbnail",
"media::transcode",
"billing::charge",
"billing::refund",
"email::send",
];

/// Populate history directly through the `TaskStore` (no scheduler overhead):
/// each task is submitted, popped, and completed in turn.
/// Tasks are distributed round-robin across `TASK_TYPES` for realistic selectivity.
async fn store_with_history(n: usize) -> TaskStore {
let store = TaskStore::open_memory().await.unwrap();
let budget = IoBudget::default();
for i in 0..n {
sched
.submit(&TaskSubmission::new("bench::test").key(format!("h-{i}")))
let task_type = TASK_TYPES[i % TASK_TYPES.len()];
store
.submit(&TaskSubmission::new(task_type).key(format!("h-{i}")))
.await
.unwrap();
let task = store.pop_next().await.unwrap().unwrap();
store.complete(task.id, &budget).await.unwrap();
}

let mut rx = sched.subscribe();
let token = CancellationToken::new();
let sched_clone = sched.clone();
let token_clone = token.clone();
tokio::spawn(async move { sched_clone.run(token_clone).await });

let mut completed = 0;
while completed < n {
if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await {
completed += 1;
}
}

sched
store
}

// ── Benchmarks ──────────────────────────────────────────────────────
Expand All @@ -82,8 +45,7 @@ fn bench_history_query(c: &mut Criterion) {
group.measurement_time(Duration::from_secs(30));

for history_size in [100usize, 1000, 5000] {
let sched = rt.block_on(build_scheduler_with_history(history_size));
let store = sched.store().clone();
let store = rt.block_on(store_with_history(history_size));
group.bench_with_input(
BenchmarkId::from_parameter(history_size),
&history_size,
Expand All @@ -107,6 +69,7 @@ fn bench_history_query(c: &mut Criterion) {
}

/// Aggregate stats query (`COUNT`, `AVG` duration and IO) at varying history sizes.
/// History contains 5 task types; the query filters to one (20% selectivity).
fn bench_history_stats(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("history_stats");
Expand All @@ -115,8 +78,7 @@ fn bench_history_stats(c: &mut Criterion) {
group.measurement_time(Duration::from_secs(30));

for history_size in [100usize, 1000, 5000] {
let sched = rt.block_on(build_scheduler_with_history(history_size));
let store = sched.store().clone();
let store = rt.block_on(store_with_history(history_size));
group.bench_with_input(
BenchmarkId::from_parameter(history_size),
&history_size,
Expand All @@ -127,7 +89,7 @@ fn bench_history_stats(c: &mut Criterion) {
async move {
let start = std::time::Instant::now();
for _ in 0..iters {
let _ = store.history_stats("bench::test").await.unwrap();
let _ = store.history_stats("media::thumbnail").await.unwrap();
}
start.elapsed()
}
Expand All @@ -140,6 +102,7 @@ fn bench_history_stats(c: &mut Criterion) {
}

/// Filter-by-type history query at varying history sizes.
/// History contains 5 task types; the query filters to one (20% selectivity).
fn bench_history_by_type(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("history_by_type");
Expand All @@ -148,8 +111,7 @@ fn bench_history_by_type(c: &mut Criterion) {
group.measurement_time(Duration::from_secs(30));

for history_size in [100usize, 1000, 5000] {
let sched = rt.block_on(build_scheduler_with_history(history_size));
let store = sched.store().clone();
let store = rt.block_on(store_with_history(history_size));
group.bench_with_input(
BenchmarkId::from_parameter(history_size),
&history_size,
Expand All @@ -160,7 +122,10 @@ fn bench_history_by_type(c: &mut Criterion) {
async move {
let start = std::time::Instant::now();
for _ in 0..iters {
let _ = store.history_by_type("bench::test", 100).await.unwrap();
let _ = store
.history_by_type("media::thumbnail", 100)
.await
.unwrap();
}
start.elapsed()
}
Expand Down
10 changes: 5 additions & 5 deletions benches/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ fn bench_submit_with_tags(c: &mut Criterion) {
group.finish();
}

/// `tasks_by_tags` with a single filter at varying queue depths.
/// `task_ids_by_tags` with a single filter at varying queue depths.
/// All tasks match the filter, so result size equals queue depth.
fn bench_query_by_tags(c: &mut Criterion) {
fn bench_query_ids_by_tags(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("query_by_tags");
let mut group = c.benchmark_group("query_ids_by_tags");
group.throughput(Throughput::Elements(1));
group.sample_size(10);
group.warm_up_time(Duration::from_secs(1));
Expand All @@ -119,7 +119,7 @@ fn bench_query_by_tags(c: &mut Criterion) {
let start = Instant::now();
for _ in 0..iters {
let _ = store
.tasks_by_tags(&[("bucket", "b0")], None)
.task_ids_by_tags(&[("bucket", "b0")], None)
.await
.unwrap();
}
Expand Down Expand Up @@ -214,7 +214,7 @@ fn bench_tag_values_scan(c: &mut Criterion) {
criterion_group!(
benches,
bench_submit_with_tags,
bench_query_by_tags,
bench_query_ids_by_tags,
bench_count_by_tags,
bench_tag_values_scan,
);
Expand Down
4 changes: 2 additions & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await;

```toml
# Disable platform monitoring
taskmill = { version = "0.4", default-features = false }
taskmill = { version = "0.6", default-features = false }
```

When disabled, you can still provide a custom `ResourceSampler` via `.resource_sampler()`.
Expand Down Expand Up @@ -446,7 +446,7 @@ Scheduler::builder()
| `Domain::<D>::new()` | Create a domain. The domain name is taken from `D::NAME`. Task types are prefixed `"{name}::"`. |
| `task::<T>(executor)` | Register a `TypedExecutor<T>` using `T::TASK_TYPE` as the name. TTL and retry policy from `T::config()` are used automatically. |
| `task_with::<T>(executor, options)` | Register with per-type option overrides (for executor-instance-dependent config). |
| `raw_executor(name, executor)` | Escape hatch: register an untyped `TaskExecutor` by type name. |
| ~~`raw_executor(name, executor)`~~ | **Removed in 0.6.** Use `task::<T>(executor)` with a `TypedExecutor<T>`. |
| `default_priority(p)` | Domain-wide priority for all submissions. |
| `default_retry(policy)` | Domain-wide retry policy. |
| `default_group(group)` | Domain-wide group key. |
Expand Down
10 changes: 7 additions & 3 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ flowchart TD
S["submit() / submit_batch()"] --> TS["TaskStore\n(INSERT OR IGNORE)"]
TS --> |SQLite| DB[(tasks table)]
DB --> SCH["Scheduler run loop"]
SCH --> |"tokio::spawn"| E1["Executor + TaskContext"]
SCH --> |"tokio::spawn"| E2["Executor + TaskContext"]
SCH --> |"tokio::spawn"| E1["Executor + DomainTaskContext"]
SCH --> |"tokio::spawn"| E2["Executor + DomainTaskContext"]
E1 --> CF["complete() / fail()"]
E2 --> CF
CF --> HIST[(task_history)]
Expand Down Expand Up @@ -193,8 +193,12 @@ Each cycle, the loop:
Executor returns Err(TaskError)
└─ retryable: false? ──► move to task_history (failed)
└─ retryable: true?
└─ retry_count < max_retries? ──► status → pending, retry_count += 1
└─ retry_count < max_retries?
└─ delay == 0? ──► inline retry (stays running, retry_count += 1, re-execute)
└─ delay > 0? ──► status → pending, retry_count += 1, requeued with backoff
└─ otherwise ──► move to task_history (failed)
```

Retried tasks keep their original priority and dedup key. `max_retries` defaults to 3.

**Inline zero-delay retries:** When the retry delay is zero, the executor re-runs immediately within the same spawned task — avoiding the overhead of returning to the SQLite queue and being re-dispatched. The `retry_count` is persisted to the database, but the task stays in `running` status throughout.
4 changes: 2 additions & 2 deletions docs/glossary.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Quick reference for terms used throughout the taskmill documentation.
| **Deduplication (dedup)** | Preventing the same task from being queued twice. Taskmill generates a SHA-256 key from the task type and payload; a second submission with the same key is silently ignored. See [Persistence & Recovery](persistence-and-recovery.md#deduplication). |
| **Dispatch** | Moving a task from "waiting in line" (pending) to "actively running." The scheduler dispatches tasks in priority order, subject to concurrency limits and backpressure. |
| **EWMA** | Exponentially Weighted Moving Average — a smoothing technique that gives recent measurements more weight than old ones. Taskmill uses EWMA to smooth resource readings so the scheduler doesn't overreact to momentary spikes. See [IO & Backpressure](io-and-backpressure.md#ewma-smoothing). |
| **TypedExecutor** | Your code that performs the actual work for a task type. Implements `TypedExecutor<T>` and receives a deserialized payload: `async fn execute(&self, payload: T, ctx: &TaskContext)`. Register with `Domain::task::<T>(executor)`. See [Quick Start](quick-start.md#implement-an-executor). |
| **TypedExecutor** | Your code that performs the actual work for a task type. Implements `TypedExecutor<T>` and receives a deserialized payload: `async fn execute(&self, payload: T, ctx: DomainTaskContext<'_, T::Domain>)`. Register with `Domain::task::<T>(executor)`. See [Quick Start](quick-start.md#implement-an-executor). |
| **IO budget** | An estimate of how many bytes a task will read and write (disk and/or network), submitted alongside the task. The scheduler uses IO budgets to avoid overwhelming the disk. See [IO & Backpressure](io-and-backpressure.md#io-budgets-telling-the-scheduler-what-to-expect). |
| **Pile-up prevention** | The mechanism that skips a recurring task instance when the previous instance hasn't been dispatched yet, preventing unbounded queue growth under sustained load. See [Quick Start](quick-start.md#recurring-tasks). |
| **Preemption** | Pausing lower-priority work so higher-priority work can run immediately. Preempted tasks resume automatically once the urgent work finishes. See [Priorities & Preemption](priorities-and-preemption.md#preemption). |
Expand All @@ -31,7 +31,7 @@ Quick reference for terms used throughout the taskmill documentation.
| **Typed task** | A struct that implements the `TypedTask` trait, giving you compile-time type safety for task payloads and configuration. Declares `type Domain: DomainKey` for compile-time domain identity and `fn config() -> TaskTypeConfig` for static defaults (priority, IO budget, TTL, retry policy, etc.). Register with `Domain::task::<T>(executor)` and submit with `handle.submit(task)`. See [Quick Start](quick-start.md#typed-tasks). |
| **TaskTypeConfig** | A struct returned by `TypedTask::config()` that holds static per-task-type defaults — priority, IO budget, group key, TTL, retry policy, duplicate strategy, etc. All fields are `Option`; `None` means "use the next layer's default" (domain default, then scheduler global, then built-in). |
| **DomainSubmitBuilder** | The fluent builder returned by `DomainHandle::submit_with(task)`. Implements `IntoFuture` so bare `.await` applies all defaults; chain override methods (`.priority()`, `.run_after()`, `.parent()`, `.fail_fast()`, etc.) before `.await` to override individual fields for that call only. |
| **Domain-scoped state** | Application state registered on a `Domain` via `.state(value)`, visible only to executors within that domain. `TaskContext::state::<T>()` checks domain state first and falls back to global state registered on `SchedulerBuilder`. See [Configuration](configuration.md#application-state). |
| **Domain-scoped state** | Application state registered on a `Domain` via `.state(value)`, visible only to executors within that domain. `DomainTaskContext::state::<T>()` checks domain state first and falls back to global state registered on `SchedulerBuilder`. See [Configuration](configuration.md#application-state). |
| **TypedEventStream** | A per-task-type event subscription (`TypedEventStream<T>`) created via `handle.task_events::<T>()`. Filters the global scheduler event broadcast to only events matching `T::TASK_TYPE` within the domain. Terminal events include the `TaskHistoryRecord`. |
| **Qualified task type** | The full database-stored task type including the domain prefix, e.g. `"media::thumbnail"`. Required when using store-level query APIs (`history_stats`, `task_lookup`, `avg_throughput`). `DomainHandle` methods apply the prefix automatically, so you typically only need the short form when submitting tasks. |
| **Cross-domain dependency** | A dependency edge where the dependent task and its prerequisite belong to different domains. Functionally identical to same-domain dependencies — the domain boundary does not affect dependency resolution or failure propagation. See [Multi-Module Applications](multi-module-apps.md#cross-module-task-dependencies). |
Expand Down
6 changes: 3 additions & 3 deletions docs/guides/background-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ impl TypedTask for ProcessImageTask {
## Implement the executor

```rust
use taskmill::{TypedExecutor, TaskContext, TaskError};
use taskmill::{TypedExecutor, DomainTaskContext, TaskError};

struct ImageProcessor;

impl TypedExecutor<ProcessImageTask> for ImageProcessor {
async fn execute(&self, task: ProcessImageTask, ctx: &TaskContext) -> Result<(), TaskError> {
async fn execute(&self, task: ProcessImageTask, ctx: DomainTaskContext<'_, Images>) -> Result<(), TaskError> {
// Read the source image
let data = tokio::fs::read(&task.path).await
.map_err(|e| TaskError::permanent(format!("can't read: {e}")))?;
Expand Down Expand Up @@ -187,7 +187,7 @@ Scheduler::builder()
Disable the default sampler in `Cargo.toml`:

```toml
taskmill = { version = "0.4", default-features = false }
taskmill = { version = "0.6", default-features = false }
```

## Key differences from desktop
Expand Down
6 changes: 3 additions & 3 deletions docs/guides/tauri-upload-queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Add taskmill to your Tauri app's `Cargo.toml`:

```toml
[dependencies]
taskmill = "0.4"
taskmill = "0.6"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio-util = "0.7"
Expand Down Expand Up @@ -74,12 +74,12 @@ impl TypedTask for UploadTask {
The executor does the actual upload work. It reports progress and checks for preemption between chunks.

```rust
use taskmill::{TypedExecutor, TaskContext, TaskError};
use taskmill::{TypedExecutor, DomainTaskContext, TaskError};

struct UploadExecutor;

impl TypedExecutor<UploadTask> for UploadExecutor {
async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> {
async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, Uploads>) -> Result<(), TaskError> {
let file = tokio::fs::read(&task.file_path).await
.map_err(|e| TaskError::permanent(format!("can't read file: {e}")))?;

Expand Down
2 changes: 1 addition & 1 deletion docs/io-and-backpressure.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ let scheduler = Scheduler::builder()
Disable the built-in sampler in `Cargo.toml`:

```toml
taskmill = { version = "0.3", default-features = false }
taskmill = { version = "0.6", default-features = false }
```

## Backpressure: external pressure signals
Expand Down
10 changes: 5 additions & 5 deletions docs/library-modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use std::time::Duration;
use serde::{Serialize, Deserialize};
use taskmill::{
Domain, DomainKey, TypedTask, TypedExecutor, TaskTypeConfig,
TaskContext, TaskError, IoBudget, Priority, RetryPolicy,
DomainTaskContext, TaskError, IoBudget, Priority, RetryPolicy,
};

// ── Public API ──────────────────────────────────────────────────
Expand Down Expand Up @@ -154,7 +154,7 @@ Executors can access any type via `ctx.state::<T>()`. It is tempting to grab a d
```rust
// BAD: invisible coupling to the host's global state
impl TypedExecutor<UploadTask> for UploadExecutor {
async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> {
async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> {
let db = ctx.state::<AppDb>().expect("host must register AppDb");
// ...
}
Expand All @@ -180,7 +180,7 @@ Inside the executor:

```rust
impl TypedExecutor<UploadTask> for UploadExecutor {
async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> {
async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> {
let config = ctx.state::<AcmeCdnConfig>()
.expect("AcmeCdnConfig is registered by acme_cdn_domain()");
// safe — this domain always registers its own config
Expand Down Expand Up @@ -276,9 +276,9 @@ If your library provides optional integration with another domain, use `ctx.try_
use analytics::{Analytics, TrackEvent};

impl TypedExecutor<UploadTask> for UploadExecutor {
async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> {
async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> {
// Core upload logic...
do_upload(ctx).await?;
do_upload(&ctx).await?;

// Optional: notify analytics domain if present
if let Some(analytics) = ctx.try_domain::<Analytics>() {
Expand Down
Loading
Loading