From baf1bd8609977d328263c3bb852d1f8e4f725af5 Mon Sep 17 00:00:00 2001 From: Borislav Borisov Date: Wed, 18 Mar 2026 20:34:41 +0000 Subject: [PATCH 1/2] Improve docs and add SQLite batch insert example Rewrite the README with overview, quick start, how-it-works, and examples sections. Add a runnable example that batch-inserts rows into an in-memory SQLite database using rusqlite's rarray virtual table. --- Cargo.toml | 6 +- README.md | 56 +++++++++++++++- examples/sqlite_batch.rs | 137 +++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 +- 4 files changed, 195 insertions(+), 6 deletions(-) create mode 100644 examples/sqlite_batch.rs diff --git a/Cargo.toml b/Cargo.toml index d766b44..b69f89d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ license = "MIT" readme = "README.md" repository = "https://github.com/chmodas/tower-batch" homepage = "https://github.com/chmodas/tower-batch" -documentation = "https://docs.rs/tower-batch/0.1.0" +documentation = "https://docs.rs/tower-batch" description = """ -`batch-tower` is a Tower middleware that allows you to buffer requests for batch processing +`tower-batch` is a Tower middleware that allows you to buffer requests for batch processing until the buffer reaches a maximum size OR a maximum duration elapses. 
""" categories = ["asynchronous", "network-programming"] @@ -29,7 +29,6 @@ pin-project-lite = "0.2.17" tokio = { version = "1.50.0", features = ["sync", "time", "tracing", "rt"] } tokio-util = { version = "0.7.18", default-features = false } tracing = "0.1.44" -tracing-futures = "0.2.5" tower = "0.5.3" [dev-dependencies] @@ -40,4 +39,5 @@ tokio = { version = "1", features = ["macros", "sync", "test-util", "rt-multi-th tokio-test = "0.4.5" tower = { version = "0.5.3", features = ["full"] } tower-test = "0.4.0" +rusqlite = { version = "0.39.0", features = ["bundled", "array"] } diff --git a/README.md b/README.md index d5b4da0..7ad31e4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,57 @@ # tower-batch -Writing data in bulk is a common technique for improving the efficiency of certain tasks. `batch-tower` is a [Tower] middleware that allows you to buffer requests for batch processing until the buffer reaches a maximum size OR a maximum duration elapses. +[![CI Build](https://github.com/chmodas/tower-batch/actions/workflows/ci.yml/badge.svg)](https://github.com/chmodas/tower-batch/actions/workflows/ci.yml) +[![codecov](https://codecov.io/github/chmodas/tower-batch/graph/badge.svg?token=GLA3M3GGQC)](https://codecov.io/github/chmodas/tower-batch) + +A [Tower] middleware that buffers requests and flushes them in batches. Use it when the downstream system is more efficient with bulk writes — databases, message brokers, object stores, etc. The middleware collects individual requests as `BatchControl::Item(R)` and, once the buffer reaches a maximum size **or** a maximum duration elapses, signals the inner service with `BatchControl::Flush` so it can process the accumulated batch. 
+ +## Quick start + +Add the dependency to your `Cargo.toml`: + +```toml +[dependencies] +tower-batch = { version = "0.2.0" } +``` + +Create a batch service and start sending requests: + +```rust +use std::time::Duration; +use tower_batch::Batch; + +// `my_service` implements `Service<BatchControl<R>>` +let batch = Batch::new(my_service, 100, Duration::from_millis(250)); +``` + +If you prefer the [Tower layer] pattern: + +```rust +use tower_batch::BatchLayer; + +let layer = BatchLayer::new(100, Duration::from_millis(250)); +``` + +## How it works + +Your inner service must implement `Service<BatchControl<R>>` where `R` is the request type. The middleware sends two kinds of calls: + +- **`BatchControl::Item(request)`** – buffer this request. Typically, you just push it onto a `Vec` and return `Ok(())`. +- **`BatchControl::Flush`** – process everything you have buffered, then return the result. + +`Batch::new` spawns a background worker that owns the inner service. It forwards each incoming request as an `Item`, and triggers a `Flush` when the batch is full or the timer fires. `Batch` handles are cheap to clone – each clone shares the same worker, so you can hand them to multiple tasks. + +## Examples + +See the [`examples/`](examples/) directory: + +- **[`sqlite_batch`](examples/sqlite_batch.rs)** – batch-insert rows into an in-memory SQLite database using the rarray virtual table. + +Run an example with: + +```sh +cargo run --example sqlite_batch +``` ## License @@ -8,6 +59,7 @@ This project is licensed under the [MIT](LICENSE) license. ### Contribution -Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in `batch-tower` by you, shall be licensed as MIT, without any additional terms or conditions. +Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in `tower-batch` by you, shall be licensed as MIT, without any additional terms or conditions. 
[Tower]: https://docs.rs/tower +[Tower layer]: https://docs.rs/tower/latest/tower/trait.Layer.html diff --git a/examples/sqlite_batch.rs b/examples/sqlite_batch.rs new file mode 100644 index 0000000..f633989 --- /dev/null +++ b/examples/sqlite_batch.rs @@ -0,0 +1,137 @@ +//! Batch-insert rows into an in-memory SQLite database using the rarray virtual table. +//! +//! Run with: `cargo run --example sqlite_batch` + +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::time::Duration; + +use rusqlite::vtab::array::load_module; +use rusqlite::Connection; +use tower::Service; +use tower_batch::{Batch, BatchControl, BoxError}; + +struct InsertRow { + name: String, + value: i64, +} + +struct SqliteBatchService { + conn: Arc>, + pending: Vec, +} + +impl Service> for SqliteBatchService { + type Response = (); + type Error = BoxError; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: BatchControl) -> Self::Future { + match req { + BatchControl::Item(row) => { + self.pending.push(row); + Box::pin(std::future::ready(Ok(()))) + } + BatchControl::Flush => { + let rows = mem::take(&mut self.pending); + let conn = Arc::clone(&self.conn); + + Box::pin(async move { + tokio::task::spawn_blocking(move || { + let conn = conn + .lock() + .map_err(|e| -> BoxError { e.to_string().into() })?; + let tx = conn.unchecked_transaction()?; + + let names: Vec = rows + .iter() + .map(|r| rusqlite::types::Value::Text(r.name.clone())) + .collect(); + let values: Vec = rows + .iter() + .map(|r| rusqlite::types::Value::Integer(r.value)) + .collect(); + + // Rc is required here: rusqlite's rarray only implements ToSql for Rc>. + // This is safe because the Rc never leaves the spawn_blocking closure. 
+ let names = Rc::new(names); + let values = Rc::new(values); + + conn.execute( + "INSERT INTO data(name, value) \ + SELECT n.value, v.value \ + FROM rarray(?1) AS n \ + JOIN rarray(?2) AS v ON n.rowid = v.rowid", + rusqlite::params![names, values], + )?; + + tx.commit()?; + Ok::<(), BoxError>(()) + }) + .await? + }) + } + } + } +} + +#[tokio::main] +async fn main() -> Result<(), BoxError> { + let conn = Connection::open_in_memory()?; + conn.execute_batch("CREATE TABLE data (name TEXT NOT NULL, value INTEGER NOT NULL)")?; + load_module(&conn)?; + + let conn = Arc::new(Mutex::new(conn)); + + let service = SqliteBatchService { + conn: Arc::clone(&conn), + pending: Vec::new(), + }; + + let batch = Batch::new(service, 50, Duration::from_millis(100)); + + let mut handles = Vec::new(); + for task_id in 0..4 { + let mut batch = batch.clone(); + handles.push(tokio::spawn(async move { + for i in 0..50 { + tower::ServiceExt::ready(&mut batch).await.unwrap(); + batch + .call(InsertRow { + name: format!("task{task_id}_row{i}"), + value: (task_id * 50 + i) as i64, + }) + .await + .unwrap(); + } + })); + } + + for handle in handles { + handle.await?; + } + + // Drop the last Batch handle so the worker knows no more requests are coming, + // then give it time to flush. In production you may want a more robust shutdown + // mechanism. + drop(batch); + tokio::time::sleep(Duration::from_millis(200)).await; + + let count: i64 = conn + .lock() + .map_err(|e| -> BoxError { e.to_string().into() })? + .query_row("SELECT COUNT(*) FROM data", [], |row| row.get(0))?; + + println!("Inserted {count} rows (expected 200)"); + assert_eq!(count, 200); + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 0311ebb..91410aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,7 @@ //! A Tower middleware that provides a buffered mpsc for processing requests in batches. //! //! Writing data in bulk is a common technique for improving the efficiency of certain tasks. -//! 
`batch-tower` is a middleware that allows you to buffer requests for batch processing until +//! `tower-batch` is a middleware that allows you to buffer requests for batch processing until //! the buffer reaches a maximum size OR a maximum duration elapses. //! //! Clients enqueue requests by sending on the channel from any of the handles ([`Batch`]), and the From ead9c1bedd8ce0d77dc457294712b8a4bc9b7a4f Mon Sep 17 00:00:00 2001 From: Borislav Borisov Date: Wed, 18 Mar 2026 20:40:14 +0000 Subject: [PATCH 2/2] Relax dependency versions Use major.minor (or, for tokio, major-only) version specifiers instead of exact patch versions to give downstream consumers more flexibility. This also drops the `tracing` feature from the tokio dependency. --- Cargo.toml | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b69f89d..a36e3b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,20 +24,20 @@ exclude = [ [dependencies] -futures-core = "0.3.32" -pin-project-lite = "0.2.17" -tokio = { version = "1.50.0", features = ["sync", "time", "tracing", "rt"] } -tokio-util = { version = "0.7.18", default-features = false } -tracing = "0.1.44" -tower = "0.5.3" +futures-core = "0.3" +pin-project-lite = "0.2" +tokio = { version = "1", features = ["sync", "time", "rt"] } +tokio-util = { version = "0.7", default-features = false } +tracing = "0.1" +tower = "0.5" [dev-dependencies] -tracing-subscriber = { version = "0.3.23", features = ["registry", "env-filter"] } -futures = "0.3.32" +tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] } +futures = "0.3" tokio = { version = "1", features = ["macros", "sync", "test-util", "rt-multi-thread"] } -tokio-test = "0.4.5" -tower = { version = "0.5.3", features = ["full"] } -tower-test = "0.4.0" -rusqlite = { version = "0.39.0", features = ["bundled", "array"] } +tokio-test = "0.4" +tower = { version = "0.5", features = ["full"] } +tower-test = "0.4" +rusqlite = { version = "0.39", features = ["bundled", "array"] }