Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions extensions/queue/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"offset_store.go",
"partition_lease_store.go",
"publisher.go",
"sql.go",
"stores.go",
"subscriber.go",
"validation.go",
Expand All @@ -34,6 +35,7 @@ go_test(
"offset_store_test.go",
"partition_lease_store_test.go",
"publisher_test.go",
"sql_test.go",
"subscriber_test.go",
],
embed = [":sql"],
Expand Down
82 changes: 82 additions & 0 deletions extensions/queue/sql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# SQL Queue Implementation

MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery.

## Key Features

- **Partition leasing** - Workers coordinate via database leases with automatic failover
- **Visibility timeout** - Messages retry automatically if worker crashes
- **At-least-once delivery** - Offset tracking for crash recovery
- **Dead letter queue** - Failed messages moved to DLQ after max retries

## Quick Start

```go
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
queueSQL "github.com/uber/submitqueue/extensions/queue/sql"
"github.com/uber/submitqueue/entities/queue"
)

// Setup
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db")
q, _ := queueSQL.NewQueue(queueSQL.Params{
DB: db,
Logger: logger,
Config: queueSQL.DefaultConfig("orchestrator", "worker-1"),
})
defer q.Close()

// Publish
msg := queue.NewMessage("msg-id", []byte(`{"data": "value"}`))
msg.PartitionKey = "repo-123" // Required for ordering
q.Publisher().Publish(ctx, "merge_events", msg)

// Subscribe
deliveryCh, _ := q.Subscriber().Subscribe(ctx, "merge_events")
for delivery := range deliveryCh {
if err := process(delivery.Message()); err != nil {
delivery.Nack(ctx, 0) // Retry
continue
}
delivery.Ack(ctx)
}
```

## Configuration

```go
config := queueSQL.DefaultConfig("consumer-group", "worker-id")
config.PollInterval = 50 * time.Millisecond // Poll frequency
config.BatchSize = 20 // Messages per poll
config.VisibilityTimeout = 60 * time.Second // Retry delay
config.Retry.MaxAttempts = 3 // Max retries before DLQ
```

## How It Works

**Partition Leasing:**
1. Workers discover partitions from messages table
2. Workers acquire leases (one worker per partition)
3. Stale leases can be stolen by other workers

**Message Flow:**
1. Fetch visible messages (invisible_until <= now)
2. Process message
3. Ack: DELETE message, UPDATE offset
4. Nack: Message becomes visible after timeout
5. If retry_count >= MaxAttempts: Move to DLQ

**Crash Recovery:**
- Messages become visible after visibility timeout
- Other workers steal stale leases
- Resume from last acked offset

## Partition Ordering

Messages with same `PartitionKey` are processed in order by a single worker.

## Distributed Processing

Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently.
2 changes: 1 addition & 1 deletion extensions/queue/sql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

// Config holds configuration for SQL-based queue.
// DB connection, logger, and metrics are passed separately to NewFactory.
// DB connection, logger, and metrics are passed separately to NewQueue.
type Config struct {
// ConsumerGroup identifies this consumer for offset tracking (required)
ConsumerGroup string
Expand Down
114 changes: 114 additions & 0 deletions extensions/queue/sql/sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package sql

import (
"database/sql"
"errors"
"fmt"

"github.com/uber-go/tally/v4"
"go.uber.org/zap"

"github.com/uber/submitqueue/extensions/queue"
)

type queueImpl struct {
publisher queue.Publisher
subscriber queue.Subscriber
closed bool
}

// Params holds dependencies for creating a SQL queue factory
type Params struct {
// DB is the database connection (required)
DB *sql.DB

// Logger for debugging and observability (required)
Logger *zap.Logger

// MetricsScope for metrics collection (required)
MetricsScope tally.Scope

// Config holds queue configuration
Config Config
}

// NewQueue creates a new SQL-based queue factory
func NewQueue(params Params) (queue.Queue, error) {
if err := params.Config.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}

// Test connection
if err := params.DB.Ping(); err != nil {
return nil, fmt.Errorf("failed to ping database: %w", err)
}

logger := params.Logger.Sugar().Named("queue.sql")
logger.Infow("created SQL queue factory",
"consumer_group", params.Config.ConsumerGroup,
"worker_id", params.Config.WorkerID,
"poll_interval", params.Config.PollInterval,
"batch_size", params.Config.BatchSize,
)

// Create stores
messageStore := newMessageStore(params.DB, params.Config, params.Logger, params.MetricsScope)
offsetStore := newOffsetStore(params.DB, params.Config, params.Logger, params.MetricsScope)
leaseStore := newPartitionLeaseStore(params.DB, params.Config, params.Logger, params.MetricsScope)

queueMetrics := params.MetricsScope.SubScope("queue")

// Create publisher and subscriber
publisher := NewPublisher(
params.Config,
logger.Named("publisher"),
queueMetrics.SubScope("publisher"),
messageStore,
)

subscriber := NewSubscriber(
params.Config,
logger.Named("subscriber"),
queueMetrics.SubScope("subscriber"),
messageStore,
offsetStore,
leaseStore,
)

return &queueImpl{
publisher: publisher,
subscriber: subscriber,
closed: false,
}, nil
}

// Publisher returns a Publisher instance
func (q *queueImpl) Publisher() queue.Publisher {
return q.publisher
}

// Subscriber returns a Subscriber instance
func (q *queueImpl) Subscriber() queue.Subscriber {
return q.subscriber
}

// Close shuts down the factory and all associated resources
func (q *queueImpl) Close() error {
if q.closed {
return nil
}
q.closed = true

// Close subscriber and publisher
var errs []error
Comment thread
behinddwalls marked this conversation as resolved.

if err := q.subscriber.Close(); err != nil {
errs = append(errs, fmt.Errorf("subscriber close failed: %w", err))
}

if err := q.publisher.Close(); err != nil {
errs = append(errs, fmt.Errorf("publisher close failed: %w", err))
}

return errors.Join(errs...)
}
Loading