From 0e6c4ccf0ba10335a37bfc2ec24fcad33d3af60c Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 19 Feb 2026 21:06:32 -0800 Subject: [PATCH] feat(queue/sql): add factory and documentation Add Factory implementation (sql.go) for SQL queue that provides Publisher and Subscriber instances with proper lifecycle management. --- extensions/queue/sql/BUILD.bazel | 2 + extensions/queue/sql/README.md | 82 +++++++++ extensions/queue/sql/config.go | 2 +- extensions/queue/sql/sql.go | 114 ++++++++++++ extensions/queue/sql/sql_test.go | 294 +++++++++++++++++++++++++++++++ 5 files changed, 493 insertions(+), 1 deletion(-) create mode 100644 extensions/queue/sql/README.md create mode 100644 extensions/queue/sql/sql.go create mode 100644 extensions/queue/sql/sql_test.go diff --git a/extensions/queue/sql/BUILD.bazel b/extensions/queue/sql/BUILD.bazel index ea87d3e2..7609aa66 100644 --- a/extensions/queue/sql/BUILD.bazel +++ b/extensions/queue/sql/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "offset_store.go", "partition_lease_store.go", "publisher.go", + "sql.go", "stores.go", "subscriber.go", "validation.go", @@ -34,6 +35,7 @@ go_test( "offset_store_test.go", "partition_lease_store_test.go", "publisher_test.go", + "sql_test.go", "subscriber_test.go", ], embed = [":sql"], diff --git a/extensions/queue/sql/README.md b/extensions/queue/sql/README.md new file mode 100644 index 00000000..209d1db4 --- /dev/null +++ b/extensions/queue/sql/README.md @@ -0,0 +1,82 @@ +# SQL Queue Implementation + +MySQL-based distributed queue with partition leasing, visibility timeout, and at-least-once delivery. + +## Key Features + +- **Partition leasing** - Workers coordinate via database leases with automatic failover +- **Visibility timeout** - Messages retry automatically if worker crashes +- **At-least-once delivery** - Offset tracking for crash recovery +- **Dead letter queue** - Failed messages moved to DLQ after max retries + +## Quick Start + +```go +import ( + "database/sql" + _ "github.com/go-sql-driver/mysql" + queueSQL "github.com/uber/submitqueue/extensions/queue/sql" + "github.com/uber/submitqueue/entities/queue" +) + +// Setup +db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/db") +q, _ := queueSQL.NewQueue(queueSQL.Params{ + DB: db, + Logger: logger, + Config: queueSQL.DefaultConfig("orchestrator", "worker-1"), +}) +defer q.Close() + +// Publish +msg := queue.NewMessage("msg-id", []byte(`{"data": "value"}`)) +msg.PartitionKey = "repo-123" // Required for ordering +q.Publisher().Publish(ctx, "merge_events", msg) + +// Subscribe +deliveryCh, _ := q.Subscriber().Subscribe(ctx, "merge_events") +for delivery := range deliveryCh { + if err := process(delivery.Message()); err != nil { + delivery.Nack(ctx, 0) // Retry + continue + } + delivery.Ack(ctx) +} +``` + +## Configuration + +```go +config := queueSQL.DefaultConfig("consumer-group", "worker-id") +config.PollInterval = 50 * time.Millisecond // Poll frequency +config.BatchSize = 20 // Messages per poll +config.VisibilityTimeout = 60 * time.Second // Retry delay +config.Retry.MaxAttempts = 3 // Max retries before DLQ +``` + +## How It Works + +**Partition Leasing:** +1. Workers discover partitions from messages table +2. Workers acquire leases (one worker per partition) +3. Stale leases can be stolen by other workers + +**Message Flow:** +1. Fetch visible messages (invisible_until <= now) +2. Process message +3. Ack: DELETE message, UPDATE offset +4. Nack: Message becomes visible after timeout +5. If retry_count >= MaxAttempts: Move to DLQ + +**Crash Recovery:** +- Messages become visible after visibility timeout +- Other workers steal stale leases +- Resume from last acked offset + +## Partition Ordering + +Messages with same `PartitionKey` are processed in order by a single worker. + +## Distributed Processing + +Multiple workers in the same consumer group share partitions. Workers in different consumer groups consume independently. diff --git a/extensions/queue/sql/config.go b/extensions/queue/sql/config.go index df3eb7fc..bd79a0e6 100644 --- a/extensions/queue/sql/config.go +++ b/extensions/queue/sql/config.go @@ -6,7 +6,7 @@ import ( ) // Config holds configuration for SQL-based queue. -// DB connection, logger, and metrics are passed separately to NewFactory. +// DB connection, logger, and metrics are passed separately to NewQueue. type Config struct { // ConsumerGroup identifies this consumer for offset tracking (required) ConsumerGroup string diff --git a/extensions/queue/sql/sql.go b/extensions/queue/sql/sql.go new file mode 100644 index 00000000..a5af3b80 --- /dev/null +++ b/extensions/queue/sql/sql.go @@ -0,0 +1,114 @@ +package sql + +import ( + "database/sql" + "errors" + "fmt" + + "github.com/uber-go/tally/v4" + "go.uber.org/zap" + + "github.com/uber/submitqueue/extensions/queue" +) + +type queueImpl struct { + publisher queue.Publisher + subscriber queue.Subscriber + closed bool +} + +// Params holds dependencies for creating a SQL queue factory +type Params struct { + // DB is the database connection (required) + DB *sql.DB + + // Logger for debugging and observability (required) + Logger *zap.Logger + + // MetricsScope for metrics collection (required) + MetricsScope tally.Scope + + // Config holds queue configuration + Config Config +} + +// NewQueue creates a new SQL-based queue factory +func NewQueue(params Params) (queue.Queue, error) { + if err := params.Config.Validate(); err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } + + // Test connection + if err := params.DB.Ping(); err != nil { + return nil, fmt.Errorf("failed to ping database: %w", err) + } + + logger := params.Logger.Sugar().Named("queue.sql") + logger.Infow("created SQL queue factory", + "consumer_group", params.Config.ConsumerGroup, + "worker_id", params.Config.WorkerID, + "poll_interval", params.Config.PollInterval, + "batch_size", params.Config.BatchSize, + ) + + // Create stores + messageStore := newMessageStore(params.DB, params.Config, params.Logger, params.MetricsScope) + offsetStore := newOffsetStore(params.DB, params.Config, params.Logger, params.MetricsScope) + leaseStore := newPartitionLeaseStore(params.DB, params.Config, params.Logger, params.MetricsScope) + + queueMetrics := params.MetricsScope.SubScope("queue") + + // Create publisher and subscriber + publisher := NewPublisher( + params.Config, + logger.Named("publisher"), + queueMetrics.SubScope("publisher"), + messageStore, + ) + + subscriber := NewSubscriber( + params.Config, + logger.Named("subscriber"), + queueMetrics.SubScope("subscriber"), + messageStore, + offsetStore, + leaseStore, + ) + + return &queueImpl{ + publisher: publisher, + subscriber: subscriber, + closed: false, + }, nil +} + +// Publisher returns a Publisher instance +func (q *queueImpl) Publisher() queue.Publisher { + return q.publisher +} + +// Subscriber returns a Subscriber instance +func (q *queueImpl) Subscriber() queue.Subscriber { + return q.subscriber +} + +// Close shuts down the factory and all associated resources +func (q *queueImpl) Close() error { + if q.closed { + return nil + } + q.closed = true + + // Close subscriber and publisher + var errs []error + + if err := q.subscriber.Close(); err != nil { + errs = append(errs, fmt.Errorf("subscriber close failed: %w", err)) + } + + if err := q.publisher.Close(); err != nil { + errs = append(errs, fmt.Errorf("publisher close failed: %w", err)) + } + + return errors.Join(errs...) +} diff --git a/extensions/queue/sql/sql_test.go b/extensions/queue/sql/sql_test.go new file mode 100644 index 00000000..b07cf2b1 --- /dev/null +++ b/extensions/queue/sql/sql_test.go @@ -0,0 +1,294 @@ +package sql + +import ( + "database/sql" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "go.uber.org/zap/zaptest" + + "github.com/uber/submitqueue/extensions/queue" +) + +func TestNewQueue(t *testing.T) { + t.Run("success with all params", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectPing() + + factory, err := NewQueue(Params{ + DB: db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NewTestScope("test", nil), + Config: DefaultConfig("test-consumer", "test-worker"), + }) + + require.NoError(t, err) + require.NotNil(t, factory) + assert.NotNil(t, factory.Publisher()) + assert.NotNil(t, factory.Subscriber()) + + err = factory.Close() + assert.NoError(t, err) + + require.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("error when config is invalid", func(t *testing.T) { + db, _, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + config := DefaultConfig("", "") // Invalid: empty consumer group and worker ID + + factory, err := NewQueue(Params{ + DB: db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NewTestScope("test", nil), + Config: config, + }) + + require.Error(t, err) + assert.Nil(t, factory) + }) + + t.Run("error when DB ping fails", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectPing().WillReturnError(sql.ErrConnDone) + + factory, err := NewQueue(Params{ + DB: db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NewTestScope("test", nil), + Config: DefaultConfig("test-consumer", "test-worker"), + }) + + require.Error(t, err) + assert.Nil(t, factory) + + require.NoError(t, mock.ExpectationsWereMet()) + }) +} + +func TestQueue_Publisher(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectPing() + + factory, err := NewQueue(Params{ + DB: db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NewTestScope("test", nil), + Config: DefaultConfig("test-consumer", "test-worker"), + }) + require.NoError(t, err) + defer factory.Close() + + // First call creates publisher + pub1 := factory.Publisher() + assert.NotNil(t, pub1) + + // Second call returns same publisher (singleton) + pub2 := factory.Publisher() + assert.Equal(t, pub1, pub2) + + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestQueue_Subscriber(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectPing() + + factory, err := NewQueue(Params{ + DB: db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NewTestScope("test", nil), + Config: DefaultConfig("test-consumer", "test-worker"), + }) + require.NoError(t, err) + defer factory.Close() + + // First call creates subscriber + sub1 := factory.Subscriber() + assert.NotNil(t, sub1) + + // Second call returns same subscriber (singleton) + sub2 := factory.Subscriber() + assert.Equal(t, sub1, sub2) + + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestQueue_Close(t *testing.T) { + tests := []struct { + name string + setupFactory func(t *testing.T, f queue.Queue) + wantErr bool + }{ + { + name: "close without creating publisher or subscriber", + setupFactory: func(t *testing.T, f queue.Queue) {}, + wantErr: false, + }, + { + name: "close after creating publisher", + setupFactory: func(t *testing.T, f queue.Queue) { + _ = f.Publisher() + }, + wantErr: false, + }, + { + name: "close after creating subscriber", + setupFactory: func(t *testing.T, f queue.Queue) { + _ = f.Subscriber() + }, + wantErr: false, + }, + { + name: "close after creating both publisher and subscriber", + setupFactory: func(t *testing.T, f queue.Queue) { + _ = f.Publisher() + _ = f.Subscriber() + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectPing() + + factory, err := NewQueue(Params{ + DB: db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NewTestScope("test", nil), + Config: DefaultConfig("test-consumer", "test-worker"), + }) + require.NoError(t, err) + + // Setup factory state + tt.setupFactory(t, factory) + + // Close factory + err = factory.Close() + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + require.NoError(t, mock.ExpectationsWereMet()) + }) + } + + t.Run("close is idempotent", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectPing() + + factory, err := NewQueue(Params{ + DB: db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NewTestScope("test", nil), + Config: DefaultConfig("test-consumer", "test-worker"), + }) + require.NoError(t, err) + + // Close multiple times + err = factory.Close() + assert.NoError(t, err) + + err = factory.Close() + assert.NoError(t, err) + + require.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("publisher and subscriber calls after close return same instances", func(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectPing() + + factory, err := NewQueue(Params{ + DB: db, + Logger: zaptest.NewLogger(t), + MetricsScope: tally.NewTestScope("test", nil), + Config: DefaultConfig("test-consumer", "test-worker"), + }) + require.NoError(t, err) + + // Create publisher before close + pub := factory.Publisher() + assert.NotNil(t, pub) + + // Close factory + err = factory.Close() + assert.NoError(t, err) + + // Getting publisher/subscriber after close should return the same instances + // (they were already created, so singleton pattern returns them) + pub2 := factory.Publisher() + assert.Equal(t, pub, pub2, "should return same publisher instance after close") + + require.NoError(t, mock.ExpectationsWereMet()) + }) +} + +func TestQueue_Integration(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + mock.ExpectPing() + + logger := zaptest.NewLogger(t) + metricsScope := tally.NewTestScope("test", nil) + config := DefaultConfig("test-consumer", "test-worker") + + factory, err := NewQueue(Params{ + DB: db, + Logger: logger, + MetricsScope: metricsScope, + Config: config, + }) + require.NoError(t, err) + defer factory.Close() + + // Verify we can get both publisher and subscriber + publisher := factory.Publisher() + subscriber := factory.Subscriber() + + assert.NotNil(t, publisher) + assert.NotNil(t, subscriber) + + // Verify they're singletons + assert.Equal(t, publisher, factory.Publisher()) + assert.Equal(t, subscriber, factory.Subscriber()) + + // Close should succeed + err = factory.Close() + assert.NoError(t, err) + + require.NoError(t, mock.ExpectationsWereMet()) +}