| 1 | +# SQL Queue Implementation |
| 2 | + |
| 3 | +MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery. |
| 4 | + |
| 5 | +## Key Features |
| 6 | + |
| 7 | +- **Single-table design** - All topics share one set of tables (no per-topic tables) |
| 8 | +- **Partition leasing** - Workers coordinate via database-native leases with automatic failover |
| 9 | +- **Visibility timeout** - Messages are retried automatically if a worker crashes or hangs |
| 10 | +- **Persistent retry tracking** - `retry_count` survives crashes and prevents infinite retries |
| 11 | +- **At-least-once delivery** - Crash recovery with offset tracking |
| 12 | +- **Dead letter queue** - Failed messages are moved to the DLQ after `Retry.MaxAttempts` attempts |
| 13 | + |
| 14 | +## Quick Start |
| 15 | + |
| 16 | +```go |
| 17 | +import ( |
| 18 | + stdsql "database/sql" // aliased: the queue package below is also named sql |
| 19 | + _ "github.com/go-sql-driver/mysql" |
| 20 | + "github.com/uber/submitqueue/extensions/queue/sql" |
| 21 | + "github.com/uber/submitqueue/entities/queue" |
| 22 | +) |
| 23 | + |
| 24 | +// Setup |
| 25 | +db, _ := stdsql.Open("mysql", "user:pass@tcp(localhost:3306)/db") |
| 26 | +factory, _ := sql.NewFactory(sql.Params{ |
| 27 | + DB: db, |
| 28 | + Logger: logger, |
| 29 | + MetricsScope: metricsScope, |
| 30 | + Config: sql.DefaultConfig("orchestrator", "worker-1"), |
| 31 | +}) |
| 32 | +defer factory.Close() |
| 33 | + |
| 34 | +// Publish |
| 35 | +msg := queue.NewMessage("msg-id", []byte(`{"data": "value"}`)) |
| 36 | +msg.PartitionKey = "repo-123" // Required |
| 37 | +factory.Publisher().Publish(ctx, "merge_events", msg) |
| 38 | + |
| 39 | +// Subscribe |
| 40 | +deliveryCh, _ := factory.Subscriber().Subscribe(ctx, "merge_events") |
| 41 | +for delivery := range deliveryCh { |
| 42 | + if err := process(delivery.Message()); err != nil { |
| 43 | + delivery.Nack(ctx, 0) // Retry after visibility timeout |
| 44 | + continue |
| 45 | + } |
| 46 | + delivery.Ack(ctx) |
| 47 | +} |
| 48 | +``` |
| 49 | + |
| 50 | +## Configuration |
| 51 | + |
| 52 | +```go |
| 53 | +config := sql.DefaultConfig("consumer-group", "worker-id") |
| 54 | +config.PollInterval = 50 * time.Millisecond // Poll frequency |
| 55 | +config.BatchSize = 20 // Messages per poll |
| 56 | +config.VisibilityTimeout = 60 * time.Second // How long a fetched message stays hidden before redelivery |
| 57 | +config.LeaseRenewalInterval = 10 * time.Second // Lease renewal frequency |
| 58 | +config.LeaseDuration = 30 * time.Second // Lease validity |
| 59 | +config.Retry.MaxAttempts = 3 // Max delivery attempts before DLQ |
| 60 | +``` |
| 61 | + |
| 62 | +## Architecture |
| 63 | + |
| 64 | +### Tables (Single-Table Design) |
| 65 | + |
| 66 | +See full schema: [schema/queue/mysql/schema.sql](../../../schema/queue/mysql/schema.sql) |
| 67 | + |
| 68 | +- **queue_messages** - All messages (topic, partition_key, retry_count, invisible_until) |
| 69 | +- **queue_partition_leases** - Worker coordination (consumer_group, topic, partition_key, leased_by) |
| 70 | +- **queue_offsets** - Consumption progress (consumer_group, topic, partition_key, offset_acked) |
| 71 | +- **queue_dlq** - Failed messages (topic, partition_key, failure_count, last_error) |
| 72 | + |
| 73 | +### How It Works |
| 74 | + |
| 75 | +**Partition Leasing:** |
| 76 | +1. Workers discover partitions from the `queue_messages` table |
| 77 | +2. Workers acquire leases (one worker per partition within a consumer group) |
| 78 | +3. Workers renew their leases every `LeaseRenewalInterval` |
| 79 | +4. Leases that are not renewed within `LeaseDuration` go stale and can be stolen by other workers |
| 80 | + |
| 81 | +**Message Flow:** |
| 82 | +1. Fetch visible messages (`invisible_until <= now`) |
| 83 | +2. Atomically UPDATE `invisible_until` and increment `retry_count` |
| 84 | +3. Process the message |
| 85 | +4. Ack: DELETE the message, UPDATE `offset_acked` |
| 86 | +5. Nack: Do nothing; the message becomes visible again after the visibility timeout |
| 87 | +6. If `retry_count >= MaxAttempts`: Move the message to the DLQ |
| 88 | + |
| 89 | +**Crash Recovery:** |
| 90 | +- Messages become visible after visibility timeout |
| 91 | +- Other workers steal stale leases |
| 92 | +- Resume from last acked offset |
| 93 | +- Persistent retry_count prevents infinite retries |
| 94 | + |
| 95 | +## Distributed Processing |
| 96 | + |
| 97 | +```go |
| 98 | +// Multiple workers in same consumer group - partition distribution |
| 99 | +factory1, _ := sql.NewFactory(sql.Params{ |
| 100 | + Config: sql.DefaultConfig("orchestrator", "worker-1"), |
| 101 | + // ... |
| 102 | +}) |
| 103 | +factory2, _ := sql.NewFactory(sql.Params{ |
| 104 | + Config: sql.DefaultConfig("orchestrator", "worker-2"), |
| 105 | + // ... |
| 106 | +}) |
| 107 | +// worker-1 leases partition "repo-1" |
| 108 | +// worker-2 leases partition "repo-2" |
| 109 | +// Automatic failover if worker crashes |
| 110 | + |
| 111 | +// Different consumer group - independent consumption |
| 112 | +factory3, _ := sql.NewFactory(sql.Params{ |
| 113 | + Config: sql.DefaultConfig("speculator", "worker-1"), |
| 114 | + // ... |
| 115 | +}) |
| 116 | +``` |
| 117 | + |
| 118 | +## Partition Ordering |
| 119 | + |
| 120 | +Messages with the same `PartitionKey` are processed in order by a single worker: |
| 121 | + |
| 122 | +```go |
| 123 | +msg1 := queue.NewMessage("1", []byte("first")) |
| 124 | +msg1.PartitionKey = "repo-123" |
| 125 | + |
| 126 | +msg2 := queue.NewMessage("2", []byte("second")) |
| 127 | +msg2.PartitionKey = "repo-123" // Waits for msg1 to be acked |
| 128 | + |
| 129 | +publisher.Publish(ctx, "events", msg1, msg2) |
| 130 | +``` |
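One way to picture the ordering guarantee is a per-partition FIFO whose head blocks everything behind it until it is acked. The sketch below is a simplified mental model under that assumption; `partitionQueue`, `next`, and `ack` are hypothetical names, and the real subscriber may deliver in batches (see `BatchSize`) rather than one message at a time.

```go
package main

import "fmt"

// partitionQueue is a hypothetical in-memory model of one partition: a FIFO
// whose head blocks the messages behind it until it is acked.
type partitionQueue struct {
	pending  []string // message IDs in publish order
	inFlight bool     // true while the head is delivered but not yet acked
}

// next returns the head message ID, or false if the head is still in flight
// or the partition is empty.
func (p *partitionQueue) next() (string, bool) {
	if p.inFlight || len(p.pending) == 0 {
		return "", false
	}
	p.inFlight = true
	return p.pending[0], true
}

// ack removes the head and unblocks the next message in the partition.
func (p *partitionQueue) ack() {
	if p.inFlight {
		p.pending = p.pending[1:]
		p.inFlight = false
	}
}

func main() {
	partitions := map[string]*partitionQueue{
		"repo-123": {pending: []string{"1", "2"}},
		"repo-456": {pending: []string{"3"}},
	}

	id, ok := partitions["repo-123"].next()
	fmt.Println(id, ok) // head of repo-123 is delivered

	_, ok = partitions["repo-123"].next()
	fmt.Println(ok) // message "2" waits behind the unacked head

	id, ok = partitions["repo-456"].next()
	fmt.Println(id, ok) // other partitions are not blocked

	partitions["repo-123"].ack()
	id, ok = partitions["repo-123"].next()
	fmt.Println(id, ok) // message "2" is delivered after the ack
}
```

Ordering holds only within a partition key, so choosing the key (for example, one key per repository) decides which messages are serialized and which can proceed in parallel.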
| 131 | + |
| 132 | +## Metrics (via tally) |
| 133 | + |
| 134 | +- **Publisher:** `messages_published`, `publish_errors`, `publish_latency` |
| 135 | +- **Subscriber:** `messages_received`, `messages_acked`, `messages_nacked`, `messages_moved_to_dlq`, `message_age`, `poll_latency`, `leases_acquired` |
| 136 | + |
| 137 | +All metrics are tagged with `topic`. |
| 138 | + |
| 139 | +## Limitations |
| 140 | + |
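| 141 | +- Polling-based: a new message can wait up to one `PollInterval` before it is fetched |
| 142 | +- Throughput is bounded by a single MySQL instance |
| 143 | +- Lease renewals add periodic write load on the database |
| 144 | +- A failed message is not retried until its visibility timeout expires |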