diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 0c31a1aafc..7a1c08a91f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -57,12 +57,13 @@ type consumer struct { // channel used to deliver message to clients messageCh chan ConsumerMessage - dlq *dlqRouter - rlq *retryRouter - closeOnce sync.Once - closeCh chan struct{} - errorCh chan error - stopDiscovery func() + dlq *dlqRouter + rlq *retryRouter + closeOnce sync.Once + closeMsgChOnce *sync.Once + closeCh chan struct{} + errorCh chan error + stopDiscovery func() log log.Logger metrics *internal.LeveledMetrics @@ -176,6 +177,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } + closeMsgChOnce := new(sync.Once) // normalize as FQDN topics var tns []*internal.TopicName // single topic consumer @@ -193,7 +195,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } - return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false) + return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false, closeMsgChOnce) } if len(options.Topics) > 1 { @@ -210,7 +212,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } - return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq) + return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq, closeMsgChOnce) } if options.TopicsPattern != "" { @@ -229,14 +231,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } - return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq) + return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq, closeMsgChOnce) } return nil, newError(InvalidTopicName, "topic name is required for consumer") } func newInternalConsumer(client *client, options ConsumerOptions, topic string, - messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool, + closeMsgChOnce *sync.Once) (*consumer, error) { consumer := &consumer{ topic: topic, @@ -245,6 +248,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, disableForceTopicCreation: disableForceTopicCreation, messageCh: messageCh, closeCh: make(chan struct{}), + closeMsgChOnce: closeMsgChOnce, errorCh: make(chan error), dlq: dlq, rlq: rlq, @@ -645,6 +649,9 @@ func (c *consumer) Close() { } wg.Wait() close(c.closeCh) + c.closeMsgChOnce.Do(func() { + close(c.messageCh) + }) c.client.handlers.Del(c) c.dlq.close() c.rlq.close() diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index f6630dd65f..e3043df46d 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -49,7 +49,7 @@ type multiTopicConsumer struct { } func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string, - messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) (Consumer, error) { mtc := &multiTopicConsumer{ client: client, options: options, @@ -63,7 +63,7 @@ func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []str } var errs error - for ce := range subscriber(client, topics, options, messageCh, dlq, rlq) { + for ce := range subscriber(client, topics, options, messageCh, dlq, rlq, closeMsgChOnce) { if ce.err != nil { errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic) } else { diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index d36694ef90..2ebc3c126c 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -43,7 +43,8 @@ type regexConsumer struct { options ConsumerOptions - messageCh chan ConsumerMessage + messageCh chan ConsumerMessage + closeMsgChOnce *sync.Once namespace string pattern *regexp.Regexp @@ -62,13 +63,14 @@ type regexConsumer struct { } func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, - msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { + msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) (Consumer, error) { rc := ®exConsumer{ - client: c, - dlq: dlq, - rlq: rlq, - options: opts, - messageCh: msgCh, + client: c, + dlq: dlq, + rlq: rlq, + options: opts, + messageCh: msgCh, + closeMsgChOnce: closeMsgChOnce, namespace: tn.Namespace, pattern: pattern, @@ -87,7 +89,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p } var errs error - for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) { + for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq, closeMsgChOnce) { if ce.err != nil { errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic) } else { @@ -349,7 +351,7 @@ func (c *regexConsumer) discover() { c.unsubscribe(staleTopics) } if len(newTopics) > 0 { - c.subscribe(newTopics, c.dlq, c.rlq) + c.subscribe(newTopics, c.dlq, c.rlq, c.closeMsgChOnce) } } @@ -366,10 +368,10 @@ func (c *regexConsumer) knownTopics() []string { return topics } -func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) { +func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) { c.log.WithField("topics", topics).Debug("subscribe") consumers := make(map[string]Consumer, len(topics)) - for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) { + for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq, closeMsgChOnce) { if ce.err != nil { c.log.Warnf("Failed to subscribe to topic=%s", ce.topic) } else { @@ -425,7 +427,7 @@ type consumerError struct { } func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage, - dlq *dlqRouter, rlq *retryRouter) <-chan consumerError { + dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) <-chan consumerError { consumerErrorCh := make(chan consumerError, len(topics)) var wg sync.WaitGroup wg.Add(len(topics)) @@ -437,7 +439,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum for _, t := range topics { go func(topic string) { defer wg.Done() - c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true) + c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true, closeMsgChOnce) consumerErrorCh <- consumerError{ err: err, topic: topic, diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index ebd7e4e196..21fc16b37d 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -22,6 +22,7 @@ import ( "fmt" "regexp" "strings" + "sync" "testing" "time" @@ -157,7 +158,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) + consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, + rlq, new(sync.Once)) if err != nil { t.Fatal(err) } @@ -196,7 +198,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) + consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, + rlq, new(sync.Once)) if err != nil { t.Fatal(err) } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index bf91c67fa5..106f8abbaf 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -136,7 +136,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } - c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false) + c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, + false, new(sync.Once)) if err != nil { close(reader.messageCh) return nil, err