From 4f559385f478e42fd77fb3e6330e633ee3b6e4ce Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Sun, 8 Mar 2026 13:00:41 -0700 Subject: [PATCH] docs(queue/mysql): update RFC, README, add integration tests --- doc/rfc/sql-queue-rfc.md | 354 ++++++++++--- extension/queue/mysql/README.md | 189 ++++--- .../extension/queue/mysql/queue_test.go | 483 ++++++++++++++++++ 3 files changed, 876 insertions(+), 150 deletions(-) diff --git a/doc/rfc/sql-queue-rfc.md b/doc/rfc/sql-queue-rfc.md index 18ba1905..4c7f17f9 100644 --- a/doc/rfc/sql-queue-rfc.md +++ b/doc/rfc/sql-queue-rfc.md @@ -7,11 +7,11 @@ | **Author** | Preetam Dwived | | **Status** | In Review | | **Created** | 2026-02-16 | -| **Updated** | 2026-02-16 | +| **Updated** | 2026-03-08 | ## Summary -MySQL-based distributed message queue with partition leasing, visibility timeout, and at-least-once delivery. Workers coordinate via database-native primitives without external systems. +MySQL-based distributed message queue with immutable message log, per-consumer-group delivery state, partition leasing, and at-least-once delivery. Workers coordinate via database-native primitives without external systems. ## Background @@ -27,29 +27,29 @@ SubmitQueue needs a reliable message queue for coordinating asynchronous workflo We evaluated several approaches: 1. **External Message Brokers** (Kafka, RabbitMQ) - - ❌ Additional operational overhead and infrastructure - - ❌ Network hops increase latency - - ✅ Battle-tested and highly scalable + - Additional operational overhead and infrastructure + - Network hops increase latency + - Battle-tested and highly scalable 2. 
**Watermill Library** (github.com/ThreeDotsLabs/watermill) - - ✅ Database-backed queue with mature abstractions - - ✅ Built-in middleware (retry, poison queue, metrics) - - ❌ Generic interface hides database-specific optimizations - - ❌ Additional dependency and learning curve - - ❌ Less control over exact SQL queries and behavior + - Database-backed queue with mature abstractions + - Built-in middleware (retry, poison queue, metrics) + - Generic interface hides database-specific optimizations + - Additional dependency and learning curve + - Less control over exact SQL queries and behavior 3. **dbqueue-go** (github.com/yunussandikci/dbqueue-go) - - ✅ Lightweight, simple FIFO queue over SQL (MySQL, PostgreSQL, SQLite) - - ✅ Basic features: priority, deduplication, visibility timeout - - ❌ No distributed worker coordination or partition leasing - - ❌ No built-in retry mechanism or DLQ - - ❌ Designed for single-worker scenarios, not multi-worker distribution + - Lightweight, simple FIFO queue over SQL (MySQL, PostgreSQL, SQLite) + - Basic features: priority, deduplication, visibility timeout + - No distributed worker coordination or partition leasing + - No built-in retry mechanism or DLQ + - Designed for single-worker scenarios, not multi-worker distribution 4. 
**Database-Backed Queue** (Custom implementation) - - ✅ Reuses existing MySQL infrastructure - - ✅ Full control over queries and behavior - - ✅ No additional services or dependencies - - ❌ More code to maintain + - Reuses existing MySQL infrastructure + - Full control over queries and behavior + - No additional services or dependencies + - More code to maintain ### Decision @@ -57,7 +57,7 @@ We chose **custom database-backed queue** because: - Full control over SQL queries for optimal performance - No additional libraries - direct use of database/sql - Simpler to understand and debug (no abstraction layers) -- Can optimize for our specific use case (partition ordering, visibility timeout) +- Can optimize for our specific use case (partition ordering, delivery state tracking) - Watermill adds valuable abstractions but we need fine-grained control ## Requirements @@ -70,7 +70,8 @@ We chose **custom database-backed queue** because: 4. **Crash Recovery** - Workers resume from last committed offset 5. **Distributed Workers** - Multiple workers coordinate without duplicate processing 6. **Dead Letter Queue** - Failed messages isolated after max retries -7. **Visibility Timeout** - Messages invisible during processing, visible if worker crashes +7. **Delivery State Tracking** - Per-consumer-group visibility, retry count, and ack state +8. 
**Multi-Consumer-Group** - Independent consumption of the same message log ### Non-Functional Requirements @@ -101,9 +102,14 @@ We chose **custom database-backed queue** because: │ MySQL Database │ │ ┌───────────────────────────────┐ │ │ │ queue_messages │ │ +│ │ (immutable append-only log) │ │ │ │ - topic, partition_key │ │ -│ │ - offset, invisible_until │ │ -│ │ - retry_count, payload │ │ +│ │ - offset, payload │ │ +│ └───────────────────────────────┘ │ +│ ┌───────────────────────────────┐ │ +│ │ queue_delivery_state │ │ +│ │ (per-consumer-group) │ │ +│ │ - invisible_until, retry │ │ │ └───────────────────────────────┘ │ │ ┌───────────────────────────────┐ │ │ │ queue_partition_leases │ │ @@ -115,6 +121,11 @@ We chose **custom database-backed queue** because: │ │ - consumer_group, topic │ │ │ │ - partition_key, offset │ │ │ └───────────────────────────────┘ │ +│ ┌───────────────────────────────┐ │ +│ │ queue_subscriber_heartbeats │ │ +│ │ - consumer_group, topic │ │ +│ │ - subscriber_name, heartbeat │ │ +│ └───────────────────────────────┘ │ └─────────────────────────────────────┘ │ ▼ @@ -133,33 +144,50 @@ We chose **custom database-backed queue** because: ### Core Concepts -**Partition Leasing:** Workers coordinate using database-native leases. Each partition leased by exactly one worker. Stale leases automatically stolen on crash. +**Immutable Message Log:** Messages are append-only in `queue_messages`. No per-message mutation occurs during delivery — the log is shared across all consumer groups. + +**Delivery State Tracking:** Per-consumer-group delivery state in `queue_delivery_state` tracks ack state, visibility timeout, and retry count independently. Each row has an explicit `acked` boolean and an `invisible_until` timestamp. When `acked = TRUE`, the message is done and never redelivered. When `acked = FALSE`, `invisible_until` controls visibility: past/zero = ready for delivery, future = in-flight or nack delay. 
-**Visibility Timeout:** Messages invisible during processing. Auto-retry on crash when timeout expires. +**Watermark-Based Offset:** On ack, the subscriber computes the contiguous acked watermark by scanning forward from the current `offset_acked` through delivery state. The watermark advances to the highest contiguous acked offset, and delivery state rows behind it are cleaned up. -**Persistent Retry Tracking:** `retry_count` incremented atomically on fetch, survives crashes, triggers DLQ. +**Partition Leasing:** Workers coordinate using database-native leases. Each partition leased by exactly one worker. Stale leases automatically stolen on crash. -**Offset Tracking:** Per-partition offsets enable crash recovery from last acked message. +**Fair Partition Distribution:** Subscribers send periodic heartbeats. Each subscriber calculates `ceil(totalPartitions / activeSubscribers)` to cap lease acquisitions and releases excess partitions during rebalance. + +**Persistent Retry Tracking:** `retry_count` incremented atomically on delivery via `ON DUPLICATE KEY UPDATE`, survives crashes, triggers DLQ after `MaxAttempts`. ## Database Schema -### Messages Table +### Messages Table (Immutable Log) + +Messages are append-only. No per-message mutation during delivery. 
**Key Fields:** - `offset` (PK): Auto-incrementing global offset for message ordering - `topic`, `partition_key`: Message routing and partitioning - `id`: Unique message identifier - `payload`, `metadata`: Message content -- `retry_count`: Persistent retry tracking (survives worker crashes) -- `invisible_until`: Visibility timeout (epoch ms) for crash recovery +- `failed_at`, `failure_count`, `last_error`, `original_topic`: DLQ-specific fields (zero values for normal messages, populated when message is moved to DLQ topic) - `created_at`, `published_at`: Timestamps **Indexes:** -- `(topic, partition_key, invisible_until, offset)`: Core fetch query - find visible messages in partition ordered by offset -- `(topic, partition_key, id)`: Unique constraint and fast lookup for Ack/Nack +- `(topic, partition_key, offset)`: Core fetch query — poll messages in partition ordered by offset +- `(topic, partition_key, id)`: Unique constraint and idempotent publish See `extension/queue/mysql/schema/queue_messages.sql` for full schema. +### Delivery State Table + +Per-consumer-group delivery tracking with explicit ack state. + +**Key Fields:** +- `consumer_group`, `topic`, `partition_key`, `message_offset` (PK): Identifies delivery state per consumer group per message +- `acked`: Whether this consumer group has successfully processed this message +- `invisible_until`: Visibility timeout in epoch milliseconds (only meaningful when `acked = FALSE`) +- `retry_count`: Number of times message has been redelivered to this consumer group + +See `extension/queue/mysql/schema/queue_delivery_state.sql` for full schema. + ### Partition Leases Table **Key Fields:** @@ -177,7 +205,7 @@ See `extension/queue/mysql/schema/queue_partition_leases.sql` for full schema. 
**Key Fields:** - `consumer_group`, `topic`, `partition_key` (PK): Identifies offset position -- `offset_acked`: Last successfully processed offset +- `offset_acked`: Contiguous acked watermark — all messages at or below this offset are fully processed - `updated_at`: Last update timestamp **Indexes:** @@ -186,54 +214,150 @@ See `extension/queue/mysql/schema/queue_partition_leases.sql` for full schema. See `extension/queue/mysql/schema/queue_offsets.sql` for full schema. -### Dead Letter Queue Table +### Subscriber Heartbeats Table **Key Fields:** -- `offset` (PK): Auto-incrementing offset for DLQ ordering -- `topic`, `partition_key`, `id`: Original message identification -- `payload`, `metadata`: Original message content -- `failed_at`: When message moved to DLQ -- `failure_count`, `last_error`: Failure diagnostics +- `consumer_group`, `topic`, `subscriber_name` (PK): Identifies the subscriber +- `heartbeat_at`: Unix timestamp in milliseconds of last heartbeat +- `deregistered_at`: Soft-delete timestamp (0 = active, >0 = deregistered during graceful shutdown) -**Indexes:** -- `(topic, partition_key, failed_at)`: Query DLQ by topic/partition, ordered by failure time -- `(failed_at)`: Time-based queries and cleanup -- `(topic, partition_key, id)`: Unique constraint, prevents duplicates +See `extension/queue/mysql/schema/queue_subscriber_heartbeats.sql` for full schema. + +### Dead Letter Queue -See `extension/queue/mysql/schema/queue_dlq.sql` for full schema. +DLQ messages are stored in the same `queue_messages` table under a different topic name (original topic + DLQ suffix, e.g., `merge_queue_dlq`). This allows DLQ messages to be consumed using the normal subscriber with the DLQ topic name. DLQ-specific fields (`failed_at`, `failure_count`, `last_error`, `original_topic`) are populated when a message is moved to DLQ; they are zero/empty for normal messages. ## Message Flow -**1. Publish** - Insert messages with AUTO_INCREMENT offset +**1. 
Publish** — Insert messages into `queue_messages` with AUTO_INCREMENT offset + +**2. Lease Acquisition** — `INSERT ... ON DUPLICATE KEY UPDATE` with stale lease detection + +**3. Fetch** — Read from immutable log: `SELECT ... WHERE topic=? AND partition_key=? AND offset > ?` + +**4. Delivery Check** — Check `queue_delivery_state` for per-consumer-group deliverability: + - No row (never delivered) → deliverable + - `acked = TRUE` → skip (already processed) + - `acked = FALSE`, `invisible_until <= now` (visibility expired) → deliverable (redelivery) + - `acked = FALSE`, `invisible_until > now` (in-flight/nack delay) → skip + +**5. Mark Delivered** — `INSERT ... ON DUPLICATE KEY UPDATE` in delivery state: set `invisible_until = now + timeout`, increment `retry_count` on redelivery (only if `acked = FALSE`) + +**5a. Extend Visibility** — `UPDATE invisible_until` only, without incrementing `retry_count`. Used by `ExtendVisibilityTimeout()` for long-running work. + +**6. Ack** — Set `acked = TRUE`, advance contiguous watermark, update `offset_acked`, clean up delivery state behind watermark. All operations are idempotent — errors propagate to the caller for retry. + +**7. Nack** — Set `invisible_until = now + delay` for retry after backoff + +**8. DLQ** — If `retry_count >= MaxAttempts`: atomically move message to DLQ topic (INSERT with DLQ topic + DELETE from original topic in transaction). MoveToDLQ must succeed before marking acked — otherwise the message would be lost from both main queue and DLQ. + +**9. Garbage Collection** — Delete messages where `offset <= MIN(offset_acked)` across all consumer groups + +## Consumer Group Isolation + +All per-message state is scoped to `(consumer_group, topic, partition_key)`. Nothing is global. Each consumer group has: + +- **Independent delivery state** — visibility timeout, retry count, and ack state per message are tracked separately in `queue_delivery_state`. 
Consumer group A nacking a message has no effect on consumer group B's view of the same message. +- **Independent offsets** — each group maintains its own `offset_acked` watermark in `queue_offsets`. Group A can be ahead or behind group B. +- **Independent partition leases** — each group has its own set of leases in `queue_partition_leases`. Workers in group A do not compete with workers in group B. +- **Independent heartbeats** — subscriber heartbeats are scoped to `(consumer_group, topic)` for fair share computation within a group. +- **Shared immutable log** — `queue_messages` is the only shared table. It is append-only and never mutated by consumers. All consumer groups read from the same log but track their own position and delivery state. + +Garbage collection is the only cross-group operation: `GarbageCollect` computes `MIN(offset_acked)` across all consumer groups for a partition, ensuring a message is only deleted after every group has acked past it. + +## Ordering and Serialization + +### Default: Concurrent Delivery Within Partition + +By default, the poll loop fetches a batch of messages (`BatchSize`, default 10) from the immutable log and delivers each one that passes the `IsDeliverable()` check. Multiple messages from the same partition can be in-flight concurrently. Messages are delivered in offset order but may be acked out of order. + +### Non-Blocking Nack + +When a message is nacked, its `invisible_until` is set to a future timestamp. On the next poll, the nacked message is skipped via `IsDeliverable()` while subsequent messages are still delivered normally. A nacked message does not block, starve, or delay any other message in the partition. 
+ +Example with 5 messages at offsets 1-5, all delivered: +- Message 3 is nacked with 30s delay +- Messages 1, 2, 4, 5 can be acked independently +- Watermark advances to 2 (contiguous from head), stops at 3 (not acked) +- After 30s, message 3 becomes deliverable again, is redelivered +- Once message 3 is acked, watermark jumps from 2 to 5 + +### Strict Serialization (Opt-In) + +For use cases requiring strict in-order processing (e.g., ordered state machine transitions), set `BatchSize = 1`. This ensures: + +1. Only one message is fetched per poll cycle +2. Only one message is in-flight at a time per partition +3. The next message is not fetched until the current one is acked or nacked and becomes invisible -**2. Lease Acquisition** - `INSERT ... ON DUPLICATE KEY UPDATE` with stale lease detection +**Requirement:** `VisibilityTimeoutMs` must exceed the maximum processing time. If the timeout expires before the consumer acks, the message becomes deliverable again and may be delivered concurrently with the next poll — violating serialization. Set a generous timeout and use `ExtendVisibilityTimeout()` for long-running work. -**3. Fetch** - Atomic UPDATE sets `invisible_until` and increments `retry_count` +### Watermark Advancement -**4. Ack** - Transaction: DELETE message + UPDATE offset_acked +The `offset_acked` watermark represents the highest contiguous acked offset — all messages at or below this offset are fully processed. On each ack, the subscriber: -**5. Nack** - UPDATE `invisible_until` for retry after delay +1. Fetches message offsets above the current watermark from `queue_messages` +2. Batch-fetches delivery state for those offsets from `queue_delivery_state` +3. Walks offsets in order: advances while contiguous acked, stops at the first non-acked or undelivered message +4. Updates `offset_acked` to the new watermark +5. Cleans up delivery state rows behind the new watermark (eventual consistency — stale rows are harmless and retried on next call) -**6. 
DLQ** - If `retry_count >= MaxAttempts`: DELETE from messages + INSERT into dlq +The two-query approach avoids cross-table JOINs. Each store queries only its own table; the subscriber orchestrates both. + +All watermark operations are idempotent. If any step fails, the error propagates to the caller for retry. The ack is not considered complete until the watermark is advanced — this ensures the caller retries the full sequence rather than silently losing watermark progress. + +## Message Durability + +Messages are never silently lost. Every deletion path has explicit guards: + +**Garbage Collection:** Only deletes messages where `offset <= MIN(offset_acked)` across all consumer groups. If any consumer group has not acked past a message, it is retained. Safe under concurrent reads — a consumer processing a message at offset N has not yet acked it, so `MIN(offset_acked)` stays below N. + +**Move to DLQ:** Atomic transaction: INSERT into DLQ topic, then DELETE from original topic. If the transaction fails at any point, the original message is preserved via ROLLBACK. The message never exists in zero tables. + +**Delivery State Cleanup:** The `AdvanceWatermark` cleanup DELETE only removes delivery state rows (not messages) behind the contiguous watermark. Stale rows are harmless (never queried — all reads use offset > watermark) and cleaned up on the next call. + +**No Silent Deletions:** There is no code path that deletes a message without either (a) all consumer groups having acked past it, or (b) an atomic move to DLQ. The `Delete()` method exists on the store interface but is not called in any production flow. ## Crash Recovery -**Scenario:** Worker crashes while processing message +**Scenario:** Worker crashes while processing a message **What happens:** -1. Message has `invisible_until = crash_time + VisibilityTimeout` -2. After timeout expires, message becomes visible +1. Delivery state has `invisible_until = crash_time + VisibilityTimeout` +2. 
After timeout expires, message becomes deliverable again via `IsDeliverable()` 3. Another worker detects stale lease and steals partition -4. Message redelivered (at-least-once guarantee) -5. `retry_count` incremented prevents infinite retries +4. Message is redelivered (at-least-once guarantee) +5. `retry_count` increment on redelivery prevents infinite retries + +**Scenario:** Worker crashes after ack but before watermark update + +**What happens:** +1. Message is marked acked in delivery state (`acked = TRUE`) +2. Watermark was not advanced (crash interrupted the flow) +3. On next ack of any message in this partition, watermark scans forward and catches up +4. No message loss — the acked message is simply not re-delivered -**Key properties:** Automatic failover, no data loss, configurable retry delay +**Key properties:** Automatic failover, no data loss, configurable retry delay, eventual watermark convergence. ## Distributed Processing -**Same Consumer Group:** Workers distribute partitions via leasing. Each partition processed by one worker. +**Same Consumer Group:** Workers distribute partitions via fair leasing. Each partition processed by one worker. Heartbeats enable `ceil(totalPartitions / activeSubscribers)` fair share computation. Rebalance releases excess partitions when new subscribers join. -**Different Consumer Groups:** Independent consumption with separate offsets. Same messages delivered to all groups. +**Different Consumer Groups:** Independent consumption with separate delivery state and offsets. Same immutable message log consumed by all groups. One group's nacks, retries, and DLQ moves have no effect on other groups. + +## Fair Partition Distribution + +Subscribers send periodic heartbeats to `queue_subscriber_heartbeats`. The fair share algorithm: + +1. Count active subscribers (heartbeat within `LeaseDurationMs`) +2. Count total partitions (union of owned leases + discovered from messages table) +3. 
Compute `maxPartitions = ceil(totalPartitions / activeSubscribers)` +4. Cap lease acquisitions at `maxPartitions` +5. On each lease renewal tick, if this subscriber holds more than `maxPartitions`, release excess (sorted deterministically so the same partitions are released across runs) + +**Graceful shutdown:** Subscribers deregister their heartbeat and release all leases, enabling immediate redistribution. + +**Failure:** If fair share computation fails (heartbeat store error, discovery error), the subscriber falls back to unlimited acquisition — ensuring availability over perfect fairness. ## Alternatives Considered @@ -251,7 +375,7 @@ See `extension/queue/mysql/schema/queue_dlq.sql` for full schema. - **No partition key support** - can't guarantee ordering within partitions - **Requires table per topic** - schema migration for every new topic, not friendly for dynamic topics - Generic interface hides database-specific optimizations -- Less control over exact SQL queries (e.g., can't optimize visibility timeout logic) +- Less control over exact SQL queries (e.g., can't optimize delivery state logic) - Additional dependency to maintain and version - More infrastructure to maintain (separate tables, schema management) - Learning curve for team (new library semantics) @@ -269,7 +393,7 @@ With our custom implementation: - No schema migrations for new repos or topics - Ordering guaranteed within `(topic, partition_key)` -**Decision:** Custom implementation gives us partition ordering guarantees and single-table design. Watermill is valuable for complex multi-backend scenarios but doesn't fit our partition-based ordering requirements. +**Decision:** Custom implementation gives us partition ordering guarantees, immutable log design, and single-table simplicity. Watermill is valuable for complex multi-backend scenarios but doesn't fit our partition-based ordering requirements. 
### Single-Table Per Topic @@ -279,41 +403,113 @@ With our custom implementation: **Decision:** Single-table design for operational simplicity. +### Mutable Message State (Previous Design) + +The original design tracked visibility timeout and retry count directly on `queue_messages` rows. This worked but had limitations: +- Messages were mutated on every delivery, creating write contention +- Multiple consumer groups couldn't independently track delivery state for the same message +- Garbage collection required complex coordination since messages could be in different states per consumer group + +The current design separates the immutable message log from per-consumer-group delivery state, enabling independent consumption and cleaner GC (delete when all groups have acked past a message). + ## Trade-offs **Polling vs Push** -- ✅ Simpler (no connection management), natural backpressure -- ❌ Higher latency (configurable via PollInterval) +- Simpler (no connection management), natural backpressure +- Higher latency (configurable via PollInterval) - Mitigation: Tune PollInterval (default 100ms, tests 20ms) +**Immutable Log + Delivery State vs Mutable Messages** +- Multiple consumer groups consume independently +- Cleaner GC (min watermark across groups) +- Extra table for delivery state tracking +- Mitigation: Delivery state rows cleaned up behind watermark; two separate queries (no JOIN) for watermark advancement + +**Non-Blocking Nack vs Head-of-Line Blocking** +- Nacked messages don't starve the partition — later messages flow normally +- Out-of-order acks mean watermark only advances past contiguous acked blocks +- More delivery state rows to track (cleaned up behind watermark) +- Mitigation: Watermark advancement cleans up rows; GC deletes messages behind min watermark + +**Strict Serialization (BatchSize=1) vs Concurrent Delivery** +- Strict ordering guaranteed within partition when needed +- Lower throughput (one message at a time per partition) +- Requires 
correct VisibilityTimeoutMs configuration (must exceed max processing time) +- Mitigation: Use for ordering-sensitive topics only; concurrent delivery for throughput-sensitive topics + **Visibility Timeout vs Heartbeat** -- ✅ No heartbeat protocol, automatic retry -- ❌ Full timeout delay even on immediate crash +- No heartbeat protocol per message, automatic retry +- Full timeout delay even on immediate crash - Mitigation: ExtendVisibilityTimeout() for long tasks **Database Leasing vs External Coordinator** -- ✅ No ZooKeeper/etcd, transactional consistency -- ❌ Lease renewal overhead +- No ZooKeeper/etcd, transactional consistency +- Lease renewal overhead - Mitigation: Tunable renewal interval (default 10s) +**Fair Share via Heartbeats vs Static Assignment** +- Dynamic rebalancing as subscribers join/leave +- Eventually consistent (brief imbalance during transitions) +- Mitigation: Rebalance on every lease renewal tick; deterministic partition release order + **At-Least-Once vs Exactly-Once** -- ✅ Simpler, better performance -- ❌ Applications must handle duplicates +- Simpler, better performance +- Applications must handle duplicates - Mitigation: Idempotency keys (e.g., merge request ID) - ## Observability -**Metrics (via tally):** -- Publisher: `messages_published`, `publish_errors` -- Subscriber: `messages_acked`, `messages_nacked`, `messages_moved_to_dlq`, `message_age`, `leases_acquired` -- Stores: `insert.latency`, `fetch.latency`, `ack_message.latency`, `renew_lease.latency` +### Metrics (via tally) + +All metrics use `metrics.NamedCounter`, `metrics.NamedGauge`, and `metrics.NamedTimer` from `core/metrics`. Store-level operations use `metrics.Begin`/`Complete` for latency and success/failure tracking, tagged with `topic`, `consumer_group`, and `partition_key` where applicable. 
+ +**Publisher:** +- `messages_published`, `publish_errors` + +**Subscriber — Poll:** +- `poll.messages_delivered`, `poll.message_age`, `poll.latency` + +**Subscriber — Lifecycle:** +- `subscribe.active_subscriptions` + +**Stores (via `metrics.Begin`/`Complete`):** +- Each operation has `.succeeded`, `.failed`, `.latency` (e.g., `message_store.insert.succeeded`, `delivery_state_store.mark_acked.latency`, `delivery_state_store.extend_visibility.latency`) +- All tagged with `topic`, `consumer_group`, `partition_key` where the method has those parameters +- `message_store.gc.messages_deleted` +- `delivery_state_store.advance_watermark.cleanup_errors` + +No duplicate metrics between subscriber and stores — stores are the single source of truth for per-operation tracking. + +### Logging (via zap) + +Logger hierarchy: `queue_mysql.{component}` (e.g., `queue_mysql.subscriber`, `queue_mysql.message_store`). + +**Log Severity Guidelines:** + +| Level | When Used | Examples | +|-------|-----------|---------| +| Debug | Normal operational details | Message fetch counts, partition worker start/stop, subscription creation, nack | +| Info | Significant state changes | Publish success, partition rebalance releases, subscriber open/close | +| Warn | Swallowed errors with documented reason | Discover partitions fallback, worker stop timeout, message exceeded retry limit | +| Error | All propagated errors logged at call site | Poll failure, lease renewal failure, heartbeat failure, rebalance failure | + +**Structured fields:** `topic`, `partition_key`, `message_id`, `offset`, `retry_count`, `error`, `watermark`, `consumer_group`, `subscriber_name` + +### Error Handling Architecture + +**Stores** return errors via `fmt.Errorf` — they do not log errors themselves (no double-logging). 
The only exceptions are documented swallowed errors: +- `RowsAffected()` driver failures after a successful DELETE (diagnostic only) +- Per-partition lease acquisition failures in `DiscoverAndAcquirePartitions` (one partition's failure shouldn't block others) +- Watermark cleanup DELETE failure (stale rows are harmless, retried on next call) +- `DiscoverPartitions` failure in fair share (graceful degradation to owned-only) + +Each swallowed error has an inline comment explaining why it cannot be propagated. + +**Subscriber** propagates all errors to the top call site (`managePartitions` or `run`). Intermediate methods like `renewLeases`, `sendHeartbeat`, `rebalance`, and `pollAndDeliver` return errors — they never log internally. The call site logs once with full context (`topic`, `consumer_group`, `subscriber_name`). + +**Ack, Reject** propagate all errors to the caller. All operations are idempotent — callers can safely retry. No errors are swallowed. -**Logging (via zap):** -- Debug: Message fetch, lease operations -- Info: Publish success, DLQ moves, partition acquisition -- Error: Database errors, unrecoverable failures -- Structured fields: `topic`, `partition_key`, `message_id`, `offset`, `retry_count` +**`run` (ticker loop)** is the terminal call site — there is no upstream caller. Errors from `pollAndDeliver` are logged and the next tick retries automatically. 
## Performance @@ -336,4 +532,4 @@ With our custom implementation: **Alternative Implementations:** - [Watermill Documentation](https://watermill.io/) - Go library for message streaming (evaluated alternative) -- [PostgreSQL SKIP LOCKED](https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE) - Alternative database queue pattern \ No newline at end of file +- [PostgreSQL SKIP LOCKED](https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE) - Alternative database queue pattern diff --git a/extension/queue/mysql/README.md b/extension/queue/mysql/README.md index e5dd22f8..3bd96959 100644 --- a/extension/queue/mysql/README.md +++ b/extension/queue/mysql/README.md @@ -1,14 +1,8 @@ # SQL Queue Implementation -MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery. +MySQL-based distributed queue with partition leasing, delivery state tracking, and at-least-once delivery. -## Key Features - -- **Partition leasing** — workers coordinate via database leases with automatic failover -- **Per-partition workers** — each leased partition gets its own goroutine for isolation -- **Visibility timeout** — messages retry automatically if worker crashes -- **At-least-once delivery** — offset tracking for crash recovery -- **Dead letter queue** — failed messages moved to DLQ after max retries +For design rationale, guarantees, and trade-offs, see the [RFC](../../../doc/rfc/sql-queue-rfc.md). 
## Quick Start @@ -53,10 +47,8 @@ Per-subscription configuration enables different settings for each topic: ```go import extqueue "github.com/uber/submitqueue/extension/queue" -// Default config subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "consumer-group") -// Customize for this subscription subConfig.PollIntervalMs = 50 // Poll frequency (milliseconds) subConfig.BatchSize = 20 // Messages per poll subConfig.VisibilityTimeoutMs = 60000 // Retry delay (milliseconds) @@ -68,93 +60,148 @@ subConfig.Retry.MaxBackoffMs = 30000 // Max retry backoff (mill subConfig.Retry.BackoffMultiplier = 2.0 // Backoff multiplier for exponential backoff subConfig.DLQ.Enabled = true // Enable dead letter queue subConfig.DLQ.TopicSuffix = "_dlq" // DLQ topic suffix - -// Use config when subscribing -deliveryCh, _ := q.Subscriber().Subscribe(ctx, "my-topic", subConfig) ``` **Key Configuration Fields:** -- `SubscriberName`: Unique worker identifier for partition leasing (e.g., hostname, pod name) -- `ConsumerGroup`: Consumer group for independent offset tracking -- `PollIntervalMs`: How often to poll for new messages (milliseconds) -- `BatchSize`: Maximum messages to fetch per poll -- `VisibilityTimeoutMs`: How long messages are invisible after being fetched (milliseconds) -- `LeaseRenewalIntervalMs`: How often to renew partition leases (milliseconds) -- `LeaseDurationMs`: How long leases remain valid without renewal (milliseconds) -- `Retry.MaxAttempts`: Maximum processing attempts before moving to DLQ -- `Retry.InitialBackoffMs`: Initial retry backoff delay (milliseconds) -- `Retry.MaxBackoffMs`: Maximum retry backoff delay (milliseconds) -- `Retry.BackoffMultiplier`: Multiplier for exponential backoff -- `DLQ.TopicSuffix`: Suffix appended to topic name for DLQ (e.g., "orders" -> "orders_dlq") - -## Architecture +| Field | Description | +|-------|-------------| +| `SubscriberName` | Unique worker identifier for partition leasing (e.g., hostname, pod name) | +| 
`ConsumerGroup` | Consumer group for independent offset tracking | +| `PollIntervalMs` | How often to poll for new messages | +| `BatchSize` | Maximum messages to fetch per poll. Set to `1` for strict serialization | +| `VisibilityTimeoutMs` | How long messages are invisible after fetch. Must exceed max processing time for `BatchSize=1` | +| `LeaseRenewalIntervalMs` | How often to renew partition leases | +| `LeaseDurationMs` | How long leases remain valid without renewal | +| `Retry.MaxAttempts` | Maximum processing attempts before DLQ | +| `DLQ.TopicSuffix` | Suffix appended to topic name for DLQ (e.g., `"orders"` → `"orders_dlq"`) | -### Goroutine Model +### Strict Serialization + +For strict in-order processing within a partition, set `BatchSize = 1`: -Each subscription has a **supervisor goroutine** (`managePartitions`) that: -1. Discovers partitions from the messages table -2. Acquires and renews partition leases -3. Reconciles **per-partition worker goroutines** based on current leases +```go +cfg := extqueue.DefaultSubscriptionConfig("worker-1", "ordered-consumer") +cfg.BatchSize = 1 // One message in-flight at a time +cfg.VisibilityTimeoutMs = 120000 // Must exceed max processing time +``` -Each partition worker goroutine polls and delivers messages for its partition independently. This provides fault isolation — a slow or blocked partition does not affect other partitions. +This guarantees no concurrent processing within a partition. See the [RFC](../../../doc/rfc/sql-queue-rfc.md#ordering-and-serialization) for details on ordering semantics and non-blocking nack behavior. + +## Package Layout + +``` +extension/queue/mysql/ +├── sql.go # NewQueue constructor, wires stores → publisher/subscriber +├── stores.go # Internal store interfaces (messageStore, offsetStore, etc.) 
+├── message_store.go # queue_messages table operations (immutable log) +├── delivery_state_store.go # queue_delivery_state table operations (per-consumer-group) +├── offset_store.go # queue_offsets table operations (watermark tracking) +├── partition_lease_store.go # queue_partition_leases table operations +├── subscriber_heartbeat_store.go # queue_subscriber_heartbeats table operations +├── publisher.go # Publisher implementation +├── subscriber.go # Subscriber, delivery, goroutine management +├── constants.go # Log key constants +├── errors.go # Error types +├── schema/ # SQL schema files (one per table) +│ ├── queue_messages.sql +│ ├── queue_delivery_state.sql +│ ├── queue_offsets.sql +│ ├── queue_partition_leases.sql +│ └── queue_subscriber_heartbeats.sql +└── ctl/ # Admin CLI (see ctl/README.md) +``` + +## Internal Architecture + +### Database Tables + +| Table | Purpose | Scoped To | +|-------|---------|-----------| +| `queue_messages` | Immutable append-only message log | `(topic, partition_key)` — shared across consumer groups | +| `queue_delivery_state` | Visibility, ack state, retry count | `(consumer_group, topic, partition_key, offset)` | +| `queue_offsets` | Contiguous acked watermark | `(consumer_group, topic, partition_key)` | +| `queue_partition_leases` | Partition lease coordination | `(consumer_group, topic, partition_key)` | +| `queue_subscriber_heartbeats` | Active subscriber tracking | `(consumer_group, topic, subscriber_name)` | + +See `schema/` for full SQL definitions. See the [RFC](../../../doc/rfc/sql-queue-rfc.md#database-schema) for field-level documentation. + +### Store Architecture + +Each table is backed by an internal store interface defined in `stores.go`. 
Stores: +- Query only their own table (no cross-table JOINs) +- Return errors via `fmt.Errorf` (no logging, no error classification) +- Use `metrics.Begin`/`Complete` for latency and success/failure tracking + +The subscriber layer orchestrates cross-store operations (e.g., watermark advancement queries both `messageStore` and `deliveryStateStore`) and owns all logging and error classification. + +### Goroutine Model + +Each subscription has a **supervisor goroutine** (`managePartitions`) that discovers partitions, acquires leases, sends heartbeats, rebalances, and reconciles per-partition worker goroutines. ``` Subscribe() - └── managePartitions (supervisor) - ├── partitionWorker("part-1") ← polls & delivers - ├── partitionWorker("part-2") ← polls & delivers - └── partitionWorker("part-3") ← polls & delivers + └── managePartitions (supervisor) ← tracked by sub.wg + ├── partitionWorker("part-1") ← tracked by sub.workerWg + ├── partitionWorker("part-2") + └── partitionWorker("part-3") ``` -### Shutdown Sequence +Each partition worker runs independently — polls the DB on a ticker, checks deliverability per message, and sends deliveries to the shared channel. A slow or blocked partition does not affect other partitions. -Shutdown uses two `sync.WaitGroup`s to ensure correctness: -- `wg` tracks the supervisor goroutine (`managePartitions`) -- `workerWg` tracks all partition worker goroutines +### Shutdown Sequence When `Close()` is called: 1. Subscription context is cancelled -2. `managePartitions` calls `stopAllWorkers` — cancels each worker and waits up to 5s per worker -3. Partition leases are released -4. `workerWg.Wait()` blocks until all workers have fully exited -5. `deliveryCh` is closed — safe because no workers can send after step 4 -6. `managePartitions` returns, `wg.Done()` fires -7. `Close()` returns +2. `managePartitions` calls `stopAllWorkers` — cancels each worker's context, waits up to 30s +3. 
Partition leases are released (fresh context, not cancelled) +4. Subscriber heartbeat is deregistered +5. `workerWg.Wait()` — blocks until all workers have fully exited +6. `deliveryCh` is closed — safe because no senders remain after step 5 +7. `managePartitions` returns → `wg.Done()` → `Close()` unblocks -The `workerWg.Wait()` step prevents a race where a slow worker (blocked on I/O past the 5s timeout) could send on a closed channel. +The `workerWg.Wait()` before `close(deliveryCh)` prevents a race where a slow worker could send on a closed channel. ### Worker Stop Behavior When a partition worker is stopped (lease lost or shutdown): -- The worker is immediately removed from the workers map and its context is cancelled -- The caller waits up to 5s for the worker to confirm exit (logging a warning on timeout) -- `workerWg` tracks the worker regardless, so `Close()` always waits for full exit -- If the worker times out, reconciliation is free to start a replacement — any brief overlap is harmless with at-least-once delivery semantics +- Immediately removed from workers map and context cancelled +- Caller waits up to 30s for exit confirmation (warning logged on timeout) +- `workerWg` tracks the goroutine regardless — `Close()` always waits for full exit +- Reconciliation can start a replacement immediately — brief overlap is harmless with at-least-once semantics -## How It Works +### Logger Hierarchy -**Partition Leasing:** -1. Workers discover partitions from messages table -2. Workers acquire leases (one worker per partition) -3. Stale leases can be stolen by other workers +`sql.go` creates a root `queue_mysql` logger and passes named children to each component: -**Message Flow:** -1. Fetch visible messages (invisible_until <= now) -2. Process message -3. Ack: DELETE message, UPDATE offset -4. Nack: Message becomes visible after timeout -5. 
If retry_count >= MaxAttempts: Move to DLQ +``` +queue_mysql + ├── .publisher + ├── .subscriber + ├── .message_store + ├── .delivery_state_store + ├── .offset_store + ├── .partition_lease_store + └── .subscriber_heartbeat_store +``` -**Crash Recovery:** -- Messages become visible after visibility timeout -- Other workers steal stale leases -- Resume from last acked offset +Stores do not log errors — they return them. The subscriber propagates all errors to the top call site (`managePartitions` or `run`), which logs once with full context (`topic`, `consumer_group`, `subscriber_name`). See the [RFC](../../../doc/rfc/sql-queue-rfc.md#error-handling-architecture) for the full error handling architecture. -## Partition Ordering +## Testing -Messages with same `PartitionKey` are processed in order by a single worker. +### Unit Tests + +```bash +bazel test //extension/queue/mysql:mysql_test --test_output=streamed +bazel test //extension/queue/mysql/ctl/...:all --test_output=streamed +``` -## Distributed Processing +### Integration Tests + +Requires Docker running: + +```bash +bazel test //test/integration/extension/queue/... --test_output=streamed +``` -Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently. +Integration tests cover: publish/subscribe, partition isolation, ordering, visibility timeout, nack with delay, idempotent publish, concurrent publishers, crash recovery, multiple consumer groups, rebalancing, DLQ, graceful shutdown, non-blocking nack, strict serialization (`BatchSize=1`), and independent consumer group state. diff --git a/test/integration/extension/queue/mysql/queue_test.go b/test/integration/extension/queue/mysql/queue_test.go index ae15ea8a..0ce1f666 100644 --- a/test/integration/extension/queue/mysql/queue_test.go +++ b/test/integration/extension/queue/mysql/queue_test.go @@ -40,6 +40,17 @@ import ( // testTimeout is the safety-net duration for channel waits in integration tests. 
const testTimeout = 10 * time.Second +// Timing constants for rebalance tests. Keep poll/lease intervals short to make +// tests converge fast, but lease duration long enough that active subscribers +// don't expire each other. +const ( + rebalancePollIntervalMs = 100 + rebalanceLeaseRenewalIntervalMs = 200 + rebalanceLeaseDurationMs = 1000 + rebalanceConvergeTimeout = 10 * time.Second + rebalanceConvergePollInterval = 200 * time.Millisecond +) + type SQLQueueIntegrationSuite struct { suite.Suite ctx context.Context @@ -1620,3 +1631,475 @@ func (s *SQLQueueIntegrationSuite) TestAdmin_ResetOffsetAndReleaseLease() { t.Logf("Reset offset and release lease verified") } + +// --- Rebalance integration tests --- + +// getPartitionLeases queries the partition lease table and returns a map from +// subscriber name to the set of partition keys it owns for the given topic and +// consumer group. +func getPartitionLeases(db *sql.DB, topic, consumerGroup string) (map[string][]string, error) { + rows, err := db.Query( + "SELECT leased_by, partition_key FROM queue_partition_leases WHERE topic = ? AND consumer_group = ? ORDER BY leased_by, partition_key", + topic, consumerGroup, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + result := make(map[string][]string) + for rows.Next() { + var owner, pk string + if err := rows.Scan(&owner, &pk); err != nil { + return nil, err + } + result[owner] = append(result[owner], pk) + } + return result, nil +} + +// rebalanceSubConfig returns a SubscriptionConfig tuned for fast rebalance tests. 
+func rebalanceSubConfig(subscriberName, consumerGroup string) extqueue.SubscriptionConfig { + cfg := extqueue.DefaultSubscriptionConfig(subscriberName, consumerGroup) + cfg.PollIntervalMs = rebalancePollIntervalMs + cfg.LeaseRenewalIntervalMs = rebalanceLeaseRenewalIntervalMs + cfg.LeaseDurationMs = rebalanceLeaseDurationMs + return cfg +} + +func (s *SQLQueueIntegrationSuite) TestRebalance_EvenDistribution() { + t := s.T() + + topic := "rebalance_even_topic" + consumerGroup := "rebalance-even-cg" + partitions := []string{"pk-1", "pk-2", "pk-3", "pk-4"} + + // Publish one message per partition so they are discoverable. + pubQ, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer pubQ.Close() + + for i, pk := range partitions { + msg := queue.NewMessage(fmt.Sprintf("rb-even-%d", i), []byte("x"), pk, nil) + require.NoError(t, pubQ.Publisher().Publish(s.ctx, topic, msg)) + } + + // S1: subscribe, should acquire all 4 partitions (only subscriber). + q1, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q1.Close() + + _, err = q1.Subscriber().Subscribe(s.ctx, topic, rebalanceSubConfig("s1", consumerGroup)) + require.NoError(t, err) + + require.Eventually(t, func() bool { + leases, _ := getPartitionLeases(s.db, topic, consumerGroup) + return len(leases["s1"]) == 4 + }, rebalanceConvergeTimeout, rebalanceConvergePollInterval, "S1 should acquire all 4 partitions") + + // S2: subscribe. After rebalancing, each should own 2. 
+ q2, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q2.Close() + + _, err = q2.Subscriber().Subscribe(s.ctx, topic, rebalanceSubConfig("s2", consumerGroup)) + require.NoError(t, err) + + require.Eventually(t, func() bool { + leases, _ := getPartitionLeases(s.db, topic, consumerGroup) + return len(leases["s1"]) == 2 && len(leases["s2"]) == 2 + }, rebalanceConvergeTimeout, rebalanceConvergePollInterval, "each subscriber should own exactly 2 partitions") + + t.Logf("Even distribution verified: 4 partitions split evenly across 2 subscribers") +} + +func (s *SQLQueueIntegrationSuite) TestRebalance_SubscriberLeaves() { + t := s.T() + + topic := "rebalance_leave_topic" + consumerGroup := "rebalance-leave-cg" + partitions := []string{"pk-1", "pk-2", "pk-3", "pk-4"} + + // Publish messages. + pubQ, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer pubQ.Close() + + for i, pk := range partitions { + msg := queue.NewMessage(fmt.Sprintf("rb-leave-%d", i), []byte("x"), pk, nil) + require.NoError(t, pubQ.Publisher().Publish(s.ctx, topic, msg)) + } + + // S1 + S2 start, wait for 2+2 split. 
+ q1, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q1.Close() + + q2, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + // no defer close — we close explicitly below + + _, err = q1.Subscriber().Subscribe(s.ctx, topic, rebalanceSubConfig("s1", consumerGroup)) + require.NoError(t, err) + _, err = q2.Subscriber().Subscribe(s.ctx, topic, rebalanceSubConfig("s2", consumerGroup)) + require.NoError(t, err) + + require.Eventually(t, func() bool { + leases, _ := getPartitionLeases(s.db, topic, consumerGroup) + return len(leases["s1"])+len(leases["s2"]) == 4 && len(leases["s1"]) == 2 && len(leases["s2"]) == 2 + }, rebalanceConvergeTimeout, rebalanceConvergePollInterval, "2+2 split should converge") + + // S2 leaves: close releases leases and deregisters heartbeat. + require.NoError(t, q2.Close()) + + // S1's discovery loop will detect orphaned (expired) partitions and acquire them. 
+ require.Eventually(t, func() bool { + leases, _ := getPartitionLeases(s.db, topic, consumerGroup) + return len(leases["s1"]) == 4 + }, rebalanceConvergeTimeout, rebalanceConvergePollInterval, "S1 should reacquire all 4 partitions after S2 leaves") + + t.Logf("Subscriber leave verified: S1 owns all 4 partitions after S2 departed") +} + +func (s *SQLQueueIntegrationSuite) TestRebalance_OddPartitions() { + t := s.T() + + topic := "rebalance_odd_topic" + consumerGroup := "rebalance-odd-cg" + partitions := []string{"pk-1", "pk-2", "pk-3", "pk-4", "pk-5"} + + pubQ, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer pubQ.Close() + + for i, pk := range partitions { + msg := queue.NewMessage(fmt.Sprintf("rb-odd-%d", i), []byte("x"), pk, nil) + require.NoError(t, pubQ.Publisher().Publish(s.ctx, topic, msg)) + } + + q1, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q1.Close() + + q2, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q2.Close() + + _, err = q1.Subscriber().Subscribe(s.ctx, topic, rebalanceSubConfig("s1", consumerGroup)) + require.NoError(t, err) + _, err = q2.Subscriber().Subscribe(s.ctx, topic, rebalanceSubConfig("s2", consumerGroup)) + require.NoError(t, err) + + // maxPart = ceil(5/2) = 3. One gets 3, the other gets 2. 
+ require.Eventually(t, func() bool { + leases, _ := getPartitionLeases(s.db, topic, consumerGroup) + total := len(leases["s1"]) + len(leases["s2"]) + max := len(leases["s1"]) + if len(leases["s2"]) > max { + max = len(leases["s2"]) + } + min := len(leases["s1"]) + if len(leases["s2"]) < min { + min = len(leases["s2"]) + } + return total == 5 && max == 3 && min == 2 + }, rebalanceConvergeTimeout, rebalanceConvergePollInterval, "5 partitions should split 3+2 across 2 subscribers") + + t.Logf("Odd partition distribution verified: 5 partitions split 3+2") +} + +func (s *SQLQueueIntegrationSuite) TestRebalance_NoOrphans() { + t := s.T() + + topic := "rebalance_orphan_topic" + consumerGroup := "rebalance-orphan-cg" + partitions := []string{"pk-1", "pk-2", "pk-3", "pk-4", "pk-5", "pk-6"} + + pubQ, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer pubQ.Close() + + for i, pk := range partitions { + msg := queue.NewMessage(fmt.Sprintf("rb-orphan-%d", i), []byte("x"), pk, nil) + require.NoError(t, pubQ.Publisher().Publish(s.ctx, topic, msg)) + } + + // 3 subscribers → 2 each. 
+ queues := make([]extqueue.Queue, 3) + subNames := []string{"s1", "s2", "s3"} + for i, name := range subNames { + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + queues[i] = q + _, err = q.Subscriber().Subscribe(s.ctx, topic, rebalanceSubConfig(name, consumerGroup)) + require.NoError(t, err) + } + defer queues[0].Close() + defer queues[1].Close() + // queues[2] will be closed explicitly + + require.Eventually(t, func() bool { + leases, _ := getPartitionLeases(s.db, topic, consumerGroup) + total := 0 + for _, pks := range leases { + total += len(pks) + } + return total == 6 + }, rebalanceConvergeTimeout, rebalanceConvergePollInterval, "all 6 partitions should be assigned across 3 subscribers") + + // Remove S3 → remaining 2 should pick up orphans. maxPart = ceil(6/2) = 3. + require.NoError(t, queues[2].Close()) + + // S1/S2 discovery loops will detect orphaned (expired) partitions and acquire them. 
+ require.Eventually(t, func() bool { + leases, _ := getPartitionLeases(s.db, topic, consumerGroup) + total := len(leases["s1"]) + len(leases["s2"]) + // s3 leases should be gone (released on close or expired) + return total == 6 && len(leases["s3"]) == 0 + }, rebalanceConvergeTimeout, rebalanceConvergePollInterval, "remaining 2 subscribers should own all 6 partitions") + + t.Logf("No orphan partitions: all 6 reassigned after subscriber left") +} + +func (s *SQLQueueIntegrationSuite) TestRebalance_MoreSubscribersThanPartitions() { + t := s.T() + + topic := "rebalance_excess_topic" + consumerGroup := "rebalance-excess-cg" + partitions := []string{"pk-1", "pk-2"} + + pubQ, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer pubQ.Close() + + for i, pk := range partitions { + msg := queue.NewMessage(fmt.Sprintf("rb-excess-%d", i), []byte("x"), pk, nil) + require.NoError(t, pubQ.Publisher().Publish(s.ctx, topic, msg)) + } + + // 4 subscribers competing for 2 partitions. maxPart = ceil(2/4) = 1. 
+ subNames := []string{"s1", "s2", "s3", "s4"} + var queues []extqueue.Queue + for _, name := range subNames { + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + queues = append(queues, q) + _, err = q.Subscriber().Subscribe(s.ctx, topic, rebalanceSubConfig(name, consumerGroup)) + require.NoError(t, err) + } + defer func() { + for _, q := range queues { + q.Close() + } + }() + + require.Eventually(t, func() bool { + leases, _ := getPartitionLeases(s.db, topic, consumerGroup) + total := 0 + maxOwned := 0 + for _, pks := range leases { + total += len(pks) + if len(pks) > maxOwned { + maxOwned = len(pks) + } + } + return total == 2 && maxOwned <= 1 + }, rebalanceConvergeTimeout, rebalanceConvergePollInterval, + "2 partitions across 4 subscribers: total=2, max per subscriber=1") + + t.Logf("More subscribers than partitions verified: 2 partitions, 4 subscribers, max 1 each") +} + +// TestNackDoesNotBlockOtherMessages verifies that nacking a message does not +// block delivery of subsequent messages in the same partition. The nacked +// message should be skipped (invisible) while later messages are delivered. 
+func (s *SQLQueueIntegrationSuite) TestNackDoesNotBlockOtherMessages() { + t := s.T() + + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + topic := "nack_nonblocking_topic" + partition := "nack-nb-part" + + // Subscribe with batch=10 to fetch multiple messages per poll + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "nack-nb-cg") + subConfig.PollIntervalMs = 50 + subConfig.BatchSize = 10 + deliveryCh, err := q.Subscriber().Subscribe(s.ctx, topic, subConfig) + require.NoError(t, err) + + // Publish 3 messages in order + for i := 1; i <= 3; i++ { + msg := queue.NewMessage(fmt.Sprintf("msg-%d", i), []byte(fmt.Sprintf("payload-%d", i)), partition, nil) + require.NoError(t, q.Publisher().Publish(s.ctx, topic, msg)) + } + + // Receive first message and nack it with a long delay + d1 := receiveWithTimeout(t, deliveryCh, testTimeout) + assert.Equal(t, "msg-1", d1.Message().ID) + require.NoError(t, d1.Nack(s.ctx, 30000)) // 30s delay — won't come back during test + t.Logf("Nacked msg-1 with 30s delay") + + // Messages 2 and 3 should still be deliverable despite msg-1 being nacked + d2 := receiveWithTimeout(t, deliveryCh, testTimeout) + assert.Equal(t, "msg-2", d2.Message().ID) + require.NoError(t, d2.Ack(s.ctx)) + t.Logf("Received and acked msg-2") + + d3 := receiveWithTimeout(t, deliveryCh, testTimeout) + assert.Equal(t, "msg-3", d3.Message().ID) + require.NoError(t, d3.Ack(s.ctx)) + t.Logf("Received and acked msg-3") + + t.Logf("Verified: nacked message did not block subsequent messages") +} + +// TestBatchSizeOneStrictSerialization verifies that with batchSize=1, messages +// within a partition are processed strictly in order — only one message is +// in-flight at a time. 
+func (s *SQLQueueIntegrationSuite) TestBatchSizeOneStrictSerialization() { + t := s.T() + + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + topic := "serial_topic" + partition := "serial-part" + + // Subscribe with batchSize=1 for strict serialization + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "serial-cg") + subConfig.PollIntervalMs = 50 + subConfig.BatchSize = 1 + deliveryCh, err := q.Subscriber().Subscribe(s.ctx, topic, subConfig) + require.NoError(t, err) + + // Publish 5 messages + for i := 1; i <= 5; i++ { + msg := queue.NewMessage(fmt.Sprintf("serial-%d", i), []byte(strconv.Itoa(i)), partition, nil) + require.NoError(t, q.Publisher().Publish(s.ctx, topic, msg)) + } + + // Receive each message strictly in order, acking before receiving next + for i := 1; i <= 5; i++ { + delivery := receiveWithTimeout(t, deliveryCh, testTimeout) + assert.Equal(t, fmt.Sprintf("serial-%d", i), delivery.Message().ID, + "expected message %d but got %s", i, delivery.Message().ID) + require.NoError(t, delivery.Ack(s.ctx)) + t.Logf("Strictly ordered delivery: serial-%d", i) + } + + // Verify no more messages + select { + case d := <-deliveryCh: + t.Fatalf("unexpected extra delivery: %s", d.Message().ID) + case <-time.After(500 * time.Millisecond): + // Expected — no more messages + } + + t.Logf("Verified: batchSize=1 enforces strict serialization") +} + +// TestMultipleConsumerGroupsIndependentState verifies that two consumer groups +// can independently process, nack, retry, and ack the same messages without +// interfering with each other. 
+func (s *SQLQueueIntegrationSuite) TestMultipleConsumerGroupsIndependentState() { + t := s.T() + + q, err := queueMySQL.NewQueue(queueMySQL.Params{ + DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + }) + require.NoError(t, err) + defer q.Close() + + topic := "multi_cg_state_topic" + partition := "multi-cg-part" + + // Two consumer groups subscribing to the same topic + cfg1 := extqueue.DefaultSubscriptionConfig("worker-1", "cg-alpha") + cfg1.PollIntervalMs = 50 + cfg2 := extqueue.DefaultSubscriptionConfig("worker-2", "cg-beta") + cfg2.PollIntervalMs = 50 + + ch1, err := q.Subscriber().Subscribe(s.ctx, topic, cfg1) + require.NoError(t, err) + ch2, err := q.Subscriber().Subscribe(s.ctx, topic, cfg2) + require.NoError(t, err) + + // Publish 2 messages + for i := 1; i <= 2; i++ { + msg := queue.NewMessage(fmt.Sprintf("shared-%d", i), []byte(strconv.Itoa(i)), partition, nil) + require.NoError(t, q.Publisher().Publish(s.ctx, topic, msg)) + } + + // CG-alpha: nack msg-1, ack msg-2 + d1a := receiveWithTimeout(t, ch1, testTimeout) + assert.Equal(t, "shared-1", d1a.Message().ID) + require.NoError(t, d1a.Nack(s.ctx, 200)) // short nack delay + t.Logf("cg-alpha nacked shared-1") + + d2a := receiveWithTimeout(t, ch1, testTimeout) + assert.Equal(t, "shared-2", d2a.Message().ID) + require.NoError(t, d2a.Ack(s.ctx)) + t.Logf("cg-alpha acked shared-2") + + // CG-beta: ack both messages immediately (independent state) + d1b := receiveWithTimeout(t, ch2, testTimeout) + assert.Equal(t, "shared-1", d1b.Message().ID) + require.NoError(t, d1b.Ack(s.ctx)) + t.Logf("cg-beta acked shared-1") + + d2b := receiveWithTimeout(t, ch2, testTimeout) + assert.Equal(t, "shared-2", d2b.Message().ID) + require.NoError(t, d2b.Ack(s.ctx)) + t.Logf("cg-beta acked shared-2") + + // CG-alpha should get shared-1 redelivered after nack delay + d1aRetry := receiveWithTimeout(t, ch1, testTimeout) + assert.Equal(t, "shared-1", d1aRetry.Message().ID) + assert.Greater(t, 
d1aRetry.Attempt(), 1, "should be a retry attempt") + require.NoError(t, d1aRetry.Ack(s.ctx)) + t.Logf("cg-alpha received retry of shared-1 (attempt=%d)", d1aRetry.Attempt()) + + // CG-beta should NOT get shared-1 again (already acked independently) + select { + case d := <-ch2: + t.Fatalf("cg-beta should not receive more messages, got: %s", d.Message().ID) + case <-time.After(500 * time.Millisecond): + // Expected — cg-beta is done + } + + t.Logf("Verified: consumer groups have fully independent delivery state") +}