-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanyqueue.go
More file actions
74 lines (66 loc) · 3.74 KB
/
Copy pathanyqueue.go
File metadata and controls
74 lines (66 loc) · 3.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
Package anyqueue implements a generic transactional queue-draining engine.
The engine discovers a set of topics (independent logical queues, each with its
own destination), and for each topic it continuously pulls batches of pending
items out of a persistent store, fans them out across a pool of workers, hands
each item to a caller-supplied Handler, and records the result back into the
store — all within a single unit of work per batch.
The engine itself is storage- and transport-agnostic: it knows nothing about
SQL, HTTP, or any concrete item type. Consumers wire it up by implementing
Store, Batch, Topic, Metrics and a Handler. This keeps the engine reusable
across different queue backends and delivery targets (and extractable into a
standalone repository), while backend-specific persistence and delivery logic
live in the consuming package.
The engine is generic over three consumer-defined types:
- TopicType — the logical queue (implements Topic)
- ItemType — a unit of work handed to the Handler
- ResultType — the Handler's per-item output, persisted by Batch.Record
*/
package anyqueue
import (
"context"
"log/slog"
"time"
)
// Handler processes a single item and reports a result. ResultType is opaque to
// the engine — it is shuttled verbatim to Batch.Record so consumers can persist
// whatever delivery detail they need. A non-nil error marks the item as failed;
// on failure ResultType is typically its zero value.
type Handler[ItemType, ResultType any] func(ctx context.Context, logger *slog.Logger, item ItemType) (ResultType, error)
// Store is the persistence boundary the engine depends on. Consumers implement
// it over their own queue tables.
type Store[TopicType Topic, ItemType, ResultType any] interface {
// ListTopics returns the currently active topics. Called periodically so the
// engine can start/stop per-topic goroutines as topics appear and disappear.
ListTopics(ctx context.Context) ([]TopicType, error)
// BeginBatch opens a unit of work for the topic, locks and loads the next
// batch of pending items, and returns a Batch handle. An empty batch
// (no items) signals there is no work; the engine will Rollback and sleep.
BeginBatch(ctx context.Context, topic TopicType) (Batch[ItemType, ResultType], error)
}
// Batch is a unit of work over a single locked batch of items. The engine owns
// its lifecycle: it reads Items, records each item's outcome, then either
// Commits (success) or Rollbacks (abort). Implementations own the underlying
// transaction so the engine stays storage-agnostic.
type Batch[ItemType, ResultType any] interface {
// Items returns the items in this batch. The same slice is dispatched to
// workers; the engine clears its own references as it dispatches.
Items() []ItemType
// Record persists the outcome for one item within the unit of work. result
// is the Handler's return value; err is its error return (nil on success).
// Interpreting success and shaping the persisted detail is the consumer's job.
Record(ctx context.Context, item ItemType, result ResultType, err error) error
// Commit finalizes the unit of work after all items were recorded.
Commit(ctx context.Context) error
// Rollback discards the unit of work (empty batch or aborted dispatch).
Rollback(ctx context.Context) error
}
// Metrics receives engine timing instrumentation. Kept transport-agnostic (no
// metric tag types) so the engine carries no observability vendor coupling;
// consumers adapt it to their metrics backend and own the metric names.
type Metrics interface {
// BatchProcessed records the wall-clock time taken to process one batch.
BatchProcessed(start time.Time)
// ItemProcessed records the time taken to process one item and whether it succeeded.
ItemProcessed(start time.Time, success bool)
}