Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,16 @@ If no `tables` are listed, the target table name is inferred from the parent dir
| `ingest.tables[].key_columns` | Primary key columns (required for upsert mode) |
| `ingest.tables[].create_if_missing` | Auto-CREATE TABLE from file schema (default: false) |

## Running Integration Tests

Some DB-backed tests are optional and run only when `RUSTREAM_IT_DB_URL` is set.
Without this env var, those tests return early and pass without exercising the database.

```bash
export RUSTREAM_IT_DB_URL="host=localhost port=5432 dbname=mydb user=postgres password=secret"
cargo test
```

## How it works

### Sync (Postgres → Parquet)
Expand Down
136 changes: 136 additions & 0 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ fn get_as_string(row: &Row, idx: usize) -> Option<String> {
#[cfg(test)]
mod tests {
use super::*;
use crate::schema;
use crate::sync::extract_watermark;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio_postgres::NoTls;

fn make_table(name: &str) -> TableConfig {
TableConfig {
Expand Down Expand Up @@ -349,4 +353,136 @@ mod tests {
assert!(q.contains("ORDER BY \"updated_at\" ASC, \"id\" ASC"));
assert!(q.ends_with("LIMIT 1000"));
}

    /// Ensures duplicate watermark values are paged safely using the cursor tiebreaker.
    ///
    /// Integration test against a live Postgres database: it seeds three rows that
    /// share the same `updated_at` watermark plus one later row, then pages through
    /// them two at a time. With only the watermark, rows 2 and 3 would be skipped or
    /// re-read at the page boundary; the `id` cursor column disambiguates them.
    /// Requires `RUSTREAM_IT_DB_URL`; otherwise the test returns early (no-op).
    #[tokio::test]
    async fn read_batch_cursor_paging_handles_duplicate_watermarks() {
        let db_url = match std::env::var("RUSTREAM_IT_DB_URL") {
            Ok(v) => v,
            Err(_) => return, // Optional integration-style test; set env var to run.
        };

        let (client, connection) = tokio_postgres::connect(&db_url, NoTls).await.unwrap();
        // tokio_postgres splits client and connection: the connection future must be
        // polled on its own task for queries on `client` to make progress.
        tokio::spawn(async move {
            let _ = connection.await;
        });

        // Nanosecond timestamp suffix makes the table name unique per run, so
        // concurrent or previously-failed runs do not collide.
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let table_name = format!("rustream_it_reader_{suffix}");

        client
            .execute(
                &format!(
                    "CREATE TABLE public.{table_name} (
                        id INTEGER PRIMARY KEY,
                        updated_at TIMESTAMPTZ NOT NULL,
                        payload TEXT
                    )"
                ),
                &[],
            )
            .await
            .unwrap();

        // Rows 1-3 deliberately share the same watermark value; row 4 is strictly
        // later. With batch_size = 2 the duplicate group straddles a page boundary.
        client
            .execute(
                &format!(
                    "INSERT INTO public.{table_name} (id, updated_at, payload) VALUES
                        (1, '2026-01-01T00:00:00Z', 'a'),
                        (2, '2026-01-01T00:00:00Z', 'b'),
                        (3, '2026-01-01T00:00:00Z', 'c'),
                        (4, '2026-01-01T00:01:00Z', 'd')"
                ),
                &[],
            )
            .await
            .unwrap();

        // `incremental_column_is_unique: false` is the configuration under test:
        // it forces the reader to rely on the `id` tiebreaker for paging.
        let table = TableConfig {
            name: table_name.clone(),
            schema: Some("public".into()),
            columns: None,
            incremental_column: Some("updated_at".into()),
            incremental_tiebreaker_column: Some("id".into()),
            incremental_column_is_unique: false,
            partition_by: None,
        };
        let columns = schema::introspect_table(&client, &table).await.unwrap();
        let arrow_schema = schema::build_arrow_schema(&columns);

        // Page 1: no prior progress (watermark_val/cursor_val = None) -> rows 1 and 2.
        let batch1 = read_batch(
            &client,
            &table,
            &columns,
            &arrow_schema,
            ReadBatchOptions {
                watermark_col: Some("updated_at"),
                cursor_col: Some("id"),
                watermark_val: None,
                cursor_val: None,
                batch_size: 2,
            },
        )
        .await
        .unwrap()
        .unwrap();
        assert_eq!(batch1.num_rows(), 2);
        // Progress is carried as a (watermark, cursor) pair extracted from the last batch.
        let wm1 = extract_watermark(&columns, &batch1, "updated_at").unwrap();
        let cur1 = extract_watermark(&columns, &batch1, "id").unwrap();

        // Page 2: resuming from (wm1, cur1) must return rows 3 and 4 — row 3 shares
        // wm1's watermark and is only reachable via the cursor tiebreaker.
        let batch2 = read_batch(
            &client,
            &table,
            &columns,
            &arrow_schema,
            ReadBatchOptions {
                watermark_col: Some("updated_at"),
                cursor_col: Some("id"),
                watermark_val: Some(&wm1),
                cursor_val: Some(&cur1),
                batch_size: 2,
            },
        )
        .await
        .unwrap()
        .unwrap();
        assert_eq!(batch2.num_rows(), 2);

        let id_col = columns.iter().position(|c| c.name == "id").unwrap();
        let ids = batch2
            .column(id_col)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        // No skips, no duplicates: exactly rows 3 then 4.
        assert_eq!(ids.value(0), 3);
        assert_eq!(ids.value(1), 4);

        let wm2 = extract_watermark(&columns, &batch2, "updated_at").unwrap();
        let cur2 = extract_watermark(&columns, &batch2, "id").unwrap();

        // Page 3: all rows consumed -> read_batch signals exhaustion with None.
        let batch3 = read_batch(
            &client,
            &table,
            &columns,
            &arrow_schema,
            ReadBatchOptions {
                watermark_col: Some("updated_at"),
                cursor_col: Some("id"),
                watermark_val: Some(&wm2),
                cursor_val: Some(&cur2),
                batch_size: 2,
            },
        )
        .await
        .unwrap();
        assert!(batch3.is_none());

        // NOTE(review): cleanup only runs on success — if an assertion above panics,
        // the uniquely-named table is left behind in the test database. Harmless but
        // worth confirming whether a guard/teardown helper exists in this crate.
        client
            .execute(&format!("DROP TABLE public.{table_name}"), &[])
            .await
            .unwrap();
    }
}
48 changes: 48 additions & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl StateStore {
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
use std::fs;

use std::sync::atomic::{AtomicU32, Ordering};
Expand Down Expand Up @@ -282,4 +283,51 @@ mod tests {

let _ = fs::remove_dir_all(&dir);
}

/// Verifies opening state migrates legacy watermark schema and preserves existing values.
#[test]
fn open_migrates_legacy_watermarks_table() {
let dir = temp_state_dir();
fs::create_dir_all(&dir).unwrap();
let db_path = Path::new(&dir).join("rustream_state.db");

// Simulate an old state db that predates cursor_value.
let conn = Connection::open(&db_path).unwrap();
conn.execute_batch(
"CREATE TABLE watermarks (
table_name TEXT PRIMARY KEY,
watermark_value TEXT NOT NULL,
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);",
)
.unwrap();
conn.execute(
"INSERT INTO watermarks (table_name, watermark_value) VALUES (?1, ?2)",
["users", "2026-01-01 00:00:00"],
)
.unwrap();
drop(conn);

let store = StateStore::open(&dir).unwrap();
assert_eq!(
store.get_progress("users").unwrap(),
Some(("2026-01-01 00:00:00".to_string(), None))
);

// Verify migrated schema contains cursor_value.
let conn = Connection::open(&db_path).unwrap();
let mut stmt = conn.prepare("PRAGMA table_info(watermarks)").unwrap();
let mut rows = stmt.query([]).unwrap();
let mut has_cursor = false;
while let Some(row) = rows.next().unwrap() {
let col_name: String = row.get(1).unwrap();
if col_name == "cursor_value" {
has_cursor = true;
break;
}
}
assert!(has_cursor);

let _ = fs::remove_dir_all(&dir);
}
}
Loading