Skip to content

Commit 7123378

Browse files
authored
refactor(gateway): publish via TopicRegistry; align orchestrator topic names with keys (#150)
## Summary Make the gateway Land controller resolve its publish target through consumer.TopicRegistry instead of taking a Publisher and topic-name string directly. The destination is hardcoded to consumer.TopicKeyStart, mirroring how orchestrator/controller/start hardcodes TopicKeyValidate as its publish destination. The gateway never subscribes, so it does not need a topicKey field on the struct. In example/server/orchestrator, rename TopicKeyStart's topic name "request" to "start" and consumer group "orchestrator-request" to "orchestrator-start" so every stage's topic and consumer-group names match its TopicKey, matching the convention already used by validate, batch, score, etc. In example/server/gateway, build a publish-only TopicRegistry containing a single entry for TopicKeyStart and pass it to NewLandController. ## Test Plan bazel build, bazel test ## Issues ## Stack 1. @ #150 1. #152 1. #144
1 parent 70a4033 commit 7123378

6 files changed

Lines changed: 73 additions & 31 deletions

File tree

example/server/gateway/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
importpath = "github.com/uber/submitqueue/example/server/gateway",
1212
visibility = ["//visibility:private"],
1313
deps = [
14+
"//core/consumer",
1415
"//extension/counter/mysql",
1516
"//extension/queue/mysql",
1617
"//extension/storage/mysql",

example/server/gateway/main.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
_ "github.com/go-sql-driver/mysql"
3030
"github.com/uber-go/tally/v4"
31+
"github.com/uber/submitqueue/core/consumer"
3132
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
3233
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
3334
mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql"
@@ -158,6 +159,16 @@ func run() error {
158159
zap.String("queue_dsn", queueDSN),
159160
)
160161

162+
// Build a publish-only topic registry: gateway only feeds the start of the
163+
// orchestrator pipeline (TopicKeyStart). No subscription is configured
164+
// because the gateway never consumes from the queue.
165+
registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{
166+
{Key: consumer.TopicKeyStart, Name: "start", Queue: mysqlQueue},
167+
})
168+
if err != nil {
169+
return fmt.Errorf("failed to create topic registry: %w", err)
170+
}
171+
161172
// Create gRPC server
162173
grpcServer := grpc.NewServer()
163174

@@ -166,7 +177,7 @@ func run() error {
166177

167178
// Create controllers and wrap them for gRPC
168179
pingController := controller.NewPingController(logger, scope)
169-
landController := controller.NewLandController(logger.Sugar(), scope, cnt, mysqlQueue.Publisher(), requestLogStore, "request")
180+
landController := controller.NewLandController(logger.Sugar(), scope, cnt, requestLogStore, registry)
170181
gatewayServer := &GatewayServer{
171182
pingController: pingController,
172183
landController: landController,

example/server/orchestrator/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -299,10 +299,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
299299
return consumer.NewTopicRegistry([]consumer.TopicConfig{
300300
{
301301
Key: consumer.TopicKeyStart,
302-
Name: "request",
302+
Name: "start",
303303
Queue: q,
304304
Subscription: extqueue.DefaultSubscriptionConfig(
305-
subscriberName, "orchestrator-request",
305+
subscriberName, "orchestrator-start",
306306
),
307307
},
308308
{
@@ -396,7 +396,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
396396
store,
397397
registry,
398398
consumer.TopicKeyStart,
399-
"orchestrator-request",
399+
"orchestrator-start",
400400
)
401401
if err := c.Register(requestController); err != nil {
402402
return fmt.Errorf("failed to register request controller: %w", err)

gateway/controller/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ go_library(
99
importpath = "github.com/uber/submitqueue/gateway/controller",
1010
visibility = ["//visibility:public"],
1111
deps = [
12+
"//core/consumer",
1213
"//core/errs",
1314
"//entity",
1415
"//entity/queue",
1516
"//extension/counter",
16-
"//extension/queue",
1717
"//extension/storage",
1818
"//gateway/protopb",
1919
"@com_github_uber_go_tally_v4//:tally",
@@ -29,6 +29,7 @@ go_test(
2929
],
3030
embed = [":controller"],
3131
deps = [
32+
"//core/consumer",
3233
"//entity",
3334
"//entity/queue",
3435
"//extension/counter/mock",

gateway/controller/land.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ import (
2121
"time"
2222

2323
"github.com/uber-go/tally/v4"
24+
"github.com/uber/submitqueue/core/consumer"
2425
"github.com/uber/submitqueue/core/errs"
2526
"github.com/uber/submitqueue/entity"
2627
"github.com/uber/submitqueue/entity/queue"
2728
"github.com/uber/submitqueue/extension/counter"
28-
extqueue "github.com/uber/submitqueue/extension/queue"
2929
"github.com/uber/submitqueue/extension/storage"
3030
pb "github.com/uber/submitqueue/gateway/protopb"
3131
"go.uber.org/zap"
@@ -45,21 +45,20 @@ type LandController struct {
4545
logger *zap.SugaredLogger
4646
metricsScope tally.Scope
4747
counter counter.Counter
48-
publisher extqueue.Publisher
4948
requestLogStore storage.RequestLogStore
50-
topic string // Topic to publish requests to (e.g., "request", "land_request")
49+
registry consumer.TopicRegistry
5150
}
5251

5352
// NewLandController creates a new instance of the gateway land controller.
54-
// topic: the queue topic to publish requests to (e.g., "request", "land_request")
55-
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, publisher extqueue.Publisher, requestLogStore storage.RequestLogStore, topic string) *LandController {
53+
// The controller publishes land requests to the topic registered under
54+
// consumer.TopicKeyStart in the registry.
55+
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, requestLogStore storage.RequestLogStore, registry consumer.TopicRegistry) *LandController {
5656
return &LandController{
5757
logger: logger,
5858
metricsScope: scope,
5959
counter: counter,
60-
publisher: publisher,
6160
requestLogStore: requestLogStore,
62-
topic: topic,
61+
registry: registry,
6362
}
6463
}
6564

@@ -130,7 +129,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
130129
c.logger.Infow("request published to queue",
131130
"queue", req.Queue,
132131
"sqid", landRequest.ID,
133-
"topic", c.topic,
132+
"topic_key", consumer.TopicKeyStart,
134133
)
135134
c.metricsScope.Counter("publish_success").Inc(1)
136135

@@ -153,8 +152,17 @@ func (c *LandController) publishToQueue(ctx context.Context, landRequest entity.
153152
// - Partition key: landRequest.Queue (ensures ordering per queue)
154153
msg := queue.NewMessage(landRequest.ID, payload, landRequest.Queue, nil)
155154

156-
// Publish to request topic
157-
if err := c.publisher.Publish(ctx, c.topic, msg); err != nil {
155+
q, ok := c.registry.Queue(consumer.TopicKeyStart)
156+
if !ok {
157+
return fmt.Errorf("no queue registered for topic key %s", consumer.TopicKeyStart)
158+
}
159+
160+
topicName, ok := c.registry.TopicName(consumer.TopicKeyStart)
161+
if !ok {
162+
return fmt.Errorf("no topic name registered for topic key %s", consumer.TopicKeyStart)
163+
}
164+
165+
if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
158166
return fmt.Errorf("failed to publish land request message: %w", err)
159167
}
160168

gateway/controller/land_test.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/stretchr/testify/assert"
2323
"github.com/stretchr/testify/require"
2424
"github.com/uber-go/tally/v4"
25+
"github.com/uber/submitqueue/core/consumer"
2526
"github.com/uber/submitqueue/entity"
2627
"github.com/uber/submitqueue/entity/queue"
2728
countermock "github.com/uber/submitqueue/extension/counter/mock"
@@ -32,11 +33,29 @@ import (
3233
"go.uber.org/zap"
3334
)
3435

35-
// noopPublisher returns a mock publisher that succeeds silently.
36-
func noopPublisher(ctrl *gomock.Controller) *queuemock.MockPublisher {
36+
// newTestRegistry builds a single-entry TopicRegistry for TopicKeyStart wired
37+
// to a mock Queue/Publisher and returns both the registry and the publisher
38+
// mock so callers can set EXPECT() on the publisher.
39+
func newTestRegistry(t *testing.T, ctrl *gomock.Controller) (consumer.TopicRegistry, *queuemock.MockPublisher) {
40+
t.Helper()
3741
pub := queuemock.NewMockPublisher(ctrl)
42+
q := queuemock.NewMockQueue(ctrl)
43+
q.EXPECT().Publisher().Return(pub).AnyTimes()
44+
45+
registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{
46+
{Key: consumer.TopicKeyStart, Name: "start", Queue: q},
47+
})
48+
require.NoError(t, err)
49+
return registry, pub
50+
}
51+
52+
// newTestRegistryWithNoopPublisher returns a registry whose publisher silently
53+
// accepts any Publish call. Use for tests that don't care about publish behavior.
54+
func newTestRegistryWithNoopPublisher(t *testing.T, ctrl *gomock.Controller) consumer.TopicRegistry {
55+
t.Helper()
56+
registry, pub := newTestRegistry(t, ctrl)
3857
pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
39-
return pub
58+
return registry
4059
}
4160

4261
// noopRequestLogStore returns a mock RequestLogStore that succeeds silently.
@@ -50,7 +69,7 @@ func TestNewLandController(t *testing.T) {
5069
ctrl := gomock.NewController(t)
5170

5271
cnt := countermock.NewMockCounter(ctrl)
53-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
72+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
5473
require.NotNil(t, controller)
5574
}
5675

@@ -59,7 +78,7 @@ func TestLand_ReturnsSqid(t *testing.T) {
5978

6079
cnt := countermock.NewMockCounter(ctrl)
6180
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(1), nil)
62-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
81+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
6382
ctx := context.Background()
6483

6584
req := &pb.LandRequest{
@@ -77,7 +96,7 @@ func TestLand_ReturnsErrorOnCounterFailure(t *testing.T) {
7796

7897
cnt := countermock.NewMockCounter(ctrl)
7998
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(0), fmt.Errorf("counter unavailable"))
80-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
99+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
81100
ctx := context.Background()
82101

83102
req := &pb.LandRequest{
@@ -101,7 +120,7 @@ func TestLand_CounterDomainIncludesQueue(t *testing.T) {
101120
return 1, nil
102121
},
103122
)
104-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
123+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
105124
ctx := context.Background()
106125

107126
req := &pb.LandRequest{
@@ -118,7 +137,7 @@ func TestLand_ReturnsErrorOnEmptyQueue(t *testing.T) {
118137
ctrl := gomock.NewController(t)
119138

120139
cnt := countermock.NewMockCounter(ctrl)
121-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
140+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
122141
ctx := context.Background()
123142

124143
req := &pb.LandRequest{
@@ -135,7 +154,7 @@ func TestLand_ReturnsErrorOnEmptyChangeUri(t *testing.T) {
135154
ctrl := gomock.NewController(t)
136155

137156
cnt := countermock.NewMockCounter(ctrl)
138-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
157+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
139158
ctx := context.Background()
140159

141160
req := &pb.LandRequest{
@@ -152,7 +171,7 @@ func TestLand_ReturnsErrorOnNilChange(t *testing.T) {
152171
ctrl := gomock.NewController(t)
153172

154173
cnt := countermock.NewMockCounter(ctrl)
155-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopPublisher(ctrl), noopRequestLogStore(ctrl), "request")
174+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), newTestRegistryWithNoopPublisher(t, ctrl))
156175
ctx := context.Background()
157176

158177
req := &pb.LandRequest{
@@ -173,7 +192,8 @@ func TestLand_PublishesToQueue(t *testing.T) {
173192

174193
cnt := countermock.NewMockCounter(ctrl)
175194
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(123), nil)
176-
publisher := queuemock.NewMockPublisher(ctrl)
195+
196+
registry, publisher := newTestRegistry(t, ctrl)
177197
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
178198
func(ctx context.Context, topic string, msg queue.Message) error {
179199
publishedTopic = topic
@@ -182,7 +202,7 @@ func TestLand_PublishesToQueue(t *testing.T) {
182202
},
183203
)
184204

185-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, noopRequestLogStore(ctrl), "request")
205+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), registry)
186206
ctx := context.Background()
187207

188208
req := &pb.LandRequest{
@@ -195,8 +215,8 @@ func TestLand_PublishesToQueue(t *testing.T) {
195215
require.NoError(t, err)
196216
assert.Equal(t, "test-queue/123", resp.Sqid)
197217

198-
// Verify message was published
199-
assert.Equal(t, "request", publishedTopic)
218+
// Verify message was published to the topic registered under TopicKeyStart
219+
assert.Equal(t, "start", publishedTopic)
200220
assert.Equal(t, "test-queue/123", publishedMessage.ID)
201221
assert.Equal(t, "test-queue", publishedMessage.PartitionKey)
202222

@@ -214,10 +234,11 @@ func TestLand_ContinuesWhenPublishFails(t *testing.T) {
214234

215235
cnt := countermock.NewMockCounter(ctrl)
216236
cnt.EXPECT().Next(gomock.Any(), gomock.Any()).Return(int64(999), nil)
217-
publisher := queuemock.NewMockPublisher(ctrl)
237+
238+
registry, publisher := newTestRegistry(t, ctrl)
218239
publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf("queue unavailable"))
219240

220-
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, publisher, noopRequestLogStore(ctrl), "request")
241+
controller := NewLandController(zap.NewNop().Sugar(), tally.NoopScope, cnt, noopRequestLogStore(ctrl), registry)
221242
ctx := context.Background()
222243

223244
req := &pb.LandRequest{

0 commit comments

Comments
 (0)