Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ The top-level orchestrator. Register controllers, start consuming, and stop grac

```go
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
{Key: consumer.TopicKeyRequest, Name: "request", Queue: q, Subscription: subConfig},
{Key: consumer.TopicKeyStart, Name: "request", Queue: q, Subscription: subConfig},
})

c := consumer.New(logger, scope, registry)
Expand Down Expand Up @@ -64,7 +64,7 @@ The `TopicRegistry` maps topic keys to queue backends, topic names, and subscrip
```go
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Key: consumer.TopicKeyRequest,
Key: consumer.TopicKeyStart,
Name: "request",
Queue: q,
Subscription: extqueue.DefaultSubscriptionConfig("worker-1", "orchestrator"),
Expand All @@ -78,7 +78,7 @@ registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
})
```

**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyRequest`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions.
**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyStart`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions.

## Error Handling

Expand Down
58 changes: 29 additions & 29 deletions core/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestConsumer_Register(t *testing.T) {
c := consumer.New(logger, tally.NoopScope, reg)

handler1 := consumermock.NewMockController(ctrl)
setupController(handler1, "handler1", consumer.TopicKeyRequest, "group1", nil)
setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil)

handler2 := consumermock.NewMockController(ctrl)
setupController(handler2, "handler2", consumer.TopicKey("other-topic"), "group2", nil)
Expand All @@ -123,10 +123,10 @@ func TestConsumer_Register_DuplicateTopic(t *testing.T) {
c := consumer.New(logger, tally.NoopScope, reg)

handler1 := consumermock.NewMockController(ctrl)
setupController(handler1, "handler1", consumer.TopicKeyRequest, "group1", nil)
setupController(handler1, "handler1", consumer.TopicKeyStart, "group1", nil)

handler2 := consumermock.NewMockController(ctrl)
setupController(handler2, "handler2", consumer.TopicKeyRequest, "group2", nil)
setupController(handler2, "handler2", consumer.TopicKeyStart, "group2", nil)

err := c.Register(handler1)
require.NoError(t, err)
Expand All @@ -146,7 +146,7 @@ func TestConsumer_Register_AfterStop(t *testing.T) {
require.NoError(t, err)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "handler1", consumer.TopicKeyRequest, "group1", nil)
setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil)

err = c.Register(handler)
assert.Error(t, err)
Expand All @@ -170,7 +170,7 @@ func TestConsumer_Start_AfterStop(t *testing.T) {
c := consumer.New(logger, tally.NoopScope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "handler1", consumer.TopicKeyRequest, "group1", nil)
setupController(handler, "handler1", consumer.TopicKeyStart, "group1", nil)

err := c.Register(handler)
require.NoError(t, err)
Expand All @@ -189,14 +189,14 @@ func TestConsumer_Start_MissingSubscriptionConfig(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
// Registry has queue but no subscription config
reg, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{{Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ}},
[]consumer.TopicConfig{{Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ}},
)
require.NoError(t, err)

c := consumer.New(logger, tally.NoopScope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "handler", consumer.TopicKeyRequest, "group", nil)
setupController(handler, "handler", consumer.TopicKeyStart, "group", nil)

err = c.Register(handler)
require.NoError(t, err)
Expand All @@ -217,12 +217,12 @@ func TestConsumer_Start_SubscribeFailure(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "group")

c := consumer.New(logger, tally.NoopScope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "handler", consumer.TopicKeyRequest, "group", nil)
setupController(handler, "handler", consumer.TopicKeyStart, "group", nil)

err := c.Register(handler)
require.NoError(t, err)
Expand All @@ -243,13 +243,13 @@ func TestConsumer_ProcessDelivery_Success(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg)

handledMsg := ""
handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
handledMsg = delivery.Message().ID
return nil
Expand Down Expand Up @@ -289,12 +289,12 @@ func TestConsumer_ProcessDelivery_Error(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
return errs.NewRetryableError(fmt.Errorf("processing failed"))
},
Expand Down Expand Up @@ -331,12 +331,12 @@ func TestConsumer_ProcessDelivery_NonRetryableError(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
return fmt.Errorf("bad payload")
},
Expand Down Expand Up @@ -382,12 +382,12 @@ func TestConsumer_Stop(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group", nil)
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group", nil)

err := c.Register(handler)
require.NoError(t, err)
Expand Down Expand Up @@ -440,12 +440,12 @@ func TestConsumer_ObservabilityTags(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

testC := consumer.New(logger, testScope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
return tt.handlerError
},
Expand Down Expand Up @@ -515,12 +515,12 @@ func TestConsumer_AckNackLatencyTracking(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, scope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error { return nil },
)

Expand Down Expand Up @@ -560,12 +560,12 @@ func TestConsumer_ErrorMetrics(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, scope, reg)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
return errs.NewRetryableError(fmt.Errorf("processing failed"))
},
Expand Down Expand Up @@ -616,7 +616,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg)

Expand All @@ -626,7 +626,7 @@ func TestConsumer_PerPartitionProcessing(t *testing.T) {
var partBProcessed atomic.Bool

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
pk := delivery.Message().PartitionKey
if pk == "partition-a" {
Expand Down Expand Up @@ -701,7 +701,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg)

Expand All @@ -712,7 +712,7 @@ func TestConsumer_PartitionOrdering(t *testing.T) {
allDone := make(chan struct{})

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
mu.Lock()
order = append(order, delivery.Message().ID)
Expand Down Expand Up @@ -770,14 +770,14 @@ func TestConsumer_PartitionWorkerCleanup(t *testing.T) {
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Subscriber().Return(mockSub)

reg := newRegistry(t, mockQ, consumer.TopicKeyRequest, "test-group")
reg := newRegistry(t, mockQ, consumer.TopicKeyStart, "test-group")

c := consumer.New(logger, tally.NoopScope, reg)

processedCount := int64(0)

handler := consumermock.NewMockController(ctrl)
setupController(handler, "test-handler", consumer.TopicKeyRequest, "test-group",
setupController(handler, "test-handler", consumer.TopicKeyStart, "test-group",
func(ctx context.Context, delivery consumer.Delivery) error {
atomic.AddInt64(&processedCount, 1)
return nil
Expand Down
4 changes: 2 additions & 2 deletions core/consumer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
type TopicKey string

const (
// TopicKeyRequest is the pipeline stage where new requests arrive from the gateway.
TopicKeyRequest TopicKey = "request"
// TopicKeyStart is the pipeline stage where new requests arrive from the gateway.
TopicKeyStart TopicKey = "start"
// TopicKeyValidate is the pipeline stage where requests are published for validation.
TopicKeyValidate TopicKey = "validate"
// TopicKeyBatch is the pipeline stage where validated requests are published for batching.
Expand Down
32 changes: 16 additions & 16 deletions core/consumer/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestNewTopicRegistry(t *testing.T) {
registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{
Key: consumer.TopicKeyRequest,
Key: consumer.TopicKeyStart,
Name: "request",
Queue: mockQ,
Subscription: extqueue.DefaultSubscriptionConfig(
Expand All @@ -43,15 +43,15 @@ func TestNewTopicRegistry(t *testing.T) {
)
require.NoError(t, err)

q, ok := registry.Queue(consumer.TopicKeyRequest)
q, ok := registry.Queue(consumer.TopicKeyStart)
require.True(t, ok)
assert.Equal(t, mockQ, q)

name, ok := registry.TopicName(consumer.TopicKeyRequest)
name, ok := registry.TopicName(consumer.TopicKeyStart)
require.True(t, ok)
assert.Equal(t, "request", name)

cfg, ok := registry.SubscriptionConfig(consumer.TopicKeyRequest, "group-a")
cfg, ok := registry.SubscriptionConfig(consumer.TopicKeyStart, "group-a")
require.True(t, ok)
assert.Equal(t, "group-a", cfg.ConsumerGroup)
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestNewTopicRegistry_InvalidTopicName(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
_, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{Key: consumer.TopicKeyRequest, Name: tt.topicName},
{Key: consumer.TopicKeyStart, Name: tt.topicName},
},
)
require.Error(t, err)
Expand All @@ -108,14 +108,14 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) {
name: "found group-a",
configs: []consumer.TopicConfig{
{
Key: consumer.TopicKeyRequest,
Key: consumer.TopicKeyStart,
Name: "request",
Subscription: extqueue.DefaultSubscriptionConfig(
"worker-1", "group-a",
),
},
},
lookupKey: consumer.TopicKeyRequest,
lookupKey: consumer.TopicKeyStart,
lookupGroup: "group-a",
expectFound: true,
expectedGroup: "group-a",
Expand All @@ -124,22 +124,22 @@ func TestTopicRegistry_SubscriptionConfig(t *testing.T) {
name: "not found by group",
configs: []consumer.TopicConfig{
{
Key: consumer.TopicKeyRequest,
Key: consumer.TopicKeyStart,
Name: "request",
Subscription: extqueue.DefaultSubscriptionConfig(
"worker-1", "group-a",
),
},
},
lookupKey: consumer.TopicKeyRequest,
lookupKey: consumer.TopicKeyStart,
lookupGroup: "nonexistent",
expectFound: false,
},
{
name: "not found by topic key",
configs: []consumer.TopicConfig{
{
Key: consumer.TopicKeyRequest,
Key: consumer.TopicKeyStart,
Name: "request",
Subscription: extqueue.DefaultSubscriptionConfig(
"worker-1", "group-a",
Expand Down Expand Up @@ -175,13 +175,13 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) {

registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ1},
{Key: consumer.TopicKeyStart, Name: "request", Queue: mockQ1},
{Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ2},
},
)
require.NoError(t, err)

q1, ok := registry.Queue(consumer.TopicKeyRequest)
q1, ok := registry.Queue(consumer.TopicKeyStart)
require.True(t, ok)
assert.Equal(t, mockQ1, q1)

Expand All @@ -201,8 +201,8 @@ func TestTopicKey_String(t *testing.T) {
}{
{
name: "predefined topic key",
key: consumer.TopicKeyRequest,
expected: "request",
key: consumer.TopicKeyStart,
expected: "start",
},
{
name: "custom topic key",
Expand All @@ -224,12 +224,12 @@ func TestTopicRegistry_TopicName(t *testing.T) {

registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{Key: consumer.TopicKeyRequest, Name: "my-custom-request", Queue: mockQ},
{Key: consumer.TopicKeyStart, Name: "my-custom-request", Queue: mockQ},
},
)
require.NoError(t, err)

name, ok := registry.TopicName(consumer.TopicKeyRequest)
name, ok := registry.TopicName(consumer.TopicKeyStart)
require.True(t, ok)
assert.Equal(t, "my-custom-request", name)

Expand Down
Loading