Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions doc/rfc/sql-queue-rfc.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ We chose **custom database-backed queue** because:
│ │ - consumer_group, topic │ │
│ │ - partition_key, offset │ │
│ └───────────────────────────────┘ │
│ ┌───────────────────────────────┐ │
│ │ queue_subscriber_heartbeats │ │
│ │ - consumer_group, topic │ │
│ │ - subscriber_name │ │
│ │ - heartbeat_at │ │
│ └───────────────────────────────┘ │
└─────────────────────────────────────┘
Expand All @@ -141,6 +147,8 @@ We chose **custom database-backed queue** because:

**Offset Tracking:** Per-partition offsets enable crash recovery from last acked message.

**Fair Share Partitioning:** Subscribers register heartbeats so peers can compute even partition distribution. On each tick, subscribers compute `ceil(totalPartitions / activeSubscribers)` and release excess partitions if they hold more than their fair share.

## Database Schema

### Messages Table
Expand Down Expand Up @@ -186,21 +194,30 @@ See `extension/queue/mysql/schema/queue_partition_leases.sql` for full schema.

See `extension/queue/mysql/schema/queue_offsets.sql` for full schema.

### Dead Letter Queue Table
### Subscriber Heartbeats Table

**Key Fields:**
- `offset` (PK): Auto-incrementing offset for DLQ ordering
- `topic`, `partition_key`, `id`: Original message identification
- `payload`, `metadata`: Original message content
- `failed_at`: When message moved to DLQ
- `failure_count`, `last_error`: Failure diagnostics
- `consumer_group`, `topic`, `subscriber_name` (PK): Identifies the subscriber
- `heartbeat_at`: Last heartbeat timestamp (epoch ms)
- `deregistered_at`: When the subscriber was deregistered (0 = active)

**Indexes:**
- `(topic, partition_key, failed_at)`: Query DLQ by topic/partition, ordered by failure time
- `(failed_at)`: Time-based queries and cleanup
- `(topic, partition_key, id)`: Unique constraint, prevents duplicates
Used for fair share partition leasing — active subscribers (recent heartbeat) are counted to compute even partition distribution.

See `extension/queue/mysql/schema/queue_subscriber_heartbeats.sql` for full schema.

See `extension/queue/mysql/schema/queue_dlq.sql` for full schema.
### Dead Letter Queue

DLQ messages are reinserted into the `queue_messages` table with a DLQ topic suffix (e.g., `merge_events_dlq`). This avoids a separate table and allows DLQ messages to be consumed using the normal subscriber.

**DLQ-specific fields on `queue_messages`:**
- `failed_at`: When the message was moved to DLQ (epoch ms)
- `failure_count`: How many times the message failed on the original topic before DLQ move
- `last_error`: The error message that triggered the DLQ move
- `original_topic`: The topic the message was originally published to

Normal messages have these fields set to zero/empty. DLQ messages have `retry_count` reset to 0 since DLQ processing starts fresh.

See `extension/queue/mysql/schema/queue_messages.sql` for full schema.

## Message Flow

Expand All @@ -214,7 +231,7 @@ See `extension/queue/mysql/schema/queue_dlq.sql` for full schema.

**5. Nack** - UPDATE `invisible_until` for retry after delay

**6. DLQ** - If `retry_count >= MaxAttempts`: DELETE from messages + INSERT into dlq
**6. DLQ** - If `retry_count >= MaxAttempts`: DELETE from messages + INSERT into messages with DLQ topic suffix

## Crash Recovery

Expand Down Expand Up @@ -304,15 +321,18 @@ With our custom implementation:

## Observability

**Metrics (via tally):**
- Publisher: `messages_published`, `publish_errors`
- Subscriber: `messages_acked`, `messages_nacked`, `messages_moved_to_dlq`, `message_age`, `leases_acquired`
- Stores: `insert.latency`, `fetch.latency`, `ack_message.latency`, `renew_lease.latency`

