A lightweight, reliable Transactional Outbox + Inbox library for Clojure services backed by PostgreSQL.
It ensures effectively-once processing in microservices by solving the "Dual Write Problem" and handling idempotent consumption.
| Component | Guarantee | Description |
|---|---|---|
| Outbox | At-Least-Once | Events are guaranteed to be published. In rare cases (e.g., network failure after publish but before DB update), duplicates may be sent. |
| Inbox | Effectively-Once | Side effects are executed exactly once per unique message ID, guarded by unique constraints and leases. |
- Outbox: Persist events in the same transaction as your business logic.
- Dispatcher: Reliable delivery with
SKIP LOCKEDconcurrency, retries, exponential backoff, and dead-letter handling. - Inbox: Idempotent consumption using
UNIQUEconstraints and a Lease/Takeover mechanism for crash recovery. - Protocol-Driven: Core logic is abstracted via Protocols (
OutboxStore,InboxStore), allowing for future backend replacements (e.g., Redis, MySQL, Kafka) without changing business code. - Observability: Structured logging for every dispatch and deduplication event.
- Java 11+
- Clojure 1.11+
- PostgreSQL 12+ (Docker recommended)
Start a PostgreSQL instance using Docker:
docker-compose up -dCreate the database and apply the required tables:
# 1. Create the 'idem_test' database
python3 dev/init_db.py
# 2. Apply Schema (using docker exec)
docker-compose exec -T postgres psql -U postgres -d idem_test < resources/migrations/001_create_outbox.sql
docker-compose exec -T postgres psql -U postgres -d idem_test < resources/migrations/002_create_inbox.sqlRun the integrated demo which simulates a Producer, a Relay, and a Consumer (with duplicate delivery):
clojure -M:demoExpected Output: You should see logs indicating:
- An event being emitted ("Producer: Emitting event...").
- The Dispatcher claiming and sending the event.
- The Consumer processing the first attempt (Side Effect executed).
- The Consumer receiving a duplicate (Attempt 2) and skipping it ("Duplicate detected, skipped execution").
IDEM is designed with a Protocol-First architecture. While it ships with a production-ready PostgreSQL implementation, the public APIs are decoupled from the storage layer.
Add the library to your deps.edn:
{:deps {com.github.discretewater/idem {:mvn/version "0.1.2"}}}Or project.clj (Leiningen):
[com.github.discretewater/idem "0.1.2"]In your application code, verify you are inside a transaction, then call emit!. This guarantees that the event is only persisted if the transaction commits.
(require '[idem.outbox :as outbox]
'[next.jdbc :as jdbc])
(defn create-order! [ds order-data]
(jdbc/with-transaction [tx ds]
;; 1. Business Logic: Write to domain tables
(sql/insert! tx :orders order-data)
;; 2. Outbox: Emit event in the SAME transaction
;; Uses the default PostgreSQL store implementation
(outbox/emit! tx {:aggregate-type "order"
:aggregate-id (:id order-data)
:event-type "order.created"
:payload order-data
:headers {:trace-id "abc-123"}})))The Dispatcher runs in the background. It polls the outbox table and pushes events to your message bus (Kafka, RabbitMQ, HTTP, etc.).
You must implement the Publisher protocol or provide a simple function adapter.
(require '[idem.dispatcher :as dispatcher]
'[idem.protocol :as protocol])
;; Option A: Simple Function Adapter (for simple use cases)
(def my-publisher
(protocol/fn->publisher
(fn [event]
(println "Publishing to Kafka:" (:event_id event)))))
;; Option B: Full Protocol (for connection management)
(defrecord KafkaPublisher [producer]
protocol/Publisher
(start! [this] (connect-kafka! ...))
(stop! [this] (close-kafka! ...))
(publish! [this event] (kafka-send! producer event)))
;; Start the Dispatcher (uses default Postgres store)
(def stop-dispatcher!
(dispatcher/start! ds my-publisher {:poll-interval-ms 1000
:batch-size 50
:max-attempts 10}))
;; Stop it when app shuts down
(stop-dispatcher!)Wrap your message handling logic with with-idempotency. This ensures that even if the message bus delivers the same message twice, your handler runs only once.
(require '[idem.inbox :as inbox])
(defn handle-message! [ds message]
(let [consumer-group "order-service-group"
message-id (:id message)]
;; 'ds' is passed to the default PostgresInboxStore.
;; To use Redis, you would swap the implementation here.
(inbox/with-idempotency ds consumer-group message-id {:ttl-ms 300000}
(fn []
;; Your idempotent business logic here
(println "Processing order:" (:payload message))))))Passed to dispatcher/start!.
| Parameter | Default | Description |
|---|---|---|
:poll-interval-ms |
1000 |
How often to poll DB for pending events (ms). |
:batch-size |
50 |
Max events processed per poll cycle. |
:max-attempts |
10 |
Max retries before marking as dead. |
:initial-backoff-ms |
1000 |
Base delay for the first retry. |
:backoff-multiplier |
2 |
Exponential factor. |
Backoff Formula: $$ delay_n = \min(max, initial \times multiplier^{(n-1)}) + jitter $$ (Jitter is a random 0-10% addition to prevent thundering herds)
Passed to inbox/with-idempotency.
| Parameter | Default | Description |
|---|---|---|
:ttl-ms |
300000 (5 min) |
Lease duration. If a consumer crashes while processing, another instance can takeover after this time. |
Any exception thrown during publisher/publish! is considered transient (e.g., network glitch).
- The event status becomes
failed. next_attempt_atis calculated via exponential backoff.last_erroris updated in the database for visibility.
When attempts >= max-attempts, the event is considered permanently failed (e.g., invalid payload, schema mismatch).
- The event status becomes
dead. - It will stop retrying automatically.
dead_attimestamp is recorded.
How to debug dead events: Query the table to inspect the payload and error:
SELECT event_id, attempts, last_error, payload
FROM idem_outbox_events
WHERE status = 'dead';Action: After fixing the bug (or data), you can manually reset status='pending', attempts=0 to retry.
The tables (idem_outbox_events, idem_inbox_messages) will grow indefinitely. It is recommended to implement a scheduled job (e.g., cron) to clean up old records.
Recommended Policy:
- Keep
pending/failedrecords indefinitely (until resolved). - Keep
sent/processedrecords for a safe window (e.g., 7-30 days) for auditing/debugging.
Example Cleanup SQL:
-- Clean Outbox
DELETE FROM idem_outbox_events
WHERE status IN ('sent', 'dead')
AND created_at < NOW() - INTERVAL '30 days';
-- Clean Inbox
DELETE FROM idem_inbox_messages
WHERE status = 'processed'
AND created_at < NOW() - INTERVAL '30 days';Indexing Note:
The default migrations include indices on status and created_at (composite), which ensures these delete operations remain efficient even as table size grows. It is recommended to run VACUUM periodically on PostgreSQL.
IDEM uses a Store Protocol pattern (idem.protocol) to separate logic from storage.
- Default:
idem.impl.postgres(included) - Uses PostgreSQL for Outbox (Transactional) and Inbox (Unique Constraints). - Custom: You can implement
OutboxStoreorInboxStoreprotocols to support other backends (e.g., Redis for high-throughput Inbox, or MySQL).
The public functions (outbox/emit!, inbox/with-idempotency) act as facades that delegate to these protocols, ensuring your business code remains unchanged even if you switch backends.
To run the integration tests:
clojure -M:testMIT