Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,883 changes: 1,653 additions & 230 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ tokio = { version = "1", features = ["full"] }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
postgres-types = { version = "0.2", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }

# Arrow / Parquet
arrow = { version = "53", features = ["chrono-tz"] }
parquet = { version = "53", features = ["arrow"] }
# Arrow / Parquet (v57 to match iceberg 0.8)
arrow = { version = "57", features = ["chrono-tz"] }
parquet = { version = "57", features = ["arrow"] }

# CLI
clap = { version = "4", features = ["derive"] }
Expand All @@ -39,11 +39,26 @@ rusqlite = { version = "0.32", features = ["bundled"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Iceberg
iceberg = "0.8"
iceberg-catalog-rest = "0.8"

# Utils
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1", features = ["serde"] }
anyhow = "1"
serde_json = "1"
bytes = "1"
glob = "0.3"
arrow-csv = "57"

[features]
default = []
glue = ["iceberg-catalog-glue"]

[dependencies.iceberg-catalog-glue]
version = "0.8"
optional = true

[dev-dependencies]
bytes = "1"
tempfile = "3"
125 changes: 123 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# rustream

Fast Postgres to Parquet sync tool. Reads tables from Postgres, writes Parquet files to local disk or S3. Supports incremental sync via `updated_at` watermark tracking.
Bidirectional Postgres sync tool. Reads tables from Postgres and writes Parquet/Iceberg files to local disk or S3, or ingests Parquet/CSV files from local disk or S3 back into Postgres. Supports incremental sync via watermark tracking and upsert-based ingestion.

## Installation

Expand Down Expand Up @@ -31,6 +31,8 @@ maturin develop --release

## Usage

### Sync (Postgres → Parquet/S3)

```bash
# Copy and edit the example config
cp config.example.yaml config.yaml
Expand All @@ -42,10 +44,21 @@ rustream sync --config config.yaml --dry-run
rustream sync --config config.yaml
```

### Ingest (S3/local → Postgres)

```bash
# Preview what would be ingested
rustream ingest --config ingest_config.yaml --dry-run

# Run ingest
rustream ingest --config ingest_config.yaml
```

Enable debug logging with `RUST_LOG`:

```bash
RUST_LOG=rustream=debug rustream sync --config config.yaml
RUST_LOG=rustream=debug rustream ingest --config ingest_config.yaml
```

## Configuration
Expand Down Expand Up @@ -116,7 +129,72 @@ output:

AWS credentials come from environment variables, `~/.aws/credentials`, or IAM role.

### Config reference
### Iceberg output

```yaml
output:
type: s3
bucket: my-data-lake
prefix: warehouse
region: us-east-1

format: iceberg
warehouse: s3://my-data-lake/warehouse
catalog:
type: filesystem # or "glue" (requires --features glue)
# glue_database: my_db # required when type=glue
```

### Ingest (S3 → Postgres)

```yaml
postgres:
host: localhost
database: mydb
user: postgres
password: secret

ingest:
input:
type: s3
bucket: my-data-lake
prefix: raw/postgres/
region: us-east-1
pattern: "**/*.parquet"

file_format: parquet # "parquet" or "csv"
write_mode: upsert # "insert" | "upsert" | "truncate_insert"
batch_size: 5000
target_schema: public

tables:
- file_pattern: "users/*.parquet"
target_table: users
key_columns: [id]
create_if_missing: true

- file_pattern: "orders/*.parquet"
target_table: orders
key_columns: [id]
```

### Ingest from local files

```yaml
ingest:
input:
type: local
path: ./parquet_files
pattern: "**/*.parquet"

file_format: parquet
write_mode: insert
batch_size: 5000
```

If no `tables` are listed, the target table name is inferred from the parent directory or filename.

### Config reference (sync)

| Field | Description |
|---|---|
Expand All @@ -141,9 +219,43 @@ AWS credentials come from environment variables, `~/.aws/credentials`, or IAM ro
| `tables[].incremental_tiebreaker_column` | Stable cursor column for duplicate-safe incremental paging (required when `incremental_column` is set; recommended: primary key) |
| `tables[].incremental_column_is_unique` | Allow watermark-only incremental mode when incremental column is strictly unique/monotonic (e.g. append-only `id`) |
| `tables[].partition_by` | Partition output files: `date`, `month`, or `year` |
| `format` | Output format: `parquet` (default) or `iceberg` |
| `warehouse` | Warehouse path for Iceberg (required when format=iceberg) |
| `catalog.type` | Iceberg catalog: `filesystem` (default) or `glue` |

### Config reference (ingest)

| Field | Description |
|---|---|
| `ingest.input.type` | `local` or `s3` |
| `ingest.input.path` | Local directory (when type=local) |
| `ingest.input.bucket` | S3 bucket (when type=s3) |
| `ingest.input.prefix` | S3 key prefix (when type=s3) |
| `ingest.input.region` | AWS region (when type=s3, optional) |
| `ingest.input.pattern` | Glob pattern for file matching (default: `**/*.parquet`) |
| `ingest.file_format` | `parquet` (default) or `csv` |
| `ingest.write_mode` | `insert` (default), `upsert`, or `truncate_insert` |
| `ingest.batch_size` | Rows per INSERT statement (default: 5000) |
| `ingest.target_schema` | Postgres schema for target tables (default: `public`) |
| `ingest.tables[].file_pattern` | Glob pattern to match files to this table |
| `ingest.tables[].target_table` | Postgres table to write to |
| `ingest.tables[].key_columns` | Primary key columns (required for upsert mode) |
| `ingest.tables[].create_if_missing` | Auto-CREATE TABLE from file schema (default: false) |

## Running Integration Tests

Some DB-backed tests are optional and run only when `RUSTREAM_IT_DB_URL` is set.
Without this env var, those tests no-op/return early.

```bash
export RUSTREAM_IT_DB_URL="host=localhost port=5432 dbname=mydb user=postgres password=secret"
cargo test
```

## How it works

### Sync (Postgres → Parquet)

1. Connects to Postgres and introspects each table's schema via `information_schema`
2. Maps Postgres column types to Arrow types automatically
3. Reads rows in batches, converting to Arrow RecordBatches
Expand All @@ -154,6 +266,15 @@ AWS credentials come from environment variables, `~/.aws/credentials`, or IAM ro

Tables without `incremental_column` do a full sync every run.

### Ingest (Parquet/CSV → Postgres)

1. Discovers files matching the glob pattern from local disk or S3
2. Skips files already ingested (tracked in local SQLite)
3. Reads each file into Arrow RecordBatches (Parquet or CSV with schema inference)
4. Creates the target table if `create_if_missing: true` (DDL from Arrow schema)
5. Writes rows via multi-row parameterized INSERT or INSERT...ON CONFLICT (upsert)
6. Marks each file as ingested in SQLite to avoid reprocessing on next run

## Supported Postgres types

| Postgres | Arrow |
Expand Down
47 changes: 47 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ output:
# prefix: raw/postgres
# region: us-east-1

# Output format: parquet (default) or iceberg
# format: parquet

# Iceberg output (uncomment to use instead of standalone Parquet)
# Writes proper Iceberg table metadata so Spark, Trino, Athena can query it.
# format: iceberg
# warehouse: s3://my-bucket/warehouse # or ./local_warehouse
# catalog:
# type: filesystem # default, zero setup
# # type: glue # for Athena (requires --features glue)
# # glue_database: my_db # required when type=glue

# Batch size for reading rows from Postgres
batch_size: 10000

Expand Down Expand Up @@ -61,3 +73,38 @@ tables:
# exclude: # skip these tables
# - schema_migrations
# - ar_internal_metadata

# ─── Ingest: load Parquet/CSV files into Postgres ───────────────
# Preview before ingesting:
# rustream ingest --config config.yaml --dry-run
#
# ingest:
# input:
# type: local
# path: ./parquet_files
# pattern: "**/*.parquet"
#
# # S3 input (uncomment to use instead of local):
# # input:
# # type: s3
# # bucket: my-data-lake
# # prefix: raw/postgres
# # region: us-east-1
# # pattern: "**/*.parquet"
#
# file_format: parquet # "parquet" or "csv"
# write_mode: insert # "insert" | "upsert" | "truncate_insert"
# batch_size: 5000
# target_schema: public # Postgres schema for target tables
#
# tables:
# - file_pattern: "users/*.parquet"
# target_table: users
# key_columns: [id] # required for upsert mode
# create_if_missing: true # auto-CREATE TABLE from file schema
#
# - file_pattern: "orders/**/*.parquet"
# target_table: orders
# key_columns: [id]
#
# # If no tables listed, table name is inferred from directory/filename
63 changes: 63 additions & 0 deletions src/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use anyhow::{Context, Result};
use iceberg::memory::{MemoryCatalogBuilder, MEMORY_CATALOG_WAREHOUSE};
use iceberg::{Catalog, CatalogBuilder};
use std::collections::HashMap;
use std::sync::Arc;

use crate::config::{CatalogConfig, CatalogType};

/// Build a catalog from the config. Returns an `Arc<dyn Catalog>` so callers
/// don't need to know the concrete type.
///
/// - **filesystem** (default, also used when no catalog config is present):
///   Uses a MemoryCatalog backed by FileIO pointed at the warehouse path.
///   NOTE(review): the MemoryCatalog tracks table pointers in memory only —
///   confirm that metadata survives across runs if persistence is expected.
/// - **glue**: Requires the `glue` feature flag.
///
/// # Errors
/// Returns an error if the underlying catalog cannot be constructed, or if
/// the Glue catalog is requested without the required feature/config.
pub async fn build_catalog(
    warehouse: &str,
    catalog_config: Option<&CatalogConfig>,
) -> Result<Arc<dyn Catalog>> {
    match catalog_config {
        // Glue can only be requested via an explicit catalog config, so we
        // match on it directly instead of defaulting and then `unwrap()`ing.
        Some(cfg) if matches!(cfg.catalog_type, CatalogType::Glue) => {
            build_glue_catalog(warehouse, cfg).await
        }
        // `None` and `Some` with `CatalogType::Filesystem` both fall through
        // to the filesystem catalog, matching the previous default behavior.
        _ => build_filesystem_catalog(warehouse).await,
    }
}

/// Construct the default filesystem-backed catalog: a MemoryCatalog whose
/// warehouse property points at `warehouse`, so table data and metadata are
/// written beneath that path via FileIO.
async fn build_filesystem_catalog(warehouse: &str) -> Result<Arc<dyn Catalog>> {
    // Single required property: where the warehouse lives on disk/S3.
    let props = HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse.to_string())]);

    let catalog = MemoryCatalogBuilder::default()
        .load("rustream", props)
        .await
        .context("building filesystem catalog")?;

    tracing::info!(warehouse = %warehouse, "created filesystem catalog");
    Ok(Arc::new(catalog))
}

/// Construct an AWS Glue-backed catalog. Currently a stub: both feature
/// configurations return an error until Glue support is wired up.
///
/// # Errors
/// - If `catalog.glue_database` is missing from the config (returned as a
///   `Result` error rather than panicking, even though config loading is
///   expected to have validated this already).
/// - Always, for now: either "not yet wired up" (with the `glue` feature)
///   or "requires the 'glue' feature" (without it).
async fn build_glue_catalog(
    _warehouse: &str,
    catalog_config: &CatalogConfig,
) -> Result<Arc<dyn Catalog>> {
    // Validate config as a recoverable error instead of `expect`-panicking:
    // config content is user input and should never take down the process.
    let _db = catalog_config
        .glue_database
        .as_deref()
        .context("catalog.glue_database is required when catalog type is 'glue'")?;

    #[cfg(feature = "glue")]
    {
        anyhow::bail!("Glue catalog support is compiled but not yet wired up. Coming soon.");
    }

    #[cfg(not(feature = "glue"))]
    {
        anyhow::bail!(
            "Glue catalog requires the 'glue' feature. \
            Rebuild with: cargo build --features glue"
        );
    }
}
Loading