1 change: 1 addition & 0 deletions CLAUDE.md
@@ -142,6 +142,7 @@ Generated proto files are committed. When modifying `.proto` files:
- **Directories**: singular (`mock/`, `entity/`, not `mocks/`, `entities/`)
- **Files**: `{method}.go`, `{entity}.go`, `{file}_test.go`, `BUILD.bazel`
- **Proto files**: `{service}.proto`
- **README files**: Do not duplicate interface or type definitions as code blocks in READMEs. Describe behavior in prose and let readers navigate to the source. Only include code samples when explicitly instructed.

### Makefile

70 changes: 64 additions & 6 deletions core/consumer/README.md
@@ -2,14 +2,34 @@

The consumer package orchestrates queue message processing. It manages subscription lifecycle, message consumption, ack/nack, and graceful shutdown.

## Architecture

```
Consumer
├── Controller A (topic: "request")
│   └── consumeLoop
│       ├── processPartition("part-1")  ← serial per partition
│       ├── processPartition("part-2")
│       └── processPartition("part-3")
└── Controller B (topic: "build")
    └── consumeLoop
        └── processPartition("part-1")
```

The consumer spawns one `consumeLoop` goroutine per controller. Each `consumeLoop` dispatches deliveries to per-partition goroutines, preserving ordering within each partition while processing different partitions in parallel.

## Interfaces

### Consumer

The top-level orchestrator. Register controllers, start consuming, and stop gracefully.

```go
c := consumer.New(logger, scope, queue, "worker-hostname")
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
	{Key: consumer.TopicKeyRequest, Name: "request", Queue: q, Subscription: subConfig},
})

c := consumer.New(logger, scope, registry)

c.Register(myController)
c.Start(ctx)
@@ -28,15 +48,37 @@ Business logic for processing queue messages. Implement this interface to handle
type Controller interface {
	Process(ctx context.Context, delivery Delivery) error
	Name() string
	Topic() string
	TopicKey() TopicKey
	ConsumerGroup() string
	SubscriptionConfig(subscriberName string) queue.SubscriptionConfig
}
```

### Delivery

A restricted view of a queue delivery exposed to controllers. Hides Ack/Nack (handled automatically by Consumer) while exposing message data and `ExtendVisibilityTimeout`.
A restricted view of a queue delivery exposed to controllers. Hides Ack/Nack/Reject (handled automatically by Consumer) while exposing message data and `ExtendVisibilityTimeout`.

## TopicRegistry

The `TopicRegistry` maps topic keys to queue backends, topic names, and subscription configs. This decouples controllers from infrastructure wiring.

```go
registry, _ := consumer.NewTopicRegistry([]consumer.TopicConfig{
	{
		Key:          consumer.TopicKeyRequest,
		Name:         "request",
		Queue:        q,
		Subscription: extqueue.DefaultSubscriptionConfig("worker-1", "orchestrator"),
	},
	{
		Key:   consumer.TopicKeyBuild,
		Name:  "build",
		Queue: q,
		// No Subscription: publish-only topic
	},
})
```

**Topic keys** are fixed identifiers for pipeline stages (e.g., `TopicKeyRequest`, `TopicKeyBuild`). The actual queue topic name is configured separately, so library consumers can use their own naming conventions.
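To make the key-versus-name split concrete, here is a minimal sketch of a lookup table keyed by a fixed identifier. All names in it (`topicKey`, `topicConfig`, `newRegistry`, `topicName`) are hypothetical, and the validation rules (no empty names, no duplicate keys) are assumptions for illustration; the PR does not show what `NewTopicRegistry` actually checks.

```go
package main

import (
	"errors"
	"fmt"
)

// topicKey is a fixed identifier for a pipeline stage (hypothetical type).
type topicKey string

const (
	keyRequest topicKey = "request"
	keyBuild   topicKey = "build"
)

// topicConfig pairs a fixed key with a deployment-specific topic name.
type topicConfig struct {
	Key  topicKey
	Name string
}

type registry struct {
	byKey map[topicKey]topicConfig
}

// newRegistry indexes configs by key. Rejecting empty names and duplicate
// keys is an assumption of this sketch, not documented behavior.
func newRegistry(configs []topicConfig) (*registry, error) {
	byKey := make(map[topicKey]topicConfig, len(configs))
	for _, c := range configs {
		if c.Name == "" {
			return nil, fmt.Errorf("topic %q: empty name", c.Key)
		}
		if _, dup := byKey[c.Key]; dup {
			return nil, fmt.Errorf("topic %q: registered twice", c.Key)
		}
		byKey[c.Key] = c
	}
	return &registry{byKey: byKey}, nil
}

// topicName resolves a fixed key to the configured topic name.
func (r *registry) topicName(key topicKey) (string, error) {
	c, ok := r.byKey[key]
	if !ok {
		return "", errors.New("unknown topic key: " + string(key))
	}
	return c.Name, nil
}

func main() {
	reg, err := newRegistry([]topicConfig{
		{Key: keyRequest, Name: "acme-requests"},
		{Key: keyBuild, Name: "acme-builds"},
	})
	if err != nil {
		panic(err)
	}
	name, _ := reg.topicName(keyRequest)
	fmt.Println(name)
}
```

Code that depends only on `keyRequest` keeps working when a deployment renames the underlying topic, which is the decoupling the section describes.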

## Error Handling

@@ -46,10 +88,26 @@ Controllers signal processing outcome via the return value of `Process()`:
- **`return errs.NewRetryableError(err)`** — retryable failure, message is nacked for retry.
- **`return err`** — non-retryable error (e.g. poison pill), message is rejected and removed from the queue to prevent infinite retry loops.

```go
func (c *MyController) Process(ctx context.Context, delivery consumer.Delivery) error {
	msg := delivery.Message()

	// The result is discarded here; only the error decides ack/nack/reject.
	_, err := c.service.Process(ctx, msg.Payload)
	if err != nil {
		if isTransient(err) {
			return errs.NewRetryableError(err) // nack → retry
		}
		return err // reject → removed from the queue
	}

	return nil // ack → done
}
```

## Lifecycle

1. **Register** controllers before starting.
2. **Start** subscribes to all topics and spawns consume loops.
3. **Stop** cancels all subscriptions and waits for goroutines to finish (with timeout).
2. **Start** subscribes to all topics and spawns consume loops. Startup is atomic — if any subscription fails, all started subscriptions are cleaned up.
3. **Stop** cancels all subscriptions and waits for goroutines to finish. The timeout budget is shared across controllers: each one gets whatever time remains after the previous one has stopped.

Once stopped, the consumer cannot be restarted — `Register()` and `Start()` return errors.
112 changes: 104 additions & 8 deletions core/consumer/consumer.go
@@ -12,6 +12,12 @@ import (
"go.uber.org/zap"
)

const (
	// startupCleanupTimeoutMs is the timeout for cleaning up subscriptions when
	// a controller fails to start during Start().
	startupCleanupTimeoutMs = 30000
)

// Consumer orchestrates multiple queue consumers. It handles subscription lifecycle,
// message consumption, ack/nack, and graceful shutdown for the entire pipeline.
type Consumer interface {
@@ -109,9 +115,9 @@ func (m *consumer) Start(ctx context.Context) error {

for _, controller := range m.controllers {
if err := m.subscribe(ctx, controller); err != nil {
// Cleanup any started controllers with short timeout (30 seconds).
// Ignore error since we're returning the subscribe error.
_ = m.unsubscribeAll(30000)
// Cleanup any started controllers. Ignore error since we're returning
// the subscribe error.
_ = m.unsubscribeAll(startupCleanupTimeoutMs)
return fmt.Errorf("failed to start controller %s: %w", controller.Name(), err)
}
}
@@ -165,7 +171,7 @@ func (m *consumer) subscribe(ctx context.Context, controller Controller) error {
m.subscriptions[topicKey] = sub

// Spawn consumption goroutine
go m.consumeLoop(controllerCtx, controller, deliveryChan, done)
go m.consumeLoop(controllerCtx, controller, deliveryChan, done, config.BatchSize)

m.logger.Infow("controller started",
"controller", controller.Name(),
@@ -176,8 +182,29 @@ func (m *consumer) subscribe(ctx context.Context, controller Controller) error {
return nil
}

// consumeLoop processes deliveries for a controller, calling ack/nack based on controller result.
func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliveryChan <-chan queue.Delivery, done chan struct{}) {
// consumeLoop dispatches deliveries to per-partition worker goroutines.
// Each partition gets its own goroutine, so a slow message on one partition
// does not block other partitions. Per-partition ordering is preserved.
//
// Goroutine model:
//
// consumeLoop (this goroutine) ← reads from deliveryChan
// ├── processPartition("part-1") ← spawned lazily on first message
// ├── processPartition("part-2")
// └── processPartition("part-N")
//
// Shutdown sequence:
// 1. ctx is cancelled (by Stop or the parent context) or deliveryChan is closed
// 2. consumeLoop leaves the select loop and calls shutdownPartitions
// 3. shutdownPartitions closes every partition channel; each processPartition
// goroutine finishes its in-flight delivery, then returns once ctx is
// cancelled or its channel is drained (range loop ends)
// 4. wg.Wait() blocks until all partition goroutines have exited
// 5. The deferred close(done) signals to unsubscribeAll that this controller is fully stopped
//
// Any messages buffered in partition channels but not processed before ctx
// cancellation are safe to drop — the queue's visibility timeout will make
// them visible again for redelivery (at-least-once semantics).
func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliveryChan <-chan queue.Delivery, done chan struct{}, batchSize int) {
defer close(done)

topicKey := controller.TopicKey()
@@ -192,13 +219,20 @@ func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliv
"topic_key", topicKey,
)

// partitionChs maps partition keys to per-partition delivery channels.
// Each channel is created lazily on the first message for that partition
// and is never removed — partitions are stable for the lifetime of a subscription.
partitionChs := make(map[string]chan queue.Delivery)
var wg sync.WaitGroup

for {
select {
case <-ctx.Done():
m.logger.Infow("consume loop stopped",
"controller", controller.Name(),
"topic_key", topicKey,
)
m.shutdownPartitions(partitionChs, &wg)
return

case delivery, ok := <-deliveryChan:
@@ -207,10 +241,66 @@ func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliv
"controller", controller.Name(),
"topic_key", topicKey,
)
m.shutdownPartitions(partitionChs, &wg)
return
}

m.processDelivery(ctx, controller, delivery, controllerScope)
			// Route delivery to its partition's channel, creating the channel
			// and spawning a processPartition goroutine if this is the first
			// message for that partition.
			partitionKey := delivery.Message().PartitionKey
			ch, exists := partitionChs[partitionKey]
			if !exists {
				ch = make(chan queue.Delivery, batchSize)
				partitionChs[partitionKey] = ch
				wg.Add(1)
				go func(pCh <-chan queue.Delivery) {
					defer wg.Done()
					m.processPartition(ctx, controller, pCh, controllerScope)
				}(ch)
			}

			// Send to the partition channel. If ctx is cancelled while the
			// channel buffer is full, we exit; the undelivered message will
			// be retried after visibility timeout.
			select {
			case ch <- delivery:
			case <-ctx.Done():
				m.shutdownPartitions(partitionChs, &wg)
				return
			}
		}
	}
}

// shutdownPartitions closes all partition channels to signal processPartition
// goroutines to exit, then waits for them to finish draining.
func (m *consumer) shutdownPartitions(partitionChs map[string]chan queue.Delivery, wg *sync.WaitGroup) {
	for _, ch := range partitionChs {
		close(ch)
	}
	wg.Wait()
}

// processPartition drains a per-partition channel and processes deliveries serially.
// It runs in its own goroutine (one per partition key). Deliveries within a partition
// are processed in order — the next delivery is not started until the current one
// completes (ack/nack/reject).
//
// The loop exits when either:
// - deliveryCh is closed (consumeLoop cleanup)
// - ctx is cancelled (graceful shutdown)
//
// On context cancellation, the current delivery being read from the channel is
// dropped without processing. This is safe because the queue's visibility timeout
// ensures unprocessed messages are redelivered.
func (m *consumer) processPartition(ctx context.Context, controller Controller, deliveryCh <-chan queue.Delivery, scope tally.Scope) {
	for delivery := range deliveryCh {
		select {
		case <-ctx.Done():
			return
		default:
			m.processDelivery(ctx, controller, delivery, scope)
		}
	}
}
@@ -370,7 +460,13 @@ func (m *consumer) Stop(timeoutMs int64) error {
return err
}

// unsubscribeAll stops all active controllers (must be called with lock held).
// unsubscribeAll cancels all subscription contexts and waits for their consumeLoop
// goroutines to exit.
//
// The timeout budget is shared across all subscriptions — each subscription gets
// the remaining time after the previous one finishes. This ensures Stop() returns
// within the caller's specified timeoutMs even if some controllers are slow to drain.
//
// timeoutMs is the maximum time in milliseconds to wait for all controllers to stop.
// Returns error on timeout, nil on success.
func (m *consumer) unsubscribeAll(timeoutMs int64) error {