Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/taskito-async/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-async"
version = "0.7.0"
version = "0.8.0"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "taskito-core"
version = "0.7.0"
version = "0.8.0"
edition = "2021"

[features]
Expand Down
2 changes: 1 addition & 1 deletion crates/taskito-core/src/scheduler/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Scheduler {
depends_on: vec![],
expires_at: None,
result_ttl_ms: None,
namespace: None,
namespace: self.namespace.clone(),
};

if let Err(e) = self.storage.enqueue_unique(new_job) {
Expand Down
21 changes: 15 additions & 6 deletions crates/taskito-core/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub struct Scheduler {
config: SchedulerConfig,
shutdown: Arc<Notify>,
paused_cache: Mutex<(HashSet<String>, Instant)>,
namespace: Option<String>,
}

/// Counters for tick-based scheduling of periodic maintenance tasks.
Expand All @@ -139,7 +140,12 @@ struct TickCounters {
}

impl Scheduler {
pub fn new(storage: StorageBackend, queues: Vec<String>, config: SchedulerConfig) -> Self {
pub fn new(
storage: StorageBackend,
queues: Vec<String>,
config: SchedulerConfig,
namespace: Option<String>,
) -> Self {
let rate_limiter = RateLimiter::new(storage.clone());
let dlq = DeadLetterQueue::new(storage.clone());
let circuit_breaker = CircuitBreaker::new(storage.clone());
Expand All @@ -155,6 +161,7 @@ impl Scheduler {
config,
shutdown: Arc::new(Notify::new()),
paused_cache: Mutex::new((HashSet::new(), Instant::now())),
namespace,
}
}

Expand Down Expand Up @@ -249,6 +256,7 @@ mod tests {
storage,
vec!["default".to_string()],
SchedulerConfig::default(),
None,
)
}

Expand Down Expand Up @@ -445,7 +453,7 @@ mod tests {
// The second job should be back in pending with a future scheduled_at
let jobs = scheduler
.storage
.list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0)
.list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0, None)
.unwrap();
assert_eq!(jobs.len(), 1);
assert!(jobs[0].scheduled_at > now_millis());
Expand Down Expand Up @@ -486,7 +494,7 @@ mod tests {
// Job should be rescheduled
let jobs = scheduler
.storage
.list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0)
.list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0, None)
.unwrap();
assert_eq!(jobs.len(), 1);
assert!(jobs[0].scheduled_at > now_millis());
Expand Down Expand Up @@ -562,6 +570,7 @@ mod tests {
Some("periodic_task"),
10,
0,
None,
)
.unwrap();
assert_eq!(jobs.len(), 1);
Expand All @@ -575,13 +584,13 @@ mod tests {
result_ttl_ms: Some(1), // 1ms TTL
..SchedulerConfig::default()
};
let scheduler = Scheduler::new(storage, vec!["default".to_string()], config);
let scheduler = Scheduler::new(storage, vec!["default".to_string()], config, None);

// Enqueue, dequeue, and complete a job
let job = scheduler.storage.enqueue(make_job("cleanup_task")).unwrap();
scheduler
.storage
.dequeue("default", now_millis() + 1000)
.dequeue("default", now_millis() + 1000, None)
.unwrap();
scheduler.storage.complete(&job.id, Some(vec![1])).unwrap();

Expand All @@ -605,7 +614,7 @@ mod tests {
cleanup_interval: 1,
..SchedulerConfig::default()
};
let scheduler = Scheduler::new(storage, vec!["default".to_string()], config);
let scheduler = Scheduler::new(storage, vec!["default".to_string()], config, None);

scheduler.storage.enqueue(make_job("tick_task")).unwrap();

Expand Down
5 changes: 4 additions & 1 deletion crates/taskito-core/src/scheduler/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ impl Scheduler {
return Ok(false);
}

