Skip to content

Commit 1ee89f9

Browse files
committed
refactor(queue/mysql): immutable log, delivery state, heartbeat store, fair leasing
- Add delivery state store for tracking per-consumer message state separately from the immutable log - Add subscriber heartbeat store for liveness detection and fair partition leasing - Refactor message store to append-only immutable log with contiguous watermark tracking - Refactor partition lease store with heartbeat-based fair leasing - Add batch query support for offset and delivery state operations - Add GC throttling and dead code removal - Fix make fmt to skip read-only pkg/mod directory
1 parent 11fbe75 commit 1ee89f9

34 files changed

Lines changed: 4198 additions & 1204 deletions

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ e2e-test: build-all-linux ## Run end-to-end tests (hermetic, auto-builds binarie
8989

9090
fmt: ## Format Go and YAML code
9191
@echo "Formatting Go code..."
92-
@$(BAZEL) run @rules_go//go -- run golang.org/x/tools/cmd/goimports@$(GOIMPORTS_VERSION) -w .
92+
@find . -name '*.go' -not -path './pkg/*' -not -path './bazel-*' | xargs $(BAZEL) run @rules_go//go -- run golang.org/x/tools/cmd/goimports@$(GOIMPORTS_VERSION) -w
9393
@echo "Formatting YAML files..."
9494
@$(BAZEL) run @rules_go//go -- run github.com/google/yamlfmt/cmd/yamlfmt@$(YAMLFMT_VERSION)
9595
@echo "Formatting complete!"

doc/rfc/sql-queue-rfc.md

Lines changed: 229 additions & 94 deletions
Large diffs are not rendered by default.