**Logging (via zap):**
- Debug: Message fetch, lease operations
- Info: Publish success, DLQ moves, partition acquisition
- Error: Database errors, unrecoverable failures
**Metrics (via tally, scoped with `queue_mysql_` prefix):**
- Publisher: `publish` (latency, success/error via `metrics.Begin`)
- Subscriber: `ack.messages_acked`, `nack.messages_nacked`, `reject.messages_rejected_to_dlq`, `poll_and_deliver.message_age`, `poll_and_deliver.messages_received`, `discover_and_reconcile.leases_acquired`
- Lease store: `try_acquire_lease`, `renew_lease`, `release_lease`, `get_leased_partitions`, `discover_and_acquire` (all with latency via `metrics.Begin`)
- Message store: `insert`, `fetch`, `delete`, `move_to_dlq`, `set_visibility` (all with latency via `metrics.Begin`)
- Heartbeat: `heartbeat.sent`, `heartbeat.errors`, `heartbeat.deregistrations`
- Fair share: `fair_share_cap.active_subscribers`, `rebalance.partitions_released`

**Logging (via zap, named with `queue_mysql_` prefix):**
- Debug: Message fetch, lease operations, partition worker lifecycle
- Info: Subscription creation, DLQ moves, partition acquisition, rebalance events
- Warn: Lease renewal failures, heartbeat failures, retry limit exceeded, offset errors
- Structured fields: `topic`, `partition_key`, `message_id`, `offset`, `retry_count`

## Performance
Expand Down
46 changes: 33 additions & 13 deletions extension/queue/mysql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ MySQL-based distributed queue with partition leasing, visibility timeout, and at
- **Per-partition workers** — each leased partition gets its own goroutine for isolation
- **Visibility timeout** — messages retry automatically if worker crashes
- **At-least-once delivery** — offset tracking for crash recovery
- **Dead letter queue** — failed messages moved to DLQ after max retries
- **Dead letter queue** — failed messages moved to DLQ topic after max retries
- **Fair share partitioning** — subscriber heartbeats enable even partition distribution across workers

## Quick Start

Expand Down Expand Up @@ -68,9 +69,6 @@ subConfig.Retry.MaxBackoffMs = 30000 // Max retry backoff (mill
subConfig.Retry.BackoffMultiplier = 2.0 // Backoff multiplier for exponential backoff
subConfig.DLQ.Enabled = true // Enable dead letter queue
subConfig.DLQ.TopicSuffix = "_dlq" // DLQ topic suffix

// Use config when subscribing
deliveryCh, _ := q.Subscriber().Subscribe(ctx, "my-topic", subConfig)
```

**Key Configuration Fields:**
Expand All @@ -95,7 +93,9 @@ deliveryCh, _ := q.Subscriber().Subscribe(ctx, "my-topic", subConfig)
Each subscription has a **supervisor goroutine** (`managePartitions`) that:
1. Discovers partitions from the messages table
2. Acquires and renews partition leases
3. Reconciles **per-partition worker goroutines** based on current leases
3. Sends heartbeats so other subscribers can compute fair shares
4. Rebalances partitions when subscribers join or leave
5. Reconciles **per-partition worker goroutines** based on current leases

Each partition worker goroutine polls and delivers messages for its partition independently. This provides fault isolation — a slow or blocked partition does not affect other partitions.

Expand All @@ -107,6 +107,16 @@ Subscribe()
└── partitionWorker("part-3") ← polls & delivers
```

### Fair Share Partitioning

Subscribers register heartbeats in the `queue_subscriber_heartbeats` table. On each lease renewal tick, the supervisor:
1. Queries active subscribers (those with recent heartbeats)
2. Computes `ceil(totalPartitions / activeSubscribers)` as the fair share cap
3. If this subscriber owns more partitions than its fair share, releases excess partitions
4. On discovery, limits new lease acquisitions to the fair share cap

This ensures even partition distribution when subscribers scale up or down.

### Shutdown Sequence

Shutdown uses two `sync.WaitGroup`s to ensure correctness:
Expand All @@ -115,20 +125,21 @@ Shutdown uses two `sync.WaitGroup`s to ensure correctness:

