diff --git a/README.md b/README.md index 5473b25..d312623 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ async fn main() { - **Avoid duplicate work** — key-based deduplication prevents the same task from being queued twice - **React to system load** — composable backpressure from any signal (disk, network, memory, battery, API limits) - **Control concurrency** — per-group limits (e.g., per S3 bucket), global limits, runtime-adjustable +- **Rate-limit dispatch** — token-bucket rate limits per task type and/or group cap start rate independently of concurrency - **Build for Tauri** — `Clone`, `Serialize` on all types; events bridge directly to frontends ## Where to start diff --git a/docs/configuration.md b/docs/configuration.md index f16ad67..0a03a9a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -349,6 +349,52 @@ scheduler.domain::().set_max_concurrency(8); let current = scheduler.domain::().max_concurrency(); ``` +## Rate limiting + +Rate limits control how many tasks *start per unit of time*, complementing concurrency limits which control how many run *simultaneously*. This is useful when fast tasks (e.g., small API calls completing in milliseconds) could produce bursts that exceed external rate limits. + +Configure at build time: + +```rust +use taskmill::{RateLimit, Scheduler}; + +Scheduler::builder() + // Task-type rate limit: at most 100 uploads per second. + .rate_limit("media::upload", RateLimit::per_second(100)) + // Group rate limit: at most 50 requests/sec to this S3 bucket. + .group_rate_limit("s3://prod-bucket", RateLimit::per_second(50)) + // Allow short bursts above the steady-state rate. + .rate_limit("media::thumbnail", RateLimit::per_second(10).with_burst(20)) + // ... 
+    .build()
+    .await?;
+```
+
+Adjust at runtime via the scheduler or a domain handle:
+
+```rust
+scheduler.set_rate_limit("media::upload", RateLimit::per_second(200));
+scheduler.remove_rate_limit("media::upload");
+
+// Domain handles auto-prefix the task type:
+let media = scheduler.domain::();
+media.set_rate_limit("upload", RateLimit::per_second(150)); // → "media::upload"
+media.set_group_rate_limit("s3://prod-bucket", RateLimit::per_minute(3000));
+```
+
+A task can be subject to both a task-type rate limit and a group rate limit — it must pass both to be dispatched. Rate limit tokens are acquired *after* all free checks (backpressure, IO budget, concurrency, group pause), so no token is spent on a task that one of those checks would reject.
+
+When a task is rate-limited, its `run_after` is set to the next token availability, removing it from the dispatch window so other task types can proceed without head-of-line blocking.
+
+Current rate limit state is visible in the scheduler snapshot:
+
+```rust
+let snap = scheduler.snapshot().await?;
+for rl in &snap.rate_limits {
+    println!("{}: {}/{} tokens available", rl.scope, rl.available_tokens, rl.burst);
+}
+```
+
 ## Tuning for specific workloads
 
 ### Desktop app with file processing
@@ -386,6 +432,7 @@ Scheduler::builder()
     .with_resource_monitoring()
     .bandwidth_limit(50_000_000.0) // 50 MB/s cap
     .group_concurrency("s3-bucket", 4) // per-endpoint limits
+    .group_rate_limit("s3-bucket", RateLimit::per_second(100)) // stay under API rate limit
     .shutdown_mode(ShutdownMode::Graceful(Duration::from_secs(30)))
 ```
 
@@ -435,6 +482,8 @@ Scheduler::builder()
 | `bandwidth_limit(bytes_per_sec)` | Set a network bandwidth cap; registers a built-in `NetworkPressure` source. |
 | `default_group_concurrency(n)` | Default concurrency limit for grouped tasks (0 = unlimited). |
 | `group_concurrency(group, n)` | Per-group concurrency limit override. |
+| `rate_limit(task_type, limit)` | Set a token-bucket rate limit for a task type. 
| +| `group_rate_limit(group, limit)` | Set a token-bucket rate limit for a task group. | | `app_state(state)` | Register global state visible to all domains. | | `app_state_arc(arc)` | Register global state from a pre-existing `Arc`. | | `build()` | Build and return the `Scheduler`. | diff --git a/docs/glossary.md b/docs/glossary.md index 56f797b..5165597 100644 --- a/docs/glossary.md +++ b/docs/glossary.md @@ -21,7 +21,8 @@ Quick reference for terms used throughout the taskmill documentation. | **Pile-up prevention** | The mechanism that skips a recurring task instance when the previous instance hasn't been dispatched yet, preventing unbounded queue growth under sustained load. See [Quick Start](quick-start.md#recurring-tasks). | | **Preemption** | Pausing lower-priority work so higher-priority work can run immediately. Preempted tasks resume automatically once the urgent work finishes. See [Priorities & Preemption](priorities-and-preemption.md#preemption). | | **Pressure source** | Anything that signals the system is busy — disk IO, network throughput, memory usage, API rate limits, battery level. Returns a value from 0.0 (idle) to 1.0 (saturated). See [IO & Backpressure](io-and-backpressure.md#pressure-sources). | -| **Task group** | A named set of tasks that share a concurrency limit. For example, you might limit uploads to a specific S3 bucket to 3 at a time. See [Priorities & Preemption](priorities-and-preemption.md#task-groups). | +| **Rate limit** | A token-bucket cap on how many tasks start per unit of time, independent of concurrency. Scoped by task type and/or group. Configured via `RateLimit::per_second(n)` / `per_minute(n)` with optional `.with_burst(b)`. See [Configuration — Rate limiting](configuration.md#rate-limiting). | +| **Task group** | A named set of tasks that share a concurrency limit and/or rate limit. For example, you might limit uploads to a specific S3 bucket to 3 concurrent and 100/sec. 
See [Priorities & Preemption](priorities-and-preemption.md#task-groups). | | **task_deps** | The SQLite junction table that stores dependency edges between tasks. Each row `(task_id, depends_on_id)` means the task cannot start until the dependency completes. Edges survive restarts and are cleaned up automatically when dependencies resolve or on startup. See [Persistence & Recovery](persistence-and-recovery.md#dependency-recovery). | | **Throttle policy** | Rules that map system pressure to dispatch decisions. The default policy defers background tasks when pressure exceeds 50% and normal tasks when it exceeds 75%, but never blocks high-priority work. See [Priorities & Preemption](priorities-and-preemption.md#throttle-behavior). | | **TTL (time-to-live)** | A duration after which a task automatically expires if it hasn't started running. Configurable per-task, per-type, or as a global default. See [Configuration](configuration.md#task-ttl-time-to-live). | diff --git a/docs/priorities-and-preemption.md b/docs/priorities-and-preemption.md index 8634ed6..9a2e50c 100644 --- a/docs/priorities-and-preemption.md +++ b/docs/priorities-and-preemption.md @@ -119,6 +119,26 @@ scheduler.remove_group_limit("bucket-prod").await; Group limits are checked *in addition to* `max_concurrency` — a task must pass both the global and group gate to be dispatched. +### Rate limiting + +While concurrency limits control how many tasks run *simultaneously*, rate limits control how many tasks *start per unit of time*. This is essential when fast tasks (completing in milliseconds) produce bursts that overwhelm external APIs. 
+ +Rate limits use a token-bucket algorithm and can be scoped by task type, group, or both: + +```rust +Scheduler::builder() + .rate_limit("media::upload", RateLimit::per_second(100)) + .group_rate_limit("s3://prod", RateLimit::per_second(50).with_burst(75)) + .build() + .await?; +``` + +When a task is rate-limited, the scheduler sets its `run_after` to the next token availability. This prevents head-of-line blocking — other task types dispatch normally while the rate-limited type waits. + +Rate limits are checked *after* all other gate checks (backpressure, IO budget, concurrency), so tokens are never wasted on tasks that would be rejected for other reasons. + +See [Configuration — Rate limiting](configuration.md#rate-limiting) for full details and runtime adjustment APIs. + ## Domain-level pause and resume Individual domains can be paused and resumed independently, without affecting other domains. This is useful for features like a user-togglable sync, or temporarily disabling a domain during maintenance. diff --git a/src/domain.rs b/src/domain.rs index 2dc9117..e4ebc6a 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -710,6 +710,32 @@ impl DomainHandle { self.inner.max_concurrency() } + // ── Rate Limiting ────────────────────────────────────────────── + + /// Set a rate limit for a task type in this domain. + pub fn set_rate_limit(&self, task_type: &str, limit: crate::scheduler::RateLimit) { + self.inner.set_rate_limit(task_type, limit); + } + + /// Remove the rate limit for a task type in this domain. + pub fn remove_rate_limit(&self, task_type: &str) { + self.inner.remove_rate_limit(task_type); + } + + /// Set a rate limit for a task group. + pub fn set_group_rate_limit( + &self, + group: impl Into, + limit: crate::scheduler::RateLimit, + ) { + self.inner.set_group_rate_limit(group, limit); + } + + /// Remove a group rate limit. 
+ pub fn remove_group_rate_limit(&self, group: &str) { + self.inner.remove_group_rate_limit(group); + } + // ── Events ────────────────────────────────────────────────────── /// Subscribe to all events for this domain. diff --git a/src/lib.rs b/src/lib.rs index 2e40add..d2af21b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,8 @@ //! - Supports [task superseding](DuplicateStrategy::Supersede) for atomic cancel-and-replace //! - Supports [task TTL](TtlFrom) with automatic expiry, per-type defaults, and child inheritance //! - Supports [graceful shutdown](ShutdownMode) with configurable drain timeout +//! - Supports [token-bucket rate limiting](RateLimit) per task type and/or group to cap start rate +//! independently of concurrency //! //! # Concepts //! @@ -806,9 +808,9 @@ pub use resource::network_pressure::NetworkPressure; pub use resource::sampler::SamplerConfig; pub use resource::{ResourceReader, ResourceSampler, ResourceSnapshot}; pub use scheduler::{ - EstimatedProgress, GroupLimits, PausedGroupInfo, ProgressReporter, Scheduler, SchedulerBuilder, - SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, TaskEventHeader, - TaskProgress, + EstimatedProgress, GroupLimits, PausedGroupInfo, ProgressReporter, RateLimit, RateLimitInfo, + Scheduler, SchedulerBuilder, SchedulerConfig, SchedulerEvent, SchedulerSnapshot, ShutdownMode, + TaskEventHeader, TaskProgress, }; pub use store::{RetentionPolicy, StoreConfig, StoreError, TaskStore}; pub use task::{ diff --git a/src/module.rs b/src/module.rs index 25edecf..e6f825e 100644 --- a/src/module.rs +++ b/src/module.rs @@ -714,6 +714,37 @@ impl ModuleHandle { .unwrap_or(0) } + // ── Rate Limiting ───────────────────────────────────────────── + + /// Set a rate limit for a task type in this module. + /// + /// The `task_type` is the unprefixed name (e.g. `"upload"` for a module + /// named `"media"` becomes `"media::upload"` internally). 
+ pub fn set_rate_limit(&self, task_type: &str, limit: crate::scheduler::RateLimit) { + let prefixed = format!("{}{}", self.prefix, task_type); + self.scheduler.set_rate_limit(prefixed, limit); + } + + /// Remove the rate limit for a task type in this module. + pub fn remove_rate_limit(&self, task_type: &str) { + let prefixed = format!("{}{}", self.prefix, task_type); + self.scheduler.remove_rate_limit(&prefixed); + } + + /// Set a rate limit for a task group. + pub fn set_group_rate_limit( + &self, + group: impl Into, + limit: crate::scheduler::RateLimit, + ) { + self.scheduler.set_group_rate_limit(group, limit); + } + + /// Remove a group rate limit. + pub fn remove_group_rate_limit(&self, group: &str) { + self.scheduler.remove_group_rate_limit(group); + } + // ── Scoped queries ──────────────────────────────────────────── /// All active tasks in this module (any status). diff --git a/src/scheduler/builder.rs b/src/scheduler/builder.rs index 481664e..99709bc 100644 --- a/src/scheduler/builder.rs +++ b/src/scheduler/builder.rs @@ -14,6 +14,8 @@ use crate::resource::sampler::{SamplerConfig, SmoothedReader}; use crate::resource::{ResourceReader, ResourceSampler}; use crate::store::{StoreConfig, StoreError, TaskStore}; +use super::rate_limit::RateLimit; + use super::event::{SchedulerConfig, ShutdownMode}; use super::Scheduler; @@ -52,6 +54,8 @@ pub struct SchedulerBuilder { bandwidth_limit_bps: Option, default_group_concurrency: usize, group_concurrency_overrides: Vec<(String, usize)>, + type_rate_limits: Vec<(String, RateLimit)>, + group_rate_limits: Vec<(String, RateLimit)>, } impl SchedulerBuilder { @@ -72,6 +76,8 @@ impl SchedulerBuilder { bandwidth_limit_bps: None, default_group_concurrency: 0, group_concurrency_overrides: Vec::new(), + type_rate_limits: Vec::new(), + group_rate_limits: Vec::new(), } } @@ -245,6 +251,24 @@ impl SchedulerBuilder { self } + /// Set a rate limit for a task type. + /// + /// The `task_type` should be the full prefixed name (e.g. 
`"media::upload"`). + /// Multiple calls with the same key overwrite. + pub fn rate_limit(mut self, task_type: impl Into, limit: RateLimit) -> Self { + self.type_rate_limits.push((task_type.into(), limit)); + self + } + + /// Set a rate limit for a task group. + /// + /// Tasks with a matching `group_key` are rate-limited to the configured + /// rate, independent of task-type rate limits (both must pass). + pub fn group_rate_limit(mut self, group: impl Into, limit: RateLimit) -> Self { + self.group_rate_limits.push((group.into(), limit)); + self + } + /// Register shared application state accessible from every executor via /// [`TaskContext::state`](crate::TaskContext::state). /// @@ -453,6 +477,16 @@ impl SchedulerBuilder { std::sync::atomic::Ordering::Relaxed, ); + // Apply rate limits. + let has_rate_limits = + !self.type_rate_limits.is_empty() || !self.group_rate_limits.is_empty(); + for (scope, limit) in self.type_rate_limits { + scheduler.inner.type_rate_limits.set(scope, limit); + } + for (scope, limit) in self.group_rate_limits { + scheduler.inner.group_rate_limits.set(scope, limit); + } + // Compute fast-dispatch eligibility before consuming builder fields. let has_groups = self.default_group_concurrency > 0 || !self.group_concurrency_overrides.is_empty(); @@ -501,7 +535,12 @@ impl SchedulerBuilder { // Enable fast dispatch (single pop_next instead of peek + gate + claim) // when no groups, no resource monitoring, no pressure sources, no // module caps, and no paused groups are present. 
- if !has_groups && !has_monitoring && !has_pressure && !has_module_caps && !has_paused_groups + if !has_groups + && !has_monitoring + && !has_pressure + && !has_module_caps + && !has_paused_groups + && !has_rate_limits { scheduler .inner diff --git a/src/scheduler/control.rs b/src/scheduler/control.rs index 0632f39..a25b914 100644 --- a/src/scheduler/control.rs +++ b/src/scheduler/control.rs @@ -6,6 +6,7 @@ use chrono::{DateTime, Utc}; use crate::store::StoreError; +use super::rate_limit::RateLimit; use super::{emit_event, Scheduler, SchedulerEvent}; impl Scheduler { @@ -241,16 +242,51 @@ impl Scheduler { self.inner.paused_groups.read().unwrap().contains(group_key) } + // ── Rate Limiting ────────────────────────────────────────────── + + /// Set or update the rate limit for a task type at runtime. + /// + /// If a bucket already exists, reconfigures in-place (preserving current + /// token count, clamped to new burst). If not, creates a new bucket. + pub fn set_rate_limit(&self, task_type: impl Into, limit: RateLimit) { + self.inner.type_rate_limits.set(task_type.into(), limit); + self.inner + .fast_dispatch + .store(false, AtomicOrdering::Relaxed); + } + + /// Remove the task-type rate limit, falling back to unlimited. + pub fn remove_rate_limit(&self, task_type: &str) { + self.inner.type_rate_limits.remove(task_type); + self.maybe_restore_fast_dispatch(); + } + + /// Set or update the rate limit for a task group at runtime. + pub fn set_group_rate_limit(&self, group: impl Into, limit: RateLimit) { + self.inner.group_rate_limits.set(group.into(), limit); + self.inner + .fast_dispatch + .store(false, AtomicOrdering::Relaxed); + } + + /// Remove the group rate limit, falling back to unlimited. + pub fn remove_group_rate_limit(&self, group: &str) { + self.inner.group_rate_limits.remove(group); + self.maybe_restore_fast_dispatch(); + } + /// Re-evaluate whether fast dispatch can be re-enabled. 
/// /// Must mirror the conditions in `SchedulerBuilder::build()`: /// no paused groups, no group limits (default or overrides), no resource - /// monitoring, no pressure sources, no module concurrency caps. + /// monitoring, no pressure sources, no module concurrency caps, no rate limits. fn maybe_restore_fast_dispatch(&self) { let has_groups = self.inner.group_limits.default_limit() > 0 || self.inner.group_limits.has_overrides() || !self.inner.paused_groups.read().unwrap().is_empty(); let has_module_caps = !self.inner.module_caps.read().unwrap().is_empty(); + let has_rate_limits = + !self.inner.type_rate_limits.is_empty() || !self.inner.group_rate_limits.is_empty(); if !has_groups && !self @@ -262,6 +298,7 @@ impl Scheduler { .inner .has_pressure_sources .load(AtomicOrdering::Relaxed) + && !has_rate_limits { self.inner .fast_dispatch diff --git a/src/scheduler/event.rs b/src/scheduler/event.rs index 61d1e81..369e652 100644 --- a/src/scheduler/event.rs +++ b/src/scheduler/event.rs @@ -16,6 +16,7 @@ use tokio::time::Duration; use crate::priority::Priority; use super::progress::{EstimatedProgress, TaskProgress}; +use super::rate_limit::RateLimitInfo; // ── Snapshot ──────────────────────────────────────────────────────── @@ -51,6 +52,8 @@ pub struct SchedulerSnapshot { pub blocked_count: i64, /// Groups that are currently paused, with the timestamp each was paused. pub paused_groups: Vec, + /// Configured rate limits with current utilization. + pub rate_limits: Vec, } /// Information about a paused group for snapshot/dashboard display. diff --git a/src/scheduler/gate.rs b/src/scheduler/gate.rs index 821fc33..0ba82e9 100644 --- a/src/scheduler/gate.rs +++ b/src/scheduler/gate.rs @@ -2,7 +2,8 @@ //! //! The [`DispatchGate`] trait decides whether a popped task should run or be //! requeued. The built-in [`DefaultDispatchGate`] applies backpressure -//! throttling and IO-budget checks. +//! throttling, IO-budget checks, group/module concurrency limits, and +//! 
token-bucket rate limiting. use std::collections::{HashMap, HashSet}; use std::future::Future; @@ -15,9 +16,24 @@ use crate::resource::ResourceReader; use crate::store::{StoreError, TaskStore}; use crate::task::TaskRecord; +use super::rate_limit::RateLimits; + /// Boxed future returned by [`DispatchGate`] methods. type BoxFuture<'a, T> = Pin + Send + 'a>>; +// ── Admission ───────────────────────────────────────────────────── + +/// Result of a [`DispatchGate::admit`] call. +pub enum Admission { + /// Task should be dispatched. + Admit, + /// Task should be requeued (backpressure, concurrency, etc.). + Deny, + /// Task was rate-limited. Contains the `Instant` the next token + /// will be available — caller should set `run_after` accordingly. + RateLimited(tokio::time::Instant), +} + // ── Gate Context ─────────────────────────────────────────────────── /// Context provided to a [`DispatchGate`] for admission decisions. @@ -37,6 +53,10 @@ pub struct GateContext<'a> { pub module_running: &'a HashMap, /// Set of currently paused group keys. pub paused_groups: &'a HashSet, + /// Per-task-type rate limits. + pub type_rate_limits: &'a RateLimits, + /// Per-group rate limits. + pub group_rate_limits: &'a RateLimits, } // ── Dispatch Gate ────────────────────────────────────────────────── @@ -45,16 +65,16 @@ pub struct GateContext<'a> { /// /// The scheduler calls [`admit`](DispatchGate::admit) after popping a /// task from the store but before spawning the executor. Returning -/// `Ok(false)` causes the task to be requeued for a later cycle. +/// `Ok(Admission::Deny)` causes the task to be requeued for a later cycle. /// -/// The default [`DefaultDispatchGate`] applies backpressure throttling -/// and IO-budget checks. Custom implementations can add per-type rate -/// limiting, cost-model gating, feature flags, etc. 
+/// The default [`DefaultDispatchGate`] applies backpressure throttling, +/// IO-budget checks, group concurrency, module concurrency, and +/// rate-limit checks. /// /// # Example /// /// ```ignore -/// use taskmill::scheduler::gate::{DispatchGate, GateContext}; +/// use taskmill::scheduler::gate::{Admission, DispatchGate, GateContext}; /// use taskmill::store::StoreError; /// use taskmill::task::TaskRecord; /// @@ -65,20 +85,22 @@ pub struct GateContext<'a> { /// &'a self, /// _task: &'a TaskRecord, /// _ctx: &'a GateContext<'a>, -/// ) -> std::pin::Pin> + Send + 'a>> { -/// Box::pin(async { Ok(true) }) +/// ) -> std::pin::Pin> + Send + 'a>> { +/// Box::pin(async { Ok(Admission::Admit) }) /// } /// } /// ``` pub trait DispatchGate: Send + Sync + 'static { /// Check whether `task` should be dispatched given the current context. /// - /// Return `Ok(true)` to dispatch, `Ok(false)` to requeue. + /// Return `Ok(Admission::Admit)` to dispatch, `Ok(Admission::Deny)` to + /// requeue, or `Ok(Admission::RateLimited(next))` to defer until the + /// given instant. fn admit<'a>( &'a self, task: &'a TaskRecord, ctx: &'a GateContext<'a>, - ) -> BoxFuture<'a, Result>; + ) -> BoxFuture<'a, Result>; /// Current aggregate pressure (0.0–1.0). Returns 0.0 by default. fn pressure<'a>(&'a self) -> BoxFuture<'a, f32> { @@ -93,7 +115,7 @@ pub trait DispatchGate: Send + Sync + 'static { // ── Default Gate ─────────────────────────────────────────────────── -/// Default gate: backpressure throttling + IO budget. +/// Default gate: backpressure, IO budget, concurrency, and rate limiting. /// /// This is what the scheduler uses unless you provide a custom gate via /// [`SchedulerBuilder::dispatch_gate`](super::SchedulerBuilder::dispatch_gate). @@ -116,7 +138,7 @@ impl DispatchGate for DefaultDispatchGate { &'a self, task: &'a TaskRecord, ctx: &'a GateContext<'a>, - ) -> BoxFuture<'a, Result> { + ) -> BoxFuture<'a, Result> { Box::pin(async move { // Backpressure check. 
let current_pressure = self.pressure.lock().await.pressure(); @@ -126,7 +148,7 @@ impl DispatchGate for DefaultDispatchGate { pressure = current_pressure, "task throttled by backpressure — requeuing" ); - return Ok(false); + return Ok(Admission::Deny); } // IO budget check (disk). @@ -137,7 +159,7 @@ impl DispatchGate for DefaultDispatchGate { expected_write = task.expected_io.disk_write, "task deferred — disk IO budget exhausted — requeuing" ); - return Ok(false); + return Ok(Admission::Deny); } // Network IO budget check. @@ -148,7 +170,7 @@ impl DispatchGate for DefaultDispatchGate { expected_tx = task.expected_io.net_tx, "task deferred — network IO budget exhausted — requeuing" ); - return Ok(false); + return Ok(Admission::Deny); } // Group pause check. @@ -159,7 +181,7 @@ impl DispatchGate for DefaultDispatchGate { group = group_key, "task deferred — group paused — requeuing" ); - return Ok(false); + return Ok(Admission::Deny); } } @@ -176,7 +198,7 @@ impl DispatchGate for DefaultDispatchGate { limit, "task deferred — group concurrency saturated — requeuing" ); - return Ok(false); + return Ok(Admission::Deny); } } } @@ -198,12 +220,35 @@ impl DispatchGate for DefaultDispatchGate { cap, "task deferred — module concurrency saturated — requeuing" ); - return Ok(false); + return Ok(Admission::Deny); } } } - Ok(true) + // Rate limit check — task type (acquire-last: only after all + // free checks pass, so tokens are never wasted on downstream + // rejections). + if let Some(Err(next)) = ctx.type_rate_limits.try_acquire(&task.task_type) { + tracing::trace!( + task_type = task.task_type, + "task deferred — task-type rate limit" + ); + return Ok(Admission::RateLimited(next)); + } + + // Rate limit check — group. 
+ if let Some(group_key) = &task.group_key { + if let Some(Err(next)) = ctx.group_rate_limits.try_acquire(group_key) { + tracing::trace!( + task_type = task.task_type, + group = group_key, + "task deferred — group rate limit" + ); + return Ok(Admission::RateLimited(next)); + } + } + + Ok(Admission::Admit) }) } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 5145572..b8c0c73 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -2,18 +2,19 @@ //! //! [`Scheduler`] coordinates task execution — popping from the //! [`TaskStore`], applying [backpressure](crate::backpressure), -//! IO-budget checks, and [group concurrency](crate::GroupLimits) limits, +//! IO-budget checks, [group concurrency](crate::GroupLimits) limits, +//! and [token-bucket rate limiting](crate::RateLimit) per task type and group, //! preempting lower-priority work, and emitting [`SchedulerEvent`]s for UI //! integration. Use [`SchedulerBuilder`] for ergonomic construction. //! //! The `Scheduler` implementation is split across focused submodules: //! - `submit` — task submission, lookup, cancellation, and superseding //! - `run_loop` — the main event loop, dispatch, and shutdown -//! - `control` — pause/resume, concurrency limits, and group limits +//! - `control` — pause/resume, concurrency limits, group limits, and rate limits //! - `queries` — read-only queries (active tasks, progress, snapshots) //! - `builder` — ergonomic construction via [`SchedulerBuilder`] //! - `dispatch` — task spawning, active-task tracking, and preemption -//! - `gate` — admission control (IO budget, backpressure, group limits) +//! - `gate` — admission control (IO budget, backpressure, group limits, rate limits) //! - `event` — event types and scheduler configuration //! - [`progress`] — progress reporting, byte-level tracking, and extrapolation //! 
@@ -27,6 +28,7 @@ pub(crate) mod event; pub(crate) mod gate; pub mod progress; mod queries; +pub(crate) mod rate_limit; mod run_loop; pub(crate) mod spawn; mod submit; @@ -81,6 +83,7 @@ pub use event::{ }; pub use gate::GroupLimits; pub use progress::{EstimatedProgress, ProgressReporter, TaskProgress}; +pub use rate_limit::{RateLimit, RateLimitInfo, RateLimits}; /// Emit a scheduler event only when at least one subscriber is listening. /// @@ -131,6 +134,10 @@ pub(crate) struct SchedulerInner { pub(crate) work_notify: Arc, /// Per-group concurrency limits. pub(crate) group_limits: GroupLimits, + /// Per-task-type rate limits (e.g. "media::upload" → 100/sec). + pub(crate) type_rate_limits: rate_limit::RateLimits, + /// Per-group rate limits (e.g. "s3://b2-us-west" → 200/sec). + pub(crate) group_rate_limits: rate_limit::RateLimits, /// Timeout for on_cancel hooks. pub(crate) cancel_hook_timeout: Duration, /// Default TTL for tasks without an explicit TTL. @@ -196,9 +203,10 @@ pub(crate) struct SchedulerInner { /// 1. Popping highest-priority pending tasks from the SQLite store /// 2. Checking IO budget against running task estimates and system capacity /// 3. Applying backpressure throttling based on external pressure sources -/// 4. Preempting lower-priority tasks when high-priority work arrives -/// 5. Managing retries and failure recording -/// 6. Emitting lifecycle events for UI integration +/// 4. Enforcing token-bucket rate limits per task type and group +/// 5. Preempting lower-priority tasks when high-priority work arrives +/// 6. Managing retries and failure recording +/// 7. Emitting lifecycle events for UI integration /// /// `Scheduler` is `Clone` — each clone shares the same underlying state. 
/// This makes it easy to hold in `tauri::State` or share across @@ -307,6 +315,8 @@ impl Scheduler { paused: AtomicBool::new(false), work_notify: Arc::new(Notify::new()), group_limits: GroupLimits::new(), + type_rate_limits: rate_limit::RateLimits::new(), + group_rate_limits: rate_limit::RateLimits::new(), cancel_hook_timeout: config.cancel_hook_timeout, default_ttl: config.default_ttl, expiry_sweep_interval: config.expiry_sweep_interval, diff --git a/src/scheduler/queries.rs b/src/scheduler/queries.rs index bbed6a9..2bc3e2b 100644 --- a/src/scheduler/queries.rs +++ b/src/scheduler/queries.rs @@ -168,6 +168,10 @@ impl Scheduler { ) .collect(); + // Rate limits: combine type and group collections. + let mut rate_limits = self.inner.type_rate_limits.snapshot_info("type"); + rate_limits.extend(self.inner.group_rate_limits.snapshot_info("group")); + Ok(SchedulerSnapshot { running, pending_count, @@ -182,6 +186,7 @@ impl Scheduler { recurring_schedules, blocked_count, paused_groups, + rate_limits, }) } } diff --git a/src/scheduler/rate_limit.rs b/src/scheduler/rate_limit.rs new file mode 100644 index 0000000..d25bf34 --- /dev/null +++ b/src/scheduler/rate_limit.rs @@ -0,0 +1,333 @@ +//! Token-bucket rate limiting for task dispatch. +//! +//! Rate limits control how many tasks *start per unit of time*, complementing +//! concurrency limits which control how many run *simultaneously*. A +//! [`RateLimit`] configures the steady-state rate and burst capacity; the +//! scheduler enforces it via [`RateLimits`] collections scoped by task type +//! and/or group key. + +use std::collections::HashMap; +use std::time::Duration; + +use tokio::time::Instant; + +// ── Configuration ──────────────────────────────────────────────────── + +/// Rate limit configuration. +/// +/// Defines the steady-state rate (`permits` per `interval`) and optional +/// burst capacity. 
Use the convenience constructors [`per_second`](Self::per_second) +/// and [`per_minute`](Self::per_minute) for common patterns. +/// +/// # Examples +/// +/// ``` +/// use taskmill::RateLimit; +/// +/// // 100 requests per second, no burst beyond steady rate. +/// let limit = RateLimit::per_second(100); +/// +/// // 10 per minute with a burst of 20 (allows short spikes). +/// let limit = RateLimit::per_minute(10).with_burst(20); +/// ``` +#[derive(Debug, Clone)] +pub struct RateLimit { + /// Maximum tokens replenished per `interval`. + pub permits: u32, + /// Replenishment interval. + pub interval: Duration, + /// Burst capacity — max tokens the bucket can hold. + /// Defaults to `permits` (no burst beyond steady rate). + pub burst: u32, +} + +impl RateLimit { + /// Create a rate limit of `n` permits per second. + pub fn per_second(n: u32) -> Self { + Self { + permits: n, + interval: Duration::from_secs(1), + burst: n, + } + } + + /// Create a rate limit of `n` permits per minute. + pub fn per_minute(n: u32) -> Self { + Self { + permits: n, + interval: Duration::from_secs(60), + burst: n, + } + } + + /// Set the burst capacity (max tokens the bucket can hold). + /// + /// Burst allows short spikes above the steady-state rate. For example, + /// `RateLimit::per_second(10).with_burst(20)` allows 20 immediate + /// dispatches followed by a sustained 10/sec. + pub fn with_burst(mut self, burst: u32) -> Self { + self.burst = burst; + self + } +} + +// ── Token Bucket ───────────────────────────────────────────────────── + +/// Classic token-bucket: tracks available tokens and last refill time. +/// +/// All methods take `&mut self`. Thread safety is provided by the +/// `Mutex` at the [`RateLimits`] collection level. +pub(crate) struct TokenBucket { + /// Fractional tokens for smooth refill. + available: f64, + /// Max capacity (burst). + burst: f64, + /// Tokens per nanosecond. + rate: f64, + /// Last time tokens were refilled. 
+ last_refill: Instant, + /// Original config — kept for snapshot reporting. + config: RateLimit, +} + +impl TokenBucket { + fn new(config: &RateLimit) -> Self { + let rate = config.permits as f64 / config.interval.as_nanos() as f64; + Self { + available: config.burst as f64, + burst: config.burst as f64, + rate, + last_refill: Instant::now(), + config: config.clone(), + } + } + + /// Refill tokens based on elapsed time since last refill. + fn refill(&mut self) { + let now = Instant::now(); + let elapsed_nanos = now.duration_since(self.last_refill).as_nanos() as f64; + self.available = (self.available + elapsed_nanos * self.rate).min(self.burst); + self.last_refill = now; + } + + /// Try to consume one token. Returns `Ok(())` if granted, + /// `Err(next_available)` with the `Instant` the next token + /// will be available if denied. + fn try_acquire(&mut self) -> Result<(), Instant> { + self.refill(); + if self.available >= 1.0 { + self.available -= 1.0; + Ok(()) + } else { + // Compute when the next token will be available. + let deficit = 1.0 - self.available; + let wait_nanos = deficit / self.rate; + Err(Instant::now() + Duration::from_nanos(wait_nanos as u64)) + } + } + + /// Update the bucket's rate and burst from a new config. + /// Preserves current token count (clamped to new burst). + fn reconfigure(&mut self, config: &RateLimit) { + let new_rate = config.permits as f64 / config.interval.as_nanos() as f64; + let new_burst = config.burst as f64; + self.refill(); // settle tokens before changing params + self.rate = new_rate; + self.burst = new_burst; + self.available = self.available.min(new_burst); + self.config = config.clone(); + } +} + +// ── Rate Limits Collection ─────────────────────────────────────────── + +/// Per-scope rate limit collection. +/// +/// Maps scope keys (task-type prefix or group key) to token buckets. +/// Thread-safe: a single `Mutex` guards the inner `HashMap`. 
+pub struct RateLimits { + inner: std::sync::Mutex<HashMap<String, TokenBucket>>, +} + +impl Default for RateLimits { + fn default() -> Self { + Self::new() + } +} + +impl RateLimits { + /// Create a new empty collection. + pub fn new() -> Self { + Self { + inner: std::sync::Mutex::new(HashMap::new()), + } + } + + /// Try to acquire a token for `scope`. Returns: + /// - `None` if no rate limit is configured for this scope (always pass). + /// - `Some(Ok(()))` if a token was granted. + /// - `Some(Err(next))` if denied, with the Instant the next token arrives. + pub fn try_acquire(&self, scope: &str) -> Option<Result<(), Instant>> { + let mut map = self.inner.lock().unwrap(); + let bucket = map.get_mut(scope)?; + Some(bucket.try_acquire()) + } + + /// Set or update the rate limit for a scope. + /// If a bucket already exists, reconfigure in-place (preserving tokens). + pub fn set(&self, scope: String, config: RateLimit) { + let mut map = self.inner.lock().unwrap(); + if let Some(bucket) = map.get_mut(&scope) { + bucket.reconfigure(&config); + } else { + map.insert(scope, TokenBucket::new(&config)); + } + } + + /// Remove the rate limit for a scope. + pub fn remove(&self, scope: &str) { + self.inner.lock().unwrap().remove(scope); + } + + /// Returns true if no rate limits are configured. + pub fn is_empty(&self) -> bool { + self.inner.lock().unwrap().is_empty() + } + + /// Earliest instant at which any bucket will have a token available. + /// Returns `None` if no buckets are depleted (or no limits configured).
+ pub fn next_available(&self) -> Option<Instant> { + let mut map = self.inner.lock().unwrap(); + let mut earliest: Option<Instant> = None; + for bucket in map.values_mut() { + bucket.refill(); + if bucket.available < 1.0 { + let deficit = 1.0 - bucket.available; + let wait_nanos = deficit / bucket.rate; + let ready_at = Instant::now() + Duration::from_nanos(wait_nanos as u64); + earliest = Some(match earliest { + Some(e) => e.min(ready_at), + None => ready_at, + }); + } + } + earliest + } + + /// Snapshot of all configured rate limits with current utilization. + pub fn snapshot_info(&self, scope_kind: &str) -> Vec<RateLimitInfo> { + let mut map = self.inner.lock().unwrap(); + let mut infos = Vec::with_capacity(map.len()); + for (scope, bucket) in map.iter_mut() { + bucket.refill(); + infos.push(RateLimitInfo { + scope: format!("{scope_kind}:{scope}"), + scope_kind: scope_kind.to_string(), + permits: bucket.config.permits, + interval_ms: bucket.config.interval.as_millis() as u64, + burst: bucket.config.burst, + available_tokens: bucket.available, + }); + } + infos + } +} + +// ── Snapshot Info ───────────────────────────────────────────────────── + +/// Configured rate limit with current utilization, for snapshot reporting. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct RateLimitInfo { + /// Scope identifier, e.g. `"type:media::upload"` or `"group:s3://b2"`. + pub scope: String, + /// Scope kind: `"type"` or `"group"`. + pub scope_kind: String, + /// Permits per interval. + pub permits: u32, + /// Interval in milliseconds. + pub interval_ms: u64, + /// Burst capacity. + pub burst: u32, + /// Current available tokens.
+ pub available_tokens: f64, +} + +// ── Tests ──────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn bucket_allows_up_to_burst() { + let config = RateLimit::per_second(10).with_burst(5); + let mut bucket = TokenBucket::new(&config); + for _ in 0..5 { + assert!(bucket.try_acquire().is_ok()); + } + } + + #[test] + fn bucket_denies_after_burst() { + let config = RateLimit::per_second(10).with_burst(3); + let mut bucket = TokenBucket::new(&config); + for _ in 0..3 { + assert!(bucket.try_acquire().is_ok()); + } + let result = bucket.try_acquire(); + assert!(result.is_err()); + // The returned instant should not be in the past. + let next = result.unwrap_err(); + assert!(next >= Instant::now()); + } + + #[tokio::test] + async fn bucket_refills_over_time() { + let config = RateLimit::per_second(10); // 10 tokens/sec, burst=10 + let mut bucket = TokenBucket::new(&config); + // Drain all tokens. + for _ in 0..10 { + assert!(bucket.try_acquire().is_ok()); + } + assert!(bucket.try_acquire().is_err()); + // Wait for ~1 token worth of time (100ms for 10/sec). + tokio::time::sleep(Duration::from_millis(120)).await; + assert!(bucket.try_acquire().is_ok()); + } + + #[test] + fn reconfigure_preserves_tokens() { + let config = RateLimit::per_second(10).with_burst(10); + let mut bucket = TokenBucket::new(&config); + // Use 5 tokens, leaving 5. + for _ in 0..5 { + bucket.try_acquire().unwrap(); + } + // Reconfigure to burst=3; the 5 remaining tokens are clamped to 3. + let new_config = RateLimit::per_second(20).with_burst(3); + bucket.reconfigure(&new_config); + assert!(bucket.available <= 3.0); + // Should still be able to acquire (3 available).
+ assert!(bucket.try_acquire().is_ok()); + } + + #[test] + fn collection_pass_unconfigured() { + let limits = RateLimits::new(); + assert!(limits.try_acquire("unknown").is_none()); + } + + #[test] + fn snapshot_info_returns_state() { + let limits = RateLimits::new(); + limits.set("media::upload".to_string(), RateLimit::per_second(100)); + + let infos = limits.snapshot_info("type"); + assert_eq!(infos.len(), 1); + assert_eq!(infos[0].scope, "type:media::upload"); + assert_eq!(infos[0].scope_kind, "type"); + assert_eq!(infos[0].permits, 100); + assert_eq!(infos[0].burst, 100); + assert!(infos[0].available_tokens > 0.0); + } +} diff --git a/src/scheduler/run_loop.rs b/src/scheduler/run_loop.rs index b3ac820..c2d0788 100644 --- a/src/scheduler/run_loop.rs +++ b/src/scheduler/run_loop.rs @@ -11,7 +11,7 @@ use crate::task::IoBudget; use super::{emit_event, SchedulerEvent}; -use super::gate::GateContext; +use super::gate::{Admission, GateContext}; use super::spawn::{self, SpawnContext}; use super::{Scheduler, ShutdownMode}; @@ -103,13 +103,33 @@ impl Scheduler { module_caps: &self.inner.module_caps, module_running: &self.inner.module_running, paused_groups: &paused_groups, + type_rate_limits: &self.inner.type_rate_limits, + group_rate_limits: &self.inner.group_rate_limits, }; // Admission check while the task is still pending — no running // window if the gate rejects. - if !self.inner.gate.admit(&candidate, &gate_ctx).await? { - drop(reader_guard); - return Ok(false); + match self.inner.gate.admit(&candidate, &gate_ctx).await? { + Admission::Admit => { /* proceed to claim */ } + Admission::Deny => { + drop(reader_guard); + return Ok(false); + } + Admission::RateLimited(next) => { + drop(reader_guard); + // Set run_after to push the task out of the peek window, + // preventing head-of-line blocking. Other task types can + // still dispatch while this one waits for a token. 
+ let wait = next.duration_since(tokio::time::Instant::now()); + let run_after = chrono::Utc::now() + + chrono::Duration::from_std(wait).unwrap_or(chrono::Duration::milliseconds(1)); + self.inner + .store + .set_run_after(candidate.id, run_after) + .await?; + // Return true to keep looping; another task may be eligible. + return Ok(true); + } } drop(reader_guard); diff --git a/src/store/query/scheduling.rs b/src/store/query/scheduling.rs index 5b0b9fa..5c4315b 100644 --- a/src/store/query/scheduling.rs +++ b/src/store/query/scheduling.rs @@ -49,6 +49,26 @@ impl TaskStore { .collect()) } + /// Set the `run_after` timestamp for a pending task. + /// + /// Used by the rate limiter to defer a task until its next token is + /// available, preventing head-of-line blocking. + pub async fn set_run_after( + &self, + task_id: i64, + run_after: chrono::DateTime<chrono::Utc>, + ) -> Result<(), StoreError> { + sqlx::query( + "UPDATE tasks SET run_after = ? + WHERE id = ? AND status = 'pending'", + ) + .bind(run_after.timestamp_millis()) + .bind(task_id) + .execute(&self.pool) + .await?; + Ok(()) + } + // ── Recurring control ────────────────────────────────────────── /// Pause a recurring schedule. The current instance (if running) is diff --git a/tests/integration.rs b/tests/integration.rs index 41e3aef..83f4b77 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -26,6 +26,8 @@ mod memo; mod module_features; #[path = "integration/modules.rs"] mod modules; +#[path = "integration/rate_limit.rs"] +mod rate_limit; #[path = "integration/retry_policy.rs"] mod retry_policy; #[path = "integration/scheduler_core.rs"] diff --git a/tests/integration/rate_limit.rs b/tests/integration/rate_limit.rs new file mode 100644 index 0000000..3b9c487 --- /dev/null +++ b/tests/integration/rate_limit.rs @@ -0,0 +1,539 @@ +//! Integration tests: Rate limiting per task type / group (Plan 043).
+ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use taskmill::{Domain, RateLimit, Scheduler, SchedulerEvent, TaskStore, TaskSubmission}; +use tokio_util::sync::CancellationToken; + +use super::common::*; + +// ═══════════════════════════════════════════════════════════════════ +// rate_limit_caps_dispatch_rate +// ═══════════════════════════════════════════════════════════════════ + +/// Submit many tasks with a rate limit of 5/sec. Only the first 5 should +/// dispatch in the initial burst; more should follow after time advances. +#[tokio::test] +async fn rate_limit_caps_dispatch_rate() { + let count = Arc::new(AtomicUsize::new(0)); + let executor = CountingExecutor { + count: count.clone(), + }; + + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(executor)) + .max_concurrency(100) // high concurrency — rate limit is the bottleneck + .rate_limit("test::fast", RateLimit::per_second(5)) + .build() + .await + .unwrap(); + + // Submit 20 tasks. + for i in 0..20 { + sched + .submit(&TaskSubmission::new("test::fast").key(format!("t-{i}"))) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched2 = sched.clone(); + let token2 = token.clone(); + let handle = tokio::spawn(async move { sched2.run(token2).await }); + + // Give the scheduler time to dispatch the initial burst. + tokio::time::sleep(Duration::from_millis(300)).await; + + let dispatched = count.load(Ordering::SeqCst); + // Should have dispatched approximately 5 (the burst), not all 20. + assert!( + (3..=8).contains(&dispatched), + "expected ~5 dispatched in initial burst, got {dispatched}" + ); + + // Wait for more tokens to refill and dispatch more. 
+ tokio::time::sleep(Duration::from_secs(2)).await; + + let dispatched_later = count.load(Ordering::SeqCst); + assert!( + dispatched_later > dispatched, + "expected more tasks after waiting, was {dispatched} now {dispatched_later}" + ); + + token.cancel(); + let _ = handle.await; +} + +// ═══════════════════════════════════════════════════════════════════ +// group_rate_limit_caps_group +// ═══════════════════════════════════════════════════════════════════ + +/// Two groups: one rate-limited to 2/sec, one unlimited. The unlimited +/// group dispatches all tasks quickly while the limited group is throttled. +#[tokio::test] +async fn group_rate_limit_caps_group() { + let count_limited = Arc::new(AtomicUsize::new(0)); + let count_unlimited = Arc::new(AtomicUsize::new(0)); + let count_limited2 = count_limited.clone(); + let count_unlimited2 = count_unlimited.clone(); + + // Each group gets its own task type and counter, which is simpler than + // routing by group key inside a single executor. + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new() + .task::(CountingExecutor { + count: count_limited.clone(), + }) + .task::(CountingExecutor { + count: count_unlimited.clone(), + }), + ) + .max_concurrency(100) + .group_rate_limit("limited-group", RateLimit::per_second(2)) + .build() + .await + .unwrap(); + + // Submit 10 tasks to the limited group. + for i in 0..10 { + sched + .submit( + &TaskSubmission::new("test::fast") + .key(format!("lim-{i}")) + .group("limited-group"), + ) + .await + .unwrap(); + } + + // Submit 10 tasks to an unlimited group.
+ for i in 0..10 { + sched + .submit( + &TaskSubmission::new("test::worker") + .key(format!("unlim-{i}")) + .group("free-group"), + ) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched2 = sched.clone(); + let token2 = token.clone(); + let handle = tokio::spawn(async move { sched2.run(token2).await }); + + tokio::time::sleep(Duration::from_millis(500)).await; + + let limited = count_limited2.load(Ordering::SeqCst); + let unlimited = count_unlimited2.load(Ordering::SeqCst); + + // Unlimited group should have dispatched all 10. + assert!( + unlimited >= 8, + "unlimited group should dispatch all tasks, got {unlimited}" + ); + // Limited group should have dispatched only ~2 (2/sec burst). + assert!( + limited <= 5, + "limited group should be throttled, got {limited}" + ); + + token.cancel(); + let _ = handle.await; +} + +// ═══════════════════════════════════════════════════════════════════ +// dual_scope_both_checked +// ═══════════════════════════════════════════════════════════════════ + +/// Task with both type and group rate limits — rejected if either is +/// exhausted. 
+#[tokio::test] +async fn dual_scope_both_checked() { + let count = Arc::new(AtomicUsize::new(0)); + + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new().task::(CountingExecutor { + count: count.clone(), + }), + ) + .max_concurrency(100) + // Type limit: 10/sec + .rate_limit("test::fast", RateLimit::per_second(10)) + // Group limit: 3/sec (stricter) + .group_rate_limit("strict-group", RateLimit::per_second(3)) + .build() + .await + .unwrap(); + + for i in 0..20 { + sched + .submit( + &TaskSubmission::new("test::fast") + .key(format!("dual-{i}")) + .group("strict-group"), + ) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched2 = sched.clone(); + let token2 = token.clone(); + let handle = tokio::spawn(async move { sched2.run(token2).await }); + + tokio::time::sleep(Duration::from_millis(400)).await; + + let dispatched = count.load(Ordering::SeqCst); + // Group limit of 3 is stricter — should cap dispatch to ~3 initial burst. + assert!( + dispatched <= 6, + "stricter group limit should cap dispatch, got {dispatched}" + ); + + token.cancel(); + let _ = handle.await; +} + +// ═══════════════════════════════════════════════════════════════════ +// runtime_set_rate_limit +// ═══════════════════════════════════════════════════════════════════ + +/// Call `set_rate_limit()` at runtime and verify it takes effect. +#[tokio::test] +async fn runtime_set_rate_limit() { + let count = Arc::new(AtomicUsize::new(0)); + + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new().task::(CountingExecutor { + count: count.clone(), + }), + ) + .max_concurrency(100) + .build() + .await + .unwrap(); + + // Submit tasks without rate limit — should all dispatch quickly. 
+ for i in 0..5 { + sched + .submit(&TaskSubmission::new("test::fast").key(format!("pre-{i}"))) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched2 = sched.clone(); + let token2 = token.clone(); + let handle = tokio::spawn(async move { sched2.run(token2).await }); + + tokio::time::sleep(Duration::from_millis(300)).await; + let before_limit = count.load(Ordering::SeqCst); + assert_eq!(before_limit, 5, "all 5 should dispatch without rate limit"); + + // Now set a tight rate limit. + sched.set_rate_limit("test::fast", RateLimit::per_second(2)); + + // Submit 10 more tasks. + for i in 0..10 { + sched + .submit(&TaskSubmission::new("test::fast").key(format!("post-{i}"))) + .await + .unwrap(); + } + + tokio::time::sleep(Duration::from_millis(500)).await; + let after_limit = count.load(Ordering::SeqCst); + // Should have dispatched the original 5 + only ~2-3 more (2/sec rate limit). + assert!( + after_limit < 15, + "rate limit should throttle, got {after_limit} (expected < 15)" + ); + + token.cancel(); + let _ = handle.await; +} + +// ═══════════════════════════════════════════════════════════════════ +// runtime_remove_rate_limit +// ═══════════════════════════════════════════════════════════════════ + +/// Remove a rate limit at runtime and verify tasks dispatch freely. 
+#[tokio::test] +async fn runtime_remove_rate_limit() { + let count = Arc::new(AtomicUsize::new(0)); + + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new().task::(CountingExecutor { + count: count.clone(), + }), + ) + .max_concurrency(100) + .rate_limit("test::fast", RateLimit::per_second(2)) + .build() + .await + .unwrap(); + + for i in 0..10 { + sched + .submit(&TaskSubmission::new("test::fast").key(format!("rl-{i}"))) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched2 = sched.clone(); + let token2 = token.clone(); + let handle = tokio::spawn(async move { sched2.run(token2).await }); + + tokio::time::sleep(Duration::from_millis(400)).await; + let throttled = count.load(Ordering::SeqCst); + assert!( + throttled <= 5, + "should be throttled before removal, got {throttled}" + ); + + // Remove the rate limit. + sched.remove_rate_limit("test::fast"); + + // Wait for remaining tasks to dispatch freely. + tokio::time::sleep(Duration::from_millis(800)).await; + let after_removal = count.load(Ordering::SeqCst); + assert_eq!( + after_removal, 10, + "all tasks should complete after removing rate limit, got {after_removal}" + ); + + token.cancel(); + let _ = handle.await; +} + +// ═══════════════════════════════════════════════════════════════════ +// rate_limit_with_concurrency +// ═══════════════════════════════════════════════════════════════════ + +/// Both rate limit and concurrency limit active — stricter one wins. 
+#[tokio::test] +async fn rate_limit_with_concurrency() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new().task::(DelayExecutor(Duration::from_millis(200))), + ) + .max_concurrency(2) // concurrency limit of 2 + .rate_limit("test::slow", RateLimit::per_second(100)) // generous rate limit + .build() + .await + .unwrap(); + + let mut rx = sched.subscribe(); + + for i in 0..6 { + sched + .submit(&TaskSubmission::new("test::slow").key(format!("conc-{i}"))) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched2 = sched.clone(); + let token2 = token.clone(); + let handle = tokio::spawn(async move { sched2.run(token2).await }); + + // Wait for all 6 tasks to complete (2 at a time, 200ms each = ~600ms). + let deadline = tokio::time::Instant::now() + Duration::from_secs(3); + let mut completed = 0usize; + while completed < 6 && tokio::time::Instant::now() < deadline { + match tokio::time::timeout(Duration::from_millis(100), rx.recv()).await { + Ok(Ok(SchedulerEvent::Completed(_))) => completed += 1, + _ => continue, + } + } + assert_eq!(completed, 6, "all tasks should complete, got {completed}"); + + token.cancel(); + let _ = handle.await; +} + +// ═══════════════════════════════════════════════════════════════════ +// builder_configures_rate_limits +// ═══════════════════════════════════════════════════════════════════ + +/// Builder API sets limits visible in snapshot. 
+#[tokio::test] +async fn builder_configures_rate_limits() { + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain(Domain::::new().task::(NoopExecutor)) + .rate_limit("test::test", RateLimit::per_second(50)) + .group_rate_limit("my-group", RateLimit::per_minute(100).with_burst(20)) + .build() + .await + .unwrap(); + + let snap = sched.snapshot().await.unwrap(); + assert_eq!(snap.rate_limits.len(), 2); + + let type_limit = snap + .rate_limits + .iter() + .find(|r| r.scope == "type:test::test") + .expect("type rate limit should be in snapshot"); + assert_eq!(type_limit.permits, 50); + assert_eq!(type_limit.burst, 50); + assert_eq!(type_limit.scope_kind, "type"); + + let group_limit = snap + .rate_limits + .iter() + .find(|r| r.scope == "group:my-group") + .expect("group rate limit should be in snapshot"); + assert_eq!(group_limit.permits, 100); + assert_eq!(group_limit.burst, 20); + assert_eq!(group_limit.scope_kind, "group"); +} + +// ═══════════════════════════════════════════════════════════════════ +// rate_limited_task_sets_run_after +// ═══════════════════════════════════════════════════════════════════ + +/// Rate-limited task gets `run_after` set, pushing it out of the peek +/// window so other tasks can dispatch. +#[tokio::test] +async fn rate_limited_task_sets_run_after() { + let fast_count = Arc::new(AtomicUsize::new(0)); + let worker_count = Arc::new(AtomicUsize::new(0)); + + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new() + .task::(CountingExecutor { + count: fast_count.clone(), + }) + .task::(CountingExecutor { + count: worker_count.clone(), + }), + ) + .max_concurrency(100) + // Tight rate limit on FastTask. + .rate_limit("test::fast", RateLimit::per_second(1).with_burst(1)) + .build() + .await + .unwrap(); + + // Submit 5 FastTask (rate limited) and 5 WorkerTask (no limit). 
+ for i in 0..5 { + sched + .submit(&TaskSubmission::new("test::fast").key(format!("fast-{i}"))) + .await + .unwrap(); + } + for i in 0..5 { + sched + .submit(&TaskSubmission::new("test::worker").key(format!("worker-{i}"))) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched2 = sched.clone(); + let token2 = token.clone(); + let handle = tokio::spawn(async move { sched2.run(token2).await }); + + tokio::time::sleep(Duration::from_millis(500)).await; + + let fast = fast_count.load(Ordering::SeqCst); + let worker = worker_count.load(Ordering::SeqCst); + + // WorkerTask should all be complete (no rate limit). + assert_eq!(worker, 5, "all WorkerTasks should dispatch, got {worker}"); + // FastTask should be limited (1/sec burst). + assert!(fast <= 2, "FastTask should be rate-limited, got {fast}"); + + token.cancel(); + let _ = handle.await; +} + +// ═══════════════════════════════════════════════════════════════════ +// no_head_of_line_blocking +// ═══════════════════════════════════════════════════════════════════ + +/// High-priority rate-limited type does not block lower-priority tasks +/// of a different type. +#[tokio::test] +async fn no_head_of_line_blocking() { + let fast_count = Arc::new(AtomicUsize::new(0)); + let worker_count = Arc::new(AtomicUsize::new(0)); + + let sched = Scheduler::builder() + .store(TaskStore::open_memory().await.unwrap()) + .domain( + Domain::::new() + .task::(CountingExecutor { + count: fast_count.clone(), + }) + .task::(CountingExecutor { + count: worker_count.clone(), + }), + ) + .max_concurrency(100) + // Very tight rate limit on FastTask (1 per second, burst of 1). + .rate_limit("test::fast", RateLimit::per_second(1).with_burst(1)) + .build() + .await + .unwrap(); + + // Submit high-priority rate-limited tasks.
+ for i in 0..3 { + sched + .submit( + &TaskSubmission::new("test::fast") + .key(format!("hp-{i}")) + .priority(taskmill::Priority::HIGH), + ) + .await + .unwrap(); + } + // Submit lower-priority non-rate-limited tasks. + for i in 0..5 { + sched + .submit( + &TaskSubmission::new("test::worker") + .key(format!("lp-{i}")) + .priority(taskmill::Priority::NORMAL), + ) + .await + .unwrap(); + } + + let token = CancellationToken::new(); + let sched2 = sched.clone(); + let token2 = token.clone(); + let handle = tokio::spawn(async move { sched2.run(token2).await }); + + tokio::time::sleep(Duration::from_millis(600)).await; + + let worker = worker_count.load(Ordering::SeqCst); + // Lower-priority WorkerTasks should still dispatch despite + // high-priority FastTasks being rate-limited. + assert!( + worker >= 3, + "worker tasks should dispatch despite rate-limited high-priority tasks, got {worker}" + ); + + token.cancel(); + let _ = handle.await; +}
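
The refill arithmetic used by `TokenBucket` in this change (a rate in tokens per nanosecond, with the balance capped at `burst`) can be traced deterministically with a small standalone sketch. This is not the crate's code: `SketchBucket` and its `advance` method are hypothetical names introduced here, and elapsed time is passed in explicitly instead of being read from `Instant::now()`, so the numbers do not depend on the wall clock.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the private `TokenBucket`; time is supplied by
/// the caller rather than read from a clock.
struct SketchBucket {
    available: f64,     // fractional token balance
    burst: f64,         // maximum balance
    rate_per_nano: f64, // tokens gained per nanosecond
}

impl SketchBucket {
    fn new(permits: u32, interval: Duration, burst: u32) -> Self {
        Self {
            available: burst as f64,
            burst: burst as f64,
            rate_per_nano: permits as f64 / interval.as_nanos() as f64,
        }
    }

    /// Credit tokens for `elapsed`, never exceeding the burst capacity.
    fn advance(&mut self, elapsed: Duration) {
        let gained = elapsed.as_nanos() as f64 * self.rate_per_nano;
        self.available = (self.available + gained).min(self.burst);
    }

    /// Take one token, or report roughly how long until one is available.
    fn try_acquire(&mut self) -> Result<(), Duration> {
        if self.available >= 1.0 {
            self.available -= 1.0;
            Ok(())
        } else {
            let deficit = 1.0 - self.available;
            let wait_nanos = (deficit / self.rate_per_nano).ceil() as u64;
            Err(Duration::from_nanos(wait_nanos))
        }
    }
}

fn main() {
    // Same shape as RateLimit::per_second(10).with_burst(20).
    let mut bucket = SketchBucket::new(10, Duration::from_secs(1), 20);

    // The full burst is granted at once; further attempts are denied.
    let granted = (0..25).filter(|_| bucket.try_acquire().is_ok()).count();
    println!("granted immediately: {granted}");

    // A drained bucket reports a wait of about one token's worth of time.
    let wait = bucket.try_acquire().unwrap_err();
    println!("next token in about: {wait:?}");

    // Advance slightly past the reported wait to absorb float rounding.
    bucket.advance(wait + Duration::from_millis(1));
    println!("after waiting: {:?}", bucket.try_acquire());
}
```

With 10 permits per second and a burst of 20, the sketch grants 20 tokens immediately and then reports a wait of roughly 100 ms, which corresponds to the instant that `Admission::RateLimited(next)` carries into `set_run_after` in the run loop.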