Skip to content
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ type Consumer interface {
// The list of MessageID instances of all the topics that the consumer subscribed
GetLastMessageIDs() ([]TopicMessageID, error)

// Closed returns a channel indicating that consumer is closed
Closed() <-chan struct{}

Comment thread
omnilight marked this conversation as resolved.
// Receive a single message.
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
Expand Down
12 changes: 12 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type consumer struct {
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
close func() // close will be assigned only after full initialization cycle will be ready
stopDiscovery func()

log log.Logger
Expand Down Expand Up @@ -288,6 +289,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
duration = defaultAutoDiscoveryDuration
}
consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration)
consumer.close = consumer.closeInternal

consumer.metrics.ConsumersOpened.Inc()
return consumer, nil
Expand Down Expand Up @@ -496,6 +498,10 @@ func (c *consumer) unsubscribe(force bool) error {
return nil
}

func (c *consumer) Closed() <-chan struct{} {
return c.closeCh
}

func (c *consumer) GetLastMessageIDs() ([]TopicMessageID, error) {
ids := make([]TopicMessageID, 0)
for _, pc := range c.consumers {
Expand Down Expand Up @@ -674,6 +680,12 @@ func (c *consumer) NackID(msgID MessageID) {
}

func (c *consumer) Close() {
if c.close != nil {
c.close()
}
}

func (c *consumer) closeInternal() {
c.closeOnce.Do(func() {
c.stopDiscovery()

Expand Down
4 changes: 4 additions & 0 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (c *multiTopicConsumer) Unsubscribe() error {
return errs
}

func (c *multiTopicConsumer) Closed() <-chan struct{} {
return c.closeCh
}

func (c *multiTopicConsumer) UnsubscribeForce() error {
var errs error
for t, consumer := range c.consumers {
Expand Down
15 changes: 10 additions & 5 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,7 +1595,9 @@ func (pc *partitionConsumer) runEventsLoop() {
return
case connectionClosed := <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
pc.reconnectToBroker(connectionClosed)
if !pc.reconnectToBroker(connectionClosed) {
pc.parentConsumer.Close()
}
}
}
}()
Expand Down Expand Up @@ -1679,7 +1681,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
close(pc.closeCh)
}

func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) {
func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClosed) bool {
var maxRetry int

if pc.options.maxReconnectToBroker == nil {
Expand All @@ -1697,7 +1699,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
pc.log.Info("consumer state not ready, exit reconnect")
return
return false
}

var assignedBrokerURL string
Expand All @@ -1722,14 +1724,14 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
pc.log.Info("consumer state not ready, exit reconnect")
return
return false
}

err := pc.grabConn(assignedBrokerURL)
if err == nil {
// Successfully reconnected
pc.log.Info("Reconnected consumer to broker")
return
return true
}
pc.log.WithError(err).Error("Failed to create consumer at reconnect")
errMsg := err.Error()
Expand All @@ -1747,6 +1749,9 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
}

pc.log.Warn("Reached maximum number of reconnection attempts")
return false
}

func (pc *partitionConsumer) lookupTopic(brokerServiceURL string) (*internal.LookupResult, error) {
Expand Down
4 changes: 4 additions & 0 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (c *regexConsumer) Unsubscribe() error {
return errs
}

func (c *regexConsumer) Closed() <-chan struct{} {
return c.closeCh
}

func (c *regexConsumer) UnsubscribeForce() error {
var errs error
c.consumersLock.Lock()
Expand Down
Loading