When `Close()` is called:
1. Subscription context is cancelled
2. `managePartitions` calls `stopAllWorkers` — cancels each worker and waits up to 5s per worker
2. `managePartitions` calls `stopAllWorkers` — cancels each worker and waits up to 30s per worker
3. Partition leases are released
4. `workerWg.Wait()` blocks until all workers have fully exited
5. `deliveryCh` is closed — safe because no workers can send after step 4
6. `managePartitions` returns, `wg.Done()` fires
7. `Close()` returns
4. Subscriber heartbeat is deregistered
5. `workerWg.Wait()` blocks until all workers have fully exited
6. `deliveryCh` is closed — safe because no workers can send after step 5
7. `managePartitions` returns, `wg.Done()` fires
8. `Close()` returns

The `workerWg.Wait()` step prevents a race where a slow worker (blocked on I/O past the 5s timeout) could send on a closed channel.
The `workerWg.Wait()` step prevents a race where a slow worker (blocked on I/O past the 30s timeout) could send on a closed channel.

### Worker Stop Behavior

When a partition worker is stopped (lease lost or shutdown):
- The worker is immediately removed from the workers map and its context is cancelled
- The caller waits up to 5s for the worker to confirm exit (logging a warning on timeout)
- The caller waits up to 30s for the worker to confirm exit (logging a warning on timeout)
- `workerWg` tracks the worker regardless, so `Close()` always waits for full exit
- If the worker times out, reconciliation is free to start a replacement — any brief overlap is harmless with at-least-once delivery semantics

Expand All @@ -144,7 +155,7 @@ When a partition worker is stopped (lease lost or shutdown):
2. Process message
3. Ack: DELETE message, UPDATE offset
4. Nack: Message becomes visible after timeout
5. If retry_count >= MaxAttempts: Move to DLQ
5. If retry_count >= MaxAttempts: Move to DLQ topic (reinsert into messages table with DLQ topic suffix)

**Crash Recovery:**
- Messages become visible after visibility timeout
Expand All @@ -158,3 +169,12 @@ Messages with same `PartitionKey` are processed in order by a single worker.
## Distributed Processing

Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently.

## Error Types

The package defines domain-specific sentinel errors for callers to check with `errors.Is` / `errors.As`:

- `ErrPublisherClosed` — publishing to a closed publisher
- `ErrSubscriberClosed` — subscribing on a closed subscriber
- `ErrAlreadyAcknowledged` — ack/nack/reject on an already-processed delivery
- `ErrLeaseExpired` — lease operation on a partition no longer owned by this worker
7 changes: 0 additions & 7 deletions extension/queue/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,9 @@ package mysql
// Common constants for frequently repeated strings across stores

const (
// Tag key (used in every Tagged() call)
tagErrorType = "error_type"

// Common log field names (used extensively across all stores)
logTopic = "topic"
logPartitionKey = "partition_key"
logMessageID = "message_id"
logError = "error"

// Error types used across multiple methods/stores
errorBeginTx = "begin_transaction"
errorCommit = "commit"
)
27 changes: 25 additions & 2 deletions extension/queue/mysql/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,36 @@

package mysql

import "fmt"
import (
"errors"
"fmt"
)

// ErrAlreadyAcknowledged is returned when attempting to ack/nack a delivery that was already processed
// Sentinel errors for closed resources. Callers can check these with errors.Is.
var (
// ErrPublisherClosed is returned when publishing to a closed publisher.
ErrPublisherClosed = errors.New("publisher is closed")

// ErrSubscriberClosed is returned when subscribing on a closed subscriber.
ErrSubscriberClosed = errors.New("subscriber is closed")
)

// ErrAlreadyAcknowledged is returned when attempting to ack/nack a delivery that was already processed.
type ErrAlreadyAcknowledged struct {
DeliveryID string
}

func (e *ErrAlreadyAcknowledged) Error() string {
return fmt.Sprintf("delivery %s already acknowledged or nacked", e.DeliveryID)
}

// ErrLeaseExpired is returned when a lease operation fails because the lease
// is not owned by this worker or has already expired.
type ErrLeaseExpired struct {
Topic string
PartitionKey string
}

func (e *ErrLeaseExpired) Error() string {
return fmt.Sprintf("lease expired or not owned for topic %s partition %s", e.Topic, e.PartitionKey)
}
Loading
Loading