From 47f40129a0d907975a982ec6dab3b73ea373f09c Mon Sep 17 00:00:00 2001 From: kawidman3 Date: Mon, 20 Oct 2025 11:44:16 -0500 Subject: [PATCH 1/3] Add delete route and required code to clean up a vectorize job. --- core/src/init.rs | 80 ++++++ core/src/query.rs | 125 +++++++++ extension/src/workers/mod.rs | 14 +- server/src/routes/table.rs | 50 +++- server/src/server.rs | 1 + server/tests/tests.rs | 494 +++++++++++++++++++++++++++++++++++ worker/src/executor.rs | 24 +- 7 files changed, 785 insertions(+), 3 deletions(-) diff --git a/core/src/init.rs b/core/src/init.rs index d31e572f..239a2427 100644 --- a/core/src/init.rs +++ b/core/src/init.rs @@ -314,6 +314,86 @@ pub async fn scan_job(pool: &PgPool, job_request: &VectorizeJob) -> Result<(), V Ok(()) } +pub async fn cleanup_job(pool: &PgPool, job_name: &str) -> Result<(), VectorizeError> { + // First, fetch the job details to get src_schema and src_table + let job = crate::db::get_vectorize_job(pool, job_name) + .await + .map_err(|e| match e { + VectorizeError::SqlError(sqlx::Error::RowNotFound) => { + VectorizeError::NotFound(format!("Job '{}' not found", job_name)) + } + _ => e, + })?; + + log::info!("Cleaning up job: {}", job_name); + + // Delete pending PGMQ messages for this job + // We search for messages where the job_name matches + let delete_messages_query = + format!("DELETE FROM pgmq.vectorize_jobs WHERE message->>'job_name' = $1"); + match sqlx::query(&delete_messages_query) + .bind(job_name) + .execute(pool) + .await + { + Ok(result) => { + log::info!( + "Deleted {} pending PGMQ messages for job: {}", + result.rows_affected(), + job_name + ); + } + Err(e) => { + log::warn!("Failed to delete PGMQ messages for job {}: {}", job_name, e); + // Continue with cleanup even if PGMQ deletion fails + } + } + + // Begin transaction for database resource cleanup + let mut tx = pool.begin().await?; + + // Generate cleanup SQL statements + let cleanup_statements = vec![ + // Drop triggers first (they depend on the function and table) + query::drop_event_trigger(job_name, &job.src_schema, &job.src_table, "INSERT"), + query::drop_event_trigger(job_name, &job.src_schema, &job.src_table, "UPDATE"), + query::drop_search_tokens_trigger(job_name, &job.src_schema, &job.src_table), + // Drop trigger handler function + query::drop_trigger_handler(job_name), + // Drop view (depends on tables) + query::drop_project_view(job_name), + // Drop tables (CASCADE will handle indexes) + query::drop_embeddings_table(job_name), + query::drop_search_tokens_table(job_name), + // Delete job record + query::delete_job_record(job_name), + ]; + + // Execute cleanup statements + for (idx, statement) in cleanup_statements.iter().enumerate() { + match sqlx::query(statement).execute(&mut *tx).await { + Ok(_) => { + log::debug!("Executed cleanup statement {}: {}", idx + 1, statement); + } + Err(e) => { + log::warn!( + "Warning: cleanup statement {} failed (continuing): {} - Error: {}", + idx + 1, + statement, + e + ); + // Continue with other cleanup steps even if one fails + } + } + } + + // Commit transaction + tx.commit().await?; + + log::info!("Successfully cleaned up job: {}", job_name); + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/query.rs b/core/src/query.rs index a489244e..23c09227 100644 --- a/core/src/query.rs +++ b/core/src/query.rs @@ -412,6 +412,38 @@ pub fn drop_project_view(job_name: &str) -> String { format!("DROP VIEW IF EXISTS vectorize.{job_name}_view;") } +pub fn drop_embeddings_table(job_name: &str) -> String { + format!("DROP TABLE IF EXISTS vectorize._embeddings_{job_name} CASCADE;") +} + +pub fn drop_search_tokens_table(job_name: &str) -> String { + format!("DROP TABLE IF EXISTS vectorize._search_tokens_{job_name} CASCADE;") +} + +pub fn drop_trigger_handler(job_name: &str) -> String { + format!("DROP FUNCTION IF EXISTS {TRIGGER_FN_PREFIX}{job_name}() CASCADE;") +} + +pub fn drop_event_trigger( + job_name: &str, + src_schema: &str, + src_table: &str, + event: &str, +) -> String { + format!( + "DROP TRIGGER IF EXISTS vectorize_{event_name}_trigger_{job_name} ON {src_schema}.{src_table};", + event_name = event.to_lowercase() + ) +} + +pub fn drop_search_tokens_trigger(job_name: &str, src_schema: &str, src_table: &str) -> String { + format!("DROP TRIGGER IF EXISTS {job_name}_search_tokens_trigger ON {src_schema}.{src_table};") +} + +pub fn delete_job_record(job_name: &str) -> String { + format!("DELETE FROM vectorize.job WHERE job_name = '{job_name}';") +} + /// creates a function that can be called by trigger pub fn create_trigger_handler(job_name: &str, pkey: &str) -> String { format!( @@ -1406,4 +1438,97 @@ EXECUTE FUNCTION vectorize.handle_update_another_job();" assert_eq!(filter.operator, FilterOperator::Equal); } } + + #[test] + fn test_drop_embeddings_table() { + let job_name = "test_job"; + let result = drop_embeddings_table(job_name); + assert_eq!( + result, + "DROP TABLE IF EXISTS vectorize._embeddings_test_job CASCADE;" + ); + } + + #[test] + fn test_drop_search_tokens_table() { + let job_name = "test_job"; + let result = drop_search_tokens_table(job_name); + assert_eq!( + result, + "DROP TABLE IF EXISTS vectorize._search_tokens_test_job CASCADE;" + ); + } + + #[test] + fn test_drop_trigger_handler() { + let job_name = "test_job"; + let result = drop_trigger_handler(job_name); + assert!(result.contains("DROP FUNCTION IF EXISTS")); + assert!(result.contains("vectorize.handle_update_test_job()")); + assert!(result.contains("CASCADE")); + } + + #[test] + fn test_drop_event_trigger() { + let job_name = "test_job"; + let src_schema = "public"; + let src_table = "my_table"; + + let insert_trigger = drop_event_trigger(job_name, src_schema, src_table, "INSERT"); + assert_eq!( + insert_trigger, + "DROP TRIGGER IF EXISTS vectorize_insert_trigger_test_job ON public.my_table;" + ); + + let update_trigger = drop_event_trigger(job_name, src_schema, src_table, "UPDATE"); + assert_eq!( + update_trigger, + "DROP TRIGGER IF EXISTS vectorize_update_trigger_test_job ON public.my_table;" + ); + } + + #[test] + fn test_drop_search_tokens_trigger() { + let job_name = "test_job"; + let src_schema = "public"; + let src_table = "my_table"; + + let result = drop_search_tokens_trigger(job_name, src_schema, src_table); + assert_eq!( + result, + "DROP TRIGGER IF EXISTS test_job_search_tokens_trigger ON public.my_table;" + ); + } + + #[test] + fn test_delete_job_record() { + let job_name = "test_job"; + let result = delete_job_record(job_name); + assert_eq!( + result, + "DELETE FROM vectorize.job WHERE job_name = 'test_job';" + ); + } + + #[test] + fn test_drop_project_view() { + let job_name = "test_job"; + let result = drop_project_view(job_name); + assert_eq!(result, "DROP VIEW IF EXISTS vectorize.test_job_view;"); + } + + #[test] + fn test_cleanup_sql_with_special_chars() { + // Test that job names with underscores work correctly + let job_name = "my_test_job_123"; + + let embeddings = drop_embeddings_table(job_name); + assert!(embeddings.contains("_embeddings_my_test_job_123")); + + let tokens = drop_search_tokens_table(job_name); + assert!(tokens.contains("_search_tokens_my_test_job_123")); + + let view = drop_project_view(job_name); + assert!(view.contains("my_test_job_123_view")); + } } diff --git a/extension/src/workers/mod.rs b/extension/src/workers/mod.rs index ccc22c7c..8a50e97f 100644 --- a/extension/src/workers/mod.rs +++ b/extension/src/workers/mod.rs @@ -89,7 +89,19 @@ pub async fn get_vectorize_meta( /// processes a single job from the queue pub async fn execute_job(dbclient: &Pool, msg: Message) -> Result<()> { - let job_meta = get_vectorize_meta(&msg.message.job_name, dbclient).await?; + // Check if the job still exists - it may have been deleted + let job_meta = match get_vectorize_meta(&msg.message.job_name, dbclient).await { + Ok(meta) => meta, + Err(DatabaseError::SqlxError(sqlx::Error::RowNotFound)) => { + warning!( + "pg-vectorize: Job '{}' not found - it may have been deleted. Skipping message.", + msg.message.job_name + ); + // Return Ok to allow the message to be deleted from queue + return Ok(()); + } + Err(e) => return Err(anyhow::anyhow!("Failed to get job meta: {}", e)), + }; let mut job_params: JobParams = serde_json::from_value(job_meta.params.clone())?; let bpe = cl100k_base().unwrap(); diff --git a/server/src/routes/table.rs b/server/src/routes/table.rs index 773b214f..a9c32762 100644 --- a/server/src/routes/table.rs +++ b/server/src/routes/table.rs @@ -1,6 +1,6 @@ use crate::app_state::AppState; use crate::errors::ServerError; -use actix_web::{HttpResponse, post, web}; +use actix_web::{HttpResponse, delete, post, web}; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use uuid::Uuid; @@ -59,3 +59,51 @@ pub async fn table( let resp = JobResponse { id: job_id }; Ok(HttpResponse::Ok().json(resp)) } + +#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)] +pub struct DeleteJobResponse { + pub job_name: String, + pub message: String, +} + +#[utoipa::path( + context_path = "/api/v1", + responses( + ( + status = 200, description = "Successfully deleted vectorize job", + body = DeleteJobResponse, + ), + ( + status = 404, description = "Job not found", + ), + ), +)] +#[delete("/table/{job_name}")] +pub async fn delete_table( + app_state: web::Data, + job_name: web::Path, +) -> Result { + let job_name = job_name.into_inner(); + + // Cleanup the job resources + init::cleanup_job(&app_state.db_pool, &job_name) + .await + .map_err(|e| match e { + vectorize_core::errors::VectorizeError::NotFound(msg) => { + ServerError::NotFoundError(msg) + } + _ => ServerError::from(e), + })?; + + // Remove from cache + { + let mut job_cache = app_state.job_cache.write().await; + job_cache.remove(&job_name); + } + + let resp = DeleteJobResponse { + job_name: job_name.clone(), + message: format!("Successfully deleted job '{}'", job_name), + }; + Ok(HttpResponse::Ok().json(resp)) +} diff --git a/server/src/server.rs b/server/src/server.rs index cc3282a9..192b5088 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -6,6 +6,7 @@ pub fn route_config(configuration: &mut web::ServiceConfig) { configuration.service( web::scope("/api/v1") .service(routes::table::table) + .service(routes::table::delete_table) .service(routes::search::search), ); } diff --git a/server/tests/tests.rs b/server/tests/tests.rs index f00008f8..92f63c42 100644 --- a/server/tests/tests.rs +++ b/server/tests/tests.rs @@ -650,3 +650,497 @@ async fn test_health_monitoring() { println!("Health monitoring test completed successfully"); } + +#[tokio::test] +async fn test_delete_job() { + // Initialize test environment + common::init_test_environment().await; + + // Create test table + let table = common::create_test_table().await; + let job_name = format!("test_delete_job_{table}"); + + // Create a vectorize job + let payload = json!({ + "job_name": job_name, + "src_table": table, + "src_schema": "vectorize_test", + "src_columns": ["content"], + "primary_key": "id", + "update_time_col": "updated_at", + "model": "sentence-transformers/all-MiniLM-L6-v2" + }); + + let client = reqwest::Client::new(); + let resp = client + .post("http://localhost:8080/api/v1/table") + .header("Content-Type", "application/json") + .json(&payload) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + let response: JobResponse = resp.json().await.expect("Failed to parse response"); + assert!(!response.id.is_nil(), "Job ID should not be nil"); + + // Wait for job to be processed + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Verify resources were created + let cfg = vectorize_core::config::Config::from_env(); + let pool = sqlx::PgPool::connect(&cfg.database_url).await.unwrap(); + + // Check embeddings table exists + let embeddings_table_exists: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'vectorize' + AND table_name = $1 + )", + ) + .bind(format!("_embeddings_{}", job_name)) + .fetch_one(&pool) + .await + .unwrap(); + assert!(embeddings_table_exists, "Embeddings table should exist"); + + // Check search tokens table exists + let tokens_table_exists: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'vectorize' + AND table_name = $1 + )", + ) + .bind(format!("_search_tokens_{}", job_name)) + .fetch_one(&pool) + .await + .unwrap(); + assert!(tokens_table_exists, "Search tokens table should exist"); + + // Check view exists + let view_exists: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.views + WHERE table_schema = 'vectorize' + AND table_name = $1 + )", + ) + .bind(format!("{}_view", job_name)) + .fetch_one(&pool) + .await + .unwrap(); + assert!(view_exists, "View should exist"); + + // Check job record exists + let job_exists: bool = + sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM vectorize.job WHERE job_name = $1)") + .bind(&job_name) + .fetch_one(&pool) + .await + .unwrap(); + assert!(job_exists, "Job record should exist"); + + // Delete the job + let resp = client + .delete(&format!("http://localhost:8080/api/v1/table/{}", job_name)) + .send() + .await + .expect("Failed to send delete request"); + + assert_eq!( + resp.status(), + reqwest::StatusCode::OK, + "Delete should return 200 OK" + ); + + let delete_response: serde_json::Value = resp.json().await.expect("Failed to parse response"); + assert_eq!(delete_response["job_name"], job_name); + assert!( + delete_response["message"] + .as_str() + .unwrap() + .contains("Successfully deleted") + ); + + // Verify all resources were cleaned up + + // Check embeddings table no longer exists + let embeddings_table_exists: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'vectorize' + AND table_name = $1 + )", + ) + .bind(format!("_embeddings_{}", job_name)) + .fetch_one(&pool) + .await + .unwrap(); + assert!( + !embeddings_table_exists, + "Embeddings table should be deleted" + ); + + // Check search tokens table no longer exists + let tokens_table_exists: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'vectorize' + AND table_name = $1 + )", + ) + .bind(format!("_search_tokens_{}", job_name)) + .fetch_one(&pool) + .await + .unwrap(); + assert!( + !tokens_table_exists, + "Search tokens table should be deleted" + ); + + // Check view no longer exists + let view_exists: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.views + WHERE table_schema = 'vectorize' + AND table_name = $1 + )", + ) + .bind(format!("{}_view", job_name)) + .fetch_one(&pool) + .await + .unwrap(); + assert!(!view_exists, "View should be deleted"); + + // Check job record no longer exists + let job_exists: bool = + sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM vectorize.job WHERE job_name = $1)") + .bind(&job_name) + .fetch_one(&pool) + .await + .unwrap(); + assert!(!job_exists, "Job record should be deleted"); + + // Verify triggers were removed from source table + let trigger_exists: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT 1 FROM information_schema.triggers + WHERE event_object_schema = 'vectorize_test' + AND event_object_table = $1 + AND trigger_name LIKE $2 + )", + ) + .bind(&table) + .bind(format!("%{}%", job_name)) + .fetch_one(&pool) + .await + .unwrap(); + assert!(!trigger_exists, "Triggers should be deleted"); + + // Verify trigger handler function was removed + let function_exists: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT 1 FROM pg_proc p + JOIN pg_namespace n ON p.pronamespace = n.oid + WHERE n.nspname = 'vectorize' + AND p.proname = $1 + )", + ) + .bind(format!("handle_update_{}", job_name)) + .fetch_one(&pool) + .await + .unwrap(); + assert!( + !function_exists, + "Trigger handler function should be deleted" + ); + + println!("Delete job test completed successfully"); +} + +#[tokio::test] +async fn test_delete_nonexistent_job() { + // Initialize test environment + common::init_test_environment().await; + + let client = reqwest::Client::new(); + let nonexistent_job = "this_job_does_not_exist_12345"; + + // Try to delete a job that doesn't exist + let resp = client + .delete(&format!( + "http://localhost:8080/api/v1/table/{}", + nonexistent_job + )) + .send() + .await + .expect("Failed to send delete request"); + + assert_eq!( + resp.status(), + reqwest::StatusCode::NOT_FOUND, + "Delete of nonexistent job should return 404" + ); + + let error_response: serde_json::Value = resp.json().await.expect("Failed to parse response"); + assert!( + error_response["error"] + .as_str() + .unwrap() + .contains("not found") + ); + + println!("Delete nonexistent job test completed successfully"); +} + +#[tokio::test] +async fn test_delete_job_idempotency() { + // Initialize test environment + common::init_test_environment().await; + + // Create test table + let table = common::create_test_table().await; + let job_name = format!("test_delete_idempotent_{table}"); + + // Create a vectorize job + let payload = json!({ + "job_name": job_name, + "src_table": table, + "src_schema": "vectorize_test", + "src_columns": ["content"], + "primary_key": "id", + "update_time_col": "updated_at", + "model": "sentence-transformers/all-MiniLM-L6-v2" + }); + + let client = reqwest::Client::new(); + let resp = client + .post("http://localhost:8080/api/v1/table") + .header("Content-Type", "application/json") + .json(&payload) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + + // Wait for job to be processed + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Delete the job (first time) + let resp = client + .delete(&format!("http://localhost:8080/api/v1/table/{}", job_name)) + .send() + .await + .expect("Failed to send first delete request"); + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + + // Try to delete the same job again (should return 404) + let resp = client + .delete(&format!("http://localhost:8080/api/v1/table/{}", job_name)) + .send() + .await + .expect("Failed to send second delete request"); + + assert_eq!( + resp.status(), + reqwest::StatusCode::NOT_FOUND, + "Second delete should return 404" + ); + + println!("Delete job idempotency test completed successfully"); +} + +#[tokio::test] +async fn test_delete_job_preserves_source_table() { + // Initialize test environment + common::init_test_environment().await; + + // Create test table + let table = common::create_test_table().await; + let job_name = format!("test_delete_preserves_{table}"); + + let cfg = vectorize_core::config::Config::from_env(); + let pool = sqlx::PgPool::connect(&cfg.database_url).await.unwrap(); + + // Verify source table exists before job creation + let source_table_exists_before: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'vectorize_test' + AND table_name = $1 + )", + ) + .bind(&table) + .fetch_one(&pool) + .await + .unwrap(); + assert!( + source_table_exists_before, + "Source table should exist before job creation" + ); + + // Count rows in source table + let row_count_before: i64 = + sqlx::query_scalar(&format!("SELECT COUNT(*) FROM vectorize_test.{}", table)) + .fetch_one(&pool) + .await + .unwrap(); + + // Create a vectorize job + let payload = json!({ + "job_name": job_name, + "src_table": table, + "src_schema": "vectorize_test", + "src_columns": ["content"], + "primary_key": "id", + "update_time_col": "updated_at", + "model": "sentence-transformers/all-MiniLM-L6-v2" + }); + + let client = reqwest::Client::new(); + let resp = client + .post("http://localhost:8080/api/v1/table") + .header("Content-Type", "application/json") + .json(&payload) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + + // Wait for job to be processed + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Delete the job + let resp = client + .delete(&format!("http://localhost:8080/api/v1/table/{}", job_name)) + .send() + .await + .expect("Failed to send delete request"); + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + + // Verify source table still exists after job deletion + let source_table_exists_after: bool = sqlx::query_scalar( + "SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_schema = 'vectorize_test' + AND table_name = $1 + )", + ) + .bind(&table) + .fetch_one(&pool) + .await + .unwrap(); + assert!( + source_table_exists_after, + "Source table should still exist after job deletion" + ); + + // Verify data in source table is intact + let row_count_after: i64 = + sqlx::query_scalar(&format!("SELECT COUNT(*) FROM vectorize_test.{}", table)) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + row_count_before, row_count_after, + "Source table data should be intact" + ); + + println!("Delete job preserves source table test completed successfully"); +} + +#[tokio::test] +async fn test_delete_job_with_pending_messages() { + // Initialize test environment + common::init_test_environment().await; + + // Create test table with more data to ensure messages are queued + let cfg = vectorize_core::config::Config::from_env(); + let pool = sqlx::PgPool::connect(&cfg.database_url).await.unwrap(); + + let mut rng = rand::rng(); + let test_num = rng.random_range(1..100000); + let table = format!("test_pending_msgs_{test_num}"); + + // Create table + sqlx::query(&format!( + "CREATE TABLE IF NOT EXISTS vectorize_test.{table} ( + id SERIAL PRIMARY KEY, + content TEXT, + updated_at TIMESTAMPTZ DEFAULT NOW() + );" + )) + .execute(&pool) + .await + .unwrap(); + + // Insert multiple rows + for i in 0..10 { + sqlx::query(&format!( + "INSERT INTO vectorize_test.{table} (content, updated_at) VALUES ($1, NOW());" + )) + .bind(format!("test content {}", i)) + .execute(&pool) + .await + .unwrap(); + } + + let job_name = format!("test_pending_{test_num}"); + + // Create a vectorize job + let payload = json!({ + "job_name": job_name, + "src_table": table, + "src_schema": "vectorize_test", + "src_columns": ["content"], + "primary_key": "id", + "update_time_col": "updated_at", + "model": "sentence-transformers/all-MiniLM-L6-v2" + }); + + let client = reqwest::Client::new(); + let resp = client + .post("http://localhost:8080/api/v1/table") + .header("Content-Type", "application/json") + .json(&payload) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + + // Don't wait for processing - delete immediately to test pending message cleanup + // This should leave messages in the queue + + // Delete the job + let resp = client + .delete(&format!("http://localhost:8080/api/v1/table/{}", job_name)) + .send() + .await + .expect("Failed to send delete request"); + + assert_eq!(resp.status(), reqwest::StatusCode::OK); + + // Verify job is deleted + let job_exists: bool = + sqlx::query_scalar("SELECT EXISTS (SELECT 1 FROM vectorize.job WHERE job_name = $1)") + .bind(&job_name) + .fetch_one(&pool) + .await + .unwrap(); + assert!(!job_exists, "Job should be deleted"); + + // Wait a bit to allow worker to process any remaining messages + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + + // Check that no errors occurred (worker should skip deleted job messages gracefully) + // We can't directly check worker logs, but the test not panicking is a good sign + + println!("Delete job with pending messages test completed successfully"); +} diff --git a/worker/src/executor.rs b/worker/src/executor.rs index 73f60610..d8fdfe9a 100644 --- a/worker/src/executor.rs +++ b/worker/src/executor.rs @@ -61,7 +61,29 @@ async fn execute_job(pool: &PgPool, msg: Message) -> Result<(), Vect let bpe = cl100k_base().unwrap(); let job_name = msg.message.job_name.clone(); - let vectorizejob = db::get_vectorize_job(pool, &job_name).await?; + + // Check if the job still exists - it may have been deleted + let vectorizejob = match db::get_vectorize_job(pool, &job_name).await { + Ok(job) => job, + Err(VectorizeError::SqlError(sqlx::Error::RowNotFound)) => { + log::warn!( + "Job '{}' not found - it may have been deleted. Skipping message.", + job_name + ); + // Return Ok to allow the message to be deleted from queue + return Ok(()); + } + Err(VectorizeError::NotFound(_)) => { + log::warn!( + "Job '{}' not found - it may have been deleted. Skipping message.", + job_name + ); + // Return Ok to allow the message to be deleted from queue + return Ok(()); + } + Err(e) => return Err(e), + }; + log::debug!("Retrieved vectorize job: {vectorizejob:?}"); let provider = providers::get_provider(&vectorizejob.model.source, None, None, None)?; From bd15238119b8850533050707f5776dbe54575f90 Mon Sep 17 00:00:00 2001 From: kawidman3 Date: Mon, 20 Oct 2025 11:49:19 -0500 Subject: [PATCH 2/3] Format --- worker/src/executor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/src/executor.rs b/worker/src/executor.rs index d8fdfe9a..71dc9704 100644 --- a/worker/src/executor.rs +++ b/worker/src/executor.rs @@ -61,7 +61,7 @@ async fn execute_job(pool: &PgPool, msg: Message) -> Result<(), Vect let bpe = cl100k_base().unwrap(); let job_name = msg.message.job_name.clone(); - + // Check if the job still exists - it may have been deleted let vectorizejob = match db::get_vectorize_job(pool, &job_name).await { Ok(job) => job, @@ -83,7 +83,7 @@ async fn execute_job(pool: &PgPool, msg: Message) -> Result<(), Vect } Err(e) => return Err(e), }; - + log::debug!("Retrieved vectorize job: {vectorizejob:?}"); let provider = providers::get_provider(&vectorizejob.model.source, None, None, None)?; From d43201a47a44442fef25c0e2e09715098faa65b8 Mon Sep 17 00:00:00 2001 From: kawidman3 Date: Mon, 20 Oct 2025 12:02:18 -0500 Subject: [PATCH 3/3] Fix incorrect imported error type --- extension/src/workers/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/src/workers/mod.rs b/extension/src/workers/mod.rs index 8a50e97f..5e0c4f41 100644 --- a/extension/src/workers/mod.rs +++ b/extension/src/workers/mod.rs @@ -92,7 +92,7 @@ pub async fn execute_job(dbclient: &Pool, msg: Message) -> // Check if the job still exists - it may have been deleted let job_meta = match get_vectorize_meta(&msg.message.job_name, dbclient).await { Ok(meta) => meta, - Err(DatabaseError::SqlxError(sqlx::Error::RowNotFound)) => { + Err(DatabaseError::Db(sqlx::Error::RowNotFound)) => { warning!( "pg-vectorize: Job '{}' not found - it may have been deleted. Skipping message.", msg.message.job_name