From d635fa60dff10e693849f5e91331aabb783f531f Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Tue, 24 Feb 2026 10:26:55 -0500 Subject: [PATCH 1/3] Add idempotent incremental checkpoints and cursor paging --- README.md | 13 +++- config.example.yaml | 7 ++ src/config.rs | 12 +++ src/reader.rs | 82 +++++++++++++++++---- src/state.rs | 75 ++++++++++++++++++- src/sync.rs | 176 ++++++++++++++++++++++++++++++++++++++++---- 6 files changed, 331 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 389a2ea..13950e1 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ output: tables: - name: users incremental_column: updated_at + incremental_tiebreaker_column: id columns: # optional: pick specific columns - id - email @@ -74,8 +75,13 @@ tables: - name: orders incremental_column: updated_at + incremental_tiebreaker_column: id - name: products # no incremental_column = full sync every run + + - name: events # append-only example (no updated_at) + incremental_column: id + incremental_column_is_unique: true ``` ### All tables (auto-discover) @@ -132,6 +138,8 @@ AWS credentials come from environment variables, `~/.aws/credentials`, or IAM ro | `tables[].schema` | Schema name (default: `public`) | | `tables[].columns` | Columns to sync (default: all) | | `tables[].incremental_column` | Column for watermark-based incremental sync | +| `tables[].incremental_tiebreaker_column` | Stable cursor column for duplicate-safe incremental paging (required when `incremental_column` is set; recommended: primary key) | +| `tables[].incremental_column_is_unique` | Allow watermark-only incremental mode when incremental column is strictly unique/monotonic (e.g. append-only `id`) | | `tables[].partition_by` | Partition output files: `date`, `month`, or `year` | ## How it works @@ -140,8 +148,9 @@ AWS credentials come from environment variables, `~/.aws/credentials`, or IAM ro 2. Maps Postgres column types to Arrow types automatically 3. Reads rows in batches, converting to Arrow RecordBatches 4. Writes each batch as a Snappy-compressed Parquet file -5. Tracks the high watermark (max value of `incremental_column`) in local SQLite -6. On next run, only reads rows where `incremental_column > last_watermark` +5. Tracks the high watermark (max value of `incremental_column`) and optional cursor in local SQLite +6. Checkpoints incremental progress after each successfully written batch +7. On next run, reads rows after the saved `(watermark, cursor)` position Tables without `incremental_column` do a full sync every run. diff --git a/config.example.yaml b/config.example.yaml index b710552..51f1d9b 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -34,6 +34,7 @@ state_dir: .rustream_state tables: - name: users incremental_column: updated_at + incremental_tiebreaker_column: id columns: # optional: pick specific columns - id - email @@ -43,12 +44,18 @@ tables: - name: orders incremental_column: updated_at + incremental_tiebreaker_column: id partition_by: date # table/year=2026/month=02/day=10/... # partition_by: month # table/year=2026/month=02/... # partition_by: year # table/year=2026/... - name: products # no incremental_column = full sync every run + # Example for append-only tables without updated_at: + # - name: events + # incremental_column: id + # incremental_column_is_unique: true + # Option 2: Auto-discover all tables (remove `tables` above and uncomment below) # schema: public # which schema to discover from (default: public) # exclude: # skip these tables diff --git a/src/config.rs b/src/config.rs index 2a375dd..8aed3b5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -53,6 +53,10 @@ pub struct TableConfig { #[serde(default)] pub incremental_column: Option, #[serde(default)] + pub incremental_tiebreaker_column: Option, + #[serde(default)] + pub incremental_column_is_unique: bool, + #[serde(default)] pub partition_by: Option, } @@ -156,6 +160,8 @@ state_dir: /tmp/state tables: - name: users incremental_column: updated_at + incremental_tiebreaker_column: id + incremental_column_is_unique: false columns: - id - email @@ -175,6 +181,8 @@ exclude: assert_eq!(tables.len(), 2); assert_eq!(tables[0].name, "users"); assert_eq!(tables[0].incremental_column.as_deref(), Some("updated_at")); + assert_eq!(tables[0].incremental_tiebreaker_column.as_deref(), Some("id")); + assert!(!tables[0].incremental_column_is_unique); assert_eq!(tables[0].columns.as_ref().unwrap().len(), 2); assert!(matches!(tables[1].partition_by, Some(PartitionBy::Date))); @@ -231,6 +239,8 @@ exclude: schema: Some("analytics".into()), columns: None, incremental_column: None, + incremental_tiebreaker_column: None, + incremental_column_is_unique: false, partition_by: None, }; assert_eq!(t.full_name(), "analytics.users"); @@ -244,6 +254,8 @@ exclude: schema: None, columns: None, incremental_column: None, + incremental_tiebreaker_column: None, + incremental_column_is_unique: false, partition_by: None, }; assert_eq!(t.full_name(), "orders"); diff --git a/src/reader.rs b/src/reader.rs index f0d3940..05574db 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -15,7 +15,9 @@ pub fn build_query( table: &TableConfig, columns: &[ColumnInfo], watermark_col: Option<&str>, + cursor_col: Option<&str>, watermark_val: Option<&str>, + cursor_val: Option<&str>, batch_size: usize, ) -> String { let cols = columns @@ -27,12 +29,22 @@ pub fn build_query( let full_name = table.full_name(); let mut query = format!("SELECT {cols} FROM {full_name}"); - if let (Some(col), Some(val)) = (watermark_col, watermark_val) { - query.push_str(&format!(" WHERE \"{col}\" > '{val}'")); + if let (Some(wm_col), Some(_)) = (watermark_col, watermark_val) { + if let (Some(cur_col), Some(_)) = (cursor_col, cursor_val) { + query.push_str(&format!( + " WHERE (\"{wm_col}\" > $1 OR (\"{wm_col}\" = $1 AND \"{cur_col}\" > $2))" + )); + } else { + query.push_str(&format!(" WHERE \"{wm_col}\" > $1")); + } } - if let Some(col) = watermark_col { - query.push_str(&format!(" ORDER BY \"{col}\" ASC")); + if let Some(wm_col) = watermark_col { + if let Some(cur_col) = cursor_col { + query.push_str(&format!(" ORDER BY \"{wm_col}\" ASC, \"{cur_col}\" ASC")); + } else { + query.push_str(&format!(" ORDER BY \"{wm_col}\" ASC")); + } } query.push_str(&format!(" LIMIT {batch_size}")); @@ -46,17 +58,37 @@ pub async fn read_batch( columns: &[ColumnInfo], schema: &Arc, watermark_col: Option<&str>, + cursor_col: Option<&str>, watermark_val: Option<&str>, + cursor_val: Option<&str>, batch_size: usize, ) -> Result> { - let query = build_query(table, columns, watermark_col, watermark_val, batch_size); + let query = build_query( + table, + columns, + watermark_col, + cursor_col, + watermark_val, + cursor_val, + batch_size, + ); tracing::debug!(%query, "executing query"); - let rows = client - .query(&query as &str, &[]) - .await - .with_context(|| format!("querying table {}", table.full_name()))?; + let rows = match (watermark_val, cursor_val) { + (Some(wm), Some(cur)) if cursor_col.is_some() => client + .query(&query as &str, &[&wm, &cur]) + .await + .with_context(|| format!("querying table {}", table.full_name()))?, + (Some(wm), _) => client + .query(&query as &str, &[&wm]) + .await + .with_context(|| format!("querying table {}", table.full_name()))?, + (None, _) => client + .query(&query as &str, &[]) + .await + .with_context(|| format!("querying table {}", table.full_name()))?, + }; if rows.is_empty() { return Ok(None); @@ -177,6 +209,8 @@ mod tests { schema: None, columns: None, incremental_column: None, + incremental_tiebreaker_column: None, + incremental_column_is_unique: false, partition_by: None, } } @@ -208,7 +242,7 @@ mod tests { fn build_query_full_table() { let table = make_table("users"); let cols = make_columns(); - let q = build_query(&table, &cols, None, None, 1000); + let q = build_query(&table, &cols, None, None, None, None, 1000); assert_eq!( q, "SELECT \"id\", \"name\", \"updated_at\" FROM users LIMIT 1000" @@ -222,10 +256,12 @@ mod tests { schema: Some("analytics".into()), columns: None, incremental_column: None, + incremental_tiebreaker_column: None, + incremental_column_is_unique: false, partition_by: None, }; let cols = make_columns(); - let q = build_query(&table, &cols, None, None, 500); + let q = build_query(&table, &cols, None, None, None, None, 500); assert_eq!( q, "SELECT \"id\", \"name\", \"updated_at\" FROM analytics.users LIMIT 500" @@ -236,7 +272,7 @@ mod tests { fn build_query_with_watermark_no_value() { let table = make_table("users"); let cols = make_columns(); - let q = build_query(&table, &cols, Some("updated_at"), None, 1000); + let q = build_query(&table, &cols, Some("updated_at"), None, None, None, 1000); assert_eq!( q, "SELECT \"id\", \"name\", \"updated_at\" FROM users ORDER BY \"updated_at\" ASC LIMIT 1000" @@ -251,13 +287,33 @@ mod tests { &table, &cols, Some("updated_at"), + None, Some("2026-01-01 00:00:00"), + None, 1000, ); - assert!(q.contains("WHERE \"updated_at\" > '2026-01-01 00:00:00'")); + assert!(q.contains("WHERE \"updated_at\" > $1")); assert!(q.contains("ORDER BY \"updated_at\" ASC")); assert!(q.ends_with("LIMIT 1000")); } + + #[test] + fn build_query_with_watermark_and_cursor() { + let table = make_table("users"); + let cols = make_columns(); + let q = build_query( + &table, + &cols, + Some("updated_at"), + Some("id"), + Some("2026-01-01 00:00:00"), + Some("100"), + 1000, + ); + assert!(q.contains("WHERE (\"updated_at\" > $1 OR (\"updated_at\" = $1 AND \"id\" > $2))")); + assert!(q.contains("ORDER BY \"updated_at\" ASC, \"id\" ASC")); + assert!(q.ends_with("LIMIT 1000")); + } } /// Try to get a PG column value as a String, handling various types. diff --git a/src/state.rs b/src/state.rs index 250bc6c..2639ee8 100644 --- a/src/state.rs +++ b/src/state.rs @@ -21,11 +21,34 @@ impl StateStore { "CREATE TABLE IF NOT EXISTS watermarks ( table_name TEXT PRIMARY KEY, watermark_value TEXT NOT NULL, + cursor_value TEXT, updated_at TEXT NOT NULL DEFAULT (datetime('now')) )", ) .context("creating watermarks table")?; + // Backward-compatible migration for existing state DBs. + let has_cursor_col = conn + .prepare("PRAGMA table_info(watermarks)") + .and_then(|mut stmt| { + let mut rows = stmt.query([])?; + let mut found = false; + while let Some(row) = rows.next()? { + let col_name: String = row.get(1)?; + if col_name == "cursor_value" { + found = true; + break; + } + } + Ok(found) + }) + .context("checking watermarks schema")?; + + if !has_cursor_col { + conn.execute("ALTER TABLE watermarks ADD COLUMN cursor_value TEXT", []) + .context("adding cursor_value column to watermarks")?; + } + Ok(Self { conn }) } @@ -43,14 +66,43 @@ impl StateStore { /// Set the high watermark for a table. pub fn set_watermark(&self, table_name: &str, value: &str) -> Result<()> { + self.set_progress(table_name, value, None) + } + + /// Get high watermark and cursor for a table. + pub fn get_progress(&self, table_name: &str) -> Result)>> { + let mut stmt = self + .conn + .prepare("SELECT watermark_value, cursor_value FROM watermarks WHERE table_name = ?1") + .context("preparing progress select")?; + + let result = stmt + .query_row([table_name], |row| { + let wm: String = row.get(0)?; + let cursor: Option = row.get(1)?; + Ok((wm, cursor)) + }) + .ok(); + + Ok(result) + } + + /// Set high watermark and cursor for a table. + pub fn set_progress( + &self, + table_name: &str, + watermark_value: &str, + cursor_value: Option<&str>, + ) -> Result<()> { self.conn .execute( - "INSERT INTO watermarks (table_name, watermark_value, updated_at) - VALUES (?1, ?2, datetime('now')) + "INSERT INTO watermarks (table_name, watermark_value, cursor_value, updated_at) + VALUES (?1, ?2, ?3, datetime('now')) ON CONFLICT(table_name) DO UPDATE SET watermark_value = excluded.watermark_value, + cursor_value = excluded.cursor_value, updated_at = excluded.updated_at", - [table_name, value], + (table_name, watermark_value, cursor_value), ) .with_context(|| format!("setting watermark for {table_name}"))?; @@ -159,4 +211,21 @@ mod tests { let _ = fs::remove_dir_all(&dir); } + + #[test] + fn set_and_get_progress() { + let dir = temp_state_dir(); + let store = StateStore::open(&dir).unwrap(); + + store + .set_progress("users", "2026-01-15 10:30:00", Some("42")) + .unwrap(); + + assert_eq!( + store.get_progress("users").unwrap(), + Some(("2026-01-15 10:30:00".to_string(), Some("42".to_string()))) + ); + + let _ = fs::remove_dir_all(&dir); + } } diff --git a/src/sync.rs b/src/sync.rs index 4922579..17be062 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use chrono::Utc; use tokio_postgres::NoTls; @@ -48,6 +48,8 @@ async fn resolve_tables( schema: Some(config.schema.clone()), columns: None, incremental_column: None, + incremental_tiebreaker_column: None, + incremental_column_is_unique: false, partition_by: None, }) .collect(); @@ -67,7 +69,13 @@ pub async fn dry_run(config: Config) -> Result<()> { let columns = schema::introspect_table(&client, table).await?; let col_names: Vec<&str> = columns.iter().map(|c| c.name.as_str()).collect(); let mode = if table.incremental_column.is_some() { - "incremental" + if table.incremental_tiebreaker_column.is_some() { + "incremental (cursor)" + } else if table.incremental_column_is_unique { + "incremental (unique watermark)" + } else { + "incremental (invalid config: missing tiebreaker)" + } } else { "full" }; @@ -145,9 +153,69 @@ async fn sync_table( // Get watermark info let watermark_col = table.incremental_column.as_deref(); - let mut watermark_val = match watermark_col { - Some(_) => state.get_watermark(&table_name)?, - None => None, + let cursor_col = watermark_col.and(table.incremental_tiebreaker_column.as_deref()); + + if watermark_col.is_some() && cursor_col.is_none() && !table.incremental_column_is_unique { + return Err(anyhow!( + "incremental_tiebreaker_column is required for incremental table {} unless incremental_column_is_unique=true", + table_name + )); + } + + if let (Some(wm), Some(cur)) = (watermark_col, cursor_col) { + if wm == cur { + return Err(anyhow!( + "incremental_tiebreaker_column '{}' cannot equal incremental_column '{}' for table {}", + cur, + wm, + table_name + )); + } + } + + if let Some(wm) = watermark_col { + if !columns.iter().any(|c| c.name == wm) { + return Err(anyhow!( + "incremental_column '{}' is not in selected columns for table {}", + wm, + table_name + )); + } + } + if let Some(cur) = cursor_col { + if !columns.iter().any(|c| c.name == cur) { + return Err(anyhow!( + "incremental_tiebreaker_column '{}' is not in selected columns for table {}", + cur, + table_name + )); + } + } + + if let (Some(wm), Some(cur)) = (watermark_col, cursor_col) { + tracing::info!(table = %table_name, watermark_col = %wm, cursor_col = %cur, "incremental cursor enabled"); + } else if let Some(wm) = watermark_col { + tracing::info!( + table = %table_name, + watermark_col = %wm, + "incremental watermark-only mode enabled (column marked unique)" + ); + } + + let (mut watermark_val, mut cursor_val) = match watermark_col { + Some(_) => match state.get_progress(&table_name)? { + Some((wm, cursor)) => { + if cursor_col.is_some() && cursor.is_none() { + return Err(anyhow!( + "state for table {} has watermark but no cursor; reset this table state and rerun", + table_name + )); + } + (Some(wm), cursor) + } + None => (None, None), + }, + None => (None, None), }; let mut total_rows = 0u64; @@ -160,7 +228,9 @@ async fn sync_table( &columns, &arrow_schema, watermark_col, + cursor_col, watermark_val.as_deref(), + cursor_val.as_deref(), config.batch_size, ) .await?; @@ -173,12 +243,26 @@ async fn sync_table( let num_rows = batch.num_rows(); total_rows += num_rows as u64; - // Update watermark from the last row - if let Some(col) = watermark_col { - if let Some(new_wm) = extract_watermark(&columns, &batch, col) { - watermark_val = Some(new_wm); - } - } + let new_watermark = match watermark_col { + Some(col) => Some(extract_watermark(&columns, &batch, col).ok_or_else(|| { + anyhow!( + "incremental column '{}' produced no watermark value for table {}", + col, + table_name + ) + })?), + None => None, + }; + let new_cursor = match cursor_col { + Some(col) => Some(extract_watermark(&columns, &batch, col).ok_or_else(|| { + anyhow!( + "incremental tiebreaker column '{}' produced no cursor value for table {}", + col, + table_name + ) + })?), + None => None, + }; // Write parquet to buffer let mut buf = Vec::new(); @@ -221,6 +305,14 @@ async fn sync_table( output::write_output(&config.output, &filename, buf).await?; + // Persist incremental progress batch-by-batch so reruns can resume safely. + if let Some(ref wm) = new_watermark { + state.set_progress(&table_name, wm, new_cursor.as_deref())?; + watermark_val = Some(wm.clone()); + cursor_val = new_cursor; + tracing::debug!(table = %table_name, watermark = %wm, cursor = ?cursor_val, "checkpointed watermark"); + } + batch_num += 1; // If we got fewer rows than batch_size, we're done @@ -229,10 +321,8 @@ async fn sync_table( } } - // Persist watermark - if let (Some(_), Some(ref wm)) = (watermark_col, &watermark_val) { - state.set_watermark(&table_name, wm)?; - tracing::info!(table = %table_name, watermark = %wm, "updated watermark"); + if let Some(ref wm) = watermark_val { + tracing::info!(table = %table_name, watermark = %wm, "final watermark"); } tracing::info!(table = %table_name, rows = total_rows, files = batch_num, "sync complete"); @@ -276,6 +366,43 @@ pub(crate) fn extract_watermark( } Some(arr.value(last_row).to_string()) } + DataType::Int16 => { + let arr = array.as_any().downcast_ref::()?; + if arr.is_null(last_row) { + return None; + } + Some(arr.value(last_row).to_string()) + } + DataType::Float64 => { + let arr = array.as_any().downcast_ref::()?; + if arr.is_null(last_row) { + return None; + } + Some(arr.value(last_row).to_string()) + } + DataType::Float32 => { + let arr = array.as_any().downcast_ref::()?; + if arr.is_null(last_row) { + return None; + } + Some(arr.value(last_row).to_string()) + } + DataType::Date32 => { + let arr = array.as_any().downcast_ref::()?; + if arr.is_null(last_row) { + return None; + } + let epoch = chrono::NaiveDate::from_ymd_opt(1970, 1, 1)?; + let d = epoch + chrono::Duration::days(arr.value(last_row) as i64); + Some(d.to_string()) + } + DataType::Boolean => { + let arr = array.as_any().downcast_ref::()?; + if arr.is_null(last_row) { + return None; + } + Some(arr.value(last_row).to_string()) + } DataType::Utf8 => { let arr = array.as_any().downcast_ref::()?; if arr.is_null(last_row) { @@ -290,7 +417,7 @@ pub(crate) fn extract_watermark( #[cfg(test)] mod tests { use super::*; - use arrow::array::{Int64Array, StringArray, TimestampMicrosecondArray}; + use arrow::array::{Int32Array, Int64Array, StringArray, TimestampMicrosecondArray}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; use std::sync::Arc; @@ -337,6 +464,23 @@ mod tests { assert_eq!(wm, Some("300".to_string())); } + #[test] + fn extract_watermark_from_int32() { + let columns = vec![ColumnInfo { + name: "id".into(), + pg_type: "integer".into(), + arrow_type: DataType::Int32, + is_nullable: false, + }]; + + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let arr = Int32Array::from(vec![10, 20, 30]); + let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap(); + + let wm = extract_watermark(&columns, &batch, "id"); + assert_eq!(wm, Some("30".to_string())); + } + #[test] fn extract_watermark_from_utf8() { let columns = vec![ColumnInfo { From e4f5547c6529b76f322b2e117a9c5aa2733bb901 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Tue, 24 Feb 2026 20:14:18 -0500 Subject: [PATCH 2/3] Fix clippy warnings for state and reader APIs --- src/reader.rs | 90 +++++++++++++++++++++++++++------------------------ src/state.rs | 57 ++++++++++++-------------------- src/sync.rs | 12 ++++--- 3 files changed, 75 insertions(+), 84 deletions(-) diff --git a/src/reader.rs b/src/reader.rs index 05574db..578a2db 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -10,6 +10,14 @@ use tokio_postgres::{Client, Row}; use crate::config::TableConfig; use crate::schema::ColumnInfo; +pub struct ReadBatchOptions<'a> { + pub watermark_col: Option<&'a str>, + pub cursor_col: Option<&'a str>, + pub watermark_val: Option<&'a str>, + pub cursor_val: Option<&'a str>, + pub batch_size: usize, +} + /// Build the SELECT query for a table, optionally filtering by watermark. pub fn build_query( table: &TableConfig, @@ -57,26 +65,22 @@ pub async fn read_batch( table: &TableConfig, columns: &[ColumnInfo], schema: &Arc, - watermark_col: Option<&str>, - cursor_col: Option<&str>, - watermark_val: Option<&str>, - cursor_val: Option<&str>, - batch_size: usize, + options: ReadBatchOptions<'_>, ) -> Result> { let query = build_query( table, columns, - watermark_col, - cursor_col, - watermark_val, - cursor_val, - batch_size, + options.watermark_col, + options.cursor_col, + options.watermark_val, + options.cursor_val, + options.batch_size, ); tracing::debug!(%query, "executing query"); - let rows = match (watermark_val, cursor_val) { - (Some(wm), Some(cur)) if cursor_col.is_some() => client + let rows = match (options.watermark_val, options.cursor_val) { + (Some(wm), Some(cur)) if options.cursor_col.is_some() => client .query(&query as &str, &[&wm, &cur]) .await .with_context(|| format!("querying table {}", table.full_name()))?, @@ -199,6 +203,37 @@ fn rows_to_array(rows: &[Row], col_idx: usize, arrow_type: &DataType) -> Result< } } +/// Try to get a PG column value as a String, handling various types. +fn get_as_string(row: &Row, idx: usize) -> Option { + let col_type = row.columns()[idx].type_(); + + match *col_type { + Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME => row.get::<_, Option>(idx), + Type::UUID => row.get::<_, Option>(idx).map(|u| u.to_string()), + Type::JSON | Type::JSONB => row + .get::<_, Option>(idx) + .map(|v| v.to_string()), + Type::NUMERIC => row.get::<_, Option>(idx), + Type::TIMESTAMP => row + .get::<_, Option>(idx) + .map(|dt| dt.to_string()), + Type::TIMESTAMPTZ => row + .get::<_, Option>>(idx) + .map(|dt| dt.to_rfc3339()), + Type::DATE => row.get::<_, Option>(idx).map(|d| d.to_string()), + Type::INT4 => row.get::<_, Option>(idx).map(|v| v.to_string()), + Type::INT8 => row.get::<_, Option>(idx).map(|v| v.to_string()), + Type::INT2 => row.get::<_, Option>(idx).map(|v| v.to_string()), + Type::FLOAT4 => row.get::<_, Option>(idx).map(|v| v.to_string()), + Type::FLOAT8 => row.get::<_, Option>(idx).map(|v| v.to_string()), + Type::BOOL => row.get::<_, Option>(idx).map(|v| v.to_string()), + _ => { + // Last resort: try as String + row.try_get::<_, Option>(idx).unwrap_or(None) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -315,34 +350,3 @@ mod tests { assert!(q.ends_with("LIMIT 1000")); } } - -/// Try to get a PG column value as a String, handling various types. -fn get_as_string(row: &Row, idx: usize) -> Option { - let col_type = row.columns()[idx].type_(); - - match *col_type { - Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME => row.get::<_, Option>(idx), - Type::UUID => row.get::<_, Option>(idx).map(|u| u.to_string()), - Type::JSON | Type::JSONB => row - .get::<_, Option>(idx) - .map(|v| v.to_string()), - Type::NUMERIC => row.get::<_, Option>(idx), - Type::TIMESTAMP => row - .get::<_, Option>(idx) - .map(|dt| dt.to_string()), - Type::TIMESTAMPTZ => row - .get::<_, Option>>(idx) - .map(|dt| dt.to_rfc3339()), - Type::DATE => row.get::<_, Option>(idx).map(|d| d.to_string()), - Type::INT4 => row.get::<_, Option>(idx).map(|v| v.to_string()), - Type::INT8 => row.get::<_, Option>(idx).map(|v| v.to_string()), - Type::INT2 => row.get::<_, Option>(idx).map(|v| v.to_string()), - Type::FLOAT4 => row.get::<_, Option>(idx).map(|v| v.to_string()), - Type::FLOAT8 => row.get::<_, Option>(idx).map(|v| v.to_string()), - Type::BOOL => row.get::<_, Option>(idx).map(|v| v.to_string()), - _ => { - // Last resort: try as String - row.try_get::<_, Option>(idx).unwrap_or(None) - } - } -} diff --git a/src/state.rs b/src/state.rs index 2639ee8..410be2b 100644 --- a/src/state.rs +++ b/src/state.rs @@ -52,23 +52,6 @@ impl StateStore { Ok(Self { conn }) } - /// Get the high watermark for a table. - pub fn get_watermark(&self, table_name: &str) -> Result> { - let mut stmt = self - .conn - .prepare("SELECT watermark_value FROM watermarks WHERE table_name = ?1") - .context("preparing watermark select")?; - - let result = stmt.query_row([table_name], |row| row.get(0)).ok(); - - Ok(result) - } - - /// Set the high watermark for a table. - pub fn set_watermark(&self, table_name: &str, value: &str) -> Result<()> { - self.set_progress(table_name, value, None) - } - /// Get high watermark and cursor for a table. pub fn get_progress(&self, table_name: &str) -> Result)>> { let mut stmt = self @@ -135,22 +118,24 @@ mod tests { } #[test] - fn get_watermark_returns_none_initially() { + fn get_progress_returns_none_initially() { let dir = temp_state_dir(); let store = StateStore::open(&dir).unwrap(); - assert_eq!(store.get_watermark("users").unwrap(), None); + assert_eq!(store.get_progress("users").unwrap(), None); let _ = fs::remove_dir_all(&dir); } #[test] - fn set_and_get_watermark() { + fn set_and_get_watermark_without_cursor() { let dir = temp_state_dir(); let store = StateStore::open(&dir).unwrap(); - store.set_watermark("users", "2026-01-15 10:30:00").unwrap(); + store + .set_progress("users", "2026-01-15 10:30:00", None) + .unwrap(); assert_eq!( - store.get_watermark("users").unwrap().as_deref(), - Some("2026-01-15 10:30:00") + store.get_progress("users").unwrap(), + Some(("2026-01-15 10:30:00".to_string(), None)) ); let _ = fs::remove_dir_all(&dir); } @@ -161,14 +146,14 @@ mod tests { let store = StateStore::open(&dir).unwrap(); store - .set_watermark("orders", "2026-01-01 00:00:00") + .set_progress("orders", "2026-01-01 00:00:00", None) .unwrap(); store - .set_watermark("orders", "2026-02-01 00:00:00") + .set_progress("orders", "2026-02-01 00:00:00", None) .unwrap(); assert_eq!( - store.get_watermark("orders").unwrap().as_deref(), - Some("2026-02-01 00:00:00") + store.get_progress("orders").unwrap(), + Some(("2026-02-01 00:00:00".to_string(), None)) ); let _ = fs::remove_dir_all(&dir); } @@ -178,16 +163,16 @@ mod tests { let dir = temp_state_dir(); let store = StateStore::open(&dir).unwrap(); - store.set_watermark("users", "aaa").unwrap(); - store.set_watermark("orders", "bbb").unwrap(); + store.set_progress("users", "aaa", None).unwrap(); + store.set_progress("orders", "bbb", None).unwrap(); assert_eq!( - store.get_watermark("users").unwrap().as_deref(), - Some("aaa") + store.get_progress("users").unwrap(), + Some(("aaa".to_string(), None)) ); assert_eq!( - store.get_watermark("orders").unwrap().as_deref(), - Some("bbb") + store.get_progress("orders").unwrap(), + Some(("bbb".to_string(), None)) ); let _ = fs::remove_dir_all(&dir); } @@ -198,14 +183,14 @@ mod tests { { let store = StateStore::open(&dir).unwrap(); - store.set_watermark("users", "persisted").unwrap(); + store.set_progress("users", "persisted", None).unwrap(); } { let store = StateStore::open(&dir).unwrap(); assert_eq!( - store.get_watermark("users").unwrap().as_deref(), - Some("persisted") + store.get_progress("users").unwrap(), + Some(("persisted".to_string(), None)) ); } diff --git a/src/sync.rs b/src/sync.rs index 17be062..4482c8d 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -227,11 +227,13 @@ async fn sync_table( table, &columns, &arrow_schema, - watermark_col, - cursor_col, - watermark_val.as_deref(), - cursor_val.as_deref(), - config.batch_size, + reader::ReadBatchOptions { + watermark_col, + cursor_col, + watermark_val: watermark_val.as_deref(), + cursor_val: cursor_val.as_deref(), + batch_size: config.batch_size, + }, ) .await?; From 11d1a736ce63dd318603fe2cd4fd0128c056edb5 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Wed, 25 Feb 2026 16:53:02 -0500 Subject: [PATCH 3/3] Format config tests with rustfmt --- src/config.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 8aed3b5..bb73260 100644 --- a/src/config.rs +++ b/src/config.rs @@ -181,7 +181,10 @@ exclude: assert_eq!(tables.len(), 2); assert_eq!(tables[0].name, "users"); assert_eq!(tables[0].incremental_column.as_deref(), Some("updated_at")); - assert_eq!(tables[0].incremental_tiebreaker_column.as_deref(), Some("id")); + assert_eq!( + tables[0].incremental_tiebreaker_column.as_deref(), + Some("id") + ); assert!(!tables[0].incremental_column_is_unique); assert_eq!(tables[0].columns.as_ref().unwrap().len(), 2); assert!(matches!(tables[1].partition_by, Some(PartitionBy::Date)));