let job = match self.storage.dequeue_from(&active_queues, now)? {
let job = match self
.storage
.dequeue_from(&active_queues, now, self.namespace.as_deref())?
{
Some(j) => j,
None => return Ok(false),
};
Expand Down
32 changes: 27 additions & 5 deletions crates/taskito-core/src/storage/diesel_common/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,27 @@ macro_rules! impl_diesel_job_ops {
}

/// Atomically dequeue the highest-priority ready job from the given queue.
/// Skips expired jobs.
pub fn dequeue(&self, queue_name: &str, now: i64) -> Result<Option<Job>> {
/// Skips expired jobs. When `namespace` is `Some`, only jobs in that
/// namespace are considered; when `None`, only jobs with no namespace.
pub fn dequeue(&self, queue_name: &str, now: i64, namespace: Option<&str>) -> Result<Option<Job>> {
let mut conn = self.conn()?;

conn.transaction(|conn| {
let candidates: Vec<JobRow> = jobs::table
let mut query = jobs::table
.filter(jobs::queue.eq(queue_name))
.filter(jobs::status.eq(JobStatus::Pending as i32))
.filter(jobs::scheduled_at.le(now))
.order((jobs::priority.desc(), jobs::scheduled_at.asc()))
.limit(100)
.into_boxed();

if let Some(ns) = namespace {
query = query.filter(jobs::namespace.eq(ns));
} else {
query = query.filter(jobs::namespace.is_null());
}

let candidates: Vec<JobRow> = query
.select(JobRow::as_select())
.load(conn)?;

Expand Down Expand Up @@ -327,9 +337,9 @@ macro_rules! impl_diesel_job_ops {
}

/// Dequeue from multiple queues, checking each in order.
pub fn dequeue_from(&self, queues: &[String], now: i64) -> Result<Option<Job>> {
pub fn dequeue_from(&self, queues: &[String], now: i64, namespace: Option<&str>) -> Result<Option<Job>> {
for queue_name in queues {
if let Some(job) = self.dequeue(queue_name, now)? {
if let Some(job) = self.dequeue(queue_name, now, namespace)? {
return Ok(Some(job));
}
}
Expand Down Expand Up @@ -554,13 +564,16 @@ macro_rules! impl_diesel_job_ops {
}

/// List jobs with optional filters and pagination.
/// When `namespace` is `Some`, only jobs in that namespace are returned.
/// When `None`, all jobs are returned regardless of namespace.
pub fn list_jobs(
&self,
status: Option<i32>,
queue_name: Option<&str>,
task_name: Option<&str>,
limit: i64,
offset: i64,
namespace: Option<&str>,
) -> Result<Vec<Job>> {
let mut conn = self.conn()?;

Expand All @@ -575,6 +588,9 @@ macro_rules! impl_diesel_job_ops {
if let Some(t) = task_name {
query = query.filter(jobs::task_name.eq(t));
}
if let Some(ns) = namespace {
query = query.filter(jobs::namespace.eq(ns));
}

let rows: Vec<JobRow> = query
.limit(limit)
Expand Down Expand Up @@ -678,6 +694,8 @@ macro_rules! impl_diesel_job_ops {
}

/// List jobs with extended filters.
/// When `namespace` is `Some`, only jobs in that namespace are returned.
/// When `None`, all jobs are returned regardless of namespace.
#[allow(clippy::too_many_arguments)]
pub fn list_jobs_filtered(
&self,
Expand All @@ -690,6 +708,7 @@ macro_rules! impl_diesel_job_ops {
created_before: Option<i64>,
limit: i64,
offset: i64,
namespace: Option<&str>,
) -> Result<Vec<Job>> {
let mut conn = self.conn()?;

Expand All @@ -716,6 +735,9 @@ macro_rules! impl_diesel_job_ops {
if let Some(before) = created_before {
query = query.filter(jobs::created_at.le(before));
}
if let Some(ns) = namespace {
query = query.filter(jobs::namespace.eq(ns));
}

let rows: Vec<JobRow> = query
.limit(limit)
Expand Down
33 changes: 24 additions & 9 deletions crates/taskito-core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct DeadJob {
pub max_retries: i32,
pub timeout_ms: i64,
pub result_ttl_ms: Option<i64>,
pub namespace: Option<String>,
}

impl From<models::DeadLetterRow> for DeadJob {
Expand All @@ -58,6 +59,7 @@ impl From<models::DeadLetterRow> for DeadJob {
max_retries: row.max_retries,
timeout_ms: row.timeout_ms,
result_ttl_ms: row.result_ttl_ms,
namespace: row.namespace,
}
}
}
Expand Down Expand Up @@ -93,15 +95,17 @@ macro_rules! impl_storage {
&self,
queue_name: &str,
now: i64,
namespace: Option<&str>,
) -> $crate::error::Result<Option<$crate::job::Job>> {
self.dequeue(queue_name, now)
self.dequeue(queue_name, now, namespace)
}
fn dequeue_from(
&self,
queues: &[String],
now: i64,
namespace: Option<&str>,
) -> $crate::error::Result<Option<$crate::job::Job>> {
self.dequeue_from(queues, now)
self.dequeue_from(queues, now, namespace)
}
fn complete(
&self,
Expand Down Expand Up @@ -151,8 +155,9 @@ macro_rules! impl_storage {
task_name: Option<&str>,
limit: i64,
offset: i64,
namespace: Option<&str>,
) -> $crate::error::Result<Vec<$crate::job::Job>> {
self.list_jobs(status, queue_name, task_name, limit, offset)
self.list_jobs(status, queue_name, task_name, limit, offset, namespace)
}
fn get_job(&self, id: &str) -> $crate::error::Result<Option<$crate::job::Job>> {
self.get_job(id)
Expand Down Expand Up @@ -473,6 +478,7 @@ macro_rules! impl_storage {
created_before: Option<i64>,
limit: i64,
offset: i64,
namespace: Option<&str>,
) -> $crate::error::Result<Vec<$crate::job::Job>> {
self.list_jobs_filtered(
status,
Expand All @@ -484,6 +490,7 @@ macro_rules! impl_storage {
created_before,
limit,
offset,
namespace,
)
}
}
Expand Down Expand Up @@ -526,11 +533,16 @@ impl Storage for StorageBackend {
fn enqueue_unique(&self, new_job: NewJob) -> Result<Job> {
delegate!(self, enqueue_unique, new_job)
}
fn dequeue(&self, queue_name: &str, now: i64) -> Result<Option<Job>> {
delegate!(self, dequeue, queue_name, now)
fn dequeue(&self, queue_name: &str, now: i64, namespace: Option<&str>) -> Result<Option<Job>> {
delegate!(self, dequeue, queue_name, now, namespace)
}
fn dequeue_from(&self, queues: &[String], now: i64) -> Result<Option<Job>> {
delegate!(self, dequeue_from, queues, now)
fn dequeue_from(
&self,
queues: &[String],
now: i64,
namespace: Option<&str>,
) -> Result<Option<Job>> {
delegate!(self, dequeue_from, queues, now, namespace)
}
fn complete(&self, id: &str, result_bytes: Option<Vec<u8>>) -> Result<()> {
delegate!(self, complete, id, result_bytes)
Expand Down Expand Up @@ -572,8 +584,9 @@ impl Storage for StorageBackend {
task_name: Option<&str>,
limit: i64,
offset: i64,
namespace: Option<&str>,
) -> Result<Vec<Job>> {
delegate!(self, list_jobs, status, queue_name, task_name, limit, offset)
delegate!(self, list_jobs, status, queue_name, task_name, limit, offset, namespace)
}
fn get_job(&self, id: &str) -> Result<Option<Job>> {
delegate!(self, get_job, id)
Expand Down Expand Up @@ -818,6 +831,7 @@ impl Storage for StorageBackend {
created_before: Option<i64>,
limit: i64,
offset: i64,
namespace: Option<&str>,
) -> Result<Vec<Job>> {
delegate!(
self,
Expand All @@ -830,7 +844,8 @@ impl Storage for StorageBackend {
created_after,
created_before,
limit,
offset
offset,
namespace
)
}
}
3 changes: 3 additions & 0 deletions crates/taskito-core/src/storage/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct DeadLetterRow {
pub max_retries: i32,
pub timeout_ms: i64,
pub result_ttl_ms: Option<i64>,
pub namespace: Option<String>,
}

/// Insertable struct for dead letter entries.
Expand All @@ -94,6 +95,7 @@ pub struct NewDeadLetterRow<'a> {
pub max_retries: i32,
pub timeout_ms: i64,
pub result_ttl_ms: Option<i64>,
pub namespace: Option<&'a str>,
}

/// A row in the `rate_limits` table.
Expand Down Expand Up @@ -394,4 +396,5 @@ pub struct ArchivedJobRow {
pub cancel_requested: i32,
pub expires_at: Option<i64>,
pub result_ttl_ms: Option<i64>,
pub namespace: Option<String>,
}
2 changes: 1 addition & 1 deletion crates/taskito-core/src/storage/postgres/archival.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl PostgresStorage {
cancel_requested: row.cancel_requested != 0,
expires_at: row.expires_at,
result_ttl_ms: row.result_ttl_ms,
namespace: None,
namespace: row.namespace,
})
.collect())
}
Expand Down
3 changes: 2 additions & 1 deletion crates/taskito-core/src/storage/postgres/dead_letter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl PostgresStorage {
max_retries: job.max_retries,
timeout_ms: job.timeout_ms,
result_ttl_ms: job.result_ttl_ms,
namespace: job.namespace.as_deref(),
};

diesel::insert_into(dead_letter::table)
Expand Down Expand Up @@ -94,7 +95,7 @@ impl PostgresStorage {
depends_on: vec![],
expires_at: None,
result_ttl_ms: dead_row.result_ttl_ms,
namespace: None,
namespace: dead_row.namespace,
};

let job = new_job.into_job();
Expand Down
Loading
Loading