Merged
28 changes: 14 additions & 14 deletions Cargo.toml
@@ -7,9 +7,9 @@ license = "MIT"
readme = "README.md"
repository = "https://github.com/chmodas/tower-batch"
homepage = "https://github.com/chmodas/tower-batch"
-documentation = "https://docs.rs/tower-batch/0.1.0"
+documentation = "https://docs.rs/tower-batch"
description = """
-`batch-tower` is a Tower middleware that allows you to buffer requests for batch processing
+`tower-batch` is a Tower middleware that allows you to buffer requests for batch processing
until the buffer reaches a maximum size OR a maximum duration elapses.
"""
categories = ["asynchronous", "network-programming"]
@@ -24,20 +24,20 @@ exclude = [

[dependencies]

-futures-core = "0.3.32"
-pin-project-lite = "0.2.17"
-tokio = { version = "1.50.0", features = ["sync", "time", "tracing", "rt"] }
-tokio-util = { version = "0.7.18", default-features = false }
-tracing = "0.1.44"
-tracing-futures = "0.2.5"
-tower = "0.5.3"
+futures-core = "0.3"
+pin-project-lite = "0.2"
+tokio = { version = "1", features = ["sync", "time", "rt"] }
+tokio-util = { version = "0.7", default-features = false }
+tracing = "0.1"
+tower = "0.5"

[dev-dependencies]

-tracing-subscriber = { version = "0.3.23", features = ["registry", "env-filter"] }
-futures = "0.3.32"
+tracing-subscriber = { version = "0.3", features = ["registry", "env-filter"] }
+futures = "0.3"
tokio = { version = "1", features = ["macros", "sync", "test-util", "rt-multi-thread"] }
-tokio-test = "0.4.5"
-tower = { version = "0.5.3", features = ["full"] }
-tower-test = "0.4.0"
+tokio-test = "0.4"
+tower = { version = "0.5", features = ["full"] }
+tower-test = "0.4"
+rusqlite = { version = "0.39", features = ["bundled", "array"] }

56 changes: 54 additions & 2 deletions README.md
@@ -1,13 +1,65 @@
# tower-batch

-Writing data in bulk is a common technique for improving the efficiency of certain tasks. `batch-tower` is a [Tower] middleware that allows you to buffer requests for batch processing until the buffer reaches a maximum size OR a maximum duration elapses.
[![CI Build](https://github.com/chmodas/tower-batch/actions/workflows/ci.yml/badge.svg)](https://github.com/chmodas/tower-batch/actions/workflows/ci.yml)
[![codecov](https://codecov.io/github/chmodas/tower-batch/graph/badge.svg?token=GLA3M3GGQC)](https://codecov.io/github/chmodas/tower-batch)

A [Tower] middleware that buffers requests and flushes them in batches. Use it when the downstream system is more efficient with bulk writes — databases, message brokers, object stores, etc. The middleware collects individual requests as `BatchControl::Item(R)` and, once the buffer reaches a maximum size **or** a maximum duration elapses, signals the inner service with `BatchControl::Flush` so it can process the accumulated batch.

## Quick start

Add the dependency to your `Cargo.toml`:

```toml
[dependencies]
tower-batch = { version = "0.2.0" }
```

Create a batch service and start sending requests:

```rust
use std::time::Duration;
use tower_batch::Batch;

// `my_service` implements `Service<BatchControl<MyRequest>>`
let batch = Batch::new(my_service, 100, Duration::from_millis(250));
```

If you prefer the [Tower layer] pattern:

```rust
use tower_batch::BatchLayer;

let layer = BatchLayer::new(100, Duration::from_millis(250));
```

## How it works

Your inner service must implement `Service<BatchControl<R>>` where `R` is the request type. The middleware sends two kinds of calls:

- **`BatchControl::Item(request)`** – buffer this request. Typically, you just push it onto a `Vec` and return `Ok(())`.
- **`BatchControl::Flush`** – process everything you have buffered, then return the result.
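
The two arms above can be sketched as a dependency-free model. Note this is a hypothetical stand-in: the real inner service implements tower's `Service<BatchControl<R>>` trait, but here a plain `call` method plays that role so only the dispatch logic is visible, and `LogService` is an invented name:

```rust
// Hypothetical sketch of the BatchControl protocol; a plain method
// stands in for tower's `Service::call`.
enum BatchControl<R> {
    Item(R),
    Flush,
}

struct LogService {
    pending: Vec<String>,        // requests buffered so far
    flushed: Vec<Vec<String>>,   // each flush produces one batch
}

impl LogService {
    fn call(&mut self, req: BatchControl<String>) -> Result<(), String> {
        match req {
            // Buffer the request; the middleware decides when to flush.
            BatchControl::Item(line) => {
                self.pending.push(line);
                Ok(())
            }
            // Drain the buffer and process everything as one unit.
            BatchControl::Flush => {
                let batch = std::mem::take(&mut self.pending);
                self.flushed.push(batch);
                Ok(())
            }
        }
    }
}
```

After two `Item` calls and one `Flush`, `flushed` holds a single two-element batch and `pending` is empty again.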

`Batch::new` spawns a background worker that owns the inner service. It forwards each incoming request as an `Item`, and triggers a `Flush` when the batch is full or the timer fires. `Batch` handles are cheap to clone – each clone shares the same worker, so you can hand them to multiple tasks.
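
The worker's size-or-timeout policy can be modeled without tower at all. This is not the crate's implementation (the real worker is an async task driven by tokio timers); it is a minimal synchronous sketch using a std channel, where `run_worker` is an invented helper:

```rust
use std::sync::mpsc;
use std::time::{Duration, Instant};

// Dependency-free model of the flush policy: collect items until the
// buffer is full or the deadline passes, then emit the batch.
fn run_worker<T>(rx: mpsc::Receiver<T>, max_size: usize, max_wait: Duration) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut buf: Vec<T> = Vec::new();
    let mut deadline = Instant::now() + max_wait;
    loop {
        let timeout = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(timeout) {
            Ok(item) => {
                if buf.is_empty() {
                    // First item of a new batch restarts the timer.
                    deadline = Instant::now() + max_wait;
                }
                buf.push(item);
                if buf.len() >= max_size {
                    batches.push(std::mem::take(&mut buf)); // flush: batch full
                    deadline = Instant::now() + max_wait;
                }
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                if !buf.is_empty() {
                    batches.push(std::mem::take(&mut buf)); // flush: timer fired
                }
                deadline = Instant::now() + max_wait;
            }
            // All senders dropped: flush the remainder and stop.
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                if !buf.is_empty() {
                    batches.push(buf);
                }
                return batches;
            }
        }
    }
}
```

With `max_size = 2`, five queued items come out as batches of 2, 2, and 1, the last one flushed on sender shutdown rather than by the timer.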

## Examples

See the [`examples/`](examples/) directory:

- **[`sqlite_batch`](examples/sqlite_batch.rs)** – batch-insert rows into an in-memory SQLite database using the rarray virtual table.

Run an example with:

```sh
cargo run --example sqlite_batch
```

## License

This project is licensed under the [MIT](LICENSE) license.

### Contribution

-Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in `batch-tower` by you, shall be licensed as MIT, without any additional terms or conditions.
+Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in `tower-batch` by you, shall be licensed as MIT, without any additional terms or conditions.

[Tower]: https://docs.rs/tower
[Tower layer]: https://docs.rs/tower/latest/tower/trait.Layer.html
137 changes: 137 additions & 0 deletions examples/sqlite_batch.rs
@@ -0,0 +1,137 @@
//! Batch-insert rows into an in-memory SQLite database using the rarray virtual table.
//!
//! Run with: `cargo run --example sqlite_batch`

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;

use rusqlite::vtab::array::load_module;
use rusqlite::Connection;
use tower::Service;
use tower_batch::{Batch, BatchControl, BoxError};

struct InsertRow {
name: String,
value: i64,
}

struct SqliteBatchService {
conn: Arc<Mutex<Connection>>,
pending: Vec<InsertRow>,
}

impl Service<BatchControl<InsertRow>> for SqliteBatchService {
type Response = ();
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<(), BoxError>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: BatchControl<InsertRow>) -> Self::Future {
match req {
BatchControl::Item(row) => {
self.pending.push(row);
Box::pin(std::future::ready(Ok(())))
}
BatchControl::Flush => {
let rows = mem::take(&mut self.pending);
let conn = Arc::clone(&self.conn);

Box::pin(async move {
tokio::task::spawn_blocking(move || {
let conn = conn
.lock()
.map_err(|e| -> BoxError { e.to_string().into() })?;
let tx = conn.unchecked_transaction()?;

let names: Vec<rusqlite::types::Value> = rows
.iter()
.map(|r| rusqlite::types::Value::Text(r.name.clone()))
.collect();
let values: Vec<rusqlite::types::Value> = rows
.iter()
.map(|r| rusqlite::types::Value::Integer(r.value))
.collect();

// Rc is required here: rusqlite's rarray only implements ToSql for Rc<Vec<Value>>.
// This is safe because the Rc never leaves the spawn_blocking closure.
let names = Rc::new(names);
let values = Rc::new(values);

conn.execute(
"INSERT INTO data(name, value) \
SELECT n.value, v.value \
FROM rarray(?1) AS n \
JOIN rarray(?2) AS v ON n.rowid = v.rowid",
rusqlite::params![names, values],
)?;

tx.commit()?;
Ok::<(), BoxError>(())
})
.await?
})
}
}
}
}

#[tokio::main]
async fn main() -> Result<(), BoxError> {
let conn = Connection::open_in_memory()?;
conn.execute_batch("CREATE TABLE data (name TEXT NOT NULL, value INTEGER NOT NULL)")?;
load_module(&conn)?;

let conn = Arc::new(Mutex::new(conn));

let service = SqliteBatchService {
conn: Arc::clone(&conn),
pending: Vec::new(),
};

let batch = Batch::new(service, 50, Duration::from_millis(100));

let mut handles = Vec::new();
for task_id in 0..4 {
let mut batch = batch.clone();
handles.push(tokio::spawn(async move {
for i in 0..50 {
tower::ServiceExt::ready(&mut batch).await.unwrap();
batch
.call(InsertRow {
name: format!("task{task_id}_row{i}"),
value: (task_id * 50 + i) as i64,
})
.await
.unwrap();
}
}));
}

for handle in handles {
handle.await?;
}

// Drop the last Batch handle so the worker knows no more requests are coming,
// then give it time to flush. In production you may want a more robust shutdown
// mechanism.
drop(batch);
tokio::time::sleep(Duration::from_millis(200)).await;

let count: i64 = conn
.lock()
.map_err(|e| -> BoxError { e.to_string().into() })?
.query_row("SELECT COUNT(*) FROM data", [], |row| row.get(0))?;

println!("Inserted {count} rows (expected 200)");
assert_eq!(count, 200);

Ok(())
}
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -3,7 +3,7 @@
//! A Tower middleware that provides a buffered mpsc for processing requests in batches.
//!
//! Writing data in bulk is a common technique for improving the efficiency of certain tasks.
-//! `batch-tower` is a middleware that allows you to buffer requests for batch processing until
+//! `tower-batch` is a middleware that allows you to buffer requests for batch processing until
//! the buffer reaches a maximum size OR a maximum duration elapses.
//!
//! Clients enqueue requests by sending on the channel from any of the handles ([`Batch`]), and the