-
Notifications
You must be signed in to change notification settings - Fork 3
test: run integration suite under both storage engines (part of #360) #362
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,11 +64,21 @@ impl TestClient { | |
| } | ||
|
|
||
| /// Claim and execute a single job, optionally filtered by queue. | ||
| /// | ||
| /// Routes through the active storage engine: under queue storage it claims | ||
| /// and records the outcome via the runtime store API; under canonical it | ||
| /// drives the `awa.jobs` view directly. | ||
| pub async fn work_one_in_queue<W: Worker>( | ||
| &self, | ||
| worker: &W, | ||
| queue: Option<&str>, | ||
| ) -> Result<WorkResult, AwaError> { | ||
| if let Some(schema) = | ||
| awa_model::queue_storage::QueueStorage::active_schema(&self.pool).await? | ||
| { | ||
| return self.work_one_queue_storage(worker, queue, &schema).await; | ||
| } | ||
|
|
||
| // Claim one job | ||
| let jobs: Vec<JobRow> = sqlx::query_as::<_, JobRow>( | ||
| r#" | ||
|
|
@@ -102,27 +112,7 @@ impl TestClient { | |
| None => return Ok(WorkResult::NoJob), | ||
| }; | ||
|
|
||
| let cancel = Arc::new(AtomicBool::new(false)); | ||
| let state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> = | ||
| Arc::new(HashMap::new()); | ||
| let progress = Arc::new(std::sync::Mutex::new(ProgressState::new( | ||
| job.progress.clone(), | ||
| ))); | ||
| let ctx = JobContext::new_for_testing( | ||
| job.clone(), | ||
| cancel, | ||
| state, | ||
| self.pool.clone(), | ||
| progress.clone(), | ||
| ); | ||
|
|
||
| let result = worker.perform(&ctx).await; | ||
|
|
||
| // Snapshot progress from the buffer after handler execution | ||
| let progress_snapshot: Option<serde_json::Value> = { | ||
| let guard = progress.lock().expect("progress lock poisoned"); | ||
| guard.clone_latest() | ||
| }; | ||
| let (result, progress_snapshot) = self.run_handler(&job, worker).await; | ||
|
|
||
| // Update job state based on result | ||
| match &result { | ||
|
|
@@ -212,6 +202,157 @@ impl TestClient { | |
| } | ||
| } | ||
|
|
||
| /// Run `worker` against `job` inside a test-only `JobContext` and return the | ||
| /// handler's outcome paired with the latest progress left in the buffer. | ||
| /// Both the canonical and the queue-storage work paths go through here. | ||
| /// | ||
| /// Panics if the progress mutex has been poisoned. | ||
| async fn run_handler<W: Worker>( | ||
| &self, | ||
| job: &JobRow, | ||
| worker: &W, | ||
| ) -> (Result<JobResult, JobError>, Option<serde_json::Value>) { | ||
| // Progress buffer seeded from the job row; the context receives its own handle. | ||
| let progress_buf = Arc::new(std::sync::Mutex::new(ProgressState::new( | ||
| job.progress.clone(), | ||
| ))); | ||
| let shared_state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> = | ||
| Arc::new(HashMap::new()); | ||
| let cancel_flag = Arc::new(AtomicBool::new(false)); | ||
| let ctx = JobContext::new_for_testing( | ||
| job.clone(), | ||
| cancel_flag, | ||
| shared_state, | ||
| self.pool.clone(), | ||
| Arc::clone(&progress_buf), | ||
| ); | ||
| let outcome = worker.perform(&ctx).await; | ||
| // The guard is a temporary of this statement, so the lock is released before returning. | ||
| let latest_progress = progress_buf | ||
| .lock() | ||
| .expect("progress lock poisoned") | ||
| .clone_latest(); | ||
| (outcome, latest_progress) | ||
| } | ||
|
|
||
| /// Queue-storage variant of [`Self::work_one_in_queue`]: claim, run the | ||
| /// handler, and record the outcome through the runtime store API rather | ||
| /// than writing the `awa.jobs` view (which the engine rejects). | ||
| async fn work_one_queue_storage<W: Worker>( | ||
| &self, | ||
| worker: &W, | ||
| queue: Option<&str>, | ||
| schema: &str, | ||
| ) -> Result<WorkResult, AwaError> { | ||
| use std::time::Duration; | ||
| let store = awa_model::queue_storage::QueueStorage::from_existing_schema(schema)?; | ||
|
|
||
| // The store claims per queue; when the caller did not pin one, resolve a | ||
| // queue carrying a ready job of this worker's kind. | ||
| let target_queue = match queue { | ||
| Some(queue) => queue.to_string(), | ||
| None => { | ||
| let resolved: Option<String> = sqlx::query_scalar(&format!( | ||
| "SELECT queue FROM {schema}.ready_entries WHERE kind = $1 ORDER BY job_id ASC LIMIT 1" | ||
| )) | ||
| .bind(worker.kind()) | ||
| .fetch_optional(&self.pool) | ||
| .await?; | ||
| match resolved { | ||
| Some(queue) => queue, | ||
| None => return Ok(WorkResult::NoJob), | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| let claimed = store | ||
| .claim_runtime_batch(&self.pool, &target_queue, 1, Duration::from_secs(60)) | ||
| .await?; | ||
| let claimed = match claimed.into_iter().next() { | ||
| Some(claimed) => claimed, | ||
| None => return Ok(WorkResult::NoJob), | ||
| }; | ||
|
Comment on lines +263 to +269
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Verify whether queue storage already has a kind-aware runtime claim API.
rg -n -C3 'claim_runtime.*kind|kind.*claim_runtime|claim_runtime_batch' awa-model/src/queue_storage.rs awa-testing/src/lib.rsRepository: hardbyte/awa Length of output: 5116 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Inspect the queue-storage path and the canonical claim path around kind filtering.
sed -n '220,310p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C4 'state = .available.|kind = \$1|worker-kind|work_one_in_queue|claim_runtime_batch_with_aging_for_instance|claim_runtime_batch\(' awa-model/src awa-testing/src
printf '\n---\n'
sed -n '5680,5785p' awa-model/src/queue_storage.rsRepository: hardbyte/awa Length of output: 23146 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Inspect the runtime-claim implementation to see whether it filters by kind or only by queue.
sed -n '5214,5315p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '1,120p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C2 'ready_entries|queue_claim_heads|queue_enqueue_heads|kind = \$1.*queue|queue = \$2.*kind' awa-model/src/queue_storage.rs awa-model/src/admin.rs awa-testing/src/lib.rsRepository: hardbyte/awa Length of output: 25446 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Check whether queue-storage allows multiple kinds in one queue and whether claim paths filter by kind anywhere else.
sed -n '7658,7715p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '3720,3815p' awa-model/src/queue_storage.rs
printf '\n---\n'
rg -n -C2 'INSERT INTO .*ready_entries|ready_entries .* kind|WHERE .*queue = .*kind|kind = \$1.*queue = \$2|queue = \$2.*kind = \$1' awa-model/src/queue_storage.rs awa-model/src/admin.rs awa-testing/src/lib.rsRepository: hardbyte/awa Length of output: 7279 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Check whether queue storage allows multiple kinds in one queue and whether claim paths filter by kind anywhere else.
sed -n '7658,7715p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '3720,3815p' awa-model/src/queue_storage.rs
printf '\n---\n'
rg -n -C2 'INSERT INTO .*ready_entries|ready_entries .* kind|WHERE .*queue = .*kind|kind = \$1.*queue = \$2|queue = \$2.*kind = \$1' awa-model/src/queue_storage.rs awa-model/src/admin.rs awa-testing/src/lib.rsRepository: hardbyte/awa Length of output: 7279 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Inspect the runtime claim SQL and its call chain for any hidden kind predicate.
rg -n -C4 'async fn claim_ready_rows_tx|claim_ready_rows_tx\(|WHERE .*kind|kind = \$1|claim_runtime_batch_with_aging_physical' awa-model/src/queue_storage.rsRepository: hardbyte/awa Length of output: 3666 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Read the full claim implementation and the surrounding schema for queue/kind routing.
sed -n '3574,3675p' awa-model/src/queue_storage.rs
printf '\n---\n'
rg -n -C3 'CREATE TABLE .*ready_entries|ready_entries .*kind|queue .*kind|UNIQUE .*queue.*kind|CHECK .*kind' awa-model/src/queue_storage.rsRepository: hardbyte/awa Length of output: 4570 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Find the definition of claim_ready_runtime and inspect its filtering predicates.
rg -n -C6 'claim_ready_runtime|CREATE FUNCTION .*claim_ready_runtime|RETURNS .*claim_ready_runtime' awa-model/src awa-model/migrationsRepository: hardbyte/awa Length of output: 50368 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Inspect the claim_ready_runtime function body for any kind predicate.
sed -n '2685,3035p' awa-model/migrations/v023_install_queue_storage_substrate.sql | rg -n -C4 'kind|WHERE|ready_segments|ready_entries|queue_claim_heads|claim_ready_runtime'Repository: hardbyte/awa Length of output: 4397 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Pull the function body region and isolate every kind-related predicate/column reference.
sed -n '326,430p' awa-model/migrations/v023_install_queue_storage_substrate.sql
printf '\n---\n'
sed -n '430,560p' awa-model/migrations/v023_install_queue_storage_substrate.sql
printf '\n---\n'
rg -n -C2 'kind|WHERE p_queue|WHERE .*queue|p_queue' awa-model/migrations/v023_install_queue_storage_substrate.sqlRepository: hardbyte/awa Length of output: 21112 Preserve worker kind when claiming from queue storage. 🤖 Prompt for AI Agents |
||
| let job = claimed.job.clone(); | ||
|
|
||
| let (result, progress_snapshot) = self.run_handler(&job, worker).await; | ||
|
|
||
| match &result { | ||
| Ok(JobResult::Completed) => { | ||
| store | ||
| .complete_runtime_batch(&self.pool, std::slice::from_ref(&claimed)) | ||
| .await?; | ||
| Ok(WorkResult::Completed(job)) | ||
| } | ||
| Ok(JobResult::Cancel(reason)) => { | ||
| store | ||
| .cancel_running(&self.pool, job.id, job.run_lease, reason, progress_snapshot) | ||
| .await?; | ||
| Ok(WorkResult::Cancelled(job, reason.clone())) | ||
| } | ||
| Ok(JobResult::RetryAfter(delay)) => { | ||
| store | ||
| .retry_after(&self.pool, job.id, job.run_lease, *delay, progress_snapshot) | ||
| .await?; | ||
| Ok(WorkResult::Retryable(job)) | ||
| } | ||
| Err(JobError::Retryable(_)) => { | ||
| // A retryable error reschedules the job. Canonical leaves it in | ||
| // a finalized `retryable` state (not re-claimable by a later | ||
| // `work_one`); the store needs an explicit delay, so use a long | ||
| // one rather than a near-immediate reschedule that would let the | ||
| // same job be re-claimed within a test or accumulate across runs. | ||
| store | ||
| .retry_after( | ||
| &self.pool, | ||
| job.id, | ||
| job.run_lease, | ||
| Duration::from_secs(3600), | ||
| progress_snapshot, | ||
| ) | ||
| .await?; | ||
| Ok(WorkResult::Retryable(job)) | ||
| } | ||
| Ok(JobResult::Snooze(delay)) => { | ||
| store | ||
| .snooze(&self.pool, job.id, job.run_lease, *delay, progress_snapshot) | ||
| .await?; | ||
| Ok(WorkResult::Snoozed(job)) | ||
| } | ||
| Ok(JobResult::WaitForCallback(_)) => { | ||
| // The handler registers the callback via the context; park to | ||
| // waiting_external when it did, otherwise fail like the | ||
| // canonical path. | ||
| match self.get_job(job.id).await?.callback_id { | ||
| Some(callback_id) => { | ||
| let entered = store | ||
| .enter_callback_wait(&self.pool, job.id, job.run_lease, callback_id) | ||
| .await?; | ||
| assert!( | ||
| entered, | ||
| "enter_callback_wait did not transition job {} to waiting_external", | ||
| job.id | ||
| ); | ||
| Ok(WorkResult::WaitingExternal(self.get_job(job.id).await?)) | ||
|
Comment on lines +320 to +330
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Inspect enter_callback_wait to confirm whether it can persist progress or needs a companion update.
rg -n -C4 'fn enter_callback_wait|enter_callback_wait\(' awa-model/src/queue_storage.rs awa-testing/src/lib.rsRepository: hardbyte/awa Length of output: 2039 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Inspect the callback-wait implementation and the canonical waiting_external transition.
sed -n '11355,11410p' awa-model/src/queue_storage.rs
printf '\n---\n'
sed -n '280,340p' awa-testing/src/lib.rs
printf '\n---\n'
rg -n -C4 'waiting_external|progress_snapshot|enter_callback_wait_in_tx|enter_callback_wait\(' awa-model/src/queue_storage.rs awa-testing/src/lib.rsRepository: hardbyte/awa Length of output: 24580 Persist 🤖 Prompt for AI Agents |
||
| } | ||
| None => { | ||
| let msg = "WaitForCallback returned without calling register_callback"; | ||
| store | ||
| .fail_terminal( | ||
| &self.pool, | ||
| job.id, | ||
| job.run_lease, | ||
| msg, | ||
| progress_snapshot, | ||
| ) | ||
| .await?; | ||
| Ok(WorkResult::Failed(job, msg.to_string())) | ||
| } | ||
| } | ||
| } | ||
| Err(JobError::Terminal(msg)) => { | ||
| store | ||
| .fail_terminal(&self.pool, job.id, job.run_lease, msg, progress_snapshot) | ||
| .await?; | ||
| Ok(WorkResult::Failed(job, msg.clone())) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Get a job by ID. | ||
| pub async fn get_job(&self, job_id: i64) -> Result<JobRow, AwaError> { | ||
| awa_model::admin::get_job(&self.pool, job_id).await | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For `work_one()` without an explicit queue, this query reads `{schema}.ready_entries` directly, but queue storage retains ready rows after they have been claimed/completed and determines availability via claim-head/tombstone/closure evidence. After an older same-kind job in another queue has already been processed, this can resolve that stale queue, `claim_runtime_batch` returns no rows, and the helper reports `NoJob` even though a claimable job of the requested kind exists in a different queue. Useful? React with 👍 / 👎.