diff --git a/crates/taskito-async/Cargo.toml b/crates/taskito-async/Cargo.toml index a62f965..0291444 100644 --- a/crates/taskito-async/Cargo.toml +++ b/crates/taskito-async/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "taskito-async" -version = "0.7.0" +version = "0.8.0" edition = "2021" [dependencies] diff --git a/crates/taskito-core/Cargo.toml b/crates/taskito-core/Cargo.toml index 0a19fd8..6bb4504 100644 --- a/crates/taskito-core/Cargo.toml +++ b/crates/taskito-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "taskito-core" -version = "0.7.0" +version = "0.8.0" edition = "2021" [features] diff --git a/crates/taskito-core/src/scheduler/maintenance.rs b/crates/taskito-core/src/scheduler/maintenance.rs index 94725d1..de069f5 100644 --- a/crates/taskito-core/src/scheduler/maintenance.rs +++ b/crates/taskito-core/src/scheduler/maintenance.rs @@ -99,7 +99,7 @@ impl Scheduler { depends_on: vec![], expires_at: None, result_ttl_ms: None, - namespace: None, + namespace: self.namespace.clone(), }; if let Err(e) = self.storage.enqueue_unique(new_job) { diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs index db4bfcd..ebe6441 100644 --- a/crates/taskito-core/src/scheduler/mod.rs +++ b/crates/taskito-core/src/scheduler/mod.rs @@ -128,6 +128,7 @@ pub struct Scheduler { config: SchedulerConfig, shutdown: Arc, paused_cache: Mutex<(HashSet, Instant)>, + namespace: Option, } /// Counters for tick-based scheduling of periodic maintenance tasks. 
@@ -139,7 +140,12 @@ struct TickCounters { } impl Scheduler { - pub fn new(storage: StorageBackend, queues: Vec, config: SchedulerConfig) -> Self { + pub fn new( + storage: StorageBackend, + queues: Vec, + config: SchedulerConfig, + namespace: Option, + ) -> Self { let rate_limiter = RateLimiter::new(storage.clone()); let dlq = DeadLetterQueue::new(storage.clone()); let circuit_breaker = CircuitBreaker::new(storage.clone()); @@ -155,6 +161,7 @@ impl Scheduler { config, shutdown: Arc::new(Notify::new()), paused_cache: Mutex::new((HashSet::new(), Instant::now())), + namespace, } } @@ -249,6 +256,7 @@ mod tests { storage, vec!["default".to_string()], SchedulerConfig::default(), + None, ) } @@ -445,7 +453,7 @@ mod tests { // The second job should be back in pending with a future scheduled_at let jobs = scheduler .storage - .list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0) + .list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0, None) .unwrap(); assert_eq!(jobs.len(), 1); assert!(jobs[0].scheduled_at > now_millis()); @@ -486,7 +494,7 @@ mod tests { // Job should be rescheduled let jobs = scheduler .storage - .list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0) + .list_jobs(Some(JobStatus::Pending as i32), None, None, 10, 0, None) .unwrap(); assert_eq!(jobs.len(), 1); assert!(jobs[0].scheduled_at > now_millis()); @@ -562,6 +570,7 @@ mod tests { Some("periodic_task"), 10, 0, + None, ) .unwrap(); assert_eq!(jobs.len(), 1); @@ -575,13 +584,13 @@ mod tests { result_ttl_ms: Some(1), // 1ms TTL ..SchedulerConfig::default() }; - let scheduler = Scheduler::new(storage, vec!["default".to_string()], config); + let scheduler = Scheduler::new(storage, vec!["default".to_string()], config, None); // Enqueue, dequeue, and complete a job let job = scheduler.storage.enqueue(make_job("cleanup_task")).unwrap(); scheduler .storage - .dequeue("default", now_millis() + 1000) + .dequeue("default", now_millis() + 1000, None) .unwrap(); 
scheduler.storage.complete(&job.id, Some(vec![1])).unwrap(); @@ -605,7 +614,7 @@ mod tests { cleanup_interval: 1, ..SchedulerConfig::default() }; - let scheduler = Scheduler::new(storage, vec!["default".to_string()], config); + let scheduler = Scheduler::new(storage, vec!["default".to_string()], config, None); scheduler.storage.enqueue(make_job("tick_task")).unwrap(); diff --git a/crates/taskito-core/src/scheduler/poller.rs b/crates/taskito-core/src/scheduler/poller.rs index 3463cd7..aa48e7c 100644 --- a/crates/taskito-core/src/scheduler/poller.rs +++ b/crates/taskito-core/src/scheduler/poller.rs @@ -51,7 +51,10 @@ impl Scheduler { return Ok(false); } - let job = match self.storage.dequeue_from(&active_queues, now)? { + let job = match self + .storage + .dequeue_from(&active_queues, now, self.namespace.as_deref())? + { Some(j) => j, None => return Ok(false), }; diff --git a/crates/taskito-core/src/storage/diesel_common/jobs.rs b/crates/taskito-core/src/storage/diesel_common/jobs.rs index e808376..0222f63 100644 --- a/crates/taskito-core/src/storage/diesel_common/jobs.rs +++ b/crates/taskito-core/src/storage/diesel_common/jobs.rs @@ -270,17 +270,27 @@ macro_rules! impl_diesel_job_ops { } /// Atomically dequeue the highest-priority ready job from the given queue. - /// Skips expired jobs. - pub fn dequeue(&self, queue_name: &str, now: i64) -> Result> { + /// Skips expired jobs. When `namespace` is `Some`, only jobs in that + /// namespace are considered; when `None`, only jobs with no namespace. 
+ pub fn dequeue(&self, queue_name: &str, now: i64, namespace: Option<&str>) -> Result> { let mut conn = self.conn()?; conn.transaction(|conn| { - let candidates: Vec = jobs::table + let mut query = jobs::table .filter(jobs::queue.eq(queue_name)) .filter(jobs::status.eq(JobStatus::Pending as i32)) .filter(jobs::scheduled_at.le(now)) .order((jobs::priority.desc(), jobs::scheduled_at.asc())) .limit(100) + .into_boxed(); + + if let Some(ns) = namespace { + query = query.filter(jobs::namespace.eq(ns)); + } else { + query = query.filter(jobs::namespace.is_null()); + } + + let candidates: Vec = query .select(JobRow::as_select()) .load(conn)?; @@ -327,9 +337,9 @@ macro_rules! impl_diesel_job_ops { } /// Dequeue from multiple queues, checking each in order. - pub fn dequeue_from(&self, queues: &[String], now: i64) -> Result> { + pub fn dequeue_from(&self, queues: &[String], now: i64, namespace: Option<&str>) -> Result> { for queue_name in queues { - if let Some(job) = self.dequeue(queue_name, now)? { + if let Some(job) = self.dequeue(queue_name, now, namespace)? { return Ok(Some(job)); } } @@ -554,6 +564,8 @@ macro_rules! impl_diesel_job_ops { } /// List jobs with optional filters and pagination. + /// When `namespace` is `Some`, only jobs in that namespace are returned. + /// When `None`, all jobs are returned regardless of namespace. pub fn list_jobs( &self, status: Option, @@ -561,6 +573,7 @@ macro_rules! impl_diesel_job_ops { task_name: Option<&str>, limit: i64, offset: i64, + namespace: Option<&str>, ) -> Result> { let mut conn = self.conn()?; @@ -575,6 +588,9 @@ macro_rules! impl_diesel_job_ops { if let Some(t) = task_name { query = query.filter(jobs::task_name.eq(t)); } + if let Some(ns) = namespace { + query = query.filter(jobs::namespace.eq(ns)); + } let rows: Vec = query .limit(limit) @@ -678,6 +694,8 @@ macro_rules! impl_diesel_job_ops { } /// List jobs with extended filters. + /// When `namespace` is `Some`, only jobs in that namespace are returned. 
+ /// When `None`, all jobs are returned regardless of namespace. #[allow(clippy::too_many_arguments)] pub fn list_jobs_filtered( &self, @@ -690,6 +708,7 @@ macro_rules! impl_diesel_job_ops { created_before: Option, limit: i64, offset: i64, + namespace: Option<&str>, ) -> Result> { let mut conn = self.conn()?; @@ -716,6 +735,9 @@ macro_rules! impl_diesel_job_ops { if let Some(before) = created_before { query = query.filter(jobs::created_at.le(before)); } + if let Some(ns) = namespace { + query = query.filter(jobs::namespace.eq(ns)); + } let rows: Vec = query .limit(limit) diff --git a/crates/taskito-core/src/storage/mod.rs b/crates/taskito-core/src/storage/mod.rs index 55a81f2..df68f4c 100644 --- a/crates/taskito-core/src/storage/mod.rs +++ b/crates/taskito-core/src/storage/mod.rs @@ -40,6 +40,7 @@ pub struct DeadJob { pub max_retries: i32, pub timeout_ms: i64, pub result_ttl_ms: Option, + pub namespace: Option, } impl From for DeadJob { @@ -58,6 +59,7 @@ impl From for DeadJob { max_retries: row.max_retries, timeout_ms: row.timeout_ms, result_ttl_ms: row.result_ttl_ms, + namespace: row.namespace, } } } @@ -93,15 +95,17 @@ macro_rules! impl_storage { &self, queue_name: &str, now: i64, + namespace: Option<&str>, ) -> $crate::error::Result> { - self.dequeue(queue_name, now) + self.dequeue(queue_name, now, namespace) } fn dequeue_from( &self, queues: &[String], now: i64, + namespace: Option<&str>, ) -> $crate::error::Result> { - self.dequeue_from(queues, now) + self.dequeue_from(queues, now, namespace) } fn complete( &self, @@ -151,8 +155,9 @@ macro_rules! impl_storage { task_name: Option<&str>, limit: i64, offset: i64, + namespace: Option<&str>, ) -> $crate::error::Result> { - self.list_jobs(status, queue_name, task_name, limit, offset) + self.list_jobs(status, queue_name, task_name, limit, offset, namespace) } fn get_job(&self, id: &str) -> $crate::error::Result> { self.get_job(id) @@ -473,6 +478,7 @@ macro_rules! 
impl_storage { created_before: Option, limit: i64, offset: i64, + namespace: Option<&str>, ) -> $crate::error::Result> { self.list_jobs_filtered( status, @@ -484,6 +490,7 @@ macro_rules! impl_storage { created_before, limit, offset, + namespace, ) } } @@ -526,11 +533,16 @@ impl Storage for StorageBackend { fn enqueue_unique(&self, new_job: NewJob) -> Result { delegate!(self, enqueue_unique, new_job) } - fn dequeue(&self, queue_name: &str, now: i64) -> Result> { - delegate!(self, dequeue, queue_name, now) + fn dequeue(&self, queue_name: &str, now: i64, namespace: Option<&str>) -> Result> { + delegate!(self, dequeue, queue_name, now, namespace) } - fn dequeue_from(&self, queues: &[String], now: i64) -> Result> { - delegate!(self, dequeue_from, queues, now) + fn dequeue_from( + &self, + queues: &[String], + now: i64, + namespace: Option<&str>, + ) -> Result> { + delegate!(self, dequeue_from, queues, now, namespace) } fn complete(&self, id: &str, result_bytes: Option>) -> Result<()> { delegate!(self, complete, id, result_bytes) @@ -572,8 +584,9 @@ impl Storage for StorageBackend { task_name: Option<&str>, limit: i64, offset: i64, + namespace: Option<&str>, ) -> Result> { - delegate!(self, list_jobs, status, queue_name, task_name, limit, offset) + delegate!(self, list_jobs, status, queue_name, task_name, limit, offset, namespace) } fn get_job(&self, id: &str) -> Result> { delegate!(self, get_job, id) @@ -818,6 +831,7 @@ impl Storage for StorageBackend { created_before: Option, limit: i64, offset: i64, + namespace: Option<&str>, ) -> Result> { delegate!( self, @@ -830,7 +844,8 @@ impl Storage for StorageBackend { created_after, created_before, limit, - offset + offset, + namespace ) } } diff --git a/crates/taskito-core/src/storage/models.rs b/crates/taskito-core/src/storage/models.rs index ac5ca12..91f38d6 100644 --- a/crates/taskito-core/src/storage/models.rs +++ b/crates/taskito-core/src/storage/models.rs @@ -75,6 +75,7 @@ pub struct DeadLetterRow { pub max_retries: 
i32, pub timeout_ms: i64, pub result_ttl_ms: Option, + pub namespace: Option, } /// Insertable struct for dead letter entries. @@ -94,6 +95,7 @@ pub struct NewDeadLetterRow<'a> { pub max_retries: i32, pub timeout_ms: i64, pub result_ttl_ms: Option, + pub namespace: Option<&'a str>, } /// A row in the `rate_limits` table. @@ -394,4 +396,5 @@ pub struct ArchivedJobRow { pub cancel_requested: i32, pub expires_at: Option, pub result_ttl_ms: Option, + pub namespace: Option, } diff --git a/crates/taskito-core/src/storage/postgres/archival.rs b/crates/taskito-core/src/storage/postgres/archival.rs index 6bd02ea..747059e 100644 --- a/crates/taskito-core/src/storage/postgres/archival.rs +++ b/crates/taskito-core/src/storage/postgres/archival.rs @@ -70,7 +70,7 @@ impl PostgresStorage { cancel_requested: row.cancel_requested != 0, expires_at: row.expires_at, result_ttl_ms: row.result_ttl_ms, - namespace: None, + namespace: row.namespace, }) .collect()) } diff --git a/crates/taskito-core/src/storage/postgres/dead_letter.rs b/crates/taskito-core/src/storage/postgres/dead_letter.rs index 8d4f446..0069569 100644 --- a/crates/taskito-core/src/storage/postgres/dead_letter.rs +++ b/crates/taskito-core/src/storage/postgres/dead_letter.rs @@ -30,6 +30,7 @@ impl PostgresStorage { max_retries: job.max_retries, timeout_ms: job.timeout_ms, result_ttl_ms: job.result_ttl_ms, + namespace: job.namespace.as_deref(), }; diesel::insert_into(dead_letter::table) @@ -94,7 +95,7 @@ impl PostgresStorage { depends_on: vec![], expires_at: None, result_ttl_ms: dead_row.result_ttl_ms, - namespace: None, + namespace: dead_row.namespace, }; let job = new_job.into_job(); diff --git a/crates/taskito-core/src/storage/postgres/mod.rs b/crates/taskito-core/src/storage/postgres/mod.rs index 5d50341..458eecb 100644 --- a/crates/taskito-core/src/storage/postgres/mod.rs +++ b/crates/taskito-core/src/storage/postgres/mod.rs @@ -434,6 +434,16 @@ impl PostgresStorage { "ALTER TABLE circuit_breakers ADD COLUMN IF NOT 
EXISTS half_open_failure_count INTEGER NOT NULL DEFAULT 0", ); + // Migration: add namespace column to dead_letter and archived_jobs + migration_alter( + &mut conn, + "ALTER TABLE dead_letter ADD COLUMN IF NOT EXISTS namespace TEXT", + ); + migration_alter( + &mut conn, + "ALTER TABLE archived_jobs ADD COLUMN IF NOT EXISTS namespace TEXT", + ); + // ── Distributed Locks ───────────────────────────── diesel::sql_query( "CREATE TABLE IF NOT EXISTS distributed_locks ( diff --git a/crates/taskito-core/src/storage/redis_backend/dead_letter.rs b/crates/taskito-core/src/storage/redis_backend/dead_letter.rs index 17abbfb..0dd23ec 100644 --- a/crates/taskito-core/src/storage/redis_backend/dead_letter.rs +++ b/crates/taskito-core/src/storage/redis_backend/dead_letter.rs @@ -21,6 +21,8 @@ struct DeadJobEntry { pub max_retries: i32, pub timeout_ms: i64, pub result_ttl_ms: Option, + #[serde(default)] + pub namespace: Option, } impl From for DeadJob { @@ -39,6 +41,7 @@ impl From for DeadJob { max_retries: e.max_retries, timeout_ms: e.timeout_ms, result_ttl_ms: e.result_ttl_ms, + namespace: e.namespace, } } } @@ -63,6 +66,7 @@ impl RedisStorage { max_retries: job.max_retries, timeout_ms: job.timeout_ms, result_ttl_ms: job.result_ttl_ms, + namespace: job.namespace.clone(), }; let json = serde_json::to_string(&entry).map_err(|e| QueueError::Other(e.to_string()))?; @@ -145,7 +149,7 @@ impl RedisStorage { depends_on: vec![], expires_at: None, result_ttl_ms: entry.result_ttl_ms, - namespace: None, + namespace: entry.namespace, }; let job = self.enqueue(new_job)?; diff --git a/crates/taskito-core/src/storage/redis_backend/jobs.rs b/crates/taskito-core/src/storage/redis_backend/jobs.rs index 3d989f1..76f8816 100644 --- a/crates/taskito-core/src/storage/redis_backend/jobs.rs +++ b/crates/taskito-core/src/storage/redis_backend/jobs.rs @@ -326,7 +326,12 @@ impl RedisStorage { } } - pub fn dequeue(&self, queue_name: &str, now: i64) -> Result> { + pub fn dequeue( + &self, + queue_name: &str, 
+ now: i64, + namespace: Option<&str>, + ) -> Result> { let mut conn = self.conn()?; let queue_key = self.key(&["queue", queue_name, "pending"]); @@ -356,6 +361,15 @@ impl RedisStorage { continue; } + // Filter by namespace: Some(ns) matches that namespace, None matches only jobs without a namespace + if let Some(ns) = namespace { + if job.namespace.as_deref() != Some(ns) { + continue; + } + } else if job.namespace.is_some() { + continue; + } + // Skip expired jobs if let Some(expires_at) = job.expires_at { if now > expires_at { @@ -403,9 +417,14 @@ impl RedisStorage { Ok(None) } - pub fn dequeue_from(&self, queues: &[String], now: i64) -> Result> { + pub fn dequeue_from( + &self, + queues: &[String], + now: i64, + namespace: Option<&str>, + ) -> Result> { for queue_name in queues { - if let Some(job) = self.dequeue(queue_name, now)? { + if let Some(job) = self.dequeue(queue_name, now, namespace)? { return Ok(Some(job)); } } @@ -632,6 +651,7 @@ impl RedisStorage { task_name: Option<&str>, limit: i64, offset: i64, + namespace: Option<&str>, ) -> Result> { let mut conn = self.conn()?; @@ -661,6 +681,11 @@ impl RedisStorage { let mut jobs = Vec::new(); for id in &ids { if let Some(job) = self.load_job(&mut conn, id)? 
{ + if let Some(ns) = namespace { + if job.namespace.as_deref() != Some(ns) { + continue; + } + } jobs.push(job); } } @@ -687,6 +712,11 @@ impl RedisStorage { continue; } } + if let Some(ns) = namespace { + if job.namespace.as_deref() != Some(ns) { + continue; + } + } jobs.push(job); } } @@ -802,6 +832,7 @@ impl RedisStorage { created_before: Option, limit: i64, offset: i64, + namespace: Option<&str>, ) -> Result> { let mut conn = self.conn()?; @@ -860,6 +891,11 @@ impl RedisStorage { continue; } } + if let Some(ns) = namespace { + if job.namespace.as_deref() != Some(ns) { + continue; + } + } jobs.push(job); } } diff --git a/crates/taskito-core/src/storage/schema.rs b/crates/taskito-core/src/storage/schema.rs index 9d1d802..bd2fa57 100644 --- a/crates/taskito-core/src/storage/schema.rs +++ b/crates/taskito-core/src/storage/schema.rs @@ -40,6 +40,7 @@ diesel::table! { max_retries -> Integer, timeout_ms -> BigInt, result_ttl_ms -> Nullable, + namespace -> Nullable, } } @@ -186,6 +187,7 @@ diesel::table! 
{ cancel_requested -> Integer, expires_at -> Nullable, result_ttl_ms -> Nullable, + namespace -> Nullable, } } diff --git a/crates/taskito-core/src/storage/sqlite/archival.rs b/crates/taskito-core/src/storage/sqlite/archival.rs index 64d6cff..154f86b 100644 --- a/crates/taskito-core/src/storage/sqlite/archival.rs +++ b/crates/taskito-core/src/storage/sqlite/archival.rs @@ -73,7 +73,7 @@ impl SqliteStorage { cancel_requested: row.cancel_requested != 0, expires_at: row.expires_at, result_ttl_ms: row.result_ttl_ms, - namespace: None, + namespace: row.namespace, }) .collect()) } diff --git a/crates/taskito-core/src/storage/sqlite/dead_letter.rs b/crates/taskito-core/src/storage/sqlite/dead_letter.rs index 243e17e..4189b4d 100644 --- a/crates/taskito-core/src/storage/sqlite/dead_letter.rs +++ b/crates/taskito-core/src/storage/sqlite/dead_letter.rs @@ -30,6 +30,7 @@ impl SqliteStorage { max_retries: job.max_retries, timeout_ms: job.timeout_ms, result_ttl_ms: job.result_ttl_ms, + namespace: job.namespace.as_deref(), }; diesel::insert_into(dead_letter::table) @@ -100,7 +101,7 @@ impl SqliteStorage { depends_on: vec![], expires_at: None, result_ttl_ms: dead_row.result_ttl_ms, - namespace: None, + namespace: dead_row.namespace, }; let job = new_job.into_job(); diff --git a/crates/taskito-core/src/storage/sqlite/mod.rs b/crates/taskito-core/src/storage/sqlite/mod.rs index a2ef348..0dccf32 100644 --- a/crates/taskito-core/src/storage/sqlite/mod.rs +++ b/crates/taskito-core/src/storage/sqlite/mod.rs @@ -430,6 +430,16 @@ impl SqliteStorage { "ALTER TABLE circuit_breakers ADD COLUMN half_open_failure_count INTEGER NOT NULL DEFAULT 0", ); + // Migration: add namespace column to dead_letter and archived_jobs + migration_alter( + &mut conn, + "ALTER TABLE dead_letter ADD COLUMN namespace TEXT", + ); + migration_alter( + &mut conn, + "ALTER TABLE archived_jobs ADD COLUMN namespace TEXT", + ); + // ── Distributed Locks ───────────────────────────── diesel::sql_query( "CREATE TABLE IF 
NOT EXISTS distributed_locks ( diff --git a/crates/taskito-core/src/storage/sqlite/tests.rs b/crates/taskito-core/src/storage/sqlite/tests.rs index 5ae4cc3..c0bb741 100644 --- a/crates/taskito-core/src/storage/sqlite/tests.rs +++ b/crates/taskito-core/src/storage/sqlite/tests.rs @@ -39,14 +39,16 @@ fn test_dequeue() { let job = storage.enqueue(make_job("dequeue_task")).unwrap(); let dequeued = storage - .dequeue("default", now_millis() + 1000) + .dequeue("default", now_millis() + 1000, None) .unwrap() .unwrap(); assert_eq!(dequeued.id, job.id); assert_eq!(dequeued.status, JobStatus::Running); // Should not dequeue again - let none = storage.dequeue("default", now_millis() + 1000).unwrap(); + let none = storage + .dequeue("default", now_millis() + 1000, None) + .unwrap(); assert!(none.is_none()); } @@ -58,10 +60,10 @@ fn test_dequeue_respects_schedule() { new_job.scheduled_at = future; storage.enqueue(new_job).unwrap(); - let none = storage.dequeue("default", now_millis()).unwrap(); + let none = storage.dequeue("default", now_millis(), None).unwrap(); assert!(none.is_none()); - let some = storage.dequeue("default", future + 1).unwrap(); + let some = storage.dequeue("default", future + 1, None).unwrap(); assert!(some.is_some()); } @@ -78,10 +80,10 @@ fn test_priority_ordering() { storage.enqueue(high).unwrap(); let now = now_millis() + 1000; - let first = storage.dequeue("default", now).unwrap().unwrap(); + let first = storage.dequeue("default", now, None).unwrap().unwrap(); assert_eq!(first.task_name, "high_priority"); - let second = storage.dequeue("default", now).unwrap().unwrap(); + let second = storage.dequeue("default", now, None).unwrap().unwrap(); assert_eq!(second.task_name, "low_priority"); } @@ -89,7 +91,9 @@ fn test_priority_ordering() { fn test_complete() { let storage = test_storage(); let job = storage.enqueue(make_job("complete_task")).unwrap(); - storage.dequeue("default", now_millis() + 1000).unwrap(); + storage + .dequeue("default", now_millis() + 
1000, None) + .unwrap(); storage.complete(&job.id, Some(vec![42])).unwrap(); @@ -102,7 +106,9 @@ fn test_complete() { fn test_fail_and_retry() { let storage = test_storage(); let job = storage.enqueue(make_job("fail_task")).unwrap(); - storage.dequeue("default", now_millis() + 1000).unwrap(); + storage + .dequeue("default", now_millis() + 1000, None) + .unwrap(); storage.fail(&job.id, "something broke").unwrap(); let fetched = storage.get_job(&job.id).unwrap().unwrap(); @@ -114,7 +120,9 @@ fn test_fail_and_retry() { fn test_retry_reschedule() { let storage = test_storage(); let job = storage.enqueue(make_job("retry_task")).unwrap(); - storage.dequeue("default", now_millis() + 1000).unwrap(); + storage + .dequeue("default", now_millis() + 1000, None) + .unwrap(); let future = now_millis() + 5000; storage.retry(&job.id, future).unwrap(); @@ -129,7 +137,9 @@ fn test_retry_reschedule() { fn test_dead_letter_queue() { let storage = test_storage(); let job = storage.enqueue(make_job("dlq_task")).unwrap(); - storage.dequeue("default", now_millis() + 1000).unwrap(); + storage + .dequeue("default", now_millis() + 1000, None) + .unwrap(); storage .move_to_dlq( @@ -151,7 +161,9 @@ fn test_dead_letter_queue() { fn test_retry_dead() { let storage = test_storage(); let job = storage.enqueue(make_job("retry_dead_task")).unwrap(); - storage.dequeue("default", now_millis() + 1000).unwrap(); + storage + .dequeue("default", now_millis() + 1000, None) + .unwrap(); let running_job = storage.get_job(&job.id).unwrap().unwrap(); storage @@ -309,15 +321,15 @@ fn test_dequeue_blocks_on_unmet_dependency() { let now = now_millis() + 1000; - let dequeued = storage.dequeue("default", now).unwrap().unwrap(); + let dequeued = storage.dequeue("default", now, None).unwrap().unwrap(); assert_eq!(dequeued.id, job_a.id); - let none = storage.dequeue("default", now).unwrap(); + let none = storage.dequeue("default", now, None).unwrap(); assert!(none.is_none()); storage.complete(&job_a.id, 
None).unwrap(); - let dequeued = storage.dequeue("default", now).unwrap().unwrap(); + let dequeued = storage.dequeue("default", now, None).unwrap().unwrap(); assert_eq!(dequeued.task_name, "dependent_task"); } @@ -353,7 +365,7 @@ fn test_cascade_cancel_on_dlq() { let job_b = storage.enqueue(dep_b).unwrap(); let now = now_millis() + 1000; - storage.dequeue("default", now).unwrap(); + storage.dequeue("default", now, None).unwrap(); let running = storage.get_job(&job_a.id).unwrap().unwrap(); storage.move_to_dlq(&running, "fatal error", None).unwrap(); @@ -374,13 +386,13 @@ fn test_count_running_by_task() { let now = now_millis() + 1000; // Dequeue one task_a (becomes running) - storage.dequeue("default", now).unwrap().unwrap(); + storage.dequeue("default", now, None).unwrap().unwrap(); assert_eq!(storage.count_running_by_task("task_a").unwrap(), 1); assert_eq!(storage.count_running_by_task("task_b").unwrap(), 0); // Dequeue second task_a - storage.dequeue("default", now).unwrap().unwrap(); + storage.dequeue("default", now, None).unwrap().unwrap(); assert_eq!(storage.count_running_by_task("task_a").unwrap(), 2); // Nonexistent task should return 0 diff --git a/crates/taskito-core/src/storage/traits.rs b/crates/taskito-core/src/storage/traits.rs index 0fff94d..6f08874 100644 --- a/crates/taskito-core/src/storage/traits.rs +++ b/crates/taskito-core/src/storage/traits.rs @@ -13,8 +13,13 @@ pub trait Storage: Send + Sync + Clone { fn enqueue(&self, new_job: NewJob) -> Result; fn enqueue_batch(&self, new_jobs: Vec) -> Result>; fn enqueue_unique(&self, new_job: NewJob) -> Result; - fn dequeue(&self, queue_name: &str, now: i64) -> Result>; - fn dequeue_from(&self, queues: &[String], now: i64) -> Result>; + fn dequeue(&self, queue_name: &str, now: i64, namespace: Option<&str>) -> Result>; + fn dequeue_from( + &self, + queues: &[String], + now: i64, + namespace: Option<&str>, + ) -> Result>; fn complete(&self, id: &str, result_bytes: Option>) -> Result<()>; fn fail(&self, id: 
&str, error: &str) -> Result<()>; fn retry(&self, id: &str, next_scheduled_at: i64) -> Result<()>; @@ -33,6 +38,7 @@ pub trait Storage: Send + Sync + Clone { task_name: Option<&str>, limit: i64, offset: i64, + namespace: Option<&str>, ) -> Result>; fn get_job(&self, id: &str) -> Result>; fn stats(&self) -> Result; @@ -184,5 +190,6 @@ pub trait Storage: Send + Sync + Clone { created_before: Option, limit: i64, offset: i64, + namespace: Option<&str>, ) -> Result>; } diff --git a/crates/taskito-core/tests/rust/storage_tests.rs b/crates/taskito-core/tests/rust/storage_tests.rs index b57936d..2ba970b 100644 --- a/crates/taskito-core/tests/rust/storage_tests.rs +++ b/crates/taskito-core/tests/rust/storage_tests.rs @@ -41,18 +41,18 @@ fn test_enqueue_and_get(s: &impl Storage) { fn test_dequeue(s: &impl Storage) { let q = "q-dequeue"; let job = s.enqueue(make_job(q, "dequeue_task")).unwrap(); - let dequeued = s.dequeue(q, now_millis() + 1000).unwrap().unwrap(); + let dequeued = s.dequeue(q, now_millis() + 1000, None).unwrap().unwrap(); assert_eq!(dequeued.id, job.id); assert_eq!(dequeued.status, JobStatus::Running); - let none = s.dequeue(q, now_millis() + 1000).unwrap(); + let none = s.dequeue(q, now_millis() + 1000, None).unwrap(); assert!(none.is_none()); } fn test_complete(s: &impl Storage) { let q = "q-complete"; let job = s.enqueue(make_job(q, "complete_task")).unwrap(); - s.dequeue(q, now_millis() + 1000).unwrap(); + s.dequeue(q, now_millis() + 1000, None).unwrap(); s.complete(&job.id, Some(vec![42])).unwrap(); let fetched = s.get_job(&job.id).unwrap().unwrap(); @@ -63,7 +63,7 @@ fn test_complete(s: &impl Storage) { fn test_fail(s: &impl Storage) { let q = "q-fail"; let job = s.enqueue(make_job(q, "fail_task")).unwrap(); - s.dequeue(q, now_millis() + 1000).unwrap(); + s.dequeue(q, now_millis() + 1000, None).unwrap(); s.fail(&job.id, "something broke").unwrap(); let fetched = s.get_job(&job.id).unwrap().unwrap(); @@ -74,7 +74,7 @@ fn test_fail(s: &impl Storage) { fn 
test_retry(s: &impl Storage) { let q = "q-retry"; let job = s.enqueue(make_job(q, "retry_task")).unwrap(); - s.dequeue(q, now_millis() + 1000).unwrap(); + s.dequeue(q, now_millis() + 1000, None).unwrap(); let future = now_millis() + 5000; s.retry(&job.id, future).unwrap(); @@ -131,7 +131,7 @@ fn test_enqueue_batch(s: &impl Storage) { fn test_dead_letter_queue(s: &impl Storage) { let q = "q-dlq"; let job = s.enqueue(make_job(q, "dlq_task")).unwrap(); - s.dequeue(q, now_millis() + 1000).unwrap(); + s.dequeue(q, now_millis() + 1000, None).unwrap(); let running = s.get_job(&job.id).unwrap().unwrap(); s.move_to_dlq(&running, "max retries exceeded", None) diff --git a/crates/taskito-python/Cargo.toml b/crates/taskito-python/Cargo.toml index 2656cca..79dedb8 100644 --- a/crates/taskito-python/Cargo.toml +++ b/crates/taskito-python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "taskito-python" -version = "0.7.0" +version = "0.8.0" edition = "2021" [features] diff --git a/crates/taskito-python/src/py_job.rs b/crates/taskito-python/src/py_job.rs index 6e0988a..c311302 100644 --- a/crates/taskito-python/src/py_job.rs +++ b/crates/taskito-python/src/py_job.rs @@ -36,6 +36,8 @@ pub struct PyJob { pub progress: Option, #[pyo3(get)] pub metadata: Option, + #[pyo3(get)] + pub namespace: Option, status_val: i32, result_bytes: Option>, @@ -84,6 +86,7 @@ impl From for PyJob { unique_key: job.unique_key, progress: job.progress, metadata: job.metadata, + namespace: job.namespace, status_val: job.status as i32, result_bytes: job.result, } diff --git a/crates/taskito-python/src/py_queue/inspection.rs b/crates/taskito-python/src/py_queue/inspection.rs index 77e23c3..cc922dd 100644 --- a/crates/taskito-python/src/py_queue/inspection.rs +++ b/crates/taskito-python/src/py_queue/inspection.rs @@ -11,7 +11,7 @@ use crate::py_job::PyJob; #[allow(clippy::useless_conversion)] impl PyQueue { /// List jobs with optional filters and pagination. 
- #[pyo3(signature = (status=None, queue=None, task_name=None, limit=50, offset=0))] + #[pyo3(signature = (status=None, queue=None, task_name=None, limit=50, offset=0, namespace=None))] pub fn list_jobs( &self, status: Option<&str>, @@ -19,6 +19,7 @@ impl PyQueue { task_name: Option<&str>, limit: i64, offset: i64, + namespace: Option<&str>, ) -> PyResult> { if limit < 0 || offset < 0 { return Err(pyo3::exceptions::PyValueError::new_err( @@ -29,7 +30,7 @@ impl PyQueue { let jobs = self .storage - .list_jobs(status_int, queue, task_name, limit, offset) + .list_jobs(status_int, queue, task_name, limit, offset, namespace) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; Ok(jobs.into_iter().map(PyJob::from).collect()) @@ -37,7 +38,7 @@ impl PyQueue { /// List jobs with extended filters. #[allow(clippy::too_many_arguments)] - #[pyo3(signature = (status=None, queue=None, task_name=None, metadata_like=None, error_like=None, created_after=None, created_before=None, limit=50, offset=0))] + #[pyo3(signature = (status=None, queue=None, task_name=None, metadata_like=None, error_like=None, created_after=None, created_before=None, limit=50, offset=0, namespace=None))] pub fn list_jobs_filtered( &self, status: Option<&str>, @@ -49,6 +50,7 @@ impl PyQueue { created_before: Option, limit: i64, offset: i64, + namespace: Option<&str>, ) -> PyResult> { if limit < 0 || offset < 0 { return Err(pyo3::exceptions::PyValueError::new_err( @@ -69,6 +71,7 @@ impl PyQueue { created_before, limit, offset, + namespace, ) .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; diff --git a/crates/taskito-python/src/py_queue/mod.rs b/crates/taskito-python/src/py_queue/mod.rs index 58661d3..c3f53bb 100644 --- a/crates/taskito-python/src/py_queue/mod.rs +++ b/crates/taskito-python/src/py_queue/mod.rs @@ -36,6 +36,7 @@ pub struct PyQueue { pub(crate) scheduler_poll_interval_ms: u64, pub(crate) scheduler_reap_interval: u32, pub(crate) scheduler_cleanup_interval: 
u32, + pub(crate) namespace: Option, } #[pymethods] @@ -46,7 +47,7 @@ pub struct PyQueue { )] impl PyQueue { #[new] - #[pyo3(signature = (db_path=".taskito/taskito.db", workers=0, default_retry=3, default_timeout=300, default_priority=0, result_ttl=None, backend="sqlite", db_url=None, schema="taskito", pool_size=None, scheduler_poll_interval_ms=50, scheduler_reap_interval=100, scheduler_cleanup_interval=1200))] + #[pyo3(signature = (db_path=".taskito/taskito.db", workers=0, default_retry=3, default_timeout=300, default_priority=0, result_ttl=None, backend="sqlite", db_url=None, schema="taskito", pool_size=None, scheduler_poll_interval_ms=50, scheduler_reap_interval=100, scheduler_cleanup_interval=1200, namespace=None))] #[allow(clippy::too_many_arguments)] pub fn new( db_path: &str, @@ -62,6 +63,7 @@ impl PyQueue { scheduler_poll_interval_ms: u64, scheduler_reap_interval: u32, scheduler_cleanup_interval: u32, + namespace: Option, ) -> PyResult { let storage = match backend { "sqlite" => { @@ -127,6 +129,7 @@ impl PyQueue { scheduler_poll_interval_ms, scheduler_reap_interval, scheduler_cleanup_interval, + namespace, }) } @@ -219,7 +222,7 @@ impl PyQueue { depends_on: depends_on.unwrap_or_default(), expires_at, result_ttl_ms, - namespace: None, + namespace: self.namespace.clone(), }; let job = if unique_key.is_some() { @@ -318,7 +321,7 @@ impl PyQueue { depends_on: vec![], expires_at, result_ttl_ms, - namespace: None, + namespace: self.namespace.clone(), }); } diff --git a/crates/taskito-python/src/py_queue/worker.rs b/crates/taskito-python/src/py_queue/worker.rs index 01f4407..3018d57 100644 --- a/crates/taskito-python/src/py_queue/worker.rs +++ b/crates/taskito-python/src/py_queue/worker.rs @@ -212,7 +212,12 @@ impl PyQueue { result_ttl_ms: self.result_ttl_ms, ..SchedulerConfig::default() }; - let mut scheduler = Scheduler::new(self.storage.clone(), queues, scheduler_config); + let mut scheduler = Scheduler::new( + self.storage.clone(), + queues, + 
scheduler_config, + self.namespace.clone(), + ); // Build retry filters dict from the Queue's _task_retry_filters let retry_filters = py.eval_bound("{}", None, None)?; diff --git a/docs/api/queue.md b/docs/api/queue.md index f10ed56..87ab2e8 100644 --- a/docs/api/queue.md +++ b/docs/api/queue.md @@ -27,6 +27,7 @@ Queue( scheduler_poll_interval_ms: int = 50, scheduler_reap_interval: int = 100, scheduler_cleanup_interval: int = 1200, + namespace: str | None = None, ) ``` @@ -51,6 +52,7 @@ Queue( | `scheduler_poll_interval_ms` | `int` | `50` | Milliseconds between scheduler poll cycles. Lower values improve scheduling precision at the cost of CPU. | | `scheduler_reap_interval` | `int` | `100` | Reap stale/timed-out jobs every N poll cycles. | | `scheduler_cleanup_interval` | `int` | `1200` | Clean up old completed jobs every N poll cycles. | +| `namespace` | `str \| None` | `None` | Namespace for multi-tenant isolation. Jobs enqueued on this queue carry this namespace; workers only dequeue matching jobs. `None` means no namespace (default). | ## Task Registration @@ -206,10 +208,11 @@ queue.list_jobs( task_name: str | None = None, limit: int = 50, offset: int = 0, + namespace: str | None = _UNSET, ) -> list[JobResult] ``` -List jobs with optional filters. Returns newest first. +List jobs with optional filters. Returns newest first. Defaults to the queue's namespace — pass `namespace=None` to see all namespaces. | Parameter | Type | Default | Description | |---|---|---|---| @@ -232,10 +235,11 @@ queue.list_jobs_filtered( created_before: float | None = None, limit: int = 50, offset: int = 0, + namespace: str | None = _UNSET, ) -> list[JobResult] ``` -Extended filtering with metadata and error pattern matching and time range constraints. +Extended filtering with metadata and error pattern matching and time range constraints. Defaults to the queue's namespace — pass `namespace=None` to see all namespaces. 
### `queue.cancel_job()` diff --git a/docs/changelog.md b/docs/changelog.md index 82f431a..a7de3a1 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -2,6 +2,22 @@ All notable changes to taskito are documented here. +## 0.8.0 + +### Features + +- **Namespace-based routing** -- `Queue(namespace="team-a")` isolates workloads across teams/services sharing a single database; enqueued jobs carry the namespace, workers only dequeue matching jobs, `list_jobs()` and `list_jobs_filtered()` default to the queue's namespace (pass `namespace=None` for global view); DLQ and archival preserve namespace through the full job lifecycle; periodic tasks inherit namespace from their scheduler; backward compatible (`None` namespace matches only `NULL`-namespace jobs) + +### Internal + +- `namespace` column added to `dead_letter` and `archived_jobs` tables; `DeadLetterRow`, `NewDeadLetterRow`, `ArchivedJobRow` models updated; Redis `DeadJobEntry` uses `#[serde(default)]` for backward compatibility +- `Storage` trait: `dequeue`, `dequeue_from`, `list_jobs`, `list_jobs_filtered` signatures gain `namespace: Option<&str>` parameter; all 3 backends + delegate macro updated +- `Scheduler` struct carries `namespace: Option` field, passes to `dequeue_from` in poller +- `PyQueue` struct carries `namespace: Option` field; `PyJob` exposes `namespace` to Python +- `_UNSET` sentinel in `mixins.py` distinguishes "namespace not passed" from explicit `None` + +--- + ## 0.7.0 ### Features diff --git a/py_src/taskito/__init__.py b/py_src/taskito/__init__.py index 8155081..df8b57c 100644 --- a/py_src/taskito/__init__.py +++ b/py_src/taskito/__init__.py @@ -99,4 +99,4 @@ __version__ = _get_version("taskito") except PackageNotFoundError: - __version__ = "0.7.0" + __version__ = "0.8.0" diff --git a/py_src/taskito/_taskito.pyi b/py_src/taskito/_taskito.pyi index 1b55c56..7099780 100644 --- a/py_src/taskito/_taskito.pyi +++ b/py_src/taskito/_taskito.pyi @@ -62,6 +62,7 @@ class PyJob: unique_key: str | 
None progress: int | None metadata: str | None + namespace: str | None @property def status(self) -> str: ... @@ -87,6 +88,7 @@ class PyQueue: scheduler_poll_interval_ms: int = 50, scheduler_reap_interval: int = 100, scheduler_cleanup_interval: int = 1200, + namespace: str | None = None, ) -> None: ... def request_shutdown(self) -> None: ... def enqueue( @@ -125,6 +127,7 @@ class PyQueue: task_name: str | None = None, limit: int = 50, offset: int = 0, + namespace: str | None = None, ) -> list[PyJob]: ... def get_job(self, job_id: str) -> PyJob | None: ... def get_job_errors(self, job_id: str) -> list[dict[str, Any]]: ... @@ -149,6 +152,7 @@ class PyQueue: created_before: int | None = None, limit: int = 50, offset: int = 0, + namespace: str | None = None, ) -> list[PyJob]: ... def dead_letters(self, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]: ... def retry_dead(self, dead_id: str) -> str: ... diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py index 87d02c6..ba2e6aa 100644 --- a/py_src/taskito/app.py +++ b/py_src/taskito/app.py @@ -108,6 +108,7 @@ def __init__( scheduler_poll_interval_ms: int = 50, scheduler_reap_interval: int = 100, scheduler_cleanup_interval: int = 1200, + namespace: str | None = None, ): """Initialize a new task queue. 
@@ -174,8 +175,10 @@ def __init__( scheduler_poll_interval_ms=scheduler_poll_interval_ms, scheduler_reap_interval=scheduler_reap_interval, scheduler_cleanup_interval=scheduler_cleanup_interval, + namespace=namespace, ) self._backend = backend + self._namespace = namespace self._db_url = db_url self._schema = schema self._db_path = db_path diff --git a/py_src/taskito/mixins.py b/py_src/taskito/mixins.py index 5456981..91fc2a5 100644 --- a/py_src/taskito/mixins.py +++ b/py_src/taskito/mixins.py @@ -8,11 +8,14 @@ from taskito.locks import DistributedLock from taskito.result import JobResult +_UNSET = object() # sentinel to distinguish "not passed" from explicit None + class QueueInspectionMixin: """Read-only inspection, stats, and query methods for the Queue.""" _inner: Any + _namespace: str | None def get_job(self, job_id: str) -> JobResult | None: """Retrieve a job by its unique ID.""" @@ -30,16 +33,23 @@ def list_jobs( task_name: str | None = None, limit: int = 50, offset: int = 0, + namespace: Any = _UNSET, ) -> list[JobResult]: - """List jobs with optional filters and pagination.""" + """List jobs with optional filters and pagination. + + By default, scoped to this queue's namespace. Pass ``namespace=None`` + explicitly to see jobs across all namespaces. + """ from taskito.result import JobResult + ns = self._namespace if namespace is _UNSET else namespace py_jobs = self._inner.list_jobs( status=status, queue=queue, task_name=task_name, limit=limit, offset=offset, + namespace=ns, ) return [JobResult(py_job=pj, queue=self) for pj in py_jobs] # type: ignore[arg-type] @@ -54,10 +64,16 @@ def list_jobs_filtered( created_before: int | None = None, limit: int = 50, offset: int = 0, + namespace: Any = _UNSET, ) -> list[JobResult]: - """List jobs with extended filters.""" + """List jobs with extended filters. + + By default, scoped to this queue's namespace. Pass ``namespace=None`` + explicitly to see jobs across all namespaces. 
+ """ from taskito.result import JobResult + ns = self._namespace if namespace is _UNSET else namespace py_jobs = self._inner.list_jobs_filtered( status=status, queue=queue, @@ -68,6 +84,7 @@ def list_jobs_filtered( created_before=created_before, limit=limit, offset=offset, + namespace=ns, ) return [JobResult(py_job=pj, queue=self) for pj in py_jobs] # type: ignore[arg-type] diff --git a/pyproject.toml b/pyproject.toml index 2447886..620a5df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "taskito" -version = "0.7.0" +version = "0.8.0" description = "Rust-powered task queue for Python. No broker required." requires-python = ">=3.10" license = { file = "LICENSE" } diff --git a/tests/python/test_namespace.py b/tests/python/test_namespace.py new file mode 100644 index 0000000..8408aa1 --- /dev/null +++ b/tests/python/test_namespace.py @@ -0,0 +1,117 @@ +"""Tests for namespace-based routing and isolation.""" + +import threading + +from taskito import Queue + + +def test_namespace_enqueue_sets_namespace(tmp_path): + """Jobs enqueued on a namespaced Queue carry the namespace.""" + queue = Queue(db_path=str(tmp_path / "test.db"), namespace="team-a") + + @queue.task() + def add(x, y): + return x + y + + job = add.delay(1, 2) + py_job = queue._inner.get_job(job.id) + assert py_job is not None + assert py_job.namespace == "team-a" + + +def test_no_namespace_jobs_have_none(tmp_path): + """Jobs enqueued without a namespace have namespace=None.""" + queue = Queue(db_path=str(tmp_path / "test.db")) + + @queue.task() + def noop(): + pass + + job = noop.delay() + py_job = queue._inner.get_job(job.id) + assert py_job is not None + assert py_job.namespace is None + + +def test_namespace_isolation_worker(tmp_path): + """A namespaced worker only processes jobs from its namespace.""" + db = str(tmp_path / "test.db") + + # Create two queues sharing the same DB but different namespaces + q_a = Queue(db_path=db, 
namespace="team-a")
+    q_b = Queue(db_path=db, namespace="team-b")
+
+    results = []
+
+    @q_a.task()
+    def task_a():
+        results.append("a")
+        return "a"
+
+    @q_b.task()
+    def task_b():
+        results.append("b")
+        return "b"
+
+    # Enqueue one job on each namespace
+    job_a = task_a.delay()
+    job_b = task_b.delay()
+
+    # Run worker for team-a only
+    worker = threading.Thread(target=q_a.run_worker, daemon=True)
+    worker.start()
+
+    # Wait for team-a's job to complete
+    job_a.result(timeout=10)
+
+    # team-b's job should still be pending
+    job_b.refresh()
+    assert job_b.status == "pending"
+
+    # Shut down team-a worker (daemon thread exits on its own)
+    q_a._inner.request_shutdown()
+
+
+def test_namespace_list_jobs_scoped(tmp_path):
+    """list_jobs defaults to the queue's namespace."""
+    db = str(tmp_path / "test.db")
+    q_a = Queue(db_path=db, namespace="ns-a")
+    q_b = Queue(db_path=db, namespace="ns-b")
+
+    @q_a.task()
+    def task_x():
+        pass
+
+    @q_b.task()
+    def task_y():
+        pass
+
+    task_x.delay()
+    task_x.delay()
+    task_y.delay()
+
+    # Each queue sees only its own jobs by default
+    assert len(q_a.list_jobs()) == 2
+    assert len(q_b.list_jobs()) == 1
+
+    # Passing namespace=None shows all
+    assert len(q_a.list_jobs(namespace=None)) == 3
+
+
+def test_namespace_preserved_in_job_result(tmp_path):
+    """A job enqueued on a namespaced queue executes end-to-end and returns its result."""
+    queue = Queue(db_path=str(tmp_path / "test.db"), namespace="my-ns")
+
+    @queue.task()
+    def greet(name):
+        return f"hi {name}"
+
+    job = greet.delay("world")
+
+    worker = threading.Thread(target=queue.run_worker, daemon=True)
+    worker.start()
+
+    result = job.result(timeout=10)
+    assert result == "hi world"
+
+    queue._inner.request_shutdown()