From 640e0fb82fbf89cb818e9f19caf768fdd70b6670 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Tue, 24 Mar 2026 18:49:25 -0400 Subject: [PATCH 1/8] Improve jobs control plane and status API --- Cargo.lock | 82 +++++++++ Cargo.toml | 1 + README.md | 29 ++++ src/jobs.rs | 413 ++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 287 ++++++++++++++++++++++++++++++++ src/status_api.rs | 211 +++++++++++++++++++++++ 6 files changed, 1023 insertions(+) create mode 100644 src/jobs.rs create mode 100644 src/status_api.rs diff --git a/Cargo.lock b/Cargo.lock index b15d56d..4f7adcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -895,6 +895,61 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backon" version = "1.6.0" @@ -2159,6 +2214,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -2731,6 +2787,12 @@ dependencies = [ "regex-automata", ] 
+[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -2747,6 +2809,12 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -3718,6 +3786,7 @@ dependencies = [ "arrow-csv", "aws-config", "aws-sdk-s3", + "axum", "bytes", "chrono", "clap", @@ -3908,6 +3977,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -4456,6 +4536,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -4494,6 +4575,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index f0a77ee..50f07a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ serde_json = "1" bytes = "1" glob = "0.3" arrow-csv = "57" +axum = { version = "0.7", features = ["json"] } [features] default = [] diff --git a/README.md b/README.md index d8cb79a..5235c80 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,35 @@ RUST_LOG=rustream=debug rustream sync --config config.yaml RUST_LOG=rustream=debug rustream ingest 
--config ingest_config.yaml ``` +### Jobs / Worker (experimental) + +``` +# create control-plane table in Postgres +rustream init-jobs --control-db-url "$CONTROL_DB_URL" + +# enqueue a table sync job +rustream add-job --control-db-url "$CONTROL_DB_URL" --table users --config config.yaml --interval-secs 300 --timeout-secs 900 --max-concurrent-jobs 1 + +# run the worker loop (polls every 5s by default, runs up to 4 jobs at once) +rustream worker --control-db-url "$CONTROL_DB_URL" --poll-seconds 5 --max-concurrent 4 + +# force a job to run ASAP +rustream force-job --control-db-url "$CONTROL_DB_URL" --job-id 1 + +# status API (optional, returns JSON) +rustream status-api --control-db-url "$CONTROL_DB_URL" --bind 0.0.0.0:8080 + +# control DB URL can also come from env +# RUSTREAM_CONTROL_DB_URL=postgres://user:pass@host:5432/db rustream worker +# status endpoints: +# /jobs (json, optional ?status=pending), /jobs/html (auto-refresh + filter), +# /jobs/summary, /logs?limit=50, /health + +# optional: run a data-quality command on each local output table +# (use {path} placeholder for the Parquet directory) +RUSTREAM_DQ_CMD="dq-prof --input {path}" rustream worker --control-db-url "$CONTROL_DB_URL" +``` + ## Configuration ### Specific tables (recommended) diff --git a/src/jobs.rs b/src/jobs.rs new file mode 100644 index 0000000..3c94583 --- /dev/null +++ b/src/jobs.rs @@ -0,0 +1,413 @@ +use anyhow::{anyhow, Context, Result}; +use chrono::{Duration, Utc}; +use serde::Serialize; +use tokio_postgres::{Client, NoTls, Row}; + +use crate::config::TableConfig; + +pub const DEFAULT_INTERVAL_SECS: i64 = 300; + +#[derive(Debug, Clone)] +pub struct Job { + pub id: i32, + pub table_name: String, + pub config_path: String, + pub interval_secs: i32, + pub max_concurrent_jobs: i32, + pub timeout_secs: i32, +} + +#[derive(Debug, Serialize)] +pub struct JobStatus { + pub id: i32, + pub table_name: String, + pub status: String, + pub next_run: Option, + pub last_run: Option, + pub 
last_error: Option, +} + +pub async fn connect(control_db_url: &str) -> Result { + let (client, connection) = tokio_postgres::connect(control_db_url, NoTls) + .await + .context("connecting to control plane Postgres")?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + tracing::error!(error = %e, "control plane connection error"); + } + }); + + Ok(client) +} + +pub async fn ensure_jobs_table(client: &Client) -> Result<()> { + client + .execute( + "CREATE TABLE IF NOT EXISTS rustream_jobs ( + id SERIAL PRIMARY KEY, + table_name TEXT NOT NULL, + config_path TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + next_run TIMESTAMPTZ NOT NULL DEFAULT now(), + last_run TIMESTAMPTZ, + last_error TEXT, + interval_secs INTEGER NOT NULL DEFAULT 300, + max_concurrent_jobs INTEGER NOT NULL DEFAULT 1, + timeout_secs INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + )", + &[], + ) + .await + .context("creating rustream_jobs table")?; + + // Backfill columns for existing deployments (idempotent on recent Postgres versions) + client + .execute( + "ALTER TABLE rustream_jobs ADD COLUMN IF NOT EXISTS max_concurrent_jobs INTEGER NOT NULL DEFAULT 1", + &[], + ) + .await + .context("adding max_concurrent_jobs column")?; + client + .execute( + "ALTER TABLE rustream_jobs ADD COLUMN IF NOT EXISTS timeout_secs INTEGER NOT NULL DEFAULT 0", + &[], + ) + .await + .context("adding timeout_secs column")?; + + client + .execute( + "CREATE TABLE IF NOT EXISTS rustream_job_runs ( + id SERIAL PRIMARY KEY, + job_id INTEGER NOT NULL REFERENCES rustream_jobs(id) ON DELETE CASCADE, + started_at TIMESTAMPTZ NOT NULL DEFAULT now(), + finished_at TIMESTAMPTZ, + status TEXT NOT NULL, + error TEXT, + severity TEXT, + metrics_json TEXT + )", + &[], + ) + .await + .context("creating rustream_job_runs table")?; + + // Backfill columns for existing deployments + client + .execute( + "ALTER TABLE rustream_job_runs 
ADD COLUMN IF NOT EXISTS severity TEXT", + &[], + ) + .await + .context("adding severity column to rustream_job_runs")?; + client + .execute( + "ALTER TABLE rustream_job_runs ADD COLUMN IF NOT EXISTS metrics_json TEXT", + &[], + ) + .await + .context("adding metrics_json column to rustream_job_runs")?; + + Ok(()) +} + +pub async fn enqueue_job( + client: &Client, + table_name: &str, + config_path: &str, + interval_secs: Option, + max_concurrent_jobs: Option, + timeout_secs: Option, +) -> Result { + let interval = interval_secs.unwrap_or(DEFAULT_INTERVAL_SECS) as i32; + let max_concurrent_jobs = max_concurrent_jobs.unwrap_or(1); + let timeout_secs = timeout_secs.unwrap_or(0); + let row = client + .query_one( + "INSERT INTO rustream_jobs (table_name, config_path, interval_secs, max_concurrent_jobs, timeout_secs) + VALUES ($1, $2, $3, $4, $5) + RETURNING id", + &[&table_name, &config_path, &interval, &max_concurrent_jobs, &timeout_secs], + ) + .await + .context("inserting job")?; + Ok(row.get(0)) +} + +pub async fn claim_job(client: &mut Client) -> Result> { + let tx = client + .transaction() + .await + .context("starting claim transaction")?; + + let row = tx + .query_opt( + "SELECT id, table_name, config_path, interval_secs, max_concurrent_jobs, timeout_secs + FROM rustream_jobs + WHERE status = 'pending' AND next_run <= now() + ORDER BY next_run + FOR UPDATE SKIP LOCKED + LIMIT 1", + &[], + ) + .await + .context("querying pending job")?; + + let Some(r) = row else { + tx.rollback().await.ok(); + return Ok(None); + }; + let job = row_to_job(&r); + + tx.execute( + "UPDATE rustream_jobs + SET status = 'running', updated_at = now(), last_error = NULL + WHERE id = $1", + &[&job.id], + ) + .await + .context("marking job running")?; + tx.commit().await.context("committing claim")?; + + Ok(Some(job)) +} + +pub async fn complete_job( + client: &Client, + job_id: i32, + success: bool, + interval_secs: i64, + error: Option<&str>, +) -> Result<()> { + let status = if success { 
"pending" } else { "failed" }; + let next = Utc::now() + Duration::seconds(interval_secs); + + client + .execute( + "UPDATE rustream_jobs + SET status = $1, + next_run = $2, + last_run = now(), + last_error = $3, + updated_at = now() + WHERE id = $4", + &[&status, &next, &error, &job_id], + ) + .await + .context("updating job status")?; + Ok(()) +} + +pub async fn list_job_statuses(control_db_url: &str) -> Result> { + let client = connect(control_db_url).await?; + let rows = client + .query( + "SELECT id, table_name, status, next_run, last_run, last_error + FROM rustream_jobs + ORDER BY next_run ASC NULLS LAST", + &[], + ) + .await + .context("listing job statuses")?; + + let mut result = Vec::new(); + for r in rows { + result.push(JobStatus { + id: r.get(0), + table_name: r.get(1), + status: r.get(2), + next_run: r + .get::<_, Option>>(3) + .map(|dt| dt.to_rfc3339()), + last_run: r + .get::<_, Option>>(4) + .map(|dt| dt.to_rfc3339()), + last_error: r.get(5), + }); + } + Ok(result) +} + +pub async fn start_job_run(client: &Client, job_id: i32) -> Result { + let row = client + .query_one( + "INSERT INTO rustream_job_runs (job_id, status) + VALUES ($1, 'running') + RETURNING id", + &[&job_id], + ) + .await + .context("inserting job run")?; + Ok(row.get(0)) +} + +pub async fn finish_job_run( + client: &Client, + run_id: i32, + status: &str, + error: Option<&str>, + severity: Option<&str>, + metrics_json: Option<&str>, +) -> Result<()> { + client + .execute( + "UPDATE rustream_job_runs + SET status = $1, error = $2, severity = $3, metrics_json = $4, finished_at = now() + WHERE id = $5", + &[&status, &error, &severity, &metrics_json, &run_id], + ) + .await + .context("updating job run")?; + Ok(()) +} + +pub async fn force_run_job(client: &Client, job_id: i32) -> Result<()> { + client + .execute( + "UPDATE rustream_jobs + SET status = 'pending', next_run = now(), updated_at = now() + WHERE id = $1", + &[&job_id], + ) + .await + .context("forcing job run")?; + Ok(()) +} + 
+#[derive(Debug, Serialize)] +pub struct JobRun { + pub id: i32, + pub job_id: i32, + pub status: String, + pub started_at: String, + pub finished_at: Option, + pub error: Option, + pub severity: Option, + pub metrics_json: Option, +} + +pub async fn list_job_runs(control_db_url: &str, limit: i64) -> Result> { + let client = connect(control_db_url).await?; + let rows = client + .query( + "SELECT id, job_id, status, started_at, finished_at, error, severity, metrics_json + FROM rustream_job_runs + ORDER BY started_at DESC + LIMIT $1", + &[&limit], + ) + .await + .context("listing job runs")?; + + let mut out = Vec::new(); + for r in rows { + let started: chrono::DateTime = r.get(3); + let finished = r.get::<_, Option>>(4); + out.push(JobRun { + id: r.get(0), + job_id: r.get(1), + status: r.get(2), + started_at: started.to_rfc3339(), + finished_at: finished.map(|dt| dt.to_rfc3339()), + error: r.get(5), + severity: r.get(6), + metrics_json: r.get(7), + }); + } + Ok(out) +} + +#[derive(Debug, Serialize)] +pub struct JobSummary { + pub pending: i64, + pub running: i64, + pub failed: i64, + pub total: i64, +} + +pub async fn job_summary(control_db_url: &str) -> Result { + let client = connect(control_db_url).await?; + let rows = client + .query( + "SELECT status, count(*) FROM rustream_jobs GROUP BY status", + &[], + ) + .await + .context("summarizing jobs")?; + + let mut summary = JobSummary { + pending: 0, + running: 0, + failed: 0, + total: 0, + }; + for r in rows { + let status: String = r.get(0); + let count: i64 = r.get(1); + summary.total += count; + match status.to_ascii_lowercase().as_str() { + "pending" => summary.pending += count, + "running" => summary.running += count, + "failed" => summary.failed += count, + _ => {} + } + } + Ok(summary) +} + +fn row_to_job(row: &Row) -> Job { + Job { + id: row.get(0), + table_name: row.get(1), + config_path: row.get(2), + interval_secs: row.get(3), + max_concurrent_jobs: row.get(4), + timeout_secs: row.get(5), + } +} + +/// 
Run a single job by loading its config and overriding tables to the target table. +/// Returns local output path for the table if output is local. +pub async fn run_job(job: &Job) -> Result> { + let mut cfg = crate::config::load(&job.config_path)?; + + // Preserve the original table config if present; otherwise, synthesize a minimal entry. + let maybe_table = cfg + .tables + .unwrap_or_default() + .iter() + .find(|t| t.name == job.table_name) + .cloned(); + + cfg.tables = Some(vec![maybe_table.unwrap_or(TableConfig { + name: job.table_name.clone(), + schema: None, + columns: None, + incremental_column: None, + incremental_tiebreaker_column: None, + incremental_column_is_unique: false, + partition_by: None, + })]); + + let local_output = match &cfg.output { + Some(crate::config::OutputConfig::Local { path }) => { + Some(format!("{}/{}", path, job.table_name)) + } + _ => None, + }; + + crate::sync::run(cfg).await.map_err(|e| { + anyhow!( + "job {} (table {}): sync failed: {}", + job.id, + job.table_name, + e + ) + })?; + + Ok(local_output) +} diff --git a/src/main.rs b/src/main.rs index b57fb6d..8268835 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,17 +3,21 @@ mod config; mod iceberg; mod ingest; mod input; +mod jobs; mod output; mod pg_writer; mod reader; mod schema; mod state; +mod status_api; mod sync; mod types; mod writer; use anyhow::Result; use clap::{Parser, Subcommand}; +use tokio::process::Command; +use tokio::time::{timeout, Duration}; use tracing_subscriber::EnvFilter; #[derive(Parser)] @@ -46,6 +50,77 @@ enum Commands { #[arg(long)] dry_run: bool, }, + + /// Create the control-plane jobs table in Postgres + InitJobs { + /// Control-plane Postgres connection string + #[arg(long)] + control_db_url: String, + }, + + /// Enqueue a single table sync job + AddJob { + /// Control-plane Postgres connection string + #[arg(long)] + control_db_url: String, + + /// Table name to sync + #[arg(long)] + table: String, + + /// Path to rustream config for this job + 
#[arg(long)] + config: String, + + /// Interval between runs (seconds) + #[arg(long)] + interval_secs: Option, + + /// Max concurrent jobs allowed for this table (default 1) + #[arg(long)] + max_concurrent_jobs: Option, + + /// Kill job after this many seconds (default: no timeout) + #[arg(long)] + timeout_secs: Option, + }, + + /// Run the worker loop that claims and executes jobs + Worker { + /// Control-plane Postgres connection string + #[arg(long)] + control_db_url: String, + + /// Poll interval when no jobs are pending (seconds) + #[arg(long, default_value_t = 5)] + poll_seconds: u64, + + /// Maximum jobs to execute concurrently + #[arg(long, default_value_t = 2)] + max_concurrent: usize, + }, + + /// Force a job to run on the next worker poll + ForceJob { + /// Control-plane Postgres connection string + #[arg(long)] + control_db_url: String, + + /// Job ID + #[arg(long)] + job_id: i32, + }, + + /// Serve a minimal status API (experimental) + StatusApi { + /// Control-plane Postgres connection string + #[arg(long)] + control_db_url: String, + + /// Bind address, e.g. 
0.0.0.0:8080 + #[arg(long, default_value = "0.0.0.0:8080")] + bind: String, + }, } #[tokio::main] @@ -79,7 +154,219 @@ async fn main() -> Result<()> { ingest::run(cfg).await?; } } + Commands::InitJobs { control_db_url } => { + let control_db_url = resolve_control_db_url(control_db_url)?; + let client = jobs::connect(&control_db_url).await?; + jobs::ensure_jobs_table(&client).await?; + tracing::info!("ensured rustream_jobs exists"); + } + Commands::AddJob { + control_db_url, + table, + config: config_path, + interval_secs, + max_concurrent_jobs, + timeout_secs, + } => { + let control_db_url = resolve_control_db_url(control_db_url)?; + let client = jobs::connect(&control_db_url).await?; + jobs::ensure_jobs_table(&client).await?; + let id = jobs::enqueue_job( + &client, + &table, + &config_path, + interval_secs, + max_concurrent_jobs, + timeout_secs, + ) + .await?; + tracing::info!(job_id = id, table = %table, "enqueued job"); + } + Commands::Worker { + control_db_url, + poll_seconds, + max_concurrent, + } => { + let control_db_url = resolve_control_db_url(control_db_url)?; + let client = jobs::connect(&control_db_url).await?; + jobs::ensure_jobs_table(&client).await?; + run_worker(client, poll_seconds, max_concurrent).await?; + } + Commands::StatusApi { + control_db_url, + bind, + } => { + let control_db_url = resolve_control_db_url(control_db_url)?; + let addr: std::net::SocketAddr = bind.parse()?; + status_api::serve(control_db_url, addr).await?; + } + Commands::ForceJob { + control_db_url, + job_id, + } => { + let control_db_url = resolve_control_db_url(control_db_url)?; + let client = jobs::connect(&control_db_url).await?; + jobs::ensure_jobs_table(&client).await?; + jobs::force_run_job(&client, job_id).await?; + tracing::info!(job_id, "job marked pending for immediate run"); + } } Ok(()) } + +async fn run_worker( + mut client: tokio_postgres::Client, + poll_seconds: u64, + max_concurrent: usize, +) -> Result<()> { + if max_concurrent > 1 { + tracing::warn!( + 
max_concurrent, + "multi-job concurrency not supported yet; running sequentially" + ); + } + + let poll = Duration::from_secs(poll_seconds); + + loop { + match jobs::claim_job(&mut client).await? { + Some(job) => { + tracing::info!(job_id = job.id, table = %job.table_name, "claimed job"); + if let Err(e) = process_job(&mut client, job).await { + tracing::error!(error = %e, "job execution failed"); + } + } + None => tokio::time::sleep(poll).await, + } + } +} + +async fn process_job(client: &mut tokio_postgres::Client, job: jobs::Job) -> Result<()> { + let interval = job.interval_secs as i64; + tracing::debug!( + job_id = job.id, + max_concurrent = job.max_concurrent_jobs, + timeout = job.timeout_secs, + "starting job" + ); + let run_id = jobs::start_job_run(client, job.id).await?; + + let sync_res = if job.timeout_secs > 0 { + match timeout( + Duration::from_secs(job.timeout_secs as u64), + jobs::run_job(&job), + ) + .await + { + Ok(r) => r, + Err(_) => Err(anyhow::anyhow!( + "job {} timed out after {}s", + job.id, + job.timeout_secs + )), + } + } else { + jobs::run_job(&job).await + }; + + let mut severity: Option = None; + let mut metrics_json: Option = None; + + let sync_output_path = sync_res.as_ref().ok().and_then(|p| p.clone()); + + let dq_res = if let Some(path) = sync_output_path { + if let Some(cmd) = dq_command_template() { + match run_dq_prof(&cmd, &path).await { + Ok((sev, metrics)) => { + severity = sev; + metrics_json = metrics; + Ok(()) + } + Err(e) => Err(e), + } + } else { + Ok(()) + } + } else { + Ok(()) + }; + + let final_res: Result<()> = match (sync_res, dq_res) { + (Err(e), _) => Err(e), + (_, Err(e)) => Err(e), + _ => Ok(()), + }; + + let error_str = final_res.as_ref().err().map(|e| format!("{e:?}")); + let success = final_res.is_ok(); + + jobs::complete_job(client, job.id, success, interval, error_str.as_deref()).await?; + jobs::finish_job_run( + client, + run_id, + if success { "ok" } else { "failed" }, + error_str.as_deref(), + 
severity.as_deref(), + metrics_json.as_deref(), + ) + .await?; + + if let Some(err) = error_str { + tracing::error!(job_id = job.id, error = %err, "job failed"); + } else { + tracing::info!(job_id = job.id, "job complete"); + } + + Ok(()) +} + +fn dq_command_template() -> Option { + match std::env::var("RUSTREAM_DQ_CMD") { + Ok(s) if !s.trim().is_empty() => Some(s), + _ => None, + } +} + +async fn run_dq_prof(cmd_template: &str, path: &str) -> Result<(Option, Option)> { + let cmd = cmd_template.replace("{path}", path); + let output = Command::new("sh").arg("-c").arg(&cmd).output().await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(anyhow::anyhow!( + "dq-prof command failed (status {}): {}", + output.status, + stderr.trim() + )); + } + + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if stdout.is_empty() { + return Ok((None, None)); + } + + let severity = serde_json::from_str::(&stdout) + .ok() + .and_then(|v| { + v.get("severity") + .and_then(|s| s.as_str()) + .map(|s| s.to_string()) + }); + + Ok((severity, Some(stdout))) +} + +fn resolve_control_db_url(flag_value: String) -> Result { + if !flag_value.is_empty() { + return Ok(flag_value); + } + if let Ok(env_val) = std::env::var("RUSTREAM_CONTROL_DB_URL") { + if !env_val.is_empty() { + return Ok(env_val); + } + } + Err(anyhow::anyhow!( + "control-db-url is empty; set --control-db-url or RUSTREAM_CONTROL_DB_URL" + )) +} diff --git a/src/status_api.rs b/src/status_api.rs new file mode 100644 index 0000000..7fb6a55 --- /dev/null +++ b/src/status_api.rs @@ -0,0 +1,211 @@ +use std::net::SocketAddr; + +use anyhow::Result; +use axum::{ + extract::{Query, State}, + http::{header, StatusCode}, + response::{Redirect, Response}, + routing::get, + Json, Router, +}; +use serde::Deserialize; + +use crate::jobs; + +#[derive(Clone)] +struct AppState { + control_db_url: String, +} + +pub async fn serve(control_db_url: String, addr: SocketAddr) 
-> Result<()> { + // Ensure schema exists before serving requests (covers older control DBs without new columns). + let client = jobs::connect(&control_db_url).await?; + jobs::ensure_jobs_table(&client).await?; + + let state = AppState { control_db_url }; + let app = Router::new() + .route("/", get(root_redirect)) + .route("/jobs", get(list_jobs)) + .route("/jobs/html", get(list_jobs_html)) + .route("/jobs/summary", get(job_summary)) + .route("/logs", get(list_runs)) + .route("/health", get(health)) + .with_state(state); + + let listener = tokio::net::TcpListener::bind(addr).await?; + axum::serve(listener, app).await?; + Ok(()) +} + +async fn root_redirect() -> Redirect { + Redirect::temporary("/jobs/html") +} + +async fn list_jobs( + State(state): State, + Query(query): Query, +) -> Result>, StatusCode> { + let rows = fetch_jobs(&state.control_db_url, &query).await?; + Ok(Json(rows)) +} + +#[derive(Debug, Deserialize)] +pub struct JobsQuery { + pub status: Option, + pub refresh: Option, +} + +async fn list_jobs_html( + State(state): State, + Query(query): Query, +) -> Result { + let rows = fetch_jobs(&state.control_db_url, &query).await?; + let summary = jobs::job_summary(&state.control_db_url) + .await + .map_err(|e| { + tracing::error!(error = %e, "job summary for HTML failed"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + let refresh_secs = query.refresh.unwrap_or(10); + + let mut body = String::new(); + body.push_str(""); + if refresh_secs > 0 { + body.push_str(&format!( + "", + refresh_secs + )); + } + body.push_str( + "", + ); + + body.push_str("

rustream jobs

"); + body.push_str( + "", + ); + + body.push_str(&format!( + "
pending: {} · running: {} · failed: {} · total: {}
", + summary.pending, summary.running, summary.failed, summary.total + )); + + body.push_str("
"); + body.push_str(""); + body.push_str(&format!( + "", + refresh_secs + )); + body.push_str("
"); + + body.push_str(""); + for r in rows { + let status_class = format!("status-{}", r.status.to_ascii_lowercase()); + body.push_str(&format!( + "", + r.id, + r.table_name, + status_class, + r.status, + r.next_run.unwrap_or_else(|| "-".to_string()), + r.last_run.unwrap_or_else(|| "-".to_string()), + r.last_error.unwrap_or_else(|| "-".to_string()), + )); + } + body.push_str("
idtablestatusnext_runlast_runlast_error
{}{}{}{}{}{}
"); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "text/html; charset=utf-8") + .body(body.into()) + .unwrap()) +} + +async fn fetch_jobs( + control_db_url: &str, + query: &JobsQuery, +) -> Result, StatusCode> { + let mut rows = jobs::list_job_statuses(control_db_url).await.map_err(|e| { + tracing::error!(error = %e, "listing job statuses"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + if let Some(ref s) = query.status { + let s_lower = s.to_ascii_lowercase(); + rows.retain(|r| r.status.to_ascii_lowercase() == s_lower); + } + + Ok(rows) +} + +async fn job_summary(State(state): State) -> Result, StatusCode> { + jobs::job_summary(&state.control_db_url) + .await + .map(Json) + .map_err(|e| { + tracing::error!(error = %e, "job summary failed"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} + +#[derive(Debug, Deserialize)] +pub struct RunsQuery { + pub limit: Option, +} + +async fn list_runs( + State(state): State, + Query(query): Query, +) -> Result>, StatusCode> { + let limit = query.limit.unwrap_or(50); + jobs::list_job_runs(&state.control_db_url, limit) + .await + .map(Json) + .map_err(|e| { + tracing::error!(error = %e, "listing job runs failed"); + StatusCode::INTERNAL_SERVER_ERROR + }) +} + +async fn health() -> Response { + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "text/plain; charset=utf-8") + .body("ok".into()) + .unwrap() +} From 0aa3f84232c26d3375c1b5e6fe2395748e353824 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Tue, 24 Mar 2026 19:14:58 -0400 Subject: [PATCH 2/8] Add reset-state flag and validate stored progress --- README.md | 3 +++ src/main.rs | 6 ++++++ src/schema.rs | 17 +++++++++++++++++ src/sync.rs | 39 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 64 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5235c80..b777a94 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,9 @@ rustream sync --config config.yaml --dry-run # Run sync rustream 
sync --config config.yaml + +# If you need to wipe saved watermarks/cursors (full resync) +rustream sync --config config.yaml --reset-state ``` ### Ingest (S3/local → Postgres) diff --git a/src/main.rs b/src/main.rs index 8268835..3012edd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,6 +38,10 @@ enum Commands { /// Show what would be synced without actually doing it #[arg(long)] dry_run: bool, + + /// Clear saved watermark/cursor state before running + #[arg(long, default_value_t = false)] + reset_state: bool, }, /// Ingest Parquet/CSV files from local filesystem or S3 into Postgres @@ -135,8 +139,10 @@ async fn main() -> Result<()> { Commands::Sync { config: config_path, dry_run, + reset_state, } => { let cfg = config::load(&config_path)?; + sync::maybe_reset_state(&cfg, reset_state)?; if dry_run { sync::dry_run(cfg).await?; } else { diff --git a/src/schema.rs b/src/schema.rs index bed6471..2f95616 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -85,6 +85,23 @@ pub fn build_arrow_schema(columns: &[ColumnInfo]) -> Arc { Arc::new(Schema::new(fields)) } +/// Basic check that a stored string value can be parsed into the column's Postgres type. 
+pub fn value_fits_column(value: &str, column_name: &str, columns: &[ColumnInfo]) -> bool { + let Some(col) = columns.iter().find(|c| c.name == column_name) else { + return false; + }; + + match col.pg_type.as_str() { + "smallint" | "integer" | "bigint" => value.parse::().is_ok(), + "real" | "double precision" | "numeric" => value.parse::().is_ok(), + "boolean" => value.eq_ignore_ascii_case("true") || value.eq_ignore_ascii_case("false"), + "uuid" => uuid::Uuid::parse_str(value).is_ok(), + "timestamp without time zone" | "timestamp with time zone" => chrono::NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S%.f").is_ok(), + "date" => chrono::NaiveDate::parse_from_str(value, "%Y-%m-%d").is_ok(), + _ => true, + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/sync.rs b/src/sync.rs index b4d602d..417e053 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -141,7 +141,7 @@ pub async fn run(config: Config) -> Result<()> { for table in &tables { if let Err(e) = sync_table(&client, &config, table, &state, iceberg_catalog.as_ref()).await { - tracing::error!(table = %table.full_name(), error = %e, "failed to sync table"); + tracing::error!(table = %table.full_name(), error = ?e, "failed to sync table"); } } @@ -149,6 +149,25 @@ pub async fn run(config: Config) -> Result<()> { Ok(()) } +/// Optionally clear all saved state (watermarks/cursors) before a run. 
+pub fn maybe_reset_state(config: &Config, reset_state: bool) -> Result<()> { + if !reset_state { + return Ok(()); + } + + let state_dir = config + .state_dir + .clone() + .unwrap_or_else(|| ".rustream_state".to_string()); + let path = std::path::Path::new(&state_dir).join("rustream_state.db"); + if path.exists() { + std::fs::remove_file(&path) + .with_context(|| format!("removing state db at {}", path.display()))?; + tracing::warn!("reset state: deleted {}", path.display()); + } + Ok(()) +} + async fn sync_table( client: &tokio_postgres::Client, config: &Config, @@ -240,6 +259,24 @@ async fn sync_table( None => (None, None), }; + // Clear invalid stored progress (e.g., cursor stored as text when column is int). + if let Some(wm) = &watermark_val { + if !schema::value_fits_column(wm, watermark_col.unwrap(), &columns) { + tracing::warn!(table = %table_name, value = %wm, "clearing invalid stored watermark"); + state.clear_progress(&table_name)?; + watermark_val = None; + cursor_val = None; + } + } + if let (Some(cur_col), Some(cur_val)) = (cursor_col, &cursor_val) { + if !schema::value_fits_column(cur_val, cur_col, &columns) { + tracing::warn!(table = %table_name, value = %cur_val, "clearing invalid stored cursor"); + state.clear_progress(&table_name)?; + watermark_val = None; + cursor_val = None; + } + } + let mut total_rows = 0u64; let mut batch_num = 0u32; // Collect batches for Iceberg (needs all batches for a single commit) From 1f3b853d864094753550a41d2fa3c475823bf2d2 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Wed, 25 Mar 2026 13:02:35 -0400 Subject: [PATCH 3/8] Add reset-state CLI and force-run button in status UI --- src/main.rs | 15 +++++++++++++++ src/sync.rs | 21 +++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/src/main.rs b/src/main.rs index 3012edd..1a352d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -125,6 +125,17 @@ enum Commands { #[arg(long, default_value = "0.0.0.0:8080")] bind: String, }, + + /// Reset stored 
watermarks/cursors (state) for all tables or one table + ResetState { + /// Optional table name to reset; if omitted resets all + #[arg(long)] + table: Option, + + /// State directory (default: .rustream_state) + #[arg(long)] + state_dir: Option, + }, } #[tokio::main] @@ -216,6 +227,10 @@ async fn main() -> Result<()> { jobs::force_run_job(&client, job_id).await?; tracing::info!(job_id, "job marked pending for immediate run"); } + + Commands::ResetState { table, state_dir } => { + sync::reset_state(state_dir, table)?; + } } Ok(()) diff --git a/src/sync.rs b/src/sync.rs index 417e053..d48b39c 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -168,6 +168,27 @@ pub fn maybe_reset_state(config: &Config, reset_state: bool) -> Result<()> { Ok(()) } +/// Reset state for all tables or a specific table. +pub fn reset_state(state_dir: Option, table: Option) -> Result<()> { + let dir = state_dir.unwrap_or_else(|| ".rustream_state".to_string()); + let path = std::path::Path::new(&dir).join("rustream_state.db"); + if !path.exists() { + tracing::info!(state_dir = %dir, "state db not found; nothing to reset"); + return Ok(()); + } + + if let Some(table_name) = table { + let store = StateStore::open(&dir)?; + store.clear_progress(&table_name)?; + tracing::info!(table = %table_name, "cleared state for table"); + } else { + std::fs::remove_file(&path) + .with_context(|| format!("removing state db at {}", path.display()))?; + tracing::info!(state_file = %path.display(), "cleared all state"); + } + Ok(()) +} + async fn sync_table( client: &tokio_postgres::Client, config: &Config, From 154c4731a68104db615c7f5c691c73b77671c8c8 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Wed, 25 Mar 2026 13:40:08 -0400 Subject: [PATCH 4/8] Let reset-state clear all progress in place --- src/state.rs | 16 ++++++++++++++++ src/sync.rs | 4 ++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/state.rs b/src/state.rs index 8a5b072..a432815 100644 --- a/src/state.rs +++ b/src/state.rs @@ 
-130,6 +130,22 @@ impl StateStore { Ok(()) } + + /// Clear stored watermark/cursor for a table (used when state is incompatible with schema). + pub fn clear_progress(&self, table_name: &str) -> Result<()> { + self.conn + .execute("DELETE FROM watermarks WHERE table_name = ?1", [table_name]) + .with_context(|| format!("clearing progress for {table_name}"))?; + Ok(()) + } + + /// Clear all progress rows. + pub fn clear_all_progress(&self) -> Result<()> { + self.conn + .execute("DELETE FROM watermarks", []) + .context("clearing all progress")?; + Ok(()) + } } #[cfg(test)] diff --git a/src/sync.rs b/src/sync.rs index d48b39c..6b23923 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -182,8 +182,8 @@ pub fn reset_state(state_dir: Option, table: Option) -> Result<( store.clear_progress(&table_name)?; tracing::info!(table = %table_name, "cleared state for table"); } else { - std::fs::remove_file(&path) - .with_context(|| format!("removing state db at {}", path.display()))?; + let store = StateStore::open(&dir)?; + store.clear_all_progress()?; tracing::info!(state_file = %path.display(), "cleared all state"); } Ok(()) From e447e34ba81febe536968935507f1be1817bf394 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Thu, 26 Mar 2026 08:50:49 -0400 Subject: [PATCH 5/8] Add worker health endpoint and force-run UI action --- src/status_api.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/src/status_api.rs b/src/status_api.rs index 7fb6a55..4fa28d0 100644 --- a/src/status_api.rs +++ b/src/status_api.rs @@ -2,10 +2,10 @@ use std::net::SocketAddr; use anyhow::Result; use axum::{ - extract::{Query, State}, + extract::{Form, Query, State}, http::{header, StatusCode}, response::{Redirect, Response}, - routing::get, + routing::{get, post}, Json, Router, }; use serde::Deserialize; @@ -29,6 +29,8 @@ pub async fn serve(control_db_url: String, addr: SocketAddr) -> Result<()> { .route("/jobs/html", get(list_jobs_html)) 
.route("/jobs/summary", get(job_summary)) .route("/logs", get(list_runs)) + .route("/jobs/force", post(force_job)) + .route("/health/worker", get(worker_health)) .route("/health", get(health)) .with_state(state); @@ -133,11 +135,14 @@ async fn list_jobs_html( )); body.push_str(""); - body.push_str(""); + body.push_str("
idtablestatusnext_runlast_runlast_error
"); for r in rows { let status_class = format!("status-{}", r.status.to_ascii_lowercase()); body.push_str(&format!( - "", + "\ + ", r.id, r.table_name, status_class, @@ -145,6 +150,7 @@ async fn list_jobs_html( r.next_run.unwrap_or_else(|| "-".to_string()), r.last_run.unwrap_or_else(|| "-".to_string()), r.last_error.unwrap_or_else(|| "-".to_string()), + r.id, )); } body.push_str("
idtablestatusnext_runlast_runlast_erroractions
{}{}{}{}{}{}
{}{}{}{}{}{}
\ + \ +
"); @@ -209,3 +215,56 @@ async fn health() -> Response { .body("ok".into()) .unwrap() } + +async fn worker_health(State(state): State) -> Result { + let client = jobs::connect(&state.control_db_url) + .await + .map_err(|e| { + tracing::error!(error = %e, "health db connect failed"); + StatusCode::SERVICE_UNAVAILABLE + })?; + + let last_run: Option> = client + .query_opt( + "SELECT finished_at FROM rustream_job_runs WHERE finished_at IS NOT NULL ORDER BY finished_at DESC LIMIT 1", + &[], + ) + .await + .map_err(|e| { + tracing::error!(error = %e, "health query failed"); + StatusCode::INTERNAL_SERVER_ERROR + })? + .and_then(|row| row.get::<_, Option>>(0)); + + let body = serde_json::json!({ + "db": "ok", + "last_run": last_run.map(|dt| dt.to_rfc3339()), + }); + + Ok(Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&body).unwrap().into()) + .unwrap()) +} + +#[derive(Debug, Deserialize)] +struct ForceJobForm { + id: i32, +} + +async fn force_job( + State(state): State, + Form(form): Form, +) -> Result { + let client = jobs::connect(&state.control_db_url) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + jobs::ensure_jobs_table(&client) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + jobs::force_run_job(&client, form.id) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Redirect::temporary("/jobs/html")) +} From a2e140120c58230840dc251647f6f4df423d541c Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Thu, 26 Mar 2026 09:49:23 -0400 Subject: [PATCH 6/8] Add retry UI and production template skeleton --- .github/workflows/publish-docker.yml | 38 +++++++++ README.md | 6 +- .../production-template/config.example.yaml | 17 ++++ .../production-template/docker-compose.yml | 57 ++++++++++++++ examples/production-template/helm/Chart.yaml | 6 ++ .../helm/templates/_helpers.tpl | 3 + .../helm/templates/deployment.yaml | 78 +++++++++++++++++++ 
.../helm/templates/service.yaml | 12 +++ examples/production-template/helm/values.yaml | 25 ++++++ .../production-template/terraform/main.tf | 72 +++++++++++++++++ src/jobs.rs | 25 ++++-- src/main.rs | 7 +- src/status_api.rs | 78 ++++++++++++++++++- 13 files changed, 414 insertions(+), 10 deletions(-) create mode 100644 .github/workflows/publish-docker.yml create mode 100644 examples/production-template/config.example.yaml create mode 100644 examples/production-template/docker-compose.yml create mode 100644 examples/production-template/helm/Chart.yaml create mode 100644 examples/production-template/helm/templates/_helpers.tpl create mode 100644 examples/production-template/helm/templates/deployment.yaml create mode 100644 examples/production-template/helm/templates/service.yaml create mode 100644 examples/production-template/helm/values.yaml create mode 100644 examples/production-template/terraform/main.tf diff --git a/.github/workflows/publish-docker.yml b/.github/workflows/publish-docker.yml new file mode 100644 index 0000000..d43970b --- /dev/null +++ b/.github/workflows/publish-docker.yml @@ -0,0 +1,38 @@ +name: Publish Docker image + +on: + push: + tags: + - "v*" + workflow_dispatch: + +jobs: + docker: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to GHCR + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push + uses: docker/build-push-action@v5 + with: + context: . 
+ push: true + tags: ghcr.io/${{ github.repository_owner }}/rustream:latest + platforms: linux/amd64,linux/arm64 diff --git a/README.md b/README.md index b777a94..fa59b57 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,9 @@ rustream sync --config config.yaml # If you need to wipe saved watermarks/cursors (full resync) rustream sync --config config.yaml --reset-state + +# Reset state for one table +rustream reset-state --table users ``` ### Ingest (S3/local → Postgres) @@ -86,7 +89,8 @@ rustream status-api --control-db-url "$CONTROL_DB_URL" --bind 0.0.0.0:8080 # RUSTREAM_CONTROL_DB_URL=postgres://user:pass@host:5432/db rustream worker # status endpoints: # /jobs (json, optional ?status=pending), /jobs/html (auto-refresh + filter), -# /jobs/summary, /logs?limit=50, /health +# /jobs/summary, /logs?limit=50, /health, /health/worker +# UI buttons: force run, retry failed, reset state # optional: run a data-quality command on each local output table # (use {path} placeholder for the Parquet directory) diff --git a/examples/production-template/config.example.yaml b/examples/production-template/config.example.yaml new file mode 100644 index 0000000..13494ab --- /dev/null +++ b/examples/production-template/config.example.yaml @@ -0,0 +1,17 @@ +postgres: + host: postgres + database: demo + user: demo + password: demo + +output: + type: s3 + bucket: rustream-demo + prefix: raw/postgres + region: us-east-1 + endpoint: http://minio:9000 + +tables: + - name: users + incremental_column: updated_at + incremental_tiebreaker_column: id diff --git a/examples/production-template/docker-compose.yml b/examples/production-template/docker-compose.yml new file mode 100644 index 0000000..6a293bc --- /dev/null +++ b/examples/production-template/docker-compose.yml @@ -0,0 +1,57 @@ +version: "3.9" + +services: + postgres: + image: postgres:16 + environment: + POSTGRES_USER: demo + POSTGRES_PASSWORD: demo + POSTGRES_DB: demo + ports: + - "5432:5432" + + minio: + image: quay.io/minio/minio + 
command: server /data --console-address ":9001" + environment: + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin + ports: + - "9000:9000" + - "9001:9001" + volumes: + - minio-data:/data + + rustream-worker: + build: ../.. + depends_on: + - postgres + - minio + environment: + RUSTREAM_CONTROL_DB_URL: postgres://demo:demo@postgres:5432/demo + RUSTREAM_S3_ENDPOINT: http://minio:9000 + AWS_ACCESS_KEY_ID: minioadmin + AWS_SECRET_ACCESS_KEY: minioadmin + RUST_LOG: rustream=info + command: > + sh -c " + rustream init-jobs --control-db-url $$RUSTREAM_CONTROL_DB_URL && + rustream add-job --control-db-url $$RUSTREAM_CONTROL_DB_URL --table users --config /config/config.yaml --interval-secs 300 && + rustream worker --control-db-url $$RUSTREAM_CONTROL_DB_URL --poll-seconds 5 + " + volumes: + - ./config.yaml:/config/config.yaml:ro + + rustream-status: + build: ../.. + depends_on: + - postgres + environment: + RUSTREAM_CONTROL_DB_URL: postgres://demo:demo@postgres:5432/demo + RUST_LOG: rustream=info + ports: + - "8080:8080" + command: rustream status-api --control-db-url ${RUSTREAM_CONTROL_DB_URL:-postgres://demo:demo@postgres:5432/demo} --bind 0.0.0.0:8080 + +volumes: + minio-data: diff --git a/examples/production-template/helm/Chart.yaml b/examples/production-template/helm/Chart.yaml new file mode 100644 index 0000000..a313fb1 --- /dev/null +++ b/examples/production-template/helm/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: rustream +description: Minimal chart to run rustream worker and status API +type: application +version: 0.1.0 +appVersion: "0.2.0" diff --git a/examples/production-template/helm/templates/_helpers.tpl b/examples/production-template/helm/templates/_helpers.tpl new file mode 100644 index 0000000..6e58403 --- /dev/null +++ b/examples/production-template/helm/templates/_helpers.tpl @@ -0,0 +1,3 @@ +{{- define "rustream.fullname" -}} +{{- printf "%s" .Release.Name -}} +{{- end -}} diff --git 
a/examples/production-template/helm/templates/deployment.yaml b/examples/production-template/helm/templates/deployment.yaml new file mode 100644 index 0000000..e9d5ea8 --- /dev/null +++ b/examples/production-template/helm/templates/deployment.yaml @@ -0,0 +1,78 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "rustream.fullname" . }}-worker + labels: + app: rustream +spec: + replicas: 1 + selector: + matchLabels: + app: rustream + template: + metadata: + labels: + app: rustream + spec: + containers: + - name: worker + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: + - worker + - --control-db-url + - {{ .Values.controlDbUrl | quote }} + - --poll-seconds + - "{{ .Values.worker.pollSeconds }}" + env: + {{- range .Values.env }} + - name: {{ .name }} + value: {{ .value | quote }} + {{- end }} + resources: + {{- toYaml .Values.resources | nindent 12 }} + volumeMounts: + - name: state + mountPath: {{ .Values.stateDir }} + volumes: + - name: state + emptyDir: {} + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "rustream.fullname" . 
}}-status + labels: + app: rustream-status +spec: + replicas: 1 + selector: + matchLabels: + app: rustream-status + template: + metadata: + labels: + app: rustream-status + spec: + containers: + - name: status-api + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: + - status-api + - --control-db-url + - {{ .Values.controlDbUrl | quote }} + - --bind + - "0.0.0.0:{{ .Values.statusApi.port }}" + - --state-dir + - {{ .Values.stateDir | quote }} + ports: + - containerPort: {{ .Values.statusApi.port }} + env: + {{- range .Values.env }} + - name: {{ .name }} + value: {{ .value | quote }} + {{- end }} + resources: + {{- toYaml .Values.resources | nindent 12 }} diff --git a/examples/production-template/helm/templates/service.yaml b/examples/production-template/helm/templates/service.yaml new file mode 100644 index 0000000..b6ede8a --- /dev/null +++ b/examples/production-template/helm/templates/service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ include "rustream.fullname" . 
}}-status +spec: + selector: + app: rustream-status + ports: + - port: 80 + targetPort: {{ .Values.statusApi.port }} + protocol: TCP + name: http diff --git a/examples/production-template/helm/values.yaml b/examples/production-template/helm/values.yaml new file mode 100644 index 0000000..0fe93de --- /dev/null +++ b/examples/production-template/helm/values.yaml @@ -0,0 +1,25 @@ +image: + repository: ghcr.io/kraftaa/rustream + tag: latest + pullPolicy: IfNotPresent + +controlDbUrl: "postgres://demo:demo@postgres:5432/demo" +stateDir: "/var/lib/rustream" + +resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi + +worker: + enabled: true + pollSeconds: 5 + +statusApi: + enabled: true + port: 8080 + +env: [] diff --git a/examples/production-template/terraform/main.tf b/examples/production-template/terraform/main.tf new file mode 100644 index 0000000..f41d592 --- /dev/null +++ b/examples/production-template/terraform/main.tf @@ -0,0 +1,72 @@ +# Minimal skeleton for a rustream deployment on AWS. +# Creates S3 bucket + IAM role/policy for ECS task or instance profile. 
+ +terraform { + required_version = ">= 1.5.0" + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 5.0" + } + } +} + +provider "aws" { + region = var.region +} + +resource "aws_s3_bucket" "rustream" { + bucket = var.bucket_name + force_destroy = true +} + +data "aws_iam_policy_document" "rustream" { + statement { + actions = ["s3:*"] + resources = [aws_s3_bucket_rustream.arn, "${aws_s3_bucket_rustream.arn}/*"] + } +} + +resource "aws_iam_role" "rustream" { + name = "${var.name_prefix}-role" + assume_role_policy = data.aws_iam_policy_document.assume.json +} + +data "aws_iam_policy_document" "assume" { + statement { + actions = ["sts:AssumeRole"] + principals { + type = "Service" + identifiers = ["ecs-tasks.amazonaws.com"] + } + } +} + +resource "aws_iam_policy" "rustream" { + name = "${var.name_prefix}-policy" + policy = data.aws_iam_policy_document.rustream.json +} + +resource "aws_iam_role_policy_attachment" "rustream" { + role = aws_iam_role.rustream.name + policy_arn = aws_iam_policy.rustream.arn +} + +output "bucket" { + value = aws_s3_bucket.rustream.bucket +} + +variable "region" { + type = string + default = "us-east-1" +} + +variable "bucket_name" { + type = string + default = "rustream-demo-bucket" +} + +variable "name_prefix" { + type = string + default = "rustream-demo" +} diff --git a/src/jobs.rs b/src/jobs.rs index 3c94583..8a614e5 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -86,6 +86,7 @@ pub async fn ensure_jobs_table(client: &Client) -> Result<()> { job_id INTEGER NOT NULL REFERENCES rustream_jobs(id) ON DELETE CASCADE, started_at TIMESTAMPTZ NOT NULL DEFAULT now(), finished_at TIMESTAMPTZ, + duration_ms BIGINT, status TEXT NOT NULL, error TEXT, severity TEXT, @@ -111,6 +112,13 @@ pub async fn ensure_jobs_table(client: &Client) -> Result<()> { ) .await .context("adding metrics_json column to rustream_job_runs")?; + client + .execute( + "ALTER TABLE rustream_job_runs ADD COLUMN IF NOT EXISTS duration_ms BIGINT", + &[], + ) + 
.await + .context("adding duration_ms column to rustream_job_runs")?; Ok(()) } @@ -256,7 +264,12 @@ pub async fn finish_job_run( client .execute( "UPDATE rustream_job_runs - SET status = $1, error = $2, severity = $3, metrics_json = $4, finished_at = now() + SET status = $1, + error = $2, + severity = $3, + metrics_json = $4, + finished_at = now(), + duration_ms = EXTRACT(EPOCH FROM (now() - started_at))*1000 WHERE id = $5", &[&status, &error, &severity, &metrics_json, &run_id], ) @@ -285,6 +298,7 @@ pub struct JobRun { pub status: String, pub started_at: String, pub finished_at: Option, + pub duration_ms: Option, pub error: Option, pub severity: Option, pub metrics_json: Option, @@ -294,7 +308,7 @@ pub async fn list_job_runs(control_db_url: &str, limit: i64) -> Result Result, }, /// Reset stored watermarks/cursors (state) for all tables or one table @@ -212,10 +216,11 @@ async fn main() -> Result<()> { Commands::StatusApi { control_db_url, bind, + state_dir, } => { let control_db_url = resolve_control_db_url(control_db_url)?; let addr: std::net::SocketAddr = bind.parse()?; - status_api::serve(control_db_url, addr).await?; + status_api::serve(control_db_url, addr, state_dir).await?; } Commands::ForceJob { control_db_url, diff --git a/src/status_api.rs b/src/status_api.rs index 4fa28d0..557c23b 100644 --- a/src/status_api.rs +++ b/src/status_api.rs @@ -11,18 +11,23 @@ use axum::{ use serde::Deserialize; use crate::jobs; +use crate::sync; #[derive(Clone)] struct AppState { control_db_url: String, + state_dir: String, } -pub async fn serve(control_db_url: String, addr: SocketAddr) -> Result<()> { +pub async fn serve(control_db_url: String, addr: SocketAddr, state_dir: Option) -> Result<()> { // Ensure schema exists before serving requests (covers older control DBs without new columns). 
let client = jobs::connect(&control_db_url).await?; jobs::ensure_jobs_table(&client).await?; - let state = AppState { control_db_url }; + let state = AppState { + control_db_url, + state_dir: state_dir.unwrap_or_else(|| ".rustream_state".to_string()), + }; let app = Router::new() .route("/", get(root_redirect)) .route("/jobs", get(list_jobs)) @@ -30,6 +35,8 @@ pub async fn serve(control_db_url: String, addr: SocketAddr) -> Result<()> { .route("/jobs/summary", get(job_summary)) .route("/logs", get(list_runs)) .route("/jobs/force", post(force_job)) + .route("/jobs/retry", post(retry_job)) + .route("/state/reset", post(reset_state)) .route("/health/worker", get(worker_health)) .route("/health", get(health)) .with_state(state); @@ -142,7 +149,9 @@ async fn list_jobs_html( "{}{}{}{}{}{}\
\ \ -
", + \ + {}\ + ", r.id, r.table_name, status_class, @@ -151,6 +160,38 @@ async fn list_jobs_html( r.last_run.unwrap_or_else(|| "-".to_string()), r.last_error.unwrap_or_else(|| "-".to_string()), r.id, + if r.status.eq_ignore_ascii_case("failed") { + format!( + "
+ +
", + r.id + ) + } else { + String::new() + }, + )); + } + body.push_str(""); + // Recent runs table + let runs = jobs::list_job_runs(&state.control_db_url, 20) + .await + .map_err(|e| { + tracing::error!(error = %e, "listing runs for HTML failed"); + StatusCode::INTERNAL_SERVER_ERROR + })?; + body.push_str("

Recent runs

"); + for r in runs { + body.push_str(&format!( + "", + r.id, + r.job_id, + r.status, + r.duration_ms.map(|d| d.to_string()).unwrap_or_else(|| "-".to_string()), + r.severity.unwrap_or_else(|| "-".to_string()), + r.error.unwrap_or_else(|| "-".to_string()), + r.started_at, + r.finished_at.unwrap_or_else(|| "-".to_string()), )); } body.push_str("
run_idjob_idstatusduration_msseverityerrorstarted_atfinished_at
{}{}{}{}{}{}{}{}
"); @@ -268,3 +309,34 @@ async fn force_job( .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Redirect::temporary("/jobs/html")) } + +async fn retry_job( + State(state): State, + Form(form): Form, +) -> Result { + // same as force; could add status check later + let client = jobs::connect(&state.control_db_url) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + jobs::ensure_jobs_table(&client) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + jobs::force_run_job(&client, form.id) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Redirect::temporary("/jobs/html")) +} + +#[derive(Debug, Deserialize)] +struct ResetForm { + table: Option, +} + +async fn reset_state( + State(state): State, + Form(form): Form, +) -> Result { + sync::reset_state(Some(state.state_dir.clone()), form.table.clone()) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Redirect::temporary("/jobs/html")) +} From 04fc1c40dd4158f1e2ec62acb59531a8718b63f5 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Thu, 26 Mar 2026 12:55:47 -0400 Subject: [PATCH 7/8] Add production template README and reset examples --- README.md | 11 +++++++ examples/production-template/README.md | 40 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 examples/production-template/README.md diff --git a/README.md b/README.md index fa59b57..07b5c8c 100644 --- a/README.md +++ b/README.md @@ -92,11 +92,22 @@ rustream status-api --control-db-url "$CONTROL_DB_URL" --bind 0.0.0.0:8080 # /jobs/summary, /logs?limit=50, /health, /health/worker # UI buttons: force run, retry failed, reset state +# reset all state (CLI) +rustream reset-state --state-dir .rustream_state + +# reset one table +rustream reset-state --table users + # optional: run a data-quality command on each local output table # (use {path} placeholder for the Parquet directory) RUSTREAM_DQ_CMD="dq-prof --input {path}" rustream worker --control-db-url "$CONTROL_DB_URL" ``` +### Production 
templates
+- Local: `examples/production-template/docker-compose.yml`
+- K8s: `examples/production-template/helm`
+- AWS: `examples/production-template/terraform`
+
 ## Configuration
 
 ### Specific tables (recommended)
diff --git a/examples/production-template/README.md b/examples/production-template/README.md
new file mode 100644
index 0000000..8f36b8f
--- /dev/null
+++ b/examples/production-template/README.md
@@ -0,0 +1,40 @@
+# rustream production template (minimal skeleton)
+
+Quick ways to try rustream end-to-end without Kafka:
+
+## Local (docker-compose)
+```
+cd examples/production-template
+cp config.example.yaml config.yaml  # edit Postgres/S3/MinIO creds if needed
+docker compose up --build
+# worker + status API come up; status UI on http://localhost:8080/jobs/html
+```
+
+Services:
+- Postgres (demo/demo)
+- MinIO (admin/admin) at http://localhost:9001
+- rustream worker (polls jobs table)
+- rustream status API (port 8080)
+
+## Kubernetes (Helm skeleton)
+```
+cd examples/production-template/helm
+helm install rustream . \
+  --set controlDbUrl=postgres://user:pass@host:5432/db \
+  --set image.repository=ghcr.io/yourorg/rustream \
+  --set image.tag=latest
+```
+Notes:
+- Chart is minimal: no Secrets/IAM wired yet. Add IRSA/kiam annotations and env for AWS creds.
+- Mount your `config.yaml` via a ConfigMap/Secret and add a bootstrap Job that runs `rustream add-job`.
+
+## AWS Terraform stub
+```
+cd examples/production-template/terraform
+terraform init
+terraform apply -var bucket_name=rustream-demo-bucket
+```
+Creates S3 bucket + IAM role/policy for ECS tasks. You still need to wire ECS service/task defs to run rustream.
+
+## GHCR Docker image
+A GitHub Actions workflow builds/pushes `ghcr.io/<owner>/rustream:latest` on tag pushes (`.github/workflows/publish-docker.yml`). Customize tags as needed.
From e697c02c96d1d73621bb6f2ffe4a4902db7a4620 Mon Sep 17 00:00:00 2001 From: Maria Dubyaga Date: Sat, 28 Mar 2026 11:47:36 -0400 Subject: [PATCH 8/8] Harden status UI and production templates --- examples/production-template/README.md | 2 +- .../production-template/docker-compose.yml | 4 +- .../helm/templates/deployment.yaml | 4 ++ .../helm/templates/service.yaml | 2 + .../production-template/terraform/main.tf | 2 +- src/config.rs | 4 ++ src/jobs.rs | 2 +- src/output.rs | 29 ++++++++++- src/status_api.rs | 49 ++++++++++++++----- 9 files changed, 79 insertions(+), 19 deletions(-) diff --git a/examples/production-template/README.md b/examples/production-template/README.md index 8f36b8f..ac2fb57 100644 --- a/examples/production-template/README.md +++ b/examples/production-template/README.md @@ -12,7 +12,7 @@ docker compose up --build Services: - Postgres (demo/demo) -- MinIO (admin/admin) at http://localhost:9001 +- MinIO (minioadmin/minioadmin) at http://localhost:9001 - rustream worker (polls jobs table) - rustream status API (port 8080) diff --git a/examples/production-template/docker-compose.yml b/examples/production-template/docker-compose.yml index 6a293bc..9e4520c 100644 --- a/examples/production-template/docker-compose.yml +++ b/examples/production-template/docker-compose.yml @@ -14,8 +14,8 @@ services: image: quay.io/minio/minio command: server /data --console-address ":9001" environment: - MINIO_ACCESS_KEY: minioadmin - MINIO_SECRET_KEY: minioadmin + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin ports: - "9000:9000" - "9001:9001" diff --git a/examples/production-template/helm/templates/deployment.yaml b/examples/production-template/helm/templates/deployment.yaml index e9d5ea8..653a28d 100644 --- a/examples/production-template/helm/templates/deployment.yaml +++ b/examples/production-template/helm/templates/deployment.yaml @@ -1,3 +1,4 @@ +{{- if .Values.worker.enabled }} apiVersion: apps/v1 kind: Deployment metadata: @@ -37,7 +38,9 @@ spec: 
volumes: - name: state emptyDir: {} +{{- end }} +{{- if .Values.statusApi.enabled }} --- apiVersion: apps/v1 kind: Deployment @@ -76,3 +79,4 @@ spec: {{- end }} resources: {{- toYaml .Values.resources | nindent 12 }} +{{- end }} diff --git a/examples/production-template/helm/templates/service.yaml b/examples/production-template/helm/templates/service.yaml index b6ede8a..0c26fee 100644 --- a/examples/production-template/helm/templates/service.yaml +++ b/examples/production-template/helm/templates/service.yaml @@ -1,3 +1,4 @@ +{{- if .Values.statusApi.enabled }} apiVersion: v1 kind: Service metadata: @@ -10,3 +11,4 @@ spec: targetPort: {{ .Values.statusApi.port }} protocol: TCP name: http +{{- end }} diff --git a/examples/production-template/terraform/main.tf b/examples/production-template/terraform/main.tf index f41d592..9d9d2ea 100644 --- a/examples/production-template/terraform/main.tf +++ b/examples/production-template/terraform/main.tf @@ -23,7 +23,7 @@ resource "aws_s3_bucket" "rustream" { data "aws_iam_policy_document" "rustream" { statement { actions = ["s3:*"] - resources = [aws_s3_bucket_rustream.arn, "${aws_s3_bucket_rustream.arn}/*"] + resources = [aws_s3_bucket.rustream.arn, "${aws_s3_bucket.rustream.arn}/*"] } } diff --git a/src/config.rs b/src/config.rs index 17c78b6..3563d82 100644 --- a/src/config.rs +++ b/src/config.rs @@ -73,6 +73,8 @@ pub enum OutputConfig { prefix: String, #[serde(default)] region: Option, + #[serde(default)] + endpoint: Option, }, } @@ -337,10 +339,12 @@ exclude: bucket, prefix, region, + endpoint, } => { assert_eq!(bucket, "my-bucket"); assert_eq!(prefix, "raw/pg"); assert_eq!(region.as_deref(), Some("us-west-2")); + assert_eq!(endpoint, None); } _ => panic!("expected S3 output"), } diff --git a/src/jobs.rs b/src/jobs.rs index 8a614e5..715b6d8 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -269,7 +269,7 @@ pub async fn finish_job_run( severity = $3, metrics_json = $4, finished_at = now(), - duration_ms = EXTRACT(EPOCH FROM 
(now() - started_at))*1000 + duration_ms = (EXTRACT(EPOCH FROM (now() - started_at))*1000)::BIGINT WHERE id = $5", &[&status, &error, &severity, &metrics_json, &run_id], ) diff --git a/src/output.rs b/src/output.rs index 3ee622a..e3e5c94 100644 --- a/src/output.rs +++ b/src/output.rs @@ -11,7 +11,18 @@ pub async fn write_output(config: &OutputConfig, key: &str, data: Vec) -> Re bucket, prefix, region, - } => write_s3(bucket, prefix, region.as_deref(), key, data).await, + endpoint, + } => { + write_s3( + bucket, + prefix, + region.as_deref(), + endpoint.as_deref(), + key, + data, + ) + .await + } } } @@ -36,6 +47,7 @@ async fn write_s3( bucket: &str, prefix: &str, region: Option<&str>, + endpoint: Option<&str>, key: &str, data: Vec, ) -> Result<()> { @@ -44,7 +56,20 @@ async fn write_s3( config_loader = config_loader.region(aws_config::Region::new(r.to_string())); } let aws_config = config_loader.load().await; - let client = aws_sdk_s3::Client::new(&aws_config); + + let endpoint_override = endpoint + .map(ToOwned::to_owned) + .or_else(|| std::env::var("RUSTREAM_S3_ENDPOINT").ok()); + + let client = if let Some(endpoint_url) = endpoint_override { + let s3_cfg = aws_sdk_s3::config::Builder::from(&aws_config) + .endpoint_url(endpoint_url) + .force_path_style(true) + .build(); + aws_sdk_s3::Client::from_conf(s3_cfg) + } else { + aws_sdk_s3::Client::new(&aws_config) + }; let s3_key = format!("{}/{}", prefix.trim_end_matches('/'), key); diff --git a/src/status_api.rs b/src/status_api.rs index 557c23b..52ce74b 100644 --- a/src/status_api.rs +++ b/src/status_api.rs @@ -117,6 +117,10 @@ async fn list_jobs_html( "
pending: {} · running: {} · failed: {} · total: {}
", summary.pending, summary.running, summary.failed, summary.total )); + body.push_str( + "
+
", + ); body.push_str("
"); body.push_str("