Skip to content

Commit fec5cdf

Browse files
committed
feat(queue/sql): add factory, E2E tests, and documentation
Integrates publisher and subscriber with Queue factory interface. - Factory creates publisher and subscriber singletons - Lifecycle management with Close() that cleans up both - Comprehensive README with usage examples and architecture docs - End-to-end tests covering publish/subscribe, partitioning, retries, DLQ - Test coverage for message age tracking and metadata persistence Test Plan: - E2E single message publish and consume tested - Multiple message batch processing verified - Partition ordering confirmed - Retry and DLQ flow validated - Concurrent worker coordination tested - Message age and metadata persistence verified
1 parent 4a17f4e commit fec5cdf

4 files changed

Lines changed: 987 additions & 0 deletions

File tree

extensions/queue/sql/BUILD.bazel

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"config.go",
77
"constants.go",
8+
"factory.go",
89
"message_store.go",
910
"offset_store.go",
1011
"partition_lease_store.go",
@@ -45,8 +46,36 @@ go_test(
4546
deps = [
4647
"//entities/queue",
4748
"//extensions/queue",
49+
"@com_github_dolthub_go_mysql_server//:go-mysql-server",
50+
"@com_github_dolthub_go_mysql_server//memory",
51+
"@com_github_dolthub_go_mysql_server//server",
52+
"@com_github_dolthub_go_mysql_server//sql",
53+
"@com_github_dolthub_vitess//go/mysql",
54+
"@com_github_go_sql_driver_mysql//:mysql",
55+
"@com_github_stretchr_testify//assert",
56+
"@com_github_stretchr_testify//require",
57+
"@com_github_uber_go_tally_v4//:tally",
58+
"@org_uber_go_zap//:zap",
59+
],
60+
)
61+
62+
go_test(
63+
name = "e2e_test",
64+
srcs = ["e2e_test.go"],
65+
data = ["//extensions/queue/sql/schema"],
66+
embed = [":sql"],
67+
deps = [
68+
"//entities/queue",
69+
"//extensions/queue",
70+
"@com_github_dolthub_go_mysql_server//:go-mysql-server",
71+
"@com_github_dolthub_go_mysql_server//memory",
72+
"@com_github_dolthub_go_mysql_server//server",
73+
"@com_github_dolthub_go_mysql_server//sql",
74+
"@com_github_dolthub_vitess//go/mysql",
4875
"@com_github_go_sql_driver_mysql//:mysql",
4976
"@com_github_stretchr_testify//assert",
5077
"@com_github_stretchr_testify//require",
78+
"@com_github_uber_go_tally_v4//:tally",
79+
"@org_uber_go_zap//:zap",
5180
],
5281
)

