diff --git a/core/consumer/consumer.go b/core/consumer/consumer.go index c8e3da6e..d860ca92 100644 --- a/core/consumer/consumer.go +++ b/core/consumer/consumer.go @@ -35,7 +35,7 @@ type consumer struct { mu sync.Mutex stopped bool controllers []Controller - subscriptions map[Topic]*activeSubscription // topic -> subscription + subscriptions map[TopicKey]*activeSubscription // topicKey -> subscription } // activeSubscription tracks the state of an active subscription. @@ -52,12 +52,12 @@ func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry) C logger: logger, metricsScope: scope.SubScope("consumer"), registry: registry, - subscriptions: make(map[Topic]*activeSubscription), + subscriptions: make(map[TopicKey]*activeSubscription), } } // Register adds a controller to the consumer. Must be called before Start(). -// Returns error if a controller for the same topic is already registered or if the consumer is stopped. +// Returns error if a controller for the same topic key is already registered or if the consumer is stopped. func (m *consumer) Register(controller Controller) error { m.mu.Lock() defer m.mu.Unlock() @@ -66,11 +66,11 @@ func (m *consumer) Register(controller Controller) error { return fmt.Errorf("consumer is stopped") } - // Check for duplicate topic registration. + // Check for duplicate topic key registration. // O(n) scan is fine here — controller count is in the single digits. for _, c := range m.controllers { - if c.Topic() == controller.Topic() { - return fmt.Errorf("controller for topic %s already registered", controller.Topic()) + if c.TopicKey() == controller.TopicKey() { + return fmt.Errorf("controller for topic key %s already registered", controller.TopicKey()) } } @@ -78,7 +78,7 @@ func (m *consumer) Register(controller Controller) error { m.logger.Infow("registered controller", "controller", controller.Name(), - "topic", controller.Topic(), + "topic_key", controller.TopicKey(), "consumer_group", controller.ConsumerGroup(), ) @@ -124,23 +124,29 @@ func (m *consumer) Start(ctx context.Context) error { // subscribe subscribes a controller to its topic and spawns a consumption goroutine. func (m *consumer) subscribe(ctx context.Context, controller Controller) error { - topic := controller.Topic() + topicKey := controller.TopicKey() consumerGroup := controller.ConsumerGroup() // Get subscription config from registry - config, ok := m.registry.SubscriptionConfig(topic, consumerGroup) + config, ok := m.registry.SubscriptionConfig(topicKey, consumerGroup) if !ok { - return fmt.Errorf("no subscription config for topic %s, consumer group %s", topic, consumerGroup) + return fmt.Errorf("no subscription config for topic key %s, consumer group %s", topicKey, consumerGroup) } - // Get queue for this topic - q, ok := m.registry.Queue(topic) + // Get queue for this topic key + q, ok := m.registry.Queue(topicKey) if !ok { - return fmt.Errorf("no queue registered for topic %s", topic) + return fmt.Errorf("no queue registered for topic key %s", topicKey) + } + + // Resolve the actual topic name for subscribing + topicName, ok := m.registry.TopicName(topicKey) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", topicKey) } subscriber := q.Subscriber() - deliveryChan, err := subscriber.Subscribe(ctx, topic.String(), config) + deliveryChan, err := subscriber.Subscribe(ctx, topicName, config) if err != nil { return fmt.Errorf("subscribe failed: %w", err) } @@ -155,14 +161,14 @@ func (m *consumer) subscribe(ctx context.Context, controller Controller) error { cancelFunc: cancel, done: done, } - m.subscriptions[topic] = sub + m.subscriptions[topicKey] = sub // Spawn consumption goroutine go m.consumeLoop(controllerCtx, controller, deliveryChan, done) m.logger.Infow("controller started", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, "consumer_group", consumerGroup, ) @@ -173,16 +179,16 @@ func (m *consumer) subscribe(ctx context.Context, controller Controller) error { func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliveryChan <-chan queue.Delivery, done chan struct{}) { defer close(done) - topic := controller.Topic() + topicKey := controller.TopicKey() controllerScope := m.metricsScope.Tagged(map[string]string{ "controller": controller.Name(), - "topic": topic.String(), + "topic_key": topicKey.String(), }) m.logger.Debugw("consume loop started", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, ) for { @@ -190,7 +196,7 @@ func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliv case <-ctx.Done(): m.logger.Infow("consume loop stopped", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, ) return @@ -198,7 +204,7 @@ func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliv if !ok { m.logger.Infow("delivery channel closed", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, ) return } @@ -214,11 +220,11 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d controllerScope.Counter("messages_received").Inc(1) msg := delivery.Message() - topic := controller.Topic() + topicKey := controller.TopicKey() m.logger.Debugw("processing delivery", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -248,7 +254,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d if IsNonRetryable(err) { m.logger.Errorw("non-retryable controller error, rejecting message", "controller", controller.Name(), - "topic", controller.Topic(), + "topic_key", controller.TopicKey(), "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -262,7 +268,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d if rejectErr := delivery.Reject(ctx, err.Error()); rejectErr != nil { m.logger.Errorw("failed to reject non-retryable message", "controller", controller.Name(), - "topic", controller.Topic(), + "topic_key", controller.TopicKey(), "message_id", msg.ID, "error", rejectErr, ) @@ -274,7 +280,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d // Controller returned retryable error - nack message for retry m.logger.Errorw("controller error, nacking message", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -289,7 +295,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d if nackErr := delivery.Nack(ctx, 0); nackErr != nil { m.logger.Errorw("failed to nack message", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, "message_id", msg.ID, "error", nackErr, ) @@ -310,7 +316,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d if ackErr := delivery.Ack(ctx); ackErr != nil { m.logger.Errorw("failed to ack message", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, "message_id", msg.ID, "error", ackErr, ) @@ -334,7 +340,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d m.logger.Debugw("message processed successfully", "controller", controller.Name(), - "topic", topic, + "topic_key", topicKey, "message_id", msg.ID, "partition_key", msg.PartitionKey, "attempt", delivery.Attempt(), @@ -368,10 +374,10 @@ func (m *consumer) Stop(timeoutMs int64) error { // Returns error on timeout, nil on success. func (m *consumer) unsubscribeAll(timeoutMs int64) error { // Cancel all subscription contexts - for topic, sub := range m.subscriptions { + for topicKey, sub := range m.subscriptions { m.logger.Debugw("stopping controller", "controller", sub.controller.Name(), - "topic", topic, + "topic_key", topicKey, ) sub.cancelFunc() } @@ -379,7 +385,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error { // Wait for each subscription to finish, splitting the timeout budget across them remaining := time.Duration(timeoutMs) * time.Millisecond var timedOut bool - for topic, sub := range m.subscriptions { + for topicKey, sub := range m.subscriptions { start := time.Now() select { case <-sub.done: @@ -387,7 +393,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error { case <-time.After(remaining): m.logger.Errorw("timeout waiting for controller to stop", "controller", sub.controller.Name(), - "topic", topic, + "topic_key", topicKey, ) timedOut = true } @@ -399,7 +405,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error { } // Clear subscriptions - m.subscriptions = make(map[Topic]*activeSubscription) + m.subscriptions = make(map[TopicKey]*activeSubscription) if timedOut { return fmt.Errorf("timeout waiting for controllers to stop") diff --git a/core/consumer/consumer_test.go b/core/consumer/consumer_test.go index 3af92387..9f6db578 100644 --- a/core/consumer/consumer_test.go +++ b/core/consumer/consumer_test.go @@ -20,9 +20,9 @@ import ( ) // setupController configures a MockController with standard expectations. -func setupController(mc *consumermock.MockController, name string, topic consumer.Topic, consumerGroup string, processFunc func(context.Context, consumer.Delivery) error) { +func setupController(mc *consumermock.MockController, name string, topicKey consumer.TopicKey, consumerGroup string, processFunc func(context.Context, consumer.Delivery) error) { mc.EXPECT().Name().Return(name).AnyTimes() - mc.EXPECT().Topic().Return(topic).AnyTimes() + mc.EXPECT().TopicKey().Return(topicKey).AnyTimes() mc.EXPECT().ConsumerGroup().Return(consumerGroup).AnyTimes() if processFunc != nil { mc.EXPECT().Process(gomock.Any(), gomock.Any()).DoAndReturn(processFunc).AnyTimes() @@ -30,13 +30,22 @@ func setupController(mc *consumermock.MockController, name string, topic consume } // newRegistry creates a TopicRegistry with a mock queue and default subscription config. -func newRegistry(q extqueue.Queue, topic consumer.Topic, consumerGroup string) consumer.TopicRegistry { - return consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: topic, Queue: q}}, - []extqueue.SubscriptionConfig{ - extqueue.DefaultSubscriptionConfig(topic.String(), "test-worker", consumerGroup), +func newRegistry(t *testing.T, q extqueue.Queue, topicKey consumer.TopicKey, consumerGroup string) consumer.TopicRegistry { + t.Helper() + reg, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + { + Key: topicKey, + Name: topicKey.String(), + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + "test-worker", consumerGroup, + ), + }, }, ) + require.NoError(t, err) + return reg } // setupDelivery creates a MockDelivery with standard expectations and a done channel @@ -62,7 +71,8 @@ func setupDelivery(del *queuemock.MockDelivery, msg queue.Message, ackErr, nackE func TestNew(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() - reg := consumer.NewTopicRegistry(nil, nil) + reg, err := consumer.NewTopicRegistry(nil) + require.NoError(t, err) c := consumer.New(logger, tally.NoopScope, reg) require.NotNil(t, c) @@ -72,14 +82,14 @@ func TestConsumer_Register(t *testing.T) { ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - reg := consumer.NewTopicRegistry(nil, nil) + reg, _ := consumer.NewTopicRegistry(nil) c := consumer.New(logger, tally.NoopScope, reg) handler1 := consumermock.NewMockController(ctrl) - setupController(handler1, "handler1", consumer.TopicRequest, "group1", nil) + setupController(handler1, "handler1", consumer.TopicKeyRequest, "group1", nil) handler2 := consumermock.NewMockController(ctrl) - setupController(handler2, "handler2", consumer.Topic("other-topic"), "group2", nil) + setupController(handler2, "handler2", consumer.TopicKey("other-topic"), "group2", nil) err := c.Register(handler1) require.NoError(t, err) @@ -92,14 +102,14 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) { ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - reg := consumer.NewTopicRegistry(nil, nil) + reg, _ := consumer.NewTopicRegistry(nil) c := consumer.New(logger, tally.NoopScope, reg) handler1 := consumermock.NewMockController(ctrl) - setupController(handler1, "handler1", consumer.TopicRequest, "group1", nil) + setupController(handler1, "handler1", consumer.TopicKeyRequest, "group1", nil) handler2 := consumermock.NewMockController(ctrl) - setupController(handler2, "handler2", consumer.TopicRequest, "group2", nil) + setupController(handler2, "handler2", consumer.TopicKeyRequest, "group2", nil) err := c.Register(handler1) require.NoError(t, err) @@ -112,14 +122,14 @@ func TestConsumer_Register_AfterStop(t *testing.T) { ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - reg := consumer.NewTopicRegistry(nil, nil) + reg, _ := consumer.NewTopicRegistry(nil) c := consumer.New(logger, tally.NoopScope, reg) err := c.Stop(1000) require.NoError(t, err) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler1", consumer.TopicRequest, "group1", nil) + setupController(handler, "handler1", consumer.TopicKeyRequest, "group1", nil) err = c.Register(handler) assert.Error(t, err) @@ -128,7 +138,7 @@ func TestConsumer_Register_AfterStop(t *testing.T) { func TestConsumer_Start_NoHandlers(t *testing.T) { logger := zaptest.NewLogger(t).Sugar() - reg := consumer.NewTopicRegistry(nil, nil) + reg, _ := consumer.NewTopicRegistry(nil) c := consumer.New(logger, tally.NoopScope, reg) err := c.Start(context.Background()) @@ -139,11 +149,11 @@ func TestConsumer_Start_AfterStop(t *testing.T) { ctrl := gomock.NewController(t) logger := zaptest.NewLogger(t).Sugar() - reg := consumer.NewTopicRegistry(nil, nil) + reg, _ := consumer.NewTopicRegistry(nil) c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler1", consumer.TopicRequest, "group1", nil) + setupController(handler, "handler1", consumer.TopicKeyRequest, "group1", nil) err := c.Register(handler) require.NoError(t, err) @@ -161,17 +171,17 @@ func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) // Registry has queue but no subscription config - reg := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: consumer.TopicRequest, Queue: mockQ}}, - nil, + reg, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ}}, ) + require.NoError(t, err) c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler", consumer.TopicRequest, "group", nil) + setupController(handler, "handler", consumer.TopicKeyRequest, "group", nil) - err := c.Register(handler) + err = c.Register(handler) require.NoError(t, err) err = c.Start(context.Background()) @@ -190,12 +200,12 @@ func TestConsumer_Start_SubscribeFailure(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(mockQ, consumer.TopicRequest, "group") + reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "group") c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "handler", consumer.TopicRequest, "group", nil) + setupController(handler, "handler", consumer.TopicKeyRequest, "group", nil) err := c.Register(handler) require.NoError(t, err) @@ -216,13 +226,13 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") c := consumer.New(logger, tally.NoopScope, reg) handledMsg := "" handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { handledMsg = delivery.Message().ID return nil @@ -262,12 +272,12 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return fmt.Errorf("processing failed") }, @@ -304,12 +314,12 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return consumer.NewNonRetryableError(fmt.Errorf("bad payload")) }, @@ -355,12 +365,12 @@ func TestConsumer_Stop(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") c := consumer.New(logger, tally.NoopScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicRequest, "test-group", nil) + setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", nil) err := c.Register(handler) require.NoError(t, err) @@ -413,12 +423,12 @@ func TestConsumer_ObservabilityTags(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") testC := consumer.New(logger, testScope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return tt.handlerError }, @@ -488,12 +498,12 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") c := consumer.New(logger, scope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return nil }, ) @@ -533,12 +543,12 @@ func TestConsumer_ErrorMetrics(t *testing.T) { mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Subscriber().Return(mockSub) - reg := newRegistry(mockQ, consumer.TopicRequest, "test-group") + reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group") c := consumer.New(logger, scope, reg) handler := consumermock.NewMockController(ctrl) - setupController(handler, "test-handler", consumer.TopicRequest, "test-group", + setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", func(ctx context.Context, delivery consumer.Delivery) error { return fmt.Errorf("processing failed") }, diff --git a/core/consumer/controller.go b/core/consumer/controller.go index afafb9b5..cc398638 100644 --- a/core/consumer/controller.go +++ b/core/consumer/controller.go @@ -85,8 +85,8 @@ type Controller interface { // Name returns the controller name for logging and metrics. Name() string - // Topic returns the topic this controller subscribes to. - Topic() Topic + // TopicKey returns the topic key this controller subscribes to. + TopicKey() TopicKey // ConsumerGroup returns the consumer group for offset tracking. // Multiple controllers can share a consumer group to load-balance across workers. diff --git a/core/consumer/mock/controller_mock.go b/core/consumer/mock/controller_mock.go index b9ed316b..7e627e77 100644 --- a/core/consumer/mock/controller_mock.go +++ b/core/consumer/mock/controller_mock.go @@ -192,16 +192,16 @@ func (mr *MockControllerMockRecorder) Process(ctx, delivery any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockController)(nil).Process), ctx, delivery) } -// Topic mocks base method. -func (m *MockController) Topic() consumer.Topic { +// TopicKey mocks base method. +func (m *MockController) TopicKey() consumer.TopicKey { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Topic") - ret0, _ := ret[0].(consumer.Topic) + ret := m.ctrl.Call(m, "TopicKey") + ret0, _ := ret[0].(consumer.TopicKey) return ret0 } -// Topic indicates an expected call of Topic. -func (mr *MockControllerMockRecorder) Topic() *gomock.Call { +// TopicKey indicates an expected call of TopicKey. +func (mr *MockControllerMockRecorder) TopicKey() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Topic", reflect.TypeOf((*MockController)(nil).Topic)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TopicKey", reflect.TypeOf((*MockController)(nil).TopicKey)) } diff --git a/core/consumer/registry.go b/core/consumer/registry.go index 7cbf65cd..0bbfbaec 100644 --- a/core/consumer/registry.go +++ b/core/consumer/registry.go @@ -1,105 +1,140 @@ package consumer -import "github.com/uber/submitqueue/extension/queue" +import ( + "fmt" -// Topic identifies a queue topic in the pipeline. -type Topic string + "github.com/uber/submitqueue/extension/queue" +) + +// TopicKey identifies a pipeline stage. It is a fixed key used to +// look up queue backends, topic names, and subscription configs +// in the TopicRegistry. The actual queue topic name is provided +// separately via TopicConfig.Name so that library consumers can +// choose their own naming conventions. +type TopicKey string const ( - // TopicRequest is where new requests arrive from the gateway. - TopicRequest Topic = "request" - // TopicToBatch is where validated requests are published for batching. - TopicToBatch Topic = "to-batch" - // TopicBatched is where batched requests are published for speculation. - TopicBatched Topic = "batched" - // TopicBuild is where requests are published for builds. - TopicBuild Topic = "build" - // TopicBuildSignal is where build signals are published for processing. - TopicBuildSignal Topic = "build-signal" - // TopicToMerge is where requests are published for merging. - TopicToMerge Topic = "to-merge" - // TopicMergeSignal is where merge signals are published for processing. - TopicMergeSignal Topic = "merge-signal" - // TopicFinalize is where requests are published for finalization. - TopicFinalize Topic = "finalize" + // TopicKeyRequest is the pipeline stage where new requests arrive from the gateway. + TopicKeyRequest TopicKey = "request" + // TopicKeyToBatch is the pipeline stage where validated requests are published for batching. + TopicKeyToBatch TopicKey = "to-batch" + // TopicKeyBatched is the pipeline stage where batched requests are published for speculation. + TopicKeyBatched TopicKey = "batched" + // TopicKeyBuild is the pipeline stage where requests are published for builds. + TopicKeyBuild TopicKey = "build" + // TopicKeyBuildSignal is the pipeline stage where build signals are published for processing. + TopicKeyBuildSignal TopicKey = "build-signal" + // TopicKeyToMerge is the pipeline stage where requests are published for merging. + TopicKeyToMerge TopicKey = "to-merge" + // TopicKeyMergeSignal is the pipeline stage where merge signals are published for processing. + TopicKeyMergeSignal TopicKey = "merge-signal" + // TopicKeyFinalize is the pipeline stage where requests are published for finalization. + TopicKeyFinalize TopicKey = "finalize" ) -// AllTopics returns all defined pipeline topics. -// Update this list when adding new topics. -var AllTopics = []Topic{ - TopicRequest, - TopicToBatch, - TopicBatched, - TopicBuild, - TopicBuildSignal, - TopicToMerge, - TopicMergeSignal, - TopicFinalize, -} - -// String returns the topic name as a string. -func (t Topic) String() string { +// String returns the topic key as a string. +func (t TopicKey) String() string { return string(t) } -// TopicConfig maps a topic to its queue backend. +// TopicConfig combines all configuration for a single pipeline topic: +// the fixed key, the actual queue topic name, the queue backend, and +// (optionally) subscription settings. type TopicConfig struct { - // Topic is the topic identifier. - Topic Topic + // Key is the fixed pipeline stage identifier. + Key TopicKey + // Name is the actual queue topic name (e.g. "request", "my-custom-request"). + Name string // Queue is the queue backend for this topic. Queue queue.Queue + // Subscription is the subscription configuration for this topic. + // Leave at zero value for publish-only topics. + Subscription queue.SubscriptionConfig } -// TopicRegistry provides queue and subscription config for topics. -// Each topic can have a different queue backend. +// TopicRegistry provides queue, topic name, and subscription config for topics. +// Each topic can have a different queue backend and topic name. type TopicRegistry struct { - queues map[Topic]queue.Queue + queues map[TopicKey]queue.Queue + topicNames map[TopicKey]string subscriptionConfigs map[topicGroup]queue.SubscriptionConfig } -// topicGroup identifies a topic and consumer group pair. +// topicGroup identifies a topic key and consumer group pair. type topicGroup struct { - topic Topic + topicKey TopicKey consumerGroup string } -// NewTopicRegistry creates a new TopicRegistry. -// - topicConfigs: maps each topic to its queue backend -// - subscriptionConfigs: subscription configurations for each topic+consumerGroup -func NewTopicRegistry( - topicConfigs []TopicConfig, - subscriptionConfigs []queue.SubscriptionConfig, -) TopicRegistry { - queues := make(map[Topic]queue.Queue, len(topicConfigs)) - for _, tc := range topicConfigs { - queues[tc.Topic] = tc.Queue - } +// NewTopicRegistry creates a new TopicRegistry from a list of TopicConfigs. +// Returns an error if any topic name is invalid. +func NewTopicRegistry(configs []TopicConfig) (TopicRegistry, error) { + queues := make(map[TopicKey]queue.Queue, len(configs)) + topicNames := make(map[TopicKey]string, len(configs)) + subConfigs := make(map[topicGroup]queue.SubscriptionConfig) + + for _, cfg := range configs { + if err := ValidateTopicName(cfg.Name); err != nil { + return TopicRegistry{}, fmt.Errorf("invalid topic name for key %s: %w", cfg.Key, err) + } + + queues[cfg.Key] = cfg.Queue + topicNames[cfg.Key] = cfg.Name - configs := make(map[topicGroup]queue.SubscriptionConfig, len(subscriptionConfigs)) - for _, cfg := range subscriptionConfigs { - key := topicGroup{ - topic: Topic(cfg.Topic), - consumerGroup: cfg.ConsumerGroup, + // Register subscription config if a consumer group is set. + if cfg.Subscription.ConsumerGroup != "" { + sub := cfg.Subscription + key := topicGroup{ + topicKey: cfg.Key, + consumerGroup: sub.ConsumerGroup, + } + subConfigs[key] = sub } - configs[key] = cfg } return TopicRegistry{ queues: queues, - subscriptionConfigs: configs, + topicNames: topicNames, + subscriptionConfigs: subConfigs, + }, nil +} + +// ValidateTopicName ensures a topic name is valid. +// Topic names must be non-empty, at most 255 characters, and contain only +// lowercase letters, numbers, underscores, and hyphens. +func ValidateTopicName(name string) error { + if name == "" { + return fmt.Errorf("topic name cannot be empty") } + if len(name) > 255 { + return fmt.Errorf("topic name too long (max 255 characters)") + } + for _, c := range name { + if !((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_' || c == '-') { + return fmt.Errorf("topic name must contain only lowercase letters, numbers, underscores, and hyphens") + } + } + return nil } -// Queue returns the queue backend for the given topic. -// Returns ok=false if no queue is registered for this topic. -func (r TopicRegistry) Queue(topic Topic) (queue.Queue, bool) { - q, ok := r.queues[topic] +// Queue returns the queue backend for the given topic key. +// Returns ok=false if no queue is registered for this key. +func (r TopicRegistry) Queue(key TopicKey) (queue.Queue, bool) { + q, ok := r.queues[key] return q, ok } -// SubscriptionConfig returns the subscription configuration for the given topic and consumer group. +// TopicName returns the actual queue topic name for the given key. +// Returns ok=false if no topic is registered for this key. +func (r TopicRegistry) TopicName(key TopicKey) (string, bool) { + name, ok := r.topicNames[key] + return name, ok +} + +// SubscriptionConfig returns the subscription configuration for the given +// topic key and consumer group. // Returns ok=false if no configuration is registered. -func (r TopicRegistry) SubscriptionConfig(topic Topic, consumerGroup string) (queue.SubscriptionConfig, bool) { - cfg, ok := r.subscriptionConfigs[topicGroup{topic: topic, consumerGroup: consumerGroup}] +func (r TopicRegistry) SubscriptionConfig(key TopicKey, consumerGroup string) (queue.SubscriptionConfig, bool) { + cfg, ok := r.subscriptionConfigs[topicGroup{topicKey: key, consumerGroup: consumerGroup}] return cfg, ok } diff --git a/core/consumer/registry_test.go b/core/consumer/registry_test.go index 250e7430..b5ee39f5 100644 --- a/core/consumer/registry_test.go +++ b/core/consumer/registry_test.go @@ -12,41 +12,71 @@ import ( ) func TestNewTopicRegistry(t *testing.T) { + ctrl := gomock.NewController(t) + mockQ := queuemock.NewMockQueue(ctrl) + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + { + Key: consumer.TopicKeyRequest, + Name: "request", + Queue: mockQ, + Subscription: extqueue.DefaultSubscriptionConfig( + "worker-1", "group-a", + ), + }, + }, + ) + require.NoError(t, err) + + q, ok := registry.Queue(consumer.TopicKeyRequest) + require.True(t, ok) + assert.Equal(t, mockQ, q) + + name, ok := registry.TopicName(consumer.TopicKeyRequest) + require.True(t, ok) + assert.Equal(t, "request", name) + + cfg, ok := registry.SubscriptionConfig(consumer.TopicKeyRequest, "group-a") + require.True(t, ok) + assert.Equal(t, "group-a", cfg.ConsumerGroup) +} + +func TestNewTopicRegistry_InvalidTopicName(t *testing.T) { tests := []struct { - name string - configs []extqueue.SubscriptionConfig + name string + topicName string }{ { - name: "with configs", - configs: []extqueue.SubscriptionConfig{ - extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), - extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-b"), - }, + name: "empty name", + topicName: "", + }, + { + name: "uppercase letters", + topicName: "InvalidTopic", + }, + { + name: "dots", + topicName: "my.topic", }, { - name: "nil configs", - configs: nil, + name: "spaces", + topicName: "my topic", + }, + { + name: "special chars", + topicName: "topic!@#", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - mockQ := queuemock.NewMockQueue(ctrl) - - registry := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: consumer.TopicRequest, Queue: mockQ}}, - tt.configs, + _, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyRequest, Name: tt.topicName}, + }, ) - - q, ok := registry.Queue(consumer.TopicRequest) - require.True(t, ok) - assert.Equal(t, mockQ, q) - - if tt.configs == nil { - _, ok := registry.SubscriptionConfig(consumer.TopicRequest, "group-a") - assert.False(t, ok) - } + require.Error(t, err) }) } } @@ -54,49 +84,55 @@ func TestNewTopicRegistry(t *testing.T) { func TestTopicRegistry_SubscriptionConfig(t *testing.T) { tests := []struct { name string - configs []extqueue.SubscriptionConfig - lookupTopic consumer.Topic + configs []consumer.TopicConfig + lookupKey consumer.TopicKey lookupGroup string expectFound bool expectedGroup string }{ { name: "found group-a", - configs: []extqueue.SubscriptionConfig{ - extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), - extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-b"), + configs: []consumer.TopicConfig{ + { + Key: consumer.TopicKeyRequest, + Name: "request", + Subscription: extqueue.DefaultSubscriptionConfig( + "worker-1", "group-a", + ), + }, }, - lookupTopic: consumer.TopicRequest, + lookupKey: consumer.TopicKeyRequest, lookupGroup: "group-a", expectFound: true, expectedGroup: "group-a", }, - { - name: "found group-b", - configs: []extqueue.SubscriptionConfig{ - extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), - extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-b"), - }, - lookupTopic: consumer.TopicRequest, - lookupGroup: "group-b", - expectFound: true, - expectedGroup: "group-b", - }, { name: "not found by group", - configs: []extqueue.SubscriptionConfig{ - extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), + configs: []consumer.TopicConfig{ + { + Key: consumer.TopicKeyRequest, + Name: "request", + Subscription: extqueue.DefaultSubscriptionConfig( + "worker-1", "group-a", + ), + }, }, - lookupTopic: consumer.TopicRequest, + lookupKey: consumer.TopicKeyRequest, lookupGroup: "nonexistent", expectFound: false, }, { - name: "not found by topic", - configs: []extqueue.SubscriptionConfig{ - extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a"), + name: "not found by topic key", + configs: []consumer.TopicConfig{ + { + Key: consumer.TopicKeyRequest, + Name: "request", + Subscription: extqueue.DefaultSubscriptionConfig( + "worker-1", "group-a", + ), + }, }, - lookupTopic: consumer.Topic("other"), + lookupKey: consumer.TopicKey("other"), lookupGroup: "group-a", expectFound: false, }, @@ -104,14 +140,9 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - mockQ := queuemock.NewMockQueue(ctrl) - - registry := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: consumer.TopicRequest, Queue: mockQ}}, - tt.configs, - ) - config, ok := registry.SubscriptionConfig(tt.lookupTopic, tt.lookupGroup) + registry, err := consumer.NewTopicRegistry(tt.configs) + require.NoError(t, err) + config, ok := registry.SubscriptionConfig(tt.lookupKey, tt.lookupGroup) if !tt.expectFound { assert.False(t, ok) @@ -123,47 +154,101 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) { } } -func TestTopicRegistry_DuplicateConfig_LastWins(t *testing.T) { - ctrl := gomock.NewController(t) - mockQ := queuemock.NewMockQueue(ctrl) - - config1 := extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a") - config1.BatchSize = 10 - - config2 := extqueue.DefaultSubscriptionConfig("request", "worker-1", "group-a") - config2.BatchSize = 50 - - registry := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: consumer.TopicRequest, Queue: mockQ}}, - []extqueue.SubscriptionConfig{config1, config2}, - ) - - config, ok := registry.SubscriptionConfig(consumer.TopicRequest, "group-a") - require.True(t, ok) - assert.Equal(t, 50, config.BatchSize) -} - func TestTopicRegistry_Queue_PerTopic(t *testing.T) { ctrl := gomock.NewController(t) mockQ1 := queuemock.NewMockQueue(ctrl) mockQ2 := queuemock.NewMockQueue(ctrl) - registry := consumer.NewTopicRegistry( + registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Topic: consumer.TopicRequest, Queue: mockQ1}, - {Topic: consumer.TopicToBatch, Queue: mockQ2}, + {Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ1}, + {Key: consumer.TopicKeyToBatch, Name: "to-batch", Queue: mockQ2}, }, - nil, ) + require.NoError(t, err) - q1, ok := registry.Queue(consumer.TopicRequest) + q1, ok := registry.Queue(consumer.TopicKeyRequest) require.True(t, ok) assert.Equal(t, mockQ1, q1) - q2, ok := registry.Queue(consumer.TopicToBatch) + q2, ok := registry.Queue(consumer.TopicKeyToBatch) require.True(t, ok) assert.Equal(t, mockQ2, q2) - _, ok = registry.Queue(consumer.Topic("nonexistent")) + _, ok = registry.Queue(consumer.TopicKey("nonexistent")) assert.False(t, ok) } + +func TestTopicKey_String(t *testing.T) { + tests := []struct { + name string + key consumer.TopicKey + expected string + }{ + { + name: "predefined topic key", + key: consumer.TopicKeyRequest, + expected: "request", + }, + { + name: "custom topic key", + key: consumer.TopicKey("custom"), + expected: "custom", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.key.String()) + }) + } +} + +func TestTopicRegistry_TopicName(t *testing.T) { + ctrl := gomock.NewController(t) + mockQ := queuemock.NewMockQueue(ctrl) + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyRequest, Name: "my-custom-request", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + name, ok := registry.TopicName(consumer.TopicKeyRequest) + require.True(t, ok) + assert.Equal(t, "my-custom-request", name) + + _, ok = registry.TopicName(consumer.TopicKey("nonexistent")) + assert.False(t, ok) +} + +func TestValidateTopicName(t *testing.T) { + tests := []struct { + name string + topic string + wantErr bool + }{ + {name: "valid lowercase", topic: "mytopic", wantErr: false}, + {name: "valid with numbers", topic: "topic123", wantErr: false}, + {name: "valid with underscores", topic: "my_topic_name", wantErr: false}, + {name: "valid with hyphens", topic: "my-topic", wantErr: false}, + {name: "valid mixed", topic: "abc_123-xyz", wantErr: false}, + {name: "invalid empty", topic: "", wantErr: true}, + {name: "invalid uppercase", topic: "MyTopic", wantErr: true}, + {name: "invalid dot", topic: "my.topic", wantErr: true}, + {name: "invalid space", topic: "my topic", wantErr: true}, + {name: "invalid special chars", topic: "topic!@#", wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := consumer.ValidateTopicName(tt.topic) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/example/server/gateway/BUILD.bazel b/example/server/gateway/BUILD.bazel index 801eae7f..73027d30 100644 --- a/example/server/gateway/BUILD.bazel +++ b/example/server/gateway/BUILD.bazel @@ -11,7 +11,6 @@ go_library( importpath = "github.com/uber/submitqueue/example/server/gateway", visibility = ["//visibility:private"], deps = [ - "//core/consumer", "//extension/counter/mysql", "//extension/queue/mysql", "//extension/storage/mysql", diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index fe8dc021..9a7c6dbf 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -13,7 +13,6 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" - "github.com/uber/submitqueue/core/consumer" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" "github.com/uber/submitqueue/extension/storage/mysql" @@ -142,7 +141,7 @@ func run() error { // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, mysqlQueue.Publisher(), consumer.TopicRequest.String()) + landController := controller.NewLandController(logger.Sugar(), scope, store, cnt, mysqlQueue.Publisher(), "request") gatewayServer := &GatewayServer{ pingController: pingController, landController: landController, diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index 15f2950e..909e42fd 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -142,7 +142,10 @@ func run() error { subscriberName = fmt.Sprintf("orchestrator-%d", time.Now().Unix()) } - registry := newTopicRegistry(mysqlQueue, subscriberName) + registry, err := newTopicRegistry(mysqlQueue, subscriberName) + if err != nil { + return fmt.Errorf("failed to create topic registry: %w", err) + } // Create consumer c := consumer.New(logger.Sugar(), scope.SubScope("consumer"), registry) @@ -213,61 +216,73 @@ func run() error { } // newTopicRegistry builds the TopicRegistry with all topic and subscription configs. -func newTopicRegistry(q extqueue.Queue, subscriberName string) consumer.TopicRegistry { - return consumer.NewTopicRegistry( - []consumer.TopicConfig{ - {Topic: consumer.TopicRequest, Queue: q}, - {Topic: consumer.TopicToBatch, Queue: q}, - {Topic: consumer.TopicBatched, Queue: q}, - {Topic: consumer.TopicBuild, Queue: q}, - {Topic: consumer.TopicBuildSignal, Queue: q}, - {Topic: consumer.TopicToMerge, Queue: q}, - {Topic: consumer.TopicMergeSignal, Queue: q}, - {Topic: consumer.TopicFinalize, Queue: q}, - }, - []extqueue.SubscriptionConfig{ - extqueue.DefaultSubscriptionConfig( - consumer.TopicRequest.String(), - subscriberName, - "orchestrator-request", +func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) { + return consumer.NewTopicRegistry([]consumer.TopicConfig{ + { + Key: consumer.TopicKeyRequest, + Name: "request", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-request", ), - extqueue.DefaultSubscriptionConfig( - consumer.TopicToBatch.String(), - subscriberName, - "orchestrator-batch", + }, + { + Key: consumer.TopicKeyToBatch, + Name: "to-batch", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-batch", ), - extqueue.DefaultSubscriptionConfig( - consumer.TopicBatched.String(), - subscriberName, - "orchestrator-speculate", + }, + { + Key: consumer.TopicKeyBatched, + Name: "batched", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-speculate", ), - extqueue.DefaultSubscriptionConfig( - consumer.TopicBuild.String(), - subscriberName, - "orchestrator-build", + }, + { + Key: consumer.TopicKeyBuild, + Name: "build", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-build", ), - extqueue.DefaultSubscriptionConfig( - consumer.TopicBuildSignal.String(), - subscriberName, - "orchestrator-buildsignal", + }, + { + Key: consumer.TopicKeyBuildSignal, + Name: "build-signal", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-buildsignal", ), - extqueue.DefaultSubscriptionConfig( - consumer.TopicToMerge.String(), - subscriberName, - "orchestrator-merge", + }, + { + Key: consumer.TopicKeyToMerge, + Name: "to-merge", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-merge", ), - extqueue.DefaultSubscriptionConfig( - consumer.TopicMergeSignal.String(), - subscriberName, - "orchestrator-mergesignal", + }, + { + Key: consumer.TopicKeyMergeSignal, + Name: "merge-signal", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-mergesignal", ), - extqueue.DefaultSubscriptionConfig( - consumer.TopicFinalize.String(), - subscriberName, - "orchestrator-finalize", + }, + { + Key: consumer.TopicKeyFinalize, + Name: "finalize", + Queue: q, + Subscription: extqueue.DefaultSubscriptionConfig( + subscriberName, "orchestrator-finalize", ), }, - ) + }) } // registerControllers creates all pipeline controllers and registers them with the consumer. @@ -282,7 +297,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scope, registry, mc, - consumer.TopicRequest, + consumer.TopicKeyRequest, "orchestrator-request", ) if err := c.Register(requestController); err != nil { @@ -294,7 +309,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t scope, registry, cnt, - consumer.TopicToBatch, + consumer.TopicKeyToBatch, "orchestrator-batch", ) if err := c.Register(batchController); err != nil { @@ -305,7 +320,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, - consumer.TopicBatched, + consumer.TopicKeyBatched, "orchestrator-speculate", ) if err := c.Register(speculateController); err != nil { @@ -316,7 +331,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, - consumer.TopicBuild, + consumer.TopicKeyBuild, "orchestrator-build", ) if err := c.Register(buildController); err != nil { @@ -327,7 +342,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, - consumer.TopicBuildSignal, + consumer.TopicKeyBuildSignal, "orchestrator-buildsignal", ) if err := c.Register(buildSignalController); err != nil { @@ -338,7 +353,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, - consumer.TopicToMerge, + consumer.TopicKeyToMerge, "orchestrator-merge", ) if err := c.Register(mergeController); err != nil { @@ -349,7 +364,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, - consumer.TopicMergeSignal, + consumer.TopicKeyMergeSignal, "orchestrator-mergesignal", ) if err := c.Register(mergeSignalController); err != nil { @@ -360,7 +375,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t logger, scope, registry, - consumer.TopicFinalize, + consumer.TopicKeyFinalize, "orchestrator-finalize", ) if err := c.Register(finalizeController); err != nil { diff --git a/extension/queue/mysql/BUILD.bazel b/extension/queue/mysql/BUILD.bazel index e96e51cd..9fd34cf9 100644 --- a/extension/queue/mysql/BUILD.bazel +++ b/extension/queue/mysql/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "sql.go", "stores.go", "subscriber.go", - "validation.go", ], importpath = "github.com/uber/submitqueue/extension/queue/mysql", visibility = ["//visibility:public"], @@ -38,7 +37,6 @@ go_test( ], embed = [":mysql"], deps = [ - "//core/consumer", "//entity/queue", "//extension/queue", "@com_github_data_dog_go_sqlmock//:go-sqlmock", diff --git a/extension/queue/mysql/publisher.go b/extension/queue/mysql/publisher.go index ce3a24b0..973e7498 100644 --- a/extension/queue/mysql/publisher.go +++ b/extension/queue/mysql/publisher.go @@ -40,13 +40,6 @@ func (p *publisher) Publish(ctx context.Context, topic string, message queue.Mes return fmt.Errorf("publisher is closed") } - // Validate topic name (SQL-safe) - if err := validateTopicName(topic); err != nil { - p.logger.Errorw("publish failure: invalid topic name", "topic", topic, "error", err) - p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1) - return fmt.Errorf("publish invalid topic name: %w", err) - } - if err := p.messageStore.Insert(ctx, topic, []queue.Message{message}); err != nil { p.metrics.Tagged(map[string]string{"topic": topic}).Counter("publish_errors").Inc(1) p.logger.Errorw("publish failure: message store insert error", "topic", topic, "error", err) diff --git a/extension/queue/mysql/publisher_test.go b/extension/queue/mysql/publisher_test.go index 477be28b..deb9e799 100644 --- a/extension/queue/mysql/publisher_test.go +++ b/extension/queue/mysql/publisher_test.go @@ -10,7 +10,6 @@ import ( "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" - "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity/queue" extqueue "github.com/uber/submitqueue/extension/queue" // mocks in same package @@ -86,17 +85,6 @@ func TestPublisher_Publish(t *testing.T) { m.EXPECT().Insert(gomock.Any(), "metadata_topic", gomock.Any()).Return(nil).Times(1) }, }, - { - name: "publish with invalid topic name - uppercase", - topic: "InvalidTopic", - messages: []queue.Message{ - {ID: "msg1", Payload: []byte("p"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, - }, - wantErr: true, - setupMock: func(m *MockmessageStore) { - // No Insert expected since validation fails - }, - }, { name: "publish with valid topic name - hyphens", topic: "topic-with-dash", @@ -108,17 +96,6 @@ func TestPublisher_Publish(t *testing.T) { m.EXPECT().Insert(gomock.Any(), "topic-with-dash", gomock.Any()).Return(nil).Times(1) }, }, - { - name: "publish with invalid topic name - empty", - topic: "", - messages: []queue.Message{ - {ID: "msg1", Payload: []byte("p"), PartitionKey: "part1", PublishedAt: fixedTimestamp}, - }, - wantErr: true, - setupMock: func(m *MockmessageStore) { - // No Insert expected since validation fails - }, - }, } for _, tt := range tests { @@ -183,104 +160,6 @@ func TestPublisher_Close(t *testing.T) { require.NoError(t, err) } -func TestValidateTopicName(t *testing.T) { - tests := []struct { - name string - topicName string - wantErr bool - }{ - { - name: "valid topic - lowercase letters", - topicName: "mytopic", - wantErr: false, - }, - { - name: "valid topic - with numbers", - topicName: "topic123", - wantErr: false, - }, - { - name: "valid topic - with underscores", - topicName: "my_topic_name", - wantErr: false, - }, - { - name: "valid topic - all valid chars", - topicName: "abc_123_xyz", - wantErr: false, - }, - { - name: "invalid topic - empty", - topicName: "", - wantErr: true, - }, - { - name: "invalid topic - uppercase", - topicName: "MyTopic", - wantErr: true, - }, - { - name: "valid topic - with hyphens", - topicName: "my-topic", - wantErr: false, - }, - { - name: "invalid topic - dot", - topicName: "my.topic", - wantErr: true, - }, - { - name: "invalid topic - space", - topicName: "my topic", - wantErr: true, - }, - { - name: "invalid topic - special chars", - topicName: "topic!@#", - wantErr: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockStore := NewMockmessageStore(ctrl) - pub := setupPublisherTest(t, mockStore) - - // Try to publish with this topic name - ctx := context.Background() - msg := queue.NewMessage("msg1", []byte("test"), "part1", nil) - - if !tt.wantErr { - mockStore.EXPECT().Insert(gomock.Any(), tt.topicName, gomock.Any()).Return(nil).Times(1) - } - - err := pub.Publish(ctx, tt.topicName, msg) - if tt.wantErr { - require.Error(t, err) - } else { - require.NoError(t, err) - } - }) - } -} - -// TestAllConsumerTopicsPassValidation ensures every topic defined in consumer.AllTopics -// passes MySQL topic name validation. This test will catch mismatches automatically -// when new topics are added. -func TestAllConsumerTopicsPassValidation(t *testing.T) { - require.NotEmpty(t, consumer.AllTopics, "AllTopics must not be empty") - - for _, topic := range consumer.AllTopics { - t.Run(string(topic), func(t *testing.T) { - err := validateTopicName(string(topic)) - require.NoError(t, err, "consumer topic %q must pass MySQL topic name validation", topic) - }) - } -} - func TestPublisher_PublishMetrics(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/extension/queue/mysql/subscriber.go b/extension/queue/mysql/subscriber.go index 576a22b5..67bbfe2a 100644 --- a/extension/queue/mysql/subscriber.go +++ b/extension/queue/mysql/subscriber.go @@ -273,12 +273,6 @@ func (s *subscriber) Subscribe(ctx context.Context, topic string, config extqueu return nil, fmt.Errorf("subscriber is closed") } - // Validate topic name - if err := validateTopicName(topic); err != nil { - s.logger.Errorw("subscribe failed: invalid topic name", "topic", topic, "error", err) - return nil, fmt.Errorf("subscribe failure: invalid topic name. err: %w", err) - } - // Create subscription key (topic + consumer group must be unique) subKey := topic + ":" + config.ConsumerGroup diff --git a/extension/queue/mysql/subscriber_test.go b/extension/queue/mysql/subscriber_test.go index 440f39ce..a7fa2ba4 100644 --- a/extension/queue/mysql/subscriber_test.go +++ b/extension/queue/mysql/subscriber_test.go @@ -16,7 +16,7 @@ import ( ) func testSubscriptionConfig() extqueue.SubscriptionConfig { - return extqueue.DefaultSubscriptionConfig("test-topic", "test-subscriber", "test-consumer") + return extqueue.DefaultSubscriptionConfig("test-subscriber", "test-consumer") } func setupSubscriberTest(t *testing.T, mockMessageStore *MockmessageStore, mockOffsetStore *MockoffsetStore, mockLeaseStore *MockpartitionLeaseStore) extqueue.Subscriber { diff --git a/extension/queue/mysql/validation.go b/extension/queue/mysql/validation.go deleted file mode 100644 index 3c2f71a9..00000000 --- a/extension/queue/mysql/validation.go +++ /dev/null @@ -1,20 +0,0 @@ -package mysql - -import "fmt" - -// validateTopicName ensures topic name is safe for use as a SQL column value -func validateTopicName(topic string) error { - if topic == "" { - return fmt.Errorf("topic name cannot be empty") - } - if len(topic) > 255 { - return fmt.Errorf("topic name too long (max 255 characters)") - } - // Only allow lowercase letters, numbers, underscores, and hyphens - for _, c := range topic { - if !((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_' || c == '-') { - return fmt.Errorf("topic name must contain only lowercase letters, numbers, underscores, and hyphens") - } - } - return nil -} diff --git a/extension/queue/subscription_config.go b/extension/queue/subscription_config.go index 9185aec0..89651e1e 100644 --- a/extension/queue/subscription_config.go +++ b/extension/queue/subscription_config.go @@ -4,9 +4,6 @@ package queue // Each subscription (topic) can have its own settings for polling, // batching, retries, and dead letter queue behavior. type SubscriptionConfig struct { - // Topic is the queue topic name to subscribe to. - Topic string - // SubscriberName uniquely identifies this subscriber instance for partition leases. // Different workers should use different names (e.g., hostname, pod name, UUID). // Combined with ConsumerGroup, this determines which worker owns a partition lease. @@ -68,9 +65,8 @@ type DLQConfig struct { } // DefaultSubscriptionConfig returns a SubscriptionConfig with sensible defaults. -func DefaultSubscriptionConfig(topic, subscriberName, consumerGroup string) SubscriptionConfig { +func DefaultSubscriptionConfig(subscriberName, consumerGroup string) SubscriptionConfig { return SubscriptionConfig{ - Topic: topic, SubscriberName: subscriberName, ConsumerGroup: consumerGroup, PollIntervalMs: 100, // 100ms diff --git a/extension/queue/subscription_config_test.go b/extension/queue/subscription_config_test.go index 259ceca0..b539ab69 100644 --- a/extension/queue/subscription_config_test.go +++ b/extension/queue/subscription_config_test.go @@ -9,8 +9,8 @@ import ( func TestSubscriptionConfig_FieldsAreIndependent(t *testing.T) { // Create two configs and modify one to ensure they're independent - config1 := DefaultSubscriptionConfig("topic-1", "worker-1", "consumer-1") - config2 := DefaultSubscriptionConfig("topic-2", "worker-2", "consumer-2") + config1 := DefaultSubscriptionConfig("worker-1", "consumer-1") + config2 := DefaultSubscriptionConfig("worker-2", "consumer-2") // Modify config1 config1.PollIntervalMs = 500 @@ -28,7 +28,7 @@ func TestSubscriptionConfig_FieldsAreIndependent(t *testing.T) { } func TestSubscriptionConfig_CustomValues(t *testing.T) { - config := DefaultSubscriptionConfig("my-topic", "my-worker", "my-consumer") + config := DefaultSubscriptionConfig("my-worker", "my-consumer") // Override with custom values (in milliseconds) config.PollIntervalMs = 200 @@ -62,19 +62,17 @@ func TestSubscriptionConfig_CustomValues(t *testing.T) { func TestSubscriptionConfig_DifferentConsumerGroups(t *testing.T) { // Test that different consumer groups get independent configs tests := []struct { - topic string subscriberName string consumerGroup string }{ - {"topic-A", "worker-1", "group-A"}, - {"topic-B", "worker-1", "group-B"}, - {"topic-A", "worker-2", "group-A"}, + {"worker-1", "group-A"}, + {"worker-1", "group-B"}, + {"worker-2", "group-A"}, } for _, tt := range tests { - t.Run(tt.topic+"_"+tt.subscriberName+"_"+tt.consumerGroup, func(t *testing.T) { - config := DefaultSubscriptionConfig(tt.topic, tt.subscriberName, tt.consumerGroup) - require.Equal(t, tt.topic, config.Topic) + t.Run(tt.subscriberName+"_"+tt.consumerGroup, func(t *testing.T) { + config := DefaultSubscriptionConfig(tt.subscriberName, tt.consumerGroup) require.Equal(t, tt.subscriberName, config.SubscriberName) require.Equal(t, tt.consumerGroup, config.ConsumerGroup) }) diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 8705fa48..d880d6b5 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -20,7 +20,7 @@ type Controller struct { metricsScope tally.Scope registry consumer.TopicRegistry counter counter.Counter - topic consumer.Topic + topicKey consumer.TopicKey consumerGroup string } @@ -33,7 +33,7 @@ func NewController( scope tally.Scope, registry consumer.TopicRegistry, counter counter.Counter, - topic consumer.Topic, + topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ @@ -41,7 +41,7 @@ func NewController( metricsScope: scope.SubScope("batch_controller"), registry: registry, counter: counter, - topic: topic, + topicKey: topicKey, consumerGroup: consumerGroup, } } @@ -110,10 +110,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Add to batch dependent DB // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicBatched, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBatched, request); err != nil { c.logger.Errorw("failed to publish output", "request_id", request.ID, - "topic", consumer.TopicBatched, + "topic_key", consumer.TopicKeyBatched, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) @@ -122,7 +122,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er c.logger.Infow("published request to next stage", "request_id", request.ID, - "topic", consumer.TopicBatched, + "topic_key", consumer.TopicKeyBatched, ) c.metricsScope.Counter("processed").Inc(1) @@ -130,8 +130,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic. -func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { +// publish publishes a request to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { payload, err := request.ToBytes() if err != nil { return fmt.Errorf("failed to serialize request: %w", err) @@ -139,12 +139,17 @@ func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) - q, ok := c.registry.Queue(topic) + q, ok := c.registry.Queue(key) if !ok { - return fmt.Errorf("no queue registered for topic %s", topic) + return fmt.Errorf("no queue registered for topic key %s", key) } - if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { return fmt.Errorf("failed to publish message: %w", err) } @@ -156,9 +161,9 @@ func (c *Controller) Name() string { return "batch" } -// Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() consumer.Topic { - return c.topic +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey } // ConsumerGroup returns the consumer group for offset tracking. diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index 17d977e3..e55abde4 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -51,12 +51,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *mockCounter, mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: consumer.TopicBatched, Queue: mockQ}}, - nil, + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBatched, Name: "batched", Queue: mockQ}}, ) + require.NoError(t, err) - return NewController(logger, scope, registry, cnt, consumer.TopicToBatch, "orchestrator-batch") + return NewController(logger, scope, registry, cnt, consumer.TopicKeyToBatch, "orchestrator-batch") } func TestNewController(t *testing.T) { @@ -64,7 +64,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, newSequentialCounter(), nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicToBatch, controller.Topic()) + assert.Equal(t, consumer.TopicKeyToBatch, controller.TopicKey()) assert.Equal(t, "orchestrator-batch", controller.ConsumerGroup()) assert.Equal(t, "batch", controller.Name()) } diff --git a/orchestrator/controller/build/build.go b/orchestrator/controller/build/build.go index eb923e3a..8cd68033 100644 --- a/orchestrator/controller/build/build.go +++ b/orchestrator/controller/build/build.go @@ -18,7 +18,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry - topic consumer.Topic + topicKey consumer.TopicKey consumerGroup string } @@ -30,14 +30,14 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, - topic consumer.Topic, + topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ logger: logger.Named("build_controller"), metricsScope: scope.SubScope("build_controller"), registry: registry, - topic: topic, + topicKey: topicKey, consumerGroup: consumerGroup, } } @@ -78,10 +78,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Track build status // Publish to build signal topic - if err := c.publish(ctx, consumer.TopicBuildSignal, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBuildSignal, request); err != nil { c.logger.Errorw("failed to publish output", "request_id", request.ID, - "topic", consumer.TopicBuildSignal, + "topic_key", consumer.TopicKeyBuildSignal, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) @@ -90,7 +90,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er c.logger.Infow("published request to next stage", "request_id", request.ID, - "topic", consumer.TopicBuildSignal, + "topic_key", consumer.TopicKeyBuildSignal, ) c.metricsScope.Counter("processed").Inc(1) @@ -98,8 +98,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic. -func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { +// publish publishes a request to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { payload, err := request.ToBytes() if err != nil { return fmt.Errorf("failed to serialize request: %w", err) @@ -107,12 +107,17 @@ func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) - q, ok := c.registry.Queue(topic) + q, ok := c.registry.Queue(key) if !ok { - return fmt.Errorf("no queue registered for topic %s", topic) + return fmt.Errorf("no queue registered for topic key %s", key) } - if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { return fmt.Errorf("failed to publish message: %w", err) } @@ -124,9 +129,9 @@ func (c *Controller) Name() string { return "build" } -// Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() consumer.Topic { - return c.topic +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey } // ConsumerGroup returns the consumer group for offset tracking. diff --git a/orchestrator/controller/build/build_test.go b/orchestrator/controller/build/build_test.go index 97fbd67e..ef19061a 100644 --- a/orchestrator/controller/build/build_test.go +++ b/orchestrator/controller/build/build_test.go @@ -31,12 +31,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: consumer.TopicBuildSignal, Queue: mockQ}}, - nil, + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyBuildSignal, Name: "build-signal", Queue: mockQ}}, ) + require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicBuild, "orchestrator-build") + return NewController(logger, scope, registry, consumer.TopicKeyBuild, "orchestrator-build") } func TestNewController(t *testing.T) { @@ -44,7 +44,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicBuild, controller.Topic()) + assert.Equal(t, consumer.TopicKeyBuild, controller.TopicKey()) assert.Equal(t, "orchestrator-build", controller.ConsumerGroup()) assert.Equal(t, "build", controller.Name()) } diff --git a/orchestrator/controller/buildsignal/buildsignal.go b/orchestrator/controller/buildsignal/buildsignal.go index 326104c3..030e4579 100644 --- a/orchestrator/controller/buildsignal/buildsignal.go +++ b/orchestrator/controller/buildsignal/buildsignal.go @@ -17,7 +17,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry - topic consumer.Topic + topicKey consumer.TopicKey consumerGroup string } @@ -29,14 +29,14 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, - topic consumer.Topic, + topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ logger: logger.Named("buildsignal_controller"), metricsScope: scope.SubScope("buildsignal_controller"), registry: registry, - topic: topic, + topicKey: topicKey, consumerGroup: consumerGroup, } } @@ -86,9 +86,9 @@ func (c *Controller) Name() string { return "buildsignal" } -// Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() consumer.Topic { - return c.topic +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey } // ConsumerGroup returns the consumer group for offset tracking. diff --git a/orchestrator/controller/buildsignal/buildsignal_test.go b/orchestrator/controller/buildsignal/buildsignal_test.go index b8f4cc6b..e2368b67 100644 --- a/orchestrator/controller/buildsignal/buildsignal_test.go +++ b/orchestrator/controller/buildsignal/buildsignal_test.go @@ -20,16 +20,17 @@ func newTestController(t *testing.T) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - registry := consumer.NewTopicRegistry(nil, nil) + registry, err := consumer.NewTopicRegistry(nil) + require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicBuildSignal, "orchestrator-buildsignal") + return NewController(logger, scope, registry, consumer.TopicKeyBuildSignal, "orchestrator-buildsignal") } func TestNewController(t *testing.T) { controller := newTestController(t) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicBuildSignal, controller.Topic()) + assert.Equal(t, consumer.TopicKeyBuildSignal, controller.TopicKey()) assert.Equal(t, "orchestrator-buildsignal", controller.ConsumerGroup()) assert.Equal(t, "buildsignal", controller.Name()) } diff --git a/orchestrator/controller/finalize/finalize.go b/orchestrator/controller/finalize/finalize.go index aa1c6da1..17ce77a6 100644 --- a/orchestrator/controller/finalize/finalize.go +++ b/orchestrator/controller/finalize/finalize.go @@ -17,7 +17,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry - topic consumer.Topic + topicKey consumer.TopicKey consumerGroup string } @@ -29,14 +29,14 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, - topic consumer.Topic, + topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ logger: logger.Named("finalize_controller"), metricsScope: scope.SubScope("finalize_controller"), registry: registry, - topic: topic, + topicKey: topicKey, consumerGroup: consumerGroup, } } @@ -87,9 +87,9 @@ func (c *Controller) Name() string { return "finalize" } -// Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() consumer.Topic { - return c.topic +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey } // ConsumerGroup returns the consumer group for offset tracking. diff --git a/orchestrator/controller/finalize/finalize_test.go b/orchestrator/controller/finalize/finalize_test.go index 1f2fe7a7..2bc6b81a 100644 --- a/orchestrator/controller/finalize/finalize_test.go +++ b/orchestrator/controller/finalize/finalize_test.go @@ -20,16 +20,17 @@ func newTestController(t *testing.T) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - registry := consumer.NewTopicRegistry(nil, nil) + registry, err := consumer.NewTopicRegistry(nil) + require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicFinalize, "orchestrator-finalize") + return NewController(logger, scope, registry, consumer.TopicKeyFinalize, "orchestrator-finalize") } func TestNewController(t *testing.T) { controller := newTestController(t) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicFinalize, controller.Topic()) + assert.Equal(t, consumer.TopicKeyFinalize, controller.TopicKey()) assert.Equal(t, "orchestrator-finalize", controller.ConsumerGroup()) assert.Equal(t, "finalize", controller.Name()) } diff --git a/orchestrator/controller/merge/merge.go b/orchestrator/controller/merge/merge.go index 968c89dd..0f9b5391 100644 --- a/orchestrator/controller/merge/merge.go +++ b/orchestrator/controller/merge/merge.go @@ -18,7 +18,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry - topic consumer.Topic + topicKey consumer.TopicKey consumerGroup string } @@ -30,14 +30,14 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, - topic consumer.Topic, + topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ logger: logger.Named("merge_controller"), metricsScope: scope.SubScope("merge_controller"), registry: registry, - topic: topic, + topicKey: topicKey, consumerGroup: consumerGroup, } } @@ -78,10 +78,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Handle merge conflicts // Publish to merge signal topic - if err := c.publish(ctx, consumer.TopicMergeSignal, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyMergeSignal, request); err != nil { c.logger.Errorw("failed to publish output", "request_id", request.ID, - "topic", consumer.TopicMergeSignal, + "topic_key", consumer.TopicKeyMergeSignal, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) @@ -90,7 +90,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er c.logger.Infow("published request to next stage", "request_id", request.ID, - "topic", consumer.TopicMergeSignal, + "topic_key", consumer.TopicKeyMergeSignal, ) c.metricsScope.Counter("processed").Inc(1) @@ -98,8 +98,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic. -func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { +// publish publishes a request to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { payload, err := request.ToBytes() if err != nil { return fmt.Errorf("failed to serialize request: %w", err) @@ -107,12 +107,17 @@ func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) - q, ok := c.registry.Queue(topic) + q, ok := c.registry.Queue(key) if !ok { - return fmt.Errorf("no queue registered for topic %s", topic) + return fmt.Errorf("no queue registered for topic key %s", key) } - if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { return fmt.Errorf("failed to publish message: %w", err) } @@ -124,9 +129,9 @@ func (c *Controller) Name() string { return "merge" } -// Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() consumer.Topic { - return c.topic +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey } // ConsumerGroup returns the consumer group for offset tracking. diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index 1ff56e54..625d9d54 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -31,12 +31,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: consumer.TopicMergeSignal, Queue: mockQ}}, - nil, + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyMergeSignal, Name: "merge-signal", Queue: mockQ}}, ) + require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicToMerge, "orchestrator-merge") + return NewController(logger, scope, registry, consumer.TopicKeyToMerge, "orchestrator-merge") } func TestNewController(t *testing.T) { @@ -44,7 +44,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicToMerge, controller.Topic()) + assert.Equal(t, consumer.TopicKeyToMerge, controller.TopicKey()) assert.Equal(t, "orchestrator-merge", controller.ConsumerGroup()) assert.Equal(t, "merge", controller.Name()) } diff --git a/orchestrator/controller/mergesignal/mergesignal.go b/orchestrator/controller/mergesignal/mergesignal.go index 034cea37..f677de5f 100644 --- a/orchestrator/controller/mergesignal/mergesignal.go +++ b/orchestrator/controller/mergesignal/mergesignal.go @@ -17,7 +17,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry - topic consumer.Topic + topicKey consumer.TopicKey consumerGroup string } @@ -29,14 +29,14 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, - topic consumer.Topic, + topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ logger: logger.Named("mergesignal_controller"), metricsScope: scope.SubScope("mergesignal_controller"), registry: registry, - topic: topic, + topicKey: topicKey, consumerGroup: consumerGroup, } } @@ -86,9 +86,9 @@ func (c *Controller) Name() string { return "mergesignal" } -// Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() consumer.Topic { - return c.topic +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey } // ConsumerGroup returns the consumer group for offset tracking. diff --git a/orchestrator/controller/mergesignal/mergesignal_test.go b/orchestrator/controller/mergesignal/mergesignal_test.go index a7f2f06e..43acdef7 100644 --- a/orchestrator/controller/mergesignal/mergesignal_test.go +++ b/orchestrator/controller/mergesignal/mergesignal_test.go @@ -20,16 +20,17 @@ func newTestController(t *testing.T) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope - registry := consumer.NewTopicRegistry(nil, nil) + registry, err := consumer.NewTopicRegistry(nil) + require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicMergeSignal, "orchestrator-mergesignal") + return NewController(logger, scope, registry, consumer.TopicKeyMergeSignal, "orchestrator-mergesignal") } func TestNewController(t *testing.T) { controller := newTestController(t) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicMergeSignal, controller.Topic()) + assert.Equal(t, consumer.TopicKeyMergeSignal, controller.TopicKey()) assert.Equal(t, "orchestrator-mergesignal", controller.ConsumerGroup()) assert.Equal(t, "mergesignal", controller.Name()) } diff --git a/orchestrator/controller/request/request.go b/orchestrator/controller/request/request.go index 66d8ee91..e4940ffd 100644 --- a/orchestrator/controller/request/request.go +++ b/orchestrator/controller/request/request.go @@ -20,7 +20,7 @@ type Controller struct { metricsScope tally.Scope registry consumer.TopicRegistry mergeChecker mergechecker.MergeChecker - topic consumer.Topic + topicKey consumer.TopicKey consumerGroup string } @@ -33,7 +33,7 @@ func NewController( scope tally.Scope, registry consumer.TopicRegistry, mergeChecker mergechecker.MergeChecker, - topic consumer.Topic, + topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ @@ -41,7 +41,7 @@ func NewController( metricsScope: scope.SubScope("request_controller"), registry: registry, mergeChecker: mergeChecker, - topic: topic, + topicKey: topicKey, consumerGroup: consumerGroup, } } @@ -101,10 +101,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Publish to batch topic - if err := c.publish(ctx, consumer.TopicToBatch, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyToBatch, request); err != nil { c.logger.Errorw("failed to publish output", "request_id", request.ID, - "topic", "batch", + "topic_key", "to-batch", "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) @@ -113,7 +113,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er c.logger.Infow("published request to next stage", "request_id", request.ID, - "topic", "batch", + "topic_key", "to-batch", ) c.metricsScope.Counter("processed").Inc(1) @@ -121,8 +121,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic. -func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { +// publish publishes a request to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { payload, err := request.ToBytes() if err != nil { return fmt.Errorf("failed to serialize request: %w", err) @@ -130,12 +130,17 @@ func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) - q, ok := c.registry.Queue(topic) + q, ok := c.registry.Queue(key) if !ok { - return fmt.Errorf("no queue registered for topic %s", topic) + return fmt.Errorf("no queue registered for topic key %s", key) } - if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { return fmt.Errorf("failed to publish message: %w", err) } @@ -147,9 +152,9 @@ func (c *Controller) Name() string { return "request" } -// Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() consumer.Topic { - return c.topic +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey } // ConsumerGroup returns the consumer group for offset tracking. diff --git a/orchestrator/controller/request/request_test.go b/orchestrator/controller/request/request_test.go index da5cca69..fe0e0798 100644 --- a/orchestrator/controller/request/request_test.go +++ b/orchestrator/controller/request/request_test.go @@ -40,12 +40,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mc mergechecker.Me mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry := consumer.NewTopicRegistry( - []consumer.TopicConfig{{Topic: consumer.TopicToBatch, Queue: mockQ}}, - nil, + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{{Key: consumer.TopicKeyToBatch, Name: "to-batch", Queue: mockQ}}, ) + require.NoError(t, err) - return NewController(logger, scope, registry, mc, consumer.TopicRequest, "orchestrator-request") + return NewController(logger, scope, registry, mc, consumer.TopicKeyRequest, "orchestrator-request") } func TestNewController(t *testing.T) { @@ -54,7 +54,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, mc, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicRequest, controller.Topic()) + assert.Equal(t, consumer.TopicKeyRequest, controller.TopicKey()) assert.Equal(t, "orchestrator-request", controller.ConsumerGroup()) assert.Equal(t, "request", controller.Name()) } diff --git a/orchestrator/controller/speculate/speculate.go b/orchestrator/controller/speculate/speculate.go index 8695464d..47c287c0 100644 --- a/orchestrator/controller/speculate/speculate.go +++ b/orchestrator/controller/speculate/speculate.go @@ -18,7 +18,7 @@ type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope registry consumer.TopicRegistry - topic consumer.Topic + topicKey consumer.TopicKey consumerGroup string } @@ -30,14 +30,14 @@ func NewController( logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, - topic consumer.Topic, + topicKey consumer.TopicKey, consumerGroup string, ) *Controller { return &Controller{ logger: logger.Named("speculate_controller"), metricsScope: scope.SubScope("speculate_controller"), registry: registry, - topic: topic, + topicKey: topicKey, consumerGroup: consumerGroup, } } @@ -78,10 +78,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Conflict detection // Publish to build topic - if err := c.publish(ctx, consumer.TopicBuild, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBuild, request); err != nil { c.logger.Errorw("failed to publish to build", "request_id", request.ID, - "topic", consumer.TopicBuild, + "topic_key", consumer.TopicKeyBuild, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) @@ -89,10 +89,10 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Publish to merge topic - if err := c.publish(ctx, consumer.TopicToMerge, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyToMerge, request); err != nil { c.logger.Errorw("failed to publish to merge", "request_id", request.ID, - "topic", consumer.TopicToMerge, + "topic_key", consumer.TopicKeyToMerge, "error", err, ) c.metricsScope.Counter("publish_errors").Inc(1) @@ -101,7 +101,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er c.logger.Infow("published request to next stages", "request_id", request.ID, - "topics", []string{consumer.TopicBuild.String(), consumer.TopicToMerge.String()}, + "topic_keys", []string{consumer.TopicKeyBuild.String(), consumer.TopicKeyToMerge.String()}, ) c.metricsScope.Counter("processed").Inc(1) @@ -109,8 +109,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic. -func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request entity.Request) error { +// publish publishes a request to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { payload, err := request.ToBytes() if err != nil { return fmt.Errorf("failed to serialize request: %w", err) @@ -118,12 +118,17 @@ func (c *Controller) publish(ctx context.Context, topic consumer.Topic, request msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) - q, ok := c.registry.Queue(topic) + q, ok := c.registry.Queue(key) if !ok { - return fmt.Errorf("no queue registered for topic %s", topic) + return fmt.Errorf("no queue registered for topic key %s", key) } - if err := q.Publisher().Publish(ctx, topic.String(), msg); err != nil { + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { return fmt.Errorf("failed to publish message: %w", err) } @@ -135,9 +140,9 @@ func (c *Controller) Name() string { return "speculate" } -// Topic returns the topic this controller subscribes to. -func (c *Controller) Topic() consumer.Topic { - return c.topic +// TopicKey returns the topic key this controller subscribes to. +func (c *Controller) TopicKey() consumer.TopicKey { + return c.topicKey } // ConsumerGroup returns the consumer group for offset tracking. diff --git a/orchestrator/controller/speculate/speculate_test.go b/orchestrator/controller/speculate/speculate_test.go index 452ca70a..2a75e0ee 100644 --- a/orchestrator/controller/speculate/speculate_test.go +++ b/orchestrator/controller/speculate/speculate_test.go @@ -31,15 +31,15 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, publishErr error) mockQ := queuemock.NewMockQueue(ctrl) mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() - registry := consumer.NewTopicRegistry( + registry, err := consumer.NewTopicRegistry( []consumer.TopicConfig{ - {Topic: consumer.TopicBuild, Queue: mockQ}, - {Topic: consumer.TopicToMerge, Queue: mockQ}, + {Key: consumer.TopicKeyBuild, Name: "build", Queue: mockQ}, + {Key: consumer.TopicKeyToMerge, Name: "to-merge", Queue: mockQ}, }, - nil, ) + require.NoError(t, err) - return NewController(logger, scope, registry, consumer.TopicBatched, "orchestrator-speculate") + return NewController(logger, scope, registry, consumer.TopicKeyBatched, "orchestrator-speculate") } func TestNewController(t *testing.T) { @@ -47,7 +47,7 @@ func TestNewController(t *testing.T) { controller := newTestController(t, ctrl, nil) require.NotNil(t, controller) - assert.Equal(t, consumer.TopicBatched, controller.Topic()) + assert.Equal(t, consumer.TopicKeyBatched, controller.TopicKey()) assert.Equal(t, "orchestrator-speculate", controller.ConsumerGroup()) assert.Equal(t, "speculate", controller.Name()) } diff --git a/test/integration/extension/queue/mysql/queue_test.go b/test/integration/extension/queue/mysql/queue_test.go index fc3988e4..22374ae6 100644 --- a/test/integration/extension/queue/mysql/queue_test.go +++ b/test/integration/extension/queue/mysql/queue_test.go @@ -136,7 +136,7 @@ func (s *SQLQueueIntegrationSuite) TestPublishAndSubscribe() { topic := "test_topic" // Subscribe first with config - subConfig := extqueue.DefaultSubscriptionConfig(topic, "test-worker-1", "test-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("test-worker-1", "test-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -206,7 +206,7 @@ func (s *SQLQueueIntegrationSuite) TestMultiplePartitions() { topic := "multi_partition_topic" // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "multi-partition-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "multi-partition-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -251,7 +251,7 @@ func (s *SQLQueueIntegrationSuite) TestVisibilityTimeoutAndRetry() { topic := "retry_topic" // Use short visibility timeout for faster test - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "retry-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "retry-consumer") subConfig.VisibilityTimeoutMs = 2000 // 2 seconds subConfig.PollIntervalMs = 100 // 100 milliseconds @@ -338,7 +338,7 @@ func (s *SQLQueueIntegrationSuite) TestNackWithDelay() { topic := "nack_topic" // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "nack-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "nack-consumer") subConfig.PollIntervalMs = 100 // 100 milliseconds deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -390,7 +390,7 @@ func (s *SQLQueueIntegrationSuite) TestIdempotentPublish() { topic := "idempotent_topic" // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "idempotent-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "idempotent-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -437,7 +437,7 @@ func (s *SQLQueueIntegrationSuite) TestConcurrentPublishers() { topic := "concurrent_topic" // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "concurrent-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "concurrent-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -492,7 +492,7 @@ func (s *SQLQueueIntegrationSuite) TestCrashRecovery() { topic := "crash_topic" // Use short timeouts for faster test - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "crash-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "crash-consumer") subConfig.VisibilityTimeoutMs = 2000 // 2 seconds subConfig.PollIntervalMs = 100 // 100 milliseconds subConfig.LeaseDurationMs = 3000 // 3 seconds - short lease for testing crash recovery @@ -531,7 +531,7 @@ func (s *SQLQueueIntegrationSuite) TestCrashRecovery() { subscriber2 := q2.Subscriber() - subConfig2 := extqueue.DefaultSubscriptionConfig(topic, "worker-2", "crash-consumer") + subConfig2 := extqueue.DefaultSubscriptionConfig("worker-2", "crash-consumer") subConfig2.VisibilityTimeoutMs = 2000 // 2 seconds subConfig2.PollIntervalMs = 100 // 100 milliseconds subConfig2.LeaseDurationMs = 3000 // 3 seconds @@ -578,11 +578,11 @@ func (s *SQLQueueIntegrationSuite) TestMultipleConsumerGroups() { subscriber2 := q2.Subscriber() // Subscribe both groups - subConfig1 := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "group-A") + subConfig1 := extqueue.DefaultSubscriptionConfig("worker-1", "group-A") deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic, subConfig1) require.NoError(t, err) - subConfig2 := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "group-B") + subConfig2 := extqueue.DefaultSubscriptionConfig("worker-1", "group-B") deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) @@ -657,11 +657,11 @@ func (s *SQLQueueIntegrationSuite) TestMultipleWorkersInConsumerGroup() { subscriber2 := q2.Subscriber() // Subscribe both workers - subConfig1 := extqueue.DefaultSubscriptionConfig(topic, "worker-1", consumerGroup) + subConfig1 := extqueue.DefaultSubscriptionConfig("worker-1", consumerGroup) deliveryChan1, err := subscriber1.Subscribe(s.ctx, topic, subConfig1) require.NoError(t, err) - subConfig2 := extqueue.DefaultSubscriptionConfig(topic, "worker-2", consumerGroup) + subConfig2 := extqueue.DefaultSubscriptionConfig("worker-2", consumerGroup) deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) @@ -777,7 +777,7 @@ func (s *SQLQueueIntegrationSuite) TestConcurrentSubscribers() { queues = append(queues, q) subscriber := q.Subscriber() - subConfig := extqueue.DefaultSubscriptionConfig(topic, fmt.Sprintf("worker-%d", i), consumerGroup) + subConfig := extqueue.DefaultSubscriptionConfig(fmt.Sprintf("worker-%d", i), consumerGroup) deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) deliveryChans = append(deliveryChans, deliveryChan) @@ -870,7 +870,7 @@ func (s *SQLQueueIntegrationSuite) TestDeadLetterQueue() { subscriber := q.Subscriber() // Configure with low max attempts and DLQ enabled - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "dlq-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "dlq-consumer") subConfig.PollIntervalMs = 100 // 100 milliseconds subConfig.VisibilityTimeoutMs = 1000 // 1 second subConfig.Retry.MaxAttempts = 2 // Only 2 attempts before DLQ @@ -915,7 +915,7 @@ func (s *SQLQueueIntegrationSuite) TestDeadLetterQueue() { dlqTopic := topic + subConfig.DLQ.TopicSuffix t.Logf("Subscribing to DLQ topic: %s", dlqTopic) - dlqConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "dlq-consumer") + dlqConfig := extqueue.DefaultSubscriptionConfig("worker-1", "dlq-consumer") dlqDeliveryChan, err := subscriber.Subscribe(s.ctx, dlqTopic, dlqConfig) require.NoError(t, err) @@ -966,7 +966,7 @@ func (s *SQLQueueIntegrationSuite) TestMessageOrderingWithinPartition() { subscriber := q.Subscriber() // Subscribe first - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "ordering-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "ordering-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -1028,7 +1028,7 @@ func (s *SQLQueueIntegrationSuite) TestLateSubscriber() { // Now subscribe (late subscriber) subscriber := q.Subscriber() - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "late-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "late-consumer") deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) t.Logf("Late subscriber joined after messages published") @@ -1067,7 +1067,7 @@ func (s *SQLQueueIntegrationSuite) TestEmptyTopicSubscribe() { subscriber := q.Subscriber() // Subscribe to empty topic (no messages published yet) - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "empty-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "empty-consumer") subConfig.PollIntervalMs = 100 // 100 milliseconds deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -1111,7 +1111,7 @@ func (s *SQLQueueIntegrationSuite) TestGracefulShutdownDuringProcessing() { subscriber := q.Subscriber() // Subscribe - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "shutdown-consumer") + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", "shutdown-consumer") subConfig.PollIntervalMs = 100 // 100 milliseconds deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -1175,7 +1175,7 @@ drainLoop: defer q2.Close() subscriber2 := q2.Subscriber() - subConfig2 := extqueue.DefaultSubscriptionConfig(topic, "worker-1", "shutdown-consumer") + subConfig2 := extqueue.DefaultSubscriptionConfig("worker-1", "shutdown-consumer") deliveryChan2, err := subscriber2.Subscribe(s.ctx, topic, subConfig2) require.NoError(t, err) @@ -1351,7 +1351,7 @@ func (s *SQLQueueIntegrationSuite) TestAdmin_ConsumerLagAfterPartialAck() { } // Subscribe and ack only 2 - subConfig := extqueue.DefaultSubscriptionConfig(topic, "worker-1", consumerGroup) + subConfig := extqueue.DefaultSubscriptionConfig("worker-1", consumerGroup) subConfig.PollIntervalMs = 100 deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -1396,7 +1396,7 @@ func (s *SQLQueueIntegrationSuite) TestAdmin_LeasesAndOffsets() { // Publish and subscribe to create leases and offsets require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("lo-1", []byte("a"), "p1", nil))) - subConfig := extqueue.DefaultSubscriptionConfig(topic, "admin-worker-1", consumerGroup) + subConfig := extqueue.DefaultSubscriptionConfig("admin-worker-1", consumerGroup) subConfig.PollIntervalMs = 100 deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err) @@ -1456,7 +1456,7 @@ func (s *SQLQueueIntegrationSuite) TestAdmin_ResetOffsetAndReleaseLease() { // Publish, subscribe, ack — creates offsets and leases require.NoError(t, publisher.Publish(s.ctx, topic, queue.NewMessage("r1", []byte("a"), "rp1", nil))) - subConfig := extqueue.DefaultSubscriptionConfig(topic, "reset-worker", consumerGroup) + subConfig := extqueue.DefaultSubscriptionConfig("reset-worker", consumerGroup) subConfig.PollIntervalMs = 100 deliveryChan, err := subscriber.Subscribe(s.ctx, topic, subConfig) require.NoError(t, err)