From 0dde36e097f02bf9d8c70d82255de9cb66a4e507 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Thu, 19 Feb 2026 16:28:40 -0800 Subject: [PATCH] feat(queue/sql): add publisher with gomock testing - Implement Publisher interface with SQL MessageStore backend - Add gomock-based mocks for MessageStore interface - Use fixed timestamps in tests for repeatability - Optimize mutex usage: only lock for closed flag check - Move logging outside mutex to avoid I/O under lock - Extract validateTopicName to validation.go for sharing - Add comprehensive test coverage for publish, close, validation, and concurrency --- MODULE.bazel | 1 + extensions/queue/sql/BUILD.bazel | 24 +- extensions/queue/sql/config.go | 8 - extensions/queue/sql/mock_stores.go | 56 ++++ extensions/queue/sql/publisher.go | 72 +++++ extensions/queue/sql/publisher_test.go | 357 +++++++++++++++++++++++++ extensions/queue/sql/stores.go | 15 ++ extensions/queue/sql/validation.go | 20 ++ go.mod | 1 + go.sum | 2 + 10 files changed, 546 insertions(+), 10 deletions(-) create mode 100644 extensions/queue/sql/mock_stores.go create mode 100644 extensions/queue/sql/publisher.go create mode 100644 extensions/queue/sql/publisher_test.go create mode 100644 extensions/queue/sql/stores.go create mode 100644 extensions/queue/sql/validation.go diff --git a/MODULE.bazel b/MODULE.bazel index 32c7cb19..9443731c 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -39,6 +39,7 @@ use_repo( "org_golang_google_grpc", "org_golang_google_protobuf", "org_uber_go_fx", + "org_uber_go_mock", "org_uber_go_yarpc", "org_uber_go_zap", ) diff --git a/extensions/queue/sql/BUILD.bazel b/extensions/queue/sql/BUILD.bazel index c2dd000c..df128279 100644 --- a/extensions/queue/sql/BUILD.bazel +++ b/extensions/queue/sql/BUILD.bazel @@ -2,17 +2,37 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "sql", - srcs = ["config.go"], + srcs = [ + "config.go", + "mock_stores.go", + "publisher.go", + "stores.go", + "validation.go", + ], importpath = "github.com/uber/submitqueue/extensions/queue/sql", visibility = ["//visibility:public"], + deps = [ + "//entities/queue", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//:zap", + ], ) go_test( name = "sql_test", - srcs = ["config_test.go"], + srcs = [ + "config_test.go", + "publisher_test.go", + ], embed = [":sql"], deps = [ + "//entities/queue", + "//extensions/queue", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally_v4//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", ], ) diff --git a/extensions/queue/sql/config.go b/extensions/queue/sql/config.go index 4a9073ca..df3eb7fc 100644 --- a/extensions/queue/sql/config.go +++ b/extensions/queue/sql/config.go @@ -5,14 +5,6 @@ import ( "time" ) -const ( - // Fixed table names for single-table design - MessagesTableName = "queue_messages" - PartitionLeasesTableName = "queue_partition_leases" - OffsetsTableName = "queue_offsets" - DLQTableName = "queue_dlq" -) - // Config holds configuration for SQL-based queue. // DB connection, logger, and metrics are passed separately to NewFactory. type Config struct { diff --git a/extensions/queue/sql/mock_stores.go b/extensions/queue/sql/mock_stores.go new file mode 100644 index 00000000..05203a1d --- /dev/null +++ b/extensions/queue/sql/mock_stores.go @@ -0,0 +1,56 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: stores.go +// +// Generated by this command: +// +// mockgen -source=stores.go -destination=mock_stores.go -package=sql -self_package=github.com/uber/submitqueue/extensions/queue/sql +// + +// Package sql is a generated GoMock package. +package sql + +import ( + context "context" + reflect "reflect" + + queue "github.com/uber/submitqueue/entities/queue" + gomock "go.uber.org/mock/gomock" +) + +// MockMessageStore is a mock of MessageStore interface. +type MockMessageStore struct { + ctrl *gomock.Controller + recorder *MockMessageStoreMockRecorder + isgomock struct{} +} + +// MockMessageStoreMockRecorder is the mock recorder for MockMessageStore. +type MockMessageStoreMockRecorder struct { + mock *MockMessageStore +} + +// NewMockMessageStore creates a new mock instance. +func NewMockMessageStore(ctrl *gomock.Controller) *MockMessageStore { + mock := &MockMessageStore{ctrl: ctrl} + mock.recorder = &MockMessageStoreMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMessageStore) EXPECT() *MockMessageStoreMockRecorder { + return m.recorder +} + +// Insert mocks base method. +func (m *MockMessageStore) Insert(ctx context.Context, topic string, messages []queue.Message) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Insert", ctx, topic, messages) + ret0, _ := ret[0].(error) + return ret0 +} + +// Insert indicates an expected call of Insert. +func (mr *MockMessageStoreMockRecorder) Insert(ctx, topic, messages any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockMessageStore)(nil).Insert), ctx, topic, messages) +} diff --git a/extensions/queue/sql/publisher.go b/extensions/queue/sql/publisher.go new file mode 100644 index 00000000..0e4d96a8 --- /dev/null +++ b/extensions/queue/sql/publisher.go @@ -0,0 +1,72 @@ +package sql + +import ( + "context" + "fmt" + "sync" + + "github.com/uber-go/tally/v4" + "go.uber.org/zap" + + "github.com/uber/submitqueue/entities/queue" +) + +type publisher struct { + config Config + logger *zap.SugaredLogger + metrics tally.Scope + messageStore MessageStore + mu sync.RWMutex + closed bool +} + +// NewPublisher creates a publisher with the given configuration and dependencies +func NewPublisher(config Config, logger *zap.SugaredLogger, metrics tally.Scope, messageStore MessageStore) *publisher { + return &publisher{ + config: config, + logger: logger, + metrics: metrics, + messageStore: messageStore, + } +} + +// Publish sends a message to the specified topic +func (p *publisher) Publish(ctx context.Context, topic string, message queue.Message) error { + // Check if closed (under lock) + p.mu.RLock() + closed := p.closed + p.mu.RUnlock() + + if closed { + p.logger.Errorw("publish failure: publisher is closed", "topic", topic) + return fmt.Errorf("publisher is closed") + } + + // Validate topic name (SQL-safe) + if err := validateTopicName(topic); err != nil { + p.logger.Errorw("publish failure: invalid topic name", "topic", topic, "error", err) + p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1) + return fmt.Errorf("publish invalid topic name: %w", err) + } + + if err := p.messageStore.Insert(ctx, topic, []queue.Message{message}); err != nil { + p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1) + p.logger.Errorw("publish failure: message store insert error", "topic", topic, "error", err) + return fmt.Errorf("publish message store insert error: %w", err) + } + + p.metrics.Tagged(map[string]string{"topic": topic}).Counter("messages_published").Inc(1) + p.logger.Debugw("published message", "topic", topic, "message_id", message.ID) + + return nil +} + +// Close gracefully shuts down the publisher +func (p *publisher) Close() error { + p.mu.Lock() + p.closed = true + p.mu.Unlock() + + p.logger.Info("publisher closed") + return nil +} diff --git a/extensions/queue/sql/publisher_test.go b/extensions/queue/sql/publisher_test.go new file mode 100644 index 00000000..101f4c93 --- /dev/null +++ b/extensions/queue/sql/publisher_test.go @@ -0,0 +1,357 @@ +package sql + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/uber-go/tally/v4" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" + + "github.com/uber/submitqueue/entities/queue" + extqueue "github.com/uber/submitqueue/extensions/queue" + // mocks in same package +) + +const fixedTimestamp = int64(1234567890000) // Fixed timestamp for test repeatability + +func setupPublisherTest(t *testing.T, mockStore *MockMessageStore) extqueue.Publisher { + t.Helper() + + config := DefaultConfig("test-consumer", "test-worker") + + return NewPublisher(config, + zaptest.NewLogger(t).Sugar().Named("publisher"), + tally.NoopScope.SubScope("publisher"), + mockStore, + ) +} + +func TestPublisher_Publish(t *testing.T) { + tests := []struct { + name string + topic string + messages []queue.Message + wantErr bool + setupMock func(*MockMessageStore) + }{ + { + name: "publish single message", + topic: "test_topic", + messages: []queue.Message{ + {ID: "msg1", Payload: []byte("payload1"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, + }, + wantErr: false, + setupMock: func(m *MockMessageStore) { + m.EXPECT().Insert(gomock.Any(), "test_topic", gomock.Any()).Return(nil).Times(1) + }, + }, + { + name: "publish multiple messages", + topic: "multi_topic", + messages: []queue.Message{ + {ID: "msg1", Payload: []byte("p1"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, + {ID: "msg2", Payload: []byte("p2"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, + {ID: "msg3", Payload: []byte("p3"), PartitionKey: "part2", PublishedAt: fixedTimestamp}, + }, + wantErr: false, + setupMock: func(m *MockMessageStore) { + m.EXPECT().Insert(gomock.Any(), "multi_topic", gomock.Any()).Return(nil).Times(3) + }, + }, + { + name: "publish empty messages is no-op", + topic: "empty_topic", + messages: []queue.Message{}, + wantErr: false, + setupMock: func(m *MockMessageStore) { + // No Insert expected + }, + }, + { + name: "publish with metadata", + topic: "metadata_topic", + messages: []queue.Message{ + { + ID: "msg_meta", + Payload: []byte("payload"), + PartitionKey: "part1", + Metadata: map[string]string{"key1": "val1", "key2": "val2"}, + PublishedAt: fixedTimestamp, + }, + }, + wantErr: false, + setupMock: func(m *MockMessageStore) { + m.EXPECT().Insert(gomock.Any(), "metadata_topic", gomock.Any()).Return(nil).Times(1) + }, + }, + { + name: "publish with invalid topic name - uppercase", + topic: "InvalidTopic", + messages: []queue.Message{ + {ID: "msg1", Payload: []byte("p"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, + }, + wantErr: true, + setupMock: func(m *MockMessageStore) { + // No Insert expected since validation fails + }, + }, + { + name: "publish with invalid topic name - special chars", + topic: "topic-with-dash", + messages: []queue.Message{ + {ID: "msg1", Payload: []byte("p"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, + }, + wantErr: true, + setupMock: func(m *MockMessageStore) { + // No Insert expected since validation fails + }, + }, + { + name: "publish with invalid topic name - empty", + topic: "", + messages: []queue.Message{ + {ID: "msg1", Payload: []byte("p"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, + }, + wantErr: true, + setupMock: func(m *MockMessageStore) { + // No Insert expected since validation fails + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStore := NewMockMessageStore(ctrl) + tt.setupMock(mockStore) + + pub := setupPublisherTest(t, mockStore) + + ctx := context.Background() + var err error + for _, msg := range tt.messages { + err = pub.Publish(ctx, tt.topic, msg) + if err != nil { + break + } + } + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestPublisher_PublishAfterClose(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStore := NewMockMessageStore(ctrl) + pub := setupPublisherTest(t, mockStore) + + ctx := context.Background() + + // Close the publisher + err := pub.Close() + require.NoError(t, err) + + // Try to publish after close + msg := queue.NewMessage("msg1", []byte("payload")) + msg.PartitionKey = "part1" + err = pub.Publish(ctx, "test_topic", msg) + require.Error(t, err) +} + +func TestPublisher_Close(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStore := NewMockMessageStore(ctrl) + pub := setupPublisherTest(t, mockStore) + + // Close should succeed + err := pub.Close() + require.NoError(t, err) + + // Closing again should still succeed (idempotent) + err = pub.Close() + require.NoError(t, err) +} + +func TestValidateTopicName(t *testing.T) { + tests := []struct { + name string + topicName string + wantErr bool + }{ + { + name: "valid topic - lowercase letters", + topicName: "mytopic", + wantErr: false, + }, + { + name: "valid topic - with numbers", + topicName: "topic123", + wantErr: false, + }, + { + name: "valid topic - with underscores", + topicName: "my_topic_name", + wantErr: false, + }, + { + name: "valid topic - all valid chars", + topicName: "abc_123_xyz", + wantErr: false, + }, + { + name: "invalid topic - empty", + topicName: "", + wantErr: true, + }, + { + name: "invalid topic - uppercase", + topicName: "MyTopic", + wantErr: true, + }, + { + name: "invalid topic - dash", + topicName: "my-topic", + wantErr: true, + }, + { + name: "invalid topic - dot", + topicName: "my.topic", + wantErr: true, + }, + { + name: "invalid topic - space", + topicName: "my topic", + wantErr: true, + }, + { + name: "invalid topic - special chars", + topicName: "topic!@#", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStore := NewMockMessageStore(ctrl) + pub := setupPublisherTest(t, mockStore) + + // Try to publish with this topic name + ctx := context.Background() + msg := queue.NewMessage("msg1", []byte("test")) + msg.PartitionKey = "part1" + + if !tt.wantErr { + mockStore.EXPECT().Insert(gomock.Any(), tt.topicName, gomock.Any()).Return(nil).Times(1) + } + + err := pub.Publish(ctx, tt.topicName, msg) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestPublisher_PublishMetrics(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStore := NewMockMessageStore(ctrl) + mockStore.EXPECT().Insert(gomock.Any(), "metrics_test", gomock.Any()).Return(nil).Times(2) + + pub := setupPublisherTest(t, mockStore) + + ctx := context.Background() + topic := "metrics_test" + + // Publish some messages + messages := []queue.Message{ + {ID: "msg1", Payload: []byte("p1"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, + {ID: "msg2", Payload: []byte("p2"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, + } + + for _, msg := range messages { + err := pub.Publish(ctx, topic, msg) + require.NoError(t, err) + } + + // Metrics should have been recorded (we're using NoopScope in tests, so just verify no errors) + // In a real implementation, you'd use a mock metrics scope to verify calls +} + +func TestPublisher_ConcurrentPublish(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + const numGoroutines = 10 + const messagesPerGoroutine = 5 + + mockStore := NewMockMessageStore(ctrl) + mockStore.EXPECT().Insert(gomock.Any(), "concurrent_topic", gomock.Any()).Return(nil).Times(numGoroutines * messagesPerGoroutine) + + pub := setupPublisherTest(t, mockStore) + + ctx := context.Background() + topic := "concurrent_topic" + + // Publish from multiple goroutines + errCh := make(chan error, numGoroutines*messagesPerGoroutine) + for i := 0; i < numGoroutines; i++ { + go func(id int) { + for j := 0; j < messagesPerGoroutine; j++ { + msg := queue.Message{ + ID: fmt.Sprintf("msg_%d_%d", id, j), + Payload: []byte(fmt.Sprintf("payload_%d_%d", id, j)), + PartitionKey: fmt.Sprintf("part_%d", id), + PublishedAt: fixedTimestamp, + } + errCh <- pub.Publish(ctx, topic, msg) + } + }(i) + } + + // Wait for all goroutines + for i := 0; i < numGoroutines*messagesPerGoroutine; i++ { + err := <-errCh + require.NoError(t, err) + } +} + +func TestPublisher_PublishContextCancellation(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockStore := NewMockMessageStore(ctrl) + mockStore.EXPECT().Insert(gomock.Any(), "test_topic", gomock.Any()).Return(context.Canceled).Times(1) + + pub := setupPublisherTest(t, mockStore) + + // Create a cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + msg := queue.NewMessage("msg1", []byte("payload")) + msg.PartitionKey = "part1" + + // Should fail with context cancelled error + err := pub.Publish(ctx, "test_topic", msg) + require.Error(t, err) +} diff --git a/extensions/queue/sql/stores.go b/extensions/queue/sql/stores.go new file mode 100644 index 00000000..140e15a3 --- /dev/null +++ b/extensions/queue/sql/stores.go @@ -0,0 +1,15 @@ +package sql + +//go:generate mockgen -source=stores.go -destination=mock_stores.go -package=sql + +import ( + "context" + + "github.com/uber/submitqueue/entities/queue" +) + +// MessageStore handles message table operations +type MessageStore interface { + // Insert inserts messages into the topic table + Insert(ctx context.Context, topic string, messages []queue.Message) error +} diff --git a/extensions/queue/sql/validation.go b/extensions/queue/sql/validation.go new file mode 100644 index 00000000..b021aae2 --- /dev/null +++ b/extensions/queue/sql/validation.go @@ -0,0 +1,20 @@ +package sql + +import "fmt" + +// validateTopicName ensures topic name is safe for use as SQL table name +func validateTopicName(topic string) error { + if topic == "" { + return fmt.Errorf("topic name cannot be empty") + } + if len(topic) > 255 { + return fmt.Errorf("topic name too long (max 255 characters)") + } + // Only allow lowercase letters, numbers, and underscores + for _, c := range topic { + if !((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_') { + return fmt.Errorf("topic name must contain only lowercase letters, numbers, and underscores") + } + } + return nil +} diff --git a/go.mod b/go.mod index be9c74ac..8155de0c 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/testcontainers/testcontainers-go/modules/mysql v0.40.0 github.com/uber-go/tally/v4 v4.1.17 go.uber.org/fx v1.22.0 + go.uber.org/mock v0.6.0 go.uber.org/yarpc v1.81.0 go.uber.org/zap v1.27.1 google.golang.org/grpc v1.68.1 diff --git a/go.sum b/go.sum index 6ad3254d..022e3b78 100644 --- a/go.sum +++ b/go.sum @@ -287,6 +287,8 @@ go.uber.org/fx v1.22.0 h1:pApUK7yL0OUHMd8vkunWSlLxZVFFk70jR2nKde8X2NM= go.uber.org/fx v1.22.0/go.mod h1:HT2M7d7RHo+ebKGh9NRcrsrHHfpZ60nW3QRubMRfv48= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= +go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/net/metrics v1.4.0 h1:c2i1L/L3/XCgODG1vuYuA6K4v4VI18EOdDf1aF1YNg4=