diff --git a/benches/history.rs b/benches/history.rs index d00044b..13053a1 100644 --- a/benches/history.rs +++ b/benches/history.rs @@ -5,70 +5,33 @@ use std::time::Duration; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use serde::{Deserialize, Serialize}; -use taskmill::{ - Domain, DomainKey, DomainTaskContext, Scheduler, SchedulerEvent, TaskError, TaskStore, - TaskSubmission, TypedExecutor, TypedTask, -}; +use taskmill::{IoBudget, TaskStore, TaskSubmission}; use tokio::runtime::Runtime; -use tokio_util::sync::CancellationToken; - -struct BenchDomain; -impl DomainKey for BenchDomain { - const NAME: &'static str = "bench"; -} - -#[derive(Serialize, Deserialize)] -struct BenchTask; -impl TypedTask for BenchTask { - type Domain = BenchDomain; - const TASK_TYPE: &'static str = "test"; -} - -struct NoopExecutor; - -impl TypedExecutor for NoopExecutor { - async fn execute<'a>( - &'a self, - _payload: BenchTask, - _ctx: DomainTaskContext<'a, BenchDomain>, - ) -> Result<(), TaskError> { - Ok(()) - } -} - -/// Build a scheduler and run `n` noop tasks to completion, populating history. -async fn build_scheduler_with_history(n: usize) -> Scheduler { - let sched = Scheduler::builder() - .store(TaskStore::open_memory().await.unwrap()) - .domain(Domain::::new().task::(NoopExecutor)) - .max_concurrency(32) - .poll_interval(Duration::from_millis(10)) - .build() - .await - .unwrap(); +/// Task types used to create a realistic multi-type history table. +const TASK_TYPES: &[&str] = &[ + "media::thumbnail", + "media::transcode", + "billing::charge", + "billing::refund", + "email::send", +]; + +/// Populate history directly via store (no scheduler overhead). +/// Distributes tasks across multiple types for realistic selectivity. +async fn store_with_history(n: usize) -> TaskStore { + let store = TaskStore::open_memory().await.unwrap(); + let budget = IoBudget::default(); for i in 0..n { - sched - .submit(&TaskSubmission::new("bench::test").key(format!("h-{i}"))) + let task_type = TASK_TYPES[i % TASK_TYPES.len()]; + store + .submit(&TaskSubmission::new(task_type).key(format!("h-{i}"))) .await .unwrap(); + let task = store.pop_next().await.unwrap().unwrap(); + store.complete(task.id, &budget).await.unwrap(); } - - let mut rx = sched.subscribe(); - let token = CancellationToken::new(); - let sched_clone = sched.clone(); - let token_clone = token.clone(); - tokio::spawn(async move { sched_clone.run(token_clone).await }); - - let mut completed = 0; - while completed < n { - if let Ok(SchedulerEvent::Completed { .. }) = rx.recv().await { - completed += 1; - } - } - - sched + store } // ── Benchmarks ────────────────────────────────────────────────────── @@ -82,8 +45,7 @@ fn bench_history_query(c: &mut Criterion) { group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { - let sched = rt.block_on(build_scheduler_with_history(history_size)); - let store = sched.store().clone(); + let store = rt.block_on(store_with_history(history_size)); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, @@ -107,6 +69,7 @@ fn bench_history_query(c: &mut Criterion) { } /// Aggregate stats query (`COUNT`, `AVG` duration and IO) at varying history sizes. +/// History contains 5 task types; the query filters to one (20% selectivity). fn bench_history_stats(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("history_stats"); @@ -115,8 +78,7 @@ fn bench_history_stats(c: &mut Criterion) { group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { - let sched = rt.block_on(build_scheduler_with_history(history_size)); - let store = sched.store().clone(); + let store = rt.block_on(store_with_history(history_size)); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, @@ -127,7 +89,7 @@ fn bench_history_stats(c: &mut Criterion) { async move { let start = std::time::Instant::now(); for _ in 0..iters { - let _ = store.history_stats("bench::test").await.unwrap(); + let _ = store.history_stats("media::thumbnail").await.unwrap(); } start.elapsed() } @@ -140,6 +102,7 @@ fn bench_history_stats(c: &mut Criterion) { } /// Filter-by-type history query at varying history sizes. +/// History contains 5 task types; the query filters to one (20% selectivity). fn bench_history_by_type(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let mut group = c.benchmark_group("history_by_type"); @@ -148,8 +111,7 @@ fn bench_history_by_type(c: &mut Criterion) { group.measurement_time(Duration::from_secs(30)); for history_size in [100usize, 1000, 5000] { - let sched = rt.block_on(build_scheduler_with_history(history_size)); - let store = sched.store().clone(); + let store = rt.block_on(store_with_history(history_size)); group.bench_with_input( BenchmarkId::from_parameter(history_size), &history_size, @@ -160,7 +122,10 @@ fn bench_history_by_type(c: &mut Criterion) { async move { let start = std::time::Instant::now(); for _ in 0..iters { - let _ = store.history_by_type("bench::test", 100).await.unwrap(); + let _ = store + .history_by_type("media::thumbnail", 100) + .await + .unwrap(); } start.elapsed() } diff --git a/benches/tags.rs b/benches/tags.rs index e1ee8a3..60c82d4 100644 --- a/benches/tags.rs +++ b/benches/tags.rs @@ -96,11 +96,11 @@ fn bench_submit_with_tags(c: &mut Criterion) { group.finish(); } -/// `tasks_by_tags` with a single filter at varying queue depths. +/// `task_ids_by_tags` with a single filter at varying queue depths. /// All tasks match the filter, so result size equals queue depth. -fn bench_query_by_tags(c: &mut Criterion) { +fn bench_query_ids_by_tags(c: &mut Criterion) { let rt = Runtime::new().unwrap(); - let mut group = c.benchmark_group("query_by_tags"); + let mut group = c.benchmark_group("query_ids_by_tags"); group.throughput(Throughput::Elements(1)); group.sample_size(10); group.warm_up_time(Duration::from_secs(1)); @@ -119,7 +119,7 @@ fn bench_query_by_tags(c: &mut Criterion) { let start = Instant::now(); for _ in 0..iters { let _ = store - .tasks_by_tags(&[("bucket", "b0")], None) + .task_ids_by_tags(&[("bucket", "b0")], None) .await .unwrap(); } @@ -214,7 +214,7 @@ fn bench_tag_values_scan(c: &mut Criterion) { criterion_group!( benches, bench_submit_with_tags, - bench_query_by_tags, + bench_query_ids_by_tags, bench_count_by_tags, bench_tag_values_scan, ); diff --git a/docs/configuration.md b/docs/configuration.md index b2015c8..f16ad67 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -322,7 +322,7 @@ scheduler.register_state(Arc::new(LibraryState { /* ... */ })).await; ```toml # Disable platform monitoring -taskmill = { version = "0.4", default-features = false } +taskmill = { version = "0.6", default-features = false } ``` When disabled, you can still provide a custom `ResourceSampler` via `.resource_sampler()`. @@ -446,7 +446,7 @@ Scheduler::builder() | `Domain::::new()` | Create a domain. The domain name is taken from `D::NAME`. Task types are prefixed `"{name}::"`. | | `task::(executor)` | Register a `TypedExecutor` using `T::TASK_TYPE` as the name. TTL and retry policy from `T::config()` are used automatically. | | `task_with::(executor, options)` | Register with per-type option overrides (for executor-instance-dependent config). | -| `raw_executor(name, executor)` | Escape hatch: register an untyped `TaskExecutor` by type name. | +| ~~`raw_executor(name, executor)`~~ | **Removed in 0.6.** Use `task::(executor)` with a `TypedExecutor`. | | `default_priority(p)` | Domain-wide priority for all submissions. | | `default_retry(policy)` | Domain-wide retry policy. | | `default_group(group)` | Domain-wide group key. | diff --git a/docs/design.md b/docs/design.md index 9218811..c6249ff 100644 --- a/docs/design.md +++ b/docs/design.md @@ -64,8 +64,8 @@ flowchart TD S["submit() / submit_batch()"] --> TS["TaskStore\n(INSERT OR IGNORE)"] TS --> |SQLite| DB[(tasks table)] DB --> SCH["Scheduler run loop"] - SCH --> |"tokio::spawn"| E1["Executor + TaskContext"] - SCH --> |"tokio::spawn"| E2["Executor + TaskContext"] + SCH --> |"tokio::spawn"| E1["Executor + DomainTaskContext"] + SCH --> |"tokio::spawn"| E2["Executor + DomainTaskContext"] E1 --> CF["complete() / fail()"] E2 --> CF CF --> HIST[(task_history)] @@ -193,8 +193,12 @@ Each cycle, the loop: Executor returns Err(TaskError) └─ retryable: false? ──► move to task_history (failed) └─ retryable: true? - └─ retry_count < max_retries? ──► status → pending, retry_count += 1 + └─ retry_count < max_retries? + └─ delay == 0? ──► inline retry (stays running, retry_count += 1, re-execute) + └─ delay > 0? ──► status → pending, retry_count += 1, requeued with backoff └─ otherwise ──► move to task_history (failed) ``` Retried tasks keep their original priority and dedup key. `max_retries` defaults to 3. + +**Inline zero-delay retries:** When the retry delay is zero, the executor re-runs immediately within the same spawned task — avoiding the overhead of returning to the SQLite queue and being re-dispatched. The `retry_count` is persisted to the database, but the task stays in `running` status throughout. diff --git a/docs/glossary.md b/docs/glossary.md index 64dff50..56f797b 100644 --- a/docs/glossary.md +++ b/docs/glossary.md @@ -16,7 +16,7 @@ Quick reference for terms used throughout the taskmill documentation. | **Deduplication (dedup)** | Preventing the same task from being queued twice. Taskmill generates a SHA-256 key from the task type and payload; a second submission with the same key is silently ignored. See [Persistence & Recovery](persistence-and-recovery.md#deduplication). | | **Dispatch** | Moving a task from "waiting in line" (pending) to "actively running." The scheduler dispatches tasks in priority order, subject to concurrency limits and backpressure. | | **EWMA** | Exponentially Weighted Moving Average — a smoothing technique that gives recent measurements more weight than old ones. Taskmill uses EWMA to smooth resource readings so the scheduler doesn't overreact to momentary spikes. See [IO & Backpressure](io-and-backpressure.md#ewma-smoothing). | -| **TypedExecutor** | Your code that performs the actual work for a task type. Implements `TypedExecutor` and receives a deserialized payload: `async fn execute(&self, payload: T, ctx: &TaskContext)`. Register with `Domain::task::(executor)`. See [Quick Start](quick-start.md#implement-an-executor). | +| **TypedExecutor** | Your code that performs the actual work for a task type. Implements `TypedExecutor` and receives a deserialized payload: `async fn execute(&self, payload: T, ctx: DomainTaskContext<'_, T::Domain>)`. Register with `Domain::task::(executor)`. See [Quick Start](quick-start.md#implement-an-executor). | | **IO budget** | An estimate of how many bytes a task will read and write (disk and/or network), submitted alongside the task. The scheduler uses IO budgets to avoid overwhelming the disk. See [IO & Backpressure](io-and-backpressure.md#io-budgets-telling-the-scheduler-what-to-expect). | | **Pile-up prevention** | The mechanism that skips a recurring task instance when the previous instance hasn't been dispatched yet, preventing unbounded queue growth under sustained load. See [Quick Start](quick-start.md#recurring-tasks). | | **Preemption** | Pausing lower-priority work so higher-priority work can run immediately. Preempted tasks resume automatically once the urgent work finishes. See [Priorities & Preemption](priorities-and-preemption.md#preemption). | @@ -31,7 +31,7 @@ Quick reference for terms used throughout the taskmill documentation. | **Typed task** | A struct that implements the `TypedTask` trait, giving you compile-time type safety for task payloads and configuration. Declares `type Domain: DomainKey` for compile-time domain identity and `fn config() -> TaskTypeConfig` for static defaults (priority, IO budget, TTL, retry policy, etc.). Register with `Domain::task::(executor)` and submit with `handle.submit(task)`. See [Quick Start](quick-start.md#typed-tasks). | | **TaskTypeConfig** | A struct returned by `TypedTask::config()` that holds static per-task-type defaults — priority, IO budget, group key, TTL, retry policy, duplicate strategy, etc. All fields are `Option`; `None` means "use the next layer's default" (domain default, then scheduler global, then built-in). | | **DomainSubmitBuilder** | The fluent builder returned by `DomainHandle::submit_with(task)`. Implements `IntoFuture` so bare `.await` applies all defaults; chain override methods (`.priority()`, `.run_after()`, `.parent()`, `.fail_fast()`, etc.) before `.await` to override individual fields for that call only. | -| **Domain-scoped state** | Application state registered on a `Domain` via `.state(value)`, visible only to executors within that domain. `TaskContext::state::()` checks domain state first and falls back to global state registered on `SchedulerBuilder`. See [Configuration](configuration.md#application-state). | +| **Domain-scoped state** | Application state registered on a `Domain` via `.state(value)`, visible only to executors within that domain. `DomainTaskContext::state::()` checks domain state first and falls back to global state registered on `SchedulerBuilder`. See [Configuration](configuration.md#application-state). | | **TypedEventStream** | A per-task-type event subscription (`TypedEventStream`) created via `handle.task_events::()`. Filters the global scheduler event broadcast to only events matching `T::TASK_TYPE` within the domain. Terminal events include the `TaskHistoryRecord`. | | **Qualified task type** | The full database-stored task type including the domain prefix, e.g. `"media::thumbnail"`. Required when using store-level query APIs (`history_stats`, `task_lookup`, `avg_throughput`). `DomainHandle` methods apply the prefix automatically, so you typically only need the short form when submitting tasks. | | **Cross-domain dependency** | A dependency edge where the dependent task and its prerequisite belong to different domains. Functionally identical to same-domain dependencies — the domain boundary does not affect dependency resolution or failure propagation. See [Multi-Module Applications](multi-module-apps.md#cross-module-task-dependencies). | diff --git a/docs/guides/background-service.md b/docs/guides/background-service.md index cbad013..1bc6db3 100644 --- a/docs/guides/background-service.md +++ b/docs/guides/background-service.md @@ -47,12 +47,12 @@ impl TypedTask for ProcessImageTask { ## Implement the executor ```rust -use taskmill::{TypedExecutor, TaskContext, TaskError}; +use taskmill::{TypedExecutor, DomainTaskContext, TaskError}; struct ImageProcessor; impl TypedExecutor for ImageProcessor { - async fn execute(&self, task: ProcessImageTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: ProcessImageTask, ctx: DomainTaskContext<'_, Images>) -> Result<(), TaskError> { // Read the source image let data = tokio::fs::read(&task.path).await .map_err(|e| TaskError::permanent(format!("can't read: {e}")))?; @@ -187,7 +187,7 @@ Scheduler::builder() Disable the default sampler in `Cargo.toml`: ```toml -taskmill = { version = "0.4", default-features = false } +taskmill = { version = "0.6", default-features = false } ``` ## Key differences from desktop diff --git a/docs/guides/tauri-upload-queue.md b/docs/guides/tauri-upload-queue.md index 78c7c41..3463c7d 100644 --- a/docs/guides/tauri-upload-queue.md +++ b/docs/guides/tauri-upload-queue.md @@ -17,7 +17,7 @@ Add taskmill to your Tauri app's `Cargo.toml`: ```toml [dependencies] -taskmill = "0.4" +taskmill = "0.6" serde = { version = "1", features = ["derive"] } serde_json = "1" tokio-util = "0.7" @@ -74,12 +74,12 @@ impl TypedTask for UploadTask { The executor does the actual upload work. It reports progress and checks for preemption between chunks. ```rust -use taskmill::{TypedExecutor, TaskContext, TaskError}; +use taskmill::{TypedExecutor, DomainTaskContext, TaskError}; struct UploadExecutor; impl TypedExecutor for UploadExecutor { - async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, Uploads>) -> Result<(), TaskError> { let file = tokio::fs::read(&task.file_path).await .map_err(|e| TaskError::permanent(format!("can't read file: {e}")))?; diff --git a/docs/io-and-backpressure.md b/docs/io-and-backpressure.md index 553e832..0c9d9c5 100644 --- a/docs/io-and-backpressure.md +++ b/docs/io-and-backpressure.md @@ -142,7 +142,7 @@ let scheduler = Scheduler::builder() Disable the built-in sampler in `Cargo.toml`: ```toml -taskmill = { version = "0.3", default-features = false } +taskmill = { version = "0.6", default-features = false } ``` ## Backpressure: external pressure signals diff --git a/docs/library-modules.md b/docs/library-modules.md index 9f46fb3..f427be1 100644 --- a/docs/library-modules.md +++ b/docs/library-modules.md @@ -42,7 +42,7 @@ use std::time::Duration; use serde::{Serialize, Deserialize}; use taskmill::{ Domain, DomainKey, TypedTask, TypedExecutor, TaskTypeConfig, - TaskContext, TaskError, IoBudget, Priority, RetryPolicy, + DomainTaskContext, TaskError, IoBudget, Priority, RetryPolicy, }; // ── Public API ────────────────────────────────────────────────── @@ -154,7 +154,7 @@ Executors can access any type via `ctx.state::()`. It is tempting to grab a d ```rust // BAD: invisible coupling to the host's global state impl TypedExecutor for UploadExecutor { - async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> { let db = ctx.state::().expect("host must register AppDb"); // ... } @@ -180,7 +180,7 @@ Inside the executor: ```rust impl TypedExecutor for UploadExecutor { - async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> { let config = ctx.state::() .expect("AcmeCdnConfig is registered by acme_cdn_domain()"); // safe — this domain always registers its own config @@ -276,9 +276,9 @@ If your library provides optional integration with another domain, use `ctx.try_ use analytics::{Analytics, TrackEvent}; impl TypedExecutor for UploadExecutor { - async fn execute(&self, task: UploadTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: UploadTask, ctx: DomainTaskContext<'_, AcmeCdn>) -> Result<(), TaskError> { // Core upload logic... - do_upload(ctx).await?; + do_upload(&ctx).await?; // Optional: notify analytics domain if present if let Some(analytics) = ctx.try_domain::() { diff --git a/docs/multi-module-apps.md b/docs/multi-module-apps.md index 7f0e78a..84654c0 100644 --- a/docs/multi-module-apps.md +++ b/docs/multi-module-apps.md @@ -76,7 +76,7 @@ let scheduler = Scheduler::builder() State registered on `SchedulerBuilder::app_state()` is **global** — visible to executors in every domain. State registered on `Domain::state()` is **domain-scoped** — visible only to executors in that domain. -`TaskContext::state::()` checks domain-scoped state first, then falls back to global state. +`DomainTaskContext::state::()` checks domain-scoped state first, then falls back to global state. | What | Where to register | Why | |------|-------------------|-----| @@ -158,7 +158,7 @@ Executors can submit to other domains via `ctx.domain::()`: ```rust impl TypedExecutor for FetchExecutor { - async fn execute(&self, task: FetchTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: FetchTask, ctx: DomainTaskContext<'_, Ingest>) -> Result<(), TaskError> { let data = fetch_remote(&task).await?; // Submit a follow-up in a different domain. @@ -285,8 +285,8 @@ let keys = billing.tag_keys_by_prefix("billing.").await?; // Count how many billing tasks are active let count = billing.count_by_tag_key_prefix("billing.", None).await?; -// Fetch those tasks (optionally filter by status) -let tasks = billing.tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)).await?; +// Fetch IDs of matching tasks (optionally filter by status) +let task_ids = billing.task_ids_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)).await?; // Cancel all tasks in the billing namespace let cancelled = billing.cancel_by_tag_key_prefix("billing.").await?; diff --git a/docs/persistence-and-recovery.md b/docs/persistence-and-recovery.md index 1ab249c..bd2bb95 100644 --- a/docs/persistence-and-recovery.md +++ b/docs/persistence-and-recovery.md @@ -212,12 +212,14 @@ All columns from `tasks`, plus: | `actual_net_tx_bytes` | INTEGER | Reported by executor | | `completed_at` | TEXT | ISO 8601 timestamp | | `duration_ms` | INTEGER | Wall-clock duration | -| `status` | TEXT | `completed`, `failed`, `cancelled`, `superseded`, or `expired` | +| `status` | TEXT | `completed`, `failed`, `cancelled`, `superseded`, `expired`, or `dead_letter` | | `ttl_seconds` | INTEGER | TTL duration in seconds (NULL = no TTL) | | `ttl_from` | TEXT DEFAULT 'submission' | When TTL clock started | | `expires_at` | TEXT | ISO 8601 deadline (NULL = no expiry) | -**Index:** `idx_history_type_completed(task_type, completed_at DESC) WHERE status = 'completed'` — for per-type history queries and throughput calculations. +**Indexes:** +- `idx_history_type_completed(task_type, completed_at DESC) WHERE status = 'completed'` — for per-type throughput calculations. +- `idx_history_type(task_type, completed_at DESC)` — covering index for `history_stats`, `history_by_type`, and `avg_throughput` aggregate queries (all statuses). ### WAL mode diff --git a/docs/priorities-and-preemption.md b/docs/priorities-and-preemption.md index 57eb925..8634ed6 100644 --- a/docs/priorities-and-preemption.md +++ b/docs/priorities-and-preemption.md @@ -63,7 +63,7 @@ Executors should check for cancellation at natural yield points — between chun ```rust impl TypedExecutor for MyExecutor { - async fn execute(&self, task: MyTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: MyTask, ctx: DomainTaskContext<'_, MyTask::Domain>) -> Result<(), TaskError> { for chunk in chunks { // Check before each unit of work if ctx.token().is_cancelled() { diff --git a/docs/progress-and-events.md b/docs/progress-and-events.md index 60d26c0..fd35ea6 100644 --- a/docs/progress-and-events.md +++ b/docs/progress-and-events.md @@ -8,7 +8,7 @@ Executors report progress via `ctx.progress()`. This emits events that your UI c ```rust impl TypedExecutor for MyExecutor { - async fn execute(&self, task: MyTask, ctx: &TaskContext) -> Result<(), TaskError> { + async fn execute(&self, task: MyTask, ctx: DomainTaskContext<'_, MyTask::Domain>) -> Result<(), TaskError> { let items = get_work_items(&task); for (i, item) in items.iter().enumerate() { diff --git a/docs/query-apis.md b/docs/query-apis.md index 2950326..a986c80 100644 --- a/docs/query-apis.md +++ b/docs/query-apis.md @@ -40,11 +40,11 @@ These methods are available on `DomainHandle` and are automatically scoped to | `handle.estimated_progress()` | `Vec` | Extrapolated progress for each running task in this domain. | | `handle.byte_progress()` | `Vec` | Live byte-level progress for running tasks in this domain. | | `handle.dead_letter_tasks(limit, offset)` | `Vec` | Paginated dead-letter (permanently failed) tasks for this domain. | -| `handle.tasks_by_tags(filters, status)` | `Vec` | Active tasks in this domain matching the given tag filters and optional status. | +| `handle.task_ids_by_tags(filters, status)` | `Vec` | IDs of active tasks in this domain matching the given tag filters and optional status. Fetch full records via `task_by_id()` if needed. | | `handle.count_by_tag(key, status)` | `Vec<(String, i64)>` | Tag value counts for a given key within this domain. | | `handle.tag_values(key)` | `Vec<(String, i64)>` | Distinct values for a tag key within this domain. | | `handle.tag_keys_by_prefix(prefix)` | `Vec` | Discover distinct tag keys matching a prefix (e.g. `"billing."`) within this domain. | -| `handle.tasks_by_tag_key_prefix(prefix, status)` | `Vec` | Find tasks with any tag key matching the prefix, with optional status filter. | +| `handle.task_ids_by_tag_key_prefix(prefix, status)` | `Vec` | IDs of tasks with any tag key matching the prefix, with optional status filter. Fetch full records via `task_by_id()` if needed. | | `handle.count_by_tag_key_prefix(prefix, status)` | `i64` | Count tasks with any tag key matching the prefix, with optional status filter. | ## Cross-domain operations (Scheduler) diff --git a/docs/quick-start.md b/docs/quick-start.md index 8e2b679..1e3cd6c 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -6,14 +6,14 @@ Add taskmill to your `Cargo.toml`: ```toml [dependencies] -taskmill = "0.5" +taskmill = "0.6" ``` To disable platform resource monitoring (e.g., for mobile targets or custom samplers): ```toml [dependencies] -taskmill = { version = "0.5", default-features = false } +taskmill = { version = "0.6", default-features = false } ``` ## Core concepts @@ -43,7 +43,7 @@ scheduler.domain::() <- get a typed handle at runtime Every task type needs code that knows how to do the work. Implement the `TypedExecutor` trait. The scheduler deserializes the payload for you and passes it directly to your executor. -Your executor receives the deserialized payload `T` and a `TaskContext` with everything it needs: +Your executor receives the deserialized payload `T` and a `DomainTaskContext` with everything it needs: - `record()` — the full task record including priority and retry count - `token()` — a cancellation token for responding to preemption (see [Priorities & Preemption](priorities-and-preemption.md#handling-preemption-in-executors)) @@ -51,7 +51,7 @@ Your executor receives the deserialized payload `T` and a `TaskContext` with eve - `state::()` — shared application state registered at build time ```rust -use taskmill::{TypedExecutor, TypedTask, TaskContext, TaskError, TaskTypeConfig, IoBudget, DomainKey}; +use taskmill::{TypedExecutor, TypedTask, DomainTaskContext, TaskError, TaskTypeConfig, IoBudget, DomainKey}; use serde::{Serialize, Deserialize}; // Define the domain identity. @@ -84,7 +84,7 @@ impl TypedExecutor for ImageResizer { async fn execute( &self, task: ResizeTask, - ctx: &TaskContext, + ctx: DomainTaskContext<'_, Media>, ) -> Result<(), TaskError> { // Check for preemption at yield points if ctx.token().is_cancelled() { @@ -258,26 +258,25 @@ media.submit_with(task) Some work is naturally hierarchical — a multipart upload needs to upload individual parts, then call `CompleteMultipartUpload`. Taskmill supports this with child tasks and two-phase execution. -Spawn children from within an executor using `ctx.spawn_child()`. Children are automatically domain-aware: the task type is prefixed and domain defaults are applied. The parent enters a `waiting` state until all children complete, then `finalize()` is called on the parent executor. +Spawn children from within an executor using `ctx.spawn_child_with()`. Children are automatically domain-aware: the task type is prefixed and domain defaults are applied. The parent enters a `waiting` state until all children complete, then `finalize()` is called on the parent executor. ```rust impl TypedExecutor for MultipartUploader { - async fn execute( - &self, upload: MultipartUpload, ctx: &TaskContext, + async fn execute<'a>( + &'a self, upload: MultipartUpload, ctx: DomainTaskContext<'a, Media>, ) -> Result<(), TaskError> { let parts = split_into_parts(&upload); for part in parts { - ctx.spawn_child( - TaskSubmission::new("upload-part") - .payload_json(&part) - .expected_io(IoBudget::net(0, part.size)) - ).await?; + ctx.spawn_child_with(UploadPart { + etag: part.etag.clone(), + size: part.size, + }).await?; } Ok(()) // parent enters 'waiting' state } - async fn finalize( - &self, upload: MultipartUpload, _memo: (), ctx: &TaskContext, + async fn finalize<'a>( + &'a self, upload: MultipartUpload, _memo: (), ctx: DomainTaskContext<'a, Media>, ) -> Result<(), TaskError> { // Called after all children complete complete_multipart_upload(&upload).await @@ -285,12 +284,12 @@ impl TypedExecutor for MultipartUploader { } ``` -For cross-domain children, use `ctx.domain::()` to get a handle and `.parent()` on the submit builder: +For cross-domain children, use `ctx.domain::()` to get a handle and `.child_of(&ctx)` on the submit builder: ```rust ctx.domain::() .submit_with(Upload { /* ... */ }) - .parent(ctx.record().id) + .child_of(&ctx) .await?; ``` diff --git a/migrations/002_task_history.sql b/migrations/002_task_history.sql index 27eada3..63cd2a9 100644 --- a/migrations/002_task_history.sql +++ b/migrations/002_task_history.sql @@ -32,6 +32,12 @@ CREATE TABLE IF NOT EXISTS task_history ( expires_at INTEGER ); +-- Aggregate stats and filtered queries by task type. +-- Covers history_stats (aggregation), history_by_type (ordered pagination), +-- and avg_throughput (filtered subquery). +CREATE INDEX IF NOT EXISTS idx_history_type + ON task_history (task_type, completed_at DESC); + -- IO learning: recent completions by task type. CREATE INDEX IF NOT EXISTS idx_history_type_completed ON task_history (task_type, completed_at DESC) diff --git a/src/domain.rs b/src/domain.rs index 7b351fa..642aea1 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -640,13 +640,13 @@ impl DomainHandle { self.inner.cancel_where(predicate).await } - /// Find domain tasks matching all specified tag filters (AND semantics). - pub async fn tasks_by_tags( + /// Return IDs of domain tasks matching all specified tag filters (AND semantics). + pub async fn task_ids_by_tags( &self, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { - self.inner.tasks_by_tags(filters, status).await + ) -> Result, StoreError> { + self.inner.task_ids_by_tags(filters, status).await } /// Discover tag keys matching a prefix within this domain. @@ -654,13 +654,13 @@ impl DomainHandle { self.inner.tag_keys_by_prefix(prefix).await } - /// Find domain tasks with any tag key matching the given prefix. - pub async fn tasks_by_tag_key_prefix( + /// Return IDs of domain tasks with any tag key matching the given prefix. + pub async fn task_ids_by_tag_key_prefix( &self, prefix: &str, status: Option, - ) -> Result, StoreError> { - self.inner.tasks_by_tag_key_prefix(prefix, status).await + ) -> Result, StoreError> { + self.inner.task_ids_by_tag_key_prefix(prefix, status).await } /// Count domain tasks with any tag key matching the given prefix. diff --git a/src/lib.rs b/src/lib.rs index 45756cd..49cd729 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,7 +48,7 @@ //! 2. **Pending** — the task waits in a priority queue. The scheduler's run loop //! pops the highest-priority pending task on each tick. //! 3. **Running** — the scheduler calls the [`TypedExecutor::execute`] method with the -//! deserialized payload and a [`TaskContext`] containing the task record, +//! deserialized payload and a [`DomainTaskContext`] containing the task record, //! a cancellation token, and a progress reporter. //! 4. **Terminal** — on success the task moves to the history table. On failure, //! a [`retryable`](TaskError::retryable) error requeues it (up to @@ -99,9 +99,9 @@ //! [resource monitoring](SchedulerBuilder::with_resource_monitoring) is enabled, //! compares them against observed system throughput to avoid over-saturating //! the disk or network. Executors report actual IO via -//! [`TaskContext::record_read_bytes`] / [`record_write_bytes`](TaskContext::record_write_bytes) / -//! [`record_net_rx_bytes`](TaskContext::record_net_rx_bytes) / -//! [`record_net_tx_bytes`](TaskContext::record_net_tx_bytes), +//! [`DomainTaskContext::record_read_bytes`] / [`record_write_bytes`](DomainTaskContext::record_write_bytes) / +//! [`record_net_rx_bytes`](DomainTaskContext::record_net_rx_bytes) / +//! [`record_net_tx_bytes`](DomainTaskContext::record_net_tx_bytes), //! which feeds back into historical throughput averages for future scheduling //! decisions. //! @@ -121,7 +121,7 @@ //! //! ## Child tasks & two-phase execution //! -//! An executor can spawn child tasks via [`TaskContext::spawn_child`]. When +//! An executor can spawn child tasks via [`DomainTaskContext::spawn_child_with`]. When //! children exist, the parent enters a **waiting** state after its executor //! returns. Once all children complete, the parent's //! [`TypedExecutor::finalize`] method is called — useful for assembly work @@ -172,10 +172,10 @@ //! Tags are copied to history on all terminal transitions and are included in //! [`TaskEventHeader`] for event subscribers. //! -//! Query by exact tags via [`DomainHandle::tasks_by_tags`] (AND semantics), +//! Query by exact tags via [`DomainHandle::task_ids_by_tags`] (AND semantics), //! or discover and query by tag key prefix via //! [`DomainHandle::tag_keys_by_prefix`], -//! [`DomainHandle::tasks_by_tag_key_prefix`], +//! [`DomainHandle::task_ids_by_tag_key_prefix`], //! [`DomainHandle::count_by_tag_key_prefix`], and //! [`DomainHandle::cancel_by_tag_key_prefix`]. Prefix queries are useful when //! multiple libraries share a scheduler and namespace their tags @@ -287,15 +287,15 @@ //! [`cancel_hook_timeout`](SchedulerBuilder::cancel_hook_timeout)) so //! executors can clean up external resources — for example, aborting an S3 //! multipart upload. Executors can check for cancellation cooperatively via -//! [`TaskContext::check_cancelled`]. +//! [`DomainTaskContext::check_cancelled`]. //! //! Cancelling a parent task cascade-cancels all its children. //! //! ## Byte-level progress //! //! For long-running transfers (file copies, uploads, downloads), executors can -//! report byte-level progress via [`TaskContext::set_bytes_total`] and -//! [`TaskContext::add_bytes`]. The scheduler maintains per-task atomic counters +//! report byte-level progress via [`DomainTaskContext::set_bytes_total`] and +//! [`DomainTaskContext::add_bytes`]. The scheduler maintains per-task atomic counters //! on the `IoTracker` — updates are lock-free and //! impose no overhead on the executor hot path. //! @@ -313,7 +313,7 @@ //! ```ignore //! use taskmill::{ //! Domain, DomainKey, DomainHandle, Scheduler, TypedExecutor, -//! TaskContext, TaskError, TypedTask, TaskTypeConfig, IoBudget, Priority, +//! DomainTaskContext, TaskError, TypedTask, TaskTypeConfig, IoBudget, Priority, //! }; //! use serde::{Serialize, Deserialize}; //! use tokio_util::sync::CancellationToken; @@ -375,7 +375,7 @@ //! ## Shared application state //! //! Register shared services (database pools, HTTP clients, etc.) at build time -//! and retrieve them from any executor via [`TaskContext::state`]. State can be +//! and retrieve them from any executor via [`DomainTaskContext::state`]. State can be //! domain-scoped (checked first) or global (fallback): //! //! ```ignore diff --git a/src/module.rs b/src/module.rs index f8f9f54..c19a3cf 100644 --- a/src/module.rs +++ b/src/module.rs @@ -711,16 +711,16 @@ impl ModuleHandle { .await } - /// Find module tasks matching all specified tag filters (AND semantics). - pub async fn tasks_by_tags( + /// Return IDs of module tasks matching all specified tag filters (AND semantics). + pub async fn task_ids_by_tags( &self, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { self.scheduler .inner .store - .tasks_by_tags_with_prefix(&self.prefix, filters, status) + .task_ids_by_tags_with_prefix(&self.prefix, filters, status) .await } @@ -733,16 +733,16 @@ impl ModuleHandle { .await } - /// Find module tasks with any tag key matching the given prefix. - pub async fn tasks_by_tag_key_prefix( + /// Return IDs of module tasks with any tag key matching the given prefix. + pub async fn task_ids_by_tag_key_prefix( &self, prefix: &str, status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { self.scheduler .inner .store - .tasks_by_tag_key_prefix_with_prefix(&self.prefix, prefix, status) + .task_ids_by_tag_key_prefix_with_prefix(&self.prefix, prefix, status) .await } @@ -761,16 +761,16 @@ impl ModuleHandle { /// Cancel module tasks with any tag key matching the given prefix. pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result, StoreError> { - let tasks = self + let ids = self .scheduler .inner .store - .tasks_by_tag_key_prefix_with_prefix(&self.prefix, prefix, None) + .task_ids_by_tag_key_prefix_with_prefix(&self.prefix, prefix, None) .await?; let mut cancelled = Vec::new(); - for task in &tasks { - if self.scheduler.cancel(task.id).await? { - cancelled.push(task.id); + for id in ids { + if self.scheduler.cancel(id).await? { + cancelled.push(id); } } Ok(cancelled) diff --git a/src/scheduler/builder.rs b/src/scheduler/builder.rs index 4a030eb..28dec4c 100644 --- a/src/scheduler/builder.rs +++ b/src/scheduler/builder.rs @@ -491,6 +491,14 @@ impl SchedulerBuilder { .store(true, std::sync::atomic::Ordering::Relaxed); } + // Check for pre-existing paused tasks (persistent store recovery). + if scheduler.inner.store.paused_count().await.unwrap_or(0) > 0 { + scheduler + .inner + .has_paused_tasks + .store(true, std::sync::atomic::Ordering::Relaxed); + } + Ok(scheduler) } } diff --git a/src/scheduler/control.rs b/src/scheduler/control.rs index ad425df..5f9d884 100644 --- a/src/scheduler/control.rs +++ b/src/scheduler/control.rs @@ -2,7 +2,7 @@ use std::sync::atomic::Ordering as AtomicOrdering; -use super::{Scheduler, SchedulerEvent}; +use super::{emit_event, Scheduler, SchedulerEvent}; impl Scheduler { /// Update max concurrency at runtime (e.g., from adaptive controller or @@ -35,7 +35,7 @@ impl Scheduler { .active .pause_all(&self.inner.store, &self.inner.event_tx) .await; - let _ = self.inner.event_tx.send(SchedulerEvent::Paused); + emit_event(&self.inner.event_tx, SchedulerEvent::Paused); tracing::info!(paused_tasks = count, "scheduler paused"); } @@ -47,7 +47,7 @@ impl Scheduler { pub async fn resume_all(&self) { self.inner.paused.store(false, AtomicOrdering::Release); self.inner.work_notify.notify_one(); - let _ = self.inner.event_tx.send(SchedulerEvent::Resumed); + emit_event(&self.inner.event_tx, SchedulerEvent::Resumed); tracing::info!("scheduler resumed"); } diff --git a/src/scheduler/dispatch.rs b/src/scheduler/dispatch.rs index 5f0dcdb..cb6cf90 100644 --- a/src/scheduler/dispatch.rs +++ b/src/scheduler/dispatch.rs @@ -10,7 +10,7 @@ use crate::registry::IoTracker; use crate::store::TaskStore; use crate::task::TaskRecord; -use super::SchedulerEvent; +use super::{emit_event, SchedulerEvent}; // ── Active Task ──────────────────────────────────────────────────── @@ -268,6 +268,9 @@ async fn cancel_pause_emit( for (id, at) in drained { at.token.cancel(); let _ = store.pause(*id).await; - let _ = event_tx.send(SchedulerEvent::Preempted(at.record.event_header())); + emit_event( + event_tx, + SchedulerEvent::Preempted(at.record.event_header()), + ); } } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index e3b31af..52b7703 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -62,12 +62,39 @@ pub(crate) struct CompletionMsg { pub task: TaskRecord, pub metrics: IoBudget, } + +// ── Failure coalescing ─────────────────────────────────────────── + +/// Message sent from spawned tasks for terminal failures (dead-letter / permanent). +/// +/// Batched by `drain_failures` to reduce per-failure transaction overhead, +/// mirroring the completion coalescing pattern. +pub(crate) struct FailureMsg { + pub task: TaskRecord, + pub error: String, + pub retryable: bool, + pub metrics: IoBudget, +} pub use event::{ SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, }; pub use gate::GroupLimits; pub use progress::{EstimatedProgress, ProgressReporter, TaskProgress}; +/// Emit a scheduler event only when at least one subscriber is listening. +/// +/// Avoids the broadcast channel's internal locking and allocation overhead +/// when no subscribers exist (common in benchmarks and headless operation). +#[inline] +pub(crate) fn emit_event( + tx: &tokio::sync::broadcast::Sender, + event: SchedulerEvent, +) { + if tx.receiver_count() > 0 { + let _ = tx.send(event); + } +} + // ── Scheduler ─────────────────────────────────────────────────────── /// Shared inner state behind `Arc` so `Scheduler` can be `Clone`. @@ -144,6 +171,10 @@ pub(crate) struct SchedulerInner { /// (leader election pattern) in addition to the run loop. pub(crate) completion_rx: std::sync::Arc>>, + /// Send side of the failure coalescing channel. + pub(crate) failure_tx: tokio::sync::mpsc::UnboundedSender, + /// Receive side (leader election + run loop drain). + pub(crate) failure_rx: std::sync::Arc>>, } /// IO-aware priority scheduler. @@ -240,6 +271,7 @@ impl Scheduler { let (event_tx, _) = tokio::sync::broadcast::channel(256); let (progress_tx, _) = tokio::sync::broadcast::channel(64); let (completion_tx, completion_rx) = tokio::sync::mpsc::unbounded_channel(); + let (failure_tx, failure_rx) = tokio::sync::mpsc::unbounded_channel(); Self { inner: Arc::new(SchedulerInner { store, @@ -271,12 +303,16 @@ impl Scheduler { module_paused, module_caps: RwLock::new(module_caps), module_running, - // Conservative: true on startup so the first cycle checks. - has_paused_tasks: AtomicBool::new(true), + // Start false — set to true only when preemption pauses a task. + // For persistent stores the builder checks for pre-existing + // paused tasks after construction (see SchedulerBuilder::build). + has_paused_tasks: AtomicBool::new(false), // Default to false; builder sets true when safe. fast_dispatch: AtomicBool::new(false), completion_tx, completion_rx: std::sync::Arc::new(Mutex::new(completion_rx)), + failure_tx, + failure_rx: std::sync::Arc::new(Mutex::new(failure_rx)), }), } } diff --git a/src/scheduler/progress.rs b/src/scheduler/progress.rs index 57fc282..47c9088 100644 --- a/src/scheduler/progress.rs +++ b/src/scheduler/progress.rs @@ -38,7 +38,7 @@ use crate::task::TaskRecord; use super::dispatch::ActiveTaskMap; use super::event::TaskEventHeader; -use super::SchedulerEvent; +use super::{emit_event, SchedulerEvent}; // ── Progress Reporter ────────────────────────────────────────────── @@ -94,11 +94,14 @@ impl ProgressReporter { // Update internal progress tracking directly (sync, no broadcast roundtrip). self.active.update_progress(self.header.task_id, clamped); // Broadcast for external subscribers (UI / Tauri). - let _ = self.event_tx.send(SchedulerEvent::Progress { - header: self.header.clone(), - percent: clamped, - message, - }); + emit_event( + &self.event_tx, + SchedulerEvent::Progress { + header: self.header.clone(), + percent: clamped, + message, + }, + ); } /// Report progress as a fraction (completed / total) with an optional message. diff --git a/src/scheduler/queries.rs b/src/scheduler/queries.rs index 218a8ee..b2ca461 100644 --- a/src/scheduler/queries.rs +++ b/src/scheduler/queries.rs @@ -63,15 +63,15 @@ impl Scheduler { .collect() } - /// Find active tasks matching all specified tag filters (AND semantics). + /// Return the IDs of active tasks matching all specified tag filters (AND semantics). /// - /// Delegates to [`TaskStore::tasks_by_tags`](crate::TaskStore::tasks_by_tags). - pub async fn tasks_by_tags( + /// Delegates to [`TaskStore::task_ids_by_tags`](crate::TaskStore::task_ids_by_tags). + pub async fn task_ids_by_tags( &self, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { - self.inner.store.tasks_by_tags(filters, status).await + ) -> Result, StoreError> { + self.inner.store.task_ids_by_tags(filters, status).await } /// Count active tasks grouped by a tag key's values. @@ -97,15 +97,15 @@ impl Scheduler { self.inner.store.tag_keys_by_prefix(prefix).await } - /// Find active tasks with any tag key matching the given prefix. - pub async fn tasks_by_tag_key_prefix( + /// Return IDs of active tasks with any tag key matching the given prefix. + pub async fn task_ids_by_tag_key_prefix( &self, prefix: &str, status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { self.inner .store - .tasks_by_tag_key_prefix(prefix, status) + .task_ids_by_tag_key_prefix(prefix, status) .await } diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index c1c3f7d..938cc50 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -8,7 +8,7 @@ use tokio_util::sync::CancellationToken; use crate::store::StoreError; use crate::task::IoBudget; -use super::SchedulerEvent; +use super::{emit_event, SchedulerEvent}; use super::gate::GateContext; use super::spawn::{self, SpawnContext}; @@ -32,6 +32,8 @@ impl Scheduler { module_registry: Arc::clone(&self.inner.module_registry), completion_tx: self.inner.completion_tx.clone(), completion_rx: self.inner.completion_rx.clone(), + failure_tx: self.inner.failure_tx.clone(), + failure_rx: self.inner.failure_rx.clone(), } } @@ -51,16 +53,24 @@ impl Scheduler { // instead of peek_next() + gate.admit() + claim_task() (2 SQL). // pop_next() skips expired tasks via its WHERE clause. if self.inner.fast_dispatch.load(AtomicOrdering::Relaxed) { - let Some(task) = self.inner.store.pop_next().await? else { + let Some(mut task) = self.inner.store.pop_next().await? else { return Ok(false); }; + self.inner + .store + .populate_tags(std::slice::from_mut(&mut task)) + .await?; return self.spawn_dispatched_task(task).await; } // Slow path: peek → gate check → claim. - let Some(candidate) = self.inner.store.peek_next().await? else { + let Some(mut candidate) = self.inner.store.peek_next().await? else { return Ok(false); }; + self.inner + .store + .populate_tags(std::slice::from_mut(&mut candidate)) + .await?; // Dispatch-time expiry check: if the candidate has expired, expire // it and retry (return Ok(true) to loop again). @@ -70,10 +80,13 @@ impl Scheduler { let age = (chrono::Utc::now() - task.created_at) .to_std() .unwrap_or_default(); - let _ = self.inner.event_tx.send(SchedulerEvent::TaskExpired { - header: task.event_header(), - age, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::TaskExpired { + header: task.event_header(), + age, + }, + ); } return Ok(true); } @@ -219,10 +232,12 @@ impl Scheduler { break; } let available = max - active_count; - let tasks = self.inner.store.pop_next_batch(available).await?; + let mut tasks = self.inner.store.pop_next_batch(available).await?; if tasks.is_empty() { break; } + // Populate tags in one batch query (lazy — not done at pop time). + self.inner.store.populate_tags(&mut tasks).await?; for task in tasks { self.spawn_dispatched_task(task).await?; } @@ -327,10 +342,13 @@ impl Scheduler { let age = (chrono::Utc::now() - task.created_at) .to_std() .unwrap_or_default(); - let _ = self.inner.event_tx.send(SchedulerEvent::TaskExpired { - header: task.event_header(), - age, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::TaskExpired { + header: task.event_header(), + age, + }, + ); } if !expired.is_empty() { tracing::info!(count = expired.len(), "expired stale tasks"); @@ -369,16 +387,57 @@ impl Scheduler { .await; } + /// Drain the submit coalescing channel and process any stranded messages. + /// + /// Most submits are handled inline by the submitter (leader election in + /// `TaskStore::submit`). This is a safety-net drain for messages that + /// arrived after the last leader finished processing. + async fn drain_submits(&self) { + let mut batch = Vec::new(); + { + let mut rx = self.inner.store.submit_rx.lock().await; + while let Ok(msg) = rx.try_recv() { + batch.push(msg); + } + } + + if batch.is_empty() { + return; + } + + tracing::debug!(count = batch.len(), "draining submit batch"); + self.inner.store.process_submit_batch(batch).await; + } + + /// Drain the failure channel and process all queued terminal failures + /// in a single batched transaction. + async fn drain_failures(&self) { + let mut batch = Vec::new(); + { + let mut rx = self.inner.failure_rx.lock().await; + while let Ok(msg) = rx.try_recv() { + batch.push(msg); + } + } + + if batch.is_empty() { + return; + } + + tracing::debug!(count = batch.len(), "draining failure batch"); + spawn::process_failure_batch(&batch, &self.inner.store, &self.inner.event_tx).await; + } + /// Resume paused tasks, dispatch finalizers, and dispatch pending work. async fn poll_and_dispatch(&self) { if self.is_paused() { return; } - // Drain queued completions before dispatching new work. - // This ensures completed tasks are processed first, freeing - // dependency edges and unblocking downstream tasks. + // Drain queued submits, completions, and failures before dispatching. + self.drain_submits().await; self.drain_completions().await; + self.drain_failures().await; // Run expiry sweep before dispatching. self.maybe_expire_tasks().await; @@ -459,8 +518,10 @@ impl Scheduler { } } - // Drain any remaining completions before closing the store. + // Drain any remaining submits, completions, and failures before closing. + self.drain_submits().await; self.drain_completions().await; + self.drain_failures().await; // Flush WAL and close the database. self.inner.store.close().await; diff --git a/src/scheduler/spawn.rs b/src/scheduler/spawn.rs index 895108e..df681c7 100644 --- a/src/scheduler/spawn.rs +++ b/src/scheduler/spawn.rs @@ -20,10 +20,11 @@ use crate::registry::ErasedExecutor; use crate::task::TaskRecord; use super::dispatch::ActiveTask; -use super::SchedulerEvent; +use super::{emit_event, SchedulerEvent}; pub(in crate::scheduler) use completion::process_completion_batch; pub(crate) use context::SpawnContext; +pub(in crate::scheduler) use failure::process_failure_batch; /// Whether to call `execute` or `finalize` on the executor. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -38,6 +39,9 @@ pub(crate) enum ExecutionPhase { /// and spawns the executor on a new tokio task. The actual success and /// failure handling is delegated to [`completion::handle_success`] and /// [`failure::handle_failure`]. +/// +/// For zero-delay retries, the executor is re-invoked inline (without +/// requeueing to pending) to avoid the pop_next SQL round-trip. pub(crate) async fn spawn_task( task: TaskRecord, executor: Arc, @@ -68,9 +72,10 @@ pub(crate) async fn spawn_task( } // Emit dispatched event. - let _ = ctx - .event_tx - .send(SchedulerEvent::Dispatched(task.event_header())); + emit_event( + &ctx.event_tx, + SchedulerEvent::Dispatched(task.event_header()), + ); // Build deps for handlers (cloned from SpawnContext since they move into the spawned future). let completion_deps = completion::CompletionDeps { @@ -83,66 +88,137 @@ pub(crate) async fn spawn_task( completion_rx: ctx.completion_rx.clone(), }; let failure_deps = failure::FailureDeps { - store: ctx.store, + store: ctx.store.clone(), active: ctx.active.clone(), - event_tx: ctx.event_tx, - work_notify: ctx.work_notify, + event_tx: ctx.event_tx.clone(), + work_notify: ctx.work_notify.clone(), max_retries: ctx.max_retries, - registry: ctx.registry, + registry: ctx.registry.clone(), + failure_tx: ctx.failure_tx.clone(), + failure_rx: ctx.failure_rx.clone(), }; + // Keep SpawnContext alive for inline retry context rebuilds. + let spawn_ctx = ctx; + let task_id_for_handle = task.id; - let active_for_handle = ctx.active; + let active_for_handle = spawn_ctx.active.clone(); let token_for_spawn = prepared.token.clone(); - let module_running = ctx.module_running; - let io = prepared.io; + let module_running = spawn_ctx.module_running.clone(); let handle = tokio::spawn(async move { let task_id = task.id; + let mut task = task; + let mut prepared = prepared; // Helper: decrement the module running counter when this task leaves "running". - let decrement_module = || { - if let Some(name) = task.module_name() { - if let Some(counter) = module_running.get(name) { - counter.fetch_sub(1, AtomicOrdering::Relaxed); + // Capture module name upfront to avoid borrowing `task` in the closure. + let module_name_owned = task.module_name().map(|s| s.to_string()); + let module_running_for_dec = module_running.clone(); + let mut decremented = false; + let mut decrement_module_once = || { + if !decremented { + decremented = true; + if let Some(ref name) = module_name_owned { + if let Some(counter) = module_running_for_dec.get(name.as_str()) { + counter.fetch_sub(1, AtomicOrdering::Relaxed); + } } } }; - let result = match phase { - ExecutionPhase::Execute => executor.execute_erased(&prepared.ctx).await, - ExecutionPhase::Finalize => { - executor.finalize_erased(&prepared.ctx).await.map(|()| None) - } // finalize doesn't produce a memo - }; - - // Read IO bytes from the context tracker. - let metrics = io.snapshot(); - - // Drop the context (and its progress reporter) — executor is done. - drop(prepared.ctx); - - match result { - Ok(memo) => { - completion::handle_success( - &task, - phase, - &metrics, - memo, - &completion_deps, - decrement_module, - ) - .await; - } - Err(te) => { - // If cancelled (preempted), the scheduler already paused it. - if token_for_spawn.is_cancelled() { - decrement_module(); - failure_deps.active.remove(task_id); - return; + loop { + let result = match phase { + ExecutionPhase::Execute => executor.execute_erased(&prepared.ctx).await, + ExecutionPhase::Finalize => { + executor.finalize_erased(&prepared.ctx).await.map(|()| None) + } + }; + + // Read IO bytes from the context tracker. + let metrics = prepared.io.snapshot(); + + // Drop the context (and its progress reporter) — executor is done. + drop(prepared.ctx); + + match result { + Ok(memo) => { + completion::handle_success( + &task, + phase, + &metrics, + memo, + &completion_deps, + &mut decrement_module_once, + ) + .await; + break; + } + Err(te) => { + // If cancelled (preempted), the scheduler already paused it. + if token_for_spawn.is_cancelled() { + decrement_module_once(); + failure_deps.active.remove(task_id); + break; + } + + // Check for inline immediate retry: retryable, under max, + // zero delay, execute phase only. + if phase == ExecutionPhase::Execute && te.retryable { + let policy = failure_deps.registry.type_retry_policy(&task.task_type); + let effective_max = task.max_retries.unwrap_or( + policy + .map(|p| p.max_retries) + .unwrap_or(failure_deps.max_retries), + ); + let will_retry = task.retry_count < effective_max; + + if will_retry { + let delay = if let Some(ms) = te.retry_after_ms { + std::time::Duration::from_millis(ms) + } else if let Some(strategy) = policy.map(|p| &p.strategy) { + strategy.delay_for(task.retry_count) + } else { + std::time::Duration::ZERO + }; + + if delay.is_zero() { + // Inline retry: persist retry_count, re-execute. + let _ = failure_deps + .store + .increment_retry(task.id, &te.message) + .await; + task.retry_count += 1; + + // Emit retry event. + emit_event( + &failure_deps.event_tx, + SchedulerEvent::Failed { + header: task.event_header(), + error: te.message, + will_retry: true, + retry_after: None, + }, + ); + + // Rebuild context for next attempt. + prepared = context::build_task_context(&task, &spawn_ctx); + continue; + } + } + } + + // Not inline-retryable — use normal failure path. + failure::handle_failure( + &task, + te, + &metrics, + &failure_deps, + &mut decrement_module_once, + ) + .await; + break; } - - failure::handle_failure(&task, te, &metrics, &failure_deps, decrement_module).await; } } }); diff --git a/src/scheduler/spawn/completion.rs b/src/scheduler/spawn/completion.rs index a3d1f8c..097e6f7 100644 --- a/src/scheduler/spawn/completion.rs +++ b/src/scheduler/spawn/completion.rs @@ -6,7 +6,7 @@ use crate::store::TaskStore; use crate::task::{IoBudget, TaskRecord}; use super::super::dispatch::ActiveTaskMap; -use super::super::{CompletionMsg, SchedulerEvent}; +use super::super::{emit_event, CompletionMsg, SchedulerEvent}; use super::parent::handle_parent_resolution; use super::ExecutionPhase; @@ -32,7 +32,7 @@ pub(crate) async fn handle_success( metrics: &IoBudget, memo: Option>, deps: &CompletionDeps, - decrement_module: impl FnOnce(), + mut decrement_module: impl FnMut(), ) { let task_id = task.id; @@ -52,10 +52,13 @@ pub(crate) async fn handle_success( } decrement_module(); deps.active.remove(task_id); - let _ = deps.event_tx.send(SchedulerEvent::Waiting { - task_id, - children_count: count, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::Waiting { + task_id, + children_count: count, + }, + ); // Children may have completed before we set waiting. // Re-check to avoid a missed finalization. handle_parent_resolution( @@ -104,6 +107,10 @@ pub(crate) async fn handle_success( return; } + // Yield before draining to let more completions accumulate in the + // channel, increasing batch size under high throughput. + tokio::task::yield_now().await; + // Leader election: try to grab the rx lock and drain the batch. // Under high concurrency, only one task wins — it processes the batch // for everyone. Others just wake the scheduler and return. @@ -182,16 +189,19 @@ pub(in crate::scheduler) fn emit_completion_events( Some((next, count)) => (Some(next), count), None => (None, task.recurring_execution_count + 1), }; - let _ = event_tx.send(SchedulerEvent::RecurringCompleted { - header: task.event_header(), - execution_count: exec_count, - next_run, - }); + emit_event( + event_tx, + SchedulerEvent::RecurringCompleted { + header: task.event_header(), + execution_count: exec_count, + next_run, + }, + ); } - let _ = event_tx.send(SchedulerEvent::Completed(task.event_header())); + emit_event(event_tx, SchedulerEvent::Completed(task.event_header())); for uid in unblocked { - let _ = event_tx.send(SchedulerEvent::TaskUnblocked { task_id: *uid }); + emit_event(event_tx, SchedulerEvent::TaskUnblocked { task_id: *uid }); } } diff --git a/src/scheduler/spawn/context.rs b/src/scheduler/spawn/context.rs index 5d20da3..beb5bd3 100644 --- a/src/scheduler/spawn/context.rs +++ b/src/scheduler/spawn/context.rs @@ -39,6 +39,12 @@ pub(crate) struct SpawnContext { pub completion_rx: std::sync::Arc< tokio::sync::Mutex>, >, + /// Failure coalescing channel sender. + pub failure_tx: tokio::sync::mpsc::UnboundedSender, + /// Failure coalescing channel receiver (Arc-wrapped for leader election). + pub failure_rx: std::sync::Arc< + tokio::sync::Mutex>, + >, } /// Output of task context construction — everything needed to insert into the diff --git a/src/scheduler/spawn/failure.rs b/src/scheduler/spawn/failure.rs index 6663709..aa79091 100644 --- a/src/scheduler/spawn/failure.rs +++ b/src/scheduler/spawn/failure.rs @@ -6,7 +6,7 @@ use crate::store::TaskStore; use crate::task::{IoBudget, TaskError, TaskRecord}; use super::super::dispatch::ActiveTaskMap; -use super::super::SchedulerEvent; +use super::super::{emit_event, FailureMsg, SchedulerEvent}; use super::parent::handle_parent_resolution; /// Shared dependencies for the failure handler. @@ -17,6 +17,9 @@ pub(crate) struct FailureDeps { pub work_notify: Arc, pub max_retries: i32, pub registry: Arc, + pub failure_tx: tokio::sync::mpsc::UnboundedSender, + pub failure_rx: + std::sync::Arc>>, } /// Handle a failed task execution. @@ -28,7 +31,7 @@ pub(crate) async fn handle_failure( error: TaskError, metrics: &IoBudget, deps: &FailureDeps, - decrement_module: impl FnOnce(), + mut decrement_module: impl FnMut(), ) { let task_id = task.id; @@ -80,9 +83,9 @@ pub(crate) async fn handle_failure( { tracing::error!(task_id, error = %e, "failed to requeue task for retry"); } - } else { - // Terminal failure (permanent or retries exhausted): needs a - // transaction for the multi-statement INSERT history + DELETE. + } else if task.parent_id.is_some() { + // Terminal failure with parent — must process inline for fail-fast + // cascade and parent resolution ordering. let fail_backoff = crate::store::FailBackoff { strategy: backoff_strategy, executor_retry_after_ms: error.retry_after_ms, @@ -101,6 +104,50 @@ pub(crate) async fn handle_failure( { tracing::error!(task_id, error = %e, "failed to record task failure"); } + } else { + // Terminal failure without parent — batch via channel. + let msg = FailureMsg { + task: task.clone(), + error: error.message.clone(), + retryable: error.retryable, + metrics: *metrics, + }; + + if deps.failure_tx.send(msg).is_err() { + // Channel closed — fall back to inline. + tracing::error!(task_id, "failure channel closed — processing inline"); + let fail_backoff = crate::store::FailBackoff { + strategy: backoff_strategy, + executor_retry_after_ms: error.retry_after_ms, + }; + if let Err(e) = deps + .store + .fail_with_record( + task, + &error.message, + error.retryable, + effective_max_retries, + metrics, + &fail_backoff, + ) + .await + { + tracing::error!(task_id, error = %e, "inline failure recording failed"); + } + } else { + // Leader election: try to grab the rx lock and drain the batch. + if let Ok(mut rx) = deps.failure_rx.try_lock() { + let mut batch = Vec::new(); + while let Ok(m) = rx.try_recv() { + batch.push(m); + } + drop(rx); + + if !batch.is_empty() { + process_failure_batch(&batch, &deps.store, &deps.event_tx).await; + } + } + } } // Remove from active tracking AFTER the store write completes. @@ -109,18 +156,24 @@ pub(crate) async fn handle_failure( let dead_lettered = error.retryable && !will_retry; if dead_lettered { - let _ = deps.event_tx.send(SchedulerEvent::DeadLettered { - header: task.event_header(), - error: error.message.clone(), - retry_count: task.retry_count + 1, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::DeadLettered { + header: task.event_header(), + error: error.message.clone(), + retry_count: task.retry_count + 1, + }, + ); } else { - let _ = deps.event_tx.send(SchedulerEvent::Failed { - header: task.event_header(), - error: error.message.clone(), - will_retry, - retry_after: retry_delay, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::Failed { + header: task.event_header(), + error: error.message.clone(), + will_retry, + retry_after: retry_delay, + }, + ); } deps.work_notify.notify_one(); @@ -130,6 +183,26 @@ pub(crate) async fn handle_failure( } } +/// Process a batch of terminal failures: write to store and emit events. +/// +/// Called from both the spawned task (leader election) and the run loop +/// (`drain_failures`). +pub(in crate::scheduler) async fn process_failure_batch( + batch: &[FailureMsg], + store: &TaskStore, + _event_tx: &tokio::sync::broadcast::Sender, +) { + let items: Vec<(&TaskRecord, &str, bool, &IoBudget)> = batch + .iter() + .map(|m| (&m.task, m.error.as_str(), m.retryable, &m.metrics)) + .collect(); + + if let Err(e) = store.fail_batch(&items).await { + tracing::error!(error = %e, "batch failure recording failed"); + // Events are emitted by handle_failure directly, so nothing else needed. + } +} + /// Propagate a permanent failure to dependents and handle fail-fast parent logic. async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureDeps) { let task_id = task.id; @@ -137,15 +210,19 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD match deps.store.fail_dependents(task_id).await { Ok((failed_ids, unblocked_ids)) => { for fid in &failed_ids { - let _ = deps.event_tx.send(SchedulerEvent::DependencyFailed { - task_id: *fid, - failed_dependency: task_id, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::DependencyFailed { + task_id: *fid, + failed_dependency: task_id, + }, + ); } for uid in &unblocked_ids { - let _ = deps - .event_tx - .send(SchedulerEvent::TaskUnblocked { task_id: *uid }); + emit_event( + &deps.event_tx, + SchedulerEvent::TaskUnblocked { task_id: *uid }, + ); } if !unblocked_ids.is_empty() { deps.work_notify.notify_one(); @@ -166,9 +243,10 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD if let Some(at) = deps.active.remove(*rid) { at.token.cancel(); let _ = deps.store.delete(*rid).await; - let _ = deps - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &deps.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); } } } @@ -192,12 +270,15 @@ async fn propagate_failure(task: &TaskRecord, error: &TaskError, deps: &FailureD "failed to record parent failure" ); } - let _ = deps.event_tx.send(SchedulerEvent::Failed { - header: parent.event_header(), - error: msg, - will_retry: false, - retry_after: None, - }); + emit_event( + &deps.event_tx, + SchedulerEvent::Failed { + header: parent.event_header(), + error: msg, + will_retry: false, + retry_after: None, + }, + ); } else { // Not fail_fast — check if all children done. handle_parent_resolution( diff --git a/src/scheduler/spawn/parent.rs b/src/scheduler/spawn/parent.rs index ee3e941..ad685a9 100644 --- a/src/scheduler/spawn/parent.rs +++ b/src/scheduler/spawn/parent.rs @@ -6,7 +6,7 @@ use crate::store::TaskStore; use crate::task::{IoBudget, ParentResolution}; use super::super::dispatch::ActiveTaskMap; -use super::super::SchedulerEvent; +use super::super::{emit_event, SchedulerEvent}; /// Check if a waiting parent is ready for finalization or has failed, /// and dispatch the finalize phase if ready. @@ -41,12 +41,15 @@ pub(crate) async fn handle_parent_resolution( { tracing::error!(parent_id, error = %e, "failed to record parent failure"); } - let _ = event_tx.send(SchedulerEvent::Failed { - header: parent.event_header(), - error: reason, - will_retry: false, - retry_after: None, - }); + emit_event( + event_tx, + SchedulerEvent::Failed { + header: parent.event_header(), + error: reason, + will_retry: false, + retry_after: None, + }, + ); } } Ok(Some(ParentResolution::StillWaiting)) | Ok(None) => { diff --git a/src/scheduler/submit.rs b/src/scheduler/submit.rs index 08d5608..ac40314 100644 --- a/src/scheduler/submit.rs +++ b/src/scheduler/submit.rs @@ -13,7 +13,7 @@ use crate::task::{ }; use super::progress::ProgressReporter; -use super::{Scheduler, SchedulerEvent}; +use super::{emit_event, Scheduler, SchedulerEvent}; impl Scheduler { /// Resolve the effective TTL for a submission. @@ -64,10 +64,13 @@ impl Scheduler { label: sub.label.clone(), tags: sub.tags.clone(), }; - let _ = self.inner.event_tx.send(SchedulerEvent::Superseded { - old: old_header, - new_task_id: *new_task_id, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Superseded { + old: old_header, + new_task_id: *new_task_id, + }, + ); } if !matches!(outcome, SubmitOutcome::Duplicate | SubmitOutcome::Rejected) { @@ -129,10 +132,13 @@ impl Scheduler { label: sub.label.clone(), tags: sub.tags.clone(), }; - let _ = self.inner.event_tx.send(SchedulerEvent::Superseded { - old: old_header, - new_task_id: *new_task_id, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Superseded { + old: old_header, + new_task_id: *new_task_id, + }, + ); } } @@ -170,10 +176,13 @@ impl Scheduler { if any_changed { let inserted_ids = outcome.inserted(); - let _ = self.inner.event_tx.send(SchedulerEvent::BatchSubmitted { - count: resolved.len(), - inserted_ids, - }); + emit_event( + &self.inner.event_tx, + SchedulerEvent::BatchSubmitted { + count: resolved.len(), + inserted_ids, + }, + ); self.inner.work_notify.notify_one(); } @@ -266,10 +275,10 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - let _ = self - .inner - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); } } @@ -281,10 +290,10 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - let _ = self - .inner - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); return Ok(true); } @@ -320,18 +329,19 @@ impl Scheduler { /// Cancel all active tasks matching a tag key-value pair. /// - /// Finds tasks via [`TaskStore::tasks_by_tags`](crate::TaskStore::tasks_by_tags) and cancels each one. + /// Uses [`TaskStore::task_ids_by_tags`](crate::TaskStore::task_ids_by_tags) (ID-only query) + /// to avoid full record deserialization and tag population overhead. /// Returns the ids of tasks that were successfully cancelled. pub async fn cancel_by_tag(&self, key: &str, value: &str) -> Result, StoreError> { - let tasks = self + let ids = self .inner .store - .tasks_by_tags(&[(key, value)], None) + .task_ids_by_tags(&[(key, value)], None) .await?; let mut cancelled = Vec::new(); - for task in &tasks { - if self.cancel(task.id).await? { - cancelled.push(task.id); + for id in ids { + if self.cancel(id).await? { + cancelled.push(id); } } Ok(cancelled) @@ -339,18 +349,18 @@ impl Scheduler { /// Cancel all active tasks that have any tag key matching the given prefix. /// - /// Finds tasks via [`crate::TaskStore::tasks_by_tag_key_prefix`] and cancels each. + /// Uses [`crate::TaskStore::task_ids_by_tag_key_prefix`] for ID-only lookup. /// Returns the ids of tasks that were successfully cancelled. pub async fn cancel_by_tag_key_prefix(&self, prefix: &str) -> Result, StoreError> { - let tasks = self + let ids = self .inner .store - .tasks_by_tag_key_prefix(prefix, None) + .task_ids_by_tag_key_prefix(prefix, None) .await?; let mut cancelled = Vec::new(); - for task in &tasks { - if self.cancel(task.id).await? { - cancelled.push(task.id); + for id in ids { + if self.cancel(id).await? { + cancelled.push(id); } } Ok(cancelled) @@ -393,10 +403,10 @@ impl Scheduler { .cancel_to_history_with_record(&at.record) .await?; self.fire_on_cancel(&at.record).await; - let _ = self - .inner - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); return Ok(true); } self.inner.store.cancel_recurring(task_id).await @@ -430,10 +440,10 @@ impl Scheduler { if let Some(at) = self.inner.active.remove(replaced_task_id) { at.token.cancel(); self.fire_on_cancel(&at.record).await; - let _ = self - .inner - .event_tx - .send(SchedulerEvent::Cancelled(at.record.event_header())); + emit_event( + &self.inner.event_tx, + SchedulerEvent::Cancelled(at.record.event_header()), + ); } } diff --git a/src/store/hierarchy.rs b/src/store/hierarchy.rs index 1505113..7c1fe75 100644 --- a/src/store/hierarchy.rs +++ b/src/store/hierarchy.rs @@ -428,7 +428,11 @@ mod tests { .tag("env", "prod") .tag("region", "us-east"); let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); - let parent = store.pop_next().await.unwrap().unwrap(); + let mut parent = store.pop_next().await.unwrap().unwrap(); + store + .populate_tags(std::slice::from_mut(&mut parent)) + .await + .unwrap(); let ctx = ParentContext { created_at: parent.created_at, @@ -462,7 +466,11 @@ mod tests { .tag("env", "prod") .tag("region", "us-east"); let parent_id = store.submit(&parent_sub).await.unwrap().id().unwrap(); - let parent = store.pop_next().await.unwrap().unwrap(); + let mut parent = store.pop_next().await.unwrap().unwrap(); + store + .populate_tags(std::slice::from_mut(&mut parent)) + .await + .unwrap(); let ctx = ParentContext { created_at: parent.created_at, diff --git a/src/store/lifecycle/tests.rs b/src/store/lifecycle/tests.rs index 0394fd3..52458e4 100644 --- a/src/store/lifecycle/tests.rs +++ b/src/store/lifecycle/tests.rs @@ -286,8 +286,9 @@ async fn tags_copied_to_history_on_complete() { let task = store.pop_next().await.unwrap().unwrap(); store.complete(task.id, &IoBudget::default()).await.unwrap(); - let hist = store.history_by_key(&sub.effective_key()).await.unwrap(); + let mut hist = store.history_by_key(&sub.effective_key()).await.unwrap(); assert_eq!(hist.len(), 1); + store.populate_history_tags(&mut hist).await.unwrap(); assert_eq!(hist[0].tags.get("env").unwrap(), "staging"); assert_eq!(hist[0].tags.get("owner").unwrap(), "alice"); } @@ -313,8 +314,9 @@ async fn tags_copied_to_history_on_fail() { .await .unwrap(); - let hist = store.failed_tasks(10).await.unwrap(); + let mut hist = store.failed_tasks(10).await.unwrap(); assert_eq!(hist.len(), 1); + store.populate_history_tags(&mut hist).await.unwrap(); assert_eq!(hist[0].tags.get("region").unwrap(), "us-west"); } @@ -328,9 +330,10 @@ async fn tags_copied_to_history_on_cancel() { let id = store.submit(&sub).await.unwrap().id().unwrap(); store.cancel_to_history(id).await.unwrap(); - let hist = store.history_by_key(&sub.effective_key()).await.unwrap(); + let mut hist = store.history_by_key(&sub.effective_key()).await.unwrap(); assert_eq!(hist.len(), 1); assert_eq!(hist[0].status, HistoryStatus::Cancelled); + store.populate_history_tags(&mut hist).await.unwrap(); assert_eq!(hist[0].tags.get("priority_class").unwrap(), "low"); } @@ -352,9 +355,10 @@ async fn tags_copied_to_history_on_expire() { let expired = store.expire_tasks().await.unwrap(); assert!(!expired.is_empty()); - let hist = store.history_by_key(&sub.effective_key()).await.unwrap(); + let mut hist = store.history_by_key(&sub.effective_key()).await.unwrap(); assert_eq!(hist.len(), 1); assert_eq!(hist[0].status, HistoryStatus::Expired); + store.populate_history_tags(&mut hist).await.unwrap(); assert_eq!(hist[0].tags.get("source").unwrap(), "cron"); } @@ -369,7 +373,11 @@ async fn tags_preserved_on_recurring_requeue() { .recurring(Duration::from_secs(3600)); store.submit(&sub).await.unwrap(); - let task = store.pop_next().await.unwrap().unwrap(); + let mut task = store.pop_next().await.unwrap().unwrap(); + store + .populate_tags(std::slice::from_mut(&mut task)) + .await + .unwrap(); assert_eq!(task.tags.get("schedule").unwrap(), "hourly"); store @@ -391,7 +399,11 @@ async fn tags_in_pop_next() { .tag("color", "blue"); store.submit(&sub).await.unwrap(); - let task = store.pop_next().await.unwrap().unwrap(); + let mut task = store.pop_next().await.unwrap().unwrap(); + store + .populate_tags(std::slice::from_mut(&mut task)) + .await + .unwrap(); assert_eq!(task.tags.get("color").unwrap(), "blue"); } diff --git a/src/store/lifecycle/transitions.rs b/src/store/lifecycle/transitions.rs index 25f16f6..2940a98 100644 --- a/src/store/lifecycle/transitions.rs +++ b/src/store/lifecycle/transitions.rs @@ -42,11 +42,7 @@ impl TaskStore { .fetch_optional(&self.pool) .await?; - let mut record = row.as_ref().map(row_to_task_record); - if let Some(ref mut r) = record { - self.populate_tags(std::slice::from_mut(r)).await?; - } - Ok(record) + Ok(row.as_ref().map(row_to_task_record)) } /// Atomically claim a specific pending task by id, setting it to running. @@ -107,7 +103,12 @@ impl TaskStore { .bind(id) .execute(&self.pool) .await?; - Ok(result.rows_affected() > 0) + let claimed = result.rows_affected() > 0; + if claimed { + self.has_running + .store(true, std::sync::atomic::Ordering::Relaxed); + } + Ok(claimed) } /// Atomically claim up to `limit` highest-priority pending tasks and mark @@ -150,8 +151,11 @@ impl TaskStore { .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; + let records: Vec = rows.iter().map(row_to_task_record).collect(); + if !records.is_empty() { + self.has_running + .store(true, std::sync::atomic::Ordering::Relaxed); + } Ok(records) } @@ -161,6 +165,10 @@ impl TaskStore { /// /// For tasks with `ttl_from = 'first_attempt'`, sets `expires_at` on /// the first pop. + /// + /// Tags are **not** populated — callers needing tags should call + /// [`populate_tags`](Self::populate_tags) explicitly or use + /// [`task_by_id`](Self::task_by_id). pub async fn pop_next(&self) -> Result, StoreError> { let now_ms = chrono::Utc::now().timestamp_millis(); let row = sqlx::query( @@ -189,9 +197,10 @@ impl TaskStore { .fetch_optional(&self.pool) .await?; - let mut record = row.map(|r| row_to_task_record(&r)); - if let Some(ref mut r) = record { - self.populate_tags(std::slice::from_mut(r)).await?; + let record = row.map(|r| row_to_task_record(&r)); + if record.is_some() { + self.has_running + .store(true, std::sync::atomic::Ordering::Relaxed); } Ok(record) } @@ -724,6 +733,93 @@ impl TaskStore { Ok(()) } + /// Batch-process terminal failures in a single transaction. + /// + /// Each item is a `(task, error, retryable)` triple that has already been + /// determined to be terminal (retries exhausted or non-retryable). The + /// method inserts history rows and deletes active tasks in one + /// `BEGIN IMMEDIATE` / `COMMIT` cycle, amortizing SQLite's WAL sync. + pub async fn fail_batch( + &self, + items: &[(&crate::task::TaskRecord, &str, bool, &IoBudget)], + ) -> Result<(), StoreError> { + if items.is_empty() { + return Ok(()); + } + if items.len() == 1 { + let (task, error, retryable, metrics) = items[0]; + return self + .fail_with_record(task, error, retryable, 0, metrics, &FailBackoff::default()) + .await; + } + + let skip_tags = !self.has_tags.load(std::sync::atomic::Ordering::Relaxed); + + tracing::debug!(count = items.len(), "store.fail_batch: BEGIN tx"); + let mut conn = self.begin_write().await?; + + // Step 1: Insert history rows (per-task — each has unique binds). + for &(task, error, retryable, metrics) in items { + let status = if retryable { + HistoryStatus::DeadLetter + } else { + HistoryStatus::Failed + }; + let duration_ms = compute_duration_ms(task); + insert_history( + &mut conn, + task, + status, + metrics, + duration_ms, + Some(error), + skip_tags, + ) + .await?; + } + + // Step 2: Batch-delete active task rows. + let ids: Vec = items.iter().map(|(t, _, _, _)| t.id).collect(); + let placeholders = ids.iter().map(|_| "?").collect::>().join(","); + let sql = format!("DELETE FROM tasks WHERE id IN ({placeholders})"); + let mut q = sqlx::query(&sql); + for id in &ids { + q = q.bind(id); + } + q.execute(&mut *conn).await?; + + // Step 3: Batch-delete orphaned tags. + if !skip_tags { + let tag_sql = format!("DELETE FROM task_tags WHERE task_id IN ({placeholders})"); + let mut tq = sqlx::query(&tag_sql); + for id in &ids { + tq = tq.bind(id); + } + tq.execute(&mut *conn).await?; + } + + sqlx::query("COMMIT").execute(&mut *conn).await?; + drop(conn); + tracing::debug!(count = items.len(), "store.fail_batch: COMMIT ok"); + + self.maybe_prune().await; + Ok(()) + } + + /// Increment retry count in-place without changing task status. + /// + /// Used by inline immediate retries: the task stays `running` and is + /// re-executed directly in the same spawned future, avoiding the + /// requeue-to-pending → pop_next round-trip through SQLite. + pub async fn increment_retry(&self, task_id: i64, error: &str) -> Result<(), StoreError> { + sqlx::query("UPDATE tasks SET retry_count = retry_count + 1, last_error = ? WHERE id = ?") + .bind(error) + .bind(task_id) + .execute(&self.pool) + .await?; + Ok(()) + } + /// Requeue a task for retry without a transaction. /// /// The single UPDATE is atomically safe and avoids `BEGIN IMMEDIATE` + diff --git a/src/store/mod.rs b/src/store/mod.rs index 59a2590..5ce55d9 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -32,12 +32,24 @@ mod submit; pub use lifecycle::FailBackoff; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; use serde::{Deserialize, Serialize}; use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous}; use sqlx::SqlitePool; +use tokio::sync::Mutex; -use crate::task::MAX_PAYLOAD_BYTES; +use crate::task::{SubmitOutcome, TaskSubmission, MAX_PAYLOAD_BYTES}; + +/// Message sent through the submit coalescing channel. +/// +/// Each caller packs its [`TaskSubmission`] and a oneshot response channel. +/// The leader drains the channel and processes the batch in a single +/// SQLite transaction, then sends individual results back. +pub(crate) struct SubmitMsg { + pub submission: TaskSubmission, + pub response_tx: tokio::sync::oneshot::Sender>, +} /// Serde-friendly error type for Tauri IPC and API boundaries. /// @@ -171,6 +183,14 @@ pub struct TaskStore { /// Fast-path flag: `false` means no tasks with `parent_id` have been /// submitted, so `active_children_count` checks can be skipped. pub(crate) has_hierarchy: std::sync::Arc, + /// Fast-path flag: `false` means no task has ever transitioned to + /// `running`, so the requeue UPDATE in `skip_existing` can be skipped + /// (there are no running tasks to mark for requeue). + pub(crate) has_running: Arc, + /// Send side of the submit coalescing channel. + pub(crate) submit_tx: tokio::sync::mpsc::UnboundedSender, + /// Receive side, `Arc`-wrapped for leader election (try_lock pattern). + pub(crate) submit_rx: Arc>>, } impl TaskStore { @@ -193,14 +213,20 @@ impl TaskStore { .connect_with(opts) .await?; + let (submit_tx, submit_rx) = tokio::sync::mpsc::unbounded_channel(); let store = Self { pool, retention_policy: config.retention_policy, prune_interval: config.prune_interval, - completion_count: std::sync::Arc::new(AtomicU64::new(0)), + completion_count: Arc::new(AtomicU64::new(0)), // Conservative for file-backed stores that may have existing tags/hierarchy. - has_tags: std::sync::Arc::new(AtomicBool::new(true)), - has_hierarchy: std::sync::Arc::new(AtomicBool::new(true)), + has_tags: Arc::new(AtomicBool::new(true)), + has_hierarchy: Arc::new(AtomicBool::new(true)), + // Conservative: file-backed stores may have running tasks from a + // previous session (before recover_running resets them). + has_running: Arc::new(AtomicBool::new(true)), + submit_tx, + submit_rx: Arc::new(Mutex::new(submit_rx)), }; store.migrate().await?; store.recover_running().await?; @@ -220,14 +246,18 @@ impl TaskStore { .connect_with(opts) .await?; + let (submit_tx, submit_rx) = tokio::sync::mpsc::unbounded_channel(); let store = Self { pool, retention_policy: Some(RetentionPolicy::MaxCount(10_000)), prune_interval: 100, - completion_count: std::sync::Arc::new(AtomicU64::new(0)), + completion_count: Arc::new(AtomicU64::new(0)), // In-memory stores start empty — no tags or hierarchy to query. - has_tags: std::sync::Arc::new(AtomicBool::new(false)), - has_hierarchy: std::sync::Arc::new(AtomicBool::new(false)), + has_tags: Arc::new(AtomicBool::new(false)), + has_hierarchy: Arc::new(AtomicBool::new(false)), + has_running: Arc::new(AtomicBool::new(false)), + submit_tx, + submit_rx: Arc::new(Mutex::new(submit_rx)), }; store.migrate().await?; Ok(store) diff --git a/src/store/query/history.rs b/src/store/query/history.rs index 5beba59..1733f13 100644 --- a/src/store/query/history.rs +++ b/src/store/query/history.rs @@ -8,6 +8,8 @@ use crate::task::{TaskHistoryRecord, TypeStats}; impl TaskStore { /// Look up a history record by its row id. + /// + /// Tags are populated eagerly (single-record lookup is cheap). pub async fn history_by_id(&self, id: i64) -> Result, StoreError> { let row = sqlx::query("SELECT * FROM task_history WHERE id = ?") .bind(id) @@ -21,6 +23,9 @@ impl TaskStore { } /// Recent history entries, newest first. + /// + /// Tags are **not** populated — call [`populate_history_tags`](Self::populate_history_tags) + /// if needed. pub async fn history( &self, limit: i32, @@ -32,12 +37,12 @@ impl TaskStore { .bind(offset) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// History filtered by task type. + /// + /// Tags are **not** populated. pub async fn history_by_type( &self, task_type: &str, @@ -50,16 +55,12 @@ impl TaskStore { .bind(limit) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Most recent history record for a specific key. /// - /// Used by [`TypedEventStream`](crate::domain::TypedEventStream) to - /// attach the `TaskHistoryRecord` to terminal events without modifying - /// the internal broadcast channel. + /// Tags are populated eagerly (single-record lookup). pub async fn latest_history_by_key( &self, key: &str, @@ -78,18 +79,20 @@ impl TaskStore { } /// History for a specific key (all past runs of that key). + /// + /// Tags are **not** populated. pub async fn history_by_key(&self, key: &str) -> Result, StoreError> { let rows = sqlx::query("SELECT * FROM task_history WHERE key = ? ORDER BY completed_at DESC") .bind(key) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Dead-lettered tasks from history filtered by `task_type` prefix. + /// + /// Tags are **not** populated. pub async fn dead_letter_tasks_by_prefix( &self, prefix: &str, @@ -105,12 +108,12 @@ impl TaskStore { .bind(offset) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Dead-lettered tasks from history (retries exhausted). + /// + /// Tags are **not** populated. pub async fn dead_letter_tasks( &self, limit: i64, @@ -123,12 +126,12 @@ impl TaskStore { .bind(offset) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Failed tasks from history. + /// + /// Tags are **not** populated. pub async fn failed_tasks(&self, limit: i32) -> Result, StoreError> { let rows = sqlx::query( "SELECT * FROM task_history WHERE status = 'failed' ORDER BY completed_at DESC LIMIT ?", @@ -136,9 +139,7 @@ impl TaskStore { .bind(limit) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_history_record).collect(); - self.populate_history_tags(&mut records).await?; - Ok(records) + Ok(rows.iter().map(row_to_history_record).collect()) } /// Aggregate stats for a task type from completed history. diff --git a/src/store/query/tags.rs b/src/store/query/tags.rs index 4c54dd6..616bdc3 100644 --- a/src/store/query/tags.rs +++ b/src/store/query/tags.rs @@ -1,8 +1,6 @@ //! Tag-based queries: filtering, aggregation, and prefix discovery by task tags. -use crate::store::row_mapping::row_to_task_record; use crate::store::{StoreError, TaskStore}; -use crate::task::TaskRecord; /// Escape SQL LIKE wildcards in `prefix` and append `%` for a safe prefix pattern. /// @@ -20,38 +18,45 @@ fn escape_like_prefix(prefix: &str) -> String { } impl TaskStore { - /// Find active tasks matching all specified tag filters (AND semantics). + /// Build the INNER JOIN fragment for tag filters. + fn build_tag_join_sql( + select: &str, + filters: &[(&str, &str)], + status: Option<&crate::task::TaskStatus>, + ) -> String { + let mut sql = format!("SELECT {select} FROM tasks t"); + for (i, _) in filters.iter().enumerate() { + sql.push_str(&format!( + " INNER JOIN task_tags tt{i} ON t.id = tt{i}.task_id AND tt{i}.key = ? AND tt{i}.value = ?" + )); + } + if let Some(s) = status { + sql.push_str(&format!(" WHERE t.status = '{}'", s.as_str())); + } + sql + } + + /// Return the IDs of active tasks matching all specified tag filters (AND semantics). /// /// Each `(key, value)` pair adds an INNER JOIN, so only tasks matching /// **all** filters are returned. Optionally filter by status. - pub async fn tasks_by_tags( + pub async fn task_ids_by_tags( &self, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { if filters.is_empty() { return Ok(Vec::new()); } - let mut sql = String::from("SELECT t.* FROM tasks t"); - for (i, _) in filters.iter().enumerate() { - sql.push_str(&format!( - " INNER JOIN task_tags tt{i} ON t.id = tt{i}.task_id AND tt{i}.key = ? AND tt{i}.value = ?" - )); - } - if let Some(ref s) = status { - sql.push_str(&format!(" WHERE t.status = '{}'", s.as_str())); - } - sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); + let sql = Self::build_tag_join_sql("t.id", filters, status.as_ref()); - let mut q = sqlx::query(&sql); + let mut q = sqlx::query_as::<_, (i64,)>(&sql); for (key, value) in filters { q = q.bind(key).bind(value); } let rows = q.fetch_all(&self.pool).await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; - Ok(records) + Ok(rows.into_iter().map(|(id,)| id).collect()) } /// Count active tasks matching all specified tag filters (AND semantics). @@ -95,17 +100,17 @@ impl TaskStore { Ok(rows) } - /// Find active tasks matching a `task_type` prefix **and** all tag filters. + /// Return IDs of active tasks matching a `task_type` prefix **and** all tag filters. /// - /// If `filters` is empty, returns all tasks matching the prefix. - pub async fn tasks_by_tags_with_prefix( + /// If `filters` is empty, returns all task IDs matching the prefix. + pub async fn task_ids_by_tags_with_prefix( &self, prefix: &str, filters: &[(&str, &str)], status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { let pattern = format!("{prefix}%"); - let mut sql = String::from("SELECT t.* FROM tasks t"); + let mut sql = String::from("SELECT t.id FROM tasks t"); for (i, _) in filters.iter().enumerate() { sql.push_str(&format!( " INNER JOIN task_tags tt{i} ON t.id = tt{i}.task_id AND tt{i}.key = ? AND tt{i}.value = ?" @@ -115,17 +120,14 @@ impl TaskStore { if let Some(ref s) = status { sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); } - sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); - let mut q = sqlx::query(&sql); + let mut q = sqlx::query_as::<_, (i64,)>(&sql); for (key, value) in filters { q = q.bind(key).bind(value); } q = q.bind(&pattern); let rows = q.fetch_all(&self.pool).await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; - Ok(records) + Ok(rows.into_iter().map(|(id,)| id).collect()) } /// Count active tasks grouped by a tag key's values, filtered by `task_type` prefix. @@ -259,46 +261,44 @@ impl TaskStore { Ok(rows.into_iter().map(|(k,)| k).collect()) } - /// Find active tasks that have any tag key matching the prefix. + /// Return IDs of active tasks that have any tag key matching the prefix. /// - /// Optionally filter by status. Returns tasks ordered by priority. + /// Optionally filter by status. /// LIKE wildcards in the prefix are escaped — only true prefix matching. - pub async fn tasks_by_tag_key_prefix( + pub async fn task_ids_by_tag_key_prefix( &self, prefix: &str, status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { let pattern = escape_like_prefix(prefix); let mut sql = String::from( - "SELECT t.* FROM tasks t \ + "SELECT t.id FROM tasks t \ WHERE EXISTS (SELECT 1 FROM task_tags tt \ WHERE tt.task_id = t.id AND tt.key LIKE ? ESCAPE '\\')", ); if let Some(ref s) = status { sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); } - sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); - let rows = sqlx::query(&sql) + let rows = sqlx::query_as::<_, (i64,)>(&sql) .bind(&pattern) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; - Ok(records) + Ok(rows.into_iter().map(|(id,)| id).collect()) } - /// Find active tasks that have any tag key matching the prefix, scoped to a task_type prefix. - pub async fn tasks_by_tag_key_prefix_with_prefix( + /// Return IDs of active tasks that have any tag key matching the prefix, + /// scoped to a task_type prefix. + pub async fn task_ids_by_tag_key_prefix_with_prefix( &self, task_type_prefix: &str, prefix: &str, status: Option, - ) -> Result, StoreError> { + ) -> Result, StoreError> { let pattern = escape_like_prefix(prefix); let type_pattern = format!("{task_type_prefix}%"); let mut sql = String::from( - "SELECT t.* FROM tasks t \ + "SELECT t.id FROM tasks t \ WHERE EXISTS (SELECT 1 FROM task_tags tt \ WHERE tt.task_id = t.id AND tt.key LIKE ? ESCAPE '\\') \ AND t.task_type LIKE ?", @@ -306,16 +306,13 @@ impl TaskStore { if let Some(ref s) = status { sql.push_str(&format!(" AND t.status = '{}'", s.as_str())); } - sql.push_str(" ORDER BY t.priority ASC, t.id ASC"); - let rows = sqlx::query(&sql) + let rows = sqlx::query_as::<_, (i64,)>(&sql) .bind(&pattern) .bind(&type_pattern) .fetch_all(&self.pool) .await?; - let mut records: Vec = rows.iter().map(row_to_task_record).collect(); - self.populate_tags(&mut records).await?; - Ok(records) + Ok(rows.into_iter().map(|(id,)| id).collect()) } /// Count active tasks that have any tag key matching the prefix. diff --git a/src/store/query/tests.rs b/src/store/query/tests.rs index d6db544..9481330 100644 --- a/src/store/query/tests.rs +++ b/src/store/query/tests.rs @@ -151,7 +151,7 @@ async fn prune_by_count() { // ── Tag query tests ─────────────────────────────────────────────── #[tokio::test] -async fn tasks_by_tags_single_filter() { +async fn task_ids_by_tags_single_filter() { let store = test_store().await; store @@ -171,18 +171,21 @@ async fn tasks_by_tags_single_filter() { .await .unwrap(); - let results = store.tasks_by_tags(&[("env", "prod")], None).await.unwrap(); + let results = store + .task_ids_by_tags(&[("env", "prod")], None) + .await + .unwrap(); assert_eq!(results.len(), 2); let results = store - .tasks_by_tags(&[("env", "staging")], None) + .task_ids_by_tags(&[("env", "staging")], None) .await .unwrap(); assert_eq!(results.len(), 1); } #[tokio::test] -async fn tasks_by_tags_multiple_filters_and() { +async fn task_ids_by_tags_multiple_filters_and() { let store = test_store().await; store @@ -215,14 +218,14 @@ async fn tasks_by_tags_multiple_filters_and() { // AND semantics: only task matching both filters. let results = store - .tasks_by_tags(&[("env", "prod"), ("region", "us")], None) + .task_ids_by_tags(&[("env", "prod"), ("region", "us")], None) .await .unwrap(); assert_eq!(results.len(), 1); // With status filter. let results = store - .tasks_by_tags(&[("env", "prod")], Some(TaskStatus::Pending)) + .task_ids_by_tags(&[("env", "prod")], Some(TaskStatus::Pending)) .await .unwrap(); assert_eq!(results.len(), 2); @@ -326,7 +329,7 @@ async fn tag_keys_by_prefix_discovers_keys() { } #[tokio::test] -async fn tasks_by_tag_key_prefix_finds_tasks() { +async fn task_ids_by_tag_key_prefix_finds_tasks() { let store = test_store().await; store @@ -356,17 +359,20 @@ async fn tasks_by_tag_key_prefix_finds_tasks() { .unwrap(); let tasks = store - .tasks_by_tag_key_prefix("billing.", None) + .task_ids_by_tag_key_prefix("billing.", None) .await .unwrap(); assert_eq!(tasks.len(), 2); // tp-1 and tp-2 - let tasks = store.tasks_by_tag_key_prefix("media.", None).await.unwrap(); + let tasks = store + .task_ids_by_tag_key_prefix("media.", None) + .await + .unwrap(); assert_eq!(tasks.len(), 2); // tp-2 and tp-3 } #[tokio::test] -async fn tasks_by_tag_key_prefix_with_status_filter() { +async fn task_ids_by_tag_key_prefix_with_status_filter() { let store = test_store().await; store @@ -379,13 +385,13 @@ async fn tasks_by_tag_key_prefix_with_status_filter() { .unwrap(); let tasks = store - .tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)) + .task_ids_by_tag_key_prefix("billing.", Some(TaskStatus::Pending)) .await .unwrap(); assert_eq!(tasks.len(), 1); let tasks = store - .tasks_by_tag_key_prefix("billing.", Some(TaskStatus::Running)) + .task_ids_by_tag_key_prefix("billing.", Some(TaskStatus::Running)) .await .unwrap(); assert_eq!(tasks.len(), 0); @@ -514,7 +520,7 @@ async fn tag_keys_by_prefix_with_prefix_scopes_to_task_type() { } #[tokio::test] -async fn tasks_by_tag_key_prefix_with_prefix_scopes_to_task_type() { +async fn task_ids_by_tag_key_prefix_with_prefix_scopes_to_task_type() { let store = test_store().await; store @@ -535,12 +541,18 @@ async fn tasks_by_tag_key_prefix_with_prefix_scopes_to_task_type() { .unwrap(); // Both tasks have billing.* tags, but scoped query returns only the billing module's task. - let tasks = store - .tasks_by_tag_key_prefix_with_prefix("billing::", "billing.", None) + let ids = store + .task_ids_by_tag_key_prefix_with_prefix("billing::", "billing.", None) .await .unwrap(); - assert_eq!(tasks.len(), 1); - assert_eq!(tasks[0].task_type, "billing::charge"); + assert_eq!(ids.len(), 1); + + // Unscoped returns both. + let all_ids = store + .task_ids_by_tag_key_prefix("billing.", None) + .await + .unwrap(); + assert_eq!(all_ids.len(), 2); } #[tokio::test] diff --git a/src/store/submit/dedup.rs b/src/store/submit/dedup.rs index 66baa06..a7eb40a 100644 --- a/src/store/submit/dedup.rs +++ b/src/store/submit/dedup.rs @@ -7,10 +7,14 @@ use crate::store::StoreError; use crate::task::{SubmitOutcome, TaskSubmission, TtlFrom}; /// Default dedup behaviour: try priority upgrade, then requeue, then no-op. +/// +/// When `skip_requeue` is `true` the running/paused requeue UPDATE is elided +/// because no task has ever transitioned to `running` in this store instance. pub(super) async fn skip_existing( conn: &mut sqlx::pool::PoolConnection, key: &str, priority: i32, + skip_requeue: bool, ) -> Result { // Try to upgrade priority on pending/paused tasks. let row = sqlx::query( @@ -28,6 +32,12 @@ pub(super) async fn skip_existing( return Ok(SubmitOutcome::Upgraded(r.get("id"))); } + // Fast path: no tasks have ever been dispatched, so there can be no + // running/paused tasks to mark for requeue. + if skip_requeue { + return Ok(SubmitOutcome::Duplicate); + } + // Dedup hit on running/paused task — mark for re-queue. let row = sqlx::query( "UPDATE tasks SET requeue = 1, requeue_priority = ? diff --git a/src/store/submit/mod.rs b/src/store/submit/mod.rs index d0ac2f2..15089c6 100644 --- a/src/store/submit/mod.rs +++ b/src/store/submit/mod.rs @@ -56,6 +56,7 @@ pub(crate) async fn submit_one( sub: &TaskSubmission, has_tags_flag: Option<&std::sync::atomic::AtomicBool>, has_hierarchy_flag: Option<&std::sync::atomic::AtomicBool>, + skip_requeue: bool, ) -> Result { if let Some(ref err) = sub.payload_error { return Err(StoreError::Serialization(err.clone())); @@ -173,7 +174,7 @@ pub(crate) async fn submit_one( match sub.on_duplicate { DuplicateStrategy::Reject => Ok(SubmitOutcome::Rejected), DuplicateStrategy::Supersede => dedup::supersede_existing(conn, sub, &key).await, - DuplicateStrategy::Skip => dedup::skip_existing(conn, &key, priority).await, + DuplicateStrategy::Skip => dedup::skip_existing(conn, &key, priority, skip_requeue).await, } } @@ -187,7 +188,15 @@ impl TaskStore { /// /// When `sub.key` is `None`, the dedup key is auto-generated by hashing /// the task type and payload. + /// + /// **Coalescing:** Concurrent callers are automatically batched into a + /// single SQLite transaction (leader election pattern), giving batch-level + /// throughput without API changes. pub async fn submit(&self, sub: &TaskSubmission) -> Result { + // Pre-validate before touching the channel or the database. + if let Some(ref err) = sub.payload_error { + return Err(StoreError::Serialization(err.clone())); + } if let Some(ref p) = sub.payload { if p.len() > MAX_PAYLOAD_BYTES { return Err(StoreError::PayloadTooLarge); @@ -195,20 +204,118 @@ impl TaskStore { } validate_tags(&sub.tags)?; + // Try to become the leader. If we win, drain any stranded channel + // messages and process everything in a single transaction. + if let Ok(mut rx_guard) = self.submit_rx.try_lock() { + let mut stranded: Vec = Vec::new(); + while let Ok(m) = rx_guard.try_recv() { + stranded.push(m); + } + drop(rx_guard); + + if stranded.is_empty() { + // Uncontended fast path — submit directly without channel + // overhead (no clone, no oneshot). + return self.submit_direct(sub).await; + } + + // Stranded messages exist — batch them together with ours. + let (tx, rx) = tokio::sync::oneshot::channel(); + stranded.push(super::SubmitMsg { + submission: sub.clone(), + response_tx: tx, + }); + self.process_submit_batch(stranded).await; + return rx + .await + .map_err(|_| StoreError::Database("submit response dropped".into()))?; + } + + // Another leader is draining — coalesce via channel. + let (tx, rx) = tokio::sync::oneshot::channel(); + let msg = super::SubmitMsg { + submission: sub.clone(), + response_tx: tx, + }; + self.submit_tx + .send(msg) + .map_err(|_| StoreError::Database("submit channel closed".into()))?; + + rx.await + .map_err(|_| StoreError::Database("submit response dropped".into()))? + } + + /// Direct submit without coalescing (old code path). + /// + /// Used by the fast path when no contention is detected. + async fn submit_direct(&self, sub: &TaskSubmission) -> Result { let mut conn = self.begin_write().await?; - tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT start"); + let skip_requeue = !self.has_running.load(std::sync::atomic::Ordering::Relaxed); let outcome = submit_one( &mut conn, sub, Some(&self.has_tags), Some(&self.has_hierarchy), + skip_requeue, ) .await?; - tracing::debug!(task_type = %sub.task_type, "store.submit: INSERT end"); sqlx::query("COMMIT").execute(&mut *conn).await?; Ok(outcome) } + /// Process a batch of coalesced submit messages in a single transaction. + /// + /// Each message gets its own `submit_one` call, but they all share one + /// `BEGIN IMMEDIATE` / `COMMIT` pair. Results are sent back via oneshot. + pub(crate) async fn process_submit_batch(&self, batch: Vec) { + if batch.is_empty() { + return; + } + + let skip_requeue = !self.has_running.load(std::sync::atomic::Ordering::Relaxed); + + // Try to open a single shared transaction. + let conn = self.begin_write().await; + let mut conn = match conn { + Ok(c) => c, + Err(e) => { + let msg = e.to_string(); + for m in batch { + let _ = m.response_tx.send(Err(StoreError::Database(msg.clone()))); + } + return; + } + }; + + let mut results: Vec> = Vec::with_capacity(batch.len()); + for m in &batch { + results.push( + submit_one( + &mut conn, + &m.submission, + Some(&self.has_tags), + Some(&self.has_hierarchy), + skip_requeue, + ) + .await, + ); + } + + match sqlx::query("COMMIT").execute(&mut *conn).await { + Ok(_) => { + for (m, result) in batch.into_iter().zip(results) { + let _ = m.response_tx.send(result); + } + } + Err(e) => { + let msg = e.to_string(); + for m in batch { + let _ = m.response_tx.send(Err(StoreError::Database(msg.clone()))); + } + } + } + } + /// Submit multiple tasks in a single transaction. Returns a `Vec` with one /// [`SubmitOutcome`] per input. /// @@ -248,6 +355,8 @@ impl TaskStore { let mut results = Vec::with_capacity(submissions.len()); + let skip_requeue = !self.has_running.load(std::sync::atomic::Ordering::Relaxed); + for chunk in submissions.chunks(BATCH_CHUNK_SIZE) { let chunk_offset = results.len(); let mut conn = self.begin_write().await?; @@ -263,6 +372,7 @@ impl TaskStore { sub, Some(&self.has_tags), Some(&self.has_hierarchy), + skip_requeue, ) .await?, );