A generic, transactional queue-draining engine for Go.
anyqueue discovers a set of topics (independent logical queues, each with its
own destination), and for each topic it continuously pulls batches of pending
items out of a persistent store, fans them out across a pool of workers,
hands each item to a caller-supplied handler, and records the result back
into the store — all within a single unit of work per batch.
The engine is storage- and transport-agnostic: it knows nothing about SQL,
HTTP, or any concrete item type. Consumers wire it up by implementing Store,
Batch, Topic, Metrics, and a Handler. Each topic runs in its own
goroutine, so a slow or backed-up topic never blocks the others.
err := anyqueue.Run[MyTopic, MyItem, MyResult](
	ctx,
	logger,            // *slog.Logger
	metrics,           // anyqueue.Metrics
	anyqueue.Config{},
	store,             // anyqueue.Store[MyTopic, MyItem, MyResult]
	handler,           // anyqueue.Handler[MyItem, MyResult]
)

The engine is generic over three consumer-defined types:
- TopicType: the logical queue (implements Topic)
- ItemType: a unit of work handed to the Handler
- ResultType: the Handler's per-item output, persisted by Batch.Record

Run blocks until ctx is cancelled and returns nil on graceful shutdown.
The engine logs via the standard library log/slog.
Adapt your own logger by passing an *slog.Logger backed by a custom
slog.Handler.
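
As a rough illustration of such an adapter, the following sketch implements slog.Handler on top of a plain line-oriented sink function. The names `lineHandler` and `newLogger` and the output format are assumptions made for this example; only the log/slog types and methods come from the standard library. Groups are ignored and the minimum level is fixed at Info to keep the sketch short.

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"strings"
)

// lineHandler adapts a line-oriented sink (standing in for an existing
// logger) to the slog.Handler interface.
type lineHandler struct {
	sink  func(line string)
	attrs []slog.Attr
}

// Enabled drops anything below Info.
func (h *lineHandler) Enabled(_ context.Context, level slog.Level) bool {
	return level >= slog.LevelInfo
}

// Handle renders the record as "[LEVEL] message key=value ..." and passes
// the line to the sink.
func (h *lineHandler) Handle(_ context.Context, r slog.Record) error {
	var sb strings.Builder
	sb.WriteString(r.Message)
	write := func(a slog.Attr) bool {
		fmt.Fprintf(&sb, " %s=%v", a.Key, a.Value)
		return true
	}
	for _, a := range h.attrs {
		write(a)
	}
	r.Attrs(write)
	h.sink(fmt.Sprintf("[%s] %s", r.Level, sb.String()))
	return nil
}

// WithAttrs returns a copy of the handler carrying the extra attributes.
func (h *lineHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	merged := append(append([]slog.Attr{}, h.attrs...), attrs...)
	return &lineHandler{sink: h.sink, attrs: merged}
}

// WithGroup ignores grouping in this sketch.
func (h *lineHandler) WithGroup(string) slog.Handler { return h }

// newLogger builds the *slog.Logger that would be handed to the engine.
func newLogger(sink func(line string)) *slog.Logger {
	return slog.New(&lineHandler{sink: sink})
}

func main() {
	logger := newLogger(func(line string) { fmt.Println(line) })
	logger.Info("batch drained", "topic", "emails", "items", 3)
}
```

The resulting `*slog.Logger` is an ordinary logger value, so it can be passed wherever one is expected, including as the logger argument to Run.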
go test -race ./...          # run tests
go tool golangci-lint run    # lint (config in .golangci.yml)

Mocks under mock_*_test.go are generated with mockery
from .mockery.yaml:
go run github.com/vektra/mockery/v2@latest