Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ output:
tables:
- name: users
incremental_column: updated_at
incremental_tiebreaker_column: id
columns: # optional: pick specific columns
- id
- email
Expand All @@ -74,8 +75,13 @@ tables:

- name: orders
incremental_column: updated_at
incremental_tiebreaker_column: id

- name: products # no incremental_column = full sync every run

- name: events # append-only example (no updated_at)
incremental_column: id
incremental_column_is_unique: true
```

### All tables (auto-discover)
Expand Down Expand Up @@ -132,6 +138,8 @@ AWS credentials come from environment variables, `~/.aws/credentials`, or IAM ro
| `tables[].schema` | Schema name (default: `public`) |
| `tables[].columns` | Columns to sync (default: all) |
| `tables[].incremental_column` | Column for watermark-based incremental sync |
| `tables[].incremental_tiebreaker_column` | Stable cursor column for duplicate-safe incremental paging (required when `incremental_column` is set, unless `incremental_column_is_unique: true`; recommended: primary key) |
| `tables[].incremental_column_is_unique` | Allow watermark-only incremental mode when incremental column is strictly unique/monotonic (e.g. append-only `id`) |
| `tables[].partition_by` | Partition output files: `date`, `month`, or `year` |

## How it works
Expand All @@ -140,8 +148,9 @@ AWS credentials come from environment variables, `~/.aws/credentials`, or IAM ro
2. Maps Postgres column types to Arrow types automatically
3. Reads rows in batches, converting to Arrow RecordBatches
4. Writes each batch as a Snappy-compressed Parquet file
5. Tracks the high watermark (max value of `incremental_column`) in local SQLite
6. On next run, only reads rows where `incremental_column > last_watermark`
5. Tracks the high watermark (max value of `incremental_column`) and optional cursor in local SQLite
6. Checkpoints incremental progress after each successfully written batch
7. On next run, reads rows after the saved `(watermark, cursor)` position

Tables without `incremental_column` do a full sync every run.

Expand Down
7 changes: 7 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ state_dir: .rustream_state
tables:
- name: users
incremental_column: updated_at
incremental_tiebreaker_column: id
columns: # optional: pick specific columns
- id
- email
Expand All @@ -43,12 +44,18 @@ tables:

- name: orders
incremental_column: updated_at
incremental_tiebreaker_column: id
partition_by: date # table/year=2026/month=02/day=10/...
# partition_by: month # table/year=2026/month=02/...
# partition_by: year # table/year=2026/...

- name: products # no incremental_column = full sync every run

# Example for append-only tables without updated_at:
# - name: events
# incremental_column: id
# incremental_column_is_unique: true

# Option 2: Auto-discover all tables (remove `tables` above and uncomment below)
# schema: public # which schema to discover from (default: public)
# exclude: # skip these tables
Expand Down
15 changes: 15 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub struct TableConfig {
#[serde(default)]
pub incremental_column: Option<String>,
#[serde(default)]
pub incremental_tiebreaker_column: Option<String>,
#[serde(default)]
pub incremental_column_is_unique: bool,
#[serde(default)]
pub partition_by: Option<PartitionBy>,
}

Expand Down Expand Up @@ -156,6 +160,8 @@ state_dir: /tmp/state
tables:
- name: users
incremental_column: updated_at
incremental_tiebreaker_column: id
incremental_column_is_unique: false
columns:
- id
- email
Expand All @@ -175,6 +181,11 @@ exclude:
assert_eq!(tables.len(), 2);
assert_eq!(tables[0].name, "users");
assert_eq!(tables[0].incremental_column.as_deref(), Some("updated_at"));
assert_eq!(
tables[0].incremental_tiebreaker_column.as_deref(),
Some("id")
);
assert!(!tables[0].incremental_column_is_unique);
assert_eq!(tables[0].columns.as_ref().unwrap().len(), 2);
assert!(matches!(tables[1].partition_by, Some(PartitionBy::Date)));

Expand Down Expand Up @@ -231,6 +242,8 @@ exclude:
schema: Some("analytics".into()),
columns: None,
incremental_column: None,
incremental_tiebreaker_column: None,
incremental_column_is_unique: false,
partition_by: None,
};
assert_eq!(t.full_name(), "analytics.users");
Expand All @@ -244,6 +257,8 @@ exclude:
schema: None,
columns: None,
incremental_column: None,
incremental_tiebreaker_column: None,
incremental_column_is_unique: false,
partition_by: None,
};
assert_eq!(t.full_name(), "orders");
Expand Down
150 changes: 105 additions & 45 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,22 @@ use tokio_postgres::{Client, Row};
use crate::config::TableConfig;
use crate::schema::ColumnInfo;

/// Options controlling a single incremental `read_batch` call.
///
/// The `*_col` / `*_val` fields travel in pairs: the column name decides the
/// shape of the SQL predicate, while the value (when present) is bound as a
/// positional query parameter.
#[derive(Debug, Clone, Copy)]
pub struct ReadBatchOptions<'a> {
    /// Watermark column used for incremental filtering (`None` = full read).
    pub watermark_col: Option<&'a str>,
    /// Tie-breaker cursor column for duplicate-safe paging.
    pub cursor_col: Option<&'a str>,
    /// Last saved watermark value, bound as `$1` when present.
    pub watermark_val: Option<&'a str>,
    /// Last saved cursor value, bound as `$2` when present.
    pub cursor_val: Option<&'a str>,
    /// Maximum number of rows fetched per batch (the SQL `LIMIT`).
    pub batch_size: usize,
}

/// Build the SELECT query for a table, optionally filtering by watermark.
pub fn build_query(
table: &TableConfig,
columns: &[ColumnInfo],
watermark_col: Option<&str>,
cursor_col: Option<&str>,
watermark_val: Option<&str>,
cursor_val: Option<&str>,
batch_size: usize,
) -> String {
let cols = columns
Expand All @@ -27,12 +37,22 @@ pub fn build_query(
let full_name = table.full_name();
let mut query = format!("SELECT {cols} FROM {full_name}");

if let (Some(col), Some(val)) = (watermark_col, watermark_val) {
query.push_str(&format!(" WHERE \"{col}\" > '{val}'"));
if let (Some(wm_col), Some(_)) = (watermark_col, watermark_val) {
if let (Some(cur_col), Some(_)) = (cursor_col, cursor_val) {
query.push_str(&format!(
" WHERE (\"{wm_col}\" > $1 OR (\"{wm_col}\" = $1 AND \"{cur_col}\" > $2))"
));
} else {
query.push_str(&format!(" WHERE \"{wm_col}\" > $1"));
}
}

if let Some(col) = watermark_col {
query.push_str(&format!(" ORDER BY \"{col}\" ASC"));
if let Some(wm_col) = watermark_col {
if let Some(cur_col) = cursor_col {
query.push_str(&format!(" ORDER BY \"{wm_col}\" ASC, \"{cur_col}\" ASC"));
} else {
query.push_str(&format!(" ORDER BY \"{wm_col}\" ASC"));
}
}

query.push_str(&format!(" LIMIT {batch_size}"));
Expand All @@ -45,18 +65,34 @@ pub async fn read_batch(
table: &TableConfig,
columns: &[ColumnInfo],
schema: &Arc<Schema>,
watermark_col: Option<&str>,
watermark_val: Option<&str>,
batch_size: usize,
options: ReadBatchOptions<'_>,
) -> Result<Option<RecordBatch>> {
let query = build_query(table, columns, watermark_col, watermark_val, batch_size);
let query = build_query(
table,
columns,
options.watermark_col,
options.cursor_col,
options.watermark_val,
options.cursor_val,
options.batch_size,
);

tracing::debug!(%query, "executing query");

let rows = client
.query(&query as &str, &[])
.await
.with_context(|| format!("querying table {}", table.full_name()))?;
let rows = match (options.watermark_val, options.cursor_val) {
(Some(wm), Some(cur)) if options.cursor_col.is_some() => client
.query(&query as &str, &[&wm, &cur])
.await
.with_context(|| format!("querying table {}", table.full_name()))?,
(Some(wm), _) => client
.query(&query as &str, &[&wm])
.await
.with_context(|| format!("querying table {}", table.full_name()))?,
(None, _) => client
.query(&query as &str, &[])
.await
.with_context(|| format!("querying table {}", table.full_name()))?,
};

if rows.is_empty() {
return Ok(None);
Expand Down Expand Up @@ -167,6 +203,37 @@ fn rows_to_array(rows: &[Row], col_idx: usize, arrow_type: &DataType) -> Result<
}
}

/// Try to get a PG column value as a String, handling various types.
///
/// Presumably used to render watermark/cursor values in a stable textual form
/// for state persistence — TODO confirm against callers. Returns `None` when
/// the column value is SQL NULL; types not matched below fall back to a
/// best-effort `try_get` as `String`.
///
/// NOTE(review): every arm except the `_` fallback uses `row.get`, which
/// panics on a decode/type mismatch — assumes `idx` really holds one of the
/// matched PG types.
fn get_as_string(row: &Row, idx: usize) -> Option<String> {
    // Dispatch on the declared Postgres type of the column, not the Rust type.
    let col_type = row.columns()[idx].type_();

    match *col_type {
        // Textual types decode directly to String.
        Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME => row.get::<_, Option<String>>(idx),
        Type::UUID => row.get::<_, Option<uuid::Uuid>>(idx).map(|u| u.to_string()),
        // JSON/JSONB re-serialized via serde_json's compact representation.
        Type::JSON | Type::JSONB => row
            .get::<_, Option<serde_json::Value>>(idx)
            .map(|v| v.to_string()),
        // NOTE(review): tokio-postgres does not normally decode NUMERIC as
        // String — confirm an upstream cast or custom FromSql impl exists,
        // otherwise this arm panics at runtime.
        Type::NUMERIC => row.get::<_, Option<String>>(idx),
        Type::TIMESTAMP => row
            .get::<_, Option<NaiveDateTime>>(idx)
            .map(|dt| dt.to_string()),
        // TIMESTAMPTZ rendered as RFC 3339 (always UTC here) for a canonical form.
        Type::TIMESTAMPTZ => row
            .get::<_, Option<chrono::DateTime<chrono::Utc>>>(idx)
            .map(|dt| dt.to_rfc3339()),
        Type::DATE => row.get::<_, Option<NaiveDate>>(idx).map(|d| d.to_string()),
        Type::INT4 => row.get::<_, Option<i32>>(idx).map(|v| v.to_string()),
        Type::INT8 => row.get::<_, Option<i64>>(idx).map(|v| v.to_string()),
        Type::INT2 => row.get::<_, Option<i16>>(idx).map(|v| v.to_string()),
        Type::FLOAT4 => row.get::<_, Option<f32>>(idx).map(|v| v.to_string()),
        Type::FLOAT8 => row.get::<_, Option<f64>>(idx).map(|v| v.to_string()),
        Type::BOOL => row.get::<_, Option<bool>>(idx).map(|v| v.to_string()),
        _ => {
            // Last resort: try as String
            row.try_get::<_, Option<String>>(idx).unwrap_or(None)
        }
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -177,6 +244,8 @@ mod tests {
schema: None,
columns: None,
incremental_column: None,
incremental_tiebreaker_column: None,
incremental_column_is_unique: false,
partition_by: None,
}
}
Expand Down Expand Up @@ -208,7 +277,7 @@ mod tests {
fn build_query_full_table() {
let table = make_table("users");
let cols = make_columns();
let q = build_query(&table, &cols, None, None, 1000);
let q = build_query(&table, &cols, None, None, None, None, 1000);
assert_eq!(
q,
"SELECT \"id\", \"name\", \"updated_at\" FROM users LIMIT 1000"
Expand All @@ -222,10 +291,12 @@ mod tests {
schema: Some("analytics".into()),
columns: None,
incremental_column: None,
incremental_tiebreaker_column: None,
incremental_column_is_unique: false,
partition_by: None,
};
let cols = make_columns();
let q = build_query(&table, &cols, None, None, 500);
let q = build_query(&table, &cols, None, None, None, None, 500);
assert_eq!(
q,
"SELECT \"id\", \"name\", \"updated_at\" FROM analytics.users LIMIT 500"
Expand All @@ -236,7 +307,7 @@ mod tests {
fn build_query_with_watermark_no_value() {
let table = make_table("users");
let cols = make_columns();
let q = build_query(&table, &cols, Some("updated_at"), None, 1000);
let q = build_query(&table, &cols, Some("updated_at"), None, None, None, 1000);
assert_eq!(
q,
"SELECT \"id\", \"name\", \"updated_at\" FROM users ORDER BY \"updated_at\" ASC LIMIT 1000"
Expand All @@ -251,42 +322,31 @@ mod tests {
&table,
&cols,
Some("updated_at"),
None,
Some("2026-01-01 00:00:00"),
None,
1000,
);
assert!(q.contains("WHERE \"updated_at\" > '2026-01-01 00:00:00'"));
assert!(q.contains("WHERE \"updated_at\" > $1"));
assert!(q.contains("ORDER BY \"updated_at\" ASC"));
assert!(q.ends_with("LIMIT 1000"));
}
}

/// Try to get a PG column value as a String, handling various types.
fn get_as_string(row: &Row, idx: usize) -> Option<String> {
let col_type = row.columns()[idx].type_();

match *col_type {
Type::TEXT | Type::VARCHAR | Type::BPCHAR | Type::NAME => row.get::<_, Option<String>>(idx),
Type::UUID => row.get::<_, Option<uuid::Uuid>>(idx).map(|u| u.to_string()),
Type::JSON | Type::JSONB => row
.get::<_, Option<serde_json::Value>>(idx)
.map(|v| v.to_string()),
Type::NUMERIC => row.get::<_, Option<String>>(idx),
Type::TIMESTAMP => row
.get::<_, Option<NaiveDateTime>>(idx)
.map(|dt| dt.to_string()),
Type::TIMESTAMPTZ => row
.get::<_, Option<chrono::DateTime<chrono::Utc>>>(idx)
.map(|dt| dt.to_rfc3339()),
Type::DATE => row.get::<_, Option<NaiveDate>>(idx).map(|d| d.to_string()),
Type::INT4 => row.get::<_, Option<i32>>(idx).map(|v| v.to_string()),
Type::INT8 => row.get::<_, Option<i64>>(idx).map(|v| v.to_string()),
Type::INT2 => row.get::<_, Option<i16>>(idx).map(|v| v.to_string()),
Type::FLOAT4 => row.get::<_, Option<f32>>(idx).map(|v| v.to_string()),
Type::FLOAT8 => row.get::<_, Option<f64>>(idx).map(|v| v.to_string()),
Type::BOOL => row.get::<_, Option<bool>>(idx).map(|v| v.to_string()),
_ => {
// Last resort: try as String
row.try_get::<_, Option<String>>(idx).unwrap_or(None)
}
#[test]
fn build_query_with_watermark_and_cursor() {
    let cols = make_columns();
    let table = make_table("users");

    let query = build_query(
        &table,
        &cols,
        Some("updated_at"),
        Some("id"),
        Some("2026-01-01 00:00:00"),
        Some("100"),
        1000,
    );

    // The tie-breaking predicate must page past rows that share a watermark.
    let expected_where = "WHERE (\"updated_at\" > $1 OR (\"updated_at\" = $1 AND \"id\" > $2))";
    assert!(query.contains(expected_where));
    // Ordering must follow the same (watermark, cursor) pair, ascending.
    assert!(query.contains("ORDER BY \"updated_at\" ASC, \"id\" ASC"));
    assert!(query.ends_with("LIMIT 1000"));
}
}
Loading