Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ type consumer struct {
// channel used to deliver message to clients
messageCh chan ConsumerMessage

dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
stopDiscovery func()
dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeMsgChOnce *sync.Once
closeCh chan struct{}
errorCh chan error
stopDiscovery func()

log log.Logger
metrics *internal.LeveledMetrics
Expand Down Expand Up @@ -176,6 +177,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}

closeMsgChOnce := new(sync.Once)
// normalize as FQDN topics
var tns []*internal.TopicName
// single topic consumer
Expand All @@ -193,7 +195,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false)
return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false, closeMsgChOnce)
}

if len(options.Topics) > 1 {
Expand All @@ -210,7 +212,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}

return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq, closeMsgChOnce)
}

if options.TopicsPattern != "" {
Expand All @@ -229,14 +231,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}

return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq, closeMsgChOnce)
}

return nil, newError(InvalidTopicName, "topic name is required for consumer")
}

func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool,
closeMsgChOnce *sync.Once) (*consumer, error) {

consumer := &consumer{
topic: topic,
Expand All @@ -245,6 +248,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
disableForceTopicCreation: disableForceTopicCreation,
messageCh: messageCh,
closeCh: make(chan struct{}),
closeMsgChOnce: closeMsgChOnce,
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
Expand Down Expand Up @@ -645,6 +649,9 @@ func (c *consumer) Close() {
}
wg.Wait()
close(c.closeCh)
c.closeMsgChOnce.Do(func() {
close(c.messageCh)
})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove these redundant code comments.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL

c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type multiTopicConsumer struct {
}

func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) (Consumer, error) {
mtc := &multiTopicConsumer{
client: client,
options: options,
Expand All @@ -63,7 +63,7 @@ func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []str
}

var errs error
for ce := range subscriber(client, topics, options, messageCh, dlq, rlq) {
for ce := range subscriber(client, topics, options, messageCh, dlq, rlq, closeMsgChOnce) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
Expand Down
28 changes: 15 additions & 13 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type regexConsumer struct {

options ConsumerOptions

messageCh chan ConsumerMessage
messageCh chan ConsumerMessage
closeMsgChOnce *sync.Once

namespace string
pattern *regexp.Regexp
Expand All @@ -62,13 +63,14 @@ type regexConsumer struct {
}

func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) (Consumer, error) {
rc := &regexConsumer{
client: c,
dlq: dlq,
rlq: rlq,
options: opts,
messageCh: msgCh,
client: c,
dlq: dlq,
rlq: rlq,
options: opts,
messageCh: msgCh,
closeMsgChOnce: closeMsgChOnce,

namespace: tn.Namespace,
pattern: pattern,
Expand All @@ -87,7 +89,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
}

var errs error
for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq, closeMsgChOnce) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
Expand Down Expand Up @@ -349,7 +351,7 @@ func (c *regexConsumer) discover() {
c.unsubscribe(staleTopics)
}
if len(newTopics) > 0 {
c.subscribe(newTopics, c.dlq, c.rlq)
c.subscribe(newTopics, c.dlq, c.rlq, c.closeMsgChOnce)
}
}

Expand All @@ -366,10 +368,10 @@ func (c *regexConsumer) knownTopics() []string {
return topics
}

func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) {
c.log.WithField("topics", topics).Debug("subscribe")
consumers := make(map[string]Consumer, len(topics))
for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq, closeMsgChOnce) {
if ce.err != nil {
c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
} else {
Expand Down Expand Up @@ -425,7 +427,7 @@ type consumerError struct {
}

func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage,
dlq *dlqRouter, rlq *retryRouter) <-chan consumerError {
dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) <-chan consumerError {
consumerErrorCh := make(chan consumerError, len(topics))
var wg sync.WaitGroup
wg.Add(len(topics))
Expand All @@ -437,7 +439,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
for _, t := range topics {
go func(topic string) {
defer wg.Done()
c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true)
c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true, closeMsgChOnce)
consumerErrorCh <- consumerError{
err: err,
topic: topic,
Expand Down
7 changes: 5 additions & 2 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -157,7 +158,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string

dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq,
rlq, new(sync.Once))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -196,7 +198,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string

dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq,
rlq, new(sync.Once))
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
return nil, err
}

c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false)
c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq,
false, new(sync.Once))
if err != nil {
close(reader.messageCh)
return nil, err
Expand Down