From 16835e7998b07d329beab9e3fa86590cd270c4d7 Mon Sep 17 00:00:00 2001 From: PavelZeger Date: Sat, 2 May 2026 18:52:15 +0300 Subject: [PATCH 1/8] [fix][consumer] Add reconnect failure listener and auto-close on max retry exhaustion --- pulsar/consumer.go | 12 ++++ pulsar/consumer_impl.go | 66 +++++++++++---------- pulsar/consumer_partition.go | 66 +++++++++++---------- pulsar/consumer_test.go | 109 +++++++++++++++++++++++++++++++++++ 4 files changed, 192 insertions(+), 61 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 7996335a82..2fb70ee2f3 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -222,6 +222,18 @@ type ConsumerOptions struct { // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint + // MaxReconnectToBrokerListener is called when the consumer exhausts all reconnect attempts + // set by MaxReconnectToBroker. The consumer argument is the parent consumer, and err is the + // last connection error. Use this callback to detect silent failure and take recovery action + // (e.g. recreate the consumer). Only fires when MaxReconnectToBroker is set to a finite value + // or when the backoff policy signals IsMaxBackoffReached. + MaxReconnectToBrokerListener func(consumer Consumer, err error) + + // CloseConsumerOnMaxReconnectToBroker, when true, automatically closes the consumer after + // exhausting all reconnect attempts. The close happens asynchronously after + // MaxReconnectToBrokerListener (if set) returns. Default: false. + CloseConsumerOnMaxReconnectToBroker bool + // BackOffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) BackOffPolicyFunc func() backoff.Policy diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 08c825e3e9..11c5dcfbeb 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -449,38 +449,40 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu nackRedeliveryDelay = options.NackRedeliveryDelay } return &partitionConsumerOpts{ - topic: topic, - consumerName: consumerName, - subscription: options.SubscriptionName, - subscriptionType: options.Type, - subscriptionInitPos: options.SubscriptionInitialPosition, - partitionIdx: idx, - receiverQueueSize: options.ReceiverQueueSize, - nackRedeliveryDelay: nackRedeliveryDelay, - nackBackoffPolicy: options.NackBackoffPolicy, - nackPrecisionBit: options.NackPrecisionBit, - metadata: options.Properties, - subProperties: options.SubscriptionProperties, - replicateSubscriptionState: options.ReplicateSubscriptionState, - startMessageID: options.startMessageID, - startMessageIDInclusive: options.StartMessageIDInclusive, - subscriptionMode: options.SubscriptionMode, - readCompacted: options.ReadCompacted, - interceptors: options.Interceptors, - maxReconnectToBroker: options.MaxReconnectToBroker, - backOffPolicyFunc: options.BackOffPolicyFunc, - keySharedPolicy: options.KeySharedPolicy, - schema: options.Schema, - decryption: options.Decryption, - ackWithResponse: options.AckWithResponse, - maxPendingChunkedMessage: options.MaxPendingChunkedMessage, - expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, - autoAckIncompleteChunk: options.AutoAckIncompleteChunk, - consumerEventListener: options.EventListener, - enableBatchIndexAck: options.EnableBatchIndexAcknowledgment, - ackGroupingOptions: options.AckGroupingOptions, - autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize, - enableZeroQueueConsumer: options.EnableZeroQueueConsumer, + topic: topic, + consumerName: consumerName, + subscription: options.SubscriptionName, + subscriptionType: options.Type, + subscriptionInitPos: options.SubscriptionInitialPosition, + partitionIdx: idx, + receiverQueueSize: options.ReceiverQueueSize, + nackRedeliveryDelay: nackRedeliveryDelay, + nackBackoffPolicy: options.NackBackoffPolicy, + nackPrecisionBit: options.NackPrecisionBit, + metadata: options.Properties, + subProperties: options.SubscriptionProperties, + replicateSubscriptionState: options.ReplicateSubscriptionState, + startMessageID: options.startMessageID, + startMessageIDInclusive: options.StartMessageIDInclusive, + subscriptionMode: options.SubscriptionMode, + readCompacted: options.ReadCompacted, + interceptors: options.Interceptors, + maxReconnectToBroker: options.MaxReconnectToBroker, + maxReconnectToBrokerListener: options.MaxReconnectToBrokerListener, + closeConsumerOnMaxReconnectToBroker: options.CloseConsumerOnMaxReconnectToBroker, + backOffPolicyFunc: options.BackOffPolicyFunc, + keySharedPolicy: options.KeySharedPolicy, + schema: options.Schema, + decryption: options.Decryption, + ackWithResponse: options.AckWithResponse, + maxPendingChunkedMessage: options.MaxPendingChunkedMessage, + expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, + autoAckIncompleteChunk: options.AutoAckIncompleteChunk, + consumerEventListener: options.EventListener, + enableBatchIndexAck: options.EnableBatchIndexAcknowledgment, + ackGroupingOptions: options.AckGroupingOptions, + autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize, + enableZeroQueueConsumer: options.EnableZeroQueueConsumer, } } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index a0190f70e5..173d67ed7a 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -94,35 +94,37 @@ const ( ) type partitionConsumerOpts struct { - topic string - consumerName string - subscription string - subscriptionType SubscriptionType - subscriptionInitPos SubscriptionInitialPosition - partitionIdx int - receiverQueueSize int - autoReceiverQueueSize bool - nackRedeliveryDelay time.Duration - nackBackoffPolicy NackBackoffPolicy - nackPrecisionBit *int64 - metadata map[string]string - subProperties map[string]string - replicateSubscriptionState bool - startMessageID *trackingMessageID - startMessageIDInclusive bool - subscriptionMode SubscriptionMode - readCompacted bool - disableForceTopicCreation bool - interceptors ConsumerInterceptors - maxReconnectToBroker *uint - backOffPolicyFunc func() backoff.Policy - keySharedPolicy *KeySharedPolicy - schema Schema - decryption *MessageDecryptionInfo - ackWithResponse bool - maxPendingChunkedMessage int - expireTimeOfIncompleteChunk time.Duration - autoAckIncompleteChunk bool + topic string + consumerName string + subscription string + subscriptionType SubscriptionType + subscriptionInitPos SubscriptionInitialPosition + partitionIdx int + receiverQueueSize int + autoReceiverQueueSize bool + nackRedeliveryDelay time.Duration + nackBackoffPolicy NackBackoffPolicy + nackPrecisionBit *int64 + metadata map[string]string + subProperties map[string]string + replicateSubscriptionState bool + startMessageID *trackingMessageID + startMessageIDInclusive bool + subscriptionMode SubscriptionMode + readCompacted bool + disableForceTopicCreation bool + interceptors ConsumerInterceptors + maxReconnectToBroker *uint + maxReconnectToBrokerListener func(consumer Consumer, err error) + closeConsumerOnMaxReconnectToBroker bool + backOffPolicyFunc func() backoff.Policy + keySharedPolicy *KeySharedPolicy + schema Schema + decryption *MessageDecryptionInfo + ackWithResponse bool + maxPendingChunkedMessage int + expireTimeOfIncompleteChunk time.Duration + autoAckIncompleteChunk bool // in failover mode, this callback will be called when consumer change consumerEventListener ConsumerEventListener enableBatchIndexAck bool @@ -2100,6 +2102,12 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose pc.metrics.ConsumersReconnectFailure.Inc() if maxRetry == 0 || bo.IsMaxBackoffReached() { pc.metrics.ConsumersReconnectMaxRetry.Inc() + if pc.options.maxReconnectToBrokerListener != nil { + pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err) + } + if pc.options.closeConsumerOnMaxReconnectToBroker { + go pc.parentConsumer.Close() + } } return struct{}{}, err diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 0d710ead4b..8c709dc3c0 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5710,3 +5710,112 @@ func TestSelectConnectionForSameConsumer(t *testing.T) { "The consumer uses a different connection when reconnecting") } } + +func TestConsumerMaxReconnectToBrokerListener(t *testing.T) { + req := testcontainers.ContainerRequest{ + Image: getPulsarTestImage(), + ExposedPorts: []string{"6650/tcp", "8080/tcp"}, + WaitingFor: wait.ForExposedPort(), + Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"}, + } + c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + require.NoError(t, err) + endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") + require.NoError(t, err) + + pulsarClient, err := NewClient(ClientOptions{ + URL: endpoint, + ConnectionTimeout: 3 * time.Second, + OperationTimeout: 5 * time.Second, + }) + require.NoError(t, err) + defer pulsarClient.Close() + + maxRetry := uint(1) + listenerFired := make(chan struct{}) + var ( + listenerErr error + listenerConsumer Consumer + ) + + topic := newTopicName() + var testConsumer Consumer + require.Eventually(t, func() bool { + testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "test-max-reconnect-listener", + MaxReconnectToBroker: &maxRetry, + BackOffPolicyFunc: func() backoff.Policy { + return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second) + }, + MaxReconnectToBrokerListener: func(c Consumer, e error) { + listenerConsumer = c + listenerErr = e + close(listenerFired) + }, + }) + return err == nil + }, 30*time.Second, 1*time.Second) + defer testConsumer.Close() + + _ = c.Terminate(context.Background()) + + select { + case <-listenerFired: + case <-time.After(30 * time.Second): + t.Fatal("MaxReconnectToBrokerListener was not called within timeout") + } + + assert.NotNil(t, listenerErr, "listener should receive the last connection error") + assert.Equal(t, testConsumer, listenerConsumer, "listener should receive the parent consumer") +} + +func TestConsumerMaxReconnectToBrokerAutoClose(t *testing.T) { + req := testcontainers.ContainerRequest{ + Image: getPulsarTestImage(), + ExposedPorts: []string{"6650/tcp", "8080/tcp"}, + WaitingFor: wait.ForExposedPort(), + Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"}, + } + c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + require.NoError(t, err) + endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") + require.NoError(t, err) + + pulsarClient, err := NewClient(ClientOptions{ + URL: endpoint, + ConnectionTimeout: 3 * time.Second, + OperationTimeout: 5 * time.Second, + }) + require.NoError(t, err) + defer pulsarClient.Close() + + maxRetry := uint(1) + topic := newTopicName() + var testConsumer Consumer + require.Eventually(t, func() bool { + testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "test-max-reconnect-autoclose", + MaxReconnectToBroker: &maxRetry, + BackOffPolicyFunc: func() backoff.Policy { + return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second) + }, + CloseConsumerOnMaxReconnectToBroker: true, + }) + return err == nil + }, 30*time.Second, 1*time.Second) + + _ = c.Terminate(context.Background()) + + pc := testConsumer.(*consumer).consumers[0] + require.Eventually(t, func() bool { + return pc.getConsumerState() == consumerClosed + }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries") +} From 3638707338b69cd135a1d2c65654ee5046be1804 Mon Sep 17 00:00:00 2001 From: PavelZeger Date: Sat, 9 May 2026 11:19:26 +0300 Subject: [PATCH 2/8] [fix][consumer] Fix consumer_partition.go review comment --- pulsar/consumer_partition.go | 16 +++++++--- pulsar/consumer_test.go | 62 ++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 173d67ed7a..1cb5b51ea6 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -2063,6 +2063,8 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose assignedBrokerURL = connectionClosed.assignedBrokerURL } + var maxRetryNotified bool + opFn := func() (struct{}, error) { if maxRetry == 0 { return struct{}{}, nil @@ -2102,11 +2104,15 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose pc.metrics.ConsumersReconnectFailure.Inc() if maxRetry == 0 || bo.IsMaxBackoffReached() { pc.metrics.ConsumersReconnectMaxRetry.Inc() - if pc.options.maxReconnectToBrokerListener != nil { - pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err) - } - if pc.options.closeConsumerOnMaxReconnectToBroker { - go pc.parentConsumer.Close() + + if !maxRetryNotified { + maxRetryNotified = true + if pc.options.maxReconnectToBrokerListener != nil { + pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err) + } + if pc.options.closeConsumerOnMaxReconnectToBroker { + go pc.parentConsumer.Close() + } } } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 8c709dc3c0..85de12767d 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5819,3 +5819,65 @@ func TestConsumerMaxReconnectToBrokerAutoClose(t *testing.T) { return pc.getConsumerState() == consumerClosed }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries") } + +type maxBackoffReachedPolicy struct { + delay time.Duration +} + +func (p *maxBackoffReachedPolicy) Next() time.Duration { return p.delay } +func (p *maxBackoffReachedPolicy) IsMaxBackoffReached() bool { return true } +func (p *maxBackoffReachedPolicy) Reset() {} + +func TestConsumerMaxReconnectToBrokerListenerFiresOnceWhenBackoffMaxed(t *testing.T) { + req := testcontainers.ContainerRequest{ + Image: getPulsarTestImage(), + ExposedPorts: []string{"6650/tcp", "8080/tcp"}, + WaitingFor: wait.ForExposedPort(), + Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"}, + } + c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + require.NoError(t, err) + endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") + require.NoError(t, err) + + pulsarClient, err := NewClient(ClientOptions{ + URL: endpoint, + ConnectionTimeout: 3 * time.Second, + OperationTimeout: 5 * time.Second, + }) + require.NoError(t, err) + defer pulsarClient.Close() + + var listenerCount int32 + + topic := newTopicName() + var testConsumer Consumer + require.Eventually(t, func() bool { + testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "test-max-reconnect-listener-once", + BackOffPolicyFunc: func() backoff.Policy { + return &maxBackoffReachedPolicy{delay: 200 * time.Millisecond} + }, + MaxReconnectToBrokerListener: func(_ Consumer, _ error) { + atomic.AddInt32(&listenerCount, 1) + }, + }) + return err == nil + }, 30*time.Second, 1*time.Second) + defer testConsumer.Close() + + _ = c.Terminate(context.Background()) + + require.Eventually(t, func() bool { + return atomic.LoadInt32(&listenerCount) >= 1 + }, 30*time.Second, 200*time.Millisecond, "listener should fire at least once after reconnect failures") + + time.Sleep(3 * time.Second) + + assert.EqualValues(t, 1, atomic.LoadInt32(&listenerCount), + "listener must fire exactly once per reconnect cycle even when IsMaxBackoffReached stays true") +} From b10dd94c5b8be79c73df791f2b17530944fbbe46 Mon Sep 17 00:00:00 2001 From: PavelZeger Date: Sat, 9 May 2026 11:30:40 +0300 Subject: [PATCH 3/8] [fix][consumer] Fix consumer.go review comment --- pulsar/consumer.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 2fb70ee2f3..1ba0818006 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -225,13 +225,18 @@ type ConsumerOptions struct { // MaxReconnectToBrokerListener is called when the consumer exhausts all reconnect attempts // set by MaxReconnectToBroker. The consumer argument is the parent consumer, and err is the // last connection error. Use this callback to detect silent failure and take recovery action - // (e.g. recreate the consumer). Only fires when MaxReconnectToBroker is set to a finite value - // or when the backoff policy signals IsMaxBackoffReached. + // (e.g. recreate the consumer). This callback is invoked from the partition consumer event + // loop, so applications must not call consumer.Close() synchronously from within the callback, + // since doing so can deadlock. If closing is required, do it asynchronously (for example, in + // another goroutine), or enable CloseConsumerOnMaxReconnectToBroker to let the client close + // the consumer safely after the callback returns. Only fires when MaxReconnectToBroker is set + // to a finite value or when the backoff policy signals IsMaxBackoffReached. MaxReconnectToBrokerListener func(consumer Consumer, err error) // CloseConsumerOnMaxReconnectToBroker, when true, automatically closes the consumer after // exhausting all reconnect attempts. The close happens asynchronously after - // MaxReconnectToBrokerListener (if set) returns. Default: false. + // MaxReconnectToBrokerListener (if set) returns, and is the recommended option when the + // consumer should be closed after reconnect exhaustion. Default: false. CloseConsumerOnMaxReconnectToBroker bool // BackOffPolicyFunc parameterize the following options in the reconnection logic to From 133541863108f2f24d89e7c1ba84d5612c55facc Mon Sep 17 00:00:00 2001 From: PavelZeger Date: Sat, 9 May 2026 11:43:32 +0300 Subject: [PATCH 4/8] [fix][consumer] Fix consumer_test.go review comment --- pulsar/consumer_test.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 85de12767d..1266b372fa 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5723,6 +5723,11 @@ func TestConsumerMaxReconnectToBrokerListener(t *testing.T) { Started: true, }) require.NoError(t, err) + t.Cleanup(func() { + if err := c.Terminate(context.Background()); err != nil { + t.Logf("container terminate (cleanup) returned: %v", err) + } + }) endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") require.NoError(t, err) @@ -5761,7 +5766,7 @@ func TestConsumerMaxReconnectToBrokerListener(t *testing.T) { }, 30*time.Second, 1*time.Second) defer testConsumer.Close() - _ = c.Terminate(context.Background()) + require.NoError(t, c.Terminate(context.Background())) select { case <-listenerFired: @@ -5785,6 +5790,11 @@ func TestConsumerMaxReconnectToBrokerAutoClose(t *testing.T) { Started: true, }) require.NoError(t, err) + t.Cleanup(func() { + if err := c.Terminate(context.Background()); err != nil { + t.Logf("container terminate (cleanup) returned: %v", err) + } + }) endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") require.NoError(t, err) @@ -5812,7 +5822,7 @@ func TestConsumerMaxReconnectToBrokerAutoClose(t *testing.T) { return err == nil }, 30*time.Second, 1*time.Second) - _ = c.Terminate(context.Background()) + require.NoError(t, c.Terminate(context.Background())) pc := testConsumer.(*consumer).consumers[0] require.Eventually(t, func() bool { @@ -5840,6 +5850,11 @@ func TestConsumerMaxReconnectToBrokerListenerFiresOnceWhenBackoffMaxed(t *testin Started: true, }) require.NoError(t, err) + t.Cleanup(func() { + if err := c.Terminate(context.Background()); err != nil { + t.Logf("container terminate (cleanup) returned: %v", err) + } + }) endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") require.NoError(t, err) @@ -5870,7 +5885,7 @@ func TestConsumerMaxReconnectToBrokerListenerFiresOnceWhenBackoffMaxed(t *testin }, 30*time.Second, 1*time.Second) defer testConsumer.Close() - _ = c.Terminate(context.Background()) + require.NoError(t, c.Terminate(context.Background())) require.Eventually(t, func() bool { return atomic.LoadInt32(&listenerCount) >= 1 From 6a64f26661eb2c27c0d3641415c85935b8379811 Mon Sep 17 00:00:00 2001 From: PavelZeger Date: Sat, 9 May 2026 16:28:29 +0300 Subject: [PATCH 5/8] [fix][consumer] Fix according to Java client logic --- pulsar/consumer.go | 27 ++++++----- pulsar/consumer_partition.go | 78 +++++++++++++++++++++++++------- pulsar/consumer_test.go | 88 ++++++++++-------------------------- 3 files changed, 102 insertions(+), 91 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 1ba0818006..f691eadfe9 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -222,21 +222,26 @@ type ConsumerOptions struct { // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint - // MaxReconnectToBrokerListener is called when the consumer exhausts all reconnect attempts - // set by MaxReconnectToBroker. The consumer argument is the parent consumer, and err is the - // last connection error. Use this callback to detect silent failure and take recovery action - // (e.g. recreate the consumer). This callback is invoked from the partition consumer event - // loop, so applications must not call consumer.Close() synchronously from within the callback, - // since doing so can deadlock. If closing is required, do it asynchronously (for example, in - // another goroutine), or enable CloseConsumerOnMaxReconnectToBroker to let the client close - // the consumer safely after the callback returns. Only fires when MaxReconnectToBroker is set - // to a finite value or when the backoff policy signals IsMaxBackoffReached. + // MaxReconnectToBrokerListener is called when the consumer gives up on reconnecting to the + // broker. The consumer argument is the parent consumer, and err is the last connection error. + // Use this callback to detect silent failure and take recovery action (e.g. recreate the + // consumer). The callback fires at most once per reconnect cycle, in either of two cases: + // 1. The retry budget set by MaxReconnectToBroker is exhausted. + // 2. The broker reports a non-retriable error (e.g. AuthorizationError, TopicNotFound, + // TopicTerminated, IncompatibleSchema). In this case the listener fires regardless of + // whether MaxReconnectToBroker was set, since retrying cannot recover. + // This callback is invoked from the partition consumer event loop, so applications must not + // call consumer.Close() synchronously from within the callback — doing so can deadlock. If + // closing is required, do it asynchronously (for example, in another goroutine), or enable + // CloseConsumerOnMaxReconnectToBroker to let the client close the consumer safely after the + // callback returns. MaxReconnectToBrokerListener func(consumer Consumer, err error) // CloseConsumerOnMaxReconnectToBroker, when true, automatically closes the consumer after - // exhausting all reconnect attempts. The close happens asynchronously after + // the reconnect loop gives up (either MaxReconnectToBroker exhausted or a non-retriable + // broker error was received). The close happens asynchronously after // MaxReconnectToBrokerListener (if set) returns, and is the recommended option when the - // consumer should be closed after reconnect exhaustion. Default: false. + // consumer should be closed after reconnect failure. Default: false. CloseConsumerOnMaxReconnectToBroker bool // BackOffPolicyFunc parameterize the following options in the reconnection logic to diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 1cb5b51ea6..57ee9afae0 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -93,6 +93,50 @@ const ( noMessageEntry = -1 ) +// Broker error markers that map to Java's PulsarClientException.isRetriableError() == false +// for the consumer subscribe/reconnect path. When the broker reports any of these in response +// to a Subscribe command, retrying will not recover — the consumer must give up and notify +// the application. errMsgTopicNotFound and errMsgTopicTerminated are reused from the producer +// path; the rest are consumer-specific. +const ( + errMsgConsumerSubscriptionNotFound = "SubscriptionNotFound" + errMsgConsumerAuthorizationError = "AuthorizationError" + errMsgConsumerBusy = "ConsumerBusy" + errMsgConsumerInvalidTopicName = "InvalidTopicName" + errMsgConsumerIncompatibleSchema = "IncompatibleSchema" + errMsgConsumerAssignError = "ConsumerAssignError" + errMsgConsumerNotAllowedError = "NotAllowedError" +) + +// nonRetriableSubscribeErrorMarkers is the full set of broker error names that should +// terminate the reconnect loop immediately. Detection is by substring match on the +// error message, since the broker error arrives wire-formatted as ": " +// (see grabConn -> BaseCommand_ERROR handling). +var nonRetriableSubscribeErrorMarkers = []string{ + errMsgTopicNotFound, + errMsgTopicTerminated, + errMsgConsumerSubscriptionNotFound, + errMsgConsumerAuthorizationError, + errMsgConsumerBusy, + errMsgConsumerInvalidTopicName, + errMsgConsumerIncompatibleSchema, + errMsgConsumerAssignError, + errMsgConsumerNotAllowedError, +} + +func isNonRetriableSubscribeError(err error) bool { + if err == nil { + return false + } + msg := err.Error() + for _, marker := range nonRetriableSubscribeErrorMarkers { + if strings.Contains(msg, marker) { + return true + } + } + return false +} + type partitionConsumerOpts struct { topic string consumerName string @@ -2063,7 +2107,19 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose assignedBrokerURL = connectionClosed.assignedBrokerURL } - var maxRetryNotified bool + var giveUpNotified bool + notifyReconnectGiveUp := func(cause error) { + if giveUpNotified { + return + } + giveUpNotified = true + if pc.options.maxReconnectToBrokerListener != nil { + pc.options.maxReconnectToBrokerListener(pc.parentConsumer, cause) + } + if pc.options.closeConsumerOnMaxReconnectToBroker { + go pc.parentConsumer.Close() + } + } opFn := func() (struct{}, error) { if maxRetry == 0 { @@ -2091,10 +2147,9 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose return struct{}{}, nil } pc.log.WithError(err).Error("Failed to create consumer at reconnect") - errMsg := err.Error() - if strings.Contains(errMsg, errMsgTopicNotFound) { - // when topic is deleted, we should give up reconnection. - pc.log.Warn("Topic Not Found.") + if isNonRetriableSubscribeError(err) { + pc.log.WithError(err).Warn("Non-retriable error during reconnect, giving up") + notifyReconnectGiveUp(err) return struct{}{}, nil } @@ -2102,18 +2157,9 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose maxRetry-- } pc.metrics.ConsumersReconnectFailure.Inc() - if maxRetry == 0 || bo.IsMaxBackoffReached() { + if maxRetry == 0 { pc.metrics.ConsumersReconnectMaxRetry.Inc() - - if !maxRetryNotified { - maxRetryNotified = true - if pc.options.maxReconnectToBrokerListener != nil { - pc.options.maxReconnectToBrokerListener(pc.parentConsumer, err) - } - if pc.options.closeConsumerOnMaxReconnectToBroker { - go pc.parentConsumer.Close() - } - } + notifyReconnectGiveUp(err) } return struct{}{}, err diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 1266b372fa..4f5f0565aa 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5830,69 +5830,29 @@ func TestConsumerMaxReconnectToBrokerAutoClose(t *testing.T) { }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries") } -type maxBackoffReachedPolicy struct { - delay time.Duration -} - -func (p *maxBackoffReachedPolicy) Next() time.Duration { return p.delay } -func (p *maxBackoffReachedPolicy) IsMaxBackoffReached() bool { return true } -func (p *maxBackoffReachedPolicy) Reset() {} - -func TestConsumerMaxReconnectToBrokerListenerFiresOnceWhenBackoffMaxed(t *testing.T) { - req := testcontainers.ContainerRequest{ - Image: getPulsarTestImage(), - ExposedPorts: []string{"6650/tcp", "8080/tcp"}, - WaitingFor: wait.ForExposedPort(), - Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"}, - } - c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - require.NoError(t, err) - t.Cleanup(func() { - if err := c.Terminate(context.Background()); err != nil { - t.Logf("container terminate (cleanup) returned: %v", err) - } - }) - endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") - require.NoError(t, err) - - pulsarClient, err := NewClient(ClientOptions{ - URL: endpoint, - ConnectionTimeout: 3 * time.Second, - OperationTimeout: 5 * time.Second, - }) - require.NoError(t, err) - defer pulsarClient.Close() - - var listenerCount int32 - - topic := newTopicName() - var testConsumer Consumer - require.Eventually(t, func() bool { - testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "test-max-reconnect-listener-once", - BackOffPolicyFunc: func() backoff.Policy { - return &maxBackoffReachedPolicy{delay: 200 * time.Millisecond} - }, - MaxReconnectToBrokerListener: func(_ Consumer, _ error) { - atomic.AddInt32(&listenerCount, 1) - }, +func TestIsNonRetriableSubscribeError(t *testing.T) { + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"topic not found", errors.New("TopicNotFound: topic does not exist"), true}, + {"topic terminated", errors.New("TopicTerminatedError: topic was terminated"), true}, + {"subscription not found", errors.New("SubscriptionNotFound: sub does not exist"), true}, + {"authorization", errors.New("AuthorizationError: not authorized"), true}, + {"consumer busy", errors.New("ConsumerBusy: another consumer attached"), true}, + {"invalid topic name", errors.New("InvalidTopicName: bad name"), true}, + {"incompatible schema", errors.New("IncompatibleSchema: schema mismatch"), true}, + {"consumer assign error", errors.New("ConsumerAssignError: dispatcher assign failed"), true}, + {"not allowed", errors.New("NotAllowedError: action not permitted"), true}, + {"service not ready (retriable)", errors.New("ServiceNotReady: please retry"), false}, + {"metadata error (retriable)", errors.New("MetadataError: zk timeout"), false}, + {"plain network error (retriable)", errors.New("dial tcp: i/o timeout"), false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, isNonRetriableSubscribeError(tc.err)) }) - return err == nil - }, 30*time.Second, 1*time.Second) - defer testConsumer.Close() - - require.NoError(t, c.Terminate(context.Background())) - - require.Eventually(t, func() bool { - return atomic.LoadInt32(&listenerCount) >= 1 - }, 30*time.Second, 200*time.Millisecond, "listener should fire at least once after reconnect failures") - - time.Sleep(3 * time.Second) - - assert.EqualValues(t, 1, atomic.LoadInt32(&listenerCount), - "listener must fire exactly once per reconnect cycle even when IsMaxBackoffReached stays true") + } } From a0da044b51042fe05a49f5c60c5aed54c57830a2 Mon Sep 17 00:00:00 2001 From: PavelZeger Date: Sat, 9 May 2026 16:31:38 +0300 Subject: [PATCH 6/8] [fix][consumer] Fix according to Java client logic --- pulsar/consumer_partition.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 57ee9afae0..28cb69b245 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -93,8 +93,7 @@ const ( noMessageEntry = -1 ) -// Broker error markers that map to Java's PulsarClientException.isRetriableError() == false -// for the consumer subscribe/reconnect path. When the broker reports any of these in response +// Broker error markers. When the broker reports any of these in response // to a Subscribe command, retrying will not recover — the consumer must give up and notify // the application. errMsgTopicNotFound and errMsgTopicTerminated are reused from the producer // path; the rest are consumer-specific. From fc821660455c4a18ee4ae73adfb753140235b33f Mon Sep 17 00:00:00 2001 From: Pavel Zeger Date: Fri, 15 May 2026 12:10:06 +0300 Subject: [PATCH 7/8] Apply suggestions from code review Co-authored-by: Zixuan Liu --- pulsar/consumer_partition.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 28cb69b245..5599c61c29 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -2112,12 +2112,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose return } giveUpNotified = true - if pc.options.maxReconnectToBrokerListener != nil { - pc.options.maxReconnectToBrokerListener(pc.parentConsumer, cause) - } - if pc.options.closeConsumerOnMaxReconnectToBroker { - go pc.parentConsumer.Close() - } + go pc.parentConsumer.Close() } opFn := func() (struct{}, error) { @@ -2158,7 +2153,8 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose pc.metrics.ConsumersReconnectFailure.Inc() if maxRetry == 0 { pc.metrics.ConsumersReconnectMaxRetry.Inc() - notifyReconnectGiveUp(err) + notifyReconnectGiveUp(errors.New("max retry attempts reached for reconnecting to broker")) + return struct{}{}, nil } return struct{}{}, err From 5864c7ac889d87a21d2ec90b9719c095df90a311 Mon Sep 17 00:00:00 2001 From: PavelZeger Date: Fri, 15 May 2026 15:25:32 +0300 Subject: [PATCH 8/8] [fix][consumer] Fix review comments --- pulsar/consumer.go | 27 ++------- pulsar/consumer_impl.go | 75 ++++++++++++----------- pulsar/consumer_interceptor.go | 17 ++++++ pulsar/consumer_partition.go | 73 ++++++++++++---------- pulsar/consumer_test.go | 108 +++++++++++++++------------------ pulsar/consumer_zero_queue.go | 7 +++ 6 files changed, 160 insertions(+), 147 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index f691eadfe9..be8849703d 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -220,30 +220,13 @@ type ConsumerOptions struct { Schema Schema // MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate) + // When the retry budget is exhausted, or when the broker reports a non-retriable error + // (e.g. AuthorizationError, TopicNotFound, TopicTerminated, IncompatibleSchema), the + // consumer is closed. Applications can observe the close — and recover the cause as the + // err argument — by registering a ConsumerInterceptor that also implements + // ConsumerCloseInterceptor. MaxReconnectToBroker *uint - // MaxReconnectToBrokerListener is called when the consumer gives up on reconnecting to the - // broker. The consumer argument is the parent consumer, and err is the last connection error. - // Use this callback to detect silent failure and take recovery action (e.g. recreate the - // consumer). The callback fires at most once per reconnect cycle, in either of two cases: - // 1. The retry budget set by MaxReconnectToBroker is exhausted. - // 2. The broker reports a non-retriable error (e.g. AuthorizationError, TopicNotFound, - // TopicTerminated, IncompatibleSchema). In this case the listener fires regardless of - // whether MaxReconnectToBroker was set, since retrying cannot recover. - // This callback is invoked from the partition consumer event loop, so applications must not - // call consumer.Close() synchronously from within the callback — doing so can deadlock. If - // closing is required, do it asynchronously (for example, in another goroutine), or enable - // CloseConsumerOnMaxReconnectToBroker to let the client close the consumer safely after the - // callback returns. - MaxReconnectToBrokerListener func(consumer Consumer, err error) - - // CloseConsumerOnMaxReconnectToBroker, when true, automatically closes the consumer after - // the reconnect loop gives up (either MaxReconnectToBroker exhausted or a non-retriable - // broker error was received). The close happens asynchronously after - // MaxReconnectToBrokerListener (if set) returns, and is the recommended option when the - // consumer should be closed after reconnect failure. Default: false. - CloseConsumerOnMaxReconnectToBroker bool - // BackOffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) BackOffPolicyFunc func() backoff.Policy diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 11c5dcfbeb..2b7f19a8fc 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -449,40 +449,38 @@ func newPartitionConsumerOpts(topic, consumerName string, idx int, options Consu nackRedeliveryDelay = options.NackRedeliveryDelay } return &partitionConsumerOpts{ - topic: topic, - consumerName: consumerName, - subscription: options.SubscriptionName, - subscriptionType: options.Type, - subscriptionInitPos: options.SubscriptionInitialPosition, - partitionIdx: idx, - receiverQueueSize: options.ReceiverQueueSize, - nackRedeliveryDelay: nackRedeliveryDelay, - nackBackoffPolicy: options.NackBackoffPolicy, - nackPrecisionBit: options.NackPrecisionBit, - metadata: options.Properties, - subProperties: options.SubscriptionProperties, - replicateSubscriptionState: options.ReplicateSubscriptionState, - startMessageID: options.startMessageID, - startMessageIDInclusive: options.StartMessageIDInclusive, - subscriptionMode: options.SubscriptionMode, - readCompacted: options.ReadCompacted, - interceptors: options.Interceptors, - maxReconnectToBroker: options.MaxReconnectToBroker, - maxReconnectToBrokerListener: options.MaxReconnectToBrokerListener, - closeConsumerOnMaxReconnectToBroker: options.CloseConsumerOnMaxReconnectToBroker, - backOffPolicyFunc: options.BackOffPolicyFunc, - keySharedPolicy: options.KeySharedPolicy, - schema: options.Schema, - decryption: options.Decryption, - ackWithResponse: options.AckWithResponse, - maxPendingChunkedMessage: options.MaxPendingChunkedMessage, - expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, - autoAckIncompleteChunk: options.AutoAckIncompleteChunk, - consumerEventListener: options.EventListener, - enableBatchIndexAck: options.EnableBatchIndexAcknowledgment, - ackGroupingOptions: options.AckGroupingOptions, - autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize, - enableZeroQueueConsumer: options.EnableZeroQueueConsumer, + topic: topic, + consumerName: consumerName, + subscription: options.SubscriptionName, + subscriptionType: options.Type, + subscriptionInitPos: options.SubscriptionInitialPosition, + partitionIdx: idx, + receiverQueueSize: options.ReceiverQueueSize, + nackRedeliveryDelay: nackRedeliveryDelay, + nackBackoffPolicy: options.NackBackoffPolicy, + nackPrecisionBit: options.NackPrecisionBit, + metadata: options.Properties, + subProperties: options.SubscriptionProperties, + replicateSubscriptionState: options.ReplicateSubscriptionState, + startMessageID: options.startMessageID, + startMessageIDInclusive: options.StartMessageIDInclusive, + subscriptionMode: options.SubscriptionMode, + readCompacted: options.ReadCompacted, + interceptors: options.Interceptors, + maxReconnectToBroker: options.MaxReconnectToBroker, + backOffPolicyFunc: options.BackOffPolicyFunc, + keySharedPolicy: options.KeySharedPolicy, + schema: options.Schema, + decryption: options.Decryption, + ackWithResponse: options.AckWithResponse, + maxPendingChunkedMessage: options.MaxPendingChunkedMessage, + expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, + autoAckIncompleteChunk: options.AutoAckIncompleteChunk, + consumerEventListener: options.EventListener, + enableBatchIndexAck: options.EnableBatchIndexAcknowledgment, + ackGroupingOptions: options.AckGroupingOptions, + autoReceiverQueueSize: options.EnableAutoScaledReceiverQueueSize, + enableZeroQueueConsumer: options.EnableZeroQueueConsumer, } } @@ -707,6 +705,14 @@ func (c *consumer) NackID(msgID MessageID) { } func (c *consumer) Close() { + c.closeWithCause(nil) +} + +// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor +// with the supplied cause. The hook fires exactly once per consumer; the cause +// is captured by the goroutine that wins closeOnce so concurrent callers cannot +// race the value. +func (c *consumer) closeWithCause(err error) { c.closeOnce.Do(func() { c.stopDiscovery() @@ -728,6 +734,7 @@ func (c *consumer) Close() { c.rlq.close() c.metrics.ConsumersClosed.Inc() c.metrics.ConsumersPartitions.Sub(float64(len(c.consumers))) + c.options.Interceptors.OnConsumerClose(c, err) }) } diff --git a/pulsar/consumer_interceptor.go b/pulsar/consumer_interceptor.go index db46b78425..4ad6546c5c 100644 --- a/pulsar/consumer_interceptor.go +++ b/pulsar/consumer_interceptor.go @@ -28,6 +28,15 @@ type ConsumerInterceptor interface { OnNegativeAcksSend(consumer Consumer, msgIDs []MessageID) } +// ConsumerCloseInterceptor is an optional interface that a ConsumerInterceptor +// may also implement to be notified once the consumer has been closed. The hook +// fires exactly once per consumer. When err is nil, the close was initiated by +// the user; a non-nil err carries the cause that triggered an internal close +// (for example, exhausting MaxReconnectToBroker or a non-retriable broker error). +type ConsumerCloseInterceptor interface { + OnConsumerClose(consumer Consumer, err error) +} + type ConsumerInterceptors []ConsumerInterceptor func (x ConsumerInterceptors) BeforeConsume(message ConsumerMessage) { @@ -48,4 +57,12 @@ func (x ConsumerInterceptors) OnNegativeAcksSend(consumer Consumer, msgIDs []Mes } } +func (x ConsumerInterceptors) OnConsumerClose(consumer Consumer, err error) { + for i := range x { + if c, ok := x[i].(ConsumerCloseInterceptor); ok { + c.OnConsumerClose(consumer, err) + } + } +} + var defaultConsumerInterceptors = make(ConsumerInterceptors, 0) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 5599c61c29..1be6b6bf9b 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -136,38 +136,43 @@ func isNonRetriableSubscribeError(err error) bool { return false } +// causalCloser is implemented by Consumer wrappers that can record the reason +// they were closed and forward it to a ConsumerCloseInterceptor. The reconnect +// loop uses this to surface the underlying error when it gives up. +type causalCloser interface { + closeWithCause(err error) +} + type partitionConsumerOpts struct { - topic string - consumerName string - subscription string - subscriptionType SubscriptionType - subscriptionInitPos SubscriptionInitialPosition - partitionIdx int - receiverQueueSize int - autoReceiverQueueSize bool - nackRedeliveryDelay time.Duration - nackBackoffPolicy NackBackoffPolicy - nackPrecisionBit *int64 - metadata map[string]string - subProperties map[string]string - replicateSubscriptionState bool - startMessageID *trackingMessageID - startMessageIDInclusive bool - subscriptionMode SubscriptionMode - readCompacted bool - disableForceTopicCreation bool - interceptors ConsumerInterceptors - maxReconnectToBroker *uint - maxReconnectToBrokerListener func(consumer Consumer, err error) - closeConsumerOnMaxReconnectToBroker bool - backOffPolicyFunc func() backoff.Policy - keySharedPolicy *KeySharedPolicy - schema Schema - decryption *MessageDecryptionInfo - ackWithResponse bool - maxPendingChunkedMessage int - expireTimeOfIncompleteChunk time.Duration - autoAckIncompleteChunk bool + topic string + consumerName string + subscription string + subscriptionType SubscriptionType + subscriptionInitPos SubscriptionInitialPosition + partitionIdx int + receiverQueueSize int + autoReceiverQueueSize bool + nackRedeliveryDelay time.Duration + nackBackoffPolicy NackBackoffPolicy + nackPrecisionBit *int64 + metadata map[string]string + subProperties map[string]string + replicateSubscriptionState bool + startMessageID *trackingMessageID + startMessageIDInclusive bool + subscriptionMode SubscriptionMode + readCompacted bool + disableForceTopicCreation bool + interceptors ConsumerInterceptors + maxReconnectToBroker *uint + backOffPolicyFunc func() backoff.Policy + keySharedPolicy *KeySharedPolicy + schema Schema + decryption *MessageDecryptionInfo + ackWithResponse bool + maxPendingChunkedMessage int + expireTimeOfIncompleteChunk time.Duration + autoAckIncompleteChunk bool // in failover mode, this callback will be called when consumer change consumerEventListener ConsumerEventListener enableBatchIndexAck bool @@ -2112,7 +2117,11 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose return } giveUpNotified = true - go pc.parentConsumer.Close() + if cc, ok := pc.parentConsumer.(causalCloser); ok { + go cc.closeWithCause(cause) + } else { + go pc.parentConsumer.Close() + } } opFn := func() (struct{}, error) { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 4f5f0565aa..2f6238721b 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -5711,7 +5711,27 @@ func TestSelectConnectionForSameConsumer(t *testing.T) { } } -func TestConsumerMaxReconnectToBrokerListener(t *testing.T) { +// closeInterceptor captures the (consumer, err) pair delivered to +// ConsumerCloseInterceptor.OnConsumerClose and signals via fired. +type closeInterceptor struct { + fired chan struct{} + consumer Consumer + err error + once sync.Once +} + +func (c *closeInterceptor) BeforeConsume(_ ConsumerMessage) {} +func (c *closeInterceptor) OnAcknowledge(_ Consumer, _ MessageID) {} +func (c *closeInterceptor) OnNegativeAcksSend(_ Consumer, _ []MessageID) {} +func (c *closeInterceptor) OnConsumerClose(consumer Consumer, err error) { + c.once.Do(func() { + c.consumer = consumer + c.err = err + close(c.fired) + }) +} + +func TestConsumerOnCloseInterceptorOnMaxReconnect(t *testing.T) { req := testcontainers.ContainerRequest{ Image: getPulsarTestImage(), ExposedPorts: []string{"6650/tcp", "8080/tcp"}, @@ -5740,27 +5760,19 @@ func TestConsumerMaxReconnectToBrokerListener(t *testing.T) { defer pulsarClient.Close() maxRetry := uint(1) - listenerFired := make(chan struct{}) - var ( - listenerErr error - listenerConsumer Consumer - ) + interceptor := &closeInterceptor{fired: make(chan struct{})} topic := newTopicName() var testConsumer Consumer require.Eventually(t, func() bool { testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{ Topic: topic, - SubscriptionName: "test-max-reconnect-listener", + SubscriptionName: "test-on-close-interceptor", MaxReconnectToBroker: &maxRetry, BackOffPolicyFunc: func() backoff.Policy { return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second) }, - MaxReconnectToBrokerListener: func(c Consumer, e error) { - listenerConsumer = c - listenerErr = e - close(listenerFired) - }, + Interceptors: ConsumerInterceptors{interceptor}, }) return err == nil }, 30*time.Second, 1*time.Second) @@ -5769,65 +5781,43 @@ func TestConsumerMaxReconnectToBrokerListener(t *testing.T) { require.NoError(t, c.Terminate(context.Background())) select { - case <-listenerFired: + case <-interceptor.fired: case <-time.After(30 * time.Second): - t.Fatal("MaxReconnectToBrokerListener was not called within timeout") + t.Fatal("OnConsumerClose was not called within timeout") } - assert.NotNil(t, listenerErr, "listener should receive the last connection error") - assert.Equal(t, testConsumer, listenerConsumer, "listener should receive the parent consumer") + assert.NotNil(t, interceptor.err, "interceptor should receive the cause of the close") + assert.Equal(t, testConsumer, interceptor.consumer, "interceptor should receive the parent consumer") + + pc := testConsumer.(*consumer).consumers[0] + require.Eventually(t, func() bool { + return pc.getConsumerState() == consumerClosed + }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries") } -func TestConsumerMaxReconnectToBrokerAutoClose(t *testing.T) { - req := testcontainers.ContainerRequest{ - Image: getPulsarTestImage(), - ExposedPorts: []string{"6650/tcp", "8080/tcp"}, - WaitingFor: wait.ForExposedPort(), - Cmd: []string{"bin/pulsar", "standalone", "-nfw", "--advertised-address", "localhost"}, - } - c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - require.NoError(t, err) - t.Cleanup(func() { - if err := c.Terminate(context.Background()); err != nil { - t.Logf("container terminate (cleanup) returned: %v", err) - } - }) - endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar") +func TestConsumerOnCloseInterceptorOnUserClose(t *testing.T) { + client, err := NewClient(ClientOptions{URL: serviceURL}) require.NoError(t, err) + defer client.Close() - pulsarClient, err := NewClient(ClientOptions{ - URL: endpoint, - ConnectionTimeout: 3 * time.Second, - OperationTimeout: 5 * time.Second, + interceptor := &closeInterceptor{fired: make(chan struct{})} + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: newTopicName(), + SubscriptionName: "test-on-close-user", + Interceptors: ConsumerInterceptors{interceptor}, }) require.NoError(t, err) - defer pulsarClient.Close() - maxRetry := uint(1) - topic := newTopicName() - var testConsumer Consumer - require.Eventually(t, func() bool { - testConsumer, err = pulsarClient.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "test-max-reconnect-autoclose", - MaxReconnectToBroker: &maxRetry, - BackOffPolicyFunc: func() backoff.Policy { - return newTestBackoffPolicy(100*time.Millisecond, 1*time.Second) - }, - CloseConsumerOnMaxReconnectToBroker: true, - }) - return err == nil - }, 30*time.Second, 1*time.Second) + consumer.Close() - require.NoError(t, c.Terminate(context.Background())) + select { + case <-interceptor.fired: + case <-time.After(5 * time.Second): + t.Fatal("OnConsumerClose was not called within timeout") + } - pc := testConsumer.(*consumer).consumers[0] - require.Eventually(t, func() bool { - return pc.getConsumerState() == consumerClosed - }, 30*time.Second, 100*time.Millisecond, "consumer should be closed after exhausting max reconnect retries") + assert.Nil(t, interceptor.err, "user-initiated close should report nil cause") + assert.Equal(t, consumer, interceptor.consumer) } func TestIsNonRetriableSubscribeError(t *testing.T) { diff --git a/pulsar/consumer_zero_queue.go b/pulsar/consumer_zero_queue.go index 97016c3815..4978fae258 100644 --- a/pulsar/consumer_zero_queue.go +++ b/pulsar/consumer_zero_queue.go @@ -261,6 +261,12 @@ func (z *zeroQueueConsumer) NackID(msgID MessageID) { } func (z *zeroQueueConsumer) Close() { + z.closeWithCause(nil) +} + +// closeWithCause closes the consumer and notifies any ConsumerCloseInterceptor +// with the supplied cause. The hook fires exactly once per consumer. +func (z *zeroQueueConsumer) closeWithCause(err error) { z.closeOnce.Do(func() { z.Lock() defer z.Unlock() @@ -272,6 +278,7 @@ func (z *zeroQueueConsumer) Close() { z.rlq.close() z.metrics.ConsumersClosed.Inc() z.metrics.ConsumersPartitions.Sub(float64(1)) + z.options.Interceptors.OnConsumerClose(z, err) }) }