Skip to content

zannis/shove

Repository files navigation

shove

ci Latest Version Docs License:MIT Coverage

Type-safe async pub/sub for Rust. One API across RabbitMQ, AWS SNS+SQS, NATS JetStream, Apache Kafka, Redis/Valkey Streams, and an in-process backend.

Guides, examples, and the full walkthrough live at shove.rs. Rustdoc on docs.rs/shove.

What you get

  • Define a topic once, use it everywhere. Queue names, DLQs, and retries all derive from a single Rust type.
  • Retries and DLQs included. Escalating backoff, dead-letter routing, retry budgets, handler timeouts — no glue code.
  • Strict per-key ordering when you need it, with pluggable failure policies.
  • Autoscaling consumer groups driven by queue depth or consumer lag.
  • Switch backends without changing your code. Same topic, same handler, six transports.
  • Pluggable message codecs. JSON by default; Protobuf, raw bytes, or your own.

30-second tour

In-process, no Docker, no credentials:

use serde::{Deserialize, Serialize};
use shove::inmemory::{InMemoryConfig, InMemoryConsumerGroupConfig};
use shove::{
    Broker, ConsumerGroupConfig, InMemory, MessageHandler, MessageMetadata, Outcome,
    TopologyBuilder, define_topic,
};
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct OrderPaid { order_id: String }

define_topic!(Orders, OrderPaid,
    TopologyBuilder::new("orders")
        .hold_queue(Duration::from_secs(5))  // retry with backoff
        .dlq()                               // dead-letter on permanent failure
        .build());

struct Handler;
impl MessageHandler<Orders> for Handler {
    type Context = ();
    async fn handle(&self, msg: OrderPaid, _: MessageMetadata, _: &()) -> Outcome {
        println!("paid: {}", msg.order_id);
        Outcome::Ack
    }
}

#[tokio::main]
async fn main() -> Result<(), shove::ShoveError> {
    use futures::FutureExt as _;

    let broker = Broker::<InMemory>::new(InMemoryConfig::default()).await?;
    broker.topology().declare::<Orders>().await?;

    let publisher = broker.publisher().await?;
    publisher.publish::<Orders>(&OrderPaid { order_id: "ORD-1".into() }).await?;

    let mut group = broker.consumer_group();
    group
        .register::<Orders, _>(
            ConsumerGroupConfig::new(InMemoryConsumerGroupConfig::new(1..=1)),
            || Handler,
        )
        .await?;

    let outcome = group
        .run_until_timeout(tokio::signal::ctrl_c().map(drop), Duration::from_secs(5))
        .await;
    std::process::exit(outcome.exit_code());
}

Swap InMemory for RabbitMq, Sqs, Nats, Kafka, or Redis and the topic and handler stay identical. Per-backend setup: Getting Started.

Backends

Backend Feature flag Marker
RabbitMQ rabbitmq RabbitMq
AWS SNS+SQS aws-sns-sqs Sqs
NATS JetStream nats Nats
Apache Kafka kafka Kafka
Redis/Valkey Streams redis-streams Redis
In-process inmemory InMemory

cargo add shove --features <flag>. Need help choosing? Choosing a backend.

Optional add-ons: audit, metrics, kafka-ssl, rabbitmq-transactional, protobuf.

Delivery

At-least-once by default. Handlers return one of:

  • Ack — success
  • Retry — delayed retry through hold queues with escalating backoff
  • Reject — dead-letter immediately
  • Defer — delay without consuming a retry budget

Full semantics: Outcomes & Delivery.

Performance

MacBook Pro M4 Max, using simulated tasks averaging ~175ms. Reproducible via cargo run --release --example <backend>_stress --features <backend>.

  • 56,000 msg/s — Redis, 1,024 consumers
  • 7,996 msg/s — Redis, 64 consumers
  • 7,585 msg/s — RabbitMQ, 64 consumers
  • 2,245 msg/s — Kafka, 64 consumers

Tuning notes, NATS/SQS profiles, and the full benchmark matrix: Performance.

Learn more

Requirements

  • Rust 1.85+ (edition 2024)
  • Redis 6.2+ or Valkey (when using redis-streams)

License

MIT