extension/queue/mysql/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ gomock(
88
out = "mock_stores.go",
99
mockgen_tool = _MOCKGEN,
1010
package = "mysql",
11+
self_package = "github.com/uber/submitqueue/extension/queue/mysql",
1112
source = "stores.go",
1213
source_importpath = "github.com/uber/submitqueue/extension/queue/mysql",
1314
)
@@ -17,6 +18,7 @@ go_library(
1718
name = "mysql",
1819
srcs = [
1920
"constants.go",
21+
"delivery_state_store.go",
2022
"errors.go",
2123
"message_store.go",
2224
":mock_stores_src",
@@ -26,10 +28,12 @@ go_library(
2628
"sql.go",
2729
"stores.go",
2830
"subscriber.go",
31+
"subscriber_heartbeat_store.go",
2932
],
3033
importpath = "github.com/uber/submitqueue/extension/queue/mysql",
3134
visibility = ["//visibility:public"],
3235
deps = [
36+
"//core/metrics",
3337
"//entity/queue",
3438
"//extension/queue",
3539
"@com_github_uber_go_tally_v4//:tally",
@@ -41,11 +45,13 @@ go_library(
4145
go_test(
4246
name = "mysql_test",
4347
srcs = [
48+
"delivery_state_store_test.go",
4449
"message_store_test.go",
4550
"offset_store_test.go",
4651
"partition_lease_store_test.go",
4752
"publisher_test.go",
4853
"sql_test.go",
54+
"subscriber_heartbeat_store_test.go",
4955
"subscriber_test.go",
5056
],
5157
embed = [":mysql"],

extension/queue/mysql/README.md

Lines changed: 118 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,8 @@
11
# SQL Queue Implementation
22

3-
MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery.
3+
MySQL-based distributed queue with partition leasing, delivery state tracking, and at-least-once delivery.
44

5-
## Key Features
6-
7-
- **Partition leasing** — workers coordinate via database leases with automatic failover
8-
- **Per-partition workers** — each leased partition gets its own goroutine for isolation
9-
- **Visibility timeout** — messages retry automatically if worker crashes
10-
- **At-least-once delivery** — offset tracking for crash recovery
11-
- **Dead letter queue** — failed messages moved to DLQ after max retries
5+
For design rationale, guarantees, and trade-offs, see the [RFC](../../doc/rfc/sql-queue-rfc.md).
126

137
## Quick Start
148

@@ -53,10 +47,8 @@ Per-subscription configuration enables different settings for each topic:
5347
```go
5448
import extqueue "github.com/uber/submitqueue/extension/queue"
5549

56-
// Default config
5750
subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "consumer-group")
5851

59-
// Customize for this subscription
6052
subConfig.PollIntervalMs = 50 // Poll frequency (milliseconds)
6153
subConfig.BatchSize = 20 // Messages per poll
6254
subConfig.VisibilityTimeoutMs = 60000 // Retry delay (milliseconds)
@@ -68,93 +60,148 @@ subConfig.Retry.MaxBackoffMs = 30000 // Max retry backoff (mill
6860
subConfig.Retry.BackoffMultiplier = 2.0 // Backoff multiplier for exponential backoff
6961
subConfig.DLQ.Enabled = true // Enable dead letter queue
7062
subConfig.DLQ.TopicSuffix = "_dlq" // DLQ topic suffix
71-
72-
// Use config when subscribing
73-
deliveryCh, _ := q.Subscriber().Subscribe(ctx, "my-topic", subConfig)
7463
```
7564

7665
**Key Configuration Fields:**
7766

78-
- `SubscriberName`: Unique worker identifier for partition leasing (e.g., hostname, pod name)
79-
- `ConsumerGroup`: Consumer group for independent offset tracking
80-
- `PollIntervalMs`: How often to poll for new messages (milliseconds)
81-
- `BatchSize`: Maximum messages to fetch per poll
82-
- `VisibilityTimeoutMs`: How long messages are invisible after being fetched (milliseconds)
83-
- `LeaseRenewalIntervalMs`: How often to renew partition leases (milliseconds)
84-
- `LeaseDurationMs`: How long leases remain valid without renewal (milliseconds)
85-
- `Retry.MaxAttempts`: Maximum processing attempts before moving to DLQ
86-
- `Retry.InitialBackoffMs`: Initial retry backoff delay (milliseconds)
87-
- `Retry.MaxBackoffMs`: Maximum retry backoff delay (milliseconds)
88-
- `Retry.BackoffMultiplier`: Multiplier for exponential backoff
89-
- `DLQ.TopicSuffix`: Suffix appended to topic name for DLQ (e.g., "orders" -> "orders_dlq")
90-
91-
## Architecture
67+
| Field | Description |
68+
|-------|-------------|
69+
| `SubscriberName` | Unique worker identifier for partition leasing (e.g., hostname, pod name) |
70+
| `ConsumerGroup` | Consumer group for independent offset tracking |
71+
| `PollIntervalMs` | How often to poll for new messages |
72+
| `BatchSize` | Maximum messages to fetch per poll. Set to `1` for strict serialization |
73+
| `VisibilityTimeoutMs` | How long messages are invisible after fetch. Must exceed max processing time for `BatchSize=1` |
74+
| `LeaseRenewalIntervalMs` | How often to renew partition leases |
75+
| `LeaseDurationMs` | How long leases remain valid without renewal |
76+
| `Retry.MaxAttempts` | Maximum processing attempts before DLQ |
77+
| `DLQ.TopicSuffix` | Suffix appended to topic name for DLQ (e.g., `"orders"``"orders_dlq"`) |
9278

93-
### Goroutine Model
79+
### Strict Serialization
80+
81+
For strict in-order processing within a partition, set `BatchSize = 1`:
9482

95-
Each subscription has a **supervisor goroutine** (`managePartitions`) that:
96-
1. Discovers partitions from the messages table
97-
2. Acquires and renews partition leases
98-
3. Reconciles **per-partition worker goroutines** based on current leases
83+
```go
84+
cfg := extqueue.DefaultSubscriptionConfig("worker-1", "ordered-consumer")
85+
cfg.BatchSize = 1 // One message in-flight at a time
86+
cfg.VisibilityTimeoutMs = 120000 // Must exceed max processing time
87+
```
9988

100-
Each partition worker goroutine polls and delivers messages for its partition independently. This provides fault isolation — a slow or blocked partition does not affect other partitions.
89+
This guarantees no concurrent processing within a partition. See the [RFC](../../doc/rfc/sql-queue-rfc.md#ordering-and-serialization) for details on ordering semantics and non-blocking nack behavior.
90+
91+
## Package Layout
92+
93+
```
94+
extension/queue/mysql/
95+
├── sql.go # NewQueue constructor, wires stores → publisher/subscriber
96+
├── stores.go # Internal store interfaces (messageStore, offsetStore, etc.)
97+
├── message_store.go # queue_messages table operations (immutable log)
98+
├── delivery_state_store.go # queue_delivery_state table operations (per-consumer-group)
99+
├── offset_store.go # queue_offsets table operations (watermark tracking)
100+
├── partition_lease_store.go # queue_partition_leases table operations
101+
├── subscriber_heartbeat_store.go # queue_subscriber_heartbeats table operations
102+
├── publisher.go # Publisher implementation
103+
├── subscriber.go # Subscriber, delivery, goroutine management
104+
├── constants.go # Log key constants
105+
├── errors.go # Error types
106+
├── schema/ # SQL schema files (one per table)
107+
│ ├── queue_messages.sql
108+
│ ├── queue_delivery_state.sql
109+
│ ├── queue_offsets.sql
110+
│ ├── queue_partition_leases.sql
111+
│ └── queue_subscriber_heartbeats.sql
112+
└── ctl/ # Admin CLI (see ctl/README.md)
113+
```
114+
115+
## Internal Architecture
116+
117+
### Database Tables
118+
119+
| Table | Purpose | Scoped To |
120+
|-------|---------|-----------|
121+
| `queue_messages` | Immutable append-only message log | `(topic, partition_key)` — shared across consumer groups |
122+
| `queue_delivery_state` | Visibility, ack state, retry count | `(consumer_group, topic, partition_key, offset)` |
123+
| `queue_offsets` | Contiguous acked watermark | `(consumer_group, topic, partition_key)` |
124+
| `queue_partition_leases` | Partition lease coordination | `(consumer_group, topic, partition_key)` |
125+
| `queue_subscriber_heartbeats` | Active subscriber tracking | `(consumer_group, topic, subscriber_name)` |
126+
127+
See `schema/` for full SQL definitions. See the [RFC](../../doc/rfc/sql-queue-rfc.md#database-schema) for field-level documentation.
128+
129+
### Store Architecture
130+
131+
Each table is backed by an internal store interface defined in `stores.go`. Stores:
132+
- Query only their own table (no cross-table JOINs)
133+
- Return errors via `fmt.Errorf` (no logging, no error classification)
134+
- Use `metrics.Begin`/`Complete` for latency and success/failure tracking
135+
136+
The subscriber layer orchestrates cross-store operations (e.g., watermark advancement queries both `messageStore` and `deliveryStateStore`) and owns all logging and error classification.
137+
138+
### Goroutine Model
139+
140+
Each subscription has a **supervisor goroutine** (`managePartitions`) that discovers partitions, acquires leases, sends heartbeats, rebalances, and reconciles per-partition worker goroutines.
101141

102142
```
103143
Subscribe()
104-
└── managePartitions (supervisor)
105-
├── partitionWorker("part-1") ← polls & delivers
106-
├── partitionWorker("part-2") ← polls & delivers
107-
└── partitionWorker("part-3") ← polls & delivers
144+
└── managePartitions (supervisor) ← tracked by sub.wg
145+
├── partitionWorker("part-1") ← tracked by sub.workerWg
146+
├── partitionWorker("part-2")
147+
└── partitionWorker("part-3")
108148
```
109149

110-
### Shutdown Sequence
150+
Each partition worker runs independently — polls the DB on a ticker, checks deliverability via `GetDeliveryState` per message, and sends deliveries to the shared channel. A slow or blocked partition does not affect other partitions.
111151

112-
Shutdown uses two `sync.WaitGroup`s to ensure correctness:
113-
- `wg` tracks the supervisor goroutine (`managePartitions`)
114-
- `workerWg` tracks all partition worker goroutines
152+
### Shutdown Sequence
115153

116154
When `Close()` is called:
117155
1. Subscription context is cancelled
118-
2. `managePartitions` calls `stopAllWorkers` — cancels each worker and waits up to 5s per worker
119-
3. Partition leases are released
120-
4. `workerWg.Wait()` blocks until all workers have fully exited
121-
5. `deliveryCh` is closed — safe because no workers can send after step 4
122-
6. `managePartitions` returns, `wg.Done()` fires
123-
7. `Close()` returns
156+
2. `managePartitions` calls `stopAllWorkers` — cancels each worker's context, waits up to 30s
157+
3. Partition leases are released (fresh context, not cancelled)
158+
4. Subscriber heartbeat is deregistered
159+
5. `workerWg.Wait()` — blocks until all workers have fully exited
160+
6. `deliveryCh` is closed — safe because no senders remain after step 5
161+
7. `managePartitions` returns → `wg.Done()``Close()` unblocks
124162

125-
The `workerWg.Wait()` step prevents a race where a slow worker (blocked on I/O past the 5s timeout) could send on a closed channel.
163+
The `workerWg.Wait()` before `close(deliveryCh)` prevents a race where a slow worker could send on a closed channel.
126164

127165
### Worker Stop Behavior
128166

129167
When a partition worker is stopped (lease lost or shutdown):
130-
- The worker is immediately removed from the workers map and its context is cancelled
131-
- The caller waits up to 5s for the worker to confirm exit (logging a warning on timeout)
132-
- `workerWg` tracks the worker regardless, so `Close()` always waits for full exit
133-
- If the worker times out, reconciliation is free to start a replacement — any brief overlap is harmless with at-least-once delivery semantics
168+
- Immediately removed from workers map and context cancelled
169+
- Caller waits up to 30s for exit confirmation (warning logged on timeout)
170+
- `workerWg` tracks the goroutine regardless `Close()` always waits for full exit
171+
- Reconciliation can start a replacement immediately — brief overlap is harmless with at-least-once semantics
134172

135-
## How It Works
173+
### Logger Hierarchy
136174

137-
**Partition Leasing:**
138-
1. Workers discover partitions from messages table
139-
2. Workers acquire leases (one worker per partition)
140-
3. Stale leases can be stolen by other workers
175+
`sql.go` creates a root `queue_mysql` logger and passes named children to each component:
141176

142-
**Message Flow:**
143-
1. Fetch visible messages (invisible_until <= now)
144-
2. Process message
145-
3. Ack: DELETE message, UPDATE offset
146-
4. Nack: Message becomes visible after timeout
147-
5. If retry_count >= MaxAttempts: Move to DLQ
177+
```
178+
queue_mysql
179+
├── .publisher
180+
├── .subscriber
181+
├── .message_store
182+
├── .delivery_state_store
183+
├── .offset_store
184+
├── .partition_lease_store
185+
└── .subscriber_heartbeat_store
186+
```
148187

149-
**Crash Recovery:**
150-
- Messages become visible after visibility timeout
151-
- Other workers steal stale leases
152-
- Resume from last acked offset
188+
Stores do not log errors — they return them. The subscriber propagates all errors to the top call site (`managePartitions` or `run`), which logs once with full context (`topic`, `consumer_group`, `subscriber_name`).
153189

154-
## Partition Ordering
190+
## Testing
155191

156-
Messages with same `PartitionKey` are processed in order by a single worker.
192+
### Unit Tests
193+
194+
```bash
195+
bazel test //extension/queue/mysql:mysql_test --test_output=streamed
196+
bazel test //extension/queue/mysql/ctl/...:all --test_output=streamed
197+
```
157198

158-
## Distributed Processing
199+
### Integration Tests
200+
201+
Requires Docker running:
202+
203+
```bash
204+
bazel test //test/integration/extension/queue/... --test_output=streamed
205+
```
159206

160-
Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently.
207+
Integration tests cover: publish/subscribe, partition isolation, ordering, visibility timeout, nack with delay, idempotent publish, concurrent publishers, crash recovery, multiple consumer groups, rebalancing, DLQ, graceful shutdown, non-blocking nack, strict serialization (`BatchSize=1`), and independent consumer group state.

extension/queue/mysql/constants.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,9 @@ package mysql
1717
// Common constants for frequently repeated strings across stores
1818

1919
const (
20-
// Tag key (used in every Tagged() call)
21-
tagErrorType = "error_type"
22-
2320
// Common log field names (used extensively across all stores)
2421
logTopic = "topic"
2522
logPartitionKey = "partition_key"
2623
logMessageID = "message_id"
2724
logError = "error"
28-
29-
// Error types used across multiple methods/stores
30-
errorBeginTx = "begin_transaction"
31-
errorCommit = "commit"
3225
)

extension/queue/mysql/ctl/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ bazel run //extension/queue/mysql/ctl -- topic-stats --topic merge_queue
4343
# List all topics with message counts
4444
queue-admin list-topics
4545

46-
# Detailed stats for a topic (visible/invisible messages, DLQ count, partitions, consumer groups)
46+
# Detailed stats for a topic (total messages, DLQ count, partitions, consumer groups)
4747
queue-admin topic-stats --topic merge_queue
4848
```
4949

extension/queue/mysql/ctl/commands.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,6 @@ func newTopicStatsCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
151151
rows := [][]string{
152152
{"Topic", stats.Topic},
153153
{"Total Messages", strconv.FormatInt(stats.TotalMessages, 10)},
154-
{"Visible Messages", strconv.FormatInt(stats.VisibleMessages, 10)},
155-
{"Invisible Messages", strconv.FormatInt(stats.InvisibleMessages, 10)},
156154
{"DLQ Count", strconv.FormatInt(stats.DLQCount, 10)},
157155
{"Partitions", strconv.FormatInt(stats.PartitionCount, 10)},
158156
{"Consumer Groups", strconv.FormatInt(stats.ConsumerGroupCount, 10)},
@@ -181,16 +179,15 @@ func newListMessagesCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
181179
if *jsonOut {
182180
return lib.FormatJSON(os.Stdout, messages)
183181
}
184-
headers := []string{"OFFSET", "ID", "PARTITION", "RETRIES", "INVISIBLE_UNTIL", "CREATED_AT"}
182+
headers := []string{"OFFSET", "ID", "PARTITION", "CREATED_AT", "PUBLISHED_AT"}
185183
var rows [][]string
186184
for _, m := range messages {
187185
rows = append(rows, []string{
188186
strconv.FormatInt(m.Offset, 10),
189187
m.ID,
190188
m.PartitionKey,
191-
strconv.Itoa(m.RetryCount),
192-
lib.FormatMillis(m.InvisibleUntil),
193189
lib.FormatMillis(m.CreatedAt),
190+
lib.FormatMillis(m.PublishedAt),
194191
})
195192
}
196193
lib.FormatTable(os.Stdout, headers, rows)
@@ -226,8 +223,6 @@ func newInspectMessageCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command
226223
{"ID", detail.ID},
227224
{"Topic", detail.Topic},
228225
{"Partition", detail.PartitionKey},
229-
{"Retry Count", strconv.Itoa(detail.RetryCount)},
230-
{"Invisible Until", lib.FormatMillis(detail.InvisibleUntil)},
231226
{"Created At", lib.FormatMillis(detail.CreatedAt)},
232227
{"Published At", lib.FormatMillis(detail.PublishedAt)},
233228
{"Payload", string(detail.Payload)},
@@ -321,14 +316,13 @@ func newListDLQCmd(store **lib.AdminStore, jsonOut *bool) *cobra.Command {
321316
if *jsonOut {
322317
return lib.FormatJSON(os.Stdout, messages)
323318
}
324-
headers := []string{"OFFSET", "ID", "PARTITION", "RETRIES", "CREATED_AT"}
319+
headers := []string{"OFFSET", "ID", "PARTITION", "CREATED_AT"}
325320
var rows [][]string
326321
for _, m := range messages {
327322
rows = append(rows, []string{
328323
strconv.FormatInt(m.Offset, 10),
329324
m.ID,
330325
m.PartitionKey,
331-
strconv.Itoa(m.RetryCount),
332326
lib.FormatMillis(m.CreatedAt),
333327
})
334328
}

0 commit comments

Comments
 (0)