From 0a4adac3fe94400b815459defc7a423d36574942 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Thu, 14 Jul 2022 18:17:06 +0800 Subject: [PATCH 1/6] fix goroutine leak for closing consumers. --- pulsar/consumer_impl.go | 1 + pulsar/consumer_multitopic.go | 1 + pulsar/consumer_regex.go | 1 + 3 files changed, 3 insertions(+) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2328ca882b..40455d07f0 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -563,6 +563,7 @@ func (c *consumer) Close() { } wg.Wait() close(c.closeCh) + close(c.messageCh) c.client.handlers.Del(c) c.dlq.close() c.rlq.close() diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 1d75a2477b..96aae44496 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -211,6 +211,7 @@ func (c *multiTopicConsumer) Close() { } wg.Wait() close(c.closeCh) + close(c.messageCh) c.client.handlers.Del(c) c.dlq.close() c.rlq.close() diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index e4d2077ac7..e0c748868a 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -222,6 +222,7 @@ func (c *regexConsumer) Close() { c.closeOnce.Do(func() { c.ticker.Stop() close(c.closeCh) + close(c.messageCh) var wg sync.WaitGroup c.consumersLock.Lock() From 7b423e9bcd025f1cf11e8a22ae64fb36973db793 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Sun, 17 Jul 2022 12:59:07 +0800 Subject: [PATCH 2/6] fix chan repeatedly closed. --- pulsar/consumer_impl.go | 9 ++++++++- pulsar/consumer_multitopic.go | 1 - pulsar/consumer_regex.go | 1 - 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 40455d07f0..0cac538b7f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -35,6 +35,8 @@ import ( const defaultNackRedeliveryDelay = 1 * time.Minute +var closeChanSet = make(map[chan ConsumerMessage]bool) + type acker interface { AckID(id trackingMessageID) error NackID(id trackingMessageID) @@ -100,6 +102,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { if options.MessageChannel == nil { messageCh = make(chan ConsumerMessage, 10) } + closeChanSet[messageCh] = false if options.RetryEnable { usingTopic := "" @@ -563,7 +566,11 @@ func (c *consumer) Close() { } wg.Wait() close(c.closeCh) - close(c.messageCh) + closed := closeChanSet[c.messageCh] + if !closed { + close(c.messageCh) + closeChanSet[c.messageCh] = true + } c.client.handlers.Del(c) c.dlq.close() c.rlq.close() diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 96aae44496..1d75a2477b 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -211,7 +211,6 @@ func (c *multiTopicConsumer) Close() { } wg.Wait() close(c.closeCh) - close(c.messageCh) c.client.handlers.Del(c) c.dlq.close() c.rlq.close() diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index e0c748868a..e4d2077ac7 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -222,7 +222,6 @@ func (c *regexConsumer) Close() { c.closeOnce.Do(func() { c.ticker.Stop() close(c.closeCh) - close(c.messageCh) var wg sync.WaitGroup c.consumersLock.Lock() From 30959fbf7980097255491f29bf66d1f90a41d4ef Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 22 Aug 2022 14:54:19 +0800 Subject: [PATCH 3/6] feat: Add closeMsgChOnce for consumer to help that messageCh is closed only once. --- pulsar/consumer_impl.go | 39 ++++++++++++++++++++--------------- pulsar/consumer_multitopic.go | 4 ++-- pulsar/consumer_regex.go | 28 +++++++++++++------------ pulsar/consumer_regex_test.go | 5 +++-- 4 files changed, 42 insertions(+), 34 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 0cac538b7f..6280d9d236 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -35,8 +35,6 @@ import ( const defaultNackRedeliveryDelay = 1 * time.Minute -var closeChanSet = make(map[chan ConsumerMessage]bool) - type acker interface { AckID(id trackingMessageID) error NackID(id trackingMessageID) @@ -55,12 +53,13 @@ type consumer struct { // channel used to deliver message to clients messageCh chan ConsumerMessage - dlq *dlqRouter - rlq *retryRouter - closeOnce sync.Once - closeCh chan struct{} - errorCh chan error - stopDiscovery func() + dlq *dlqRouter + rlq *retryRouter + closeOnce sync.Once + closeMsgChOnce *sync.Once + closeCh chan struct{} + errorCh chan error + stopDiscovery func() log log.Logger metrics *internal.LeveledMetrics @@ -102,7 +101,6 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { if options.MessageChannel == nil { messageCh = make(chan ConsumerMessage, 10) } - closeChanSet[messageCh] = false if options.RetryEnable { usingTopic := "" @@ -149,6 +147,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } + closeMsgChOnce := new(sync.Once) // normalize as FQDN topics var tns []*internal.TopicName // single topic consumer @@ -166,7 +165,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } - return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false) + return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false, closeMsgChOnce) } if len(options.Topics) > 1 { @@ -183,7 +182,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } - return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq) + return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq, closeMsgChOnce) } if options.TopicsPattern != "" { @@ -202,14 +201,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } - return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq) + return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq, closeMsgChOnce) } return nil, newError(InvalidTopicName, "topic name is required for consumer") } func newInternalConsumer(client *client, options ConsumerOptions, topic string, - messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool, closeMsgChOnce *sync.Once) (*consumer, error) { consumer := &consumer{ topic: topic, @@ -218,6 +217,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, disableForceTopicCreation: disableForceTopicCreation, messageCh: messageCh, closeCh: make(chan struct{}), + closeMsgChOnce: closeMsgChOnce, errorCh: make(chan error), dlq: dlq, rlq: rlq, @@ -566,11 +566,16 @@ func (c *consumer) Close() { } wg.Wait() close(c.closeCh) - closed := closeChanSet[c.messageCh] - if !closed { + c.closeMsgChOnce.Do(func() { close(c.messageCh) - closeChanSet[c.messageCh] = true - } + }) + //closed := closeChanSet[c.messageCh] + //fmt.Println("begin close") + //if !closed { + // close(c.messageCh) + // fmt.Println("already close") + // closeChanSet[c.messageCh] = true + //} c.client.handlers.Del(c) c.dlq.close() c.rlq.close() diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 1d75a2477b..8feaf5d815 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -49,7 +49,7 @@ type multiTopicConsumer struct { } func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string, - messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) (Consumer, error) { mtc := &multiTopicConsumer{ client: client, options: options, @@ -63,7 +63,7 @@ func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []str } var errs error - for ce := range subscriber(client, topics, options, messageCh, dlq, rlq) { + for ce := range subscriber(client, topics, options, messageCh, dlq, rlq, closeMsgChOnce) { if ce.err != nil { errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic) } else { diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index e4d2077ac7..81247511fa 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -43,7 +43,8 @@ type regexConsumer struct { options ConsumerOptions - messageCh chan ConsumerMessage + messageCh chan ConsumerMessage + closeMsgChOnce *sync.Once namespace string pattern *regexp.Regexp @@ -64,13 +65,14 @@ type regexConsumer struct { } func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp, - msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { + msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) (Consumer, error) { rc := ®exConsumer{ - client: c, - dlq: dlq, - rlq: rlq, - options: opts, - messageCh: msgCh, + client: c, + dlq: dlq, + rlq: rlq, + options: opts, + messageCh: msgCh, + closeMsgChOnce: closeMsgChOnce, namespace: tn.Namespace, pattern: pattern, @@ -91,7 +93,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p } var errs error - for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) { + for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq, closeMsgChOnce) { if ce.err != nil { errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic) } else { @@ -274,7 +276,7 @@ func (c *regexConsumer) monitor() { } case topics := <-c.subscribeCh: if len(topics) > 0 && !c.closed() { - c.subscribe(topics, c.dlq, c.rlq) + c.subscribe(topics, c.dlq, c.rlq, c.closeMsgChOnce) } case topics := <-c.unsubscribeCh: if len(topics) > 0 && !c.closed() { @@ -318,10 +320,10 @@ func (c *regexConsumer) knownTopics() []string { return topics } -func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) { +func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) { c.log.WithField("topics", topics).Debug("subscribe") consumers := make(map[string]Consumer, len(topics)) - for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) { + for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq, closeMsgChOnce) { if ce.err != nil { c.log.Warnf("Failed to subscribe to topic=%s", ce.topic) } else { @@ -377,7 +379,7 @@ type consumerError struct { } func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage, - dlq *dlqRouter, rlq *retryRouter) <-chan consumerError { + dlq *dlqRouter, rlq *retryRouter, closeMsgChOnce *sync.Once) <-chan consumerError { consumerErrorCh := make(chan consumerError, len(topics)) var wg sync.WaitGroup wg.Add(len(topics)) @@ -389,7 +391,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum for _, t := range topics { go func(topic string) { defer wg.Done() - c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true) + c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true, closeMsgChOnce) consumerErrorCh <- consumerError{ err: err, topic: topic, diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 9cb600fe3a..2e89df86ce 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -22,6 +22,7 @@ import ( "fmt" "regexp" "strings" + "sync" "testing" "time" @@ -156,7 +157,7 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) + consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq, new(sync.Once)) if err != nil { t.Fatal(err) } @@ -194,7 +195,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) + consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq, new(sync.Once)) if err != nil { t.Fatal(err) } From fedb363c1f35275f70bba3ea551c4be1cc96652b Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 22 Aug 2022 20:46:31 +0800 Subject: [PATCH 4/6] chore: Remove redundant code --- pulsar/consumer_impl.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 6280d9d236..3573bec069 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -569,13 +569,6 @@ func (c *consumer) Close() { c.closeMsgChOnce.Do(func() { close(c.messageCh) }) - //closed := closeChanSet[c.messageCh] - //fmt.Println("begin close") - //if !closed { - // close(c.messageCh) - // fmt.Println("already close") - // closeChanSet[c.messageCh] = true - //} c.client.handlers.Del(c) c.dlq.close() c.rlq.close() From 2316f494052196bf581ecc824552331e768855f3 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Sat, 2 Mar 2024 01:03:28 +0800 Subject: [PATCH 5/6] merge from master --- pulsar/consumer_regex.go | 2 +- pulsar/reader_impl.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 2232433072..2ebc3c126c 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -351,7 +351,7 @@ func (c *regexConsumer) discover() { c.unsubscribe(staleTopics) } if len(newTopics) > 0 { - c.subscribe(newTopics, c.dlq, c.rlq) + c.subscribe(newTopics, c.dlq, c.rlq, c.closeMsgChOnce) } } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index bf91c67fa5..be3c696b59 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -136,7 +136,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } - c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false) + c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false, new(sync.Once)) if err != nil { close(reader.messageCh) return nil, err From 23efd70ec29fbedcce382b09378e0d7c5639dfef Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Sat, 2 Mar 2024 01:10:36 +0800 Subject: [PATCH 6/6] ci-lint --- pulsar/consumer_impl.go | 3 ++- pulsar/consumer_regex_test.go | 6 ++++-- pulsar/reader_impl.go | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 54e94f5a66..7a1c08a91f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -238,7 +238,8 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } func newInternalConsumer(client *client, options ConsumerOptions, topic string, - messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool, closeMsgChOnce *sync.Once) (*consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool, + closeMsgChOnce *sync.Once) (*consumer, error) { consumer := &consumer{ topic: topic, diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index ac375bb223..21fc16b37d 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -158,7 +158,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq, new(sync.Once)) + consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, + rlq, new(sync.Once)) if err != nil { t.Fatal(err) } @@ -197,7 +198,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) - consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq, new(sync.Once)) + consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, + rlq, new(sync.Once)) if err != nil { t.Fatal(err) } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index be3c696b59..106f8abbaf 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -136,7 +136,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } - c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false, new(sync.Once)) + c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, + false, new(sync.Once)) if err != nil { close(reader.messageCh) return nil, err