From a187a2afbd81aafc2c11bd4b5cbc097c88708323 Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Tue, 5 May 2026 22:35:01 -0700 Subject: [PATCH] refactor(gateway): publish via TopicRegistry; align orchestrator topic names with keys Make the gateway Land controller resolve its publish target through consumer.TopicRegistry instead of taking a Publisher and topic-name string directly. The destination is hardcoded to consumer.TopicKeyStart, mirroring how orchestrator/controller/start hardcodes TopicKeyValidate as its publish destination. The gateway never subscribes, so it does not need a topicKey field on the struct. In example/server/orchestrator, rename TopicKeyStart's topic name "request" to "start" and consumer group "orchestrator-request" to "orchestrator-start" so every stage's topic and consumer-group names match its TopicKey, matching the convention already used by validate, batch, score, etc. In example/server/gateway, build a publish-only TopicRegistry containing a single entry for TopicKeyStart and pass it to NewLandController. --- example/server/gateway/BUILD.bazel | 1 + example/server/gateway/main.go | 13 ++++++- example/server/orchestrator/main.go | 6 ++-- gateway/controller/BUILD.bazel | 3 +- gateway/controller/land.go | 28 +++++++++------ gateway/controller/land_test.go | 53 ++++++++++++++++++++--------- 6 files changed, 73 insertions(+), 31 deletions(-) diff --git a/example/server/gateway/BUILD.bazel b/example/server/gateway/BUILD.bazel index 73027d30..801eae7f 100644 --- a/example/server/gateway/BUILD.bazel +++ b/example/server/gateway/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/uber/submitqueue/example/server/gateway", visibility = ["//visibility:private"], deps = [ + "//core/consumer", "//extension/counter/mysql", "//extension/queue/mysql", "//extension/storage/mysql", diff --git a/example/server/gateway/main.go b/example/server/gateway/main.go index 3a3bc500..c449743b 100644 --- a/example/server/gateway/main.go +++ b/example/server/gateway/main.go @@ -28,6 +28,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql" queueMySQL "github.com/uber/submitqueue/extension/queue/mysql" mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql" @@ -158,6 +159,16 @@ func run() error { zap.String("queue_dsn", queueDSN), ) + // Build a publish-only topic registry: gateway only feeds the start of the + // orchestrator pipeline (TopicKeyStart). No subscription is configured + // because the gateway never consumes from the queue. + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue}, + }) + if err != nil { + return fmt.Errorf("failed to create topic registry: %w", err) + } + // Create gRPC server grpcServer := grpc.NewServer() @@ -166,7 +177,7 @@ func run() error { // Create controllers and wrap them for gRPC pingController := controller.NewPingController(logger, scope) - landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), requestLogStore, "request") + landController := controller.NewLandController(logger.Sugar(), scope, cnt, requestLogStore, registry) gatewayServer := &GatewayServer{ pingController: pingController, landController: landController, diff --git a/example/server/orchestrator/main.go b/example/server/orchestrator/main.go index e4311d35..82912829 100644 --- a/example/server/orchestrator/main.go +++ b/example/server/orchestrator/main.go @@ -299,10 +299,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe return consumer.NewTopicRegistry([]consumer.TopicConfig{ { Key: consumer.TopicKeyStart, - Name: "request", + Name: "start", Queue: q, Subscription: extqueue.DefaultSubscriptionConfig( - subscriberName, "orchestrator-request", + subscriberName, "orchestrator-start", ), }, { @@ -396,7 +396,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t store, registry, consumer.TopicKeyStart, - "orchestrator-request", + "orchestrator-start", ) if err := c.Register(requestController); err != nil { return fmt.Errorf("failed to register request controller: %w", err) diff --git a/gateway/controller/BUILD.bazel b/gateway/controller/BUILD.bazel index 007fe5fd..627babbd 100644 --- a/gateway/controller/BUILD.bazel +++ b/gateway/controller/BUILD.bazel @@ -9,11 +9,11 @@ go_library( importpath = "github.com/uber/submitqueue/gateway/controller", visibility = ["//visibility:public"], deps = [ + "//core/consumer", "//core/errs", "//entity", "//entity/queue", "//extension/counter", - "//extension/queue", "//extension/storage", "//gateway/protopb", "@com_github_uber_go_tally_v4//:tally", @@ -29,6 +29,7 @@ go_test( ], embed = [":controller"], deps = [ + "//core/consumer", "//entity", "//entity/queue", "//extension/counter/mock", diff --git a/gateway/controller/land.go b/gateway/controller/land.go index fdaedab4..a43bfc93 100644 --- a/gateway/controller/land.go +++ b/gateway/controller/land.go @@ -21,11 +21,11 @@ import ( "time" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/core/errs" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" "github.com/uber/submitqueue/extension/counter" - extqueue "github.com/uber/submitqueue/extension/queue" "github.com/uber/submitqueue/extension/storage" pb "github.com/uber/submitqueue/gateway/protopb" "go.uber.org/zap" @@ -45,21 +45,20 @@ type LandController struct { logger *zap.SugaredLogger metricsScope tally.Scope counter counter.Counter - publisher extqueue.Publisher requestLogStore storage.RequestLogStore - topic string // Topic to publish requests to (e.g., "request", "land_request") + registry consumer.TopicRegistry } // NewLandController creates a new instance of the gateway land controller. -// topic: the queue topic to publish requests to (e.g., "request", "land_request") -func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, publisher extqueue.Publisher, requestLogStore storage.RequestLogStore, topic string) *LandController { +// The controller publishes land requests to the topic registered under +// consumer.TopicKeyStart in the registry. +func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, requestLogStore storage.RequestLogStore, registry consumer.TopicRegistry) *LandController { return &LandController{ logger: logger, metricsScope: scope, counter: counter, - publisher: publisher, requestLogStore: requestLogStore, - topic: topic, + registry: registry, } } @@ -130,7 +129,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan c.logger.Infow("request published to queue", "queue", req.Queue, "sqid", landRequest.ID, - "topic", c.topic, + "topic_key", consumer.TopicKeyStart, ) c.metricsScope.Counter("publish_success").Inc(1) @@ -153,8 +152,17 @@ func (c *LandController) publishToQueue(ctx context.Context, landRequest entity. // - Partition key: landRequest.Queue (ensures ordering per queue) msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil) - // Publish to request topic - if err := c.publisher.Publish(ctx, c.topic, msg); err != nil { + q, ok := c.registry.Queue(consumer.TopicKeyStart) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyStart) + } + + topicName, ok := c.registry.TopicName(consumer.TopicKeyStart) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyStart) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { return fmt.Errorf("failed to publish land request message: %w", err) } diff --git a/gateway/controller/land_test.go b/gateway/controller/land_test.go index 469cb800..58b6f353 100644 --- a/gateway/controller/land_test.go +++ b/gateway/controller/land_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally/v4" + "github.com/uber/submitqueue/core/consumer" "github.com/uber/submitqueue/entity" "github.com/uber/submitqueue/entity/queue" countermock "github.com/uber/submitqueue/extension/counter/mock" @@ -32,11 +33,29 @@ import ( "go.uber.org/zap" ) -// noopPublisher returns a mock publisher that succeeds silently. -func noopPublisher(ctrl *gomock.Controller) *queuemock.MockPublisher { +// newTestRegistry builds a single-entry TopicRegistry for TopicKeyStart wired +// to a mock Queue/Publisher and returns both the registry and the publisher +// mock so callers can set EXPECT() on the publisher. +func newTestRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegistry, *queuemock.MockPublisher) { + t.Helper() pub := queuemock.NewMockPublisher(ctrl) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: consumer.TopicKeyStart, Name: "start", Queue: q}, + }) + require.NoError(t, err) + return registry, pub +} + +// newTestRegistryWithNoopPublisher returns a registry whose publisher silently +// accepts any Publish call. Use for tests that don't care about publish behavior. +func newTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controller) consumer.TopicRegistry { + t.Helper() + registry, pub := newTestRegistry(t, ctrl) pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - return pub + return registry } // noopRequestLogStore returns a mock RequestLogStore that succeeds silently. @@ -50,7 +69,7 @@ func TestNewLandController(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) require.NotNil(t, controller) } @@ -59,7 +78,7 @@ func TestLand_ReturnsSqid(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -77,7 +96,7 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -101,7 +120,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) { return 1, nil }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -118,7 +137,7 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -135,7 +154,7 @@ func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -152,7 +171,7 @@ func TestLand_ReturnsErrorOnNilChange(t *testing.T) { ctrl := gomock.NewController(t) cnt := countermock.NewMockCounter(ctrl) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl)) ctx := context.Background() req := &pb.LandRequest{ @@ -173,7 +192,8 @@ func TestLand_PublishesToQueue(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(123), nil) - publisher := queuemock.NewMockPublisher(ctrl) + + registry, publisher := newTestRegistry(t, ctrl) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, topic string, msg queue.Message) error { publishedTopic = topic @@ -182,7 +202,7 @@ func TestLand_PublishesToQueue(t *testing.T) { }, ) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), registry) ctx := context.Background() req := &pb.LandRequest{ @@ -195,8 +215,8 @@ func TestLand_PublishesToQueue(t *testing.T) { require.NoError(t, err) assert.Equal(t, "test-queue/123", resp.Sqid) - // Verify message was published - assert.Equal(t, "request", publishedTopic) + // Verify message was published to the topic registered under TopicKeyStart + assert.Equal(t, "start", publishedTopic) assert.Equal(t, "test-queue/123", publishedMessage.ID) assert.Equal(t, "test-queue", publishedMessage.PartitionKey) @@ -214,10 +234,11 @@ func TestLand_ContinuesWhenPublishFails(t *testing.T) { cnt := countermock.NewMockCounter(ctrl) cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(999), nil) - publisher := queuemock.NewMockPublisher(ctrl) + + registry, publisher := newTestRegistry(t, ctrl) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable")) - controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, noopRequestLogStore(ctrl), "request") + controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), registry) ctx := context.Background() req := &pb.LandRequest{