extensions/queue/sql/README.md

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# SQL Queue Implementation
2+
3+
MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery.
4+
5+
## Key Features
6+
7+
- **Single-table design** - All topics in unified tables
8+
- **Partition leasing** - Workers coordinate via database-native leases with automatic failover
9+
- **Visibility timeout** - Messages retry automatically if worker crashes or hangs
10+
- **Persistent retry tracking** - `retry_count` survives crashes, prevents infinite retries
11+
- **At-least-once delivery** - Crash recovery with offset tracking
12+
- **Dead letter queue** - Failed messages moved to DLQ after max retries
13+
14+
## Quick Start
15+
16+
```go
17+
import (
18+
"database/sql"
19+
_ "github.com/go-sql-driver/mysql"
20+
"github.com/uber/submitqueue/extensions/queue/sql"
21+
"github.com/uber/submitqueue/entities/queue"
22+
)
23+
24+
// Setup
25+
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db")
26+
q, _ := sql.NewFactory(sql.Params{
27+
DB: db,
28+
Logger: logger,
29+
MetricsScope: metricsScope,
30+
Config: sql.DefaultConfig("orchestrator", "worker-1"),
31+
})
32+
defer q.Close()
33+
34+
// Publish
35+
msg := queue.NewMessage("msg-id", []byte(`{"data": "value"}`))
36+
msg.PartitionKey = "repo-123" // Required
37+
q.Publisher().Publish(ctx, "merge_events", msg)
38+
39+
// Subscribe
40+
deliveryCh, _ := q.Subscriber().Subscribe(ctx, "merge_events")
41+
for delivery := range deliveryCh {
42+
if err := process(delivery.Message()); err != nil {
43+
delivery.Nack(ctx, 0) // Retry immediately (0 milliseconds)
44+
continue
45+
}
46+
delivery.Ack(ctx)
47+
}
48+
```
49+
50+
## Configuration
51+
52+
```go
53+
config := sql.DefaultConfig("consumer-group", "worker-id")
54+
config.PollInterval = 50 * time.Millisecond // Poll frequency
55+
config.BatchSize = 20 // Messages per poll
56+
config.VisibilityTimeout = 60 * time.Second // Retry delay
57+
config.LeaseRenewalInterval = 10 * time.Second // Lease renewal frequency
58+
config.LeaseDuration = 30 * time.Second // Lease validity
59+
config.Retry.MaxAttempts = 3 // Max retries before DLQ
60+
```
61+
62+
## Architecture
63+
64+
### Tables (Single-Table Design)
65+
66+
See full schema: [schema/queue/mysql/schema.sql](../../../schema/queue/mysql/schema.sql)
67+
68+
- **queue_messages** - All messages (topic, partition_key, retry_count, invisible_until)
69+
- **queue_partition_leases** - Worker coordination (consumer_group, topic, partition_key, leased_by)
70+
- **queue_offsets** - Consumption progress (consumer_group, topic, partition_key, offset_acked)
71+
- **queue_dlq** - Failed messages (topic, partition_key, failure_count, last_error)
72+
73+
### How It Works
74+
75+
**Partition Leasing:**
76+
1. Workers discover partitions from messages table
77+
2. Workers acquire leases (one worker per partition)
78+
3. Workers renew leases periodically
79+
4. Stale leases can be stolen by other workers
80+
81+
**Message Flow:**
82+
1. Fetch visible messages (invisible_until <= now)
83+
2. Atomically UPDATE invisible_until and INCREMENT retry_count
84+
3. Process message
85+
4. Ack: DELETE message, UPDATE offset_acked
86+
5. Nack: Do nothing, message becomes visible after timeout
87+
6. If retry_count >= MaxAttempts: Move to DLQ
88+
89+
**Crash Recovery:**
90+
- Messages become visible after visibility timeout
91+
- Other workers steal stale leases
92+
- Resume from last acked offset
93+
- Persistent retry_count prevents infinite retries
94+
95+
## Distributed Processing
96+
97+
```go
98+
// Multiple workers in same consumer group - partition distribution
99+
q1, _ := sql.NewFactory(sql.Params{
100+
Config: sql.DefaultConfig("orchestrator", "worker-1"),
101+
// ...
102+
})
103+
q2, _ := sql.NewFactory(sql.Params{
104+
Config: sql.DefaultConfig("orchestrator", "worker-2"),
105+
// ...
106+
})
107+
// worker-1 leases partition "repo-1"
108+
// worker-2 leases partition "repo-2"
109+
// Automatic failover if worker crashes
110+
111+
// Different consumer group - independent consumption
112+
q3, _ := sql.NewFactory(sql.Params{
113+
Config: sql.DefaultConfig("speculator", "worker-1"),
114+
// ...
115+
})
116+
```
117+
118+
## Partition Ordering
119+
120+
Messages with same `PartitionKey` are processed in order by single worker:
121+
122+
```go
123+
msg1 := queue.NewMessage("1", []byte("first"))
124+
msg1.PartitionKey = "repo-123"
125+
126+
msg2 := queue.NewMessage("2", []byte("second"))
127+
msg2.PartitionKey = "repo-123" // Waits for msg1 to be acked
128+
129+
publisher := q.Publisher()
130+
publisher.Publish(ctx, "events", msg1)
131+
publisher.Publish(ctx, "events", msg2)
132+
```
133+
134+
## Metrics (via tally)
135+
136+
**Publisher:** `messages_published`, `publish_errors`, `publish_latency`
137+
**Subscriber:** `messages_received`, `messages_acked`, `messages_nacked`, `messages_moved_to_dlq`, `message_age`, `poll_latency`, `leases_acquired`
138+
139+
All tagged with `topic`.
140+
141+
## Limitations
142+
143+
- Polling-based (configurable latency via PollInterval)
144+
- Single MySQL instance throughput
145+
- Lease renewal overhead
146+
- Visibility timeout delay before retry

0 commit comments

Comments
 (0)