Skip to content

Commit 7e02be7

Browse files
committed
refactor(names,entities): Rename orchestrator controller from Request to Start, separate responsibilities between services
1 parent d23fa39 commit 7e02be7

23 files changed

Lines changed: 379 additions & 165 deletions

File tree

core/consumer/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ The top-level orchestrator. Register controllers, start consuming, and stop grac
2626

2727
```go
2828
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
29-
{Key: consumer.TopicKeyRequest, Name: "request", Queue: q, Subscription: subConfig},
29+
{Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig},
3030
})
3131

3232
c := consumer.New(logger, scope, registry)
@@ -64,7 +64,7 @@ The `TopicRegistry` maps topic keys to queue backends, topic names, and subscrip
6464
```go
6565
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
6666
{
67-
Key: consumer.TopicKeyRequest,
67+
Key: consumer.TopicKeyStart,
6868
Name: "request",
6969
Queue: q,
7070
Subscription: extqueue.DefaultSubscriptionConfig("worker-1", "orchestrator"),
@@ -78,7 +78,7 @@ registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
7878
})
7979
```
8080

81-
**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyRequest`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions.
81+
**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions.
8282

8383
## Error Handling
8484

core/consumer/consumer_test.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestConsumer_Register(t *testing.T) {
103103
c := consumer.New(logger, tally.NoopScope, reg)
104104

105105
handler1 := consumermock.NewMockController(ctrl)
106-
setupController(handler1, "handler1", consumer.TopicKeyRequest, "group1", nil)
106+
setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil)
107107

108108
handler2 := consumermock.NewMockController(ctrl)
109109
setupController(handler2, "handler2", consumer.TopicKey("other-topic"), "group2", nil)
@@ -123,10 +123,10 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) {
123123
c := consumer.New(logger, tally.NoopScope, reg)
124124

125125
handler1 := consumermock.NewMockController(ctrl)
126-
setupController(handler1, "handler1", consumer.TopicKeyRequest, "group1", nil)
126+
setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil)
127127

128128
handler2 := consumermock.NewMockController(ctrl)
129-
setupController(handler2, "handler2", consumer.TopicKeyRequest, "group2", nil)
129+
setupController(handler2, "handler2", consumer.TopicKeyStart, "group2", nil)
130130

131131
err := c.Register(handler1)
132132
require.NoError(t, err)
@@ -146,7 +146,7 @@ func TestConsumer_Register_AfterStop(t *testing.T) {
146146
require.NoError(t, err)
147147

148148
handler := consumermock.NewMockController(ctrl)
149-
setupController(handler, "handler1", consumer.TopicKeyRequest, "group1", nil)
149+
setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil)
150150

151151
err = c.Register(handler)
152152
assert.Error(t, err)
@@ -170,7 +170,7 @@ func TestConsumer_Start_AfterStop(t *testing.T) {
170170
c := consumer.New(logger, tally.NoopScope, reg)
171171

172172
handler := consumermock.NewMockController(ctrl)
173-
setupController(handler, "handler1", consumer.TopicKeyRequest, "group1", nil)
173+
setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil)
174174

175175
err := c.Register(handler)
176176
require.NoError(t, err)
@@ -189,14 +189,14 @@ func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) {
189189
mockQ := queuemock.NewMockQueue(ctrl)
190190
// Registry has queue but no subscription config
191191
reg, err := consumer.NewTopicRegistry(
192-
[]consumer.TopicConfig{{Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ}},
192+
[]consumer.TopicConfig{{Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ}},
193193
)
194194
require.NoError(t, err)
195195

196196
c := consumer.New(logger, tally.NoopScope, reg)
197197

198198
handler := consumermock.NewMockController(ctrl)
199-
setupController(handler, "handler", consumer.TopicKeyRequest, "group", nil)
199+
setupController(handler, "handler", consumer.TopicKeyStart, "group", nil)
200200

201201
err = c.Register(handler)
202202
require.NoError(t, err)
@@ -217,12 +217,12 @@ func TestConsumer_Start_SubscribeFailure(t *testing.T) {
217217
mockQ := queuemock.NewMockQueue(ctrl)
218218
mockQ.EXPECT().Subscriber().Return(mockSub)
219219

220-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "group")
220+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "group")
221221

