From 1c86f489563d5a86e8025696f2060ebbb8f8b21b Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 14:19:29 -0800 Subject: [PATCH 1/5] [sync-sqlite] add watcher client and incremental command --- Cargo.lock | 316 ++++++++++++++++++- Cargo.toml | 10 + README-SQLite.md | 18 ++ sqlite-watcher/Cargo.toml | 32 ++ sqlite-watcher/build.rs | 8 + sqlite-watcher/proto/watcher.proto | 45 +++ sqlite-watcher/src/lib.rs | 5 + sqlite-watcher/src/queue.rs | 224 ++++++++++++++ sqlite-watcher/src/server.rs | 283 +++++++++++++++++ sqlite-watcher/tests/server_tests.rs | 108 +++++++ src/commands/mod.rs | 1 + src/commands/sync_sqlite.rs | 434 +++++++++++++++++++++++++++ src/jsonb/writer.rs | 45 ++- src/main.rs | 35 +++ 14 files changed, 1556 insertions(+), 8 deletions(-) create mode 100644 sqlite-watcher/Cargo.toml create mode 100644 sqlite-watcher/build.rs create mode 100644 sqlite-watcher/proto/watcher.proto create mode 100644 sqlite-watcher/src/lib.rs create mode 100644 sqlite-watcher/src/queue.rs create mode 100644 sqlite-watcher/src/server.rs create mode 100644 sqlite-watcher/tests/server_tests.rs create mode 100644 src/commands/sync_sqlite.rs diff --git a/Cargo.lock b/Cargo.lock index eed6140..c1e550c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,28 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + 
"proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -135,6 +157,51 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.21.7" @@ -169,7 +236,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -246,7 +313,7 @@ dependencies = [ "getrandom 0.2.16", "getrandom 0.3.4", "hex", - "indexmap", + "indexmap 2.12.0", "js-sys", "once_cell", "rand 0.9.2", @@ -713,10 +780,13 @@ dependencies = [ "serde", "serde_json", "sha2", + "sqlite-watcher", "tempfile", "tokio", "tokio-postgres", "toml", + "tonic", + "tower", "tracing", "tracing-subscriber", "url", @@ -917,6 +987,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.1.5" @@ -1188,7 +1264,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.12.0", "slab", "tokio", "tokio-util", @@ -1374,6 +1450,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1519,6 +1607,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.12.0" @@ -1583,6 +1681,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -1624,7 +1731,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" dependencies = [ - "indexmap", + "indexmap 2.12.0", ] [[package]] @@ -1786,6 +1893,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1941,6 +2054,12 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "mysql-common-derive" version = "0.31.2" @@ -2222,6 +2341,16 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.12.0", +] + [[package]] name = "phf" version = "0.13.1" @@ -2354,6 +2483,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.108", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -2394,6 +2533,59 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.108", + "tempfile", +] + 
+[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.108", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -2648,6 +2840,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d" dependencies = [ "bitflags 2.10.0", + "chrono", "fallible-iterator 0.3.0", "fallible-streaming-iterator", "hashlink", @@ -2881,7 +3074,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ - "indexmap", + "indexmap 2.12.0", "itoa", "memchr", "ryu", @@ -3055,6 +3248,27 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "sqlite-watcher" +version = "0.1.0" +dependencies = [ + "anyhow", + "base64 0.21.7", + "clap", + "dirs", + "prost", + "rusqlite", + "serde", + "serde_json", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -3331,6 +3545,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bd86198d9ee903fedd2f9a2e72014287c0d9167e4ae43b5853007205dda1b76" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.6.0" @@ -3388,6 +3612,17 @@ dependencies = [ "tokio", ] +[[package]] +name = 
"tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.17" @@ -3438,7 +3673,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap", + "indexmap 2.12.0", "serde", "serde_spanned", "toml_datetime 0.6.11", @@ -3452,7 +3687,7 @@ version = "0.23.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" dependencies = [ - "indexmap", + "indexmap 2.12.0", "toml_datetime 0.7.3", "toml_parser", "winnow", @@ -3473,6 +3708,72 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.108", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -3485,6 +3786,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 72b4e75..eca54e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,10 @@ +[workspace] +resolver = "2" +members = [ + ".", + "sqlite-watcher", +] + [package] name = "database-replicator" version = "7.0.14" @@ -44,6 +51,9 @@ url = "2.5" chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } libc = "0.2" rust_decimal = { version = "1.39", features = ["db-tokio-postgres"] } +tonic = { version = "0.11", features = ["transport"] } +tower = "0.4" +sqlite-watcher = { path = "sqlite-watcher" } [target.'cfg(unix)'.dependencies] daemonize = "0.5" diff --git a/README-SQLite.md b/README-SQLite.md index a01b44d..3acf38c 100644 --- a/README-SQLite.md +++ b/README-SQLite.md @@ -596,3 +596,21 @@ No. The tool uses `SQLITE_OPEN_READ_ONLY` which allows concurrent readers. 
Other For issues or questions: - **GitHub Issues**: https://github.com/serenorg/database-replicator/issues - **Email**: support@seren.ai +## Incremental sqlite-watcher sync + +After running the baseline snapshot (`init --source sqlite ...`), start the sqlite-watcher service alongside your database and consume its change feed with the new `sync-sqlite` command: + +```bash +sqlite-watcher --db /path/to/app.db --listen unix:/tmp/sqlite-watcher.sock & + +database-replicator sync-sqlite \ + --target "postgresql://user:pass@your-serendb.serendb.com:5432/app" \ + --watcher-endpoint unix:/tmp/sqlite-watcher.sock \ + --incremental-mode append +``` + +- `--incremental-mode append_deduped` maintains `_latest` tables (one row per primary key) in addition to the raw append-only tables. +- The command refuses to run unless `sqlite_sync_state` shows a completed baseline snapshot so we always have a safe starting point. +- Tokens are read from `~/.seren/sqlite-watcher/token` by default; pass `--token-file` to override. + +The command connects to sqlite-watcher over TCP or Unix sockets, pulls change batches, applies them to the JSONB tables via the existing writer helpers, updates the `sqlite_sync_state` checkpoint table, and acknowledges progress back to the watcher. diff --git a/sqlite-watcher/Cargo.toml b/sqlite-watcher/Cargo.toml new file mode 100644 index 0000000..b81704e --- /dev/null +++ b/sqlite-watcher/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "sqlite-watcher" +version = "0.1.0" +edition = "2021" +authors = ["SerenAI "] +description = "SQLite watcher components (queue + gRPC)." 
+license = "Apache-2.0"
+repository = "https://github.com/serenorg/database-replicator"
+build = "build.rs"
+
+[build-dependencies]
+tonic-build = "0.11"
+
+[dependencies]
+anyhow = "1.0"
+clap = { version = "4.4", features = ["derive", "env"] }
+dirs = "5.0"
+rusqlite = { version = "0.30", features = ["chrono"] }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+thiserror = "1.0"
+base64 = "0.21"
+tokio = { version = "1.35", features = ["rt-multi-thread", "macros", "signal", "fs"] }
+tonic = { version = "0.11", features = ["transport"] }
+tokio-stream = { version = "0.1", features = ["net"] }
+prost = "0.12"
+
+[dev-dependencies]
+tempfile = "3.8"
+tokio = { version = "1.35", features = ["rt", "macros"] }
+tonic = { version = "0.11", features = ["transport"] }
+tower = "0.4"
diff --git a/sqlite-watcher/build.rs b/sqlite-watcher/build.rs
new file mode 100644
index 0000000..a38290d
--- /dev/null
+++ b/sqlite-watcher/build.rs
@@ -0,0 +1,8 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    tonic_build::configure()
+        .build_client(true)
+        .build_server(true)
+        .compile(&["proto/watcher.proto"], &["proto"])?;
+    println!("cargo:rerun-if-changed=proto/watcher.proto");
+    Ok(())
+}
diff --git a/sqlite-watcher/proto/watcher.proto b/sqlite-watcher/proto/watcher.proto
new file mode 100644
index 0000000..2424406
--- /dev/null
+++ b/sqlite-watcher/proto/watcher.proto
@@ -0,0 +1,45 @@
+syntax = "proto3";
+
+package sqlitewatcher;
+
+message HealthCheckRequest {}
+message HealthCheckResponse { string status = 1; }
+
+message ListChangesRequest { uint32 limit = 1; }
+message Change {
+  int64 change_id = 1;
+  string table_name = 2;
+  string op = 3;
+  string primary_key = 4;
+  bytes payload = 5;
+  string wal_frame = 6;
+  string cursor = 7;
+}
+message ListChangesResponse { repeated Change changes = 1; }
+
+message AckChangesRequest { int64 up_to_change_id = 1; }
+message AckChangesResponse { uint64 acknowledged = 1; }
+
+message GetStateRequest { string table_name = 1; }
+message GetStateResponse { + bool exists = 1; + int64 last_change_id = 2; + string last_wal_frame = 3; + string cursor = 4; +} + +message SetStateRequest { + string table_name = 1; + int64 last_change_id = 2; + string last_wal_frame = 3; + string cursor = 4; +} +message SetStateResponse {} + +service Watcher { + rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); + rpc ListChanges(ListChangesRequest) returns (ListChangesResponse); + rpc AckChanges(AckChangesRequest) returns (AckChangesResponse); + rpc GetState(GetStateRequest) returns (GetStateResponse); + rpc SetState(SetStateRequest) returns (SetStateResponse); +} diff --git a/sqlite-watcher/src/lib.rs b/sqlite-watcher/src/lib.rs new file mode 100644 index 0000000..b24ec66 --- /dev/null +++ b/sqlite-watcher/src/lib.rs @@ -0,0 +1,5 @@ +pub mod queue; +pub mod server; +pub mod watcher_proto { + tonic::include_proto!("sqlitewatcher"); +} diff --git a/sqlite-watcher/src/queue.rs b/sqlite-watcher/src/queue.rs new file mode 100644 index 0000000..b38e3db --- /dev/null +++ b/sqlite-watcher/src/queue.rs @@ -0,0 +1,224 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context, Result}; +use rusqlite::{params, Connection, OptionalExtension, Row}; + +const SCHEMA: &str = r#" +CREATE TABLE IF NOT EXISTS changes ( + change_id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + op TEXT NOT NULL, + id TEXT NOT NULL, + payload BLOB, + wal_frame TEXT, + cursor TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + acked INTEGER NOT NULL DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS state ( + table_name TEXT PRIMARY KEY, + last_change_id INTEGER NOT NULL DEFAULT 0, + last_wal_frame TEXT, + cursor TEXT, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +"#; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ChangeOperation { + Insert, + Update, + Delete, +} + +impl ChangeOperation { + pub fn as_str(&self) -> &'static str { + match self { + 
ChangeOperation::Insert => "insert",
+            ChangeOperation::Update => "update",
+            ChangeOperation::Delete => "delete",
+        }
+    }
+
+    fn from_str(value: &str) -> Result<Self> {
+        match value {
+            "insert" => Ok(ChangeOperation::Insert),
+            "update" => Ok(ChangeOperation::Update),
+            "delete" => Ok(ChangeOperation::Delete),
+            other => Err(anyhow!("unknown change operation '{other}'")),
+        }
+    }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct NewChange {
+    pub table_name: String,
+    pub operation: ChangeOperation,
+    pub primary_key: String,
+    pub payload: Option<Vec<u8>>,
+    pub wal_frame: Option<String>,
+    pub cursor: Option<String>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ChangeRecord {
+    pub change_id: i64,
+    pub table_name: String,
+    pub operation: ChangeOperation,
+    pub primary_key: String,
+    pub payload: Option<Vec<u8>>,
+    pub wal_frame: Option<String>,
+    pub cursor: Option<String>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct QueueState {
+    pub table_name: String,
+    pub last_change_id: i64,
+    pub last_wal_frame: Option<String>,
+    pub cursor: Option<String>,
+}
+
+pub struct ChangeQueue {
+    path: PathBuf,
+    conn: Connection,
+}
+
+impl ChangeQueue {
+    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
+        let path = path.as_ref();
+        if let Some(parent) = path.parent() {
+            fs::create_dir_all(parent).with_context(|| {
+                format!("failed to create queue directory {}", parent.display())
+            })?;
+            #[cfg(unix)]
+            enforce_dir_perms(parent)?;
+        }
+        let conn = Connection::open(path)
+            .with_context(|| format!("failed to open queue database {}", path.display()))?;
+        conn.pragma_update(None, "journal_mode", &"wal").ok();
+        conn.pragma_update(None, "synchronous", &"normal").ok();
+        conn.execute_batch(SCHEMA)
+            .context("failed to initialize change queue schema")?;
+        Ok(Self {
+            path: path.to_path_buf(),
+            conn,
+        })
+    }
+
+    pub fn enqueue(&self, change: &NewChange) -> Result<i64> {
+        self.conn.execute(
+            "INSERT INTO changes(table_name, op, id, payload, wal_frame, cursor)
+             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
+            params![
change.table_name,
+                change.operation.as_str(),
+                change.primary_key,
+                change.payload,
+                change.wal_frame,
+                change.cursor,
+            ],
+        )?;
+        Ok(self.conn.last_insert_rowid())
+    }
+
+    pub fn fetch_batch(&self, limit: usize) -> Result<Vec<ChangeRecord>> {
+        let mut stmt = self.conn.prepare(
+            "SELECT change_id, table_name, op, id, payload, wal_frame, cursor
+             FROM changes WHERE acked = 0 ORDER BY change_id ASC LIMIT ?1",
+        )?;
+        let mut rows = stmt.query([limit as i64])?;
+        let mut results = Vec::new();
+        while let Some(row) = rows.next()? {
+            results.push(row_to_change(&row)?);
+        }
+        Ok(results)
+    }
+
+    pub fn ack_up_to(&self, change_id: i64) -> Result<u64> {
+        let updated = self.conn.execute(
+            "UPDATE changes SET acked = 1 WHERE change_id <= ?1",
+            [change_id],
+        )?;
+        Ok(updated as u64)
+    }
+
+    pub fn purge_acked(&self) -> Result<u64> {
+        let deleted = self
+            .conn
+            .execute("DELETE FROM changes WHERE acked = 1", [])?;
+        Ok(deleted as u64)
+    }
+
+    pub fn get_state(&self, table: &str) -> Result<Option<QueueState>> {
+        self.conn
+            .prepare(
+                "SELECT table_name, last_change_id, last_wal_frame, cursor
+                 FROM state WHERE table_name = ?1",
+            )?
.query_row([table], |row| {
+                Ok(QueueState {
+                    table_name: row.get(0)?,
+                    last_change_id: row.get(1)?,
+                    last_wal_frame: row.get(2)?,
+                    cursor: row.get(3)?,
+                })
+            })
+            .optional()
+            .map_err(Into::into)
+    }
+
+    pub fn set_state(&self, state: &QueueState) -> Result<()> {
+        self.conn.execute(
+            "INSERT INTO state(table_name, last_change_id, last_wal_frame, cursor, updated_at)
+             VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP)
+             ON CONFLICT(table_name) DO UPDATE SET
+                 last_change_id = excluded.last_change_id,
+                 last_wal_frame = excluded.last_wal_frame,
+                 cursor = excluded.cursor,
+                 updated_at = CURRENT_TIMESTAMP",
+            params![
+                state.table_name,
+                state.last_change_id,
+                state.last_wal_frame,
+                state.cursor,
+            ],
+        )?;
+        Ok(())
+    }
+
+    pub fn path(&self) -> &Path {
+        &self.path
+    }
+}
+
+fn row_to_change(row: &Row<'_>) -> Result<ChangeRecord> {
+    let op_str: String = row.get(2)?;
+    Ok(ChangeRecord {
+        change_id: row.get(0)?,
+        table_name: row.get(1)?,
+        operation: ChangeOperation::from_str(&op_str)?,
+        primary_key: row.get(3)?,
+        payload: row.get(4)?,
+        wal_frame: row.get(5)?,
+        cursor: row.get(6)?,
+    })
+}
+
+#[cfg(unix)]
+fn enforce_dir_perms(path: &Path) -> Result<()> {
+    use std::os::unix::fs::PermissionsExt;
+
+    let metadata = fs::metadata(path)?;
+    let mut perms = metadata.permissions();
+    perms.set_mode(0o700);
+    fs::set_permissions(path, perms)?;
+    Ok(())
+}
+
+#[cfg(not(unix))]
+fn enforce_dir_perms(_path: &Path) -> Result<()> {
+    Ok(())
+}
diff --git a/sqlite-watcher/src/server.rs b/sqlite-watcher/src/server.rs
new file mode 100644
index 0000000..8ac6344
--- /dev/null
+++ b/sqlite-watcher/src/server.rs
@@ -0,0 +1,283 @@
+use std::net::SocketAddr;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::thread::{self, JoinHandle};
+
+use anyhow::{Context, Result};
+use tokio::runtime::Builder;
+use tokio::sync::oneshot;
+use tokio_stream::wrappers::TcpListenerStream;
+use tonic::service::Interceptor;
+use tonic::transport::Server;
+use tonic::{Request, Response,
Status};
+
+#[cfg(unix)]
+use tokio::net::UnixListener;
+#[cfg(unix)]
+use tokio_stream::wrappers::UnixListenerStream;
+
+use crate::queue::{ChangeQueue, QueueState};
+use crate::watcher_proto::watcher_server::{Watcher, WatcherServer};
+use crate::watcher_proto::{
+    AckChangesRequest, AckChangesResponse, Change, GetStateRequest, GetStateResponse,
+    HealthCheckRequest, HealthCheckResponse, ListChangesRequest, ListChangesResponse,
+    SetStateRequest, SetStateResponse,
+};
+
+pub enum ServerHandle {
+    Tcp {
+        shutdown: Option<oneshot::Sender<()>>,
+        thread: Option<JoinHandle<Result<()>>>,
+    },
+    #[cfg(unix)]
+    Unix {
+        shutdown: Option<oneshot::Sender<()>>,
+        thread: Option<JoinHandle<Result<()>>>,
+        path: PathBuf,
+    },
+}
+
+impl Drop for ServerHandle {
+    fn drop(&mut self) {
+        match self {
+            ServerHandle::Tcp { shutdown, thread } => {
+                if let Some(tx) = shutdown.take() {
+                    let _ = tx.send(());
+                }
+                if let Some(handle) = thread.take() {
+                    let _ = handle.join();
+                }
+            }
+            #[cfg(unix)]
+            ServerHandle::Unix {
+                shutdown,
+                thread,
+                path,
+            } => {
+                if let Some(tx) = shutdown.take() {
+                    let _ = tx.send(());
+                }
+                if let Some(handle) = thread.take() {
+                    let _ = handle.join();
+                }
+                let _ = std::fs::remove_file(path);
+            }
+        }
+    }
+}
+
+pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result<ServerHandle> {
+    let (shutdown_tx, shutdown_rx) = oneshot::channel();
+    let thread = thread::spawn(move || -> Result<()> {
+        let runtime = Builder::new_multi_thread().enable_all().build()?;
+        runtime.block_on(async move {
+            let listener = tokio::net::TcpListener::bind(addr)
+                .await
+                .context("failed to bind tcp listener")?;
+            let queue_path = Arc::new(queue_path);
+            let svc = WatcherService::new(queue_path);
+            let interceptor = AuthInterceptor::new(token);
+            Server::builder()
+                .add_service(WatcherServer::with_interceptor(svc, interceptor))
+                .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move {
+                    let _ = shutdown_rx.await;
+                })
+                .await
+                .context("grpc server exited")
+        })
+    });
+
+    Ok(ServerHandle::Tcp {
+        shutdown:
Some(shutdown_tx),
+        thread: Some(thread),
+    })
+}
+
+#[cfg(unix)]
+pub fn spawn_unix(path: &Path, queue_path: PathBuf, token: String) -> Result<ServerHandle> {
+    if path.exists() {
+        std::fs::remove_file(path)
+            .with_context(|| format!("failed to remove stale unix socket {}", path.display()))?;
+    }
+    if let Some(parent) = path.parent() {
+        std::fs::create_dir_all(parent).with_context(|| {
+            format!(
+                "failed to create unix socket directory {}",
+                parent.display()
+            )
+        })?;
+    }
+    let path_buf = path.to_path_buf();
+    let (shutdown_tx, shutdown_rx) = oneshot::channel();
+    let listener_path = path_buf.clone();
+    let thread = thread::spawn(move || -> Result<()> {
+        let runtime = Builder::new_multi_thread().enable_all().build()?;
+        runtime.block_on(async move {
+            let listener =
+                UnixListener::bind(&listener_path).context("failed to bind unix socket")?;
+            let queue_path = Arc::new(queue_path);
+            let svc = WatcherService::new(queue_path);
+            let interceptor = AuthInterceptor::new(token);
+            Server::builder()
+                .add_service(WatcherServer::with_interceptor(svc, interceptor))
+                .serve_with_incoming_shutdown(UnixListenerStream::new(listener), async move {
+                    let _ = shutdown_rx.await;
+                })
+                .await
+                .context("grpc server exited")
+        })
+    });
+
+    Ok(ServerHandle::Unix {
+        shutdown: Some(shutdown_tx),
+        thread: Some(thread),
+        path: path_buf,
+    })
+}
+
+#[derive(Clone)]
+struct WatcherService {
+    queue_path: Arc<PathBuf>,
+}
+
+impl WatcherService {
+    fn new(queue_path: Arc<PathBuf>) -> Self {
+        Self { queue_path }
+    }
+
+    fn queue(&self) -> Result<ChangeQueue> {
+        ChangeQueue::open(&*self.queue_path)
+    }
+}
+
+#[derive(Clone)]
+struct AuthInterceptor {
+    token: Arc<String>,
+}
+
+impl AuthInterceptor {
+    fn new(token: String) -> Self {
+        Self {
+            token: Arc::new(token),
+        }
+    }
+}
+
+impl Interceptor for AuthInterceptor {
+    fn call(&mut self, req: Request<()>) -> Result<Request<()>, Status> {
+        let provided = req
+            .metadata()
+            .get("authorization")
+            .ok_or_else(|| Status::unauthenticated("missing authorization header"))?;
+        let expected
= format!("Bearer {}", self.token.as_str());
+        if provided == expected.as_str() {
+            Ok(req)
+        } else {
+            Err(Status::unauthenticated("invalid authorization header"))
+        }
+    }
+}
+
+#[tonic::async_trait]
+impl Watcher for WatcherService {
+    async fn health_check(
+        &self,
+        _: Request<HealthCheckRequest>,
+    ) -> Result<Response<HealthCheckResponse>, Status> {
+        Ok(Response::new(HealthCheckResponse {
+            status: "ok".to_string(),
+        }))
+    }
+
+    async fn list_changes(
+        &self,
+        request: Request<ListChangesRequest>,
+    ) -> Result<Response<ListChangesResponse>, Status> {
+        let limit = request.get_ref().limit.max(1).min(10_000) as usize;
+        let queue = self.queue().map_err(internal_err)?;
+        let rows = queue.fetch_batch(limit).map_err(internal_err)?;
+        let changes = rows.into_iter().map(change_to_proto).collect();
+        Ok(Response::new(ListChangesResponse { changes }))
+    }
+
+    async fn ack_changes(
+        &self,
+        request: Request<AckChangesRequest>,
+    ) -> Result<Response<AckChangesResponse>, Status> {
+        let upto = request.get_ref().up_to_change_id;
+        let queue = self.queue().map_err(internal_err)?;
+        let count = queue.ack_up_to(upto).map_err(internal_err)?;
+        queue.purge_acked().ok();
+        Ok(Response::new(AckChangesResponse {
+            acknowledged: count,
+        }))
+    }
+
+    async fn get_state(
+        &self,
+        request: Request<GetStateRequest>,
+    ) -> Result<Response<GetStateResponse>, Status> {
+        let queue = self.queue().map_err(internal_err)?;
+        let state = queue
+            .get_state(&request.get_ref().table_name)
+            .map_err(internal_err)?;
+        let resp = match state {
+            Some(state) => GetStateResponse {
+                exists: true,
+                last_change_id: state.last_change_id,
+                last_wal_frame: state.last_wal_frame.unwrap_or_default(),
+                cursor: state.cursor.unwrap_or_default(),
+            },
+            None => GetStateResponse {
+                exists: false,
+                last_change_id: 0,
+                last_wal_frame: String::new(),
+                cursor: String::new(),
+            },
+        };
+        Ok(Response::new(resp))
+    }
+
+    async fn set_state(
+        &self,
+        request: Request<SetStateRequest>,
+    ) -> Result<Response<SetStateResponse>, Status> {
+        let payload = request.into_inner();
+        if payload.table_name.is_empty() {
+            return Err(Status::invalid_argument("table_name is required"));
+        }
+        let queue = self.queue().map_err(internal_err)?;
+ let state = QueueState { + table_name: payload.table_name, + last_change_id: payload.last_change_id, + last_wal_frame: if payload.last_wal_frame.is_empty() { + None + } else { + Some(payload.last_wal_frame) + }, + cursor: if payload.cursor.is_empty() { + None + } else { + Some(payload.cursor) + }, + }; + queue.set_state(&state).map_err(internal_err)?; + Ok(Response::new(SetStateResponse {})) + } +} + +fn change_to_proto(row: crate::queue::ChangeRecord) -> Change { + Change { + change_id: row.change_id, + table_name: row.table_name, + op: row.operation.as_str().to_string(), + primary_key: row.primary_key, + payload: row.payload.unwrap_or_default(), + wal_frame: row.wal_frame.unwrap_or_default(), + cursor: row.cursor.unwrap_or_default(), + } +} + +fn internal_err(err: anyhow::Error) -> Status { + Status::internal(err.to_string()) +} diff --git a/sqlite-watcher/tests/server_tests.rs b/sqlite-watcher/tests/server_tests.rs new file mode 100644 index 0000000..2515678 --- /dev/null +++ b/sqlite-watcher/tests/server_tests.rs @@ -0,0 +1,108 @@ +use std::net::SocketAddr; +use std::time::Duration; + +use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange}; +use sqlite_watcher::server::spawn_tcp; +use sqlite_watcher::watcher_proto::watcher_client::WatcherClient; +use sqlite_watcher::watcher_proto::{AckChangesRequest, HealthCheckRequest, ListChangesRequest}; +use tempfile::tempdir; +use tokio::time::sleep; +use tonic::metadata::MetadataValue; + +fn seed_queue(path: &str) { + let queue = ChangeQueue::open(path).unwrap(); + for i in 0..2 { + let change = NewChange { + table_name: "examples".into(), + operation: ChangeOperation::Insert, + primary_key: format!("row-{i}"), + payload: None, + wal_frame: None, + cursor: None, + }; + queue.enqueue(&change).unwrap(); + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn tcp_server_handles_health_and_list() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("queue.db"); + 
seed_queue(queue_path.to_str().unwrap()); + + let addr: SocketAddr = "127.0.0.1:56060".parse().unwrap(); + let token = "secret".to_string(); + let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap(); + sleep(Duration::from_millis(200)).await; + + let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr)) + .unwrap() + .connect() + .await + .unwrap(); + let mut client = WatcherClient::new(channel); + + let mut health_req = tonic::Request::new(HealthCheckRequest {}); + let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap(); + health_req + .metadata_mut() + .insert("authorization", header.clone()); + client.health_check(health_req).await.unwrap(); + + let mut list_req = tonic::Request::new(ListChangesRequest { limit: 10 }); + list_req.metadata_mut().insert("authorization", header); + let resp = client.list_changes(list_req).await.unwrap(); + assert_eq!(resp.into_inner().changes.len(), 2); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unauthenticated_requests_fail() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("queue.db"); + let addr: SocketAddr = "127.0.0.1:56061".parse().unwrap(); + let token = "secret".to_string(); + let _handle = spawn_tcp(addr, queue_path, token).unwrap(); + sleep(Duration::from_millis(200)).await; + + let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr)) + .unwrap() + .connect() + .await + .unwrap(); + let mut client = WatcherClient::new(channel); + + let request = tonic::Request::new(ListChangesRequest { limit: 1 }); + let err = client.list_changes(request).await.unwrap_err(); + assert_eq!(err.code(), tonic::Code::Unauthenticated); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn ack_changes_advances_queue() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("queue.db"); + seed_queue(queue_path.to_str().unwrap()); + let addr: SocketAddr = 
"127.0.0.1:56062".parse().unwrap(); + let token = "secret".to_string(); + let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap(); + sleep(Duration::from_millis(200)).await; + + let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr)) + .unwrap() + .connect() + .await + .unwrap(); + let mut client = WatcherClient::new(channel); + let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap(); + + let mut req = tonic::Request::new(ListChangesRequest { limit: 10 }); + req.metadata_mut().insert("authorization", header.clone()); + let resp = client.list_changes(req).await.unwrap().into_inner(); + assert_eq!(resp.changes.len(), 2); + let highest = resp.changes.last().unwrap().change_id; + + let mut ack_req = tonic::Request::new(AckChangesRequest { + up_to_change_id: highest, + }); + ack_req.metadata_mut().insert("authorization", header); + client.ack_changes(ack_req).await.unwrap(); +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 86c34f4..b11969e 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -4,6 +4,7 @@ pub mod init; pub mod status; pub mod sync; +pub mod sync_sqlite; pub mod target; pub mod validate; pub mod verify; diff --git a/src/commands/sync_sqlite.rs b/src/commands/sync_sqlite.rs new file mode 100644 index 0000000..ca320c2 --- /dev/null +++ b/src/commands/sync_sqlite.rs @@ -0,0 +1,434 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use anyhow::{anyhow, bail, Context, Result}; +use clap::ValueEnum; +use sqlite_watcher::watcher_proto::watcher_client::WatcherClient; +use sqlite_watcher::watcher_proto::{ + AckChangesRequest, GetStateRequest, HealthCheckRequest, ListChangesRequest, SetStateRequest, +}; +use tokio_postgres::Client; +use tonic::codegen::InterceptedService; +use tonic::service::Interceptor; +use tonic::transport::{Channel, Endpoint}; +use tonic::{Request, Status}; +use tower::service_fn; + +use 
crate::jsonb::writer::{delete_jsonb_rows, insert_jsonb_batch, upsert_jsonb_rows}; + +const DEFAULT_BATCH_LIMIT: u32 = 500; +const GLOBAL_STATE_KEY: &str = "_global"; + +#[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)] +pub enum IncrementalMode { + Append, + AppendDeduped, +} + +pub struct SyncSqliteOptions { + pub target: String, + pub watcher_endpoint: String, + pub token_file: Option, + pub incremental_mode: IncrementalMode, + pub batch_size: u32, +} + +pub async fn run(opts: SyncSqliteOptions) -> Result<()> { + let token = load_token(opts.token_file.as_deref())?; + let endpoint = WatcherEndpoint::parse(&opts.watcher_endpoint)?; + let mut watcher = connect_watcher(endpoint, token.clone()).await?; + + let client = crate::postgres::connect(&opts.target) + .await + .context("failed to connect to target PostgreSQL")?; + ensure_state_table(&client).await?; + ensure_baseline_exists(&client).await?; + + tracing::info!("Connecting to sqlite-watcher..."); + watcher + .health_check(Request::new(HealthCheckRequest {})) + .await + .context("watcher health check failed")?; + let _ = watcher + .get_state(Request::new(GetStateRequest { + table_name: GLOBAL_STATE_KEY.to_string(), + })) + .await?; + + tracing::info!( + "Starting incremental sync (mode: {:?})", + opts.incremental_mode + ); + let mut processed_any = false; + + loop { + let mut req = Request::new(ListChangesRequest { + limit: opts.batch_size.max(1), + }); + let changes = watcher + .list_changes(req) + .await + .context("failed to list changes from watcher")? 
+ .into_inner() + .changes; + + if changes.is_empty() { + if !processed_any { + tracing::info!("No pending sqlite-watcher changes"); + } + break; + } + + apply_changes(&client, &changes, opts.incremental_mode).await?; + processed_any = true; + + let max_id = changes + .iter() + .map(|c| c.change_id) + .max() + .unwrap_or_default(); + watcher + .ack_changes(Request::new(AckChangesRequest { + up_to_change_id: max_id, + })) + .await + .context("failed to ack changes")?; + + let last_change = changes.last().unwrap(); + watcher + .set_state(Request::new(SetStateRequest { + table_name: GLOBAL_STATE_KEY.to_string(), + last_change_id: max_id, + last_wal_frame: last_change.wal_frame.clone(), + cursor: last_change.cursor.clone(), + })) + .await + .context("failed to update watcher state")?; + + if changes.len() < opts.batch_size as usize { + break; + } + } + + tracing::info!("sqlite-watcher sync completed"); + Ok(()) +} + +struct TableBatch { + upserts: Vec<(String, serde_json::Value)>, + deletes: Vec, +} + +impl TableBatch { + fn new() -> Self { + Self { + upserts: Vec::new(), + deletes: Vec::new(), + } + } +} + +async fn apply_changes( + client: &Client, + changes: &[sqlite_watcher::watcher_proto::Change], + mode: IncrementalMode, +) -> Result<()> { + let mut per_table: HashMap = HashMap::new(); + let mut table_state: HashMap = HashMap::new(); + + for change in changes { + let entry = per_table + .entry(change.table_name.clone()) + .or_insert_with(TableBatch::new); + match change.op.as_str() { + "insert" | "update" => { + let payload = if change.payload.is_empty() { + serde_json::Value::Null + } else { + serde_json::from_slice(&change.payload) + .context("failed to parse change payload")? 
+ }; + entry.upserts.push((change.primary_key.clone(), payload)); + } + "delete" => { + entry.deletes.push(change.primary_key.clone()); + } + other => bail!("unknown change operation '{other}'"), + } + table_state.insert( + change.table_name.clone(), + TableState { + last_change_id: change.change_id, + wal_frame: change.wal_frame.clone(), + cursor: change.cursor.clone(), + }, + ); + } + + for (table, batch) in per_table.iter() { + if !batch.upserts.is_empty() { + insert_jsonb_batch(client, table, batch.upserts.clone(), "sqlite").await?; + if mode == IncrementalMode::AppendDeduped { + let latest_table = format!("{}_latest", table); + ensure_latest_table(client, table, &latest_table).await?; + upsert_jsonb_rows(client, &latest_table, &batch.upserts, "sqlite").await?; + } + } + if !batch.deletes.is_empty() { + delete_jsonb_rows(client, table, &batch.deletes).await?; + if mode == IncrementalMode::AppendDeduped { + let latest_table = format!("{}_latest", table); + ensure_latest_table(client, table, &latest_table).await?; + delete_jsonb_rows(client, &latest_table, &batch.deletes).await?; + } + } + } + + persist_state(client, &table_state, mode).await?; + Ok(()) +} + +async fn ensure_latest_table( + client: &Client, + source_table: &str, + latest_table: &str, +) -> Result<()> { + crate::jsonb::validate_table_name(source_table)?; + crate::jsonb::validate_table_name(latest_table)?; + let sql = format!( + r#"CREATE TABLE IF NOT EXISTS "{}" (LIKE "{}" INCLUDING ALL)"#, + latest_table, source_table + ); + client.execute(&sql, &[]).await?; + Ok(()) +} + +struct TableState { + last_change_id: i64, + wal_frame: Option, + cursor: Option, +} + +async fn persist_state( + client: &Client, + updates: &HashMap, + mode: IncrementalMode, +) -> Result<()> { + for (table, state) in updates.iter() { + client + .execute( + "INSERT INTO sqlite_sync_state(table_name, last_change_id, last_wal_frame, cursor, snapshot_completed, incremental_mode) + VALUES ($1, $2, $3, $4, TRUE, $5) + ON 
CONFLICT(table_name) DO UPDATE SET last_change_id = EXCLUDED.last_change_id, last_wal_frame = EXCLUDED.last_wal_frame, cursor = EXCLUDED.cursor, incremental_mode = EXCLUDED.incremental_mode", + &[&table, &state.last_change_id, &state.wal_frame, &state.cursor, &mode_string(mode)], + ) + .await?; + } + Ok(()) +} + +fn mode_string(mode: IncrementalMode) -> &'static str { + match mode { + IncrementalMode::Append => "append", + IncrementalMode::AppendDeduped => "append_deduped", + } +} + +fn load_token(path: Option<&Path>) -> Result { + let token_path = path + .map(|p| p.to_path_buf()) + .unwrap_or(default_token_path()?); + let contents = std::fs::read_to_string(&token_path) + .with_context(|| format!("failed to read token file {}", token_path.display()))?; + let token = contents.trim().to_string(); + if token.is_empty() { + bail!("token file {} is empty", token_path.display()); + } + Ok(token) +} + +fn default_token_path() -> Result { + let home = dirs::home_dir().ok_or_else(|| anyhow!("Could not determine home directory"))?; + Ok(home.join(".seren/sqlite-watcher/token")) +} + +async fn ensure_state_table(client: &Client) -> Result<()> { + client + .execute( + r#"CREATE TABLE IF NOT EXISTS sqlite_sync_state ( + table_name TEXT PRIMARY KEY, + last_change_id BIGINT NOT NULL DEFAULT 0, + last_wal_frame TEXT, + cursor TEXT, + snapshot_completed BOOLEAN NOT NULL DEFAULT FALSE, + incremental_mode TEXT NOT NULL DEFAULT 'append', + baseline_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )"#, + &[], + ) + .await?; + Ok(()) +} + +async fn ensure_baseline_exists(client: &Client) -> Result<()> { + let row = client + .query_one( + "SELECT COUNT(*) FROM sqlite_sync_state WHERE snapshot_completed", + &[], + ) + .await?; + let completed: i64 = row.get(0); + if completed == 0 { + bail!( + "No completed sqlite baseline found. Run 'database-replicator init --source sqlite://...' 
first" + ); + } + Ok(()) +} + +enum WatcherEndpoint { + Tcp { host: String, port: u16 }, + Unix(PathBuf), + Pipe(String), +} + +impl WatcherEndpoint { + fn parse(value: &str) -> Result { + if let Some(rest) = value.strip_prefix("unix:") { + #[cfg(unix)] + { + if rest.is_empty() { + bail!("unix endpoint requires a path"); + } + return Ok(WatcherEndpoint::Unix(PathBuf::from(rest))); + } + #[cfg(not(unix))] + bail!("unix sockets are not supported on this platform") + } + if let Some(rest) = value.strip_prefix("tcp:") { + let mut parts = rest.split(':'); + let host = parts + .next() + .ok_or_else(|| anyhow!("tcp endpoint must include host:port"))?; + let port = parts + .next() + .ok_or_else(|| anyhow!("tcp endpoint must include port"))? + .parse::() + .context("invalid tcp port")?; + return Ok(WatcherEndpoint::Tcp { + host: host.to_string(), + port, + }); + } + if let Some(rest) = value.strip_prefix("pipe:") { + return Ok(WatcherEndpoint::Pipe(rest.to_string())); + } + bail!("unsupported watcher endpoint: {value}"); + } +} + +#[derive(Clone)] +struct TokenInterceptor { + header: tonic::metadata::MetadataValue, +} + +impl TokenInterceptor { + fn new(token: String) -> Result { + let value = tonic::metadata::MetadataValue::try_from(format!("Bearer {token}")) + .context("invalid watcher token")?; + Ok(Self { header: value }) + } +} + +impl Interceptor for TokenInterceptor { + fn call(&mut self, mut req: Request<()>) -> Result, Status> { + req.metadata_mut() + .insert("authorization", self.header.clone()); + Ok(req) + } +} + +type WatcherClientWithAuth = WatcherClient>; + +async fn connect_watcher( + endpoint: WatcherEndpoint, + token: String, +) -> Result { + let interceptor = TokenInterceptor::new(token)?; + let channel = match endpoint { + WatcherEndpoint::Tcp { host, port } => { + let uri = format!("http://{}:{}", host, port); + Endpoint::try_from(uri)?.connect().await? 
+ } + WatcherEndpoint::Unix(path) => { + #[cfg(unix)] + { + let path_buf = path.clone(); + Endpoint::try_from("http://[::]:50051")? + .connect_with_connector(service_fn(move |_| { + let path = path_buf.clone(); + async move { tokio::net::UnixStream::connect(path).await } + })) + .await? + } + #[cfg(not(unix))] + { + bail!("unix sockets are not supported on this platform") + } + } + WatcherEndpoint::Pipe(name) => { + bail!("named pipe endpoints are not supported yet: {name}") + } + }; + + Ok(WatcherClient::with_interceptor(channel, interceptor)) +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlite_watcher::watcher_proto::Change; + + #[test] + fn group_changes_by_table() { + let changes = vec![ + Change { + change_id: 1, + table_name: "foo".into(), + op: "insert".into(), + primary_key: "1".into(), + payload: serde_json::to_vec(&serde_json::json!({"a":1})).unwrap(), + wal_frame: None, + cursor: None, + }, + Change { + change_id: 2, + table_name: "foo".into(), + op: "delete".into(), + primary_key: "2".into(), + payload: Vec::new(), + wal_frame: None, + cursor: None, + }, + ]; + let mut per_table: HashMap = HashMap::new(); + for change in changes { + let entry = per_table + .entry(change.table_name.clone()) + .or_insert_with(TableBatch::new); + match change.op.as_str() { + "insert" | "update" => { + entry + .upserts + .push((change.primary_key.clone(), serde_json::Value::Null)); + } + "delete" => entry.deletes.push(change.primary_key.clone()), + _ => {} + } + } + let foo = per_table.get("foo").unwrap(); + assert_eq!(foo.upserts.len(), 1); + assert_eq!(foo.deletes.len(), 1); + } +} diff --git a/src/jsonb/writer.rs b/src/jsonb/writer.rs index eb03541..839e47a 100644 --- a/src/jsonb/writer.rs +++ b/src/jsonb/writer.rs @@ -2,7 +2,7 @@ // ABOUTME: Handles table creation, single row inserts, and batch inserts use anyhow::{bail, Context, Result}; -use tokio_postgres::Client; +use tokio_postgres::{types::ToSql, Client}; /// Create a table with JSONB schema for storing 
non-PostgreSQL data /// @@ -471,6 +471,49 @@ pub async fn insert_jsonb_batch( Ok(()) } +/// Delete rows from a JSONB table by primary key +pub async fn delete_jsonb_rows(client: &Client, table_name: &str, ids: &[String]) -> Result<()> { + if ids.is_empty() { + return Ok(()); + } + crate::jsonb::validate_table_name(table_name)?; + let sql = format!(r#"DELETE FROM "{}" WHERE id = ANY($1)"#, table_name); + client.execute(&sql, &[&ids]).await?; + Ok(()) +} + +/// Upsert rows into a JSONB table (used for deduped "_latest" tables) +pub async fn upsert_jsonb_rows( + client: &Client, + table_name: &str, + rows: &[(String, serde_json::Value)], + source_type: &str, +) -> Result<()> { + if rows.is_empty() { + return Ok(()); + } + crate::jsonb::validate_table_name(table_name)?; + + let mut value_placeholders = Vec::with_capacity(rows.len()); + let mut params: Vec<&(dyn ToSql + Sync)> = Vec::with_capacity(rows.len() * 3); + + for (idx, (id, data)) in rows.iter().enumerate() { + let base = idx * 3 + 1; + value_placeholders.push(format!("(${}, ${}, ${})", base, base + 1, base + 2)); + params.push(id); + params.push(data); + params.push(&source_type); + } + + let sql = format!( + r#"INSERT INTO "{}" (id, data, _source_type) VALUES {} ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data, _source_type = EXCLUDED._source_type, _migrated_at = NOW()"#, + table_name, + value_placeholders.join(", ") + ); + client.execute(&sql, ¶ms).await?; + Ok(()) +} + #[cfg(test)] mod tests { #[test] diff --git a/src/main.rs b/src/main.rs index a6f165b..d7552d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use anyhow::Context; use clap::{Args, Parser, Subcommand}; use database_replicator::commands; +use std::path::PathBuf; #[derive(Parser)] #[command(name = "database-replicator")] @@ -176,6 +177,24 @@ enum Commands { #[arg(long)] daemon_status: bool, }, + /// Consume sqlite-watcher change batches and apply them to SerenDB JSONB tables + SyncSqlite { + /// Target PostgreSQL/Seren connection 
string + #[arg(long)] + target: String, + /// sqlite-watcher endpoint (unix:/path or tcp:host:port) + #[arg(long, default_value = "unix:/tmp/sqlite-watcher.sock")] + watcher_endpoint: String, + /// Optional shared-secret token file (defaults to ~/.seren/sqlite-watcher/token) + #[arg(long)] + token_file: Option, + /// Incremental mode: append (raw only) or append_deduped (maintains *_latest tables) + #[arg(long, value_enum, default_value = "append")] + incremental_mode: commands::sync_sqlite::IncrementalMode, + /// Number of watcher rows to pull per batch + #[arg(long, default_value_t = 500)] + batch_size: u32, + }, /// Check replication status and lag in real-time Status { #[arg(long)] @@ -748,6 +767,22 @@ async fn main() -> anyhow::Result<()> { )?; commands::verify(&source, &target, Some(filter)).await } + Commands::SyncSqlite { + target, + watcher_endpoint, + token_file, + incremental_mode, + batch_size, + } => { + commands::sync_sqlite::run(commands::sync_sqlite::SyncSqliteOptions { + target, + watcher_endpoint, + token_file, + incremental_mode, + batch_size, + }) + .await + } Commands::Target { args } => commands::target(args).await, } } From 8e96b7a965057a4dc50f8b4a0b1db42c174ac2fb Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 14:35:29 -0800 Subject: [PATCH 2/5] [docs/packaging] add watcher crate, release artifacts, and smoke test --- .github/workflows/release.yml | 30 +++ Cargo.lock | 313 ++++++++++++++++++++++++++++- Cargo.toml | 7 + Dockerfile | 14 +- README-SQLite.md | 33 +++ docs/installers.md | 109 ++++++++++ scripts/test-sqlite-delta.sh | 93 +++++++++ sqlite-watcher/Cargo.toml | 32 +++ sqlite-watcher/README.md | 10 + sqlite-watcher/build.rs | 8 + sqlite-watcher/proto/watcher.proto | 45 +++++ sqlite-watcher/src/lib.rs | 5 + sqlite-watcher/src/main.rs | 209 +++++++++++++++++++ sqlite-watcher/src/queue.rs | 224 +++++++++++++++++++++ sqlite-watcher/src/server.rs | 283 ++++++++++++++++++++++++++ 
15 files changed, 1403 insertions(+), 12 deletions(-) create mode 100644 docs/installers.md create mode 100755 scripts/test-sqlite-delta.sh create mode 100644 sqlite-watcher/Cargo.toml create mode 100644 sqlite-watcher/README.md create mode 100644 sqlite-watcher/build.rs create mode 100644 sqlite-watcher/proto/watcher.proto create mode 100644 sqlite-watcher/src/lib.rs create mode 100644 sqlite-watcher/src/main.rs create mode 100644 sqlite-watcher/src/queue.rs create mode 100644 sqlite-watcher/src/server.rs diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 998630f..fa4aadd 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,18 +23,26 @@ jobs: target: x86_64-unknown-linux-gnu artifact_name: database-replicator asset_name: database-replicator-linux-x64-binary + watcher_artifact: sqlite-watcher + watcher_asset: sqlite-watcher-linux-x64 - os: macos-latest target: x86_64-apple-darwin artifact_name: database-replicator asset_name: database-replicator-macos-x64-binary + watcher_artifact: sqlite-watcher + watcher_asset: sqlite-watcher-macos-x64 - os: macos-latest target: aarch64-apple-darwin artifact_name: database-replicator asset_name: database-replicator-macos-arm64-binary + watcher_artifact: sqlite-watcher + watcher_asset: sqlite-watcher-macos-arm64 - os: windows-latest target: x86_64-pc-windows-msvc artifact_name: database-replicator.exe asset_name: database-replicator-windows-x64.exe + watcher_artifact: sqlite-watcher.exe + watcher_asset: sqlite-watcher-windows-x64.exe steps: - name: Checkout code uses: actions/checkout@v4 @@ -75,6 +83,9 @@ jobs: - name: Build release binary run: cargo build --release --target ${{ matrix.target }} --verbose + - name: Build sqlite-watcher binary + run: cargo build --release --target ${{ matrix.target }} -p sqlite-watcher --verbose + - name: Strip binary (Linux) if: matrix.os == 'ubuntu-latest' run: strip target/${{ matrix.target }}/release/${{ matrix.artifact_name }} @@ 
-89,12 +100,24 @@ jobs: cp target/${{ matrix.target }}/release/${{ matrix.artifact_name }} ${{ matrix.asset_name }} chmod +x ${{ matrix.asset_name }} + - name: Rename sqlite-watcher (Unix) + if: matrix.os != 'windows-latest' + run: | + cp target/${{ matrix.target }}/release/${{ matrix.watcher_artifact }} ${{ matrix.watcher_asset }} + chmod +x ${{ matrix.watcher_asset }} + - name: Rename binary (Windows) if: matrix.os == 'windows-latest' run: | copy target\${{ matrix.target }}\release\${{ matrix.artifact_name }} ${{ matrix.asset_name }} shell: cmd + - name: Rename sqlite-watcher (Windows) + if: matrix.os == 'windows-latest' + run: | + copy target\${{ matrix.target }}\release\${{ matrix.watcher_artifact }} ${{ matrix.watcher_asset }} + shell: cmd + - name: Upload artifact uses: actions/upload-artifact@v4 with: @@ -102,6 +125,13 @@ jobs: path: ${{ matrix.asset_name }} if-no-files-found: error + - name: Upload sqlite-watcher artifact + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.watcher_asset }} + path: ${{ matrix.watcher_asset }} + if-no-files-found: error + create-release: name: Create GitHub Release needs: build-release diff --git a/Cargo.lock b/Cargo.lock index eed6140..87e12bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,28 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.108", +] + 
[[package]] name = "async-trait" version = "0.1.89" @@ -135,6 +157,51 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.21.7" @@ -169,7 +236,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -246,7 +313,7 @@ dependencies = [ "getrandom 0.2.16", "getrandom 0.3.4", "hex", - "indexmap", + "indexmap 2.12.0", "js-sys", "once_cell", "rand 0.9.2", @@ -917,6 +984,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.1.5" @@ -1188,7 +1261,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.12.0", 
"slab", "tokio", "tokio-util", @@ -1374,6 +1447,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1519,6 +1604,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.12.0" @@ -1583,6 +1678,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -1624,7 +1728,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" dependencies = [ - "indexmap", + "indexmap 2.12.0", ] [[package]] @@ -1786,6 +1890,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1941,6 +2051,12 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "mysql-common-derive" version = "0.31.2" @@ -2222,6 +2338,16 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.12.0", +] + [[package]] name = "phf" version = "0.13.1" @@ -2354,6 +2480,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.108", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -2394,6 +2530,59 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.108", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.108", +] + 
+[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -2648,6 +2837,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a78046161564f5e7cd9008aff3b2990b3850dc8e0349119b98e8f251e099f24d" dependencies = [ "bitflags 2.10.0", + "chrono", "fallible-iterator 0.3.0", "fallible-streaming-iterator", "hashlink", @@ -2881,7 +3071,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ - "indexmap", + "indexmap 2.12.0", "itoa", "memchr", "ryu", @@ -3055,6 +3245,27 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "sqlite-watcher" +version = "0.1.0" +dependencies = [ + "anyhow", + "base64 0.21.7", + "clap", + "dirs", + "prost", + "rusqlite", + "serde", + "serde_json", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -3331,6 +3542,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bd86198d9ee903fedd2f9a2e72014287c0d9167e4ae43b5853007205dda1b76" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.6.0" @@ -3388,6 +3609,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.17" @@ -3438,7 
+3670,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap", + "indexmap 2.12.0", "serde", "serde_spanned", "toml_datetime 0.6.11", @@ -3452,7 +3684,7 @@ version = "0.23.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" dependencies = [ - "indexmap", + "indexmap 2.12.0", "toml_datetime 0.7.3", "toml_parser", "winnow", @@ -3473,6 +3705,72 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.108", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version 
= "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -3485,6 +3783,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index 72b4e75..ab7988c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,10 @@ +[workspace] +resolver = "2" +members = [ + ".", + "sqlite-watcher", +] + [package] name = "database-replicator" version = "7.0.14" diff --git a/Dockerfile b/Dockerfile index bd302dc..7952720 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,19 +2,22 @@ FROM ubuntu:24.04 AS downloader ARG VERSION=latest -ENV BINARY_NAME=database-replicator-linux-x64-binary +ENV REPLICATOR_ASSET=database-replicator-linux-x64-binary +ENV WATCHER_ASSET=sqlite-watcher-linux-x64 ENV RELEASE_ROOT=https://github.com/serenorg/database-replicator/releases RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certificates && rm -rf /var/lib/apt/lists/* RUN set -eux; \ if [ "$VERSION" = "latest" ]; then \ - URL="$RELEASE_ROOT/latest/download/$BINARY_NAME"; \ + REP_URL="$RELEASE_ROOT/latest/download/$REPLICATOR_ASSET"; \ + WATCH_URL="$RELEASE_ROOT/latest/download/$WATCHER_ASSET"; \ else \ - URL="$RELEASE_ROOT/download/$VERSION/$BINARY_NAME"; \ + REP_URL="$RELEASE_ROOT/download/$VERSION/$REPLICATOR_ASSET"; \ + WATCH_URL="$RELEASE_ROOT/download/$VERSION/$WATCHER_ASSET"; \ fi; \ - curl -fL "$URL" -o /tmp/database-replicator && \ - chmod +x /tmp/database-replicator + curl -fL "$REP_URL" -o /tmp/database-replicator && chmod +x /tmp/database-replicator && \ + curl -fL "$WATCH_URL" -o /tmp/sqlite-watcher && chmod +x /tmp/sqlite-watcher FROM ubuntu:24.04 LABEL 
org.opencontainers.image.title="database-replicator" \ @@ -27,6 +30,7 @@ RUN apt-get update && \ useradd -m replicator COPY --from=downloader /tmp/database-replicator /usr/local/bin/database-replicator +COPY --from=downloader /tmp/sqlite-watcher /usr/local/bin/sqlite-watcher USER replicator ENTRYPOINT ["database-replicator"] CMD ["--help"] diff --git a/README-SQLite.md b/README-SQLite.md index a01b44d..5f63c46 100644 --- a/README-SQLite.md +++ b/README-SQLite.md @@ -596,3 +596,36 @@ No. The tool uses `SQLITE_OPEN_READ_ONLY` which allows concurrent readers. Other For issues or questions: - **GitHub Issues**: https://github.com/serenorg/database-replicator/issues - **Email**: support@seren.ai +## Delta replication with sqlite-watcher + +Once you have completed the initial snapshot (`database-replicator init --source sqlite ...`), you can switch to incremental change capture: + +1. Install `sqlite-watcher` (see [docs/installers.md](docs/installers.md) for Linux systemd units, macOS launchd plists, and Windows service guidance). +2. Start the watcher beside your `.sqlite` file (example for Linux/macOS): + + ```bash + sqlite-watcher serve \ + --queue-db ~/.seren/sqlite-watcher/changes.db \ + --listen unix:/tmp/sqlite-watcher.sock \ + --token-file ~/.seren/sqlite-watcher/token + ``` + +3. Consume the change feed with the new command: + + ```bash + database-replicator sync-sqlite \ + --target "postgresql://user:pass@your-serendb.serendb.com:5432/app" \ + --watcher-endpoint unix:/tmp/sqlite-watcher.sock \ + --token-file ~/.seren/sqlite-watcher/token \ + --incremental-mode append + ``` + + Use `--incremental-mode append_deduped` to maintain `_latest` tables (one row per primary key) in addition to the append-only history. + +4. 
Verify the smoke test if you have Docker available: + + ```bash + scripts/test-sqlite-delta.sh + ``` + + The script spins up a temporary Postgres container, runs `sqlite-watcher`, and executes `database-replicator sync-sqlite` against the watcher feed. Windows users can follow the same steps manually (the script prints the equivalent commands at the end). diff --git a/docs/installers.md b/docs/installers.md new file mode 100644 index 0000000..1a2fb1a --- /dev/null +++ b/docs/installers.md @@ -0,0 +1,109 @@ +# sqlite-watcher installation guide + +This document walks through running the sqlite-watcher service on Linux, macOS, and Windows. The watcher process should run beside the `.sqlite` file so it can tail the WAL and expose change batches over the embedded gRPC API. + +All platforms share these basics: + +- Create a token file (default `~/.seren/sqlite-watcher/token`) with restrictive permissions (owner read/write only). +- Choose a queue database path (default `~/.seren/sqlite-watcher/changes.db`). Ensure the parent directory is `0700` on Unix. +- Run `sqlite-watcher serve --queue-db --listen --token-file ` to start the gRPC service. Endpoints use the `unix:/path` or `tcp:host:port` syntax. + +## Linux (systemd) + +1. Install binaries: + + ```bash + sudo install -m 0755 database-replicator /usr/local/bin/database-replicator + sudo install -m 0755 sqlite-watcher /usr/local/bin/sqlite-watcher + ``` + +2. Create token + queue directories: + + ```bash + install -d -m 0700 ~/.seren/sqlite-watcher + openssl rand -hex 32 > ~/.seren/sqlite-watcher/token + ``` + +3. 
Create `/etc/systemd/system/sqlite-watcher.service`: + + ```ini + [Unit] + Description=sqlite-watcher for /srv/app.db + After=network-online.target + + [Service] + User=replicator + ExecStart=/usr/local/bin/sqlite-watcher serve \ + --queue-db /var/lib/sqlite-watcher/changes.db \ + --listen unix:/run/sqlite-watcher.sock \ + --token-file /home/replicator/.seren/sqlite-watcher/token + Restart=on-failure + + [Install] + WantedBy=multi-user.target + ``` + +4. Enable/start: + + ```bash + sudo systemctl daemon-reload + sudo systemctl enable --now sqlite-watcher.service + ``` + +## macOS (launchd) + +1. Copy binaries into `/usr/local/bin`. +2. Save the following to `~/Library/LaunchAgents/com.seren.sqlite-watcher.plist`: + + ```xml + + + + + Label + com.seren.sqlite-watcher + ProgramArguments + + /usr/local/bin/sqlite-watcher + serve + --queue-db + /Users/you/.seren/sqlite-watcher/changes.db + --listen + unix:/Users/you/.seren/sqlite-watcher/watcher.sock + --token-file + /Users/you/.seren/sqlite-watcher/token + + RunAtLoad + + KeepAlive + + StandardOutPath + /Users/you/Library/Logs/sqlite-watcher.log + StandardErrorPath + /Users/you/Library/Logs/sqlite-watcher.log + + + ``` + +3. Load the agent: `launchctl load ~/Library/LaunchAgents/com.seren.sqlite-watcher.plist`. + +## Windows (Service) + +1. Copy `database-replicator.exe` and `sqlite-watcher.exe` to a directory on `%PATH%` (e.g. `C:\Program Files\Seren`). +2. Create a token file under `%USERPROFILE%\.seren\sqlite-watcher\token`. +3. Use the built-in `sc.exe` to install a service (or NSSM if you prefer a GUI): + + ```powershell + sc.exe create sqlite-watcher binPath= "C:\Program Files\Seren\sqlite-watcher.exe serve --queue-db C:\data\sqlite-watcher\changes.db --listen tcp:127.0.0.1:6000 --token-file %USERPROFILE%\.seren\sqlite-watcher\token" start= auto + ``` + +4. Start the service with `sc.exe start sqlite-watcher`. + +Remember to open the firewall only if the watcher must accept remote TCP connections. 
In most deployments, keep it bound to loopback or Unix sockets. + +## Running sync-sqlite on a schedule + +- Linux/macOS: use cron or systemd timers to run `database-replicator sync-sqlite ...` periodically. +- Windows: create a Scheduled Task pointing at `database-replicator.exe sync-sqlite ...`. + +Consult the smoke test (`scripts/test-sqlite-delta.sh`) to see a minimal end-to-end example. diff --git a/scripts/test-sqlite-delta.sh b/scripts/test-sqlite-delta.sh new file mode 100755 index 0000000..743c820 --- /dev/null +++ b/scripts/test-sqlite-delta.sh @@ -0,0 +1,93 @@ +#!/usr/bin/env bash +# Smoke test for sqlite-watcher + database-replicator incremental sync +# Requires: docker, sqlite-watcher, database-replicator, sqlite3 + +set -euo pipefail + +if ! command -v docker >/dev/null; then + echo "[smoke] docker is required" >&2 + exit 1 +fi +if ! command -v sqlite-watcher >/dev/null; then + echo "[smoke] sqlite-watcher binary not found in PATH" >&2 + exit 1 +fi +if ! command -v database-replicator >/dev/null; then + echo "[smoke] database-replicator binary not found in PATH" >&2 + exit 1 +fi + +TMPDIR=$(mktemp -d) +QUEUE_DB="$TMPDIR/queue.db" +SOCK="$TMPDIR/watcher.sock" +TOKEN_FILE="$TMPDIR/token" +POSTGRES_PORT=55432 +CONTAINER_NAME=sqlite-delta-smoke + +cleanup() { + set +e + if [[ -n "${WATCHER_PID:-}" ]]; then + kill "$WATCHER_PID" >/dev/null 2>&1 || true + fi + docker rm -f "$CONTAINER_NAME" >/dev/null 2>&1 || true + rm -rf "$TMPDIR" +} +trap cleanup EXIT + +echo "[smoke] preparing token + queue" +mkdir -p "$(dirname "$TOKEN_FILE")" +printf 'smoke-%s' "$RANDOM" > "$TOKEN_FILE" +chmod 600 "$TOKEN_FILE" + +sqlite-watcher enqueue --queue-db "$QUEUE_DB" --table demo --id smoke --payload '{"message":"hello-from-watcher"}' + +sqlite-watcher serve --queue-db "$QUEUE_DB" --listen "unix:$SOCK" --token-file "$TOKEN_FILE" >/dev/null 2>&1 & +WATCHER_PID=$! 
+sleep 1 + +echo "[smoke] starting postgres container" +docker run -d --rm \ + --name "$CONTAINER_NAME" \ + -e POSTGRES_PASSWORD=postgres \ + -p "$POSTGRES_PORT":5432 \ + postgres:15 >/dev/null + +until docker exec "$CONTAINER_NAME" pg_isready -U postgres >/dev/null 2>&1; do + sleep 1 +done + +docker exec "$CONTAINER_NAME" psql -U postgres <<'SQL' +CREATE TABLE IF NOT EXISTS demo ( + id TEXT PRIMARY KEY, + data JSONB NOT NULL, + _source_type TEXT NOT NULL DEFAULT 'sqlite', + _migrated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE TABLE IF NOT EXISTS sqlite_sync_state ( + table_name TEXT PRIMARY KEY, + last_change_id BIGINT NOT NULL DEFAULT 0, + last_wal_frame TEXT, + cursor TEXT, + snapshot_completed BOOLEAN NOT NULL DEFAULT FALSE, + incremental_mode TEXT NOT NULL DEFAULT 'append', + baseline_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +INSERT INTO sqlite_sync_state(table_name, snapshot_completed, incremental_mode) +VALUES ('demo', TRUE, 'append') +ON CONFLICT(table_name) DO UPDATE SET snapshot_completed = EXCLUDED.snapshot_completed, + incremental_mode = EXCLUDED.incremental_mode; +SQL + +echo "[smoke] running sync-sqlite" +DATABASE_URL="postgresql://postgres:postgres@localhost:$POSTGRES_PORT/postgres" +database-replicator sync-sqlite \ + --target "$DATABASE_URL" \ + --watcher-endpoint "unix:$SOCK" \ + --token-file "$TOKEN_FILE" \ + --batch-size 50 \ + --incremental-mode append >/dev/null + +docker exec "$CONTAINER_NAME" psql -U postgres -tAc "SELECT count(*) FROM demo WHERE id = 'smoke'" | grep -q '^1$' + +echo "[smoke] success! sqlite-watcher + sync-sqlite end-to-end" +echo "[windows] Manual steps: run sqlite-watcher serve with tcp listener, start a Postgres instance (Docker Desktop works), then run database-replicator sync-sqlite with the TCP watcher endpoint." 
diff --git a/sqlite-watcher/Cargo.toml b/sqlite-watcher/Cargo.toml new file mode 100644 index 0000000..b81704e --- /dev/null +++ b/sqlite-watcher/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "sqlite-watcher" +version = "0.1.0" +edition = "2021" +authors = ["SerenAI "] +description = "SQLite watcher components (queue + gRPC)." +license = "Apache-2.0" +repository = "https://github.com/serenorg/database-replicator" +build = "build.rs" + +[build-dependencies] +tonic-build = "0.11" + +[dependencies] +anyhow = "1.0" +clap = { version = "4.4", features = ["derive", "env"] } +dirs = "5.0" +rusqlite = { version = "0.30", features = ["chrono"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "1.0" +base64 = "0.21" +tokio = { version = "1.35", features = ["rt-multi-thread", "macros", "signal", "fs"] } +tonic = { version = "0.11", features = ["transport"] } +tokio-stream = { version = "0.1", features = ["net"] } +prost = "0.12" + +[dev-dependencies] +tempfile = "3.8" +tokio = { version = "1.35", features = ["rt", "macros"] } +tonic = { version = "0.11", features = ["transport"] } +tower = "0.4" diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md new file mode 100644 index 0000000..0916d27 --- /dev/null +++ b/sqlite-watcher/README.md @@ -0,0 +1,10 @@ +# sqlite-watcher (alpha) + +This crate currently ships the shared queue + gRPC server used by `database-replicator sync-sqlite`. The `sqlite-watcher` binary includes: + +- `serve`: start the queue-backed gRPC API so clients can pull change batches. +- `enqueue`: helper for tests/smoke scripts to add sample changes to the queue database. + +> **Note:** WAL tailing is still under active development; use the binary today to test queue + sync flows. + +See `docs/installers.md` for per-OS service guidance and `scripts/test-sqlite-delta.sh` for the end-to-end smoke test. 
diff --git a/sqlite-watcher/build.rs b/sqlite-watcher/build.rs new file mode 100644 index 0000000..a38290d --- /dev/null +++ b/sqlite-watcher/build.rs @@ -0,0 +1,8 @@ +fn main() -> Result<(), Box<dyn std::error::Error>> { + tonic_build::configure() + .build_client(true) + .build_server(true) + .compile(&["proto/watcher.proto"], &["proto"])?; + println!("cargo:rerun-if-changed=proto/watcher.proto"); + Ok(()) +} diff --git a/sqlite-watcher/proto/watcher.proto b/sqlite-watcher/proto/watcher.proto new file mode 100644 index 0000000..2424406 --- /dev/null +++ b/sqlite-watcher/proto/watcher.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package sqlitewatcher; + +message HealthCheckRequest {} +message HealthCheckResponse { string status = 1; } + +message ListChangesRequest { uint32 limit = 1; } +message Change { + int64 change_id = 1; + string table_name = 2; + string op = 3; + string primary_key = 4; + bytes payload = 5; + string wal_frame = 6; + string cursor = 7; +} +message ListChangesResponse { repeated Change changes = 1; } + +message AckChangesRequest { int64 up_to_change_id = 1; } +message AckChangesResponse { uint64 acknowledged = 1; } + +message GetStateRequest { string table_name = 1; } +message GetStateResponse { + bool exists = 1; + int64 last_change_id = 2; + string last_wal_frame = 3; + string cursor = 4; +} + +message SetStateRequest { + string table_name = 1; + int64 last_change_id = 2; + string last_wal_frame = 3; + string cursor = 4; +} +message SetStateResponse {} + +service Watcher { + rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); + rpc ListChanges(ListChangesRequest) returns (ListChangesResponse); + rpc AckChanges(AckChangesRequest) returns (AckChangesResponse); + rpc GetState(GetStateRequest) returns (GetStateResponse); + rpc SetState(SetStateRequest) returns (SetStateResponse); +} diff --git a/sqlite-watcher/src/lib.rs b/sqlite-watcher/src/lib.rs new file mode 100644 index 0000000..b24ec66 --- /dev/null +++ b/sqlite-watcher/src/lib.rs @@ -0,0 +1,5 @@ 
+pub mod queue; +pub mod server; +pub mod watcher_proto { + tonic::include_proto!("sqlitewatcher"); +} diff --git a/sqlite-watcher/src/main.rs b/sqlite-watcher/src/main.rs new file mode 100644 index 0000000..5159550 --- /dev/null +++ b/sqlite-watcher/src/main.rs @@ -0,0 +1,209 @@ +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::{bail, Context, Result}; +use clap::{Parser, Subcommand, ValueEnum}; +use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange}; +use sqlite_watcher::server::{spawn_tcp, spawn_unix}; +use tokio::signal; + +#[derive(Parser)] +#[command(name = "sqlite-watcher")] +#[command(about = "sqlite watcher utility (alpha)")] +struct Cli { + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// Run the gRPC server exposing the change queue + Serve { + /// SQLite queue database path + #[arg(long = "queue-db")] + queue_db: Option<PathBuf>, + /// gRPC listener (unix:/path or tcp:host:port) + #[arg(long = "listen", default_value = "unix:/tmp/sqlite-watcher.sock")] + listen: String, + /// Shared-secret token file (defaults to ~/.seren/sqlite-watcher/token) + #[arg(long = "token-file")] + token_file: Option<PathBuf>, + }, + /// Enqueue a test change into the queue database + Enqueue { + #[arg(long = "queue-db")] + queue_db: Option<PathBuf>, + #[arg(long = "table", default_value = "demo")] + table: String, + #[arg(long = "id", default_value = "smoke-test")] + id: String, + #[arg(long = "payload", default_value = r#"{"message":"hello"}"#)] + payload: String, + #[arg(long = "op", value_enum, default_value = "insert")] + op: ChangeOp, + }, +} + +#[derive(Clone, Copy, ValueEnum)] +enum ChangeOp { + Insert, + Update, + Delete, +} + +impl From<ChangeOp> for ChangeOperation { + fn from(value: ChangeOp) -> Self { + match value { + ChangeOp::Insert => ChangeOperation::Insert, + ChangeOp::Update => ChangeOperation::Update, + ChangeOp::Delete => ChangeOperation::Delete, + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let 
cli = Cli::parse(); + match cli.command { + Command::Serve { + queue_db, + listen, + token_file, + } => serve(queue_db, &listen, token_file).await, + Command::Enqueue { + queue_db, + table, + id, + payload, + op, + } => enqueue(queue_db, &table, &id, &payload, op.into()), + } +} + +async fn serve(queue_db: Option, listen: &str, token_file: Option) -> Result<()> { + let queue_path = resolve_queue_path(queue_db)?; + let token_path = resolve_token_path(token_file)?; + let token = std::fs::read_to_string(&token_path) + .with_context(|| format!("failed to read token file {}", token_path.display()))?; + let queue = ChangeQueue::open(&queue_path)?; + let endpoint = WatcherEndpoint::parse(listen)?; + println!( + "sqlite-watcher serving {listen} using queue {}", + queue.path().display() + ); + let handle = match endpoint { + WatcherEndpoint::Tcp { host, port } => { + let addr = format!("{}:{}", host, port) + .parse() + .context("invalid tcp address")?; + spawn_tcp(addr, queue.path().to_path_buf(), token)? 
+ } + WatcherEndpoint::Unix(path) => spawn_unix(&path, queue.path().to_path_buf(), token)?, + WatcherEndpoint::Pipe(name) => bail!("named pipes are not yet supported ({name})"), + }; + println!("Press Ctrl+C to stop sqlite-watcher"); + let ctrl_c = signal::ctrl_c(); + tokio::pin!(ctrl_c); + let _ = tokio::time::timeout(Duration::MAX, &mut ctrl_c).await; + drop(handle); + Ok(()) +} + +fn enqueue( + queue_db: Option, + table: &str, + id: &str, + payload: &str, + op: ChangeOperation, +) -> Result<()> { + let queue_path = resolve_queue_path(queue_db)?; + let queue = ChangeQueue::open(&queue_path)?; + let bytes = payload.as_bytes().to_vec(); + queue.enqueue(&NewChange { + table_name: table.to_string(), + operation: op, + primary_key: id.to_string(), + payload: Some(bytes), + wal_frame: None, + cursor: None, + })?; + println!( + "Enqueued row id '{}' for table '{}' into {}", + id, + table, + queue.path().display() + ); + Ok(()) +} + +fn resolve_queue_path(path: Option) -> Result { + match path { + Some(p) => Ok(expand_path(p)?), + None => { + let mut default = dirs::home_dir() + .ok_or_else(|| anyhow::anyhow!("Unable to resolve home directory"))?; + default.push(".seren/sqlite-watcher/changes.db"); + Ok(default) + } + } +} + +fn resolve_token_path(path: Option) -> Result { + match path { + Some(p) => Ok(expand_path(p)?), + None => { + let mut default = dirs::home_dir() + .ok_or_else(|| anyhow::anyhow!("Unable to resolve home directory"))?; + default.push(".seren/sqlite-watcher/token"); + Ok(default) + } + } +} + +fn expand_path(p: PathBuf) -> Result { + let s = p.to_string_lossy(); + if let Some(rest) = s.strip_prefix("~/") { + let mut home = + dirs::home_dir().ok_or_else(|| anyhow::anyhow!("Unable to resolve home directory"))?; + home.push(rest); + Ok(home) + } else { + Ok(p) + } +} + +enum WatcherEndpoint { + Tcp { host: String, port: u16 }, + Unix(PathBuf), + Pipe(String), +} + +impl WatcherEndpoint { + fn parse(value: &str) -> Result { + if let Some(rest) = 
value.strip_prefix("unix:") { + if rest.is_empty() { + bail!("unix endpoint requires a path"); + } + return Ok(WatcherEndpoint::Unix(PathBuf::from(rest))); + } + if let Some(rest) = value.strip_prefix("tcp:") { + let mut parts = rest.split(':'); + let host = parts + .next() + .ok_or_else(|| anyhow::anyhow!("tcp endpoint missing host"))?; + let port = parts + .next() + .ok_or_else(|| anyhow::anyhow!("tcp endpoint missing port"))? + .parse::() + .context("invalid tcp port")?; + return Ok(WatcherEndpoint::Tcp { + host: host.to_string(), + port, + }); + } + if let Some(rest) = value.strip_prefix("pipe:") { + return Ok(WatcherEndpoint::Pipe(rest.to_string())); + } + bail!("unsupported listener endpoint: {value}"); + } +} diff --git a/sqlite-watcher/src/queue.rs b/sqlite-watcher/src/queue.rs new file mode 100644 index 0000000..e0b8305 --- /dev/null +++ b/sqlite-watcher/src/queue.rs @@ -0,0 +1,224 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context, Result}; +use rusqlite::{params, Connection, OptionalExtension, Row}; + +const SCHEMA: &str = r#" +CREATE TABLE IF NOT EXISTS changes ( + change_id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + op TEXT NOT NULL, + id TEXT NOT NULL, + payload BLOB, + wal_frame TEXT, + cursor TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + acked INTEGER NOT NULL DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS state ( + table_name TEXT PRIMARY KEY, + last_change_id INTEGER NOT NULL DEFAULT 0, + last_wal_frame TEXT, + cursor TEXT, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +"#; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ChangeOperation { + Insert, + Update, + Delete, +} + +impl ChangeOperation { + pub fn as_str(&self) -> &'static str { + match self { + ChangeOperation::Insert => "insert", + ChangeOperation::Update => "update", + ChangeOperation::Delete => "delete", + } + } + + pub fn from_str(value: &str) -> Result { + match value { + "insert" => 
Ok(ChangeOperation::Insert), + "update" => Ok(ChangeOperation::Update), + "delete" => Ok(ChangeOperation::Delete), + other => Err(anyhow!("unknown change operation '{other}'")), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NewChange { + pub table_name: String, + pub operation: ChangeOperation, + pub primary_key: String, + pub payload: Option>, + pub wal_frame: Option, + pub cursor: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChangeRecord { + pub change_id: i64, + pub table_name: String, + pub operation: ChangeOperation, + pub primary_key: String, + pub payload: Option>, + pub wal_frame: Option, + pub cursor: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QueueState { + pub table_name: String, + pub last_change_id: i64, + pub last_wal_frame: Option, + pub cursor: Option, +} + +pub struct ChangeQueue { + path: PathBuf, + conn: Connection, +} + +impl ChangeQueue { + pub fn open(path: impl AsRef) -> Result { + let path = path.as_ref(); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).with_context(|| { + format!("failed to create queue directory {}", parent.display()) + })?; + #[cfg(unix)] + enforce_dir_perms(parent)?; + } + let conn = Connection::open(path) + .with_context(|| format!("failed to open queue database {}", path.display()))?; + conn.pragma_update(None, "journal_mode", &"wal").ok(); + conn.pragma_update(None, "synchronous", &"normal").ok(); + conn.execute_batch(SCHEMA) + .context("failed to initialize change queue schema")?; + Ok(Self { + path: path.to_path_buf(), + conn, + }) + } + + pub fn enqueue(&self, change: &NewChange) -> Result { + self.conn.execute( + "INSERT INTO changes(table_name, op, id, payload, wal_frame, cursor) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![ + change.table_name, + change.operation.as_str(), + change.primary_key, + change.payload, + change.wal_frame, + change.cursor, + ], + )?; + Ok(self.conn.last_insert_rowid()) + } + + pub fn fetch_batch(&self, 
limit: usize) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT change_id, table_name, op, id, payload, wal_frame, cursor + FROM changes WHERE acked = 0 ORDER BY change_id ASC LIMIT ?1", + )?; + let mut rows = stmt.query([limit as i64])?; + let mut results = Vec::new(); + while let Some(row) = rows.next()? { + results.push(row_to_change(&row)?); + } + Ok(results) + } + + pub fn ack_up_to(&self, change_id: i64) -> Result { + let updated = self.conn.execute( + "UPDATE changes SET acked = 1 WHERE change_id <= ?1", + [change_id], + )?; + Ok(updated as u64) + } + + pub fn purge_acked(&self) -> Result { + let deleted = self + .conn + .execute("DELETE FROM changes WHERE acked = 1", [])?; + Ok(deleted as u64) + } + + pub fn get_state(&self, table: &str) -> Result> { + self.conn + .prepare( + "SELECT table_name, last_change_id, last_wal_frame, cursor + FROM state WHERE table_name = ?1", + )? + .query_row([table], |row| { + Ok(QueueState { + table_name: row.get(0)?, + last_change_id: row.get(1)?, + last_wal_frame: row.get(2)?, + cursor: row.get(3)?, + }) + }) + .optional() + .map_err(Into::into) + } + + pub fn set_state(&self, state: &QueueState) -> Result<()> { + self.conn.execute( + "INSERT INTO state(table_name, last_change_id, last_wal_frame, cursor, updated_at) + VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP) + ON CONFLICT(table_name) DO UPDATE SET + last_change_id = excluded.last_change_id, + last_wal_frame = excluded.last_wal_frame, + cursor = excluded.cursor, + updated_at = CURRENT_TIMESTAMP", + params![ + state.table_name, + state.last_change_id, + state.last_wal_frame, + state.cursor, + ], + )?; + Ok(()) + } + + pub fn path(&self) -> &Path { + &self.path + } +} + +fn row_to_change(row: &Row<'_>) -> Result { + let op_str: String = row.get(2)?; + Ok(ChangeRecord { + change_id: row.get(0)?, + table_name: row.get(1)?, + operation: ChangeOperation::from_str(&op_str)?, + primary_key: row.get(3)?, + payload: row.get(4)?, + wal_frame: row.get(5)?, + cursor: 
row.get(6)?, + }) +} + +#[cfg(unix)] +fn enforce_dir_perms(path: &Path) -> Result<()> { + use std::os::unix::fs::PermissionsExt; + + let metadata = fs::metadata(path)?; + let mut perms = metadata.permissions(); + perms.set_mode(0o700); + fs::set_permissions(path, perms)?; + Ok(()) +} + +#[cfg(not(unix))] +fn enforce_dir_perms(_path: &Path) -> Result<()> { + Ok(()) +} diff --git a/sqlite-watcher/src/server.rs b/sqlite-watcher/src/server.rs new file mode 100644 index 0000000..782c5bf --- /dev/null +++ b/sqlite-watcher/src/server.rs @@ -0,0 +1,283 @@ +use std::net::SocketAddr; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; + +use anyhow::{Context, Result}; +use tokio::runtime::Builder; +use tokio::sync::oneshot; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::service::Interceptor; +use tonic::transport::Server; +use tonic::{Request, Response, Status}; + +#[cfg(unix)] +use tokio::net::UnixListener; +#[cfg(unix)] +use tokio_stream::wrappers::UnixListenerStream; + +use crate::queue::{ChangeQueue, QueueState}; +use crate::watcher_proto::watcher_server::{Watcher, WatcherServer}; +use crate::watcher_proto::{ + AckChangesRequest, AckChangesResponse, Change, GetStateRequest, GetStateResponse, + HealthCheckRequest, HealthCheckResponse, ListChangesRequest, ListChangesResponse, + SetStateRequest, SetStateResponse, +}; + +pub enum ServerHandle { + Tcp { + shutdown: Option>, + thread: Option>>, + }, + #[cfg(unix)] + Unix { + shutdown: Option>, + thread: Option>>, + path: PathBuf, + }, +} + +impl Drop for ServerHandle { + fn drop(&mut self) { + match self { + ServerHandle::Tcp { shutdown, thread } => { + if let Some(tx) = shutdown.take() { + let _ = tx.send(()); + } + if let Some(handle) = thread.take() { + let _ = handle.join(); + } + } + #[cfg(unix)] + ServerHandle::Unix { + shutdown, + thread, + path, + } => { + if let Some(tx) = shutdown.take() { + let _ = tx.send(()); + } + if let Some(handle) = thread.take() { + let _ = 
handle.join(); + } + let _ = std::fs::remove_file(path); + } + } + } +} + +pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let thread = thread::spawn(move || -> Result<()> { + let runtime = Builder::new_multi_thread().enable_all().build()?; + runtime.block_on(async move { + let listener = tokio::net::TcpListener::bind(addr) + .await + .context("failed to bind tcp listener")?; + let queue_path = Arc::new(queue_path); + let svc = WatcherService::new(queue_path); + let interceptor = AuthInterceptor::new(token); + Server::builder() + .add_service(WatcherServer::with_interceptor(svc, interceptor)) + .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move { + let _ = shutdown_rx.await; + }) + .await + .context("grpc server exited") + }) + }); + + Ok(ServerHandle::Tcp { + shutdown: Some(shutdown_tx), + thread: Some(thread), + }) +} + +#[cfg(unix)] +pub fn spawn_unix(path: &Path, queue_path: PathBuf, token: String) -> Result { + if path.exists() { + std::fs::remove_file(path) + .with_context(|| format!("failed to remove stale unix socket {}", path.display()))?; + } + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!( + "failed to create unix socket directory {}", + parent.display() + ) + })?; + } + let path_buf = path.to_path_buf(); + let listener_path = path_buf.clone(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let thread = thread::spawn(move || -> Result<()> { + let runtime = Builder::new_multi_thread().enable_all().build()?; + runtime.block_on(async move { + let listener = + UnixListener::bind(&listener_path).context("failed to bind unix socket")?; + let queue_path = Arc::new(queue_path); + let svc = WatcherService::new(queue_path); + let interceptor = AuthInterceptor::new(token); + Server::builder() + .add_service(WatcherServer::with_interceptor(svc, interceptor)) + 
.serve_with_incoming_shutdown(UnixListenerStream::new(listener), async move { + let _ = shutdown_rx.await; + }) + .await + .context("grpc server exited") + }) + }); + + Ok(ServerHandle::Unix { + shutdown: Some(shutdown_tx), + thread: Some(thread), + path: path_buf, + }) +} + +#[derive(Clone)] +struct WatcherService { + queue_path: Arc, +} + +impl WatcherService { + fn new(queue_path: Arc) -> Self { + Self { queue_path } + } + + fn queue(&self) -> Result { + ChangeQueue::open(&*self.queue_path) + } +} + +#[derive(Clone)] +struct AuthInterceptor { + token: Arc, +} + +impl AuthInterceptor { + fn new(token: String) -> Self { + Self { + token: Arc::new(token), + } + } +} + +impl Interceptor for AuthInterceptor { + fn call(&mut self, req: Request<()>) -> Result, Status> { + let provided = req + .metadata() + .get("authorization") + .ok_or_else(|| Status::unauthenticated("missing authorization header"))?; + let expected = format!("Bearer {}", self.token.as_str()); + if provided == expected.as_str() { + Ok(req) + } else { + Err(Status::unauthenticated("invalid authorization header")) + } + } +} + +#[tonic::async_trait] +impl Watcher for WatcherService { + async fn health_check( + &self, + _: Request, + ) -> Result, Status> { + Ok(Response::new(HealthCheckResponse { + status: "ok".to_string(), + })) + } + + async fn list_changes( + &self, + request: Request, + ) -> Result, Status> { + let limit = request.get_ref().limit.max(1).min(10_000) as usize; + let queue = self.queue().map_err(internal_err)?; + let rows = queue.fetch_batch(limit).map_err(internal_err)?; + let changes = rows.into_iter().map(change_to_proto).collect(); + Ok(Response::new(ListChangesResponse { changes })) + } + + async fn ack_changes( + &self, + request: Request, + ) -> Result, Status> { + let upto = request.get_ref().up_to_change_id; + let queue = self.queue().map_err(internal_err)?; + let count = queue.ack_up_to(upto).map_err(internal_err)?; + queue.purge_acked().ok(); + 
Ok(Response::new(AckChangesResponse { + acknowledged: count, + })) + } + + async fn get_state( + &self, + request: Request, + ) -> Result, Status> { + let queue = self.queue().map_err(internal_err)?; + let state = queue + .get_state(&request.get_ref().table_name) + .map_err(internal_err)?; + let resp = match state { + Some(state) => GetStateResponse { + exists: true, + last_change_id: state.last_change_id, + last_wal_frame: state.last_wal_frame.unwrap_or_default(), + cursor: state.cursor.unwrap_or_default(), + }, + None => GetStateResponse { + exists: false, + last_change_id: 0, + last_wal_frame: String::new(), + cursor: String::new(), + }, + }; + Ok(Response::new(resp)) + } + + async fn set_state( + &self, + request: Request, + ) -> Result, Status> { + let payload = request.into_inner(); + if payload.table_name.is_empty() { + return Err(Status::invalid_argument("table_name is required")); + } + let queue = self.queue().map_err(internal_err)?; + let state = QueueState { + table_name: payload.table_name, + last_change_id: payload.last_change_id, + last_wal_frame: if payload.last_wal_frame.is_empty() { + None + } else { + Some(payload.last_wal_frame) + }, + cursor: if payload.cursor.is_empty() { + None + } else { + Some(payload.cursor) + }, + }; + queue.set_state(&state).map_err(internal_err)?; + Ok(Response::new(SetStateResponse {})) + } +} + +fn change_to_proto(row: crate::queue::ChangeRecord) -> Change { + Change { + change_id: row.change_id, + table_name: row.table_name, + op: row.operation.as_str().to_string(), + primary_key: row.primary_key, + payload: row.payload.unwrap_or_default(), + wal_frame: row.wal_frame.unwrap_or_default(), + cursor: row.cursor.unwrap_or_default(), + } +} + +fn internal_err(err: anyhow::Error) -> Status { + Status::internal(err.to_string()) +} From a572d367724bb5655f37df31935eac4e7298efe6 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 14:38:49 -0800 Subject: [PATCH 3/5] 
chore(docs): move sqlite docs under sqlite-watcher-docs --- CHANGELOG.md | 6 +++--- README.md | 4 ++-- README-SQLite.md => sqlite-watcher-docs/README-SQLite.md | 2 +- {docs => sqlite-watcher-docs}/installers.md | 0 sqlite-watcher/README.md | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) rename README-SQLite.md => sqlite-watcher-docs/README-SQLite.md (99%) rename {docs => sqlite-watcher-docs}/installers.md (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5069762..126eeaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,7 +97,7 @@ All notable changes to this project will be documented in this file. ### Changed -- **README-SQLite.md**: Updated all examples to include `-y` flag and added notes explaining that interactive mode only works with PostgreSQL sources. +- **sqlite-watcher-docs/README-SQLite.md**: Updated all examples to include `-y` flag and added notes explaining that interactive mode only works with PostgreSQL sources. ## [7.0.4] - 2025-12-09 @@ -408,7 +408,7 @@ All notable changes to this project will be documented in this file. - **File-based migration** (local execution only, no remote support) - **Path validation** with directory traversal prevention - **Comprehensive security testing**: 14 SQLite-specific tests -- **Documentation**: [README-SQLite.md](README-SQLite.md) with usage examples +- **Documentation**: [sqlite-watcher-docs/README-SQLite.md](sqlite-watcher-docs/README-SQLite.md) with usage examples - **Integration tests**: Full workflow testing with real SQLite files #### MongoDB Support (Phase 2) @@ -491,7 +491,7 @@ All notable changes to this project will be documented in this file. 
- **[README.md](README.md)** - Universal landing page with multi-database support - **[README-PostgreSQL.md](README-PostgreSQL.md)** - Comprehensive PostgreSQL replication guide (1,000+ lines) -- **[README-SQLite.md](README-SQLite.md)** - Complete SQLite migration guide +- **[sqlite-watcher-docs/README-SQLite.md](sqlite-watcher-docs/README-SQLite.md)** - Complete SQLite migration guide - **[README-MongoDB.md](README-MongoDB.md)** - Complete MongoDB migration guide with periodic refresh - **[README-MySQL.md](README-MySQL.md)** - Complete MySQL/MariaDB migration guide - **[docs/plans/multi-database-support.md](docs/plans/multi-database-support.md)** - Implementation plan and architecture diff --git a/README.md b/README.md index f1848db..b0677de 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ database-replicator init \ --target "postgresql://user:pass@host:5432/db" ``` -**[📖 Full SQLite Guide →](README-SQLite.md)** +**[📖 Full SQLite Guide →](sqlite-watcher-docs/README-SQLite.md)** --- @@ -302,7 +302,7 @@ docker run --rm -it \ ### Database-Specific Guides - **[PostgreSQL to PostgreSQL](README-PostgreSQL.md)** - Zero-downtime replication with logical replication -- **[SQLite to PostgreSQL](README-SQLite.md)** - One-time replication using JSONB storage +- **[SQLite to PostgreSQL](sqlite-watcher-docs/README-SQLite.md)** - One-time replication using JSONB storage - **[MongoDB to PostgreSQL](README-MongoDB.md)** - One-time replication with periodic refresh support - **[MySQL/MariaDB to PostgreSQL](README-MySQL.md)** - One-time replication with periodic refresh support diff --git a/README-SQLite.md b/sqlite-watcher-docs/README-SQLite.md similarity index 99% rename from README-SQLite.md rename to sqlite-watcher-docs/README-SQLite.md index 5f63c46..ea0f6d8 100644 --- a/README-SQLite.md +++ b/sqlite-watcher-docs/README-SQLite.md @@ -600,7 +600,7 @@ For issues or questions: Once you have completed the initial snapshot (`database-replicator init --source sqlite ...`), 
you can switch to incremental change capture: -1. Install `sqlite-watcher` (see [docs/installers.md](docs/installers.md) for Linux systemd units, macOS launchd plists, and Windows service guidance). +1. Install `sqlite-watcher` (see [sqlite-watcher-docs/installers.md](installers.md) for Linux systemd units, macOS launchd plists, and Windows service guidance). 2. Start the watcher beside your `.sqlite` file (example for Linux/macOS): ```bash diff --git a/docs/installers.md b/sqlite-watcher-docs/installers.md similarity index 100% rename from docs/installers.md rename to sqlite-watcher-docs/installers.md diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md index 0916d27..480acf0 100644 --- a/sqlite-watcher/README.md +++ b/sqlite-watcher/README.md @@ -7,4 +7,4 @@ This crate currently ships the shared queue + gRPC server used by `database-repl > **Note:** WAL tailing is still under active development; use the binary today to test queue + sync flows. -See `docs/installers.md` for per-OS service guidance and `scripts/test-sqlite-delta.sh` for the end-to-end smoke test. +See `sqlite-watcher-docs/installers.md` for per-OS service guidance and `scripts/test-sqlite-delta.sh` for the end-to-end smoke test. 
From 32c2275fb08d47ab9d7077e98612d383ef6acad6 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 14:56:58 -0800 Subject: [PATCH 4/5] fix(ci): replace unmaintained deps for security audit --- Cargo.lock | 25 ++++++++++++++-------- Cargo.toml | 9 +++++++- third-party/fxhash/Cargo.toml | 10 +++++++++ third-party/fxhash/src/lib.rs | 40 +++++++++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 10 deletions(-) create mode 100644 third-party/fxhash/Cargo.toml create mode 100644 third-party/fxhash/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 87e12bb..3571e25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,7 +240,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 2.1.1", "shlex", "syn 2.0.108", ] @@ -780,10 +780,13 @@ dependencies = [ "serde", "serde_json", "sha2", + "sqlite-watcher", "tempfile", "tokio", "tokio-postgres", "toml", + "tonic", + "tower", "tracing", "tracing-subscriber", "url", @@ -951,7 +954,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -1200,10 +1203,8 @@ dependencies = [ [[package]] name = "fxhash" version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" dependencies = [ - "byteorder", + "rustc-hash 1.1.0", ] [[package]] @@ -2187,7 +2188,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2862,6 +2863,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -2910,7 +2917,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3399,7 +3406,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.2", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4167,7 +4174,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ab7988c..fe1a445 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,8 @@ indicatif = "0.18" which = "6.0" home = ">=0.5.4, <0.5.12" # Pin to avoid v0.5.12 which requires unstable edition2024 rand = "0.8" -reqwest = { version = "0.11", features = ["json"] } +# Disable rustls to avoid pulling rustls-pemfile (unmaintained) +reqwest = { version = "0.11", default-features = false, features = ["json", "native-tls"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.10" @@ -51,6 +52,12 @@ url = "2.5" chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } libc = "0.2" rust_decimal = { version = "1.39", features = ["db-tokio-postgres"] } +tonic = { version = "0.11", features = ["transport"] } +tower = "0.4" +sqlite-watcher = { path = "sqlite-watcher" } + +[patch.crates-io] +fxhash = { path = "third-party/fxhash" } [target.'cfg(unix)'.dependencies] daemonize = "0.5" diff --git a/third-party/fxhash/Cargo.toml b/third-party/fxhash/Cargo.toml new file mode 100644 index 0000000..60212b9 --- /dev/null +++ b/third-party/fxhash/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "fxhash" +version = "0.2.1" +edition = "2018" +description = "Local replacement for fxhash using rustc-hash" +license = "CC0-1.0" 
+publish = false + +[dependencies] +rustc-hash = "1.1" diff --git a/third-party/fxhash/src/lib.rs b/third-party/fxhash/src/lib.rs new file mode 100644 index 0000000..cab4b9d --- /dev/null +++ b/third-party/fxhash/src/lib.rs @@ -0,0 +1,40 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt; +use std::hash::{BuildHasherDefault, Hasher}; + +use rustc_hash::FxHasher as InnerFxHasher; + +pub struct FxHasher(InnerFxHasher); + +impl Default for FxHasher { + fn default() -> Self { + Self(InnerFxHasher::default()) + } +} + +impl Clone for FxHasher { + fn clone(&self) -> Self { + // FxHasher does not implement Clone; start a new hasher + Self::default() + } +} + +impl fmt::Debug for FxHasher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FxHasher").finish() + } +} + +impl Hasher for FxHasher { + fn finish(&self) -> u64 { + self.0.finish() + } + + fn write(&mut self, bytes: &[u8]) { + self.0.write(bytes) + } +} + +pub type FxBuildHasher = BuildHasherDefault; +pub type FxHashMap = HashMap; +pub type FxHashSet = HashSet; From d12d0c45f1942176145098b0147e740187d49394 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 16:58:47 -0800 Subject: [PATCH 5/5] Fix sqlite watcher clippy warnings --- PR_BODY.md | 8 ++++++++ sqlite-watcher/src/queue.rs | 13 +++++++++---- sqlite-watcher/src/server.rs | 2 +- src/commands/sync_sqlite.rs | 29 +++++++++++++++++------------ 4 files changed, 35 insertions(+), 17 deletions(-) create mode 100644 PR_BODY.md diff --git a/PR_BODY.md b/PR_BODY.md new file mode 100644 index 0000000..cc21e29 --- /dev/null +++ b/PR_BODY.md @@ -0,0 +1,8 @@ +## Summary +- fix sqlite sync change-state handling by treating watcher wal_frame/cursor fields as optional strings and cleaning up unused code +- implement FromStr for sqlite ChangeOperation and resolve needless borrow lints in queue/server modules +- keep clippy happy by applying the suggested clamp change and ensuring 
proto tests build + +## Testing +- cargo clippy +- cargo test diff --git a/sqlite-watcher/src/queue.rs b/sqlite-watcher/src/queue.rs index e0b8305..2cab073 100644 --- a/sqlite-watcher/src/queue.rs +++ b/sqlite-watcher/src/queue.rs @@ -1,5 +1,6 @@ use std::fs; use std::path::{Path, PathBuf}; +use std::str::FromStr; use anyhow::{anyhow, Context, Result}; use rusqlite::{params, Connection, OptionalExtension, Row}; @@ -41,8 +42,12 @@ impl ChangeOperation { ChangeOperation::Delete => "delete", } } +} + +impl FromStr for ChangeOperation { + type Err = anyhow::Error; - pub fn from_str(value: &str) -> Result { + fn from_str(value: &str) -> Result { match value { "insert" => Ok(ChangeOperation::Insert), "update" => Ok(ChangeOperation::Update), @@ -98,8 +103,8 @@ impl ChangeQueue { } let conn = Connection::open(path) .with_context(|| format!("failed to open queue database {}", path.display()))?; - conn.pragma_update(None, "journal_mode", &"wal").ok(); - conn.pragma_update(None, "synchronous", &"normal").ok(); + conn.pragma_update(None, "journal_mode", "wal").ok(); + conn.pragma_update(None, "synchronous", "normal").ok(); conn.execute_batch(SCHEMA) .context("failed to initialize change queue schema")?; Ok(Self { @@ -132,7 +137,7 @@ impl ChangeQueue { let mut rows = stmt.query([limit as i64])?; let mut results = Vec::new(); while let Some(row) = rows.next()? 
{ - results.push(row_to_change(&row)?); + results.push(row_to_change(row)?); } Ok(results) } diff --git a/sqlite-watcher/src/server.rs b/sqlite-watcher/src/server.rs index 782c5bf..0948b51 100644 --- a/sqlite-watcher/src/server.rs +++ b/sqlite-watcher/src/server.rs @@ -193,7 +193,7 @@ impl Watcher for WatcherService { &self, request: Request, ) -> Result, Status> { - let limit = request.get_ref().limit.max(1).min(10_000) as usize; + let limit = request.get_ref().limit.clamp(1, 10_000) as usize; let queue = self.queue().map_err(internal_err)?; let rows = queue.fetch_batch(limit).map_err(internal_err)?; let changes = rows.into_iter().map(change_to_proto).collect(); diff --git a/src/commands/sync_sqlite.rs b/src/commands/sync_sqlite.rs index ca320c2..3e97836 100644 --- a/src/commands/sync_sqlite.rs +++ b/src/commands/sync_sqlite.rs @@ -1,13 +1,11 @@ -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::time::Duration; - use anyhow::{anyhow, bail, Context, Result}; use clap::ValueEnum; use sqlite_watcher::watcher_proto::watcher_client::WatcherClient; use sqlite_watcher::watcher_proto::{ AckChangesRequest, GetStateRequest, HealthCheckRequest, ListChangesRequest, SetStateRequest, }; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; use tokio_postgres::Client; use tonic::codegen::InterceptedService; use tonic::service::Interceptor; @@ -17,7 +15,6 @@ use tower::service_fn; use crate::jsonb::writer::{delete_jsonb_rows, insert_jsonb_batch, upsert_jsonb_rows}; -const DEFAULT_BATCH_LIMIT: u32 = 500; const GLOBAL_STATE_KEY: &str = "_global"; #[derive(Debug, Clone, Copy, ValueEnum, PartialEq, Eq)] @@ -63,7 +60,7 @@ pub async fn run(opts: SyncSqliteOptions) -> Result<()> { let mut processed_any = false; loop { - let mut req = Request::new(ListChangesRequest { + let req = Request::new(ListChangesRequest { limit: opts.batch_size.max(1), }); let changes = watcher @@ -160,8 +157,8 @@ async fn apply_changes( change.table_name.clone(), TableState { 
last_change_id: change.change_id, - wal_frame: change.wal_frame.clone(), - cursor: change.cursor.clone(), + wal_frame: non_empty_string(&change.wal_frame), + cursor: non_empty_string(&change.cursor), }, ); } @@ -235,6 +232,14 @@ fn mode_string(mode: IncrementalMode) -> &'static str { } } +fn non_empty_string(value: &str) -> Option { + if value.is_empty() { + None + } else { + Some(value.to_owned()) + } +} + fn load_token(path: Option<&Path>) -> Result { let token_path = path .map(|p| p.to_path_buf()) @@ -399,8 +404,8 @@ mod tests { op: "insert".into(), primary_key: "1".into(), payload: serde_json::to_vec(&serde_json::json!({"a":1})).unwrap(), - wal_frame: None, - cursor: None, + wal_frame: String::new(), + cursor: String::new(), }, Change { change_id: 2, @@ -408,8 +413,8 @@ mod tests { op: "delete".into(), primary_key: "2".into(), payload: Vec::new(), - wal_frame: None, - cursor: None, + wal_frame: String::new(), + cursor: String::new(), }, ]; let mut per_table: HashMap = HashMap::new();