|
| 1 | +# RFC: SQL-Based Distributed Queue |
| 2 | + |
| 3 | +**Status:** Implemented |
| 4 | +**Created:** 2026-02-16 |
| 5 | + |
| 6 | +## Summary |
| 7 | + |
| 8 | +MySQL-based distributed message queue with partition leasing, visibility timeout, and at-least-once delivery. Workers coordinate via database-native primitives without external systems. |
| 9 | + |
| 10 | +## Background |
| 11 | + |
| 12 | +### Motivation |
| 13 | + |
| 14 | +SubmitQueue needs a reliable message queue for coordinating asynchronous workflows: |
| 15 | +- **Orchestrator** publishes merge jobs to workers |
| 16 | +- **Speculator** publishes speculative build requests |
| 17 | +- **Workers** need distributed coordination without duplicate processing |
| 18 | +- **Crash recovery** must preserve exactly where processing stopped |
| 19 | + |
| 20 | +### Existing Solutions |
| 21 | + |
| 22 | +We evaluated several approaches: |
| 23 | + |
| 24 | +1. **External Message Brokers** (Kafka, RabbitMQ) |
| 25 | + - ❌ Additional operational overhead and infrastructure |
| 26 | + - ❌ Network hops increase latency |
| 27 | + - ✅ Battle-tested and highly scalable |
| 28 | + |
| 29 | +2. **Watermill Library** (github.com/ThreeDotsLabs/watermill) |
| 30 | + - ✅ Database-backed queue with mature abstractions |
| 31 | + - ✅ Built-in middleware (retry, poison queue, metrics) |
| 32 | + - ❌ Generic interface hides database-specific optimizations |
| 33 | + - ❌ Additional dependency and learning curve |
| 34 | + - ❌ Less control over exact SQL queries and behavior |
| 35 | + |
| 36 | +3. **Database-Backed Queue** (Custom implementation) |
| 37 | + - ✅ Reuses existing MySQL infrastructure |
| 38 | + - ✅ Full control over queries and behavior |
| 39 | + - ✅ No additional services or dependencies |
| 40 | + - ❌ More code to maintain |
| 41 | + |
| 42 | +### Decision |
| 43 | + |
| 44 | +We chose **custom database-backed queue** because: |
| 45 | +- Full control over SQL queries for optimal performance |
| 46 | +- No additional libraries - direct use of database/sql |
| 47 | +- Simpler to understand and debug (no abstraction layers) |
| 48 | +- Can optimize for our specific use case (partition ordering, visibility timeout) |
| 49 | +- Watermill adds valuable abstractions but we need fine-grained control |
| 50 | + |
| 51 | +## Requirements |
| 52 | + |
| 53 | +### Functional Requirements |
| 54 | + |
| 55 | +1. **Publish/Subscribe** - Standard pub/sub with topics |
| 56 | +2. **Partitioning** - Messages with same key processed in order by single worker |
| 57 | +3. **At-Least-Once Delivery** - Guaranteed delivery, duplicates possible |
| 58 | +4. **Crash Recovery** - Workers resume from last committed offset |
| 59 | +5. **Distributed Workers** - Multiple workers coordinate without duplicate processing |
| 60 | +6. **Dead Letter Queue** - Failed messages isolated after max retries |
| 61 | +7. **Visibility Timeout** - Messages invisible during processing, visible if worker crashes |
| 62 | + |
| 63 | +### Non-Functional Requirements |
| 64 | + |
| 65 | +1. **Operational Simplicity** - No additional infrastructure |
| 66 | +2. **Observability** - Metrics and logging for debugging |
| 67 | +3. **Testability** - In-memory testing without external MySQL |
| 68 | +4. **Performance** - Sub-second latency for typical workloads |
| 69 | +5. **Scalability** - Handle hundreds of workers, thousands of partitions |
| 70 | + |
| 71 | +### Non-Goals |
| 72 | + |
| 73 | +1. **Exactly-Once Delivery** - Application must handle duplicates |
| 74 | +2. **Kafka-Scale Throughput** - Not optimizing for millions of messages/sec |
| 75 | +3. **Cross-Datacenter Replication** - Single MySQL instance only |
| 76 | +4. **Message Ordering Across Partitions** - Only within partition |
| 77 | +5. **Real-Time Streaming** - Polling introduces configurable latency |
| 78 | + |
| 79 | +## Design Overview |
| 80 | + |
| 81 | +### Core Concepts |
| 82 | + |
| 83 | +**Partition Leasing:** Workers coordinate using database-native leases. Each partition leased by exactly one worker. Stale leases automatically stolen on crash. |
| 84 | + |
| 85 | +**Visibility Timeout:** Messages invisible during processing. Auto-retry on crash when timeout expires. |
| 86 | + |
| 87 | +**Persistent Retry Tracking:** `retry_count` incremented atomically on fetch, survives crashes, triggers DLQ. |
| 88 | + |
| 89 | +**Offset Tracking:** Per-partition offsets enable crash recovery from last acked message. |
| 90 | + |
| 91 | +## Database Schema |
| 92 | + |
| 93 | +### Tables |
| 94 | + |
| 95 | +**queue_messages** - All messages across topics |
| 96 | +- Composite PK: `(topic, partition_key, offset)` |
| 97 | +- `offset` AUTO_INCREMENT ensures ordering within partition |
| 98 | +- `invisible_until` for visibility timeout |
| 99 | +- `retry_count` for persistent retry tracking |
| 100 | + |
| 101 | +**queue_partition_leases** - Worker coordination |
| 102 | +- PK: `(consumer_group, topic, partition_key)` |
| 103 | +- `leased_by` identifies owner |
| 104 | +- `lease_renewed_at` enables stale lease detection |
| 105 | + |
| 106 | +**queue_offsets** - Consumption progress |
| 107 | +- PK: `(consumer_group, topic, partition_key)` |
| 108 | +- `offset_acked` tracks last processed message |
| 109 | + |
| 110 | +**queue_dlq** - Failed messages |
| 111 | +- Same structure as messages table |
| 112 | +- Stores messages exceeding max retries |
| 113 | + |
| 114 | +See full schema: `schema/queue/mysql/schema.sql` |
| 115 | + |
| 116 | +## Message Flow |
| 117 | + |
| 118 | +**1. Publish** - Insert messages with AUTO_INCREMENT offset |
| 119 | + |
| 120 | +**2. Lease Acquisition** - `INSERT ... ON DUPLICATE KEY UPDATE` with stale lease detection |
| 121 | + |
| 122 | +**3. Fetch** - Atomic UPDATE sets `invisible_until` and increments `retry_count` |
| 123 | + |
| 124 | +**4. Ack** - Transaction: DELETE message + UPDATE offset_acked |
| 125 | + |
| 126 | +**5. Nack** - UPDATE `invisible_until` for retry after delay |
| 127 | + |
| 128 | +**6. DLQ** - If `retry_count >= MaxAttempts`: DELETE from messages + INSERT into dlq |
| 129 | + |
| 130 | +## Crash Recovery |
| 131 | + |
| 132 | +**Scenario:** Worker crashes while processing message |
| 133 | + |
| 134 | +**What happens:** |
| 135 | +1. Message has `invisible_until = crash_time + VisibilityTimeout` |
| 136 | +2. After timeout expires, message becomes visible |
| 137 | +3. Another worker detects stale lease and steals partition |
| 138 | +4. Message redelivered (at-least-once guarantee) |
| 139 | +5. `retry_count` incremented prevents infinite retries |
| 140 | + |
| 141 | +**Key properties:** Automatic failover, no data loss, configurable retry delay |
| 142 | + |
| 143 | +## Distributed Processing |
| 144 | + |
| 145 | +**Same Consumer Group:** Workers distribute partitions via leasing. Each partition processed by one worker. |
| 146 | + |
| 147 | +**Different Consumer Groups:** Independent consumption with separate offsets. Same messages delivered to all groups. |
| 148 | + |
| 149 | +## Alternatives Considered |
| 150 | + |
| 151 | +### Watermill Library |
| 152 | + |
| 153 | +**Evaluation:** We prototyped a full implementation using `github.com/ThreeDotsLabs/watermill-sql` |
| 154 | + |
| 155 | +**Pros:** |
| 156 | +- Mature abstractions for pub/sub |
| 157 | +- Built-in middleware (poison queue, retry, metrics) |
| 158 | +- Multi-backend support (MySQL, PostgreSQL, Kafka) |
| 159 | +- Well-tested and documented |
| 160 | + |
| 161 | +**Cons:** |
| 162 | +- Generic interface hides database-specific optimizations |
| 163 | +- Less control over exact SQL queries (e.g., can't optimize visibility timeout logic) |
| 164 | +- Middleware adds complexity for simple use case |
| 165 | +- Additional dependency to maintain and version |
| 166 | +- Learning curve for team (new library semantics) |
| 167 | + |
| 168 | +**Decision:** Custom implementation gives us full control. Watermill is valuable for complex multi-backend scenarios but overkill for our focused MySQL use case. |
| 169 | + |
| 170 | +### PostgreSQL SKIP LOCKED |
| 171 | + |
| 172 | +**Pros:** `SELECT ... FOR UPDATE SKIP LOCKED` provides truly atomic fetch |
| 173 | + |
| 174 | +**Cons:** SubmitQueue uses MySQL, not PostgreSQL. Migration not justified. |
| 175 | + |
| 176 | +### Redis Streams |
| 177 | + |
| 178 | +**Pros:** Lower latency, built-in consumer groups |
| 179 | + |
| 180 | +**Cons:** Additional infrastructure. No transactional consistency with MySQL data. |
| 181 | + |
| 182 | +### Single-Table Per Topic |
| 183 | + |
| 184 | +**Pros:** Better isolation, easier to drop topics |
| 185 | + |
| 186 | +**Cons:** Schema migration per topic. Not friendly for dynamic topic creation. |
| 187 | + |
| 188 | +**Decision:** Single-table design for operational simplicity. |
| 189 | + |
| 190 | +## Trade-offs |
| 191 | + |
| 192 | +**Polling vs Push** |
| 193 | +- ✅ Simpler (no connection management), natural backpressure |
| 194 | +- ❌ Higher latency (configurable via PollInterval) |
| 195 | +- Mitigation: Tune PollInterval (default 100ms, tests 20ms) |
| 196 | + |
| 197 | +**Visibility Timeout vs Heartbeat** |
| 198 | +- ✅ No heartbeat protocol, automatic retry |
| 199 | +- ❌ Full timeout delay even on immediate crash |
| 200 | +- Mitigation: ExtendVisibilityTimeout() for long tasks |
| 201 | + |
| 202 | +**Database Leasing vs External Coordinator** |
| 203 | +- ✅ No ZooKeeper/etcd, transactional consistency |
| 204 | +- ❌ Lease renewal overhead |
| 205 | +- Mitigation: Tunable renewal interval (default 10s) |
| 206 | + |
| 207 | +**At-Least-Once vs Exactly-Once** |
| 208 | +- ✅ Simpler, better performance |
| 209 | +- ❌ Applications must handle duplicates |
| 210 | +- Mitigation: Idempotency keys (e.g., merge request ID) |
| 211 | + |
| 212 | + |
| 213 | +## Observability |
| 214 | + |
| 215 | +**Metrics (via tally):** |
| 216 | +- Publisher: `messages_published`, `publish_errors` |
| 217 | +- Subscriber: `messages_acked`, `messages_nacked`, `messages_moved_to_dlq`, `message_age`, `leases_acquired` |
| 218 | +- Stores: `insert.latency`, `fetch.latency`, `ack_message.latency`, `renew_lease.latency` |
| 219 | + |
| 220 | +**Logging (via zap):** |
| 221 | +- Debug: Message fetch, lease operations |
| 222 | +- Info: Publish success, DLQ moves, partition acquisition |
| 223 | +- Error: Database errors, unrecoverable failures |
| 224 | +- Structured fields: `topic`, `partition_key`, `message_id`, `offset`, `retry_count` |
| 225 | + |
| 226 | +## Performance |
| 227 | + |
| 228 | +**Throughput:** ~1k-5k msg/sec publish, ~500-2k msg/sec consume (single MySQL) |
| 229 | +**Latency:** Best case = PollInterval (100ms), Retry after crash = VisibilityTimeout (60s) |
| 230 | +**Bottlenecks:** MySQL write throughput, lease renewal overhead, polling overhead |
| 231 | + |
| 232 | +## Configuration |
| 233 | + |
| 234 | +```go |
| 235 | +type Config struct { |
| 236 | + ConsumerGroup string // e.g., "orchestrator" |
| 237 | + WorkerID string // e.g., "worker-1" |
| 238 | + PollInterval time.Duration // Default: 100ms |
| 239 | + BatchSize int // Default: 10 |
| 240 | + VisibilityTimeout time.Duration // Default: 60s |
| 241 | + LeaseDuration time.Duration // Default: 30s |
| 242 | + LeaseRenewalInterval time.Duration // Default: 10s |
| 243 | + Retry.MaxAttempts int // Default: 3 |
| 244 | + DLQ.Enabled bool // Default: true |
| 245 | +} |
| 246 | +``` |
0 commit comments