222222
c := consumer.New(logger, tally.NoopScope, reg)
223223

224224
handler := consumermock.NewMockController(ctrl)
225-
setupController(handler, "handler", consumer.TopicKeyRequest, "group", nil)
225+
setupController(handler, "handler", consumer.TopicKeyStart, "group", nil)
226226

227227
err := c.Register(handler)
228228
require.NoError(t, err)
@@ -243,13 +243,13 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) {
243243
mockQ := queuemock.NewMockQueue(ctrl)
244244
mockQ.EXPECT().Subscriber().Return(mockSub)
245245

246-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
246+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
247247

248248
c := consumer.New(logger, tally.NoopScope, reg)
249249

250250
handledMsg := ""
251251
handler := consumermock.NewMockController(ctrl)
252-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
252+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
253253
func(ctx context.Context, delivery consumer.Delivery) error {
254254
handledMsg = delivery.Message().ID
255255
return nil
@@ -289,12 +289,12 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) {
289289
mockQ := queuemock.NewMockQueue(ctrl)
290290
mockQ.EXPECT().Subscriber().Return(mockSub)
291291

292-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
292+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
293293

294294
c := consumer.New(logger, tally.NoopScope, reg)
295295

296296
handler := consumermock.NewMockController(ctrl)
297-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
297+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
298298
func(ctx context.Context, delivery consumer.Delivery) error {
299299
return errs.NewRetryableError(fmt.Errorf("processing failed"))
300300
},
@@ -331,12 +331,12 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) {
331331
mockQ := queuemock.NewMockQueue(ctrl)
332332
mockQ.EXPECT().Subscriber().Return(mockSub)
333333

334-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
334+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
335335

336336
c := consumer.New(logger, tally.NoopScope, reg)
337337

338338
handler := consumermock.NewMockController(ctrl)
339-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
339+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
340340
func(ctx context.Context, delivery consumer.Delivery) error {
341341
return fmt.Errorf("bad payload")
342342
},
@@ -382,12 +382,12 @@ func TestConsumer_Stop(t *testing.T) {
382382
mockQ := queuemock.NewMockQueue(ctrl)
383383
mockQ.EXPECT().Subscriber().Return(mockSub)
384384

385-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
385+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
386386

387387
c := consumer.New(logger, tally.NoopScope, reg)
388388

389389
handler := consumermock.NewMockController(ctrl)
390-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", nil)
390+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", nil)
391391

392392
err := c.Register(handler)
393393
require.NoError(t, err)
@@ -440,12 +440,12 @@ func TestConsumer_ObservabilityTags(t *testing.T) {
440440
mockQ := queuemock.NewMockQueue(ctrl)
441441
mockQ.EXPECT().Subscriber().Return(mockSub)
442442

443-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
443+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
444444

445445
testC := consumer.New(logger, testScope, reg)
446446

447447
handler := consumermock.NewMockController(ctrl)
448-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
448+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
449449
func(ctx context.Context, delivery consumer.Delivery) error {
450450
return tt.handlerError
451451
},
@@ -515,12 +515,12 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) {
515515
mockQ := queuemock.NewMockQueue(ctrl)
516516
mockQ.EXPECT().Subscriber().Return(mockSub)
517517

518-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
518+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
519519

520520
c := consumer.New(logger, scope, reg)
521521

522522
handler := consumermock.NewMockController(ctrl)
523-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
523+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
524524
func(ctx context.Context, delivery consumer.Delivery) error { return nil },
525525
)
526526

@@ -560,12 +560,12 @@ func TestConsumer_ErrorMetrics(t *testing.T) {
560560
mockQ := queuemock.NewMockQueue(ctrl)
561561
mockQ.EXPECT().Subscriber().Return(mockSub)
562562

563-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
563+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
564564

565565
c := consumer.New(logger, scope, reg)
566566

567567
handler := consumermock.NewMockController(ctrl)
568-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
568+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
569569
func(ctx context.Context, delivery consumer.Delivery) error {
570570
return errs.NewRetryableError(fmt.Errorf("processing failed"))
571571
},
@@ -616,7 +616,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) {
616616
mockQ := queuemock.NewMockQueue(ctrl)
617617
mockQ.EXPECT().Subscriber().Return(mockSub)
618618

619-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
619+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
620620

621621
c := consumer.New(logger, tally.NoopScope, reg)
622622

@@ -626,7 +626,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) {
626626
var partBProcessed atomic.Bool
627627

628628
handler := consumermock.NewMockController(ctrl)
629-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
629+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
630630
func(ctx context.Context, delivery consumer.Delivery) error {
631631
pk := delivery.Message().PartitionKey
632632
if pk == "partition-a" {
@@ -701,7 +701,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) {
701701
mockQ := queuemock.NewMockQueue(ctrl)
702702
mockQ.EXPECT().Subscriber().Return(mockSub)
703703

704-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
704+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
705705

706706
c := consumer.New(logger, tally.NoopScope, reg)
707707

@@ -712,7 +712,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) {
712712
allDone := make(chan struct{})
713713

714714
handler := consumermock.NewMockController(ctrl)
715-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
715+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
716716
func(ctx context.Context, delivery consumer.Delivery) error {
717717
mu.Lock()
718718
order = append(order, delivery.Message().ID)
@@ -770,14 +770,14 @@ func TestConsumer_PartitionWorkerCleanup(t *testing.T) {
770770
mockQ := queuemock.NewMockQueue(ctrl)
771771
mockQ.EXPECT().Subscriber().Return(mockSub)
772772

773-
reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
773+
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")
774774

775775
c := consumer.New(logger, tally.NoopScope, reg)
776776

777777
processedCount := int64(0)
778778

779779
handler := consumermock.NewMockController(ctrl)
780-
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
780+
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
781781
func(ctx context.Context, delivery consumer.Delivery) error {
782782
atomic.AddInt64(&processedCount, 1)
783783
return nil

core/consumer/registry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ import (
2828
type TopicKey string
2929

3030
const (
31-
// TopicKeyRequest is the pipeline stage where new requests arrive from the gateway.
32-
TopicKeyRequest TopicKey = "request"
31+
// TopicKeyStart is the pipeline stage where new requests arrive from the gateway.
32+
TopicKeyStart TopicKey = "start"
3333
// TopicKeyValidate is the pipeline stage where requests are published for validation.
3434
TopicKeyValidate TopicKey = "validate"
3535
// TopicKeyBatch is the pipeline stage where validated requests are published for batching.

core/consumer/registry_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestNewTopicRegistry(t *testing.T) {
3232
registry, err := consumer.NewTopicRegistry(
3333
[]consumer.TopicConfig{
3434
{
35-
Key: consumer.TopicKeyRequest,
35+
Key: consumer.TopicKeyStart,
3636
Name: "request",
3737
Queue: mockQ,
3838
Subscription: extqueue.DefaultSubscriptionConfig(
@@ -43,15 +43,15 @@ func TestNewTopicRegistry(t *testing.T) {
4343
)
4444
require.NoError(t, err)
4545

46-
q, ok := registry.Queue(consumer.TopicKeyRequest)
46+
q, ok := registry.Queue(consumer.TopicKeyStart)
4747
require.True(t, ok)
4848
assert.Equal(t, mockQ, q)
4949

50-
name, ok := registry.TopicName(consumer.TopicKeyRequest)
50+
name, ok := registry.TopicName(consumer.TopicKeyStart)
5151
require.True(t, ok)
5252
assert.Equal(t, "request", name)
5353

54-
cfg, ok := registry.SubscriptionConfig(consumer.TopicKeyRequest, "group-a")
54+
cfg, ok := registry.SubscriptionConfig(consumer.TopicKeyStart, "group-a")
5555
require.True(t, ok)
5656
assert.Equal(t, "group-a", cfg.ConsumerGroup)
5757
}
@@ -87,7 +87,7 @@ func TestNewTopicRegistry_InvalidTopicName(t *testing.T) {
8787
t.Run(tt.name, func(t *testing.T) {
8888
_, err := consumer.NewTopicRegistry(
8989
[]consumer.TopicConfig{
90-
{Key: consumer.TopicKeyRequest, Name: tt.topicName},
90+
{Key: consumer.TopicKeyStart, Name: tt.topicName},
9191
},
9292
)
9393
require.Error(t, err)
@@ -108,14 +108,14 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) {
108108
name: "found group-a",
109109
configs: []consumer.TopicConfig{
110110
{
111-
Key: consumer.TopicKeyRequest,
111+
Key: consumer.TopicKeyStart,
112112
Name: "request",
113113
Subscription: extqueue.DefaultSubscriptionConfig(
114114
"worker-1", "group-a",
115115
),
116116
},
117117
},
118-
lookupKey: consumer.TopicKeyRequest,
118+
lookupKey: consumer.TopicKeyStart,
119119
lookupGroup: "group-a",
120120
expectFound: true,
121121
expectedGroup: "group-a",
@@ -124,22 +124,22 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) {
124124
name: "not found by group",
125125
configs: []consumer.TopicConfig{
126126
{
127-
Key: consumer.TopicKeyRequest,
127+
Key: consumer.TopicKeyStart,
128128
Name: "request",
129129
Subscription: extqueue.DefaultSubscriptionConfig(
130130
"worker-1", "group-a",
131131
),
132132
},
133133
},
134-
lookupKey: consumer.TopicKeyRequest,
134+
lookupKey: consumer.TopicKeyStart,
135135
lookupGroup: "nonexistent",
136136
expectFound: false,
137137
},
138138
{
139139
name: "not found by topic key",
140140
configs: []consumer.TopicConfig{
141141
{
142-
Key: consumer.TopicKeyRequest,
142+
Key: consumer.TopicKeyStart,
143143
Name: "request",
144144
Subscription: extqueue.DefaultSubscriptionConfig(
145145
"worker-1", "group-a",
@@ -175,13 +175,13 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) {
175175

176176
registry, err := consumer.NewTopicRegistry(
177177
[]consumer.TopicConfig{
178-
{Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ1},
178+
{Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ1},
179179
{Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ2},
180180
},
181181
)
182182
require.NoError(t, err)
183183

184-
q1, ok := registry.Queue(consumer.TopicKeyRequest)
184+
q1, ok := registry.Queue(consumer.TopicKeyStart)
185185
require.True(t, ok)
186186
assert.Equal(t, mockQ1, q1)
187187

@@ -201,8 +201,8 @@ func TestTopicKey_String(t *testing.T) {
201201
}{
202202
{
203203
name: "predefined topic key",
204-
key: consumer.TopicKeyRequest,
205-
expected: "request",
204+
key: consumer.TopicKeyStart,
205+
expected: "start",
206206
},
207207
{
208208
name: "custom topic key",
@@ -224,12 +224,12 @@ func TestTopicRegistry_TopicName(t *testing.T) {
224224

225225
registry, err := consumer.NewTopicRegistry(
226226
[]consumer.TopicConfig{
227-
{Key: consumer.TopicKeyRequest, Name: "my-custom-request", Queue: mockQ},
227+
{Key: consumer.TopicKeyStart, Name: "my-custom-request", Queue: mockQ},
228228
},
229229
)
230230
require.NoError(t, err)
231231

232-
name, ok := registry.TopicName(consumer.TopicKeyRequest)
232+
name, ok := registry.TopicName(consumer.TopicKeyStart)
233233
require.True(t, ok)
234234
assert.Equal(t, "my-custom-request", name)
235235

0 commit comments

Comments
 (0)