Skip to content

Commit cbb359e

Browse files
authored
Merge branch 'main' into manjari/batch-manager-publish-batch-id
2 parents 43b31db + 0c51767 commit cbb359e

34 files changed

Lines changed: 626 additions & 611 deletions

core/consumer/consumer.go

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type consumer struct {
3535
mu sync.Mutex
3636
stopped bool
3737
controllers []Controller
38-
subscriptions map[Topic]*activeSubscription // topic -> subscription
38+
subscriptions map[TopicKey]*activeSubscription // topicKey -> subscription
3939
}
4040

4141
// activeSubscription tracks the state of an active subscription.
@@ -52,12 +52,12 @@ func New(logger *zap.SugaredLogger, scope tally.Scope, registry TopicRegistry) C
5252
logger: logger,
5353
metricsScope: scope.SubScope("consumer"),
5454
registry: registry,
55-
subscriptions: make(map[Topic]*activeSubscription),
55+
subscriptions: make(map[TopicKey]*activeSubscription),
5656
}
5757
}
5858

5959
// Register adds a controller to the consumer. Must be called before Start().
60-
// Returns error if a controller for the same topic is already registered or if the consumer is stopped.
60+
// Returns error if a controller for the same topic key is already registered or if the consumer is stopped.
6161
func (m *consumer) Register(controller Controller) error {
6262
m.mu.Lock()
6363
defer m.mu.Unlock()
@@ -66,19 +66,19 @@ func (m *consumer) Register(controller Controller) error {
6666
return fmt.Errorf("consumer is stopped")
6767
}
6868

69-
// Check for duplicate topic registration.
69+
// Check for duplicate topic key registration.
7070
// O(n) scan is fine here — controller count is in the single digits.
7171
for _, c := range m.controllers {
72-
if c.Topic() == controller.Topic() {
73-
return fmt.Errorf("controller for topic %s already registered", controller.Topic())
72+
if c.TopicKey() == controller.TopicKey() {
73+
return fmt.Errorf("controller for topic key %s already registered", controller.TopicKey())
7474
}
7575
}
7676

7777
m.controllers = append(m.controllers, controller)
7878

7979
m.logger.Infow("registered controller",
8080
"controller", controller.Name(),
81-
"topic", controller.Topic(),
81+
"topic_key", controller.TopicKey(),
8282
"consumer_group", controller.ConsumerGroup(),
8383
)
8484

@@ -124,23 +124,29 @@ func (m *consumer) Start(ctx context.Context) error {
124124

125125
// subscribe subscribes a controller to its topic and spawns a consumption goroutine.
126126
func (m *consumer) subscribe(ctx context.Context, controller Controller) error {
127-
topic := controller.Topic()
127+
topicKey := controller.TopicKey()
128128
consumerGroup := controller.ConsumerGroup()
129129

130130
// Get subscription config from registry
131-
config, ok := m.registry.SubscriptionConfig(topic, consumerGroup)
131+
config, ok := m.registry.SubscriptionConfig(topicKey, consumerGroup)
132132
if !ok {
133-
return fmt.Errorf("no subscription config for topic %s, consumer group %s", topic, consumerGroup)
133+
return fmt.Errorf("no subscription config for topic key %s, consumer group %s", topicKey, consumerGroup)
134134
}
135135

136-
// Get queue for this topic
137-
q, ok := m.registry.Queue(topic)
136+
// Get queue for this topic key
137+
q, ok := m.registry.Queue(topicKey)
138138
if !ok {
139-
return fmt.Errorf("no queue registered for topic %s", topic)
139+
return fmt.Errorf("no queue registered for topic key %s", topicKey)
140+
}
141+
142+
// Resolve the actual topic name for subscribing
143+
topicName, ok := m.registry.TopicName(topicKey)
144+
if !ok {
145+
return fmt.Errorf("no topic name registered for topic key %s", topicKey)
140146
}
141147

142148
subscriber := q.Subscriber()
143-
deliveryChan, err := subscriber.Subscribe(ctx, topic.String(), config)
149+
deliveryChan, err := subscriber.Subscribe(ctx, topicName, config)
144150
if err != nil {
145151
return fmt.Errorf("subscribe failed: %w", err)
146152
}
@@ -155,14 +161,14 @@ func (m *consumer) subscribe(ctx context.Context, controller Controller) error {
155161
cancelFunc: cancel,
156162
done: done,
157163
}
158-
m.subscriptions[topic] = sub
164+
m.subscriptions[topicKey] = sub
159165

160166
// Spawn consumption goroutine
161167
go m.consumeLoop(controllerCtx, controller, deliveryChan, done)
162168

163169
m.logger.Infow("controller started",
164170
"controller", controller.Name(),
165-
"topic", topic,
171+
"topic_key", topicKey,
166172
"consumer_group", consumerGroup,
167173
)
168174

@@ -173,32 +179,32 @@ func (m *consumer) subscribe(ctx context.Context, controller Controller) error {
173179
func (m *consumer) consumeLoop(ctx context.Context, controller Controller, deliveryChan <-chan queue.Delivery, done chan struct{}) {
174180
defer close(done)
175181

176-
topic := controller.Topic()
182+
topicKey := controller.TopicKey()
177183

178184
controllerScope := m.metricsScope.Tagged(map[string]string{
179185
"controller": controller.Name(),
180-
"topic": topic.String(),
186+
"topic_key": topicKey.String(),
181187
})
182188

183189
m.logger.Debugw("consume loop started",
184190
"controller", controller.Name(),
185-
"topic", topic,
191+
"topic_key", topicKey,
186192
)
187193

188194
for {
189195
select {
190196
case <-ctx.Done():
191197
m.logger.Infow("consume loop stopped",
192198
"controller", controller.Name(),
193-
"topic", topic,
199+
"topic_key", topicKey,
194200
)
195201
return
196202

197203
case delivery, ok := <-deliveryChan:
198204
if !ok {
199205
m.logger.Infow("delivery channel closed",
200206
"controller", controller.Name(),
201-
"topic", topic,
207+
"topic_key", topicKey,
202208
)
203209
return
204210
}
@@ -214,11 +220,11 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
214220
controllerScope.Counter("messages_received").Inc(1)
215221

216222
msg := delivery.Message()
217-
topic := controller.Topic()
223+
topicKey := controller.TopicKey()
218224

219225
m.logger.Debugw("processing delivery",
220226
"controller", controller.Name(),
221-
"topic", topic,
227+
"topic_key", topicKey,
222228
"message_id", msg.ID,
223229
"partition_key", msg.PartitionKey,
224230
"attempt", delivery.Attempt(),
@@ -248,7 +254,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
248254
if IsNonRetryable(err) {
249255
m.logger.Errorw("non-retryable controller error, rejecting message",
250256
"controller", controller.Name(),
251-
"topic", controller.Topic(),
257+
"topic_key", controller.TopicKey(),
252258
"message_id", msg.ID,
253259
"partition_key", msg.PartitionKey,
254260
"attempt", delivery.Attempt(),
@@ -262,7 +268,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
262268
if rejectErr := delivery.Reject(ctx, err.Error()); rejectErr != nil {
263269
m.logger.Errorw("failed to reject non-retryable message",
264270
"controller", controller.Name(),
265-
"topic", controller.Topic(),
271+
"topic_key", controller.TopicKey(),
266272
"message_id", msg.ID,
267273
"error", rejectErr,
268274
)
@@ -274,7 +280,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
274280
// Controller returned retryable error - nack message for retry
275281
m.logger.Errorw("controller error, nacking message",
276282
"controller", controller.Name(),
277-
"topic", topic,
283+
"topic_key", topicKey,
278284
"message_id", msg.ID,
279285
"partition_key", msg.PartitionKey,
280286
"attempt", delivery.Attempt(),
@@ -289,7 +295,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
289295
if nackErr := delivery.Nack(ctx, 0); nackErr != nil {
290296
m.logger.Errorw("failed to nack message",
291297
"controller", controller.Name(),
292-
"topic", topic,
298+
"topic_key", topicKey,
293299
"message_id", msg.ID,
294300
"error", nackErr,
295301
)
@@ -310,7 +316,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
310316
if ackErr := delivery.Ack(ctx); ackErr != nil {
311317
m.logger.Errorw("failed to ack message",
312318
"controller", controller.Name(),
313-
"topic", topic,
319+
"topic_key", topicKey,
314320
"message_id", msg.ID,
315321
"error", ackErr,
316322
)
@@ -334,7 +340,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
334340

335341
m.logger.Debugw("message processed successfully",
336342
"controller", controller.Name(),
337-
"topic", topic,
343+
"topic_key", topicKey,
338344
"message_id", msg.ID,
339345
"partition_key", msg.PartitionKey,
340346
"attempt", delivery.Attempt(),
@@ -368,26 +374,26 @@ func (m *consumer) Stop(timeoutMs int64) error {
368374
// Returns error on timeout, nil on success.
369375
func (m *consumer) unsubscribeAll(timeoutMs int64) error {
370376
// Cancel all subscription contexts
371-
for topic, sub := range m.subscriptions {
377+
for topicKey, sub := range m.subscriptions {
372378
m.logger.Debugw("stopping controller",
373379
"controller", sub.controller.Name(),
374-
"topic", topic,
380+
"topic_key", topicKey,
375381
)
376382
sub.cancelFunc()
377383
}
378384

379385
// Wait for each subscription to finish, splitting the timeout budget across them
380386
remaining := time.Duration(timeoutMs) * time.Millisecond
381387
var timedOut bool
382-
for topic, sub := range m.subscriptions {
388+
for topicKey, sub := range m.subscriptions {
383389
start := time.Now()
384390
select {
385391
case <-sub.done:
386392
// Controller stopped gracefully
387393
case <-time.After(remaining):
388394
m.logger.Errorw("timeout waiting for controller to stop",
389395
"controller", sub.controller.Name(),
390-
"topic", topic,
396+
"topic_key", topicKey,
391397
)
392398
timedOut = true
393399
}
@@ -399,7 +405,7 @@ func (m *consumer) unsubscribeAll(timeoutMs int64) error {
399405
}
400406

401407
// Clear subscriptions
402-
m.subscriptions = make(map[Topic]*activeSubscription)
408+
m.subscriptions = make(map[TopicKey]*activeSubscription)
403409

404410
if timedOut {
405411
return fmt.Errorf("timeout waiting for controllers to stop")

0 commit comments

Comments
 (0)