Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,86 @@ pub async fn scan_job(pool: &PgPool, job_request: &VectorizeJob) -> Result<(), V
Ok(())
}

pub async fn cleanup_job(pool: &PgPool, job_name: &str) -> Result<(), VectorizeError> {
    // Fetch the job first so we have src_schema/src_table for trigger cleanup,
    // and so a missing job surfaces as NotFound rather than a generic SQL error.
    let job = crate::db::get_vectorize_job(pool, job_name)
        .await
        .map_err(|e| match e {
            VectorizeError::SqlError(sqlx::Error::RowNotFound) => {
                VectorizeError::NotFound(format!("Job '{}' not found", job_name))
            }
            _ => e,
        })?;

    log::info!("Cleaning up job: {}", job_name);

    // Delete pending PGMQ messages for this job so workers don't pick up work
    // for a job that is being removed. The job name lives inside the JSON
    // message payload; filter on it with a bind parameter (no format! needed —
    // the statement is static).
    match sqlx::query("DELETE FROM pgmq.vectorize_jobs WHERE message->>'job_name' = $1")
        .bind(job_name)
        .execute(pool)
        .await
    {
        Ok(result) => {
            log::info!(
                "Deleted {} pending PGMQ messages for job: {}",
                result.rows_affected(),
                job_name
            );
        }
        Err(e) => {
            log::warn!("Failed to delete PGMQ messages for job {}: {}", job_name, e);
            // Best-effort: continue with cleanup even if PGMQ deletion fails
        }
    }

    // Cleanup SQL, ordered so dependent objects are dropped before the objects
    // they depend on.
    let cleanup_statements = vec![
        // Drop triggers first (they depend on the function and table)
        query::drop_event_trigger(job_name, &job.src_schema, &job.src_table, "INSERT"),
        query::drop_event_trigger(job_name, &job.src_schema, &job.src_table, "UPDATE"),
        query::drop_search_tokens_trigger(job_name, &job.src_schema, &job.src_table),
        // Drop trigger handler function
        query::drop_trigger_handler(job_name),
        // Drop view (depends on tables)
        query::drop_project_view(job_name),
        // Drop tables (CASCADE will handle indexes)
        query::drop_embeddings_table(job_name),
        query::drop_search_tokens_table(job_name),
        // Delete job record
        query::delete_job_record(job_name),
    ];

    // Execute each statement directly on the pool rather than inside a single
    // transaction. In Postgres, any error inside a transaction aborts it:
    // every subsequent statement (and the final COMMIT) would fail with
    // "current transaction is aborted", so continue-on-failure semantics are
    // impossible there. Running statements individually keeps the cleanup
    // genuinely best-effort.
    for (idx, statement) in cleanup_statements.iter().enumerate() {
        match sqlx::query(statement).execute(pool).await {
            Ok(_) => {
                log::debug!("Executed cleanup statement {}: {}", idx + 1, statement);
            }
            Err(e) => {
                log::warn!(
                    "Warning: cleanup statement {} failed (continuing): {} - Error: {}",
                    idx + 1,
                    statement,
                    e
                );
                // Continue with other cleanup steps even if one fails
            }
        }
    }

    log::info!("Successfully cleaned up job: {}", job_name);
    Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
125 changes: 125 additions & 0 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,38 @@ pub fn drop_project_view(job_name: &str) -> String {
format!("DROP VIEW IF EXISTS vectorize.{job_name}_view;")
}

/// Builds the SQL that drops a job's embeddings table (and, via CASCADE,
/// any dependent objects such as indexes).
pub fn drop_embeddings_table(job_name: &str) -> String {
    let table = format!("vectorize._embeddings_{job_name}");
    format!("DROP TABLE IF EXISTS {table} CASCADE;")
}

/// Builds the SQL that drops a job's search-tokens table (and, via CASCADE,
/// any dependent objects such as indexes).
pub fn drop_search_tokens_table(job_name: &str) -> String {
    let table = format!("vectorize._search_tokens_{job_name}");
    format!("DROP TABLE IF EXISTS {table} CASCADE;")
}

/// Builds the SQL that drops a job's trigger-handler function, cascading to
/// any triggers still referencing it.
pub fn drop_trigger_handler(job_name: &str) -> String {
    format!(
        "DROP FUNCTION IF EXISTS {prefix}{job_name}() CASCADE;",
        prefix = TRIGGER_FN_PREFIX
    )
}

/// Builds the SQL that drops the per-event (e.g. INSERT/UPDATE) trigger a job
/// installed on its source table. `event` is lowercased to match the name the
/// trigger was created with.
pub fn drop_event_trigger(
    job_name: &str,
    src_schema: &str,
    src_table: &str,
    event: &str,
) -> String {
    let trigger = format!("vectorize_{}_trigger_{job_name}", event.to_lowercase());
    format!("DROP TRIGGER IF EXISTS {trigger} ON {src_schema}.{src_table};")
}

/// Builds the SQL that drops the search-tokens trigger a job installed on its
/// source table.
pub fn drop_search_tokens_trigger(job_name: &str, src_schema: &str, src_table: &str) -> String {
    let qualified_table = format!("{src_schema}.{src_table}");
    format!("DROP TRIGGER IF EXISTS {job_name}_search_tokens_trigger ON {qualified_table};")
}

/// Builds the SQL that deletes a job's row from `vectorize.job`.
///
/// The job name is embedded as a SQL string literal, so single quotes are
/// doubled per the SQL standard. Without this, a job name containing `'`
/// would break out of the literal (SQL injection / malformed statement).
pub fn delete_job_record(job_name: &str) -> String {
    let escaped = job_name.replace('\'', "''");
    format!("DELETE FROM vectorize.job WHERE job_name = '{escaped}';")
}

/// creates a function that can be called by trigger
pub fn create_trigger_handler(job_name: &str, pkey: &str) -> String {
format!(
Expand Down Expand Up @@ -1406,4 +1438,97 @@ EXECUTE FUNCTION vectorize.handle_update_another_job();"
assert_eq!(filter.operator, FilterOperator::Equal);
}
}

#[test]
fn test_drop_embeddings_table() {
let job_name = "test_job";
let result = drop_embeddings_table(job_name);
assert_eq!(
result,
"DROP TABLE IF EXISTS vectorize._embeddings_test_job CASCADE;"
);
}

#[test]
fn test_drop_search_tokens_table() {
let job_name = "test_job";
let result = drop_search_tokens_table(job_name);
assert_eq!(
result,
"DROP TABLE IF EXISTS vectorize._search_tokens_test_job CASCADE;"
);
}

#[test]
fn test_drop_trigger_handler() {
let job_name = "test_job";
let result = drop_trigger_handler(job_name);
assert!(result.contains("DROP FUNCTION IF EXISTS"));
assert!(result.contains("vectorize.handle_update_test_job()"));
assert!(result.contains("CASCADE"));
}

#[test]
fn test_drop_event_trigger() {
let job_name = "test_job";
let src_schema = "public";
let src_table = "my_table";

let insert_trigger = drop_event_trigger(job_name, src_schema, src_table, "INSERT");
assert_eq!(
insert_trigger,
"DROP TRIGGER IF EXISTS vectorize_insert_trigger_test_job ON public.my_table;"
);

let update_trigger = drop_event_trigger(job_name, src_schema, src_table, "UPDATE");
assert_eq!(
update_trigger,
"DROP TRIGGER IF EXISTS vectorize_update_trigger_test_job ON public.my_table;"
);
}

#[test]
fn test_drop_search_tokens_trigger() {
let job_name = "test_job";
let src_schema = "public";
let src_table = "my_table";

let result = drop_search_tokens_trigger(job_name, src_schema, src_table);
assert_eq!(
result,
"DROP TRIGGER IF EXISTS test_job_search_tokens_trigger ON public.my_table;"
);
}

#[test]
fn test_delete_job_record() {
let job_name = "test_job";
let result = delete_job_record(job_name);
assert_eq!(
result,
"DELETE FROM vectorize.job WHERE job_name = 'test_job';"
);
}

#[test]
fn test_drop_project_view() {
let job_name = "test_job";
let result = drop_project_view(job_name);
assert_eq!(result, "DROP VIEW IF EXISTS vectorize.test_job_view;");
}

#[test]
fn test_cleanup_sql_with_special_chars() {
// Test that job names with underscores work correctly
let job_name = "my_test_job_123";

let embeddings = drop_embeddings_table(job_name);
assert!(embeddings.contains("_embeddings_my_test_job_123"));

let tokens = drop_search_tokens_table(job_name);
assert!(tokens.contains("_search_tokens_my_test_job_123"));

let view = drop_project_view(job_name);
assert!(view.contains("my_test_job_123_view"));
}
}
14 changes: 13 additions & 1 deletion extension/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,19 @@ pub async fn get_vectorize_meta(

/// processes a single job from the queue
pub async fn execute_job(dbclient: &Pool<Postgres>, msg: Message<JobMessage>) -> Result<()> {
let job_meta = get_vectorize_meta(&msg.message.job_name, dbclient).await?;
// Check if the job still exists - it may have been deleted
let job_meta = match get_vectorize_meta(&msg.message.job_name, dbclient).await {
Ok(meta) => meta,
Err(DatabaseError::Db(sqlx::Error::RowNotFound)) => {
warning!(
"pg-vectorize: Job '{}' not found - it may have been deleted. Skipping message.",
msg.message.job_name
);
// Return Ok to allow the message to be deleted from queue
return Ok(());
}
Err(e) => return Err(anyhow::anyhow!("Failed to get job meta: {}", e)),
};
let mut job_params: JobParams = serde_json::from_value(job_meta.params.clone())?;
let bpe = cl100k_base().unwrap();

Expand Down
50 changes: 49 additions & 1 deletion server/src/routes/table.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::app_state::AppState;
use crate::errors::ServerError;
use actix_web::{HttpResponse, post, web};
use actix_web::{HttpResponse, delete, post, web};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
Expand Down Expand Up @@ -59,3 +59,51 @@ pub async fn table(
let resp = JobResponse { id: job_id };
Ok(HttpResponse::Ok().json(resp))
}

/// Response body returned by the DELETE /table/{job_name} endpoint after a
/// vectorize job has been cleaned up.
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)]
pub struct DeleteJobResponse {
    /// Name of the job that was deleted.
    pub job_name: String,
    /// Human-readable confirmation message.
    pub message: String,
}

