diff --git a/doc/rfc/sql-queue-rfc.md b/doc/rfc/sql-queue-rfc.md index 18ba1905..0edea882 100644 --- a/doc/rfc/sql-queue-rfc.md +++ b/doc/rfc/sql-queue-rfc.md @@ -7,11 +7,11 @@ | **Author** | Preetam Dwived | | **Status** | In Review | | **Created** | 2026-02-16 | -| **Updated** | 2026-02-16 | +| **Updated** | 2026-03-08 | ## Summary -MySQL-based distributed message queue with partition leasing, visibility timeout, and at-least-once delivery. Workers coordinate via database-native primitives without external systems. +MySQL-based distributed message queue with immutable message log, per-consumer-group delivery state, partition leasing, and at-least-once delivery. Workers coordinate via database-native primitives without external systems. ## Background @@ -27,29 +27,29 @@ SubmitQueue needs a reliable message queue for coordinating asynchronous workflo We evaluated several approaches: 1. **External Message Brokers** (Kafka, RabbitMQ) - - ❌ Additional operational overhead and infrastructure - - ❌ Network hops increase latency - - ✅ Battle-tested and highly scalable + - Additional operational overhead and infrastructure + - Network hops increase latency + - Battle-tested and highly scalable 2. **Watermill Library** (github.com/ThreeDotsLabs/watermill) - - ✅ Database-backed queue with mature abstractions - - ✅ Built-in middleware (retry, poison queue, metrics) - - ❌ Generic interface hides database-specific optimizations - - ❌ Additional dependency and learning curve - - ❌ Less control over exact SQL queries and behavior + - Database-backed queue with mature abstractions + - Built-in middleware (retry, poison queue, metrics) + - Generic interface hides database-specific optimizations + - Additional dependency and learning curve + - Less control over exact SQL queries and behavior 3. **dbqueue-go** (github.com/yunussandikci/dbqueue-go) - - ✅ Lightweight, simple FIFO queue over SQL (MySQL, PostgreSQL, SQLite) - - ✅ Basic features: priority, deduplication, visibility timeout - - ❌ No distributed worker coordination or partition leasing - - ❌ No built-in retry mechanism or DLQ - - ❌ Designed for single-worker scenarios, not multi-worker distribution + - Lightweight, simple FIFO queue over SQL (MySQL, PostgreSQL, SQLite) + - Basic features: priority, deduplication, visibility timeout + - No distributed worker coordination or partition leasing + - No built-in retry mechanism or DLQ + - Designed for single-worker scenarios, not multi-worker distribution 4. **Database-Backed Queue** (Custom implementation) - - ✅ Reuses existing MySQL infrastructure - - ✅ Full control over queries and behavior - - ✅ No additional services or dependencies - - ❌ More code to maintain + - Reuses existing MySQL infrastructure + - Full control over queries and behavior + - No additional services or dependencies + - More code to maintain ### Decision @@ -57,7 +57,7 @@ We chose **custom database-backed queue** because: - Full control over SQL queries for optimal performance - No additional libraries - direct use of database/sql - Simpler to understand and debug (no abstraction layers) -- Can optimize for our specific use case (partition ordering, visibility timeout) +- Can optimize for our specific use case (partition ordering, delivery state tracking) - Watermill adds valuable abstractions but we need fine-grained control ## Requirements @@ -70,15 +70,14 @@ We chose **custom database-backed queue** because: 4. **Crash Recovery** - Workers resume from last committed offset 5. **Distributed Workers** - Multiple workers coordinate without duplicate processing 6. **Dead Letter Queue** - Failed messages isolated after max retries -7. **Visibility Timeout** - Messages invisible during processing, visible if worker crashes +7. **Delivery State Tracking** - Per-consumer-group visibility, retry count, and ack state +8. **Multi-Consumer-Group** - Independent consumption of the same message log ### Non-Functional Requirements 1. **Operational Simplicity** - No additional infrastructure -2. **Observability** - Metrics and logging for debugging -3. **Testability** - In-memory testing without external MySQL -4. **Performance** - Sub-second latency for typical workloads -5. **Scalability** - Handle hundreds of workers, thousands of partitions +2. **Sub-Second Latency** - For typical workloads +3. **Scalability** - Handle hundreds of workers, thousands of partitions ### Non-Goals @@ -101,9 +100,14 @@ We chose **custom database-backed queue** because: │ MySQL Database │ │ ┌───────────────────────────────┐ │ │ │ queue_messages │ │ +│ │ (immutable append-only log) │ │ │ │ - topic, partition_key │ │ -│ │ - offset, invisible_until │ │ -│ │ - retry_count, payload │ │ +│ │ - offset, payload │ │ +│ └───────────────────────────────┘ │ +│ ┌───────────────────────────────┐ │ +│ │ queue_delivery_state │ │ +│ │ (per-consumer-group) │ │ +│ │ - invisible_until, retry │ │ │ └───────────────────────────────┘ │ │ ┌───────────────────────────────┐ │ │ │ queue_partition_leases │ │ @@ -115,6 +119,11 @@ We chose **custom database-backed queue** because: │ │ - consumer_group, topic │ │ │ │ - partition_key, offset │ │ │ └───────────────────────────────┘ │ +│ ┌───────────────────────────────┐ │ +│ │ queue_subscriber_heartbeats │ │ +│ │ - consumer_group, topic │ │ +│ │ - subscriber_name, heartbeat │ │ +│ └───────────────────────────────┘ │ └─────────────────────────────────────┘ │ ▼ @@ -133,33 +142,50 @@ We chose **custom database-backed queue** because: ### Core Concepts -**Partition Leasing:** Workers coordinate using database-native leases. Each partition leased by exactly one worker. Stale leases automatically stolen on crash. +**Immutable Message Log:** Messages are append-only in `queue_messages`. No per-message mutation occurs during delivery — the log is shared across all consumer groups. + +**Delivery State Tracking:** Per-consumer-group delivery state in `queue_delivery_state` tracks ack state, visibility timeout, and retry count independently. Each row has an explicit `acked` boolean and an `invisible_until` timestamp. When `acked = TRUE`, the message is done and never redelivered. When `acked = FALSE`, `invisible_until` controls visibility: past/zero = ready for delivery, future = in-flight or nack delay. -**Visibility Timeout:** Messages invisible during processing. Auto-retry on crash when timeout expires. +**Watermark-Based Offset:** On ack, the subscriber computes the contiguous acked watermark by scanning forward from the current `offset_acked` through delivery state. The watermark advances to the highest contiguous acked offset, and delivery state rows behind it are cleaned up. + +**Partition Leasing:** Workers coordinate using database-native leases. Each partition leased by exactly one worker. Stale leases automatically stolen on crash. -**Persistent Retry Tracking:** `retry_count` incremented atomically on fetch, survives crashes, triggers DLQ. +**Fair Partition Distribution:** Subscribers send periodic heartbeats. Each subscriber calculates `ceil(totalPartitions / activeSubscribers)` to cap lease acquisitions and releases excess partitions during rebalance. -**Offset Tracking:** Per-partition offsets enable crash recovery from last acked message. +**Persistent Retry Tracking:** `retry_count` incremented atomically on delivery via `ON DUPLICATE KEY UPDATE`, survives crashes, triggers DLQ after `MaxAttempts`. ## Database Schema -### Messages Table +### Messages Table (Immutable Log) + +Messages are append-only. No per-message mutation during delivery. **Key Fields:** - `offset` (PK): Auto-incrementing global offset for message ordering - `topic`, `partition_key`: Message routing and partitioning - `id`: Unique message identifier - `payload`, `metadata`: Message content -- `retry_count`: Persistent retry tracking (survives worker crashes) -- `invisible_until`: Visibility timeout (epoch ms) for crash recovery +- `failed_at`, `failure_count`, `last_error`, `original_topic`: DLQ-specific fields (zero values for normal messages, populated when message is moved to DLQ topic) - `created_at`, `published_at`: Timestamps **Indexes:** -- `(topic, partition_key, invisible_until, offset)`: Core fetch query - find visible messages in partition ordered by offset -- `(topic, partition_key, id)`: Unique constraint and fast lookup for Ack/Nack +- `(topic, partition_key, offset)`: Core fetch query — poll messages in partition ordered by offset +- `(topic, partition_key, id)`: Unique constraint and idempotent publish See `extension/queue/mysql/schema/queue_messages.sql` for full schema. +### Delivery State Table + +Per-consumer-group delivery tracking with explicit ack state. + +**Key Fields:** +- `consumer_group`, `topic`, `partition_key`, `message_offset` (PK): Identifies delivery state per consumer group per message +- `acked`: Whether this consumer group has successfully processed this message +- `invisible_until`: Visibility timeout in epoch milliseconds (only meaningful when `acked = FALSE`) +- `retry_count`: Number of times message has been redelivered to this consumer group + +See `extension/queue/mysql/schema/queue_delivery_state.sql` for full schema. + ### Partition Leases Table **Key Fields:** @@ -177,7 +203,7 @@ See `extension/queue/mysql/schema/queue_partition_leases.sql` for full schema. **Key Fields:** - `consumer_group`, `topic`, `partition_key` (PK): Identifies offset position -- `offset_acked`: Last successfully processed offset +- `offset_acked`: Contiguous acked watermark — all messages at or below this offset are fully processed - `updated_at`: Last update timestamp **Indexes:** @@ -186,54 +212,150 @@ See `extension/queue/mysql/schema/queue_partition_leases.sql` for full schema. See `extension/queue/mysql/schema/queue_offsets.sql` for full schema. -### Dead Letter Queue Table +### Subscriber Heartbeats Table **Key Fields:** -- `offset` (PK): Auto-incrementing offset for DLQ ordering -- `topic`, `partition_key`, `id`: Original message identification -- `payload`, `metadata`: Original message content -- `failed_at`: When message moved to DLQ -- `failure_count`, `last_error`: Failure diagnostics +- `consumer_group`, `topic`, `subscriber_name` (PK): Identifies the subscriber. `subscriber_name` is provided via `SubscriptionConfig.SubscriberName` at subscription time. +- `heartbeat_at`: Unix timestamp in milliseconds of last heartbeat +- `deregistered_at`: Soft-delete timestamp (0 = active, >0 = deregistered during graceful shutdown) -**Indexes:** -- `(topic, partition_key, failed_at)`: Query DLQ by topic/partition, ordered by failure time -- `(failed_at)`: Time-based queries and cleanup -- `(topic, partition_key, id)`: Unique constraint, prevents duplicates +See `extension/queue/mysql/schema/queue_subscriber_heartbeats.sql` for full schema. + +### Dead Letter Queue -See `extension/queue/mysql/schema/queue_dlq.sql` for full schema. +DLQ messages are stored in the same `queue_messages` table under a different topic name (original topic + DLQ suffix, e.g., `merge_queue_dlq`). This allows DLQ messages to be consumed using the normal subscriber with the DLQ topic name. DLQ-specific fields (`failed_at`, `failure_count`, `last_error`, `original_topic`) are populated when a message is moved to DLQ; they are zero/empty for normal messages. ## Message Flow -**1. Publish** - Insert messages with AUTO_INCREMENT offset +**1. Publish** — Insert messages into `queue_messages` with AUTO_INCREMENT offset + +**2. Lease Acquisition** — `INSERT ... ON DUPLICATE KEY UPDATE` with stale lease detection + +**3. Fetch** — Read from immutable log: `SELECT ... WHERE topic=? AND partition_key=? AND offset > ?` + +**4. Delivery Check** — Check `queue_delivery_state` for per-consumer-group deliverability: + - No row (never delivered) → deliverable + - `acked = TRUE` → skip (already processed) + - `acked = FALSE`, `invisible_until <= now` (visibility expired) → deliverable (redelivery) + - `acked = FALSE`, `invisible_until > now` (in-flight/nack delay) → skip + +**5. Mark Delivered** — `INSERT ... ON DUPLICATE KEY UPDATE` in delivery state: set `invisible_until = now + timeout`, increment `retry_count` on redelivery (only if `acked = FALSE`) + +**5a. Extend Visibility** — Update `invisible_until` only, without incrementing `retry_count`. Used for long-running work. + +**6. Ack** — Set `acked = TRUE` in delivery state. Watermark advancement is deferred to the poll loop for reduced per-ack latency. All operations are idempotent. + +**7. Nack** — Set `invisible_until = now + delay` for retry after backoff + +**8. DLQ** — If `retry_count >= MaxAttempts`: atomically move message to DLQ topic (INSERT with DLQ topic + DELETE from original topic in transaction). MoveToDLQ must succeed before marking acked — otherwise the message would be lost from both main queue and DLQ. + +**9. Garbage Collection** — Delete messages where `offset <= MIN(offset_acked)` across all consumer groups + +## Consumer Group Isolation + +All per-message state is scoped to `(consumer_group, topic, partition_key)`. Nothing is global. Each consumer group has: + +- **Independent delivery state** — visibility timeout, retry count, and ack state per message are tracked separately in `queue_delivery_state`. Consumer group A nacking a message has no effect on consumer group B's view of the same message. +- **Independent offsets** — each group maintains its own `offset_acked` watermark in `queue_offsets`. Group A can be ahead or behind group B. +- **Independent partition leases** — each group has its own set of leases in `queue_partition_leases`. Workers in group A do not compete with workers in group B. +- **Independent heartbeats** — subscriber heartbeats are scoped to `(consumer_group, topic)` for fair share computation within a group. +- **Shared immutable log** — `queue_messages` is the only shared table. It is append-only and never mutated by consumers. All consumer groups read from the same log but track their own position and delivery state. + +Garbage collection is the only cross-group operation: `GarbageCollect` computes `MIN(offset_acked)` across all consumer groups for a partition, ensuring a message is only deleted after every group has acked past it. + +## Ordering and Serialization + +### Default: Concurrent Delivery Within Partition + +By default, the poll loop fetches a batch of messages (`BatchSize`, default 10) from the immutable log and delivers each one that passes the deliverability check. Multiple messages from the same partition can be in-flight concurrently. Messages are delivered in offset order but may be acked out of order. + +### Non-Blocking Nack -**2. Lease Acquisition** - `INSERT ... ON DUPLICATE KEY UPDATE` with stale lease detection +When a message is nacked, its `invisible_until` is set to a future timestamp. On the next poll, the nacked message is skipped (not deliverable) while subsequent messages are still delivered normally. A nacked message does not block, starve, or delay any other message in the partition. -**3. Fetch** - Atomic UPDATE sets `invisible_until` and increments `retry_count` +Example with 5 messages at offsets 1-5, all delivered: +- Message 3 is nacked with 30s delay +- Messages 1, 2, 4, 5 can be acked independently +- Watermark advances to 2 (contiguous from head), stops at 3 (not acked) +- After 30s, message 3 becomes deliverable again, is redelivered +- Once message 3 is acked, watermark jumps from 2 to 5 -**4. Ack** - Transaction: DELETE message + UPDATE offset_acked +### Strict Serialization (Opt-In) -**5. Nack** - UPDATE `invisible_until` for retry after delay +For use cases requiring strict in-order processing (e.g., ordered state machine transitions), set `BatchSize = 1`. This ensures: -**6. DLQ** - If `retry_count >= MaxAttempts`: DELETE from messages + INSERT into dlq +1. Only one message is fetched per poll cycle +2. Only one message is in-flight at a time per partition +3. The next message is not fetched until the current one is acked or nacked and becomes invisible + +**Requirement:** `VisibilityTimeoutMs` must exceed the maximum processing time. If the timeout expires before the consumer acks, the message becomes deliverable again and may be delivered concurrently with the next poll — violating serialization. Set a generous timeout and extend visibility for long-running work. + +### Watermark Advancement + +The `offset_acked` watermark represents the highest contiguous acked offset — all messages at or below this offset are fully processed. On each poll tick, the subscriber: + +1. Fetches message offsets above the current watermark from `queue_messages` (bounded to prevent unbounded memory usage) +2. Batch-fetches delivery state for those offsets from `queue_delivery_state` +3. Walks offsets in order: advances while contiguous acked, stops at the first non-acked or undelivered message +4. Updates `offset_acked` to the new watermark +5. Cleans up delivery state rows behind the new watermark (eventual consistency — stale rows are harmless and retried on next call) + +The two-query approach avoids cross-table JOINs. Each store queries only its own table; the subscriber orchestrates both. + +Watermark advancement is deferred from Ack to the poll loop for reduced per-ack latency. The poll loop advances the watermark on every tick — idempotent and incremental, converging over multiple calls even with large backlogs. + +## Message Durability + +Messages are never silently lost. Every deletion path has explicit guards: + +**Garbage Collection:** Only deletes messages where `offset <= MIN(offset_acked)` across all consumer groups. If any consumer group has not acked past a message, it is retained. Safe under concurrent reads — a consumer processing a message at offset N has not yet acked it, so `MIN(offset_acked)` stays below N. + +**Move to DLQ:** Atomic transaction: INSERT into DLQ topic, then DELETE from original topic. If the transaction fails at any point, the original message is preserved via ROLLBACK. The message never exists in zero tables. + +**Delivery State Cleanup:** The `AdvanceWatermark` cleanup DELETE only removes delivery state rows (not messages) behind the contiguous watermark. Stale rows are harmless (never queried — all reads use offset > watermark) and cleaned up on the next call. + +**No Silent Deletions:** There is no code path that deletes a message without either (a) all consumer groups having acked past it, or (b) an atomic move to DLQ. The `Delete()` method exists on the store interface but is not called in any production flow. ## Crash Recovery -**Scenario:** Worker crashes while processing message +**Scenario:** Worker crashes while processing a message **What happens:** -1. Message has `invisible_until = crash_time + VisibilityTimeout` -2. After timeout expires, message becomes visible +1. Delivery state has `invisible_until = crash_time + VisibilityTimeout` +2. After timeout expires, message becomes deliverable again 3. Another worker detects stale lease and steals partition -4. Message redelivered (at-least-once guarantee) -5. `retry_count` incremented prevents infinite retries +4. Message is redelivered (at-least-once guarantee) +5. `retry_count` increment on redelivery prevents infinite retries + +**Scenario:** Worker crashes after ack but before watermark update -**Key properties:** Automatic failover, no data loss, configurable retry delay +**What happens:** +1. Message is marked acked in delivery state (`acked = TRUE`) +2. Watermark was not advanced (crash interrupted the flow) +3. On the next poll tick, watermark scans forward and catches up +4. No message loss — the acked message is simply not re-delivered + +**Key properties:** Automatic failover, no data loss, configurable retry delay, eventual watermark convergence. ## Distributed Processing -**Same Consumer Group:** Workers distribute partitions via leasing. Each partition processed by one worker. +**Same Consumer Group:** Workers distribute partitions via fair leasing. Each partition processed by one worker. Heartbeats enable `ceil(totalPartitions / activeSubscribers)` fair share computation. Rebalance releases excess partitions when new subscribers join. + +**Different Consumer Groups:** Independent consumption with separate delivery state and offsets. Same immutable message log consumed by all groups. One group's nacks, retries, and DLQ moves have no effect on other groups. + +## Fair Partition Distribution -**Different Consumer Groups:** Independent consumption with separate offsets. Same messages delivered to all groups. +Subscribers send periodic heartbeats to `queue_subscriber_heartbeats`. The fair share algorithm: + +1. Count active subscribers (heartbeat within `LeaseDurationMs`) +2. Count total partitions (union of owned leases + discovered from messages table) +3. Compute `maxPartitions = ceil(totalPartitions / activeSubscribers)` +4. Cap lease acquisitions at `maxPartitions` +5. On each lease renewal tick, if this subscriber holds more than `maxPartitions`, release excess (sorted deterministically so the same partitions are released across runs) + +**Graceful shutdown:** Subscribers deregister their heartbeat and release all leases, enabling immediate redistribution. + +**Failure:** If fair share computation fails (heartbeat store error, discovery error), the subscriber falls back to unlimited acquisition — ensuring availability over perfect fairness. ## Alternatives Considered @@ -251,7 +373,7 @@ See `extension/queue/mysql/schema/queue_dlq.sql` for full schema. - **No partition key support** - can't guarantee ordering within partitions - **Requires table per topic** - schema migration for every new topic, not friendly for dynamic topics - Generic interface hides database-specific optimizations -- Less control over exact SQL queries (e.g., can't optimize visibility timeout logic) +- Less control over exact SQL queries (e.g., can't optimize delivery state logic) - Additional dependency to maintain and version - More infrastructure to maintain (separate tables, schema management) - Learning curve for team (new library semantics) @@ -269,7 +391,7 @@ With our custom implementation: - No schema migrations for new repos or topics - Ordering guaranteed within `(topic, partition_key)` -**Decision:** Custom implementation gives us partition ordering guarantees and single-table design. Watermill is valuable for complex multi-backend scenarios but doesn't fit our partition-based ordering requirements. +**Decision:** Custom implementation gives us partition ordering guarantees, immutable log design, and single-table simplicity. Watermill is valuable for complex multi-backend scenarios but doesn't fit our partition-based ordering requirements. ### Single-Table Per Topic @@ -279,48 +401,60 @@ With our custom implementation: **Decision:** Single-table design for operational simplicity. +### Mutable Message State (Previous Design) + +The original design tracked visibility timeout and retry count directly on `queue_messages` rows. This worked but had limitations: +- Messages were mutated on every delivery, creating write contention +- Multiple consumer groups couldn't independently track delivery state for the same message +- Garbage collection required complex coordination since messages could be in different states per consumer group + +The current design separates the immutable message log from per-consumer-group delivery state, enabling independent consumption and cleaner GC (delete when all groups have acked past a message). + ## Trade-offs **Polling vs Push** -- ✅ Simpler (no connection management), natural backpressure -- ❌ Higher latency (configurable via PollInterval) +- Simpler (no connection management), natural backpressure +- Higher latency (configurable via PollInterval) - Mitigation: Tune PollInterval (default 100ms, tests 20ms) +**Immutable Log + Delivery State vs Mutable Messages** +- Multiple consumer groups consume independently +- Cleaner GC (min watermark across groups) +- Extra table for delivery state tracking +- Mitigation: Delivery state rows cleaned up behind watermark; two separate queries (no JOIN) for watermark advancement + +**Non-Blocking Nack vs Head-of-Line Blocking** +- Nacked messages don't starve the partition — later messages flow normally +- Out-of-order acks mean watermark only advances past contiguous acked blocks +- More delivery state rows to track (cleaned up behind watermark) +- Mitigation: Watermark advancement cleans up rows; GC deletes messages behind min watermark + +**Strict Serialization (BatchSize=1) vs Concurrent Delivery** +- Strict ordering guaranteed within partition when needed +- Lower throughput (one message at a time per partition) +- Requires correct VisibilityTimeoutMs configuration (must exceed max processing time) +- Mitigation: Use for ordering-sensitive topics only; concurrent delivery for throughput-sensitive topics + **Visibility Timeout vs Heartbeat** -- ✅ No heartbeat protocol, automatic retry -- ❌ Full timeout delay even on immediate crash -- Mitigation: ExtendVisibilityTimeout() for long tasks +- No heartbeat protocol per message, automatic retry +- Full timeout delay even on immediate crash +- Mitigation: Extend visibility for long tasks **Database Leasing vs External Coordinator** -- ✅ No ZooKeeper/etcd, transactional consistency -- ❌ Lease renewal overhead +- No ZooKeeper/etcd, transactional consistency +- Lease renewal overhead - Mitigation: Tunable renewal interval (default 10s) +**Fair Share via Heartbeats vs Static Assignment** +- Dynamic rebalancing as subscribers join/leave +- Eventually consistent (brief imbalance during transitions) +- Mitigation: Rebalance on every lease renewal tick; deterministic partition release order + **At-Least-Once vs Exactly-Once** -- ✅ Simpler, better performance -- ❌ Applications must handle duplicates +- Simpler, better performance +- Applications must handle duplicates - Mitigation: Idempotency keys (e.g., merge request ID) - -## Observability - -**Metrics (via tally):** -- Publisher: `messages_published`, `publish_errors` -- Subscriber: `messages_acked`, `messages_nacked`, `messages_moved_to_dlq`, `message_age`, `leases_acquired` -- Stores: `insert.latency`, `fetch.latency`, `ack_message.latency`, `renew_lease.latency` - -**Logging (via zap):** -- Debug: Message fetch, lease operations -- Info: Publish success, DLQ moves, partition acquisition -- Error: Database errors, unrecoverable failures -- Structured fields: `topic`, `partition_key`, `message_id`, `offset`, `retry_count` - -## Performance - -- **Throughput:** ~1k-5k msg/sec publish, ~500-2k msg/sec consume (single MySQL) -- **Latency:** Best case = PollInterval (100ms), Retry after crash = VisibilityTimeout (60s) -- **Bottlenecks:** MySQL write throughput, lease renewal overhead, polling overhead - ## Appendix ### References @@ -336,4 +470,4 @@ With our custom implementation: **Alternative Implementations:** - [Watermill Documentation](https://watermill.io/) - Go library for message streaming (evaluated alternative) -- [PostgreSQL SKIP LOCKED](https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE) - Alternative database queue pattern \ No newline at end of file +- [PostgreSQL SKIP LOCKED](https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE) - Alternative database queue pattern diff --git a/extension/queue/mysql/README.md b/extension/queue/mysql/README.md index e5dd22f8..1e6ce56c 100644 --- a/extension/queue/mysql/README.md +++ b/extension/queue/mysql/README.md @@ -1,14 +1,8 @@ # SQL Queue Implementation -MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery. +MySQL-based distributed queue with partition leasing, delivery state tracking, and at-least-once delivery. -## Key Features - -- **Partition leasing** — workers coordinate via database leases with automatic failover -- **Per-partition workers** — each leased partition gets its own goroutine for isolation -- **Visibility timeout** — messages retry automatically if worker crashes -- **At-least-once delivery** — offset tracking for crash recovery -- **Dead letter queue** — failed messages moved to DLQ after max retries +For design rationale, guarantees, and trade-offs, see the [RFC](../../doc/rfc/sql-queue-rfc.md). ## Quick Start @@ -53,10 +47,8 @@ Per-subscription configuration enables different settings for each topic: ```go import extqueue "github.com/uber/submitqueue/extension/queue" -// Default config subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "consumer-group") -// Customize for this subscription subConfig.PollIntervalMs = 50 // Poll frequency (milliseconds) subConfig.BatchSize = 20 // Messages per poll subConfig.VisibilityTimeoutMs = 60000 // Retry delay (milliseconds) @@ -68,93 +60,136 @@ subConfig.Retry.MaxBackoffMs = 30000 // Max retry backoff (mill subConfig.Retry.BackoffMultiplier = 2.0 // Backoff multiplier for exponential backoff subConfig.DLQ.Enabled = true // Enable dead letter queue subConfig.DLQ.TopicSuffix = "_dlq" // DLQ topic suffix - -// Use config when subscribing -deliveryCh, _ := q.Subscriber().Subscribe(ctx, "my-topic", subConfig) ``` **Key Configuration Fields:** -- `SubscriberName`: Unique worker identifier for partition leasing (e.g., hostname, pod name) -- `ConsumerGroup`: Consumer group for independent offset tracking -- `PollIntervalMs`: How often to poll for new messages (milliseconds) -- `BatchSize`: Maximum messages to fetch per poll -- `VisibilityTimeoutMs`: How long messages are invisible after being fetched (milliseconds) -- `LeaseRenewalIntervalMs`: How often to renew partition leases (milliseconds) -- `LeaseDurationMs`: How long leases remain valid without renewal (milliseconds) -- `Retry.MaxAttempts`: Maximum processing attempts before moving to DLQ -- `Retry.InitialBackoffMs`: Initial retry backoff delay (milliseconds) -- `Retry.MaxBackoffMs`: Maximum retry backoff delay (milliseconds) -- `Retry.BackoffMultiplier`: Multiplier for exponential backoff -- `DLQ.TopicSuffix`: Suffix appended to topic name for DLQ (e.g., "orders" -> "orders_dlq") - -## Architecture +| Field | Description | +|-------|-------------| +| `SubscriberName` | Unique worker identifier for partition leasing (e.g., hostname, pod name) | +| `ConsumerGroup` | Consumer group for independent offset tracking | +| `PollIntervalMs` | How often to poll for new messages | +| `BatchSize` | Maximum messages to fetch per poll. Set to `1` for strict serialization | +| `VisibilityTimeoutMs` | How long messages are invisible after fetch. Must exceed max processing time for `BatchSize=1` | +| `LeaseRenewalIntervalMs` | How often to renew partition leases | +| `LeaseDurationMs` | How long leases remain valid without renewal | +| `Retry.MaxAttempts` | Maximum processing attempts before DLQ | +| `DLQ.TopicSuffix` | Suffix appended to topic name for DLQ (e.g., `"orders"` → `"orders_dlq"`) | -### Goroutine Model +## Package Layout + +``` +extension/queue/mysql/ +├── sql.go # NewQueue constructor, wires stores → publisher/subscriber +├── stores.go # Internal store interfaces (messageStore, offsetStore, etc.) +├── message_store.go # queue_messages table operations (immutable log) +├── delivery_state_store.go # queue_delivery_state table operations (per-consumer-group) +├── offset_store.go # queue_offsets table operations (watermark tracking) +├── partition_lease_store.go # queue_partition_leases table operations +├── subscriber_heartbeat_store.go # queue_subscriber_heartbeats table operations +├── publisher.go # Publisher implementation +├── subscriber.go # Subscriber, delivery, goroutine management +├── constants.go # Log key constants +├── errors.go # Error types +├── schema/ # SQL schema files (one per table) +│ ├── queue_messages.sql +│ ├── queue_delivery_state.sql +│ ├── queue_offsets.sql +│ ├── queue_partition_leases.sql +│ └── queue_subscriber_heartbeats.sql +└── ctl/ # Admin CLI (see ctl/README.md) +``` -Each subscription has a **supervisor goroutine** (`managePartitions`) that: -1. Discovers partitions from the messages table -2. Acquires and renews partition leases -3. Reconciles **per-partition worker goroutines** based on current leases +## Internal Architecture -Each partition worker goroutine polls and delivers messages for its partition independently. This provides fault isolation — a slow or blocked partition does not affect other partitions. +### Database Tables + +| Table | Purpose | Scoped To | +|-------|---------|-----------| +| `queue_messages` | Immutable append-only message log | `(topic, partition_key)` — shared across consumer groups | +| `queue_delivery_state` | Visibility, ack state, retry count | `(consumer_group, topic, partition_key, offset)` | +| `queue_offsets` | Contiguous acked watermark | `(consumer_group, topic, partition_key)` | +| `queue_partition_leases` | Partition lease coordination | `(consumer_group, topic, partition_key)` | +| `queue_subscriber_heartbeats` | Active subscriber tracking | `(consumer_group, topic, subscriber_name)` | + +See `schema/` for full SQL definitions. See the [RFC](../../doc/rfc/sql-queue-rfc.md#database-schema) for field-level documentation. + +### Store Architecture + +Each table is backed by an internal store interface defined in `stores.go`. Stores: +- Query only their own table (no cross-table JOINs) +- Return errors via `fmt.Errorf` (no logging, no error classification) +- Use `metrics.Begin`/`Complete` for latency and success/failure tracking + +The subscriber layer orchestrates cross-store operations (e.g., watermark advancement queries both `messageStore` and `deliveryStateStore`) and owns all logging and error classification. + +### Goroutine Model + +Each subscription has a **supervisor goroutine** (`managePartitions`) that discovers partitions, acquires leases, sends heartbeats, rebalances, and reconciles per-partition worker goroutines. ``` Subscribe() - └── managePartitions (supervisor) - ├── partitionWorker("part-1") ← polls & delivers - ├── partitionWorker("part-2") ← polls & delivers - └── partitionWorker("part-3") ← polls & delivers + └── managePartitions (supervisor) ← tracked by sub.wg + ├── partitionWorker("part-1") ← tracked by sub.workerWg + ├── partitionWorker("part-2") + └── partitionWorker("part-3") ``` -### Shutdown Sequence +Each partition worker runs independently — polls the DB on a ticker, checks deliverability via `GetDeliveryState` per message, and sends deliveries to the shared channel. A slow or blocked partition does not affect other partitions. -Shutdown uses two `sync.WaitGroup`s to ensure correctness: -- `wg` tracks the supervisor goroutine (`managePartitions`) -- `workerWg` tracks all partition worker goroutines +### Shutdown Sequence When `Close()` is called: 1. Subscription context is cancelled -2. `managePartitions` calls `stopAllWorkers` — cancels each worker and waits up to 5s per worker -3. Partition leases are released -4. `workerWg.Wait()` blocks until all workers have fully exited -5. `deliveryCh` is closed — safe because no workers can send after step 4 -6. `managePartitions` returns, `wg.Done()` fires -7. `Close()` returns +2. `managePartitions` calls `stopAllWorkers` — cancels each worker's context, waits up to 30s +3. Partition leases are released (fresh context, not cancelled) +4. Subscriber heartbeat is deregistered +5. `workerWg.Wait()` — blocks until all workers have fully exited +6. `deliveryCh` is closed — safe because no senders remain after step 5 +7. `managePartitions` returns → `wg.Done()` → `Close()` unblocks -The `workerWg.Wait()` step prevents a race where a slow worker (blocked on I/O past the 5s timeout) could send on a closed channel. +The `workerWg.Wait()` before `close(deliveryCh)` prevents a race where a slow worker could send on a closed channel. ### Worker Stop Behavior When a partition worker is stopped (lease lost or shutdown): -- The worker is immediately removed from the workers map and its context is cancelled -- The caller waits up to 5s for the worker to confirm exit (logging a warning on timeout) -- `workerWg` tracks the worker regardless, so `Close()` always waits for full exit -- If the worker times out, reconciliation is free to start a replacement — any brief overlap is harmless with at-least-once delivery semantics +- Immediately removed from workers map and context cancelled +- Caller waits up to 30s for exit confirmation (warning logged on timeout) +- `workerWg` tracks the goroutine regardless — `Close()` always waits for full exit +- Reconciliation can start a replacement immediately — brief overlap is harmless with at-least-once semantics -## How It Works +### Logger Hierarchy -**Partition Leasing:** -1. Workers discover partitions from messages table -2. Workers acquire leases (one worker per partition) -3. Stale leases can be stolen by other workers +`sql.go` creates a root `queue_mysql` logger and passes named children to each component: -**Message Flow:** -1. Fetch visible messages (invisible_until <= now) -2. Process message -3. Ack: DELETE message, UPDATE offset -4. Nack: Message becomes visible after timeout -5. If retry_count >= MaxAttempts: Move to DLQ +``` +queue_mysql + ├── .publisher + ├── .subscriber + ├── .message_store + ├── .delivery_state_store + ├── .offset_store + ├── .partition_lease_store + └── .subscriber_heartbeat_store +``` -**Crash Recovery:** -- Messages become visible after visibility timeout -- Other workers steal stale leases -- Resume from last acked offset +Stores do not log errors — they return them. The subscriber propagates all errors to the top call site (`managePartitions` or `run`), which logs once with full context (`topic`, `consumer_group`, `subscriber_name`). -## Partition Ordering +## Testing -Messages with same `PartitionKey` are processed in order by a single worker. +### Unit Tests -## Distributed Processing +```bash +bazel test //extension/queue/mysql:mysql_test --test_output=streamed +bazel test //extension/queue/mysql/ctl/...:all --test_output=streamed +``` + +### Integration Tests + +Requires Docker running: + +```bash +bazel test //test/integration/extension/queue/... --test_output=streamed +``` -Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently. +Integration tests cover: publish/subscribe, partition isolation, ordering, visibility timeout, nack with delay, idempotent publish, concurrent publishers, crash recovery, multiple consumer groups, rebalancing, DLQ, graceful shutdown, non-blocking nack, strict serialization (`BatchSize=1`), and independent consumer group state. diff --git a/extension/queue/mysql/ctl/README.md b/extension/queue/mysql/ctl/README.md index 1d63c0d4..6f9b4efa 100644 --- a/extension/queue/mysql/ctl/README.md +++ b/extension/queue/mysql/ctl/README.md @@ -43,7 +43,7 @@ bazel run //extension/queue/mysql/ctl -- topic-stats --topic merge_queue # List all topics with message counts queue-admin list-topics -# Detailed stats for a topic (visible/invisible messages, DLQ count, partitions, consumer groups) +# Detailed stats for a topic (total messages, DLQ count, partitions, consumer groups) queue-admin topic-stats --topic merge_queue ```