From dd88e584e9ec739e563c360b6638a728b6587134 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 19 Mar 2026 04:11:47 +0000 Subject: [PATCH] Add comprehensive tests for worklist SQL execution algorithm TEST: Add unit tests for worklist dependency resolution algorithm Introduces a SqlExecutor trait to decouple the worklist algorithm from tokio_postgres::Client, following the sans-I/O pattern used elsewhere (Catalog/FakeCatalog). This enables thorough unit testing with a FakeSqlExecutor test double. New test coverage (16 new tests): - All files succeed on first try / single file / empty input - Dependency error causes retry and eventual success - Linear dependency chain (C->B->A) resolves correctly - Diamond dependency (D->B,C->A) resolves correctly - Circular dependency detection (2 files, 3 files, single file) - Duplicate objects are skipped (counts as progress) - Skip resets the visit counter (prevents false circular detection) - Mixed scenarios (success + dependency + skip) - Partial circular (some succeed, remaining cycle detected) - Max iterations safety limit - Already-correct order requires no retries https://claude.ai/code/session_01BZqd9DHaEauidisXBwyBnf --- src/db/pglite/mod.rs | 2 +- src/db/pglite/worklist.rs | 650 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 618 insertions(+), 34 deletions(-) diff --git a/src/db/pglite/mod.rs b/src/db/pglite/mod.rs index 89a1381..7d20d06 100644 --- a/src/db/pglite/mod.rs +++ b/src/db/pglite/mod.rs @@ -37,7 +37,7 @@ mod worklist; pub use error::{PgLiteError, WorklistError}; #[cfg(feature = "pglite")] pub use runtime::PgLiteRuntime; -pub use worklist::{ExecutionResult, WorklistExecutor}; +pub use worklist::{ExecutionResult, SqlExecutor, WorklistExecutor}; use std::path::Path; diff --git a/src/db/pglite/worklist.rs b/src/db/pglite/worklist.rs index bc358a7..a200c41 100644 --- a/src/db/pglite/worklist.rs +++ b/src/db/pglite/worklist.rs @@ -46,6 +46,7 @@ use std::collections::VecDeque; use std::path::PathBuf; +use 
async_trait::async_trait; use tokio_postgres::Client; use super::error::{WorklistError, error_codes}; @@ -77,6 +78,47 @@ pub enum ExecutionResult { }, } +/// Abstraction over SQL execution for testability. +/// +/// This trait allows the worklist algorithm to be tested without a real +/// PostgreSQL connection, following the sans-I/O pattern used elsewhere +/// in the codebase (e.g., `Catalog` / `FakeCatalog`). +#[async_trait] +pub trait SqlExecutor: Send + Sync { + /// Executes a SQL string and returns the categorized result. + async fn execute(&self, sql: &str) -> ExecutionResult; +} + +#[async_trait] +impl SqlExecutor for Client { + async fn execute(&self, sql: &str) -> ExecutionResult { + match self.batch_execute(sql).await { + Ok(()) => ExecutionResult::Success, + + Err(error) => { + if let Some(db_error) = error.as_db_error() { + let code = db_error.code().code(); + + if error_codes::is_dependency_error(code) { + return ExecutionResult::DependencyError { + code: code.to_string(), + message: db_error.message().to_string(), + }; + } + + if error_codes::is_duplicate_error(code) { + return ExecutionResult::Skipped { + code: code.to_string(), + }; + } + } + + ExecutionResult::Failed { error } + } + } + } +} + /// Executes SQL files with automatic dependency resolution. /// /// The `WorklistExecutor` uses a retry-based algorithm to execute SQL files @@ -158,7 +200,7 @@ impl WorklistExecutor { /// - Any non-retryable database error occurs pub async fn execute_files( &self, - client: &Client, + client: &(impl SqlExecutor + ?Sized), files: &[PathBuf], ) -> Result<(), WorklistError> { if files.is_empty() { @@ -201,7 +243,7 @@ impl WorklistExecutor { })?; // Attempt to execute - let result = self.execute_sql(client, &sql).await; + let result = client.execute(&sql).await; match result { ExecutionResult::Success => { @@ -273,42 +315,13 @@ impl WorklistExecutor { Ok(()) } - /// Executes a single SQL statement and categorizes the result. 
- async fn execute_sql(&self, client: &Client, sql: &str) -> ExecutionResult { - match client.batch_execute(sql).await { - Ok(()) => ExecutionResult::Success, - - Err(error) => { - // Try to extract the PostgreSQL error code - if let Some(db_error) = error.as_db_error() { - let code = db_error.code().code(); - - if error_codes::is_dependency_error(code) { - return ExecutionResult::DependencyError { - code: code.to_string(), - message: db_error.message().to_string(), - }; - } - - if error_codes::is_duplicate_error(code) { - return ExecutionResult::Skipped { - code: code.to_string(), - }; - } - } - - ExecutionResult::Failed { error } - } - } - } - /// Executes SQL from multiple strings (useful for testing). /// /// This is similar to `execute_files` but works with in-memory SQL strings /// instead of reading from files. pub async fn execute_sql_strings( &self, - client: &Client, + client: &(impl SqlExecutor + ?Sized), sql_strings: &[(&str, &str)], // (name, sql) ) -> Result<(), WorklistError> { if sql_strings.is_empty() { @@ -331,7 +344,7 @@ impl WorklistExecutor { }); } - let result = self.execute_sql(client, sql).await; + let result = client.execute(sql).await; match result { ExecutionResult::Success | ExecutionResult::Skipped { .. } => { @@ -373,6 +386,9 @@ impl Default for WorklistExecutor { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::sync::Mutex; + use super::*; #[test] @@ -395,4 +411,572 @@ mod tests { }; assert!(format!("{:?}", dep_error).contains("42P01")); } + + // ========================================================================= + // FakeSqlExecutor — test double for the worklist algorithm + // ========================================================================= + + /// Tracks state for a single SQL entry in the fake executor. + /// + /// Each entry has a sequence of results to return on successive calls. 
+ /// This allows simulating dependency resolution: the first call might + /// return `DependencyError`, and after the dependency is "created" by + /// another entry succeeding, the next call returns `Success`. + struct FakeEntry { + results: Vec<ExecutionResult>, + call_index: usize, + } + + /// A fake SQL executor that returns pre-programmed results per SQL string. + /// + /// The executor tracks execution order so tests can verify that the + /// worklist algorithm processes entries in the expected sequence. + struct FakeSqlExecutor { + entries: Mutex<HashMap<String, FakeEntry>>, + execution_order: Mutex<Vec<String>>, + } + + impl FakeSqlExecutor { + fn new() -> Self { + Self { + entries: Mutex::new(HashMap::new()), + execution_order: Mutex::new(Vec::new()), + } + } + + /// Programs a SQL string to return the given sequence of results. + fn on_sql(self, sql: &str, results: Vec<ExecutionResult>) -> Self { + self.entries.lock().unwrap().insert( + sql.to_string(), + FakeEntry { + results, + call_index: 0, + }, + ); + self + } + + /// Returns the SQL strings in the order they were executed. + fn execution_order(&self) -> Vec<String> { + self.execution_order.lock().unwrap().clone() + } + } + + #[async_trait] + impl SqlExecutor for FakeSqlExecutor { + async fn execute(&self, sql: &str) -> ExecutionResult { + self.execution_order.lock().unwrap().push(sql.to_string()); + + let mut entries = self.entries.lock().unwrap(); + let entry = entries + .get_mut(sql) + .unwrap_or_else(|| panic!("unexpected SQL: {sql}")); + + let idx = entry.call_index; + entry.call_index += 1; + + if idx < entry.results.len() { + // Return the pre-programmed result for this call + match &entry.results[idx] { + ExecutionResult::Success => ExecutionResult::Success, + ExecutionResult::DependencyError { code, message } => { + ExecutionResult::DependencyError { + code: code.clone(), + message: message.clone(), + } + } + ExecutionResult::Skipped { code } => { + ExecutionResult::Skipped { code: code.clone() } + } + ExecutionResult::Failed { ..
} => { + panic!( + "FakeSqlExecutor cannot clone tokio_postgres::Error; use DependencyError for retryable or Success for terminal results" + ) + } + } + } else { + // Default to success after exhausting programmed results + ExecutionResult::Success + } + } + } + + fn dep_error() -> ExecutionResult { + ExecutionResult::DependencyError { + code: "42P01".to_string(), + message: "relation does not exist".to_string(), + } + } + + fn dup_skip() -> ExecutionResult { + ExecutionResult::Skipped { + code: "42P07".to_string(), + } + } + + // ========================================================================= + // Tests for execute_sql_strings + // ========================================================================= + + #[tokio::test] + async fn all_files_succeed_on_first_try() { + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql( + "CREATE TABLE users (id int)", + vec![ExecutionResult::Success], + ) + .on_sql( + "CREATE TABLE orders (id int)", + vec![ExecutionResult::Success], + ) + .on_sql( + "CREATE TABLE products (id int)", + vec![ExecutionResult::Success], + ); + + let sql_strings = [ + ("users.sql", "CREATE TABLE users (id int)"), + ("orders.sql", "CREATE TABLE orders (id int)"), + ("products.sql", "CREATE TABLE products (id int)"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + + let order = fake.execution_order(); + assert_eq!(order.len(), 3); + } + + #[tokio::test] + async fn empty_input_succeeds_immediately() { + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new(); + + let result = executor.execute_sql_strings(&fake, &[]).await; + assert!(result.is_ok()); + assert!(fake.execution_order().is_empty()); + } + + #[tokio::test] + async fn single_file_succeeds() { + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql("CREATE TABLE t (id int)", vec![ExecutionResult::Success]); + + let sql_strings = 
[("t.sql", "CREATE TABLE t (id int)")]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + assert_eq!(fake.execution_order().len(), 1); + } + + #[tokio::test] + async fn dependency_error_retries_and_succeeds() { + // orders.sql depends on users.sql. When orders is tried first, it + // fails with a dependency error. After users succeeds, orders retries + // and succeeds. + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql( + "CREATE TABLE orders (user_id int REFERENCES users(id))", + vec![dep_error(), ExecutionResult::Success], + ) + .on_sql( + "CREATE TABLE users (id int PRIMARY KEY)", + vec![ExecutionResult::Success], + ); + + let sql_strings = [ + ( + "orders.sql", + "CREATE TABLE orders (user_id int REFERENCES users(id))", + ), + ("users.sql", "CREATE TABLE users (id int PRIMARY KEY)"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + + let order = fake.execution_order(); + // orders tried first -> dep error, users succeeds, orders retried -> success + assert_eq!(order.len(), 3); + assert_eq!( + order[0], + "CREATE TABLE orders (user_id int REFERENCES users(id))" + ); + assert_eq!(order[1], "CREATE TABLE users (id int PRIMARY KEY)"); + assert_eq!( + order[2], + "CREATE TABLE orders (user_id int REFERENCES users(id))" + ); + } + + #[tokio::test] + async fn linear_dependency_chain_resolves() { + // C depends on B, B depends on A. Given in reverse order: C, B, A. + // Expected: C fails, B fails, A succeeds, C fails, B succeeds, C succeeds. 
+ let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql( + "CREATE C", + vec![dep_error(), dep_error(), ExecutionResult::Success], + ) + .on_sql("CREATE B", vec![dep_error(), ExecutionResult::Success]) + .on_sql("CREATE A", vec![ExecutionResult::Success]); + + let sql_strings = [ + ("c.sql", "CREATE C"), + ("b.sql", "CREATE B"), + ("a.sql", "CREATE A"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + + let order = fake.execution_order(); + assert_eq!( + order, + vec![ + "CREATE C", // fail (dep on B) + "CREATE B", // fail (dep on A) + "CREATE A", // success + "CREATE C", // fail (dep on B still) + "CREATE B", // success + "CREATE C", // success + ] + ); + } + + #[tokio::test] + async fn circular_dependency_detected_two_files() { + // A depends on B, B depends on A — circular. + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql("CREATE A", vec![dep_error(), dep_error(), dep_error()]) + .on_sql("CREATE B", vec![dep_error(), dep_error(), dep_error()]); + + let sql_strings = [("a.sql", "CREATE A"), ("b.sql", "CREATE B")]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_err()); + + let err = result.unwrap_err(); + match &err { + WorklistError::CircularDependency { pending_files } => { + assert_eq!(pending_files.len(), 2); + } + other => panic!("expected CircularDependency, got: {other}"), + } + } + + #[tokio::test] + async fn circular_dependency_detected_three_files() { + // A -> B -> C -> A: all fail on every attempt. 
+ let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql("CREATE A", vec![dep_error(), dep_error()]) + .on_sql("CREATE B", vec![dep_error(), dep_error()]) + .on_sql("CREATE C", vec![dep_error(), dep_error()]); + + let sql_strings = [ + ("a.sql", "CREATE A"), + ("b.sql", "CREATE B"), + ("c.sql", "CREATE C"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_err()); + + match result.unwrap_err() { + WorklistError::CircularDependency { pending_files } => { + assert_eq!(pending_files.len(), 3); + } + other => panic!("expected CircularDependency, got: {other}"), + } + } + + #[tokio::test] + async fn single_file_dependency_error_is_circular() { + // A single file that always fails with a dependency error + // is effectively a self-referencing circular dependency. + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new().on_sql("CREATE A", vec![dep_error()]); + + let sql_strings = [("a.sql", "CREATE A")]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_err()); + + match result.unwrap_err() { + WorklistError::CircularDependency { pending_files } => { + assert_eq!(pending_files.len(), 1); + assert_eq!(pending_files[0], PathBuf::from("a.sql")); + } + other => panic!("expected CircularDependency, got: {other}"), + } + } + + #[tokio::test] + async fn duplicate_objects_are_skipped() { + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql( + "CREATE TABLE users (id int)", + vec![ExecutionResult::Success], + ) + .on_sql("CREATE TABLE users (id int) -- dup", vec![dup_skip()]); + + let sql_strings = [ + ("users.sql", "CREATE TABLE users (id int)"), + ("users_copy.sql", "CREATE TABLE users (id int) -- dup"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn skip_resets_visit_counter() { + // Verifies that a 
Skipped result counts as progress (resets + // visits_since_success), preventing false circular dependency detection. + // Sequence: A=dep_error, B=skip, A=success + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql("CREATE A", vec![dep_error(), ExecutionResult::Success]) + .on_sql("CREATE B", vec![dup_skip()]); + + let sql_strings = [("a.sql", "CREATE A"), ("b.sql", "CREATE B")]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + + let order = fake.execution_order(); + assert_eq!(order, vec!["CREATE A", "CREATE B", "CREATE A"]); + } + + #[tokio::test] + async fn mixed_success_dependency_and_skip() { + // A complex scenario: 4 files with mixed behaviors. + // - schema.sql: succeeds immediately + // - types.sql: duplicate (skipped) + // - orders.sql: depends on users, fails then succeeds + // - users.sql: succeeds immediately + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql("CREATE SCHEMA app", vec![ExecutionResult::Success]) + .on_sql("CREATE TYPE status", vec![dup_skip()]) + .on_sql( + "CREATE TABLE orders", + vec![dep_error(), ExecutionResult::Success], + ) + .on_sql("CREATE TABLE users", vec![ExecutionResult::Success]); + + let sql_strings = [ + ("schema.sql", "CREATE SCHEMA app"), + ("types.sql", "CREATE TYPE status"), + ("orders.sql", "CREATE TABLE orders"), + ("users.sql", "CREATE TABLE users"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + + let order = fake.execution_order(); + assert_eq!( + order, + vec![ + "CREATE SCHEMA app", // success + "CREATE TYPE status", // skip + "CREATE TABLE orders", // dep error -> back of queue + "CREATE TABLE users", // success + "CREATE TABLE orders", // retry -> success + ] + ); + } + + #[tokio::test] + async fn partial_circular_with_some_successful() { + // Two files succeed, but the remaining two form a cycle. 
+ // A: success, B: success, C: always dep_error, D: always dep_error + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql("CREATE A", vec![ExecutionResult::Success]) + .on_sql("CREATE B", vec![ExecutionResult::Success]) + .on_sql("CREATE C", vec![dep_error(), dep_error(), dep_error()]) + .on_sql("CREATE D", vec![dep_error(), dep_error(), dep_error()]); + + let sql_strings = [ + ("a.sql", "CREATE A"), + ("b.sql", "CREATE B"), + ("c.sql", "CREATE C"), + ("d.sql", "CREATE D"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_err()); + + match result.unwrap_err() { + WorklistError::CircularDependency { pending_files } => { + assert_eq!(pending_files.len(), 2); + let names: Vec<_> = pending_files.iter().map(|p| p.to_str().unwrap()).collect(); + assert!(names.contains(&"c.sql")); + assert!(names.contains(&"d.sql")); + } + other => panic!("expected CircularDependency, got: {other}"), + } + } + + #[tokio::test] + async fn max_iterations_safety_limit() { + // With max_retries=2, max_iterations=4. Three files that all keep + // failing should hit the safety limit before natural circular + // dependency detection if the queue is large enough. + let executor = WorklistExecutor::new(2); // max_iterations = 4 + let fake = FakeSqlExecutor::new() + .on_sql( + "CREATE A", + vec![ + dep_error(), + dep_error(), + dep_error(), + dep_error(), + dep_error(), + ], + ) + .on_sql( + "CREATE B", + vec![ + dep_error(), + dep_error(), + dep_error(), + dep_error(), + dep_error(), + ], + ) + .on_sql( + "CREATE C", + vec![ + dep_error(), + dep_error(), + dep_error(), + dep_error(), + dep_error(), + ], + ); + + let sql_strings = [ + ("a.sql", "CREATE A"), + ("b.sql", "CREATE B"), + ("c.sql", "CREATE C"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_err()); + + match result.unwrap_err() { + WorklistError::CircularDependency { .. 
} => {} + other => panic!("expected CircularDependency, got: {other}"), + } + } + + #[tokio::test] + async fn dependency_resolves_after_retry() { + // File A depends on B, C, and D. On its first attempt A fails because + // dependencies haven't been created yet. After B, C, D all succeed, + // A is retried and succeeds. + // + // Trace: A(dep_error) -> B(success) -> C(success) -> D(success) -> A(success) + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql("CREATE A", vec![dep_error(), ExecutionResult::Success]) + .on_sql("CREATE B", vec![ExecutionResult::Success]) + .on_sql("CREATE C", vec![ExecutionResult::Success]) + .on_sql("CREATE D", vec![ExecutionResult::Success]); + + let sql_strings = [ + ("a.sql", "CREATE A"), + ("b.sql", "CREATE B"), + ("c.sql", "CREATE C"), + ("d.sql", "CREATE D"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + + let order = fake.execution_order(); + assert_eq!( + order, + vec!["CREATE A", "CREATE B", "CREATE C", "CREATE D", "CREATE A"] + ); + } + + #[tokio::test] + async fn diamond_dependency_resolves() { + // Diamond: D depends on B and C; B and C each depend on A. 
+ // Input order: D, C, B, A + // + // Trace: + // D(dep) -> C(dep) -> B(dep) -> A(ok) [A resolves, queue: D,C,B] + // D(dep) -> C(ok) -> B(ok) [B,C resolve, queue: D] + // D(ok) [D resolves] + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql( + "CREATE D", + vec![dep_error(), dep_error(), ExecutionResult::Success], + ) + .on_sql("CREATE C", vec![dep_error(), ExecutionResult::Success]) + .on_sql("CREATE B", vec![dep_error(), ExecutionResult::Success]) + .on_sql("CREATE A", vec![ExecutionResult::Success]); + + let sql_strings = [ + ("d.sql", "CREATE D"), + ("c.sql", "CREATE C"), + ("b.sql", "CREATE B"), + ("a.sql", "CREATE A"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + + let order = fake.execution_order(); + assert_eq!( + order, + vec![ + "CREATE D", // dep error (needs B/C) + "CREATE C", // dep error (needs A) + "CREATE B", // dep error (needs A) + "CREATE A", // success + "CREATE D", // dep error (needs C) + "CREATE C", // success + "CREATE B", // success + "CREATE D", // success + ] + ); + } + + #[tokio::test] + async fn already_correct_order_no_retries() { + // Files given in correct dependency order: no retries needed. + let executor = WorklistExecutor::new(10); + let fake = FakeSqlExecutor::new() + .on_sql("CREATE A", vec![ExecutionResult::Success]) + .on_sql("CREATE B", vec![ExecutionResult::Success]) + .on_sql("CREATE C", vec![ExecutionResult::Success]); + + let sql_strings = [ + ("a.sql", "CREATE A"), + ("b.sql", "CREATE B"), + ("c.sql", "CREATE C"), + ]; + + let result = executor.execute_sql_strings(&fake, &sql_strings).await; + assert!(result.is_ok()); + + let order = fake.execution_order(); + // Each file executed exactly once + assert_eq!(order.len(), 3); + assert_eq!(order, vec!["CREATE A", "CREATE B", "CREATE C"]); + } }