/// Deletes a vectorize job: drops its database resources, purges pending
/// queue messages (via `init::cleanup_job`), and evicts the job from the
/// in-memory job cache. Responds 404 when the job does not exist.
#[utoipa::path(
    context_path = "/api/v1",
    responses(
        (
            status = 200, description = "Successfully deleted vectorize job",
            body = DeleteJobResponse,
        ),
        (
            status = 404, description = "Job not found",
        ),
    ),
)]
#[delete("/table/{job_name}")]
pub async fn delete_table(
    app_state: web::Data<AppState>,
    job_name: web::Path<String>,
) -> Result<HttpResponse, ServerError> {
    let job_name = job_name.into_inner();

    // Cleanup the job resources; map NotFound to a 404 so clients can
    // distinguish a missing job from an internal failure.
    init::cleanup_job(&app_state.db_pool, &job_name)
        .await
        .map_err(|e| match e {
            vectorize_core::errors::VectorizeError::NotFound(msg) => {
                ServerError::NotFoundError(msg)
            }
            _ => ServerError::from(e),
        })?;

    // Remove from cache (scoped block so the write lock is released promptly)
    {
        let mut job_cache = app_state.job_cache.write().await;
        job_cache.remove(&job_name);
    }

    // Build the message before moving `job_name` into the response struct,
    // avoiding a redundant clone.
    let message = format!("Successfully deleted job '{}'", job_name);
    let resp = DeleteJobResponse { job_name, message };
    Ok(HttpResponse::Ok().json(resp))
}
1 change: 1 addition & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub fn route_config(configuration: &mut web::ServiceConfig) {
configuration.service(
web::scope("/api/v1")
.service(routes::table::table)
.service(routes::table::delete_table)
.service(routes::search::search),
);
}
Loading
Loading