diff --git a/README.md b/README.md index 7ae5685..d8cb79a 100644 --- a/README.md +++ b/README.md @@ -242,6 +242,16 @@ If no `tables` are listed, the target table name is inferred from the parent dir | `ingest.tables[].key_columns` | Primary key columns (required for upsert mode) | | `ingest.tables[].create_if_missing` | Auto-CREATE TABLE from file schema (default: false) | +## Running Integration Tests + +Some DB-backed tests are optional and run only when `RUSTREAM_IT_DB_URL` is set. +Without this env var, those tests no-op/return early. + +```bash +export RUSTREAM_IT_DB_URL="host=localhost port=5432 dbname=mydb user=postgres password=secret" +cargo test +``` + ## How it works ### Sync (Postgres → Parquet) diff --git a/src/reader.rs b/src/reader.rs index 578a2db..0a37eb1 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -237,6 +237,10 @@ fn get_as_string(row: &Row, idx: usize) -> Option<String> { #[cfg(test)] mod tests { use super::*; + use crate::schema; + use crate::sync::extract_watermark; + use std::time::{SystemTime, UNIX_EPOCH}; + use tokio_postgres::NoTls; fn make_table(name: &str) -> TableConfig { TableConfig { @@ -349,4 +353,136 @@ mod tests { assert!(q.contains("ORDER BY \"updated_at\" ASC, \"id\" ASC")); assert!(q.ends_with("LIMIT 1000")); } + + /// Ensures duplicate watermark values are paged safely using the cursor tiebreaker. + #[tokio::test] + async fn read_batch_cursor_paging_handles_duplicate_watermarks() { + let db_url = match std::env::var("RUSTREAM_IT_DB_URL") { + Ok(v) => v, + Err(_) => return, // Optional integration-style test; set env var to run. 
+ }; + + let (client, connection) = tokio_postgres::connect(&db_url, NoTls).await.unwrap(); + tokio::spawn(async move { + let _ = connection.await; + }); + + let suffix = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + let table_name = format!("rustream_it_reader_{suffix}"); + + client + .execute( + &format!( + "CREATE TABLE public.{table_name} ( + id INTEGER PRIMARY KEY, + updated_at TIMESTAMPTZ NOT NULL, + payload TEXT + )" + ), + &[], + ) + .await + .unwrap(); + + client + .execute( + &format!( + "INSERT INTO public.{table_name} (id, updated_at, payload) VALUES + (1, '2026-01-01T00:00:00Z', 'a'), + (2, '2026-01-01T00:00:00Z', 'b'), + (3, '2026-01-01T00:00:00Z', 'c'), + (4, '2026-01-01T00:01:00Z', 'd')" + ), + &[], + ) + .await + .unwrap(); + + let table = TableConfig { + name: table_name.clone(), + schema: Some("public".into()), + columns: None, + incremental_column: Some("updated_at".into()), + incremental_tiebreaker_column: Some("id".into()), + incremental_column_is_unique: false, + partition_by: None, + }; + let columns = schema::introspect_table(&client, &table).await.unwrap(); + let arrow_schema = schema::build_arrow_schema(&columns); + + let batch1 = read_batch( + &client, + &table, + &columns, + &arrow_schema, + ReadBatchOptions { + watermark_col: Some("updated_at"), + cursor_col: Some("id"), + watermark_val: None, + cursor_val: None, + batch_size: 2, + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(batch1.num_rows(), 2); + let wm1 = extract_watermark(&columns, &batch1, "updated_at").unwrap(); + let cur1 = extract_watermark(&columns, &batch1, "id").unwrap(); + + let batch2 = read_batch( + &client, + &table, + &columns, + &arrow_schema, + ReadBatchOptions { + watermark_col: Some("updated_at"), + cursor_col: Some("id"), + watermark_val: Some(&wm1), + cursor_val: Some(&cur1), + batch_size: 2, + }, + ) + .await + .unwrap() + .unwrap(); + assert_eq!(batch2.num_rows(), 2); + + let id_col = columns.iter().position(|c| c.name 
== "id").unwrap(); + let ids = batch2 + .column(id_col) + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + assert_eq!(ids.value(0), 3); + assert_eq!(ids.value(1), 4); + + let wm2 = extract_watermark(&columns, &batch2, "updated_at").unwrap(); + let cur2 = extract_watermark(&columns, &batch2, "id").unwrap(); + + let batch3 = read_batch( + &client, + &table, + &columns, + &arrow_schema, + ReadBatchOptions { + watermark_col: Some("updated_at"), + cursor_col: Some("id"), + watermark_val: Some(&wm2), + cursor_val: Some(&cur2), + batch_size: 2, + }, + ) + .await + .unwrap(); + assert!(batch3.is_none()); + + client + .execute(&format!("DROP TABLE public.{table_name}"), &[]) + .await + .unwrap(); + } } diff --git a/src/state.rs b/src/state.rs index 8e72fbc..8a5b072 100644 --- a/src/state.rs +++ b/src/state.rs @@ -135,6 +135,7 @@ impl StateStore { #[cfg(test)] mod tests { use super::*; + use rusqlite::Connection; use std::fs; use std::sync::atomic::{AtomicU32, Ordering}; @@ -282,4 +283,51 @@ mod tests { let _ = fs::remove_dir_all(&dir); } + + /// Verifies opening state migrates legacy watermark schema and preserves existing values. + #[test] + fn open_migrates_legacy_watermarks_table() { + let dir = temp_state_dir(); + fs::create_dir_all(&dir).unwrap(); + let db_path = Path::new(&dir).join("rustream_state.db"); + + // Simulate an old state db that predates cursor_value. + let conn = Connection::open(&db_path).unwrap(); + conn.execute_batch( + "CREATE TABLE watermarks ( + table_name TEXT PRIMARY KEY, + watermark_value TEXT NOT NULL, + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + );", + ) + .unwrap(); + conn.execute( + "INSERT INTO watermarks (table_name, watermark_value) VALUES (?1, ?2)", + ["users", "2026-01-01 00:00:00"], + ) + .unwrap(); + drop(conn); + + let store = StateStore::open(&dir).unwrap(); + assert_eq!( + store.get_progress("users").unwrap(), + Some(("2026-01-01 00:00:00".to_string(), None)) + ); + + // Verify migrated schema contains cursor_value. 
+ let conn = Connection::open(&db_path).unwrap(); + let mut stmt = conn.prepare("PRAGMA table_info(watermarks)").unwrap(); + let mut rows = stmt.query([]).unwrap(); + let mut has_cursor = false; + while let Some(row) = rows.next().unwrap() { + let col_name: String = row.get(1).unwrap(); + if col_name == "cursor_value" { + has_cursor = true; + break; + } + } + assert!(has_cursor); + + let _ = fs::remove_dir_all(&dir); + } } diff --git a/src/sync.rs b/src/sync.rs index 9e98aea..b4d602d 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -478,10 +478,19 @@ pub(crate) fn extract_watermark( #[cfg(test)] mod tests { use super::*; + use crate::config::{OutputConfig, PostgresConfig}; + use crate::reader; + use crate::schema; + use crate::state::StateStore; + use anyhow::Result; use arrow::array::{Int32Array, Int64Array, StringArray, TimestampMicrosecondArray}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use std::fs::File; use std::sync::Arc; + use std::time::{SystemTime, UNIX_EPOCH}; + use tokio_postgres::NoTls; #[test] fn extract_watermark_from_timestamp() { @@ -600,4 +609,340 @@ mod tests { let wm = extract_watermark(&columns, &batch, "updated_at"); assert_eq!(wm, None); } + + /// Confirms sync resumes from saved progress and writes only remaining rows. + #[tokio::test] + async fn sync_table_resumes_from_saved_progress() -> Result<()> { + let db_url = match std::env::var("RUSTREAM_IT_DB_URL") { + Ok(v) => v, + Err(_) => return Ok(()), // Optional integration-style test. 
+ }; + + let (client, connection) = tokio_postgres::connect(&db_url, NoTls).await?; + tokio::spawn(async move { + let _ = connection.await; + }); + + let suffix = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos(); + let table_name = format!("rustream_it_sync_resume_{suffix}"); + let full_table = format!("public.{table_name}"); + + client + .execute( + &format!( + "CREATE TABLE {full_table} ( + id INTEGER PRIMARY KEY, + updated_at TIMESTAMPTZ NOT NULL, + payload TEXT + )" + ), + &[], + ) + .await?; + + client + .execute( + &format!( + "INSERT INTO {full_table} (id, updated_at, payload) VALUES + (1, '2026-01-01T00:00:00Z', 'a'), + (2, '2026-01-01T00:00:00Z', 'b'), + (3, '2026-01-01T00:00:00Z', 'c'), + (4, '2026-01-01T00:01:00Z', 'd')" + ), + &[], + ) + .await?; + + let tmp = tempfile::tempdir()?; + let out_dir = tmp.path().join("out"); + let state_dir = tmp.path().join("state"); + std::fs::create_dir_all(&out_dir)?; + std::fs::create_dir_all(&state_dir)?; + + let table = TableConfig { + name: table_name.clone(), + schema: Some("public".to_string()), + columns: None, + incremental_column: Some("updated_at".to_string()), + incremental_tiebreaker_column: Some("id".to_string()), + incremental_column_is_unique: false, + partition_by: None, + }; + + // Derive a realistic saved progress point from first page (ids 1,2). + let columns = schema::introspect_table(&client, &table).await?; + let arrow_schema = schema::build_arrow_schema(&columns); + let first_batch = reader::read_batch( + &client, + &table, + &columns, + &arrow_schema, + reader::ReadBatchOptions { + watermark_col: Some("updated_at"), + cursor_col: Some("id"), + watermark_val: None, + cursor_val: None, + batch_size: 2, + }, + ) + .await? 
.expect("first batch expected"); + + let saved_wm = extract_watermark(&columns, &first_batch, "updated_at").unwrap(); + let saved_cursor = extract_watermark(&columns, &first_batch, "id").unwrap(); + + let state = StateStore::open(state_dir.to_str().unwrap())?; + state.set_progress(&full_table, &saved_wm, Some(&saved_cursor))?; + + let config = Config { + postgres: PostgresConfig { + host: "localhost".to_string(), + port: 5432, + database: "ignored_for_sync_table_test".to_string(), + user: "ignored".to_string(), + password: None, + }, + output: Some(OutputConfig::Local { + path: out_dir.to_string_lossy().to_string(), + }), + tables: None, + exclude: vec![], + schema: "public".to_string(), + batch_size: 2, + state_dir: Some(state_dir.to_string_lossy().to_string()), + format: OutputFormat::Parquet, + catalog: None, + warehouse: None, + ingest: None, + }; + + sync_table(&client, &config, &table, &state, None).await?; + + // Validate only remaining ids (3,4) are written. + let mut seen_ids = Vec::new(); + for entry in std::fs::read_dir(out_dir.join(&table_name))? { + let path = entry?.path(); + if path.extension().and_then(|s| s.to_str()) != Some("parquet") { + continue; + } + let file = File::open(path)?; + let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; + let reader = builder.build()?; + for batch in reader { + let batch = batch?; + let id_idx = batch + .schema() + .fields() + .iter() + .position(|f| f.name() == "id") + .unwrap(); + let arr = batch + .column(id_idx) + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + for i in 0..arr.len() { + seen_ids.push(arr.value(i)); + } + } + } + seen_ids.sort_unstable(); + assert_eq!(seen_ids, vec![3, 4]); + + let progress = state.get_progress(&full_table)?.unwrap(); + assert_eq!(progress.1.as_deref(), Some("4")); + + client + .execute(&format!("DROP TABLE {full_table}"), &[]) + .await?; + + Ok(()) + } + + /// Ensures cursor mode fails fast when saved state is missing cursor_value. 
+ #[tokio::test] + async fn sync_table_fails_when_cursor_state_missing_in_cursor_mode() -> Result<()> { + let db_url = match std::env::var("RUSTREAM_IT_DB_URL") { + Ok(v) => v, + Err(_) => return Ok(()), // Optional integration-style test. + }; + + let (client, connection) = tokio_postgres::connect(&db_url, NoTls).await?; + tokio::spawn(async move { + let _ = connection.await; + }); + + let suffix = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos(); + let table_name = format!("rustream_it_sync_missing_cursor_{suffix}"); + let full_table = format!("public.{table_name}"); + + client + .execute( + &format!( + "CREATE TABLE {full_table} ( + id INTEGER PRIMARY KEY, + updated_at TIMESTAMPTZ NOT NULL + )" + ), + &[], + ) + .await?; + + client + .execute( + &format!( + "INSERT INTO {full_table} (id, updated_at) VALUES + (1, '2026-01-01T00:00:00Z'), + (2, '2026-01-01T00:00:00Z')" + ), + &[], + ) + .await?; + + let tmp = tempfile::tempdir()?; + let out_dir = tmp.path().join("out"); + let state_dir = tmp.path().join("state"); + std::fs::create_dir_all(&out_dir)?; + std::fs::create_dir_all(&state_dir)?; + + let table = TableConfig { + name: table_name.clone(), + schema: Some("public".to_string()), + columns: None, + incremental_column: Some("updated_at".to_string()), + incremental_tiebreaker_column: Some("id".to_string()), + incremental_column_is_unique: false, + partition_by: None, + }; + + let state = StateStore::open(state_dir.to_str().unwrap())?; + // Seed legacy-like state with watermark only, no cursor. 
+ state.set_progress(&full_table, "2026-01-01 00:00:00.000000", None)?; + + let config = Config { + postgres: PostgresConfig { + host: "localhost".to_string(), + port: 5432, + database: "ignored_for_sync_table_test".to_string(), + user: "ignored".to_string(), + password: None, + }, + output: Some(OutputConfig::Local { + path: out_dir.to_string_lossy().to_string(), + }), + tables: None, + exclude: vec![], + schema: "public".to_string(), + batch_size: 2, + state_dir: Some(state_dir.to_string_lossy().to_string()), + format: OutputFormat::Parquet, + catalog: None, + warehouse: None, + ingest: None, + }; + + let err = sync_table(&client, &config, &table, &state, None) + .await + .expect_err("cursor mode should fail when saved cursor is missing"); + assert!(err.to_string().contains("has watermark but no cursor")); + + client + .execute(&format!("DROP TABLE {full_table}"), &[]) + .await?; + + Ok(()) + } + + /// Verifies unique-watermark mode works without a tiebreaker and persists progress. + #[tokio::test] + async fn sync_table_supports_unique_watermark_without_tiebreaker() -> Result<()> { + let db_url = match std::env::var("RUSTREAM_IT_DB_URL") { + Ok(v) => v, + Err(_) => return Ok(()), // Optional integration-style test. 
+ }; + + let (client, connection) = tokio_postgres::connect(&db_url, NoTls).await?; + tokio::spawn(async move { + let _ = connection.await; + }); + + let suffix = SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos(); + let table_name = format!("rustream_it_sync_unique_wm_{suffix}"); + let full_table = format!("public.{table_name}"); + + client + .execute( + &format!( + "CREATE TABLE {full_table} ( + id INTEGER PRIMARY KEY, + payload TEXT + )" + ), + &[], + ) + .await?; + + client + .execute( + &format!( + "INSERT INTO {full_table} (id, payload) VALUES + (1, 'a'), + (2, 'b'), + (3, 'c'), + (4, 'd')" + ), + &[], + ) + .await?; + + let tmp = tempfile::tempdir()?; + let out_dir = tmp.path().join("out"); + let state_dir = tmp.path().join("state"); + std::fs::create_dir_all(&out_dir)?; + std::fs::create_dir_all(&state_dir)?; + + let table = TableConfig { + name: table_name.clone(), + schema: Some("public".to_string()), + columns: None, + incremental_column: Some("id".to_string()), + incremental_tiebreaker_column: None, + incremental_column_is_unique: true, + partition_by: None, + }; + + let state = StateStore::open(state_dir.to_str().unwrap())?; + let config = Config { + postgres: PostgresConfig { + host: "localhost".to_string(), + port: 5432, + database: "ignored_for_sync_table_test".to_string(), + user: "ignored".to_string(), + password: None, + }, + output: Some(OutputConfig::Local { + path: out_dir.to_string_lossy().to_string(), + }), + tables: None, + exclude: vec![], + schema: "public".to_string(), + batch_size: 2, + state_dir: Some(state_dir.to_string_lossy().to_string()), + format: OutputFormat::Parquet, + catalog: None, + warehouse: None, + ingest: None, + }; + + sync_table(&client, &config, &table, &state, None).await?; + let progress = state.get_progress(&full_table)?.unwrap(); + assert_eq!(progress.0, "4"); + assert_eq!(progress.1, None); + + client + .execute(&format!("DROP TABLE {full_table}"), &[]) + .await?; + + Ok(()) + } }